In [None]:
# Enable IPython's autoreload extension so edits to imported modules
# are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [None]:
import time
from datetime import datetime

from spikeinterface.sorters import get_default_sorter_params

from batch_client import AWSBatch

In [None]:
# Create the AWS Batch client from the named credentials profile
batch = AWSBatch(profile_name='shulerlab')

# Look up the job queues and job definitions that already exist
my_job_queues = batch.list_job_queues()
my_job_definitions = batch.list_job_definitions()

# Show queue names
queue_names = [queue.get('jobQueueName') for queue in my_job_queues]
print(f"Existing Job Queues: {queue_names}")

# Show job definitions as "<name>:<revision>"
definition_labels = [
    definition.get('jobDefinitionName') + ':' + str(definition.get('revision'))
    for definition in my_job_definitions
]
print(f"Existing Job Definitions: {definition_labels}")

In [None]:
# Sorters to run, each configured with its default parameters
# plus the overrides below.
sorters_names = ["kilosort2_5", "kilosort3"]
param_overrides = {
    "n_jobs": 8,
    "chunk_duration": "2s",
    "progress_bar": True,
}

sorters_params = {}
for sorter_name in sorters_names:
    params = get_default_sorter_params(sorter_name)
    params.update(param_overrides)
    sorters_params[sorter_name] = params

# Display the resulting per-sorter parameters
sorters_params

In [None]:
# Arguments forwarded to the job: where to read the recording from,
# where to write results, and which sorters/parameters to use.
job_kwargs = {
    "SOURCE_AWS_S3_BUCKET": "sample-si",
    "SOURCE_AWS_S3_BUCKET_FOLDER": "ES029_2022-09-14_bot72_0_g0_imec0",
    "TARGET_AWS_S3_BUCKET": "sample-si",
    "TARGET_AWS_S3_BUCKET_FOLDER": "results",
    "DATA_TYPE": "spikeglx",
    # "READ_RECORDING_KWARGS": {"stream_id": "imec.ap"},
    "SORTERS": ",".join(sorters_names),
    "SORTERS_PARAMS": sorters_params,
    "TEST_WITH_SUB_RECORDING": True,
    "SUB_RECORDING_N_FRAMES": 33180000
}

# Use the first listed queue and the first listed job definition ("<name>:<revision>")
target_queue_name = my_job_queues[0]["jobQueueName"]
first_definition = my_job_definitions[0]
target_job_definition = first_definition["jobDefinitionName"] + ":" + str(first_definition["revision"])

# Submit the job with an attempt timeout of 7200 seconds
response_job = batch.submit_job(
    job_name="my-job-from-notebook-extended-storage-17",
    job_queue=target_queue_name,
    job_definition=target_job_definition,
    job_kwargs=job_kwargs,
    attempt_duration_seconds=7200,
)

In [None]:
# One-off check of the submitted job's current status
submitted_job_id = response_job["jobId"]
r = batch.describe_job(job_id=submitted_job_id)
r["status"]

In [None]:
# Poll the job status every POLL_INTERVAL_SECONDS until it reaches a terminal state.
# Stopping on FAILED as well as SUCCEEDED keeps the loop from spinning forever
# when the job errors out or exceeds its attempt timeout.
POLL_INTERVAL_SECONDS = 60
TERMINAL_STATUSES = ("SUCCEEDED", "FAILED")

out = False
while not out:
    r = batch.describe_job(job_id=response_job["jobId"])
    status = r["status"]
    print(f"Job status: {status}")
    if status in TERMINAL_STATUSES:
        out = True
    else:
        # Wait between polls so the API is not hit in a tight busy loop
        time.sleep(POLL_INTERVAL_SECONDS)

# Get job logs (printed for failed jobs too, since that is when they are most useful)
log_events = batch.get_job_logs(job_id=response_job["jobId"])
for e in log_events:
    # Timestamps are divided by 1000 before conversion (presumably epoch milliseconds)
    msg = datetime.fromtimestamp(e["timestamp"]/1000.).strftime("%d/%m/%Y, %H:%M:%S") + ' - ' + e["message"]
    print(msg)