# Basic Test of Converting APS data from CSVs to Pandas Dataframes

Finn O'Shea  
September 02, 2021

In [1]:
# dumb overhead for using external files while they are being edited
%load_ext autoreload
%autoreload 2

In [2]:
import os
import csv

import numpy as np
import pandas as pd
import dask.dataframe as dd
import h5py

from APS_data_collector import APSRunCollector,APSScenarioCollector
from aps_data_handler import APSDataCollector

In [3]:
datapointer = '/sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13'
runpointer = 'Run2019-1'
subrunpointer = '2019-02-08-02:42:00'
#subrunpointer = 'referenceData'
datapath = os.path.join(datapointer,runpointer)

### Test the Run Collectors

In [None]:
%%time
aps = APSRunCollector(datapath)
df = aps.collateData()
df.head()

In [43]:
print(np.product(df.shape))
print(df.memory_usage(deep=True).sum())
print(df.dtypes)

31881960
114852008
sector  magnet  feature       units
S1B     H3      Time          s        float32
                CAerrors      None        int8
                CapTempAI     Deg C    float32
                CurrentAI     Amps     float32
                MagTempAI     Deg C    float32
                                        ...   
S15B    H4      CAerrors      None        int8
                CapTempAI     Deg C    float32
                CurrentAI     Amps     float32
                MagTempAI     Deg C    float32
                PeakAbsDevAI  Amps     float32
Length: 9960, dtype: object


In [44]:
%%time
test = pd.HDFStore(os.path.join(datapath,'pandas_storage.h5'))
test['base'] = df
test.close()

CPU times: user 193 ms, sys: 495 ms, total: 688 ms
Wall time: 5.88 s


### Naming the MultiIndex is much slower than creating the multiindex at concat time (in Pandas)

In [54]:
df1 = pd.DataFrame(np.random.rand(1000,6))
df2 = pd.DataFrame(np.random.rand(1000,6))
df3 = pd.DataFrame(np.random.rand(1000,6))

%timeit dfd = pd.concat([df1,df2,df3],axis=1,keys=["fizz","bazz","flip"])

dfd = pd.concat([df1,df2,df3],axis=1,keys=["fizz","bazz","flip"])
dfd.head()

254 µs ± 1.41 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


Unnamed: 0_level_0,fizz,fizz,fizz,fizz,fizz,fizz,bazz,bazz,bazz,bazz,bazz,bazz,flip,flip,flip,flip,flip,flip
Unnamed: 0_level_1,0,1,2,3,4,5,0,1,2,3,4,5,0,1,2,3,4,5
0,0.472276,0.33282,0.592198,0.695907,0.823825,0.659873,0.357168,0.620498,0.265807,0.359424,0.207772,0.449107,0.790607,0.540454,0.061107,0.117042,0.300095,0.346726
1,0.384289,0.06664,0.831452,0.378931,0.067541,0.65338,0.604996,0.316239,0.82148,0.363741,0.637241,0.179462,0.733529,0.106543,0.182755,0.719828,0.291171,0.628923
2,0.714896,0.994184,0.9617,0.408498,0.109999,0.397716,0.919232,0.03688,0.687869,0.177036,0.70876,0.522732,0.387159,0.540464,0.631843,0.596958,0.753865,0.456254
3,0.654557,0.923313,0.913113,0.423568,0.386911,0.267413,0.048419,0.616982,0.024739,0.457064,0.309357,0.56978,0.356678,0.622107,0.66393,0.549415,0.868343,0.120479
4,0.89143,0.459317,0.980282,0.029994,0.828379,0.272915,0.821909,0.094424,0.44725,0.498684,0.010852,0.415361,0.766249,0.677473,0.6481,0.677259,0.317236,0.041473


In [52]:
cols1 = pd.MultiIndex.from_product([["One"],range(6)],names=["First","Second"])
df1 = pd.DataFrame(np.random.rand(1000,6),columns=cols1)
cols2 = pd.MultiIndex.from_product([["Two"],range(6)],names=["First","Second"])
df2 = pd.DataFrame(np.random.rand(1000,6),columns=cols2)
cols3 = pd.MultiIndex.from_product([["Three"],range(6)],names=["First","Second"])
df3 = pd.DataFrame(np.random.rand(1000,6),columns=cols3)

%timeit dfd = pd.concat([df1,df2,df3],axis=1)

555 µs ± 2 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


### Tryout Dask for Opening Lots of CSVs and stitching them together

Doesn't work directly because Dask assumes all the CSVs are continuations of the same columns, whereas the data I have are separate columns.  
Notice that the extra features in the quad and sextupole files have been cut out because they did not appear in the first file, which was S10A:H1.csv, a horizontal corrector.

Futhermore, Dask doesn't allow multiline headers in CSV files, so it imports the units as the first row. 

In [38]:
dfd = dd.read_csv(os.path.join(datapath,'*.csv'))
dfd.head()

Unnamed: 0,Time,CAerrors,S10A:H1:CapTempAI,S10A:H1:CurrentAI,S10A:H1:MagTempAI,S10A:H1:PeakAbsDevAI
0,s,,Deg C,Amps,Deg C,Amps
1,1.549410496000000e+09,0.0,2.9792428970e+01,-1.3616879272e+02,2.5030525208e+01,2.0457183942e-02
2,1.549410560000000e+09,0.0,2.9792428970e+01,-1.3615959167e+02,2.5054944992e+01,2.0457183942e-02
3,1.549410624000000e+09,0.0,2.9743589401e+01,-1.3615959167e+02,2.5006105423e+01,2.0457183942e-02
4,1.549410688000000e+09,0.0,2.9841270447e+01,-1.3615959167e+02,2.5030525208e+01,2.0457183942e-02


### Try to MultiIndex a Dask Dataframe - Works!

In [123]:
dfd = dd.read_csv(os.path.join(datapath,'S9A:Q3.csv'))
units = dfd.loc['0',:].compute().fillna('None').values[0]
dfd = dfd[dfd['Time'] != 's'] # slow, but the only way possible to drop the first row in Dask
colnames = dfd.columns.values
dfd.columns = pd.MultiIndex.from_arrays([colnames,units])

In [120]:
dfd.head()

Unnamed: 0_level_0,Time,CAerrors,S9A:Q3:CapTempAI,S9A:Q3:CurrentAI,S9A:Q3:DacAI,S9A:Q3:IGBTTempAI,S9A:Q3:MagTempAI,S9A:Q3:OutVoltageAI,S9A:Q3:PeakAbsDevAI
Unnamed: 0_level_1,s,None,Deg C,Amps,Amps,Deg C,Deg C,Volts,Amps
1,1549410496.0,0.0,33.821735382,314.34289551,314.34347534,33.79731369,25.616605759,9.377289772,0.01335301809
2,1549410560.0,0.0,33.772891998,314.3505249,314.34347534,33.894992828,25.56776619,9.4505491257,0.01335301809
3,1549410624.0,0.0,33.870574951,314.34289551,314.34347534,33.772891998,25.372406006,9.3528690338,0.01335301809
4,1549410688.0,0.0,33.772891998,314.34289551,314.34347534,33.748474121,25.592185974,9.4261293411,0.01335301809
5,1549410752.0,0.0,33.772891998,314.34289551,314.34347534,33.79731369,25.641025543,9.4261293411,0.01335301809


### OK, read in each CSV as a pandas dataframe, convert to dask, concat them

In [142]:
pd.MultiIndex.from_tuples(list(zip(*[["test"]*len(units),colnames,units]))) # multiindex testing

MultiIndex([('test',                'Time',     's'),
            ('test',            'CAerrors',  'None'),
            ('test',    'S9A:Q3:CapTempAI', 'Deg C'),
            ('test',    'S9A:Q3:CurrentAI',  'Amps'),
            ('test',        'S9A:Q3:DacAI',  'Amps'),
            ('test',   'S9A:Q3:IGBTTempAI', 'Deg C'),
            ('test',    'S9A:Q3:MagTempAI', 'Deg C'),
            ('test', 'S9A:Q3:OutVoltageAI', 'Volts'),
            ('test', 'S9A:Q3:PeakAbsDevAI',  'Amps')],
           )

In [204]:
def pandasOpenCSV(filename,run=None,scenario=None):
    """
    Opens a CSV using the Dask framework and MultiIndexes the columns.
    """
    df = pd.read_csv(filename,header=[0,1])
    cols = df.columns.values
    names = []
    units = []
    for x in cols:
        if x[0] == 'CAerrors': # special case
            units.append('None')
        else:
            units.append(x[1])
        # I can't find a way around the repeating that follows
        nome = x[0].split(':')
        if len(nome) == 3:
            sector = nome[0]
            magnet = nome[1]
            names.append(nome[2])
        else:
            names.append(x[0])
    n_cols = len(cols)
    
    if not run: # user did not give a run name
        run = 'Run'
    if not scenario: # user did not give a scenario name
        scenario = 'Scenario'
    
    ci = zip(*[[run]*n_cols,
               [scenario]*n_cols,
               [sector]*n_cols,
               [magnet]*n_cols,
               colnames,
               units])
    mi = pd.MultiIndex.from_tuples(list(ci))
    
    df.columns = mi # create the full multiindex columns
    return df

def collateScenario(directory):
    """
    Collates all the CSV files from a single directory using dask.
    """
    pass
    

In [205]:
df = pandasOpenCSV(os.path.join(datapath,'S9A:Q3.csv'))

In [207]:
df.head()

Unnamed: 0_level_0,Run,Run,Run,Run,Run,Run,Run,Run,Run
Unnamed: 0_level_1,Scenario,Scenario,Scenario,Scenario,Scenario,Scenario,Scenario,Scenario,Scenario
Unnamed: 0_level_2,S9A,S9A,S9A,S9A,S9A,S9A,S9A,S9A,S9A
Unnamed: 0_level_3,Q3,Q3,Q3,Q3,Q3,Q3,Q3,Q3,Q3
Unnamed: 0_level_4,Time,CAerrors,S9A:Q3:CapTempAI,S9A:Q3:CurrentAI,S9A:Q3:DacAI,S9A:Q3:IGBTTempAI,S9A:Q3:MagTempAI,S9A:Q3:OutVoltageAI,S9A:Q3:PeakAbsDevAI
Unnamed: 0_level_5,s,None,Deg C,Amps,Amps,Deg C,Deg C,Volts,Amps
0,1549410000.0,0,33.821735,314.342896,314.343475,33.797314,25.616606,9.37729,0.013353
1,1549411000.0,0,33.772892,314.350525,314.343475,33.894993,25.567766,9.450549,0.013353
2,1549411000.0,0,33.870575,314.342896,314.343475,33.772892,25.372406,9.352869,0.013353
3,1549411000.0,0,33.772892,314.342896,314.343475,33.748474,25.592186,9.426129,0.013353
4,1549411000.0,0,33.772892,314.342896,314.343475,33.797314,25.641026,9.426129,0.013353


### Figure out which scenarios have pathological data

In [12]:
for fn in os.listdir(datapointer):
    path = os.path.join(datapointer,fn)
    if os.path.isdir(path):
        for gn in os.listdir(path):
            gath = os.path.join(path,gn)
            if os.path.isdir(gath):
                hath = os.path.join(gath,'S1A:H3.csv') # arbitrary file
                try:
                    print("{:s}/{:s}".format(fn,gn))
                    csv = pd.read_csv(hath,header=[0,1],dtype='float')
                except ValueError: # pathological data
                    print("pathological data found in: {:s}/{:s}".format(fn,gn))
                    
# print('special cases:')
# try:
#     special_case = "/sdf/home/f/foshea/bes_anomalies/temp_scenario/S1A:H3.csv"
#     csv = pd.read_csv(special_case,header=[0,1],dtype='float')
# except ValueError: # pathological data
#     print("pathological data found in: {:s}".format(special_case))

# try:
#     special_case = "/sdf/home/f/foshea/bes_anomalies/temp_scenario_2/S1A:H3.csv"
#     csv = pd.read_csv(special_case,header=[0,1],dtype='float')
# except ValueError: # pathological data
#     print("pathological data found in: {:s}".format(special_case))
                    

Run2014-2/2014-08-11-10:19:00
Run2014-2/2014-07-19-16:54:00
Run2014-2/referenceData
Run2014-2/2014-08-11-14:20:00
Run2014-2/2014-06-12-16:27:00
Run2014-3/2014-10-07-04:58:00
Run2014-3/referenceData
Run2015-2/referenceData
Run2015-2/2015-07-09-23:59:00
Run2015-3/2015-10-28-12:52:00
Run2015-3/referenceData
Run2016-1/2016-04-08-19:02:00
Run2016-1/2016-03-07-16:31:00
Run2016-1/referenceData
Run2019-1/2019-02-28-11:35:00
Run2019-1/2019-02-15-06:21:00
Run2019-1/2019-02-08-02:42:00
Run2019-1/referenceData
Run2019-1/2019-02-15-02:59:00
Run2018-3/referenceData
Run2018-3/2018-11-09-16:18:00
Run2018-3/2018-10-10-05:02:00
Run2016-3/2016-10-08-10:42:00
Run2016-3/2016-10-10-01:14:00
Run2016-3/2016-10-09-13:31:00
Run2016-3/2016-12-02-14:14:00
Run2016-3/2016-10-20-23:29:00
Run2016-3/referenceData
Run2016-3/2016-10-20-05:06:00
Run2016-2/2016-06-01-20:26:00
Run2016-2/2016-08-20-10:21:00
Run2016-2/2016-06-15-08:46:00
Run2016-2/referenceData
Run2016-2/2016-08-19-08:26:00
Run2018-2/2018-06-28-19:01:00
Run2

### Create an HDF5 file with all the data in it

Run the first cell here to do the conversion - it took about 70 minutes on a single server with 4 GB of memory.  
The input data was 40 GB.

The other cells are for diagnosing problems.

In [4]:
%%timeit -n1 -r1
aps = APSDataCollector(datapointer,'/sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13_ver2.h5')
aps.createGroupStructure()

1h 21min 1s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [45]:
t = h5py.File('/sdf/home/f/foshea/bes_anomalies/PSTrips-2020-01-13.h5','r')
for k,v in t['run2019-3/2019-10-13-21:47:00'.lower()].attrs.items():
    print(k,' :: ',v)
t.close()

badMagnet  ::  V3
badSector  ::  S13A
dirName  ::  Run2019-3/2019-10-13-21:47:00
endDate  ::  2019/10/13
endTime  ::  21:47:00
path  ::  /sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13/Run2019-3/2019-10-13-21:47:00
run  ::  Run2019-3
startDate  ::  2019/10/11
startTime  ::  08:01:00


In [50]:
%%timeit -n1 -r1

with h5py.File('test.h5','w') as t:
    filepointer = '/sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13/Run2019-1/2019-02-08-02:42:00'
    aps.addMagnetData(filepointer,t)

45.5 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [4]:
with h5py.File('test_2.h5','r') as t:
    pass
t = h5py.File('test_2.h5','r')
sectors = len(t.keys())
print(sectors)
t.close()

80


In [14]:
# t = h5py.File('test.h5')


def crawlHDF(hdf):
    
    def countAll(group,sc,mc,fc):
        s = 0
        m = 0
        f = 0
        try:
            keys = group.keys()
        except AttributeError: # its a dataset
            keys = []
            
        if len(keys) == 0: # count the feature
            f += 1
        else: # group
            for key in keys: # count sub elements
                a,b,c = countAll(group[key],sc,mc,fc)
                s += a
                m += b
                f += c
        # count the current element
        # this counts base as a sector, but no matter, you get 81
        split_name = group.name.split('/')
        if len(split_name) == 2: # sectors are "two deep"
            s += 1
        elif len(split_name) == 3: # magnets are "three deep"
            m += 1
        return sc+s,mc+m,fc+f
        
    return countAll(hdf,0,0,0)
        
    
def visitor_func(name, node):
    if isinstance(node, h5py.Dataset):
         # node is a dataset
        pass
    else:
         # node is a group
        print(name)
# t.visititems(visitor_func)
# print(crawlHDF(t))
# t.close()


### The files in directory Run2016-3/2016-12-02-14:14:00 are broken, fix them

The files in Run2016-3/2016-12-02-14:14:00 have an extra header and units row thrown in the file on lines 7652 and 7653.  The below functions fix this

A similar problem was found in Run2019-3/referenceData on lines 3602 and 3603

In [10]:
%%timeit -n1 -r1

#baddir = '/sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13/Run2016-3/2016-12-02-14:14:00'
baddir = '/sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13/Run2019-3/referenceData'

for filename in os.listdir(baddir):
    full_path = os.path.join(baddir,filename)
    if filename.split('.')[-1] == 'csv':
        csv = pd.read_csv(full_path,header=[0,1])
        # trade out for the various files, notice the off-by-three difference due to the header and zero index
        #csv.drop(labels=[7649,7650],axis=0,inplace=True) # drop the extra text, Run2016-3
        csv.drop(labels=[3599,3600],axis=0,inplace=True) # drop the extra text, Run2019-3
        csv.rename(columns={'Unnamed: 1_level_1':''},inplace=True) # make the units for CAerrors blank
        try:
            csv.to_csv(os.path.join('/sdf/home/f/foshea/bes_anomalies/temp_scenario_2',filename),index=False,float_format='%.15e')
        except:
            print('{:s} failed to process, what else is wrong with it?'.format(filename))

1min 54s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [17]:
# make sure they are all the same length
checkdir = '/sdf/group/ml/datasets/bes_anomaly_data/PSTrips-2020-01-13/Run2016-3/2016-12-02-14:14:00'
checkdir = '/sdf/home/f/foshea/bes_anomalies/temp_scenario'
all_good = True
for filename in os.listdir(checkdir):
    full_path = os.path.join(checkdir,filename)
    if filename.split('.')[-1] == 'csv':
        csv = pd.read_csv(full_path,header=[0,1])
        # trade out for the various files, notice the off-by-two difference due to the header
        #if len(csv) != 9765: # Run2016-3
        if len(csv) != 8145: # Run2019-3
            all_good *= False
print(all_good)

True


In [23]:
with open(os.path.join(baddir,'S26B:Q3.csv'),'r') as csvfile:
    csv = pd.read_csv(full_path,header=[0,1])
csv.head()
list(csv.columns.get_level_values(0)[1:])

['Time',
 'CAerrors',
 'S15B:H4:CapTempAI',
 'S15B:H4:CurrentAI',
 'S15B:H4:MagTempAI',
 'S15B:H4:PeakAbsDevAI']

In [23]:
csv.columns.set_levels?

A giant markdown to keep Jupyter from pushing the information off the screen all the time.






































































