In this workbook we will be performing data ingestion and validation for week 6 of Data Glacier's virtual internship.

Summary of the original file before modifications:
~3285000 rows, 29 columns, 11.2GB. To keep performance manageable, only a fifth of this file (the first 657267 rows) was read in, using several different methods, and converted to dataframes.

Summary of dataframe after modifications (df_main): 447301 rows, 23 columns, 65.4MB in csv format.

Summary of the small test file: 1000 rows, 23 columns, 102KB.

We tested several ways of reading a fifth of the original 11.2GB file: pandas, dask, modin with dask, and modin with ray.

In [1]:
import os
path = 'D:/pydatafiles/airline-data'
os.chdir(path)

In [2]:
import time 
import pandas as pd

In [3]:
start_time = time.process_time()  # CPU time of this Python process, not wall-clock time
df_pd = pd.read_csv('airline.csv.shuffle', encoding='latin-1', nrows=657267)
print(time.process_time() - start_time, "seconds")

3.0625 seconds


In [4]:
# The size of the file
import os
print(os.path.getsize('airline.csv.shuffle')/1024/1024/1024 , "GB")

11.203072734177113 GB


In [5]:
df_pd.head()

,ActualElapsedTime,AirTime,ArrDelay,ArrTime,CRSArrTime,CRSDepTime,CRSElapsedTime,CancellationCode,Cancelled,CarrierDelay,...,Month,NASDelay,Origin,SecurityDelay,TailNum,TaxiIn,TaxiOut,UniqueCarrier,WeatherDelay,Year
0,53.0,32.0,-8.0,1642.0,1650,1545,65.0,,0,,...,10,,DCA,,N443US,7.0,14.0,US,,2002
1,164.0,155.0,-11.0,1754.0,1805,1610,175.0,,0,,...,12,,MCO,,N755,2.0,7.0,WN,,1999
2,60.0,,15.0,2005.0,1950,1850,60.0,,0,,...,12,,ATL,,,,,DL,,1993
3,51.0,,-5.0,1818.0,1823,1728,55.0,,0,,...,9,,MEM,,,,,AA,,1989
4,45.0,29.0,2.0,1120.0,1118,1030,48.0,,0,0.0,...,6,0.0,CVG,0.0,N785CA,3.0,13.0,OH,0.0,2006


In [7]:
#!pip install dask

In [8]:
import dask.dataframe as dd

In [9]:
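start_time = time.process_time()
# Note: head() only takes rows from the first partition by default (npartitions=1),
# so this may return fewer than 657267 rows and time a smaller read than the other cells
df_dask = dd.read_csv('airline.csv.shuffle', encoding='latin-1', assume_missing=True).head(n=657267)
#df_dask = df_dask.repartition()
#df_dask_computed=df_dask.compute()
print(time.process_time() - start_time, "seconds")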

3.671875 seconds


In [10]:
#conda install -c conda-forge modin

In [11]:
#conda update pandas

In [12]:
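import os
os.environ["MODIN_ENGINE"] = "dask"  # must be set before Modin runs its first operation
import modin.pandas as pd  # from here on, pd refers to Modin, not plain pandas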

In [13]:
#!pip install modin

In [14]:
from dask.distributed import Client
client = Client()
start_time = time.process_time()
df_modin_dask=pd.read_csv('airline.csv.shuffle',encoding='latin-1', nrows=657267)
print(time.process_time() - start_time, "seconds")

1.5625 seconds


Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.


In [15]:
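# Caution: Modin reads MODIN_ENGINE when its engine is first initialised, so changing it
# after the Dask run above may not switch engines; modin.config.Engine.put("ray") is the
# documented way to select Ray at runtime
os.environ["MODIN_ENGINE"] = "ray"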
start_time = time.process_time()
df_modin_ray=pd.read_csv('airline.csv.shuffle',encoding='latin-1', nrows=657267)
print(time.process_time() - start_time, "seconds")

1.125 seconds


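From the timings above, modin with ray appeared to be the fastest way to read the file. These numbers are rough: time.process_time() measures only the CPU time of the notebook's own process, so work done in separate Dask or Ray worker processes is not counted, and later reads may also benefit from the file already being in the operating system's cache. In addition, the "ray" cell may still have run on the Dask engine, because MODIN_ENGINE was changed after Modin had already been initialised.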
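The timings in this section use `time.process_time()`, which counts only the CPU time of the notebook's own process. A minimal sketch of a helper that records wall-clock time next to CPU time, using only the standard library; `timed` is a hypothetical helper name, not part of pandas, Dask or Modin:

```python
import time


def timed(fn, *args, **kwargs):
    """Call fn(*args, **kwargs) once; return (result, wall_seconds, cpu_seconds).

    wall_seconds uses time.perf_counter() (elapsed real time). cpu_seconds uses
    time.process_time(), i.e. CPU time of this process only: work done in other
    processes, such as Dask or Ray workers, is not included.
    """
    wall_start = time.perf_counter()
    cpu_start = time.process_time()
    result = fn(*args, **kwargs)
    cpu_seconds = time.process_time() - cpu_start
    wall_seconds = time.perf_counter() - wall_start
    return result, wall_seconds, cpu_seconds


# Sleeping uses almost no CPU, so the two clocks diverge clearly here.
_, wall, cpu = timed(time.sleep, 0.2)
print(f"wall: {wall:.2f} s, cpu: {cpu:.2f} s")
```

In this notebook it could be called as `df_pd, wall, cpu = timed(pd.read_csv, 'airline.csv.shuffle', encoding='latin-1', nrows=657267)`, and the wall-clock figures compared across libraries.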

Let's check how many missing values there are for each column:

In [16]:
df_modin_ray.isnull().sum(axis = 0)

ActualElapsedTime     14043
AirTime              209660
ArrDelay              14043
ArrTime               14025
CRSArrTime                0
CRSDepTime                0
CRSElapsedTime          140
CancellationCode     653375
Cancelled                 0
CarrierDelay         475876
DayOfWeek                 0
DayofMonth                0
DepDelay              12513
DepTime               12513
Dest                      0
Distance               1110
Diverted                  0
FlightNum                 0
LateAircraftDelay    475876
Month                     0
NASDelay             475876
Origin                    0
SecurityDelay        475876
TailNum              199470
TaxiIn               199540
TaxiOut              199471
UniqueCarrier             0
WeatherDelay         475876
Year                      0
dtype: int64

Since modin with ray was the fastest in our timings, df_modin_ray is the dataframe we use from here on.

We manually performed some basic data cleaning to make it more manageable:

In [17]:
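# df_main is another name for the same object (not a copy), so the column deletions below also affect df_modin_ray
df_main = df_modin_ray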

In [18]:
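# Note: this saves df_main before the cleaning steps below; to save the cleaned data,
# call to_csv again after those steps (index=False avoids writing an extra index column)
df_main.to_csv('df_main.csv')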

To request implementation, send an email to feature_requests@modin.org.
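Because `to_csv` writes the row index by default, reading a saved file back with `read_csv` produces an extra `Unnamed: 0` column, which `col_header_val` (defined later) would report as an unexpected `unnamed_0` column. A minimal sketch of the difference on a small made-up frame, using plain pandas:

```python
import io

import pandas as pd

df = pd.DataFrame({"Origin": ["DCA", "MCO"], "Dest": ["BOS", "ATL"]})

# Default: the (unnamed) row index is written as an extra first column
buf = io.StringIO()
df.to_csv(buf)
buf.seek(0)
cols_default = list(pd.read_csv(buf).columns)
print(cols_default)      # ['Unnamed: 0', 'Origin', 'Dest']

# index=False: only the data columns are written
buf = io.StringIO()
df.to_csv(buf, index=False)
buf.seek(0)
cols_no_index = list(pd.read_csv(buf).columns)
print(cols_no_index)     # ['Origin', 'Dest']
```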


In [19]:
# Remove the columns with a very high share of missing values (over 50%)
del df_main['CancellationCode']
del df_main['CarrierDelay']
del df_main['LateAircraftDelay']
del df_main['NASDelay']
del df_main['SecurityDelay']
del df_main['WeatherDelay']
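The six columns above were picked by reading the missing-value counts. The same selection can be made programmatically with `df.isnull().mean()`, which gives the fraction of missing values per column. On the counts above, a 0.5 threshold would select exactly these six: CancellationCode is about 99.4% missing and the five delay columns about 72.4%, while the next highest, AirTime, is about 31.9%. A minimal sketch on a small made-up frame; `high_missing_columns` is a hypothetical helper name:

```python
import numpy as np
import pandas as pd


def high_missing_columns(df, threshold=0.5):
    """Return the names of columns whose fraction of missing values exceeds threshold."""
    missing_fraction = df.isnull().mean()   # per-column share of NaN/None values
    return list(missing_fraction[missing_fraction > threshold].index)


toy = pd.DataFrame({
    "ArrDelay": [1.0, -3.0, np.nan, 4.0],           # 25% missing
    "CarrierDelay": [np.nan, np.nan, np.nan, 0.0],  # 75% missing
    "CancellationCode": [np.nan, np.nan, "B", np.nan],
})
drop_cols = high_missing_columns(toy)
print(drop_cols)              # ['CarrierDelay', 'CancellationCode']
toy = toy.drop(columns=drop_cols)
print(list(toy.columns))      # ['ArrDelay']
```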

In [20]:
df_main.columns = df_main.columns.str.replace(' ', '')

In [21]:
# Drop every row that still contains a missing value
df_main = df_main.dropna()
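With no arguments, `dropna()` removes every row that has at least one missing value. According to the summary at the top, this takes df_main from 657267 to 447301 rows, removing 209966 rows (about 31.9%), which is close to the number of rows missing AirTime (209660). A minimal sketch that reports the effect of this step; `drop_incomplete_rows` is a hypothetical helper:

```python
import numpy as np
import pandas as pd


def drop_incomplete_rows(df):
    """Drop rows with any missing value and print how many rows were removed."""
    before = len(df)
    cleaned = df.dropna()          # default how="any": a single NaN drops the row
    removed = before - len(cleaned)
    share = removed / before if before else 0.0
    print(f"kept {len(cleaned)} of {before} rows, removed {removed} ({share:.1%})")
    return cleaned


toy = pd.DataFrame({"AirTime": [32.0, np.nan, 29.0, np.nan],
                    "TailNum": ["N443US", None, "N785CA", "N755"]})
toy = drop_incomplete_rows(toy)   # kept 2 of 4 rows, removed 2 (50.0%)
```

If only certain columns need to be complete, `dropna(subset=[...])` restricts the check to those columns and keeps more rows.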

In [22]:
df_main.columns=df_main.columns.str.lower()

The code in the following block is adapted from https://github.com/DataGlacier/DSVIICODE/tree/main. It checks that ingested data follows the conventions of the YAML file (created in the next section) and standardizes the column names before validating them.

In [23]:
%%writefile testutility.py
import logging
import re

import yaml


def read_config_file(filepath):
    '''Load a YAML config file; log the error and return None if it cannot be parsed.'''
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None


def replacer(string, char):
    '''Collapse runs of two or more `char` into a single `char`.'''
    pattern = re.escape(char) + '{2,}'
    return re.sub(pattern, char, string)


def col_header_val(df, table_config):
    '''
    Standardize the column names (lowercase, non-word characters replaced by
    underscores, repeated/leading/trailing underscores removed) and compare
    them with the columns listed in the YAML config, ignoring order.
    Returns 1 on success, 0 on failure. Note: renames df's columns in place.
    '''
    df.columns = df.columns.str.lower()
#    df.dropna(axis='rows') #drop rows with missing values
#    df.columns = df.columns.str.replace(' ', '') #remove spaces
    df.columns = df.columns.str.replace(r'[^\w]', '_', regex=True)
    df.columns = [col.strip('_') for col in df.columns]
    df.columns = [replacer(col, '_') for col in df.columns]
    expected_col = sorted(col.lower() for col in table_config['columns'])
    df = df.reindex(sorted(df.columns), axis=1)
    # Equal sorted lists implies the same names and the same number of columns
    if list(df.columns) == expected_col:
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file", mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded", missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py
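To see what `col_header_val` does to a column name before comparing it, the renaming steps can be reproduced on single strings. A minimal sketch; `normalize_column` is a hypothetical helper that mirrors the steps in the function above and is not part of the original module:

```python
import re


def normalize_column(name):
    """Apply the same renaming steps as col_header_val to one column name."""
    name = name.lower()
    name = re.sub(r"[^\w]", "_", name)   # every non-word character becomes "_"
    name = name.strip("_")               # drop leading/trailing underscores
    return re.sub(r"_{2,}", "_", name)   # collapse runs of "_" into one


for raw in ["ActualElapsedTime", " Arr Delay (min) ", "Unnamed: 0"]:
    print(repr(raw), "->", normalize_column(raw))
# 'ActualElapsedTime' -> actualelapsedtime
# ' Arr Delay (min) ' -> arr_delay_min
# 'Unnamed: 0' -> unnamed_0
```

The last case shows why a CSV saved with its index would fail validation: the extra column becomes `unnamed_0`, which is not in the YAML list.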


Let's print the column names one per line; this output is used to build the YAML file:

In [24]:
for col in df_main.columns:
    print(" - "+col.lower())

 - actualelapsedtime
 - airtime
 - arrdelay
 - arrtime
 - crsarrtime
 - crsdeptime
 - crselapsedtime
 - cancelled
 - dayofweek
 - dayofmonth
 - depdelay
 - deptime
 - dest
 - distance
 - diverted
 - flightnum
 - month
 - origin
 - tailnum
 - taxiin
 - taxiout
 - uniquecarrier
 - year
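Instead of copying the printed list into the YAML file by hand, the `columns:` section can be generated as text. A minimal sketch with a hypothetical helper `yaml_columns_block`, using the same four-space indentation as the YAML file in the next cell; PyYAML's `yaml.safe_dump` would be an alternative, though its default list indentation differs:

```python
def yaml_columns_block(columns, indent=4):
    """Return the 'columns:' section of the YAML config as text, one lowercase name per line."""
    pad = " " * indent
    lines = ["columns:"] + [f"{pad}- {str(col).lower()}" for col in columns]
    return "\n".join(lines)


block = yaml_columns_block(["ActualElapsedTime", "AirTime", "Year"])
print(block)
# columns:
#     - actualelapsedtime
#     - airtime
#     - year
```

In the notebook this would be `print(yaml_columns_block(df_main.columns))`.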


Let's create the YAML file:

In [25]:
%%writefile file.yaml
file_type: csv
dataset_name: testfile
file_name: test_data
table_name: edsurv
inbound_delimiter: ','
outbound_delimiter: '|'
skip_leading_rows: 0
columns:
    - actualelapsedtime
    - airtime
    - arrdelay
    - arrtime
    - crsarrtime
    - crsdeptime
    - crselapsedtime
    - cancelled
    - dayofweek
    - dayofmonth
    - depdelay
    - deptime
    - dest
    - distance
    - diverted
    - flightnum
    - month
    - origin
    - tailnum
    - taxiin
    - taxiout
    - uniquecarrier
    - year

Overwriting file.yaml


Let's read the config file:

In [26]:
import testutility as util
config_data = util.read_config_file("file.yaml")

Let's check that the config file was loaded correctly:

In [27]:
config_data

{'file_type': 'csv',
 'dataset_name': 'testfile',
 'file_name': 'test_data',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 0,
 'columns': ['actualelapsedtime',
  'airtime',
  'arrdelay',
  'arrtime',
  'crsarrtime',
  'crsdeptime',
  'crselapsedtime',
  'cancelled',
  'dayofweek',
  'dayofmonth',
  'depdelay',
  'deptime',
  'dest',
  'distance',
  'diverted',
  'flightnum',
  'month',
  'origin',
  'tailnum',
  'taxiin',
  'taxiout',
  'uniquecarrier',
  'year']}
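`read_config_file` logs a YAML parse error and returns `None`, so a broken config would only surface later as a `TypeError` when a key is looked up. A minimal sketch of a sanity check run right after loading; `check_config` and its list of required keys are hypothetical, chosen to match the keys this notebook reads:

```python
# Hypothetical minimum set of keys, based on the keys this notebook reads
REQUIRED_KEYS = ("file_type", "file_name", "inbound_delimiter", "columns")


def check_config(config):
    """Return a list of problems found in a loaded config; an empty list means it looks usable."""
    if not isinstance(config, dict):
        # read_config_file returns None when the YAML cannot be parsed
        return ["config is not a mapping"]
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]
    columns = config.get("columns") or []
    if len(columns) != len(set(columns)):
        problems.append("duplicate column names")
    if config.get("inbound_delimiter") == "":
        problems.append("empty inbound_delimiter")
    return problems


sample = {"file_type": "csv", "file_name": "test_data",
          "inbound_delimiter": ",", "columns": ["origin", "dest", "origin"]}
print(check_config(sample))   # ['duplicate column names']
print(check_config(None))     # ['config is not a mapping']
```

For the config_data shown above, `check_config(config_data)` should return an empty list.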

Let's read the small test file with modin and ray:

In [28]:
os.environ["MODIN_ENGINE"] = "ray"
#client = Client()
df_sample=pd.read_csv('test_data.csv', delimiter=',', encoding='latin-1')


Now let's read the test file using the settings from the config file:

In [29]:
file_type = config_data['file_type']
source_file = f"./{config_data['file_name']}.{file_type}"
# Pass the delimiter by keyword: pandas 2.0 made read_csv arguments after the path keyword-only
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])
df.reset_index(drop=True, inplace=True)
df.head()

,actualelapsedtime,airtime,arrdelay,arrtime,crsarrtime,crsdeptime,crselapsedtime,cancelled,dayofweek,dayofmonth,...,distance,diverted,flightnum,month,origin,tailnum,taxiin,taxiout,uniquecarrier,year
0,53,32,-8,1642,1650,1545,65,0,4,10,...,205,0,209,10,DCA,N443US,7,14,US,2002
1,164,155,-11,1754,1805,1610,175,0,4,2,...,1072,0,109,12,MCO,N755,2,7,WN,1999
2,45,29,2,1120,1118,1030,48,0,1,19,...,116,0,5873,6,CVG,N785CA,3,13,OH,2006
3,49,37,2,1137,1135,1048,47,0,4,2,...,156,0,353,1,MYR,N934VJ,6,6,US,1997
4,61,40,-3,1537,1540,1440,60,0,7,20,...,140,0,3281,7,DFW,N286AE,7,14,MQ,2008
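The YAML file also defines `skip_leading_rows`, which the cell above does not use. A minimal sketch of a reader that takes both the delimiter and the skip count from the config; `read_with_config` is a hypothetical helper, and mapping `skip_leading_rows` to pandas' `skiprows` (lines skipped before the header) is our assumption about what the setting is meant to do:

```python
import io

import pandas as pd


def read_with_config(source, config):
    """Read a delimited file using the delimiter and leading-row skip from the config."""
    return pd.read_csv(
        source,
        sep=config["inbound_delimiter"],
        # Assumption: skip_leading_rows means lines to skip before the header row
        skiprows=config.get("skip_leading_rows", 0),
    )


config = {"inbound_delimiter": ",", "skip_leading_rows": 0}
df = read_with_config(io.StringIO("origin,dest\nDCA,BOS\nMCO,ATL\n"), config)
print(df.shape)   # (2, 2)
```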


Check that the column names and the number of columns match between df and config_data:

In [30]:
util.col_header_val(df,config_data)

column name and column length validation passed


1

Display the column names for df and YAML:

In [31]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['actualelapsedtime', 'airtime', 'arrdelay', 'arrtime', 'crsarrtime',
       'crsdeptime', 'crselapsedtime', 'cancelled', 'dayofweek', 'dayofmonth',
       'depdelay', 'deptime', 'dest', 'distance', 'diverted', 'flightnum',
       'month', 'origin', 'tailnum', 'taxiin', 'taxiout', 'uniquecarrier',
       'year'],
      dtype='object')
columns of YAML are: ['actualelapsedtime', 'airtime', 'arrdelay', 'arrtime', 'crsarrtime', 'crsdeptime', 'crselapsedtime', 'cancelled', 'dayofweek', 'dayofmonth', 'depdelay', 'deptime', 'dest', 'distance', 'diverted', 'flightnum', 'month', 'origin', 'tailnum', 'taxiin', 'taxiout', 'uniquecarrier', 'year']
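`col_header_val` sorts both the file columns and the YAML columns before comparing them, so it would still pass if the file's columns arrived in a different order; the two lists printed above also happen to be in the same order. If column order matters downstream (for example, if an output file is later read by position), a stricter positional check could be added. A minimal sketch comparing the two behaviours, with simplified normalization (lowercasing only) rather than the full renaming done in `col_header_val`; both helper names are hypothetical:

```python
def columns_match_any_order(actual, expected):
    """Order-insensitive check, like col_header_val (which sorts both lists)."""
    return sorted(str(c).lower() for c in actual) == sorted(str(c).lower() for c in expected)


def columns_match_in_order(actual, expected):
    """Stricter check: the i-th file column must equal the i-th YAML column."""
    return [str(c).lower() for c in actual] == [str(c).lower() for c in expected]


expected = ["origin", "dest", "year"]
shuffled = ["year", "origin", "dest"]
print(columns_match_any_order(shuffled, expected))                   # True
print(columns_match_in_order(shuffled, expected))                    # False
print(columns_match_in_order(["Origin", "Dest", "Year"], expected))  # True
```

In the notebook this would be `columns_match_in_order(df.columns, config_data['columns'])`.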


Print a message depending on whether the validation passes:

In [32]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipeline

column name and column length validation passed
col validation passed
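The failure branch above only has a placeholder comment. One possible way to reject a file is to move it into a separate folder so that later steps do not pick it up. A minimal sketch using only the standard library; `route_file` and the `accepted`/`rejected` folder names are hypothetical choices, not part of the original pipeline:

```python
import shutil
import tempfile
from pathlib import Path


def route_file(path, passed, accepted_dir="accepted", rejected_dir="rejected"):
    """Move `path` into accepted_dir or rejected_dir and return its new location."""
    target_dir = Path(accepted_dir if passed else rejected_dir)
    target_dir.mkdir(parents=True, exist_ok=True)   # create the folder if needed
    destination = target_dir / Path(path).name
    shutil.move(str(path), str(destination))
    return destination


# Demo in a throwaway directory so no real files are touched.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "test_data.csv"
    sample.write_text("origin,dest\nDCA,BOS\n")
    moved = route_file(sample, passed=False,
                       accepted_dir=Path(tmp) / "accepted",
                       rejected_dir=Path(tmp) / "rejected")
    result = (moved.parent.name, moved.exists(), sample.exists())
    print(result)   # ('rejected', True, False)
```

In the notebook, the two branches could call `route_file(source_file, passed=False)` and `route_file(source_file, passed=True)` respectively.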


Let's test the larger dataframe now:

In [33]:
util.col_header_val(df_main,config_data)

column name and column length validation passed


1

Both the small test file and the larger df_main pass the column name and column count validation against the YAML config.
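The YAML config defines `outbound_delimiter: '|'`, but nothing in this notebook uses it yet. A minimal sketch of writing validated data with that delimiter, assuming gzip-compressed output is wanted (our assumption; the config does not specify compression) and using a hypothetical helper name `write_outbound`:

```python
import gzip
import tempfile
from pathlib import Path

import pandas as pd


def write_outbound(df, path, config):
    """Write df with the config's outbound delimiter, gzip-compressed, without the index."""
    df.to_csv(path, sep=config["outbound_delimiter"], index=False, compression="gzip")


config = {"file_name": "test_data", "outbound_delimiter": "|"}   # subset of the YAML keys
toy = pd.DataFrame({"origin": ["DCA", "MCO"], "dest": ["BOS", "ATL"]})

with tempfile.TemporaryDirectory() as tmp:
    out_path = Path(tmp) / f"{config['file_name']}.txt.gz"
    write_outbound(toy, out_path, config)
    with gzip.open(out_path, "rt") as fh:
        first_line = fh.readline().strip()          # header row as written
    round_trip = pd.read_csv(out_path, sep="|")     # gzip inferred from the .gz suffix
print(first_line)   # origin|dest
```

For the notebook data this would be `write_outbound(df_main, 'df_main.txt.gz', config_data)`.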