https://pythonhosted.org/jupyter_runner/

In [61]:
#TODO handling big data for tests
import os
import sys

import shutil

# Directory that holds the cached CSV tables. The trailing "/" is required
# because later cells build file paths by plain string concatenation.
cachedDirPath = os.environ.get('CACHED_PATH')
# An unset *or empty* CACHED_PATH falls back to the default: an empty value
# would otherwise resolve to "/" and aim the cache wipe at the filesystem root.
cachedDirPath = cachedDirPath + "/" if cachedDirPath else "./cached/"
# The cache is wiped unless RCT_MOCKDATA_GENERATOR_DEL_CACHE is exactly 'false'.
# No shell is involved, so a path with spaces or metacharacters is handled as-is.
if os.environ.get('RCT_MOCKDATA_GENERATOR_DEL_CACHE') != 'false':
    shutil.rmtree(cachedDirPath, ignore_errors=True)
os.makedirs(cachedDirPath, exist_ok=True)

0

In [62]:
# Database connection settings read from the environment (None when unset).
RCT_USER, RCT_PASSWORD, RCT_DATABASE, RCT_DATABASE_HOST = (
    os.environ.get(variable)
    for variable in ('RCT_USER', 'RCT_PASSWORD', 'RCT_DATABASE', 'RCT_DATABASE_HOST')
)

In [63]:
import subprocess

# Install the data-generation dependencies into the interpreter running this
# notebook. An argument list (no shell) is used so that an interpreter path
# containing spaces or shell metacharacters is passed through intact.
# Best effort, as before: a failed install is not raised here.
for package in ("pandas", "numpy"):
    subprocess.run([sys.executable, "-m", "pip", "install", package], check=False)

0

In [64]:
import pandas as pd
import numpy as np
import re
from collections import defaultdict
from numpy.random import randint, uniform, choice

In [65]:
def gen_rand_letter():
    """Return one random lowercase ASCII letter, 'a' through 'z' inclusive."""
    # np.random.randint excludes its upper bound, hence the "+ 1" so that
    # 'z' can actually be drawn.
    return chr(np.random.randint(ord('a'), ord('z') + 1))


def gen_rand_char():
    """Return one random lowercase ASCII character (same alphabet as gen_rand_letter)."""
    return chr(np.random.randint(ord('a'), ord('z') + 1))


def gen_rand_string(n):
    """Return a string of n random lowercase ASCII characters ('' when n is 0)."""
    return ''.join(gen_rand_char() for _ in range(n))

In [66]:
# Progress marker: the cells below build (or load from cache) one DataFrame per table.
print("creating tables")

creating tables


In [67]:
def read_csv(path):
    """Load a cached table that was written with ``DataFrame.to_csv``.

    The first CSV column is the index written by ``to_csv`` and is restored
    as the index. Default NA detection is switched off so that empty strings
    stored in text columns are read back as '' instead of NaN; NaN would
    later be stringified to 'nan' when the row is inserted.

    Parameters:
        path: location of the cached CSV file.

    Returns:
        The cached table as a pandas DataFrame.
    """
    return pd.read_csv(path, index_col=0, keep_default_na=False)

# Beams dictionary

In [68]:
cached_beams_dictionary_df_path = cachedDirPath + "beams_dictionary_df"

# Known beam types; a type's position in the list doubles as its id.
beams_types = ['PbPb', 'pp', 'pPb', 'nn', 'np']
beams_dictionary = list(enumerate(beams_types))

if os.path.exists(cached_beams_dictionary_df_path):
    # Reuse the table cached by an earlier run.
    beams_dictionary_df = read_csv(cached_beams_dictionary_df_path)
else:
    beams_dictionary_df = pd.DataFrame(beams_dictionary, columns=['id', 'beam_type'])
    beams_dictionary_df.to_csv(cached_beams_dictionary_df_path)

beams_dictionary_df

Unnamed: 0,id,beam_type
0,0,PbPb
1,1,pp
2,2,pPb
3,3,nn
4,4,np


# Periods

In [69]:
cached_periods_df_path = cachedDirPath + "periods_df"

size = 30
years = [str(y) for y in range(2010, 2021)]

if not os.path.exists(cached_periods_df_path):
    # Full period names look like 'LHC2015k'. np.unique sorts them and drops
    # duplicates, so fewer than `size` names may remain.
    periods_names = np.unique([f'LHC{choice(years)}{gen_rand_letter()}' for _ in range(size)])
    # The first name is replaced by a fixed, recognisable period.
    periods_names[0] = "LHC2000."
    beams_types_id = [randint(0, len(beams_types)) for _ in range(len(periods_names))]

    # Stored name drops the century ('LHC2015k' -> 'LHC15k'); the four-digit
    # year is kept in its own column.
    periods = [(i, n[:3] + n[5:], int(n[3:7]), t)
               for i, (n, t) in enumerate(zip(periods_names, beams_types_id))]
    periods_df = pd.DataFrame(periods, columns=['id', 'name', 'year', 'beam_type_id'])
    # The fixed first period always gets beam type 1.
    periods_df.loc[0, "beam_type_id"] = 1
    periods_df.to_csv(cached_periods_df_path)
else:
    periods_df = read_csv(cached_periods_df_path)
    # Rebuild the helper sequences from the cached table instead of drawing
    # fresh random ones, so cells that use periods_names (runs, simulation
    # passes) stay consistent with the periods that are actually loaded.
    periods_names = np.array(
        [f'{name[:3]}{year}{name[5:]}'
         for name, year in zip(periods_df['name'], periods_df['year'])],
        dtype=str,
    )
    beams_types_id = list(periods_df['beam_type_id'])

periods_df[:10]

Unnamed: 0,id,name,year,beam_type_id
0,0,LHC00.,2000,1
1,1,LHC10j,2010,1
2,2,LHC10p,2010,4
3,3,LHC10r,2010,4
4,4,LHC11c,2011,4
5,5,LHC11v,2011,2
6,6,LHC11w,2011,2
7,7,LHC12c,2012,1
8,8,LHC12l,2012,4
9,9,LHC12v,2012,2


# Runs

In [70]:
def gen_B_field():
    """Return a random b_field value: a sign, a number in [0, 2) with 7 significant digits, then ' T'."""
    sign = choice(["+", "-"])
    magnitude = uniform(0, 2)
    return f'{sign}{magnitude:.7} T'

In [71]:
cached_runs_df_path = cachedDirPath + "runs_df"

if not os.path.exists(cached_runs_df_path):
    # Period ids are taken from the periods table actually in use, so the
    # runs stay consistent with it even when that table came from the cache.
    period_ids = [int(pid) for pid in periods_df['id']]
    # One array of unique run numbers per period, drawn from the period's own
    # range [pid*1000, (pid+1)*1000); 25-59 draws before de-duplication.
    runs = [np.unique(randint(pid * 1000,
                              (pid + 1) * 1000,
                              np.random.randint(25, 60)))
            for pid in period_ids]
    # NOTE(review): 'callibration' is spelled differently from the pass type
    # 'calibration'; left unchanged because it is emitted data.
    runTypes = ['technical', 'data', 'cosmic', 'callibration', 'sim']
    # All runs of one period share a single beam energy.
    energyForPeriodsRuns = {pid: randint(500, 1500) for pid in period_ids}
    runs_df = pd.DataFrame(
        [
            (
                pid,
                run_number,
                randint(1000, 10000),
                randint(10000, 100000),
                gen_B_field(),
                energyForPeriodsRuns[pid],
                f'IR-{gen_rand_string(5)}',
                randint(12345, 23456),
                f'trigg_conf-{gen_rand_string(5)}',
                randint(123456, 234567),
                choice(runTypes),
                f'mu-{gen_rand_string(6)}',
                randint(1000000000000, 5999999999999),
                randint(6000000000000, 9999999999999),
            )
            for pid, period_runs in zip(period_ids, runs)
            for run_number in period_runs
        ],
        columns=[
            'period_id',
            'run_number',
            'start',
            'end',
            'b_field',
            'energy_per_beam',
            'ir',
            'filling_scheme',
            'triggers_conf',
            'fill_number',
            'run_type',
            'mu',
            'time_trg_start',
            'time_trg_end',
        ],
    )
    # Sequential ids, placed as the first column.
    runs_df.insert(0, 'id', list(range(len(runs_df))))

    runs_df.to_csv(cached_runs_df_path)
else:
    runs_df = read_csv(cached_runs_df_path)

runs_df

Unnamed: 0,id,period_id,run_number,start,end,b_field,energy_per_beam,ir,filling_scheme,triggers_conf,fill_number,run_type,mu,time_trg_start,time_trg_end
0,0,0,17,9370,48973,-0.08293881 T,585,IR-bjlro,15489,trigg_conf-ntxvp,179869,sim,mu-apmxcl,4721362316946,8531188373067
1,1,0,34,9800,59892,+1.641294 T,585,IR-lgtad,15056,trigg_conf-luujb,201604,sim,mu-hrouvb,5111424356062,7601457801419
2,2,0,41,2698,23438,-0.7429934 T,585,IR-kqjok,18017,trigg_conf-yxshr,234434,technical,mu-firkqi,5387157056949,9042016167880
3,3,0,52,5964,27423,+1.784077 T,585,IR-hrrxc,22125,trigg_conf-yuyyo,197848,technical,mu-wkoxyq,4570073914088,7812417961169
4,4,0,57,8461,90693,-0.9407322 T,585,IR-ylhpb,17846,trigg_conf-ynkrg,224526,callibration,mu-doqqsy,2825967680837,7360353334636
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1283,1283,29,29948,6504,27292,+1.883185 T,1350,IR-pkywn,17140,trigg_conf-tmcyc,178440,sim,mu-rsglld,2579703265329,8291791555381
1284,1284,29,29957,3951,26429,-0.2927248 T,1350,IR-oggpp,14888,trigg_conf-avdxq,192454,callibration,mu-sorxwv,5641389273054,9279888990315
1285,1285,29,29960,9366,31635,-1.662224 T,1350,IR-ymdio,17903,trigg_conf-ywepf,231464,technical,mu-reotme,3320405065725,9665123282332
1286,1286,29,29964,9878,26640,+1.806893 T,1350,IR-nsxup,23059,trigg_conf-fmlyj,202820,sim,mu-ddvwcl,2062670919627,8186084677357


# Data Passes

### pass_types

In [72]:
cached_pass_types_df_path = cachedDirPath + "pass_types_df"

if os.path.exists(cached_pass_types_df_path):
    # Reuse the table cached by an earlier run.
    pass_types_df = read_csv(cached_pass_types_df_path)
else:
    # (id, pass_type) pairs; the id is the position in the list.
    pass_types = list(enumerate(['technical', 'data', 'calibration']))
    pass_types_df = pd.DataFrame(pass_types, columns=['id', 'pass_type'])
    pass_types_df.to_csv(cached_pass_types_df_path)

pass_types_df

Unnamed: 0,id,pass_type
0,0,technical
1,1,data
2,2,calibration


### data_passes

In [74]:
cached_data_passes_df_path = cachedDirPath + "data_passes_df"


if not os.path.exists(cached_data_passes_df_path):
    # 70 data passes, each named '<period name>_<10 random letters>' after a
    # randomly sampled period (the same period may be picked more than once).
    sampled_periods = [periods_df[['id', 'name']].sample().iloc[0] for _ in range(70)]
    # Plain tuples are built instead of editing the sampled rows in place.
    data_passes_names = [
        (period['id'], f"{period['name']}_{gen_rand_string(10)}")
        for period in sampled_periods
    ]
    # Counted from pass_types_df, which exists on both the cache-hit and the
    # cache-miss path; the plain `pass_types` list is only defined when the
    # pass types table was freshly generated.
    pass_types_count = len(pass_types_df)
    data_passes_df = pd.DataFrame(
        [
            (
                i,
                name,
                choice(['dec', '']),
                randint(0, pass_types_count),
                choice(['jira-', '']),
                choice(['ML-', '']),
                randint(10, 100),
                f'sof-v.{randint(5)}.{randint(5)}-{gen_rand_string(2)}',
                123456,
            )
            # NOTE(review): the sampled period id is not stored in the table;
            # confirm whether data_passes should carry a period_id column.
            for i, (_period_id, name) in enumerate(data_passes_names)
        ],
        columns=[
            'id',
            'name',
            'description',
            'pass_type',
            'jira',
            'ml',
            'number_of_events',
            'software_version',
            'size',
        ],
    )

    data_passes_df.to_csv(cached_data_passes_df_path)
else:
    data_passes_df = read_csv(cached_data_passes_df_path)

data_passes_df




Unnamed: 0,id,name,description,pass_type,jira,ml,number_of_events,software_version,size,period_id
0,0,LHC11w,,2,jira-,ML-,65,sof-v.4.2-oq,123456,6
1,1,LHC11c,,2,jira-,ML-,91,sof-v.0.4-op,123456,4
2,2,LHC18b,,0,jira-,,24,sof-v.3.2-qm,123456,23
3,3,LHC16b,,1,jira-,,65,sof-v.3.2-aq,123456,18
4,4,LHC10p,,1,jira-,,71,sof-v.3.2-hq,123456,2
...,...,...,...,...,...,...,...,...,...,...
65,65,LHC16i,,2,jira-,,56,sof-v.2.3-pi,123456,20
66,66,LHC20j,,2,,ML-,19,sof-v.3.0-tr,123456,28
67,67,LHC10p,,2,jira-,,92,sof-v.4.4-ij,123456,2
68,68,LHC14o,dec,1,jira-,,78,sof-v.2.2-iv,123456,15


### data_passes_runs

In [None]:
cached_data_passes_runs_path = cachedDirPath + "data_passes_runs"

if os.path.exists(cached_data_passes_runs_path):
    # Reuse the table cached by an earlier run.
    data_passes_runs_df = read_csv(cached_data_passes_runs_path)
else:
    # For every data pass draw between 10 and 59 distinct run ids.
    data_passes_runs = []
    for _ in range(len(data_passes_df)):
        runs_in_pass = randint(10, 60)
        data_passes_runs.append(
            runs_df['id'].sample(n=runs_in_pass, replace=False).unique())

    # One (placeholder id, data pass, run) row per link; the position in
    # data_passes_runs is used as the data pass id.
    link_rows = []
    for data_pass_id, run_ids in enumerate(data_passes_runs):
        for run_id in run_ids:
            link_rows.append((-1, data_pass_id, run_id))

    data_passes_runs_df = pd.DataFrame(link_rows).rename(
        columns={0: 'id', 1: 'data_pass_id', 2: 'run_id'})
    # Replace the placeholder with sequential ids.
    data_passes_runs_df['id'] = pd.Series(range(len(data_passes_runs_df)))

    data_passes_runs_df.to_csv(cached_data_passes_runs_path)

data_passes_runs_df

# Sim passes

### simulation_passes

In [None]:

cached_simulation_passes_df_path = cachedDirPath + "simulation_passes_df"

if os.path.exists(cached_simulation_passes_df_path):
    # Reuse the table cached by an earlier run.
    simulation_passes_df = read_csv(cached_simulation_passes_df_path)
else:
    # 100 simulation passes named '<entry of periods_names>__<10 random letters>'.
    simulation_passes_names = []
    for _ in range(100):
        period_name = choice(periods_names)
        simulation_passes_names.append(period_name + '__' + gen_rand_string(10))

    pass_rows = []
    for pass_id, pass_name in enumerate(simulation_passes_names):
        pass_rows.append((
            pass_id,
            pass_name,
            choice(['dec', '']),
            choice(['jira-??', '']),
            choice(['ML-??', '']),
            f'PWG-{gen_rand_string(10)}',
            randint(10, 100),
        ))

    column_names = {
        0: 'id',
        1: 'name',
        2: 'description',
        3: 'jira',
        4: 'ml',
        5: 'pwg',
        6: 'number_of_events',
    }
    simulation_passes_df = pd.DataFrame(pass_rows).rename(columns=column_names)

    simulation_passes_df.to_csv(cached_simulation_passes_df_path)

simulation_passes_df

### simulation_passes_runs

In [None]:
cached_simulation_passes_runs_path = cachedDirPath + "simulation_passes_runs"

if os.path.exists(cached_simulation_passes_runs_path):
    # Reuse the table cached by an earlier run.
    simulation_passes_runs_df = read_csv(cached_simulation_passes_runs_path)
else:
    # For every simulation pass draw between 10 and 99 distinct run ids.
    simulation_passes_runs = []
    for _ in range(len(simulation_passes_df)):
        runs_in_pass = randint(10, 100)
        simulation_passes_runs.append(
            runs_df['id'].sample(n=runs_in_pass, replace=False).unique())

    # One (placeholder id, simulation pass, run) row per link; the position
    # in simulation_passes_runs is used as the simulation pass id.
    link_rows = []
    for simulation_pass_id, run_ids in enumerate(simulation_passes_runs):
        for run_id in run_ids:
            link_rows.append((-1, simulation_pass_id, run_id))

    simulation_passes_runs_df = pd.DataFrame(link_rows).rename(
        columns={0: 'id', 1: 'simulation_pass_id', 2: 'run_id'})
    # Replace the placeholder with sequential ids.
    simulation_passes_runs_df['id'] = pd.Series(range(len(simulation_passes_runs_df)))

    simulation_passes_runs_df.to_csv(cached_simulation_passes_runs_path)
simulation_passes_runs_df

### detectors_subsystems

In [None]:
cached_detectors_subsystems_df_path = cachedDirPath + "detectors_subsystems_df"

if os.path.exists(cached_detectors_subsystems_df_path):
    # Reuse the table cached by an earlier run.
    detectors_subsystems_df = read_csv(cached_detectors_subsystems_df_path)
else:
    detectors_names = [
        'CPV', 'EMC', 'FDD', 'FT0', 'FV0',
        'HMP', 'ITS', 'MCH', 'MFT', 'MID',
        'PHS', 'TOF', 'TPC', 'TRD', 'ZDC',
    ]
    # (id, name) pairs; the id is the position in the list.
    detectors_subsystems = list(enumerate(detectors_names))
    detectors_subsystems_df = pd.DataFrame(detectors_subsystems, columns=['id', 'name'])

    detectors_subsystems_df.to_csv(cached_detectors_subsystems_df_path)

detectors_subsystems_df

### runs_detectors

In [None]:
cached_runs_detectors_df_path = cachedDirPath + "runs_detectors_df"

if os.path.exists(cached_runs_detectors_df_path):
    # Reuse the table cached by an earlier run.
    runs_detectors_df = read_csv(cached_runs_detectors_df_path)
else:
    detector_positions = list(range(len(detectors_subsystems_df)))
    # Exclusive upper bound for the number of detectors attached to one run.
    detectors_per_run_limit = len(detectors_subsystems_df) // 3

    # For every run position pick a random set of distinct detector positions.
    runs_detectors = []
    for run_id in range(len(runs_df)):
        picked = choice(detector_positions,
                        replace=False,
                        size=randint(1, detectors_per_run_limit))
        runs_detectors.append((run_id, picked))

    # One (placeholder id, run, detector) row per attached detector.
    link_rows = [(-1, run_id, detector_id)
                 for run_id, picked in runs_detectors
                 for detector_id in picked]
    runs_detectors_df = pd.DataFrame(link_rows).rename(
        columns={0: 'id', 1: 'run_id', 2: 'detector_id'})
    # Replace the placeholder with sequential ids.
    runs_detectors_df['id'] = pd.Series(range(len(runs_detectors_df)))

    runs_detectors_df.to_csv(cached_runs_detectors_df_path)

runs_detectors_df

### flags_dictionary

In [None]:
cached_flags_dictionary_df_path = cachedDirPath + "flags_dictionary_df"

if os.path.exists(cached_flags_dictionary_df_path):
    # Reuse the table cached by an earlier run.
    flags_dictionary_df = read_csv(cached_flags_dictionary_df_path)
else:
    flags = ['ok', 'good', 'noise', 'dist', 'harm', 'chaotic', 'clear', 'heh']
    # (id, flag) pairs; the id is the position in the list.
    flags_dictionary = list(enumerate(flags))
    flags_dictionary_df = pd.DataFrame(flags_dictionary, columns=['id', 'flag'])

    flags_dictionary_df.to_csv(cached_flags_dictionary_df_path)
flags_dictionary_df

### quality_control_flags

In [None]:

cached_quality_control_flags_df_path = cachedDirPath + "quality_control_flags_df"

if os.path.exists(cached_quality_control_flags_df_path):
    # Reuse the table cached by an earlier run.
    quality_control_flags_df = read_csv(cached_quality_control_flags_df_path)
else:
    # Pair every (data pass, run) link with every (run, detector) link of the
    # same run; only the ids of the two link rows are kept.
    pass_runs = data_passes_runs_df.rename(columns={'id': 'pass_run_id'})
    run_detectors = runs_detectors_df.rename(columns={'id': 'run_detector_id'})
    quality_control_flags_df = pass_runs.merge(run_detectors, how='inner', on='run_id')
    quality_control_flags_df = quality_control_flags_df.drop(
        columns=['data_pass_id', 'detector_id', 'run_id'])

    flags_count = len(quality_control_flags_df)

    # Random validity window: start always falls below end.
    quality_control_flags_df['start'] = pd.Series(
        [randint(1000000, 5999999) for _ in range(flags_count)])
    quality_control_flags_df['end'] = pd.Series(
        [randint(6000000, 9999999) for _ in range(flags_count)])

    # Random flag type taken from the flags dictionary.
    quality_control_flags_df['flag_type_id'] = pd.Series(
        [flags_dictionary_df['id'].sample().iloc[0] for _ in range(flags_count)])

    quality_control_flags_df['id'] = pd.Series(range(flags_count))
    # 60% of the flags get an empty comment, 40% get 'cc'.
    quality_control_flags_df['comment'] = pd.Series(
        [choice(['', 'cc'], p=[0.6, 0.4]) for _ in range(flags_count)])

    quality_control_flags_df.to_csv(cached_quality_control_flags_df_path)

quality_control_flags_df


# Inserting

In [None]:
import subprocess

# Install the PostgreSQL driver into the interpreter running this notebook.
# An argument list (no shell) keeps an interpreter path with spaces or shell
# metacharacters intact. Best effort: a failed install is not raised here and
# surfaces at the import on the next line.
subprocess.run([sys.executable, "-m", "pip", "install", "psycopg2-binary"], check=False)
import psycopg2 as pypg

In [None]:
# Open the database connection from the RCT_* settings and create the cursor
# used by the insert helpers.
connection_settings = {
    'host': RCT_DATABASE_HOST,
    'user': RCT_USER,
    'dbname': RCT_DATABASE,
    'password': RCT_PASSWORD,
}
connection = pypg.connect(**connection_settings)
cur = connection.cursor()

In [None]:
import string

def isfloat(s):
    """Return True when ``float(s)`` succeeds, False when it raises."""
    try:
        float(s)
    except Exception:
        return False
    return True

def insert_row(row, targetTableName, counter, logExceptions, logstep=1000, wholeDataSize=''):
    """Insert one DataFrame row into ``targetTableName`` and update the counters.

    The row is committed on its own. A failed insert is rolled back and does
    not stop the caller; it is only reported when ``logExceptions`` is truthy.

    Parameters:
        row: pandas Series; its index holds the column names, its values the
            data. The "id" column is not expected here, it is filled with
            DEFAULT.
        targetTableName: name of the table to insert into.
        counter: two-element list updated in place:
            [successful inserts, attempted inserts].
        logExceptions: when truthy, print the exception of a failed insert.
        logstep: print a progress line once every ``logstep`` attempts.
        wholeDataSize: total row count, shown in the progress line.
    """
    selectors_stm = '("id", "' + '", "'.join(row.index) + '")'
    values = [str(a) for a in row]
    # Values that parse as a float go in bare; everything else becomes a
    # string literal, with embedded single quotes doubled so a value such as
    # "it's" cannot terminate the literal early.
    # NOTE: the statement is assembled as text; it is only meant for the
    # generated mock data, not for untrusted input.
    sql_literals = [s if isfloat(s) else "'" + s.replace("'", "''") + "'" for s in values]
    values_list = "(DEFAULT, " + ", ".join(sql_literals) + ")"

    command = f"INSERT INTO {targetTableName} {selectors_stm} VALUES {values_list}"
    try:
        cur.execute(command)
        connection.commit()
        counter[0] += 1
    except Exception as e:
        if logExceptions:
            print('\n ', end="")
            print(e)
            print(f' inserting to table {targetTableName} {counter}', end='\x1b\r')
        # Leave the connection usable for the next row.
        connection.rollback()
    counter[1] += 1
    # Report once every `logstep` attempts. The check is on attempts rather
    # than successes so the cadence stays the same when inserts fail.
    if counter[1] % logstep == 0:
        print(f' inserting to table {targetTableName} {counter} / {wholeDataSize}', end='\x1b\r')
        
def insert_table_row_by_row(df: pd.DataFrame, targetTableName: str, logExceptions=True):
    """Insert every row of ``df`` into ``targetTableName``, one statement per row.

    The 'id' column is dropped before inserting, so the ids generated in the
    DataFrame are not sent to the database. A status line with the counters
    is printed before and after the run.

    Parameters:
        df: table to insert; must contain an 'id' column.
        targetTableName: name of the destination table.
        logExceptions: forwarded to insert_row.
    """
    counter = [0, 0]
    print(f' inserting to table {targetTableName} {counter}', end='\x1b\r')

    total_rows = len(df)

    def insert_one(row):
        insert_row(row, targetTableName, counter, logExceptions, wholeDataSize=total_rows)

    rows_without_id = df.drop(columns=['id'])
    rows_without_id.apply(insert_one, axis=1)
    print(f' inserting to table {targetTableName} {counter}')



In [None]:
# (DataFrame, target table name) pairs, inserted in this order.
# NOTE(review): presumably ordered so that referenced tables are filled
# before the tables that point at them - confirm against the schema.
tablesAndNames = [
    (beams_dictionary_df, 'beams_dictionary'),
    (periods_df, 'periods'),
    (runs_df, 'runs'),
    (pass_types_df, 'pass_types'),
    (data_passes_df, 'data_passes'),
    (data_passes_runs_df, 'data_passes_runs'),
    (simulation_passes_df, 'simulation_passes'),
    (simulation_passes_runs_df, 'simulation_passes_runs'),
    (detectors_subsystems_df, 'detectors_subsystems'),
    (runs_detectors_df, 'runs_detectors'),
    (flags_dictionary_df, 'flags_types_dictionary'),
    (quality_control_flags_df, 'quality_control_flags'),
]

# Insert errors are printed only when LOG_EXCEPTIONS is exactly "true".
logExceptions = os.environ.get("LOG_EXCEPTIONS") == "true"

for table_df, table_name in tablesAndNames:
    print(f'inserting table {table_name}')
    insert_table_row_by_row(table_df, table_name, logExceptions=logExceptions)
    print(f'table {table_name} inserted')