### Notebook for aggregating a collection of HPC tasks on GraphWorld
Given a type of experiment, this notebook takes all the json result files of a collection of HPC tasks and moves them into a single file in the `processed` directory. It also maintains a summary file in the same folder for all files for the experiment. Finally it loads the result files and prints basic statistics not part of the summary file (see last cell of this file).

Set `RAW_DIR` to the raw experiments you want to process.

Set `PROCESSED_DIR` to where the processed results should be stored.
The processed results will be stored in shards. Each time this notebook is ran, 1 shard is created. E.g. the shard size depends on the contents of the `RAW_DIR`.

The processing assumes that the raw results come from our HPC experimental setup.

In [6]:
import os
import pandas as pd
import json
import ast
import re
import math


In [7]:
def process(root, run_to_process):
  processed = f'{root}/processed'
  raw = f'{root}/raw/{run_to_process}'
  if not os.path.isdir(raw):
    print(raw, 'is not a directory.')
    return

  PROCESSED_SHARDS = f'{processed}/shards'

  if not os.path.exists(PROCESSED_SHARDS):
      os.makedirs(PROCESSED_SHARDS)

  # Read (existing) summary file for experiment
  try:
      with open(f'{processed}/summary.json', 'r') as f:
          summary = json.load(f)
  except FileNotFoundError:
      summary = {
          'N_GRAPHS': 0,
          'N_RUNS': 0,
          'RUN_GRAPHS': [],
          'RUN_MARG': [],
          'RAW_FILES': []
      }

  if run_to_process in summary['RAW_FILES']:
      print(f'WARNING: {run_to_process} has already been processed!')
      return

  summary['N_RUNS'] += 1
  summary['RAW_FILES'] += [run_to_process]

  # Here we read the json shards of each HPC task, 
  # aggregate them and store everything in one file in the processed folder
  lines = []
  results_file_regex = r'results\.ndjson-(\d{5})-of-(\d{5})'
  successful_runs = []

  for sub_dir in next(os.walk(raw))[1]:
    sub_dir_full = os.path.join(raw, sub_dir)
    is_successful = False
    result_files = filter(lambda file: re.match(results_file_regex, file), os.listdir(sub_dir_full))
    for result_file in result_files:
      with open(os.path.join(sub_dir_full, result_file)) as f:
        lines.extend(f.readlines())
      is_successful = True
    if is_successful:
      successful_runs += [sub_dir]
                  
  with open(f'{processed}/shards/{summary["N_RUNS"]}.ndjson', "w") as dst:
    for line in lines:
      dst.write(line) # Write all graph experiments to same file

  # Load lines dataframe for printing statistics
  records = map(json.loads, lines)
  results_df = pd.DataFrame.from_records(records)

  # Getting running times
  times = []

  for task in next(os.walk(raw))[1]:
    if not task in successful_runs:
      continue
    with open(f'{raw}/slurm_{task}.out', 'r') as f:
      last_line = lines[-1].split(" ")[1]
      match = re.search(r'\d+', last_line)

      if match:
        lines = f.readlines()
        times.append(int(match.group()) // 60)
      else:
          print(f)
      
  if len(times) == 0:
      times = [math.nan]

  # Getting basic statistics of raw data
  N_GRAPHS = len(results_df)
  N_METHODS = len([col for col in results_df if 'encoder_hidden_channels' in col])
  N_TASKS = len(next(os.walk(raw))[1])

  AVG_TIME = sum(times) / len(times)
  MAX_TIME = max(times)
  MIN_TIME = min(times)

  # Getting methods that have crashed / are skipped
  skipped_methods = {}
  for s_col in [col for col in results_df if '_skipped' in col]:
      count = results_df[s_col].sum()
      if count > 0:
          skipped_methods.update({s_col.removesuffix('_skipped'): count})

  # Update summary file
  summary['N_GRAPHS'] += N_GRAPHS
  summary['RUN_GRAPHS'].append(N_GRAPHS)
  if not 'marginal_param' in results_df.columns:
     assert results_df.shape[0] == 0
     return
     
  marg = results_df['marginal_param'].astype(str).unique()
  if len(marg) > 1:
    summary['RUN_MARG'].append("mixed")
  elif len(marg) == 0:
    summary['RUN_MARG'].append([])
  else:
    summary['RUN_MARG'].append(ast.literal_eval(marg[0]))

  with open(f'{processed}/summary.json', 'w') as s:
    s.write(json.dumps(summary))


  # Printing statistics
  print('------- Task/Graph statistics -------')
  print(f'Total processed tasks: {N_TASKS}')
  print(f'Total processed graphs: {N_GRAPHS}')
  print(f'Graphs per task: {N_GRAPHS / N_TASKS}')
  print(f'Avg task runtime (min): {AVG_TIME} ({AVG_TIME / (N_GRAPHS / N_TASKS)} per graph)')
  print(f'Max task runtime (min): {MAX_TIME} ({MAX_TIME / (N_GRAPHS / N_TASKS)} per graph)')
  print(f'Min task runtime (min): {MIN_TIME} ({MIN_TIME / (N_GRAPHS / N_TASKS)} per graph)\n')

  print('------- Skipped (crashed) methods -------')
  for k,v in skipped_methods.items():
      print(f'{k} skipped {v} times')

In [8]:
# mode = '-2-3-marg'
# RUN_TO_PROCESS = 'p_to_q_ratio-avg_degree_2'
# RAW_DIR = f'/home/data_shares/scara/graphworld/results/mode{mode}/raw/{RUN_TO_PROCESS}'
# PROCESSED_DIR = f'/home/data_shares/scara/graphworld/results/mode{mode}/processed'

mode = '-2-3-marg'
root = f'/home/data_shares/scara/graphworld/results/mode{mode}'
runs = os.listdir(f'/home/data_shares/scara/graphworld/results/mode{mode}/raw')

In [9]:
for run in runs:
    process(root, run)

------- Task/Graph statistics -------
Total processed tasks: 101
Total processed graphs: 1000
Graphs per task: 9.900990099009901
Avg task runtime (min): 439.98 (44.43798 per graph)
Max task runtime (min): 769 (77.669 per graph)
Min task runtime (min): 6 (0.606 per graph)

------- Skipped (crashed) methods -------
------- Task/Graph statistics -------
Total processed tasks: 100
Total processed graphs: 820
Graphs per task: 8.2
Avg task runtime (min): 873.2682926829268 (106.49613325401548 per graph)
Max task runtime (min): 1159 (141.34146341463415 per graph)
Min task runtime (min): 6 (0.7317073170731708 per graph)

------- Skipped (crashed) methods -------
------- Task/Graph statistics -------
Total processed tasks: 100
Total processed graphs: 910
Graphs per task: 9.1
Avg task runtime (min): 821.4395604395604 (90.26808356478686 per graph)
Max task runtime (min): 1197 (131.53846153846155 per graph)
Min task runtime (min): 6 (0.6593406593406593 per graph)

------- Skipped (crashed) methods 