In [23]:
import os
import dotenv

from azure.ai.ml import MLClient, Input, Output
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.identity import AzureCliCredential

# Load workspace coordinates from a local .env file (or the process environment).
dotenv.load_dotenv()


def _require_env(name: str) -> str:
    """Return the value of environment variable ``name``.

    Raises:
        RuntimeError: if the variable is unset or empty. Failing here avoids
            building datastore URIs that contain the literal text "None".
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(
            f"Environment variable '{name}' is not set; "
            "define it in the environment or in the .env file."
        )
    return value


subscription = _require_env("subscription_id")
resource_group = _require_env("resource_group")
workspace = _require_env("workspace_name")

# Authenticates with the credentials of the current Azure CLI login.
ml_client = MLClient(
    credential=AzureCliCredential(),
    subscription_id=subscription,
    resource_group_name=resource_group,
    workspace_name=workspace,
)

In [24]:
# --- What to process: plant, sonar location and day --------------------------
plant = 'lavey'
sonar_location = 'passe3'
date = "2024-03-06"
datastore_name = 'workspaceblobstore'

# --- Per-location base folders on the datastore (each ends with '/') ---------
base_path_on_datastore = f'{plant}_videos/{sonar_location}/'
base_intermediate_path_on_datastore = f'{plant}_tracking_intermediate_data/{sonar_location}/'
base_output_path_on_datastore = f'{plant}_tracking_output_alternative_algo_settings/{sonar_location}/'

# --- Day-specific folders: "YYYY-MM-DD" becomes the nested path "YYYY/MM/DD" -
path_on_datastore = base_path_on_datastore + date.replace("-", "/")
intermediate_path_on_datastore = base_intermediate_path_on_datastore + date.replace("-", "/")
output_path_on_datastore = base_output_path_on_datastore + date.replace("-", "/")

# --- Fully qualified azureml:// URIs ------------------------------------------
uri = (
    f'azureml://subscriptions/{subscription}'
    f'/resourcegroups/{resource_group}'
    f'/workspaces/{workspace}'
    f'/datastores/{datastore_name}/paths/'
)
uri_input = uri + path_on_datastore
uri_intermediate_data = uri + intermediate_path_on_datastore
uri_output = uri + output_path_on_datastore

# Sanity-check runs use a sonar_location with a '_sanity_check' suffix; the
# suffix is stripped here so they share the training data and the settings
# file of the real location.
uri_train_val_gt_data = f"{uri}{plant}_classification/train_data/{sonar_location.replace('_sanity_check', '')}/"
classification_settings_file = f"classification_settings_{plant}_{sonar_location.replace('_sanity_check', '')}.yaml"


# Run all steps in pipeline

In [25]:
from azure.ai.ml import load_component
from azure.ai.ml.dsl import pipeline

# Component factories, loaded from the YAML specs next to this notebook.
# Calling one of them inside a @pipeline function adds a step to that pipeline.
run_tracking = load_component(
    source="./components/kalman_tracking/tracking.yml",
)
run_classification = load_component(
    source="./components/classification/classification.yml",
)

# Full pipeline: tracking -> classification -> video labeling.
#
# Inputs:
#   input_videos_dir             folder passed as `data` to both tracking runs
#   train_val_gt_data_dir        folder forwarded to the classification step
#   classification_settings_file settings file name forwarded to classification
#   output_data_uri              path bound to the classification step's output
#   intermediate_data_uri        path bound to the first tracking step's output
#   log_level                    forwarded to the classification step
# Output:
#   "detections" - the `detections` output of the final (labeling) step.
#
# NOTE(review): the Azure ML DSL is understood to name pipeline nodes after the
# local variables below - confirm before renaming any of them.
@pipeline(
    compute="Standard-D1-v2",
)
def tracking_all_steps(
    input_videos_dir: Input(type=AssetTypes.URI_FOLDER), 
    train_val_gt_data_dir: Input(type=AssetTypes.URI_FOLDER),
    classification_settings_file: str,
    output_data_uri: str = None,
    intermediate_data_uri: str = None,
    log_level: str = "INFO",
):
    
    # Step 1: run the tracking component with the Kalman tracking settings
    # (runs on the pipeline's default compute).
    tracking_results = run_tracking(
        data=input_videos_dir,
        tracking_config="kalman_tracking_settings.yaml",
    )
    # Write the step's detections to the caller-supplied intermediate location.
    tracking_results.outputs.detections = Output(type="uri_folder", path=intermediate_data_uri, mode=InputOutputModes.RW_MOUNT)
    
    # Step 2: classify the detections produced by step 1.
    classification_run_results = run_classification(
        classification_settings_file=classification_settings_file,
        train_val_gt_data_dir=train_val_gt_data_dir,
        files_to_classify_dir=tracking_results.outputs.detections,
        log_level=log_level,
    )
    classification_run_results.outputs.classified_detections_dir = Output(
        type="uri_folder",
        path=output_data_uri,
        mode=InputOutputModes.RW_MOUNT,
    )
    # Classification overrides the pipeline's default compute target.
    classification_run_results.compute = "Standard-A4m-v2"
    
    # Step 3: run the tracking component again on the same videos, this time
    # with the classified detections as labels and the annotation settings.
    labeling_results = run_tracking(
        data=input_videos_dir,
        labels_dir=classification_run_results.outputs.classified_detections_dir,
        tracking_config="annotate_video_settings.yaml",
    )
    labeling_results.compute = "Standard-D2"

    return {"detections": labeling_results.outputs.detections}


# Pre-labeling variant of the full pipeline: tracking -> classification ->
# video labeling. It differs from `tracking_all_steps` only in the tracking
# configuration used by the first step
# ("kalman_tracking_settings_pre_labeling.yaml").
#
# Inputs:
#   input_videos_dir             folder passed as `data` to both tracking runs
#   train_val_gt_data_dir        folder forwarded to the classification step
#   classification_settings_file settings file name forwarded to classification
#   output_data_uri              path bound to the classification step's output
#   intermediate_data_uri        path bound to the first tracking step's output
#   log_level                    forwarded to the classification step
# Output:
#   "detections" - the `detections` output of the final (labeling) step.
#
# NOTE(review): the Azure ML DSL is understood to name pipeline nodes after the
# local variables below - confirm before renaming any of them.
@pipeline(
    compute="Standard-D1-v2",
)
def tracking_all_steps_pre_labeling(
    input_videos_dir: Input(type=AssetTypes.URI_FOLDER), 
    train_val_gt_data_dir: Input(type=AssetTypes.URI_FOLDER),
    classification_settings_file: str,
    output_data_uri: str = None,
    intermediate_data_uri: str = None,
    log_level: str = "INFO",
):
    
    # Step 1: tracking with the pre-labeling settings file
    # (runs on the pipeline's default compute).
    tracking_results = run_tracking(
        data=input_videos_dir,
        tracking_config="kalman_tracking_settings_pre_labeling.yaml",
    )
    # Write the step's detections to the caller-supplied intermediate location.
    tracking_results.outputs.detections = Output(type="uri_folder", path=intermediate_data_uri, mode=InputOutputModes.RW_MOUNT)
    
    # Step 2: classify the detections produced by step 1.
    classification_run_results = run_classification(
        classification_settings_file=classification_settings_file,
        train_val_gt_data_dir=train_val_gt_data_dir,
        files_to_classify_dir=tracking_results.outputs.detections,
        log_level=log_level,
    )
    classification_run_results.outputs.classified_detections_dir = Output(
        type="uri_folder",
        path=output_data_uri,
        mode=InputOutputModes.RW_MOUNT,
    )
    # Classification overrides the pipeline's default compute target.
    classification_run_results.compute = "Standard-A4m-v2"
    
    # Step 3: run the tracking component again on the same videos, this time
    # with the classified detections as labels and the annotation settings.
    labeling_results = run_tracking(
        data=input_videos_dir,
        labels_dir=classification_run_results.outputs.classified_detections_dir,
        tracking_config="annotate_video_settings.yaml",
    )
    labeling_results.compute = "Standard-D2"

    return {"detections": labeling_results.outputs.detections}
    

# Tracking + classification only (no video labeling step).
#
# Inputs:
#   input_data                   folder passed as `data` to the tracking step
#   train_val_data               kept for backward compatibility; not forwarded
#                                (no other run_classification call in this
#                                notebook passes such an input)
#   train_val_gt_data            folder forwarded to classification as
#                                `train_val_gt_data_dir`
#   classification_settings_file settings file name forwarded to classification
#   indermediate_data_uri        path bound to the tracking step's output
#                                (parameter name kept as-is, including its
#                                spelling, so existing keyword callers work)
#   log_level                    forwarded to the classification step
# Output:
#   "detections" - the classification step's `classified_detections_dir`.
#
# NOTE(review): the classification call now uses the same keyword names as the
# other pipelines in this notebook (`classification_settings_file`,
# `train_val_gt_data_dir`, `files_to_classify_dir`). Confirm against
# components/classification/classification.yml.
@pipeline(
    compute="Standard-D1-v2",
)
def tracking_base_steps(
    input_data: Input(type=AssetTypes.URI_FOLDER), 
    train_val_data: Input(type=AssetTypes.URI_FOLDER), 
    train_val_gt_data: Input(type=AssetTypes.URI_FOLDER),
    classification_settings_file: str,
    indermediate_data_uri: str = None,
    log_level: str = "INFO",
):
    
    # Step 1: run the tracking component with the Kalman tracking settings.
    tracking_results = run_tracking(
        data=input_data,
        tracking_config="kalman_tracking_settings.yaml",
    )
    tracking_results.outputs.detections = Output(type="uri_folder", path=indermediate_data_uri, mode=InputOutputModes.RW_MOUNT)
    
    # Step 2: classify the detections produced by step 1.
    classification_run_results = run_classification(
        classification_settings_file=classification_settings_file,
        train_val_gt_data_dir=train_val_gt_data,
        files_to_classify_dir=tracking_results.outputs.detections,
        log_level=log_level,
    )
    # Classification overrides the pipeline's default compute target.
    classification_run_results.compute = "Standard-A4m-v2"
    
    return {"detections": classification_run_results.outputs.classified_detections_dir}


# Pipeline that skips tracking: classification -> video labeling, starting
# from detections that already exist.
#
# Inputs:
#   input_videos_dir             folder passed as `data` to the labeling step
#   train_val_gt_data_dir        folder forwarded to the classification step
#   classification_settings_file settings file name forwarded to classification
#   intermediate_data            folder of existing detections to classify
#   output_data_uri              path bound to the classification step's output
#   log_level                    forwarded to the classification step
# Output:
#   "detections" - the `detections` output of the labeling step.
#
# NOTE(review): the Azure ML DSL is understood to name pipeline nodes after the
# local variables below - confirm before renaming any of them.
@pipeline(
    compute="Standard-D1-v2",
)
def classification_and_labeling_videos(
    input_videos_dir: Input(type=AssetTypes.URI_FOLDER),
    train_val_gt_data_dir: Input(type=AssetTypes.URI_FOLDER),
    classification_settings_file: str,
    intermediate_data: Input(type=AssetTypes.URI_FOLDER),
    output_data_uri: str = None,
    log_level: str = "INFO",
):
    
    # Step 1: classify the detections supplied through `intermediate_data`.
    classification_run_results = run_classification(
        classification_settings_file=classification_settings_file,
        train_val_gt_data_dir=train_val_gt_data_dir,
        files_to_classify_dir=intermediate_data,
        log_level=log_level,
    )
    classification_run_results.outputs.classified_detections_dir = Output(
        type="uri_folder",
        path=output_data_uri,
        mode=InputOutputModes.RW_MOUNT,
    )
    # Classification overrides the pipeline's default compute target.
    classification_run_results.compute = "Standard-A4m-v2"
    
    # Step 2: run the tracking component with the annotation settings, using
    # the classified detections as labels.
    labeling_results = run_tracking(
        data=input_videos_dir,
        labels_dir=classification_run_results.outputs.classified_detections_dir,
        tracking_config="annotate_video_settings.yaml",
    )
    labeling_results.compute = "Standard-D2"

    return {"detections": labeling_results.outputs.detections}

# Single-step pipeline: only the video labeling step, run on "Standard-D2".
#
# Inputs:
#   input_data  folder passed as `data` to the tracking component
#   labels_dir  folder passed as `labels_dir` to the tracking component
# Output:
#   "detections" - the `detections` output of the labeling step.
@pipeline(
    compute="Standard-D2",
)
def labeling_videos(
    input_data: Input(type=AssetTypes.URI_FOLDER),
    labels_dir: Input(type=AssetTypes.URI_FOLDER),
):
    # The tracking component is reused here with the annotation settings file.
    labeling_results = run_tracking(
        data=input_data,
        labels_dir=labels_dir,
        tracking_config="annotate_video_settings.yaml",
    )

    return {"detections": labeling_results.outputs.detections}

In [26]:
import shutil
from azure.ai.ml import load_component

# Pipeline variant to submit for the configured `date`. One of:
#   "tracking", "tracking_pre_labeling", "classification", "labeling_videos"
start_at = "tracking_pre_labeling"

if start_at == "tracking":
    pipeline_job = tracking_all_steps(
        classification_settings_file=classification_settings_file,
        input_videos_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_input, mode=InputOutputModes.RO_MOUNT),
        train_val_gt_data_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_train_val_gt_data, mode=InputOutputModes.RO_MOUNT),
        intermediate_data_uri=uri_intermediate_data,
        output_data_uri=uri_output,
    )
elif start_at == "tracking_pre_labeling":
    pipeline_job = tracking_all_steps_pre_labeling(
        classification_settings_file=classification_settings_file,
        input_videos_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_input, mode=InputOutputModes.RO_MOUNT),
        train_val_gt_data_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_train_val_gt_data, mode=InputOutputModes.RO_MOUNT),
        intermediate_data_uri=uri_intermediate_data,
        output_data_uri=uri_output,
    )
elif start_at == "classification":
    # Arguments match the signature of classification_and_labeling_videos:
    # the existing detections are passed as the `intermediate_data` Input and
    # the settings file / output location are forwarded like in the other
    # branches.
    pipeline_job = classification_and_labeling_videos(
        classification_settings_file=classification_settings_file,
        input_videos_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_input, mode=InputOutputModes.DOWNLOAD),
        train_val_gt_data_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_train_val_gt_data, mode=InputOutputModes.DOWNLOAD),
        intermediate_data=Input(type=AssetTypes.URI_FOLDER, path=uri_intermediate_data, mode=InputOutputModes.DOWNLOAD),
        output_data_uri=uri_output,
    )
elif start_at == "labeling_videos":
    pipeline_job = labeling_videos(
        input_data=Input(type=AssetTypes.URI_FOLDER, path=uri_input, mode=InputOutputModes.DOWNLOAD),
        labels_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_output, mode=InputOutputModes.DOWNLOAD),
    )
else:
    # Fail loudly instead of reusing a stale `pipeline_job` left in the kernel.
    raise ValueError(f"Unknown start_at value: {start_at!r}")

# Every pipeline variant returns its final result under the "detections" key.
pipeline_job.outputs.detections = Output(type="uri_folder", path=uri_output, mode=InputOutputModes.RW_MOUNT)
pipeline_job.tags = {"date": date}
pipeline_job.display_name = f"{plant}-{sonar_location}-{start_at}-{date}"

# copy library files to job source directory temporarily
pth_cls = './components/classification/src/analysis/classification_utils/'
pth_masks = './components/classification/src/analysis/demo/'
pth = './components/kalman_tracking/src/algorithm/'
try:
    shutil.copytree('../analysis/classification_utils/', pth_cls, dirs_exist_ok=True)
    shutil.copytree('../analysis/demo/', pth_masks, dirs_exist_ok=True)
    shutil.copytree('../algorithm/', pth, dirs_exist_ok=True)

    pipeline_job_run = ml_client.jobs.create_or_update(
        pipeline_job, experiment_name=f"track-and-classify-{plant}-{sonar_location}"
    )
finally:
    # Always remove the staged copies, even when copying or submission fails.
    # ignore_errors covers directories that were never created.
    for staged_dir in (pth, pth_masks, pth_cls):
        shutil.rmtree(staged_dir, ignore_errors=True)

pipeline_job_run

Uploading src (4.58 MBs): 100%|██████████| 4578229/4578229 [00:00<00:00, 5120449.44it/s]




Experiment,Name,Type,Status,Details Page
track-and-classify-lavey-passe3,willing_night_9z19846qzj,pipeline,NotStarted,Link to Azure Machine Learning studio


# Run jobs for every day in a date range (or for an explicit list of dates)

In [18]:
import shutil
from typing import Iterator, Optional

import pandas as pd


def generate_paths(
    datastore_uri: str,
    day_list: list,
    date_separator: str = '/',
) -> Iterator[tuple[str, str, str, str]]:
    """Lazily yield the datastore folders for each day in ``day_list``.

    Relies on the module-level ``base_path_on_datastore``,
    ``base_intermediate_path_on_datastore`` and
    ``base_output_path_on_datastore`` defined in the configuration cell.

    Args:
        datastore_uri: URI prefix that the base folders are appended to.
        day_list: Date strings; every ``'-'`` in them is replaced by
            ``date_separator``.
        date_separator: Separator used between the date parts in the paths.

    Yields:
        ``(input_path, intermediate_path, output_path, date_str)`` where the
        three paths end with ``'/'`` and ``date_str`` is the date with the
        separator already applied.

    Note:
        This is a generator: it can be iterated only once.
    """
    for date_str in day_list:
        date_str = date_str.replace('-', date_separator)
        yield (
            f'{datastore_uri}{base_path_on_datastore}{date_str}/',
            f'{datastore_uri}{base_intermediate_path_on_datastore}{date_str}/',
            f'{datastore_uri}{base_output_path_on_datastore}{date_str}/',
            date_str,
        )


def generate_paths_for_range(
        datastore_uri: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
        date_separator: str = '/',
    ) -> Iterator[tuple[str, str, str, str]]:
    """Yield datastore folders for every day from ``start_date`` to ``end_date``.

    Args:
        datastore_uri: URI prefix forwarded to ``generate_paths``.
        start_date: First day of the range (anything ``pd.date_range``
            accepts). When omitted, the single placeholder folder ``'_test'``
            is used instead of a date.
        end_date: Last day of the range, passed to ``pd.date_range`` as ``end``.
        date_separator: Separator placed between year, month and day.

    Returns:
        The one-shot generator produced by ``generate_paths``.
    """
    if start_date:
        day_format = f'%Y{date_separator}%m{date_separator}%d'
        dates = [
            day.strftime(day_format)
            for day in pd.date_range(start=start_date, end=end_date)
        ]
    else:
        # Plain folder name, not a timestamp: it must not go through strftime.
        dates = ['_test']

    # Forward the separator so generate_paths does not rewrite a '-' separator
    # back to its own default.
    return generate_paths(datastore_uri, dates, date_separator)

In [21]:
# Choose ONE source of dates for the batch submission cell below.
# The result is a one-shot generator: re-run this cell before re-running the
# submission loop, otherwise the loop has nothing left to iterate over.

# Option 1: every day in a date range.
date_generator = generate_paths_for_range(
    datastore_uri=uri,
    start_date='2024-03-07',
    end_date='2024-04-30',
)

# Option 2: an explicit list of dates (e.g. to resubmit failed or canceled jobs).
# failed_jobs = ['2024-03-08', '2024-03-07', '2024-04-26', '2024-04-25', '2024-04-23', '2024-04-21', 
#  '2024-04-20', '2024-04-19', '2024-04-18', '2024-04-17', '2024-04-16', '2024-04-15', 
#  '2024-04-13', '2024-04-12', '2024-04-11', '2024-04-10', '2024-04-09', '2024-04-08', 
#  '2024-04-07', '2024-04-05', '2024-04-04', '2024-04-03', '2024-04-02']
# canceled_jobs = ['2024-03-05', '2024-03-04', '2024-03-10', '2024-03-09', '2024-03-06', '2024-04-30', 
#  '2024-04-29', '2024-04-28', '2024-04-27', '2024-04-24', '2024-04-22', '2024-04-14', 
#  '2024-04-06', '2024-04-01']
# date_generator = generate_paths(uri, canceled_jobs)

In [22]:
# Pipeline variant to submit for every date. One of:
#   "tracking", "tracking_pre_labeling", "classification", "labeling_videos"
start_at = "tracking_pre_labeling"

# copy library files to job source directory temporarily
pth_cls = './components/classification/src/analysis/classification_utils/'
pth_masks = './components/classification/src/analysis/demo/'
pth = './components/kalman_tracking/src/algorithm/'

try:
    shutil.copytree('../analysis/classification_utils/', pth_cls, dirs_exist_ok=True)
    shutil.copytree('../analysis/demo/', pth_masks, dirs_exist_ok=True)
    shutil.copytree('../algorithm/', pth, dirs_exist_ok=True)

    # Loop variables are named *_uri so they do not overwrite the
    # `intermediate_path_on_datastore` / `output_path_on_datastore` values set
    # in the configuration cell.
    for raw_videos_dir_path, intermediate_uri, output_uri, date_str in date_generator:
        if start_at == "tracking":
            pipeline_job = tracking_all_steps(
                classification_settings_file=classification_settings_file,
                input_videos_dir=Input(type=AssetTypes.URI_FOLDER, path=raw_videos_dir_path, mode=InputOutputModes.RO_MOUNT),
                train_val_gt_data_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_train_val_gt_data, mode=InputOutputModes.RO_MOUNT),
                intermediate_data_uri=intermediate_uri,
                output_data_uri=output_uri,
            )
        elif start_at == "tracking_pre_labeling":
            pipeline_job = tracking_all_steps_pre_labeling(
                classification_settings_file=classification_settings_file,
                input_videos_dir=Input(type=AssetTypes.URI_FOLDER, path=raw_videos_dir_path, mode=InputOutputModes.RO_MOUNT),
                train_val_gt_data_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_train_val_gt_data, mode=InputOutputModes.RO_MOUNT),
                intermediate_data_uri=intermediate_uri,
                output_data_uri=output_uri,
            )
        elif start_at == "classification":
            pipeline_job = classification_and_labeling_videos(
                classification_settings_file=classification_settings_file,
                input_videos_dir=Input(type=AssetTypes.URI_FOLDER, path=raw_videos_dir_path, mode=InputOutputModes.RO_MOUNT),
                train_val_gt_data_dir=Input(type=AssetTypes.URI_FOLDER, path=uri_train_val_gt_data, mode=InputOutputModes.RO_MOUNT),
                intermediate_data=Input(type=AssetTypes.URI_FOLDER, path=intermediate_uri, mode=InputOutputModes.DOWNLOAD),
                output_data_uri=output_uri,
            )
        elif start_at == "labeling_videos":
            pipeline_job = labeling_videos(
                input_data=Input(type=AssetTypes.URI_FOLDER, path=raw_videos_dir_path, mode=InputOutputModes.DOWNLOAD),
                labels_dir=Input(type=AssetTypes.URI_FOLDER, path=output_uri, mode=InputOutputModes.DOWNLOAD),
            )
        else:
            # Without this branch a `pipeline_job` left over from an earlier
            # cell would be silently resubmitted for every date.
            raise ValueError(f"Unknown start_at value: {start_at!r}")

        pipeline_job.outputs.detections = Output(type="uri_folder", path=output_uri, mode=InputOutputModes.RW_MOUNT)
        pipeline_job.tags = {"date": date_str, "location": f"{plant}-{sonar_location}"}
        pipeline_job.display_name = f"{plant}-{sonar_location}-{date_str}"

        pipeline_job_run = ml_client.jobs.create_or_update(
            pipeline_job,
            experiment_name=f"{plant}-{sonar_location}-{start_at}",
        )
        print(f'submitted job with tags: {pipeline_job_run.tags}')
finally:
    # Always remove the staged copies, even when a submission fails midway.
    # ignore_errors covers directories that were never created.
    for staged_dir in (pth, pth_masks, pth_cls):
        shutil.rmtree(staged_dir, ignore_errors=True)

submitted job with tags: {'date': '2024/03/07', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/08', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/09', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/10', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/11', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/12', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/13', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/14', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/15', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/16', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/17', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/18', 'location': 'lavey-passe3'}
submitted job with tags: {'date': '2024/03/19', 'location': 'lavey-passe3'}
submitted jo