In [5]:
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import os
import docker
import logging
import time
import concurrent.futures
from datetime import datetime
import csv
import re

In [None]:
# Locations of the monitoring results that the rest of the notebook works on.
results = "/usr/local/bin/results"
fin_containers = "/usr/local/bin/results/died_nextflow_containers.csv"
start_containers = "/usr/local/bin/results/started_nextflow_containers.csv"

# Load every CSV found anywhere below the results directory and report its path.
# NOTE(review): `data` is rebound for each file, so after the walk it only holds
# the frame of the last CSV that was visited.
for root, dirs, files in os.walk(results):
    csv_names = [entry for entry in files if entry.endswith(".csv")]
    for csv_name in csv_names:
        file_path = os.path.join(root, csv_name)
        data = pd.read_csv(file_path, index_col=0)
        print(f"Found CSV file: {file_path}")
        

In [None]:
# Sanity checks

# Check 1: every finished container should also be listed as a started container.
fin_df = pd.read_csv(fin_containers, index_col=0)
start_df = pd.read_csv(start_containers, index_col=0)

# Vectorised membership test instead of scanning the started IDs once per finished
# container. Order and duplicates of the finished list are preserved.
# Note: isin() treats a NaN ID as matching a NaN in the started list.
was_started = fin_df['ContainerID'].isin(start_df['ContainerID'])
missing_containers = fin_df.loc[~was_started, 'ContainerID'].tolist()

if missing_containers:
    print("The following containers are missing from the started containers list:")
    for container in missing_containers:
        print(container)
print("Amount of missing containers:", len(missing_containers))

# Check 2: Compare finished containers with task cpu data
# TODO: not implemented yet.

In [None]:
# Build a lookup from each finished container's `Name` to its `WorkDir`; later cells
# use it to match per-container time-series files to a work directory.
df = pd.read_csv(fin_containers)

# Pair the two columns directly instead of materialising a Series per row with
# iterrows(). As before, a repeated name keeps the work directory of its last row.
container_workdirs = dict(zip(df['Name'], df['WorkDir']))

for name, workdir in container_workdirs.items():
    print(f"Container {name} has work directory {workdir}")

In [100]:
# Split every cAdvisor metric CSV into one CSV per Nextflow container.
# Output layout: <cAdvisor>/<metric>/containers/<container name>.csv
nextflow_pattern = r"nxf-[A-Za-z0-9]{23}"
results = "/usr/local/bin/results"

for root, dirs, files in os.walk(results):
    # Only directories literally named "cAdvisor" contain the metric folders.
    if os.path.basename(root) != "cAdvisor":
        continue
    cAdvisor_path = root

    for metric in os.listdir(cAdvisor_path):
        metric_path = os.path.join(cAdvisor_path, metric)
        if not os.path.isdir(metric_path):
            continue

        # Create the output folder first; it is a directory, so the CSV filter
        # below never picks it up.
        containers_dir = os.path.join(metric_path, "containers")
        os.makedirs(containers_dir, exist_ok=True)

        metric_csvs = [entry for entry in os.listdir(metric_path) if entry.endswith(".csv")]
        for file in metric_csvs:
            file_path = os.path.join(metric_path, file)
            print(f"Processing file: {file_path}")
            df = pd.read_csv(file_path)
            col = 'instance'

            for container_name in df[col].unique():
                # Skip missing values and anything that is not a Nextflow container.
                # re.match only anchors at the start, so the pattern is a prefix test.
                if pd.isna(container_name) or not re.match(nextflow_pattern, str(container_name)):
                    continue
                out_path = os.path.join(containers_dir, f"{container_name}.csv")
                df[df[col] == container_name].to_csv(out_path, index=False)

Processing file: /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/container_network_transmit_bytes_total.csv
Processing file: /usr/local/bin/results/task_network_data/cAdvisor/container_network_receive_bytes_total/container_network_receive_bytes_total.csv
Processing file: /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_errors_total/container_network_transmit_errors_total.csv
Processing file: /usr/local/bin/results/task_network_data/cAdvisor/container_network_receive_packets_dropped_total/container_network_receive_packets_dropped_total.csv
Processing file: /usr/local/bin/results/task_network_data/cAdvisor/container_network_receive_packets_total/container_network_receive_packets_total.csv
Processing file: /usr/local/bin/results/task_memory_data/cAdvisor/container_memory_failures_total/container_memory_failures_total.csv
Processing file: /usr/local/bin/results/task_memory_data/cAdvisor/container_memory_usage_bytes/contai

In [101]:
# Stamp each per-container cAdvisor CSV (in any "containers" folder under the results
# directory) with the work directory recorded for that container.
# Relies on `results` and `container_workdirs` from earlier cells.
for root, dirs, files in os.walk(results):
    if os.path.basename(root) != "containers":
        continue

    for file in files:
        if not file.endswith(".csv"):
            continue
        file_path = os.path.join(root, file)
        fin_container_df = pd.read_csv(file_path)

        # The file stem is the container name, which is the key of container_workdirs.
        container_name = os.path.splitext(file)[0]
        if container_name not in container_workdirs:
            continue

        workdir = container_workdirs[container_name]
        # Same value on every row; the file is rewritten in place.
        fin_container_df = fin_container_df.assign(WorkDir=workdir)
        fin_container_df.to_csv(file_path, index=False)
        print(f"Updated {file_path} with work directory {workdir}")

Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-7YmtU0huW8HeHSt09MlA9RgN.csv with work directory /storage/nf-core/exec/work/03/f64cb99f85b92bb2cc40c4188ae895
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-g0xdtR6Gqt0awE3Yh9JIaZkz.csv with work directory /storage/nf-core/exec/work/29/ad543a0e568e393ca16a1004fb9ffb
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-8L2S9NgKP0YAIrhQ1YAfxlhT.csv with work directory /storage/nf-core/exec/work/b7/9463339793f7be0528f2908d5cf85c
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-v0iqV5IxswF0Crd7UUyWoKiq.csv with work directory /storage/nf-core/exec/work/d9/3c8dacafdac13cdd5d484a07709a44
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-

In [None]:
# Split the slurm job metadata time series into one CSV per distinct value of the
# 'job_state' column, written next to the source file.
slurm_metadata_path = "/usr/local/bin/results/task_metadata/slurm-job-exporter/slurm_job_id"

for file in os.listdir(slurm_metadata_path):
    if not file.endswith("slurm_job_id.csv"):
        continue

    file_path = os.path.join(slurm_metadata_path, file)
    print(f"Reading file: {file_path}")

    # Columns not needed downstream; names that are absent from the file are ignored.
    rm_columns = ['num_cpus', 'work_dir','job_name','value', 'instance', 'partition', 'priority', 'run_time',
        'slurm_job_pid', 'std_err', 'std_in', 'submit_time', 'threads_per_core', 'user']
    df = pd.read_csv(file_path)
    df.drop(columns=rm_columns, inplace=True, errors='ignore')

    # NOTE(review): the values of 'job_state' are used as job names for the output
    # files - presumably the exported columns are shifted; confirm against the exporter.
    slurm_job_col = 'job_state'

    for job_name in df[slurm_job_col].unique():
        if pd.isna(job_name):
            continue
        out_path = os.path.join(slurm_metadata_path, f"{job_name}.csv")
        rows_for_job = df[slurm_job_col] == job_name
        df[rows_for_job].to_csv(out_path, index=False)
        print(f"Saved data for {job_name} to {out_path}")

In [93]:
# Annotate the finished-containers file with a 'Nextflow' column holding the task name
# of the slurm job whose work directory matches the container's WorkDir.
# Relies on `slurm_metadata_path` and `fin_containers` from earlier cells.
for file in os.listdir(slurm_metadata_path):
    if not file.endswith("slurm_job_id.csv"):
        continue

    file_path = os.path.join(slurm_metadata_path, file)
    print(f"Reading file: {file_path}")

    rm_columns = ['num_cpus', 'work_dir','job_name','value', 'instance', 'partition', 'priority', 'run_time',
        'slurm_job_pid', 'std_err', 'std_in', 'submit_time', 'threads_per_core', 'user']
    df = pd.read_csv(file_path)
    df.drop(columns=rm_columns, inplace=True, errors='ignore')

    fin_df = pd.read_csv(fin_containers)
    if 'WorkDir' not in fin_df.columns or 'num_tasks' not in df.columns:
        print("WorkDir or num_tasks column missing in DataFrames.")
        continue

    for idx, row in df.iterrows():
        # The work directory is read from 'num_tasks' and the task name from 'job_state'.
        work_dir, slurm_job = row['num_tasks'], row['job_state']
        if pd.isna(work_dir) or pd.isna(slurm_job):
            print(f"Skipping row {idx} due to missing WorkDir or slurm_job.")
            continue
        # Later rows overwrite earlier ones for the same work directory.
        same_workdir = fin_df['WorkDir'] == work_dir
        fin_df.loc[same_workdir, 'Nextflow'] = slurm_job

    # Persist the annotated table over the original file.
    fin_df.to_csv(fin_containers, index=False)
    print(f"Updated {fin_containers} with slurm job info.")

Reading file: /usr/local/bin/results/task_metadata/slurm-job-exporter/slurm_job_id/slurm_job_id.csv
Updated /usr/local/bin/results/died_nextflow_containers.csv with slurm job info.


In [102]:
# Copy the Nextflow task name into every per-container CSV, in all "containers" folders
# under the results directory. The name is looked up in `fin_df` by the file's WorkDir.
# Relies on `results` and `fin_df` from earlier cells.
for root, dirs, files in os.walk(results):
    if os.path.basename(root) != "containers":
        continue

    for file in files:
        if not file.endswith(".csv"):
            continue
        file_path = os.path.join(root, file)
        container_df = pd.read_csv(file_path)

        # Files without a WorkDir column are left untouched.
        if 'WorkDir' not in container_df.columns:
            continue

        # Only the first row's WorkDir is used for the lookup.
        workdir = container_df['WorkDir'].iloc[0]
        match = fin_df[fin_df['WorkDir'] == workdir]
        if match.empty or 'Nextflow' not in match.columns:
            continue

        # Take the first matching row's task name and apply it to every row of the file.
        nextflow_value = match['Nextflow'].values[0]
        container_df['Nextflow'] = nextflow_value
        container_df.to_csv(file_path, index=False)
        print(f"Updated {file_path} with Nextflow value {nextflow_value}")

Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-7YmtU0huW8HeHSt09MlA9RgN.csv with Nextflow value nf-NFCORE_SAREK_PREPARE_INTERVALS_GATK4_INTERVALLISTTOBED_(genome)
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-g0xdtR6Gqt0awE3Yh9JIaZkz.csv with Nextflow value nf-NFCORE_SAREK_SAREK_VCF_QC_BCFTOOLS_VCFTOOLS_VCFTOOLS_SUMMARY_(test)
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-8L2S9NgKP0YAIrhQ1YAfxlhT.csv with Nextflow value nf-NFCORE_SAREK_PREPARE_INTERVALS_CREATE_INTERVALS_BED_(genome.interval_list)
Updated /usr/local/bin/results/task_network_data/cAdvisor/container_network_transmit_bytes_total/containers/nxf-v0iqV5IxswF0Crd7UUyWoKiq.csv with Nextflow value nf-NFCORE_SAREK_SAREK_BAM_MARKDUPLICATES_CRAM_QC_MOSDEPTH_SAMTOOLS_SAMTOOLS_STATS_(test)
Updated /usr/local/bin/results/task_network_data/cA

In [91]:
# Time-series data treatments