# 2023-04-18 | Run AWS Batch Jobs on Production

Run batch transform jobs in `conte-prod` on all westbrook sites.

Steps:

1. fetch authentication tokens for `conte-prod`
1. get list of stations owned by USGS Conte
1. for each station, get list of imagesets
1. for each imageset, create AWS Batch job for PII detector

This notebook is designed to run in the `fpe-pii` conda environment.

```sh
conda activate fpe-pii
```

## Authentication Tokens

Fetch credentials for `conte-prod` using `aws-cli`. Enter password when requested. Final command should list all s3 buckets owned by `conte-prod`.

```sh
conda activate aws-cli
export AWS_PROFILE=conte-prod
# activate VPN
aws s3 ls
```

## Database

Load credentials from `.env.local` and connect to the FPE database using `sqlalchemy` (< 2.0).

In [1]:
%reload_ext dotenv
%dotenv ../../.env.local

In [25]:
import os
import json

from sqlalchemy import create_engine

DATABASE_URL = f"postgresql://{os.getenv('DB_USER')}:{os.getenv('DB_PASSWORD')}@{os.getenv('DB_HOST')}:{os.getenv('DB_PORT')}/{os.getenv('DB_DBNAME')}"
engine = create_engine(DATABASE_URL)
print(engine)

Engine(postgresql://sheds:***@fpe-prod.c5p6gaiawuao.us-west-2.rds.amazonaws.com:5432/postgres)


Fetch stations using `pandas`

In [26]:
import pandas as pd

conte_user_id = "0626d282-0267-40b0-8f17-214c8f72e551"

query = f"SELECT id, name FROM stations WHERE user_id='{conte_user_id}'"

# Execute the SQL query and return the results as a Pandas DataFrame
df_stations = pd.read_sql(query, engine)

print(f"stations: n={len(df_stations)}")

df_stations.sort_values('name')

stations: n=23


Unnamed: 0,id,name
0,12,Avery Brook_Bridge_01171000
1,15,Avery Brook_River Left_01171000
22,14,Avery Brook_River Right_01171000
9,13,Avery Brook_Side_01171000
10,66,Cold River_01168250
2,42,Dry Brook Lower
5,41,Dry Brook Upper
19,65,Green River_01170100
16,38,Lyons Brook Upper
11,63,Lyons Brook lower


In [53]:
# fetch imagesets

query = f"""
    SELECT imagesets.id as imageset_id,
           station_id,
           stations.name as station_name,
           uuid as imageset_uuid,
           n_images,
           pii_status
    FROM imagesets
    LEFT JOIN stations ON imagesets.station_id=stations.id
    WHERE imagesets.pii_status = 'CREATED'
      AND stations.user_id='{conte_user_id}'
      AND imagesets.status='DONE'
    ORDER BY station_name, imageset_id
"""

# Execute the SQL query and return the results as a Pandas DataFrame
df_imagesets = pd.read_sql(query, engine)

print(f"df_imagesets: n={len(df_imagesets)}")

df_imagesets.head()

df_imagesets: n=383


Unnamed: 0,imageset_id,station_id,station_name,imageset_uuid,n_images,pii_status
0,95,12,Avery Brook_Bridge_01171000,fec63b82-d9fa-4844-ab9b-cda8999122b0,820,CREATED
1,97,12,Avery Brook_Bridge_01171000,dccec038-5156-4518-8030-fc92bf1d55b7,1120,CREATED
2,98,12,Avery Brook_Bridge_01171000,b7f5c369-534d-4042-a7f5-2615d92e5eef,1170,CREATED
3,99,12,Avery Brook_Bridge_01171000,0df06fdb-73bc-4905-8c80-22b63942daeb,2771,CREATED
4,100,12,Avery Brook_Bridge_01171000,2497f79f-7f18-4e16-a489-a80dcce0c177,2760,CREATED


In [55]:
df_imagesets[df_imagesets['station_id'] == 12]

Unnamed: 0,imageset_id,station_id,station_name,imageset_uuid,n_images,pii_status
0,95,12,Avery Brook_Bridge_01171000,fec63b82-d9fa-4844-ab9b-cda8999122b0,820,CREATED
1,97,12,Avery Brook_Bridge_01171000,dccec038-5156-4518-8030-fc92bf1d55b7,1120,CREATED
2,98,12,Avery Brook_Bridge_01171000,b7f5c369-534d-4042-a7f5-2615d92e5eef,1170,CREATED
3,99,12,Avery Brook_Bridge_01171000,0df06fdb-73bc-4905-8c80-22b63942daeb,2771,CREATED
4,100,12,Avery Brook_Bridge_01171000,2497f79f-7f18-4e16-a489-a80dcce0c177,2760,CREATED
5,101,12,Avery Brook_Bridge_01171000,db7af6b7-34fb-4c26-b9fd-f6de39dfcf66,2685,CREATED
6,102,12,Avery Brook_Bridge_01171000,0bc292f0-6867-45cd-8678-bcc16a3f093a,531,CREATED
7,103,12,Avery Brook_Bridge_01171000,be5a2c1d-230f-4445-8962-4343726e768d,1390,CREATED
8,104,12,Avery Brook_Bridge_01171000,f3a84096-4d06-40f0-9830-50d07e8e90d7,3192,CREATED
9,227,12,Avery Brook_Bridge_01171000,26368047-02fc-4127-85f1-e57977b7e76b,1893,CREATED


In [29]:
# # filter imagesets for westbrook zero (station.id=29)
# df_imagesets_westbrook_0 = df_imagesets[df_imagesets['station_id'] == 29]

# print(f"# images at westbrook 0: {df_imagesets_westbrook_0['n_images'].sum()}")
# df_imagesets_westbrook_0

# images at westbrook 0: 0


Unnamed: 0,imageset_id,station_id,station_name,imageset_uuid,n_images,pii_status


In [30]:
import boto3, time

AWS_PROFILE="conte-prod"
AWS_REGION="us-west-2"
JOB_ROLE_ARN="arn:aws:iam::694155575325:role/fpe-prod-batch-job-role"

def timestamp():
    return time.strftime("%Y%m%d-%H%M%S")

def get_batch_creds(session, role_arn):
    sts = session.client("sts")
    response = sts.assume_role(
        RoleArn=role_arn,
        RoleSessionName=f"fpe-batch-session--{timestamp()}"
    )
    return response['Credentials']

session = boto3.Session(profile_name=AWS_PROFILE)
creds = get_batch_creds(session, JOB_ROLE_ARN)
print(creds)

{'AccessKeyId': 'ASIA2DHXFQAORZKJ4NVH', 'SecretAccessKey': 'LYaDcocXUkPvkLOTBkp8SMeE7terXarV7rMI0jGA', 'SessionToken': 'FwoGZXIvYXdzEI7//////////wEaDMhnElXUHrWsIC1PXSLGAXSl3yRvsv+aln6sQlm6SyuuExqGNKZYlXwqB2FcyCJVBekd8US9yf/ut9GxiJB5bmOtjlwB8+0coZHcZV6LkreNZb0NAHAaSQSuR7O1S/UBfL2sKG5u3s31fRhwGbOPTqaLuG53YT7Y1ErCk1ZRyaq8CRJTXn7UQHwGIShdHGh0YPxikjTkdgcCh4uCeVuACdPDUKPEgAfL5QoeBRfnj1dSCydgAz3nwvd+qOrNUZXBOhU1kfwZPWdIAKO+QIfoo/dmMd6wiyjbuP+hBjItbFXewQJc4rawiwYS1+groKrPWk/uKF8hYV/7pfNXl9vMZhlkHiReClL6s2k4', 'Expiration': datetime.datetime(2023, 4, 19, 13, 19, 39, tzinfo=tzutc())}


In [31]:
# config
JOB_QUEUE="fpe-prod-batch-job-queue"
JOB_DEFINITION_PII="fpe-prod-batch-job-definition-pii"

def submit_job_to_batch(client, imageset_id):
    response = client.submit_job(
        jobName=f'pii-imageset-{imageset_id}',
        jobQueue=JOB_QUEUE,
        jobDefinition=JOB_DEFINITION_PII,
        containerOverrides={
            'command': [
                'python',
                'detect-imageset.py',
                str(imageset_id)
            ]
        }
    )

    # Extract and return the job ID from the response
    job_id = response['jobId']
    print(f'Job submitted: id={job_id}')
    return job_id

def terminate_batch_job(client, job_id, reason):
    response = client.terminate_job(
        jobId=job_id,
        reason=reason
    )

    return response

In [32]:
batch_session = boto3.Session(aws_access_key_id=creds['AccessKeyId'],
                              aws_secret_access_key=creds['SecretAccessKey'],
                              aws_session_token=creds['SessionToken'],
                              region_name=AWS_REGION)
print(batch_session)
batch_client = batch_session.client('batch')
print(batch_client)

Session(region_name='us-west-2')
<botocore.client.Batch object at 0x000001FA6D445E50>


In [19]:
# submit_job_to_batch(batch_client, 593)

Job submitted: id=c2288af8-8aba-438f-92b7-13c0dd5c851a


'c2288af8-8aba-438f-92b7-13c0dd5c851a'

In [58]:
for _, row in df_imagesets[df_imagesets['station_id'] == 12].iterrows():
    print(f"submitting job: imageset_id={row['imageset_id']}, station_id={row['station_id']}")
    submit_job_to_batch(batch_client, row['imageset_id'])

submitting job: imageset_id=95, station_id=12
Job submitted: id=30031c52-e116-4a70-b9a2-3a517a09bb21
submitting job: imageset_id=97, station_id=12
Job submitted: id=09ad3688-b254-41a4-b086-3f5931978ee0
submitting job: imageset_id=98, station_id=12
Job submitted: id=d103b42e-1d3b-4467-a246-28bd8441cef2
submitting job: imageset_id=99, station_id=12
Job submitted: id=706fe490-df3d-4103-b0e2-23ecfadb0e42
submitting job: imageset_id=100, station_id=12
Job submitted: id=a177a592-42b6-4ea2-a662-d5ac7a58a568
submitting job: imageset_id=101, station_id=12
Job submitted: id=798cf235-a5e6-4dfe-aaee-b7c2527ea35d
submitting job: imageset_id=102, station_id=12
Job submitted: id=4614e8ee-34b7-4a2c-b9ea-255628826bc5
submitting job: imageset_id=103, station_id=12
Job submitted: id=c771ce1e-57b6-47a5-9788-0b7527f94b5d
submitting job: imageset_id=104, station_id=12
Job submitted: id=050c8241-6577-43c2-b6d6-7c2397359a6a
submitting job: imageset_id=227, station_id=12
Job submitted: id=a793bc68-924c-4493-95

In [21]:
terminate_batch_job(batch_client, "c2288af8-8aba-438f-92b7-13c0dd5c851a", "duplicate")

{'ResponseMetadata': {'RequestId': 'fd10969b-384c-4683-9291-0cf9eb0bcbbc',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 19 Apr 2023 02:01:11 GMT',
   'content-type': 'application/json',
   'content-length': '2',
   'connection': 'keep-alive',
   'x-amzn-requestid': 'fd10969b-384c-4683-9291-0cf9eb0bcbbc',
   'access-control-allow-origin': '*',
   'x-amz-apigw-id': 'Dmi4IHg1vHcF28A=',
   'access-control-expose-headers': 'X-amzn-errortype,X-amzn-requestid,X-amzn-errormessage,X-amzn-trace-id,X-amz-apigw-id,date',
   'x-amzn-trace-id': 'Root=1-643f4b66-4fb46a3000df44d1098324a5'},
  'RetryAttempts': 0}}

## Backfill Westbrook

In [44]:
query = f"""
    SELECT stations.name, imagesets.id, imagesets.uuid
    FROM imagesets
    LEFT JOIN stations ON imagesets.station_id=stations.id
    WHERE imagesets.pii_status = 'DONE'
"""

# Execute the SQL query and return the results as a Pandas DataFrame
df_imagesets_backfill = pd.read_sql(query, engine)
df_imagesets_backfill

Unnamed: 0,name,id,uuid
0,West Brook 0_01171100,421,cb2b67b0-0f07-4351-b96d-2659d5b56c9f
1,West Brook 0_01171100,396,33c0736b-f5fb-4da6-91b0-83a72c285382
2,West Brook 0_01171100,526,b94e847f-43c3-43cd-973a-14f0c5af29ad
3,West Brook 0_01171100,593,5a53b364-7a42-4708-b66e-d837c6b05f3e
4,West Brook 0_01171100,289,e8d465f6-5784-4231-967f-9000428e9748
5,West Brook 0_01171100,713,e2a3b2b6-d391-481a-9814-dc61cf83a990
6,West Brook 0_01171100,792,265292ae-007e-4a94-a86c-e01028d85c1f
7,West Brook 0_01171100,892,935ac74c-7fc1-476b-ac66-f6f9070e2209
8,Test Station,254,83220782-1caa-4929-b4a7-f02d954e0ea3
9,West Brook 0_01171100,423,6cd6870a-0260-4687-840d-1ac4ac2794e4


In [40]:
s3_client = boto3.client('s3')

def read_json_from_s3(bucket, key):
    response = s3_client.get_object(Bucket=bucket, Key=key)
    content = response['Body'].read().decode('utf-8')
    return json.loads(content)

def read_pii_results_from_s3(bucket, imageset_uuid):
    key = f"imagesets/{imageset_uuid}/pii.json"
    return read_json_from_s3(bucket, key)

#detections = read_pii_results_from_s3('usgs-chs-conte-prod-fpe-storage', 'cb2b67b0-0f07-4351-b96d-2659d5b56c9f')

In [50]:
from sqlalchemy import MetaData, Column, Table, Integer, JSON

def save_pii_to_database(engine, results):
    data = [
        {
            'image_id': result['image_id'],
            'pii_detections': result['detections']
        } for result in results
    ]
    with engine.connect() as conn:
        table = Table('pii_results', MetaData(bind=conn),
            Column('image_id', Integer, primary_key=True),
            Column('pii_detections', JSON),
            prefixes=['TEMPORARY']
        )
        table.create(conn)
        conn.execute(table.insert(), data)

        conn.execute("""
            UPDATE images
            SET pii_detections = pii_results.pii_detections
            FROM pii_results
            WHERE images.id = pii_results.image_id;
        """)
        table.drop(conn)

In [52]:
for _, row in df_imagesets_backfill.iterrows():
    print(row['uuid'])
    detections = read_pii_results_from_s3('usgs-chs-conte-prod-fpe-storage', row['uuid'])
    save_pii_to_database(engine, detections)

cb2b67b0-0f07-4351-b96d-2659d5b56c9f
33c0736b-f5fb-4da6-91b0-83a72c285382
b94e847f-43c3-43cd-973a-14f0c5af29ad
5a53b364-7a42-4708-b66e-d837c6b05f3e
e8d465f6-5784-4231-967f-9000428e9748
e2a3b2b6-d391-481a-9814-dc61cf83a990
265292ae-007e-4a94-a86c-e01028d85c1f
935ac74c-7fc1-476b-ac66-f6f9070e2209
83220782-1caa-4929-b4a7-f02d954e0ea3
6cd6870a-0260-4687-840d-1ac4ac2794e4
27aae6d9-9417-4b53-bf6a-cd2ce8139ba1
a7918a28-258e-40a7-bf79-8aa1895d4c65
ebfbde2f-222f-4687-9d62-05647f70914b
