# **Utility File**

In [None]:

%%writefile testutility.py

import logging  
import os
import subprocess
import yaml
import pandas as pd
import gc
import datetime
import re
from os.path import getsize
import gzip

################
# FILE READING #
################

def read_config_file(filepath):
  with open(filepath , 'r') as stream:
    try:
      return yaml.safe_load(stream)
    except yaml.YAMLError as exc:
      logging.error(exc)


def replacer(string , char):
  pattern = char + '{2,}'
  string = re.sub(pattern , char , string)
  return string

# This function is for removing white spaces , special characters,
# leading and trailing underscores , and replacing double characters 
# with one character

#############################################
# VALIDATION OF DATA COLUMNS WITH YAML FILE #
#############################################

def col_header_val(df , table_config):
  df.columns = df.columns.str.lower()
  df.columns = df.columns.str.replace("[^\w]" , '_' ,regex = True)
  df.columns = list(map(lambda x: x.strip('_') , list(df.columns)))
  df.columns = list(map(lambda x: replacer(x , '_') , list(df.columns)))
  expected_col = list(map(lambda x: x.lower() , table_config['columns']))
  expected_col.sort()
  df.columns = list(map(lambda x: x.lower() , list(df.columns)))
  df = df.reindex(sorted(df.columns) , axis = 1)

  if len(df.columns)== len(expected_col) and list(expected_col)== list(df.columns):
    print("column name and column length validation passed")
    return 1
  else:
    print("column name and column length validation failed")
    mismatched_columns_file = list(set(df.columns).difference(expected_col))
    print("following file columns are not in the YAML file" , mismatched_columns_file)
    missing_YAML_file = list(set(expected_col).difference(df.columns))
    print("following YAML columns are not in the file uploaded" , missing_YAML_file)
    logging.info(f"df columns:{df.columns}")
    logging.info(f"expected columns:{expected_col}")
    return 0 

def file_summary(filepath , df):
  print("size of file is {0} GB".format(round(getsize(filepath)/(1024*1024*1024) , 2)))
  print("the csv file has {0} rows".format(df.shape[0]))
  print("the csv file has {0} columns".format(len(df.columns)))

def write_file(df , root_folder , outfile , outbound_delimiter):
  outfile_name = root_folder + outfile + '.txt'
  df.to_csv(outfile_name, header=None, index=None, sep=outbound_delimiter, mode='a')
  f_in = open(outfile_name , 'rb')
  f_out = gzip.open(f'{outfile}.txt.gz', 'wb')
  f_out.writelines(f_in)
  f_out.close()
  f_in.close()


Writing testutility.py


# **YAML Configuration File**

In [None]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
root_folder: "/content/drive/MyDrive/"
columns: 
    - antiNucleus
    - event_File
    - event_Number
    - event_Time
    - hist_File
    - multiplicity
    - NaboveLb
    - NbelowLb
    - NLb
    - primary_Tracks
    - prod_Time
    - Pt 
    - run_Number
    - vertexx
    - vertexy
    - vertexz


Writing file.yaml


In [1]:
# I saved the large file on my Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# **Dataset Source**

In [None]:
#https://sdm.lbl.gov/fastbit/data/samples.html

# **File Columns**

In [None]:

#the uploaded dataset does not have headers, so i had to add them manually
column_names = ['__antiNucleus' , 'event__File' , 'event__Number' , 'event__Time' , '__hist__File' , 'multiplicity' , 
                'NaboveLb' , 'NbelowLb__' , 'NLb' , 'primary__Tracks' , 'prod__Time' , '__Pt' ,
                'run__Number' , 'vertexX$$' , 'vertexY' , 'vertexZ']


In [None]:
import testutility as util
import time
import pandas as pd
config_data = util.read_config_file("file.yaml")

In [None]:
config_data

{'columns': ['antiNucleus',
  'event_File',
  'event_Number',
  'event_Time',
  'hist_File',
  'multiplicity',
  'NaboveLb',
  'NbelowLb',
  'NLb',
  'primary_Tracks',
  'prod_Time',
  'Pt',
  'run_Number',
  'vertexx',
  'vertexy',
  'vertexz'],
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'root_folder': '/content/drive/MyDrive/',
 'skip_leading_rows': 1,
 'table_name': 'edsurv'}

# **File Reading Using Parameters**

In [None]:
file_type = config_data['file_type']
source_file = config_data['root_folder'] + config_data['file_name'] + f".{file_type}"

# **File Reading Methods**

# **Dask Method**

In [None]:
pip install fsspec

Collecting fsspec
  Downloading fsspec-2021.9.0-py3-none-any.whl (123 kB)
[?25l[K     |██▋                             | 10 kB 21.9 MB/s eta 0:00:01[K     |█████▎                          | 20 kB 25.6 MB/s eta 0:00:01[K     |████████                        | 30 kB 29.0 MB/s eta 0:00:01[K     |██████████▋                     | 40 kB 21.1 MB/s eta 0:00:01[K     |█████████████▎                  | 51 kB 11.5 MB/s eta 0:00:01[K     |████████████████                | 61 kB 11.1 MB/s eta 0:00:01[K     |██████████████████▌             | 71 kB 7.8 MB/s eta 0:00:01[K     |█████████████████████▏          | 81 kB 8.7 MB/s eta 0:00:01[K     |███████████████████████▉        | 92 kB 8.1 MB/s eta 0:00:01[K     |██████████████████████████▌     | 102 kB 8.7 MB/s eta 0:00:01[K     |█████████████████████████████▏  | 112 kB 8.7 MB/s eta 0:00:01[K     |███████████████████████████████▉| 122 kB 8.7 MB/s eta 0:00:01[K     |████████████████████████████████| 123 kB 8.7 MB/s 
[?25hInst

In [None]:
from dask import dataframe as dd


start = time.time()
dask_df = dd.read_csv(source_file , delimiter = config_data['inbound_delimiter']
                      , names = column_names)
end = time.time()
print("Read csv with dask took: ",(end-start),"sec")

Read csv with dask took:  0.6866400241851807 sec


In [None]:
dask_df.head(5)

Unnamed: 0,__antiNucleus,event__File,eventNumber,event__Time,__histFile,multiplicity,NaboveLb,NbelowLb__,NLb,primaryTracks,prodTime,Pt,runNumber,vertexX$$,vertexY,vertxZ
0,1,1613423,807,20011020.0,1613424,4518,0,0,654,1395,20011200.0,10.955403,2288071,-0.288203,0.407312,10.559091
1,1,1613423,808,20011020.0,1613424,886,0,0,61,371,20011200.0,23.326479,2288071,-0.24733,0.455916,57.810596
2,1,1613423,809,20011020.0,1613424,638,0,0,7,121,20011200.0,2.444299,2288071,-0.390961,0.589534,167.75714
3,4,1613423,810,20011020.0,1613424,4259,0,0,1024,1302,20011200.0,9.521868,2288071,-0.290154,0.446027,8.644362
4,5,1613423,811,20011020.0,1613424,3673,1,0,592,1246,20011200.0,13.560424,2288071,-0.257418,0.419689,29.02236


# **Pandas Method**

In [None]:
start = time.time()
pandas_df = pd.read_csv(source_file , delimiter = config_data['inbound_delimiter']
                      , names = column_names)
end = time.time()
print("Read csv with pandas took: ",(end-start),"sec")

Read csv with pandas took:  37.546066999435425 sec


# **Using Pandas Chunksize**

In [None]:
start = time.time()
chunk = pd.read_csv(source_file , delimiter = config_data['inbound_delimiter']
                      , names = column_names , chunksize = 100000)
end = time.time()
print("Read csv with pandas chunk took: ",(end-start),"sec")
pd_chunk_df = pd.concat(chunk)

Read csv with pandas chunk took:  0.13499212265014648 sec


# **Validation Process**

In [None]:
util.col_header_val(pandas_df , config_data)

column name and column length validation passed


1

In [None]:
print("columns of our data" , list(pandas_df.columns))
print("columns of the YAML file" , config_data['columns'])

columns of our data ['antinucleus', 'event_file', 'event_number', 'event_time', 'hist_file', 'multiplicity', 'nabovelb', 'nbelowlb', 'nlb', 'primary_tracks', 'prod_time', 'pt', 'run_number', 'vertexx', 'vertexy', 'vertexz']
columns of the YAML file ['antiNucleus', 'event_File', 'event_Number', 'event_Time', 'hist_File', 'multiplicity', 'NaboveLb', 'NbelowLb', 'NLb', 'primary_Tracks', 'prod_Time', 'Pt', 'run_Number', 'vertexx', 'vertexy', 'vertexz']


In [None]:
if util.col_header_val(pandas_df , config_data) == 0:
  print("Validation failed")
else :
  print("Validation passed")
  util.write_file(pandas_df , config_data['root_folder'] , 'out' , config_data['outbound_delimiter'])
  print("file writing is successful")


column name and column length validation passed
Validation passed
file writing is successful


# **File Information Summary**

In [None]:
util.file_summary(source_file , pandas_df)

size of file is 1.99 GB
the csv file has 15857625 rows
the csv file has 16 columns
