In [1]:
import tarfile
import os

Extracting Prometheus metrics for tarball

In [2]:
def extract_tarball(tarfile_path, tarfile_target):
    file = tarfile.open(tarfile_path)
    for filename in file.getnames():
        print(filename)
    file.extractall(tarfile_target)
    file.close()

In [3]:
tarfile_path = "../../prometheus/metrics/mahti_metrics_110423-210423.tgz"
tarfile_target = "./mahti_metrics_110423-210423"

In [4]:
# Extracting tarballfile = tarfile.open(tarfile_path)
print("Extracting files:")
extract_tarball(tarfile_path, tarfile_target)

Extracting files:
04-21-2023_10-54-01
04-21-2023_10-54-01/slurm_partition_availability_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_alloc_mem_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_job_end_time_1d_offset_3d.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_allocated_cpus_per_job_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_job_end_time_1d_offset_4d.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_allocated_nodes_per_job_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_job_queue_time_1d.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_cpus_load_lower_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_job_start_time_1d.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_cpus_load_upper_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_job_end_time_1d_offset_5d.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_execution_time_per_job_1w.csv.tgz
04-21-2023_10-54-01/slurm_partition_job_state_1d.csv.tgz
04-21-2023_10-54-01/slurm_partition_avg_free_mem_lower_1w.csv.tgz
04-21-2023_10-54-01/slurm_p

In [10]:
def getPartitionHeader(metric_names):
    header = "date," + ",".join(metric_names) + '\n'
    return header

def getJobsHeader(metric_names):
    header = "job_id," + ",".join(metric_names) + '\n'
    return header

def get_job_metrics_names(metrics):
    metric_names = []
    for partition in metrics.keys(): # Partition iterator
        for job_id in metrics[partition].keys(): # Jobs iterator
            for metric_name in metrics[partition][job_id].keys(): # Metrics iterator:
                if metric_name not in metric_names:
                    metric_names.append(metric_name)
    return metric_names

def read_partition_metrics(metrics_file, metrics):
    metric_names = []
    with open(metrics_file, 'r') as f:
        lines = f.readlines()
        for line in lines[1:]:
            line_split = line.split(',')
            metric_name = line_split[0]
            metric_name = metric_name.replace('slurm_partition_', '')
            if metric_name not in metric_names:
                metric_names.append(metric_name)
            partition = line_split[-1].rstrip()
            if partition not in metrics:
                metrics[partition] = {}
            if metric_name not in metrics[partition]:
                metrics[partition][metric_name] = {}
            index = 2
            for split in line_split[1::2]:
                if '[' in split or ']' in split:
                    # Collect metrics
                    timestamp = split[2:]
                    metric = line_split[index][0:-2].lstrip()
                    index += 2
                    metrics[partition][metric_name][timestamp] = metric

def read_job_metrics(metrics_file, metrics):
    metric_names = []
    with open(metrics_file, 'r') as f:
        lines = f.readlines()
        for line in lines[1:]:
            line_split = line.split(',')
            metric_name = line_split[0]
            metric_name = metric_name.replace('slurm_partition_', '')
            if metric_name not in metric_names:
                metric_names.append(metric_name)
            time_limit = line_split[-1].rstrip()
            submit_time = line_split[-2]
            priority = line_split[-3]
            partition = line_split[-4]
            job_id = line_split[-5]
            if partition not in metrics:
                metrics[partition] = {}
            if job_id not in metrics[partition]:
                metrics[partition][job_id] = {}
            if "time_limit" not in metrics[partition][job_id]:
                metrics[partition][job_id]["time_limit"] = time_limit
            if "submit_time" not in metrics[partition][job_id]:
                metrics[partition][job_id]["submit_time"] = submit_time
            if "priority" not in metrics[partition][job_id]:
                metrics[partition][job_id]["priority"] = priority
            if metric_name not in metrics[partition][job_id]:
                metric_value = line_split[2][:-2].lstrip()
                metrics[partition][job_id][metric_name] = metric_value

def write_partition_metrics(metrics):
    if not os.path.exists("partitions"):
        os.makedirs("partitions")

    for partition in metrics.keys(): # Partition iterator
        filename = partition + '_partition.csv'
        print("Creating " + filename)
        with open('partitions/' + filename, 'w') as w:
            # Write header
            metric_names = metrics[partition].keys()
            header = getPartitionHeader(metric_names)
            w.write(header)
            # Collect metrics
            partition_metrics = {}
            for metric_name in metrics[partition].keys(): # Metrics iterator
                for timestamp in metrics[partition][metric_name].keys(): # Timestamp iterator
                    if timestamp not in partition_metrics:
                        partition_metrics[timestamp] = {}
                    metric = metrics[partition][metric_name][timestamp]
                    partition_metrics[timestamp][metric_name] = metric  
            # Write metrics
            *_, last = metric_names
            for timestamp in partition_metrics.keys():
                w.write(timestamp + ',')

                for metric_name in metric_names:
                    if partition_metrics[timestamp][metric_name] is None:
                        w.write(',')
                    else:
                        w.write(partition_metrics[timestamp][metric_name])
                        if last != metric_name:
                            w.write(',')
                w.write('\n')

def write_job_metrics(metrics):
    if not os.path.exists("jobs"):
        os.makedirs("jobs")

    metric_names = get_job_metrics_names(metrics)
    for partition in metrics.keys(): # Partition iterator
        filename = partition + '_jobs.csv'
        print("Creating " + filename)
        with open('jobs/' + filename, 'w') as w:
            # Write header
            header = getJobsHeader(metric_names)
            w.write(header)
            # Collect metrics
            partition_metrics = {}
            for job_id in metrics[partition].keys(): # Jobs iterator
            # Write job metrics
                *_, last = metric_names
                w.write(job_id + ',')
                for metric_name in metric_names:
                    if metric_name not in metrics[partition][job_id]:
                        w.write(',')
                    else:
                        w.write(metrics[partition][job_id][metric_name])
                        if last != metric_name:
                            w.write(',')
                w.write('\n')


For each metric file, process metrics in file
Process partition metrics
Generate cvs for partition metrics
TODO: Process job metrics
TODO: Generate cvs for job metrics

In [11]:
# Read Metrics (Partition|Jobs)
# Process the tarball with Prometheus metrics, and generates corresponding csv per partition for average partition and jobs metrics
partition_metrics = {}
job_metrics = {}
for root, dirs, files in os.walk(tarfile_target):
    tarfile_target = root   
    for file in files:
        tarfile_path = tarfile_target + '/' + file
        print("Extracting: " + tarfile_path)
        # unzip file tarball
        extract_tarball(tarfile_path, tarfile_target)
        # read metrics
        metrics_file = tarfile_target + '/' + file.replace(".tgz", "")
        if not file.startswith("slurm_partition_job"): 
            read_partition_metrics(metrics_file, partition_metrics)
        else:
            read_job_metrics(metrics_file, job_metrics)
        # remove unzip tarball
        os.remove(metrics_file)

# Write Partition metrics in csv
write_partition_metrics(partition_metrics)

# Write Partition metrics in csv
write_job_metrics(job_metrics)


Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_queue_time_1d.csv.tgz
slurm_partition_job_queue_time_1d.csv
Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_start_time_1d.csv.tgz
slurm_partition_job_start_time_1d.csv
Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_state_1d.csv.tgz
slurm_partition_job_state_1d.csv
Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_time_left_1d.csv.tgz
slurm_partition_job_time_left_1d.csv
Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_end_time_1d.csv.tgz
slurm_partition_job_end_time_1d.csv
Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_allocated_minimum_requested_nodes_1d.csv.tgz
slurm_partition_job_allocated_minimum_requested_nodes_1d.csv
Extracting: ./mahti_metrics_110423-210423/04-21-2023_10-54-01/slurm_partition_job_execution_time_1d.csv.tgz
slurm_part