In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
from pathlib import Path
import sys
import logging
import inspect
logging.basicConfig(level=logging.INFO)
print(sys.executable)

/home/peromoseq/anaconda3/envs/airflow/bin/python3


### Get recording info (google sheets)

In [3]:
import requests
import pandas as pd
from io import BytesIO

In [4]:
# spreadsheet_url = 'https://docs.google.com/spreadsheet/ccc?key=14HIqUaSl_n-91hpAvmACY_iVY9nLKdlA6qklhxfZon0&output=csv&gid=0'
spreadsheet_url = "https://docs.google.com/spreadsheet/ccc?key=1jACsUmxuJ9Une59qmvzZGc1qXezKhKzD1zho2sEfcrU&output=csv&gid=0"
response = requests.get(spreadsheet_url)
recording_df = pd.read_csv(BytesIO(response.content))

In [5]:
recording_df[:3]

Unnamed: 0,Subject,duration_m,video_recording_id,ephys_id,calibration_id,calibration_board_shape,calibration_square_size,video_location_on_o2,ephys_location_on_o2,calibration_location_on_o2,samplerate,trigger_pin
0,M04002,10,24-05-01-13-26-43-110846,2024-05-01_13-26-37,24-05-01-13-45-07-825493,,,/n/groups/datta/tim_sainburg/datasets/chronic2...,/n/groups/datta/tim_sainburg/datasets/chronic2...,/n/groups/datta/tim_sainburg/datasets/chronic2...,150,2


### Submit job

In [6]:
output_directory = Path("/n/groups/datta/tim_sainburg/datasets/scratch/") / "240808-3d-pipeline"

In [7]:
from multicamera_airflow_pipeline.tim_240731.interface.o2 import O2Runner
from pathlib import Path
from datetime import datetime

INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Python interpreter binary location: /home/peromoseq/anaconda3/envs/airflow/bin/python3
  from .autonotebook import tqdm as notebook_tqdm


In [8]:
for idx, recording_row in recording_df.iterrows():
    break

In [9]:
samplerate = recording_row.samplerate
trigger_pin = recording_row.trigger_pin

In [10]:
job_directory = Path('/n/groups/datta/tim_sainburg/datasets/scratch/jobs')
job_directory.mkdir(exist_ok=True, parents=True)

In [11]:
current_datetime_str = datetime.now().strftime('%Y%m%d_%H%M%S')
current_datetime_str

'20240808_110615'

In [12]:
remote_job_directory = job_directory / current_datetime_str

In [13]:
# where the data is located
recording_directory = Path(recording_row.video_location_on_o2) / recording_row.video_recording_id
# where to save data
output_directory_camera_sync = output_directory / 'camera_sync' / recording_row.video_recording_id
output_directory_camera_sync.mkdir(parents=True, exist_ok=True)

In [14]:
params = {
    "recording_directory": recording_directory.as_posix(),
    "output_directory_camera_sync": output_directory_camera_sync.as_posix(),
    "samplerate": samplerate,
    "trigger_pin": trigger_pin,
}

In [15]:
runner = O2Runner(
    job_name_prefix = 'test_submit_camera_sync',
    remote_job_directory = remote_job_directory,
    conda_env = "/n/groups/datta/tim_sainburg/conda_envs/peromoseq",
    o2_username = "tis697",
    o2_server="login.o2.rc.hms.harvard.edu",
    job_params = params, 
    o2_n_cpus = 1,
    o2_memory="16G",
    o2_time_limit="1:00:00",
    o2_queue="short",
)

INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4)
INFO:paramiko.transport:Auth banner: b'Problems logging in?\nUse your lower case HMS ID, like abc123, not ABC123.\nIf locked out, see:\nhttps://it.hms.harvard.edu/i-want/reset-password-or-unlock-your-hms-account\n'
INFO:paramiko.transport:Authentication (publickey) successful!


In [16]:
def sync_cameras(params):
    from multicamera_airflow_pipeline.tim_240731.sync.sync_cameras import CameraSynchronizer
    synchronizer = CameraSynchronizer(
        recording_directory = params["recording_directory"],
        output_directory = params["output_directory_camera_sync"],
        samplerate = params["samplerate"], # camera sample rate
        trigger_pin = params["trigger_pin"], # Which pin camera trigger was on
    )
    synchronizer.run()

In [17]:
runner.python_script = f"""
# load params
import yaml
params_file = "{runner.remote_job_directory / f"{runner.job_name}.params.yaml"}"
with open(params_file, 'r') as file:
    params = yaml.safe_load(file)
    
# grab sync cameras function
{inspect.getsource(sync_cameras)}

# run 
sync_cameras(params)
"""

In [18]:
print(runner.python_script)


# load params
import yaml
params_file = "/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615/test_submit_camera_sync_24-08-08-2024-06-15-755024.params.yaml"
with open(params_file, 'r') as file:
    params = yaml.safe_load(file)
    
# grab sync cameras function
def sync_cameras(params):
    from multicamera_airflow_pipeline.tim_240731.sync.sync_cameras import CameraSynchronizer
    synchronizer = CameraSynchronizer(
        recording_directory = params["recording_directory"],
        output_directory = params["output_directory_camera_sync"],
        samplerate = params["samplerate"], # camera sample rate
        trigger_pin = params["trigger_pin"], # Which pin camera trigger was on
    )
    synchronizer.run()


# run 
sync_cameras(params)



In [19]:
runner.run()

INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Creating remote job directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Creating remote directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Successfully created remote directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Writing job files to remote directory: /n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615
INFO:paramiko.transport.sftp:[chan 1] Opened sftp connection (server version 3)
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Transferring /tmp/tmpmmoav1op to login.o2.rc.hms.harvard.edu:/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615/test_submit_camera_sync_24-08-08-2024-06-15-755024.py
INFO:multicamera_airflow_pipeline.tim_240731

In [32]:
# check job status every n seconds
runner.check_job_status()

INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:Checking job status: 43618564
INFO:multicamera_airflow_pipeline.tim_240731.interface.o2:The job has finished successfully.


True

In [33]:
def check_camera_sync_completion(output_directory):
    output_directory = Path(output_directory)
    if (output_directory / "camera_sync.csv").exists():
        return True
    else:
        return False

In [35]:
check_camera_sync_completion(params['output_directory_camera_sync'])

True

In [None]:
f"sacct -j {runner.slurm_job_id}"

In [26]:
runner.output_log

PosixPath('/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615/test_submit_camera_sync_24-08-08-2024-06-15-755024.log')

In [29]:
!tail {runner.output_log}

tail: cannot open '/n/groups/datta/tim_sainburg/datasets/scratch/jobs/20240808_110615/test_submit_camera_sync_24-08-08-2024-06-15-755024.log' for reading: No such file or directory
