In [63]:
import collections
import time
from datetime import datetime as dt

import boto3
import dask.dataframe as dd

In [2]:
# Two years of raw data, loaded lazily as a dask DataFrame.
csv_path = "../data/dat_two_years.csv"
df = dd.read_csv(csv_path)

In [4]:
# Distinct station IDs in the dataset; .compute() materializes the dask result.
station_column = df["station_id"]
station_ids = station_column.unique().compute()

In [6]:
n_stations = len(station_ids)
print(f"Number of unique stations: {n_stations}")

Number of unique stations: 975


In [11]:
# Materialize the unique IDs as a sorted list of plain Python values.
# Guarded so that re-running this cell (when station_ids is already a list)
# does not fail on the missing `.tolist()` method.
if hasattr(station_ids, "tolist"):
    station_ids = station_ids.tolist()
station_ids = sorted(station_ids)

In [12]:
# AWS Batch client shared by the submit / describe / terminate calls in this notebook.
batch_service = "batch"
client = boto3.client(batch_service)

In [29]:
# Submit a single training job on its own, for station 72.
# NOTE(review): later cells submit against revision 5 of the job definition;
# confirm revision 4 is intended here.
station_id = 72
single_job_request = {
    "jobName": f"citi-bikecaster-train-{station_id}",
    "jobQueue": "citi-bikecaster-job-queue",
    "jobDefinition": "citi-bikecaster-train:4",
    "parameters": {"station_id": str(station_id)},
}
client.submit_job(**single_job_request)

{'ResponseMetadata': {'RequestId': '0b5a20e0-b957-11e9-a46b-a128daef36e8',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'date': 'Wed, 07 Aug 2019 21:05:11 GMT',
   'content-type': 'application/json',
   'content-length': '85',
   'connection': 'keep-alive',
   'x-amzn-requestid': '0b5a20e0-b957-11e9-a46b-a128daef36e8',
   'x-amz-apigw-id': 'eEZ5LG9EoAMFzRg=',
   'x-amzn-trace-id': 'Root=1-5d4b3d07-31211440ef53a9209be2ab40;Sampled=0'},
  'RetryAttempts': 0},
 'jobName': 'citi-bikecaster-train-72',
 'jobId': '5627b2a1-3b79-43f3-aa0e-62ae53cc9070'}

In [54]:
# Submit training jobs for a hand-picked set of stations.
for station_id in [83, 153, 161, 119]:
    response = client.submit_job(
        jobName=f"citi-bikecaster-train-{station_id}",
        jobQueue="citi-bikecaster-job-queue",
        jobDefinition="citi-bikecaster-train:5",
        parameters={"station_id": str(station_id)},
    )
    # Report failed submissions instead of silently discarding the response,
    # matching the other submission loops in this notebook.
    if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
        print(f"Error submitting job for station {station_id}")

In [56]:
# Submit one training job per station and keep the IDs of successful submissions.
job_ids = []

for idx, station_id in enumerate(station_ids):
    # boto3 signals API failures (throttling, validation, ...) by raising
    # ClientError rather than returning a non-200 response. Catch it so a single
    # failure is reported without aborting the remaining submissions.
    try:
        response = client.submit_job(
            jobName=f"citi-bikecaster-train-{station_id}",
            jobQueue="citi-bikecaster-job-queue",
            jobDefinition="citi-bikecaster-train:5",
            parameters={"station_id": str(station_id)},
        )
    except client.exceptions.ClientError as err:
        print(f"Error submitting job for station {station_id}, at index {idx} in station_ids list: {err}")
        continue
    # Record the job ID only after the submission is known to have succeeded.
    if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
        print(f"Error submitting job for station {station_id}, at index {idx} in station_ids list")
        continue
    job_ids.append(response["jobId"])

In [62]:
# Scratch check: start offsets of 100-ID batches over job_ids, without the final one.
batch_starts = list(range(0, len(job_ids), 100))
batch_starts[:-1]

[0, 100, 200, 300, 400, 500, 600, 700, 800]

In [87]:
def _batched(items, size):
    """Yield consecutive slices of ``items`` containing at most ``size`` elements.

    AWS Batch ``describe_jobs`` accepts at most 100 job IDs per call, so the
    job-ID lists below are processed in chunks.

    Raises:
        ValueError: if ``size`` is not positive.
    """
    if size <= 0:
        raise ValueError(f"batch size must be positive, got {size}")
    for start in range(0, len(items), size):
        yield items[start:start + size]


def poll_jobs(job_ids, batch_size=100, batch_client=None):
    """Group AWS Batch jobs by their current status and print a per-status count.

    Args:
        job_ids: Sliceable sequence (e.g. list) of AWS Batch job IDs.
        batch_size: Maximum number of IDs per ``describe_jobs`` call (API limit is 100).
        batch_client: boto3 Batch client to use; defaults to the notebook-level ``client``.

    Returns:
        ``collections.defaultdict(list)`` mapping each status string
        (e.g. "RUNNING", "FAILED") to the job IDs currently in that status.
    """
    if batch_client is None:
        batch_client = client
    statuses = collections.defaultdict(list)
    for chunk in _batched(job_ids, batch_size):
        for job in batch_client.describe_jobs(jobs=chunk)["jobs"]:
            statuses[job["status"]].append(job["jobId"])

    for status in sorted(statuses):
        print(f"- {len(statuses[status])} jobs in {status} state")
    return statuses


def resubmit_jobs(station_ids, batch_size=100, batch_client=None):
    """Resubmit every job listed under "FAILED" in a status mapping.

    Args:
        station_ids: Despite its historical name, this is the status -> job-ID
            mapping returned by ``poll_jobs``, as in
            ``resubmit_jobs(poll_jobs(job_ids))``. Only the "FAILED" entry is
            read; if it is absent there is nothing to resubmit.
        batch_size: Maximum number of IDs per ``describe_jobs`` call (API limit is 100).
        batch_client: boto3 Batch client to use; defaults to the notebook-level ``client``.

    Returns:
        List of job IDs for the newly submitted jobs. Submissions that come
        back with a non-200 status are reported and omitted.
    """
    if batch_client is None:
        batch_client = client
    failed_job_ids = station_ids.get("FAILED", [])
    new_job_ids = []
    for chunk in _batched(failed_job_ids, batch_size):
        for job in batch_client.describe_jobs(jobs=chunk)["jobs"]:
            submit_kwargs = dict(
                jobName=job["jobName"],
                jobQueue=job["jobQueue"],
                jobDefinition=job["jobDefinition"],
                parameters=job["parameters"],
            )
            # Carry over the memory/vCPU values reported for the failed job's
            # container so that jobs originally submitted with containerOverrides
            # (e.g. extra memory) do not fall back to the job definition defaults.
            # NOTE(review): assumes the job definition uses legacy memory/vcpus
            # fields rather than resourceRequirements — confirm.
            container = job.get("container") or {}
            overrides = {key: container[key] for key in ("memory", "vcpus") if container.get(key)}
            if overrides:
                submit_kwargs["containerOverrides"] = overrides
            response = batch_client.submit_job(**submit_kwargs)
            if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
                print(f"Error submitting job for station {job['parameters']['station_id']}")
                continue
            new_job_ids.append(response["jobId"])
    return new_job_ids

In [119]:
s3 = boto3.client("s3")
# A single list_objects call returns at most 1000 keys, so page through every
# result. Re-wrap the keys in the {"Contents": [...]} shape the parsing cell reads.
_model_pages = s3.get_paginator("list_objects_v2").paginate(
    Bucket="insulator-citi-bikecaster", Prefix="models/"
)
objects = {"Contents": [obj for page in _model_pages for obj in page.get("Contents", [])]}

In [121]:
# Extract the station ID from every model key of the form ".../station_<id>.pkl".
done_station_ids = []
for obj in objects.get("Contents", []):
    key = obj["Key"]
    if not key.endswith(".pkl"):
        continue
    # Slice off the ".pkl" suffix. str.rstrip(".pkl") would strip any trailing
    # '.', 'p', 'k' or 'l' characters, not just the suffix.
    stem = key.split("station_")[-1][: -len(".pkl")]
    try:
        done_station_ids.append(int(stem))
    except ValueError:
        # The key does not follow the naming scheme; report it and keep going.
        print(key)

In [122]:
# Number of distinct stations that already have a model. A set is used because the
# next cells compare stations with set difference, so duplicate keys must not inflate this.
len(set(done_station_ids))

638

In [123]:
# Stations with no saved model yet. Sorting makes the submission order (and the
# indices printed on submission errors) reproducible instead of following set order.
not_done_station_ids = sorted(set(station_ids) - set(done_station_ids))

In [124]:
# Stations still lacking a trained model.
n_not_done = len(not_done_station_ids)
n_not_done

337

In [150]:
# Resubmit only the stations without a model, overriding the container to
# 20,000 MiB of memory and 4 vCPUs.
# NOTE(review): the override values look hand-picked; confirm they fit the compute environment.
job_ids = []

for idx, station_id in enumerate(not_done_station_ids):
    # boto3 raises ClientError on API failures rather than returning non-200;
    # catch it so one failure doesn't abort the rest of the submissions.
    try:
        response = client.submit_job(
            jobName=f"citi-bikecaster-train-{station_id}",
            jobQueue="citi-bikecaster-job-queue",
            jobDefinition="citi-bikecaster-train:5",
            parameters={"station_id": str(station_id)},
            containerOverrides={"memory": 20_000, "vcpus": 4},
        )
    except client.exceptions.ClientError as err:
        print(f"Error submitting job for station {station_id}, at index {idx} in not_done_station_ids list: {err}")
        continue
    # Record the job ID only once the submission is known to have succeeded.
    if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
        print(f"Error submitting job for station {station_id}, at index {idx} in not_done_station_ids list")
        continue
    job_ids.append(response["jobId"])

In [144]:
# Optional: resubmit every job in job_ids that is currently FAILED (uncomment to run).
# resubmit_jobs(poll_jobs(job_ids))

In [151]:
# Poll the submitted jobs periodically until none of them is still queued or running.
POLL_INTERVAL_SECONDS = 120
# AWS Batch statuses that mean a job has not finished yet.
ACTIVE_STATES = {"SUBMITTED", "PENDING", "RUNNABLE", "STARTING", "RUNNING"}

while True:
    # astimezone() attaches the local time zone, so %Z prints the real
    # abbreviation (e.g. EDT in summer) instead of a hard-coded "EST".
    print(dt.now().astimezone().strftime("%Y-%m-%d %H:%M:%S %Z"))
    statuses = poll_jobs(job_ids)
    if not any(statuses.get(state) for state in ACTIVE_STATES):
        break
    time.sleep(POLL_INTERVAL_SECONDS)

2019-08-08 13:26:31 EST
- 1 jobs in FAILED state
- 241 jobs in RUNNABLE state
- 48 jobs in RUNNING state
- 47 jobs in SUCCEEDED state
2019-08-08 13:28:32 EST
- 3 jobs in FAILED state
- 239 jobs in RUNNABLE state
- 48 jobs in RUNNING state
- 47 jobs in SUCCEEDED state
2019-08-08 13:30:32 EST
- 9 jobs in FAILED state
- 231 jobs in RUNNABLE state
- 48 jobs in RUNNING state
- 49 jobs in SUCCEEDED state
2019-08-08 13:32:33 EST
- 9 jobs in FAILED state
- 222 jobs in RUNNABLE state
- 48 jobs in RUNNING state
- 58 jobs in SUCCEEDED state
2019-08-08 13:34:33 EST
- 9 jobs in FAILED state
- 193 jobs in RUNNABLE state
- 43 jobs in RUNNING state
- 5 jobs in STARTING state
- 87 jobs in SUCCEEDED state
2019-08-08 13:36:33 EST
- 10 jobs in FAILED state
- 192 jobs in RUNNABLE state
- 46 jobs in RUNNING state
- 89 jobs in SUCCEEDED state
2019-08-08 13:38:34 EST
- 12 jobs in FAILED state
- 187 jobs in RUNNABLE state
- 46 jobs in RUNNING state
- 92 jobs in SUCCEEDED state
2019-08-08 13:40:34 EST
- 23 jobs

KeyboardInterrupt: 

In [148]:
# list_jobs is paginated, so collect the RUNNING job summaries from every page.
# The result keeps the {"jobSummaryList": [...]} shape that the termination cell reads.
_running_pages = client.get_paginator("list_jobs").paginate(
    jobQueue="citi-bikecaster-job-queue", jobStatus="RUNNING"
)
response = {"jobSummaryList": [job for page in _running_pages for job in page.get("jobSummaryList", [])]}

In [149]:
# Terminate every job summary collected above.
# NOTE(review): this targets all RUNNING jobs in the queue, not only those in job_ids.
running_summaries = response["jobSummaryList"]
for summary in running_summaries:
    client.terminate_job(jobId=summary["jobId"], reason="Not enough memory")