In [1]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

def summarize_data(df, filepath):
    num_rows = len(df)
    num_col = len(df.columns)
    file_size = os.path.getsize(filepath)
    print(f'Number of rows: {num_rows}')
    print(f'Number of columns: {num_col}')
    print(f'File size: {file_size}')

Overwriting utility.py


Writing YAML File

I needed to use a smaller dataset than recommended, as my current computer could not read in a single file without crashing, even with the use of Google Colab.

In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: custom_data
file_name: /content/drive/MyDrive/custom_data.csv
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - modelyear
    - make
    - model
    - vehicleclass
    - enginesize
    - cylinders
    - transmission
    - fueltype
    - fuelconsumption_city
    - cfuelconsumption_highway
    - fuelconsumption_comb
    - fuelconsumption_comb_mpg
    - co2emissions

Overwriting file.yaml


In [3]:
# Read config file
import utility as util
config_data = util.read_config_file("file.yaml")

In [4]:
config_data['inbound_delimiter']

','

In [5]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'custom_data',
 'file_name': '/content/drive/MyDrive/custom_data.csv',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['modelyear',
  'make',
  'model',
  'vehicleclass',
  'enginesize',
  'cylinders',
  'transmission',
  'fueltype',
  'fuelconsumption_city',
  'cfuelconsumption_highway',
  'fuelconsumption_comb',
  'fuelconsumption_comb_mpg',
  'co2emissions']}

In [6]:
#normal pandas process of the file
import pandas as pd
df_sample = pd.read_csv("/content/drive/MyDrive/custom_data.csv",delimiter=',')
df_sample.head()

Unnamed: 0,modelyear,make,model,vehicleclass,enginesize,cylinders,transmission,fueltype,fuelconsumption_city,cfuelconsumption_highway,fuelconsumption_comb,fuelconsumption_comb_mpg,co2emissions
0,2014,ACURA,ILX,COMPACT,2.0,4,AS5,Z,9.9,6.7,8.5,33,196
1,2014,ACURA,ILX,COMPACT,2.4,4,M6,Z,11.2,7.7,9.6,29,221
2,2014,ACURA,ILX HYBRID,COMPACT,1.5,4,AV7,Z,6.0,5.8,5.9,48,136
3,2014,ACURA,MDX 4WD,SUV - SMALL,3.5,6,AS6,Z,12.7,9.1,11.1,25,255
4,2014,ACURA,RDX AWD,SUV - SMALL,3.5,6,AS6,Z,12.1,8.7,10.6,27,244


Testing different versions of reading in csv file

In [7]:
%%time
import pandas as pd
# read the file using config file
file_type = config_data['file_type']
source_file = config_data['file_name']
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()

CPU times: user 10.7 ms, sys: 3.48 ms, total: 14.2 ms
Wall time: 23.8 ms


  exec(code, glob, local_ns)


Unnamed: 0,modelyear,make,model,vehicleclass,enginesize,cylinders,transmission,fueltype,fuelconsumption_city,cfuelconsumption_highway,fuelconsumption_comb,fuelconsumption_comb_mpg,co2emissions
0,2014,ACURA,ILX,COMPACT,2.0,4,AS5,Z,9.9,6.7,8.5,33,196
1,2014,ACURA,ILX,COMPACT,2.4,4,M6,Z,11.2,7.7,9.6,29,221
2,2014,ACURA,ILX HYBRID,COMPACT,1.5,4,AV7,Z,6.0,5.8,5.9,48,136
3,2014,ACURA,MDX 4WD,SUV - SMALL,3.5,6,AS6,Z,12.7,9.1,11.1,25,255
4,2014,ACURA,RDX AWD,SUV - SMALL,3.5,6,AS6,Z,12.1,8.7,10.6,27,244


In [8]:
%%time
#dask read of config
import dask.dataframe as dd
test_dd = dd.read_csv(source_file)

CPU times: user 446 ms, sys: 50.4 ms, total: 496 ms
Wall time: 568 ms


In [9]:
%%time
import modin.pandas as mpd
test_mod = mpd.read_csv(source_file,config_data['inbound_delimiter'])


    import ray
    ray.init()

2022-10-13 05:14:04,633	INFO worker.py:1518 -- Started a local Ray instance.
  File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py", line 16, in <module>
    app.launch_new_instance()
  File "/usr/local/lib/python3.7/dist-packages/traitlets/config/application.py", line 846, in launch_instance
    app.start()
  File "/usr/local/lib/python3.7/dist-packages/ipykernel/kernelapp.py", line 612, in start
    self.io_loop.start()
  File "/usr/local/lib/python3.7/dist-packages/tornado/platform/asyncio.py", line 132, in start
    self.asyncio_loop.run_forever()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 541, in run_forever
    self._run_once()
  File "/usr/lib/python3.7/asyncio/base_events.py", line 1786, in _run_once
    handle._run()
  File "/usr/lib/p

CPU times: user 994 ms, sys: 199 ms, total: 1.19 s
Wall time: 9.89 s


In [10]:
%%time
import ray
test_ray = ray.data.read_csv(source_file)

CPU times: user 64.1 ms, sys: 13.9 ms, total: 78 ms
Wall time: 1.39 s


It appears that for this dataset, the fastest computational time was that of regular pandas.

In [11]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation passed


1

In [12]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['modelyear', 'make', 'model', 'vehicleclass', 'enginesize', 'cylinders',
       'transmission', 'fueltype', 'fuelconsumption_city',
       'cfuelconsumption_highway', 'fuelconsumption_comb',
       'fuelconsumption_comb_mpg', 'co2emissions'],
      dtype='object')
columns of YAML are: ['modelyear', 'make', 'model', 'vehicleclass', 'enginesize', 'cylinders', 'transmission', 'fueltype', 'fuelconsumption_city', 'cfuelconsumption_highway', 'fuelconsumption_comb', 'fuelconsumption_comb_mpg', 'co2emissions']


In [13]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [14]:
df = pd.read_csv(r'/content/drive/MyDrive/custom_data.csv') #read inputfile in a dataframe
df.to_csv(r'/content/drive/MyDrive/custom_data.gz', sep = '|', index=False) #write dataframe df to the outputfile with pipe delimited

In [15]:
import os
def summarize_data(df, filepath):
    num_rows = len(df)
    num_col = len(df.columns)
    file_size = os.path.getsize(filepath)
    print(f'Number of rows: {num_rows}')
    print(f'Number of columns: {num_col}')
    print(f'File size: {file_size}')

In [16]:
summarize_data(df, '/content/drive/MyDrive/custom_data.csv')

Number of rows: 1067
Number of columns: 13
File size: 73703
