# Preparing Data for the AE

## Imports

In [None]:
import re
import sys, os
import numpy as np
import pandas as pd
import yaml
import pyarrow as pa

import dask.dataframe as dd

### OMS

In [None]:
# Make the OMS API client importable from a local checkout of its GitHub
# repository at ./oms-api-client (resolved relative to the current working
# directory, so the notebook must be started from the folder containing it).
sys.path.append(os.path.abspath('./oms-api-client'))
from omsapi import OMSAPI

### Run Registry: fall back to a user install directory if needed

In [None]:
#!pip install runregistry
# IMPORTANT: if `import runregistry` fails, change the fallback path below to the
# "Location" reported by
#!pip show runregistry
try:
    import runregistry
except ImportError:
    # Only a missing package triggers the fallback; any other error raised while
    # importing runregistry is a real problem and is left to propagate.
    # The path below is one specific user's site-packages — adjust it for yours.
    sys.path.append('/eos/home-i03/m/mcrucian/.local/lib/python3.9/site-packages')
    import runregistry

## Directory with Monitoring Elements (ME)

In [None]:
path = "/eos/cms/store/group/ml/AD4MVDHackathon/ML4DQM_MUON/MEs"
dirs = os.listdir(path)

# Display the ME names (substrings matching hRHGlobal[mp]<digits>) that occur in
# the directory names. Reuse `dirs` instead of listing the directory a second time.
me_name_pattern = re.compile(r'hRHGlobal[mp]\d+')
available_mes = sorted({match for dir_name in dirs for match in me_name_pattern.findall(dir_name)})
print("Available MEs in dir:")
print('\n'.join(available_mes))

## Choose ME

In [None]:
out_label = "151024"  # used as a date to distinguish different versions
me = "hRHGlobalp4"

# Files smaller than this many bytes are skipped.
# NOTE(review): presumably such small files are empty/placeholder parquet files —
# confirm the 601-byte threshold.
min_me_file_size = 601

dirs = os.listdir(path)
# Sub-directories of `path` whose name contains the chosen ME. Sorted so that the
# file list (and therefore the order in which files are read) is reproducible.
me_dirs = sorted(
    os.path.join(path, name)
    for name in dirs
    if me in name and os.path.isdir(os.path.join(path, name))
)

# Filtered files: files inside those directories whose name contains the ME and
# whose size reaches the threshold above.
filtered_files = []
for me_dir in me_dirs:
    for file_name in sorted(os.listdir(me_dir)):
        file_path = os.path.join(me_dir, file_name)
        if me in file_name and os.path.exists(file_path) and os.path.getsize(file_path) >= min_me_file_size:
            filtered_files.append(file_path)

# The directories printed below should show the eras we are using (e.g. D,E,F,G)
print('\n'.join(me_dirs))

# Fail here with a clear message rather than later inside dd.read_parquet([]).
if not filtered_files:
    raise FileNotFoundError(
        f"No files of at least {min_me_file_size} bytes found for ME {me!r} under {path}"
    )

## Load dataframe with dask

In [None]:
%%time
# Lazily read every selected ME file into a single dask DataFrame.
monitoring_elements = dd.read_parquet(filtered_files)

# Restrict to rows whose dataset name contains "StreamExpress".
is_express = monitoring_elements['dataset'].str.contains("StreamExpress")
monitoring_elements = monitoring_elements[is_express]

In [None]:
# Show the dask DataFrame after the StreamExpress selection (notebook display).
monitoring_elements

In [None]:
# Distinct dataset names left after the StreamExpress selection.
dataset = monitoring_elements["dataset"].unique()
dataset = dataset.compute()
print(dataset)

### Runs

In [None]:
# Distinct run numbers present in the selected MEs, in ascending order.
# The dask result is computed explicitly; np.unique already returns sorted values.
run_list = np.unique(monitoring_elements["run_number"].unique().compute())
if run_list.size == 0:
    raise ValueError("No runs found in the selected monitoring elements.")
print(f"Runs from {run_list[0]} to {run_list[-1]}")

# (If the run information has already been downloaded, skip ahead to "Load already downloaded data for run information and luminosity")
## Use run-registry API to download information about the runs and OMS to get luminosity

In [None]:
# Summarise every run from the Run Registry into `runreg_df`: the run class, the
# number of CSC lumisections per state, and a BAD flag used later to drop runs.
runreg_dtypes = {
    "run_number": 'int',
    "class": 'str',
    "cscGOOD": 'int',
    "cscSTANDBY": 'int',
    "cscBAD": 'int',
    "cscEMPTY": 'int',
    "BAD": 'bool',
}


def _runreg_row(run_number, run):
    """Return one ``runreg_df`` row (a plain dict) for a Run Registry record.

    ``run`` is the object returned by ``runregistry.get_run``. The row holds the
    run class and the "csc-csc" lumisection counts for the GOOD, STANDBY, BAD and
    EMPTY states (0 when a state is absent). If the record lacks the expected keys
    or is not subscriptable (e.g. ``None``), the run is recorded with class "BAD"
    and ``BAD=True``.

    Otherwise ``BAD`` is True when the class does not contain "Collisions", any
    CSC lumisection is STANDBY or BAD, or no CSC lumisection is GOOD.
    """
    try:
        lumisections = run["lumisections"]
        csc_counts = lumisections["csc-csc"] if "csc-csc" in lumisections else {}
        row = {
            "run_number": run_number,
            "class": run["class"],
            "cscGOOD": csc_counts.get("GOOD", 0),
            "cscSTANDBY": csc_counts.get("STANDBY", 0),
            "cscBAD": csc_counts.get("BAD", 0),
            "cscEMPTY": csc_counts.get("EMPTY", 0),
            "BAD": False,
        }
    except (KeyError, TypeError, AttributeError):
        # Missing or malformed registry record: treat the run as unusable.
        return {"run_number": run_number, "class": "BAD", "cscGOOD": 0, "cscSTANDBY": 0,
                "cscBAD": 0, "cscEMPTY": 0, "BAD": True}

    run_class = row["class"]
    # isinstance guard: a None/non-string class is treated as "not a collision run"
    # instead of raising TypeError on the `in` test.
    is_collision_run = isinstance(run_class, str) and "Collisions" in run_class
    if not is_collision_run or row["cscSTANDBY"] != 0 or row["cscBAD"] != 0 or row["cscGOOD"] == 0:
        row["BAD"] = True
    return row


total_runs = len(run_list)
# max(1, ...) keeps the progress check valid with fewer than 10 runs
# (total_runs // 10 would be 0 and `i % 0` raises ZeroDivisionError).
percent_increment = max(1, total_runs // 10)

runreg_rows = []
for i, r in enumerate(run_list):
    # get_run stays outside any try so API/connection errors stop the loop
    # instead of being silently recorded as BAD runs.
    run = runregistry.get_run(run_number=int(r))
    runreg_rows.append(_runreg_row(int(r), run))

    # Print progress every 10%
    if i % percent_increment == 0:
        print(f'Progress: {i / total_runs * 100}%')

# Build the DataFrame once: pd.concat inside the loop re-copies all previous rows
# on every iteration (quadratic in the number of runs).
runreg_df = pd.DataFrame(runreg_rows, columns=list(runreg_dtypes)).astype(runreg_dtypes)

In [None]:
# Read the OMS API client credentials from config.yaml and authenticate.
# The file must provide info["APIClient"]["client_ID"] and
# info["APIClient"]["Client_Secret"] (the keys read below).
with open("config.yaml", 'r', encoding="utf-8") as f:
    try:
        info = yaml.safe_load(f)
    except yaml.YAMLError as exc:
        print(f"Cannot read the file: {exc}")
        # Re-raise: continuing would fail later with a NameError, or silently
        # reuse a stale `info` from an earlier execution of this cell.
        raise

# NOTE(review): cert_verify=False presumably disables TLS certificate
# verification for the OMS endpoint — confirm this is required here.
omsapi = OMSAPI("https://cmsoms.cern.ch/agg/api", "v1", cert_verify=False)
omsapi.auth_oidc(info["APIClient"]["client_ID"], info["APIClient"]["Client_Secret"])

In [None]:
# Query OMS for the per-lumisection information of every run in `run_list` and
# collect it in a single pandas DataFrame (`lumi_df`).
lumi_dfs = []

total_runs = len(run_list)
# max(1, ...) keeps the progress check valid with fewer than 10 runs
# (total_runs // 10 would be 0 and `i % 0` raises ZeroDivisionError).
percent_increment = max(1, total_runs // 10)

for i, r in enumerate(run_list):
    ls_query = omsapi.query("lumisections")
    # Pass a plain Python int rather than a numpy integer to the query builder.
    ls_query.filter("run_number", int(r))
    # One large page, sorted by descending lumisection number.
    # NOTE(review): assumes per_page=100000 covers every lumisection of a run.
    ls_query.sort("lumisection_number", asc=False).paginate(page=1, per_page=100000)
    response = ls_query.data().json()["data"]

    lumi_dfs.append(pd.DataFrame([item["attributes"] for item in response]))

    # Print progress every 10%
    if i % percent_increment == 0:
        print(f'Progress: {i / total_runs * 100}%')

# Each per-run frame is indexed from 0; ignore_index gives the combined frame a
# unique index instead of repeated labels.
lumi_df = pd.concat(lumi_dfs, ignore_index=True)

# Replace missing values in these columns with explicit defaults.
fill_values = {
    'castor_ready': False,
    'gem_ready': False,
    'zdc_ready': False,
    'prescale_index': -1,
    'prescale_name': "",
}
for column, fill_value in fill_values.items():
    lumi_df[column] = lumi_df[column].fillna(fill_value)

# Match the ME tables' column name, then add the average of init/end luminosity.
lumi_df = lumi_df.rename(columns={'lumisection_number': 'ls_number'})
lumi_df["mean_lumi"] = (lumi_df["init_lumi"] + lumi_df["end_lumi"]) / 2

dtype_dict = {'prescale_name': str, 'rp_time_ready': bool,
              'rp_sect_56_ready': bool, 'rp_sect_45_ready': bool,
              'start_time': str, 'end_time': str}
lumi_df = lumi_df.astype(dtype_dict)

### Save to file

In [None]:
# Cache the run list, OMS luminosity table and Run Registry summary in one HDF5
# file so the slow API queries above can be skipped next time.
path = "/eos/cms/store/group/ml/AD4MVDHackathon/ML4DQM_MUON/run_info"
filename = "run_info.h5"

run_list_df = pd.DataFrame(run_list, columns=['run_list'])

tables_to_store = {
    'run_list': run_list_df,
    'lumi_df': lumi_df,
    'runreg_df': runreg_df,
}
# Mode 'w' overwrites any existing file.
with pd.HDFStore(f"{path}/{filename}", 'w') as store:
    for key, table in tables_to_store.items():
        store[key] = table

# (Start here if the run information has already been downloaded)
## Load already downloaded data for run information and luminosity

In [None]:
# Load the cached run list, OMS luminosity table and Run Registry summary.
path = "/eos/cms/store/group/ml/AD4MVDHackathon/ML4DQM_MUON/run_info"
filename = "run_info.h5"

with pd.HDFStore(f"{path}/{filename}", 'r') as store:
    run_list_df = store['run_list']
    lumi_df = store['lumi_df']
    runreg_df = store['runreg_df']

# The cached data must cover exactly the runs of the current selection.
# np.array_equal also handles a length mismatch (returns False) where an
# elementwise `==` breaks, and an explicit raise is not stripped by `python -O`.
cached_runs = run_list_df["run_list"].to_numpy()
if not np.array_equal(run_list, cached_runs):
    raise ValueError(
        f"Cached run info in {path}/{filename} does not match the selected runs "
        f"({len(run_list)} selected, {len(cached_runs)} cached); "
        "re-run the download cells above."
    )

## Use the run and luminosity information

In [None]:
# Run numbers flagged BAD in the Run Registry summary.
bad_runs = runreg_df.loc[runreg_df['BAD'], 'run_number'].tolist()

In [None]:
# Drop every ME row belonging to a BAD run.
is_bad_run = monitoring_elements['run_number'].isin(bad_runs)
monitoring_elements = monitoring_elements[~is_bad_run]

In [None]:
# Attach the OMS per-lumisection information to each ME row, matching on run and
# lumisection number. With how='left', ME rows without an OMS match are kept and
# get missing values in the OMS columns. lumi_df is wrapped as a one-partition
# dask DataFrame for the merge.
merge_keys = ['run_number', 'ls_number']
lumi_df_dask = dd.from_pandas(lumi_df, npartitions=1)
monitoring_elements = monitoring_elements.merge(lumi_df_dask, how='left', on=merge_keys)

In [None]:
# Keep only lumisections where every one of these OMS flags is True.
# Comparing with `== True` (not using the bare column) makes missing values from
# unmatched rows of the left merge evaluate to False.
# NOTE(review): the chosen ME may be a plus-side one (e.g. "hRHGlobalp4") while
# only `cscm_ready` is required here — confirm whether `cscp_ready` (if OMS
# provides it) is what should be checked.
required_flags = ["beams_stable", "cscm_ready", "cms_active", "beam_present", "physics_flag"]
keep_mask = monitoring_elements[required_flags[0]] == True  # noqa: E712
for flag_name in required_flags[1:]:
    keep_mask = keep_mask & (monitoring_elements[flag_name] == True)  # noqa: E712
monitoring_elements = monitoring_elements[keep_mask]

In [None]:
# Show the dask DataFrame after the OMS merge and flag selection (notebook display).
monitoring_elements

In [None]:
# Write the selected MEs, enriched with run/lumisection info, to parquet.
# Some column types are not inferred correctly on write, so pyarrow types are
# given explicitly for the columns below.
# NOTE(review): presumably because these columns can be all-null or object-typed
# in some partitions — confirm.
path = "/eos/cms/store/group/ml/AD4MVDHackathon/ML4DQM_MUON/MEs_with_info"

schema = {
    "data": pa.list_(pa.list_(pa.float64())),  # nested list of floats
    "recorded_lumi": pa.float64(),
    "prescale_name": pa.string(),
    "start_time": pa.string(),
    "rp_sect_56_ready": pa.bool_(),
    "rp_time_ready": pa.bool_(),
    "delivered_lumi": pa.float64(),
    "end_time": pa.string(),
    "rp_sect_45_ready": pa.bool_(),
}

output_file = f'{path}/{me}_{out_label}_s0.parquet'
monitoring_elements.to_parquet(output_file, schema=schema)