## Create testutility.py

In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting testutility.py


## Write YAML file

In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: flights
file_name: Combined_Flights_2021
file_path: data/Combined_Flights_2021.csv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
    - FlightDate
    - Airline
    - Origin
    - Dest
    - Cancelled
    - Diverted
    - CRSDepTime
    - DepTime
    - DepDelayMinutes
    - DepDelay
    - ArrTime
    - ArrDelayMinutes
    - AirTime
    - CRSElapsedTime
    - ActualElapsedTime
    - Distance
    - Year
    - Quarter
    - Month
    - DayofMonth
    - DayOfWeek
    - Marketing_Airline_Network
    - Operated_or_Branded_Code_Share_Partners
    - DOT_ID_Marketing_Airline
    - IATA_Code_Marketing_Airline
    - Flight_Number_Marketing_Airline
    - Operating_Airline
    - DOT_ID_Operating_Airline
    - IATA_Code_Operating_Airline
    - Tail_Number
    - Flight_Number_Operating_Airline
    - OriginAirportID
    - OriginAirportSeqID
    - OriginCityMarketID
    - OriginCityName
    - OriginState
    - OriginStateFips
    - OriginStateName
    - OriginWac
    - DestAirportID
    - DestAirportSeqID
    - DestCityMarketID
    - DestCityName
    - DestState
    - DestStateFips
    - DestStateName
    - DestWac
    - DepDel15
    - DepartureDelayGroups
    - DepTimeBlk
    - TaxiOut
    - WheelsOff
    - WheelsOn
    - TaxiIn
    - CRSArrTime
    - ArrDelay
    - ArrDel15
    - ArrivalDelayGroups
    - ArrTimeBlk
    - DistanceGroup
    - DivAirportLandings

Overwriting file.yaml


In [3]:
# Read config file
import testutility as util
config_data = util.read_config_file("file.yaml")

In [4]:
#inspecting data of config file
config_data

{'file_type': 'csv',
 'dataset_name': 'flights',
 'file_name': 'Combined_Flights_2021',
 'file_path': 'data/Combined_Flights_2021.csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['FlightDate',
  'Airline',
  'Origin',
  'Dest',
  'Cancelled',
  'Diverted',
  'CRSDepTime',
  'DepTime',
  'DepDelayMinutes',
  'DepDelay',
  'ArrTime',
  'ArrDelayMinutes',
  'AirTime',
  'CRSElapsedTime',
  'ActualElapsedTime',
  'Distance',
  'Year',
  'Quarter',
  'Month',
  'DayofMonth',
  'DayOfWeek',
  'Marketing_Airline_Network',
  'Operated_or_Branded_Code_Share_Partners',
  'DOT_ID_Marketing_Airline',
  'IATA_Code_Marketing_Airline',
  'Flight_Number_Marketing_Airline',
  'Operating_Airline',
  'DOT_ID_Operating_Airline',
  'IATA_Code_Operating_Airline',
  'Tail_Number',
  'Flight_Number_Operating_Airline',
  'OriginAirportID',
  'OriginAirportSeqID',
  'OriginCityMarketID',
  'OriginCityName',
  'OriginState',
  'OriginStateFips',
  'OriginStateNam

## Load data

##### Pandas:

In [5]:
# read the file using config file
from datetime import datetime
import pandas as pd

file_type = config_data['file_type']
file_path = config_data['file_path']

t1 = datetime.now()

df = pd.read_csv(file_path, config_data['inbound_delimiter'])

t2 = datetime.now()

time_pandas = t2-t1

df.head()

  df = pd.read_csv(file_path, config_data['inbound_delimiter'])


Unnamed: 0,FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,...,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
0,2021-03-03,SkyWest Airlines Inc.,SGU,PHX,False,False,724,714.0,0.0,-10.0,...,724.0,813.0,5.0,843,-25.0,0.0,-2.0,0800-0859,2,0.0
1,2021-03-03,SkyWest Airlines Inc.,PHX,SGU,False,False,922,917.0,0.0,-5.0,...,940.0,1028.0,3.0,1040,-9.0,0.0,-1.0,1000-1059,2,0.0
2,2021-03-03,SkyWest Airlines Inc.,MHT,ORD,False,False,1330,1321.0,0.0,-9.0,...,1336.0,1445.0,16.0,1530,-29.0,0.0,-2.0,1500-1559,4,0.0
3,2021-03-03,SkyWest Airlines Inc.,DFW,TRI,False,False,1645,1636.0,0.0,-9.0,...,1703.0,1955.0,7.0,2010,-8.0,0.0,-1.0,2000-2059,4,0.0
4,2021-03-03,SkyWest Airlines Inc.,PHX,BFL,False,False,1844,1838.0,0.0,-6.0,...,1851.0,1900.0,3.0,1925,-22.0,0.0,-2.0,1900-1959,2,0.0


In [6]:
print(f"It take {time_pandas} to load dataset using Pandas")

It take 0:00:23.781261 to load dataset using Pandas


##### Dask

In [14]:
import dask.dataframe as dd

t1 = datetime.now()

df = dd.read_csv(file_path, delimiter = config_data['inbound_delimiter'])

t2 = datetime.now()

time_dask = t2-t1

df.head()

Unnamed: 0,FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,...,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
0,2021-03-03,SkyWest Airlines Inc.,SGU,PHX,False,False,724,714.0,0.0,-10.0,...,724.0,813.0,5.0,843,-25.0,0.0,-2.0,0800-0859,2,0.0
1,2021-03-03,SkyWest Airlines Inc.,PHX,SGU,False,False,922,917.0,0.0,-5.0,...,940.0,1028.0,3.0,1040,-9.0,0.0,-1.0,1000-1059,2,0.0
2,2021-03-03,SkyWest Airlines Inc.,MHT,ORD,False,False,1330,1321.0,0.0,-9.0,...,1336.0,1445.0,16.0,1530,-29.0,0.0,-2.0,1500-1559,4,0.0
3,2021-03-03,SkyWest Airlines Inc.,DFW,TRI,False,False,1645,1636.0,0.0,-9.0,...,1703.0,1955.0,7.0,2010,-8.0,0.0,-1.0,2000-2059,4,0.0
4,2021-03-03,SkyWest Airlines Inc.,PHX,BFL,False,False,1844,1838.0,0.0,-6.0,...,1851.0,1900.0,3.0,1925,-22.0,0.0,-2.0,1900-1959,2,0.0


Unnamed: 0,FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,...,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
0,2021-03-03,SkyWest Airlines Inc.,SGU,PHX,False,False,724,714.0,0.0,-10.0,...,724.0,813.0,5.0,843,-25.0,0.0,-2.0,0800-0859,2,0.0
1,2021-03-03,SkyWest Airlines Inc.,PHX,SGU,False,False,922,917.0,0.0,-5.0,...,940.0,1028.0,3.0,1040,-9.0,0.0,-1.0,1000-1059,2,0.0
2,2021-03-03,SkyWest Airlines Inc.,MHT,ORD,False,False,1330,1321.0,0.0,-9.0,...,1336.0,1445.0,16.0,1530,-29.0,0.0,-2.0,1500-1559,4,0.0
3,2021-03-03,SkyWest Airlines Inc.,DFW,TRI,False,False,1645,1636.0,0.0,-9.0,...,1703.0,1955.0,7.0,2010,-8.0,0.0,-1.0,2000-2059,4,0.0
4,2021-03-03,SkyWest Airlines Inc.,PHX,BFL,False,False,1844,1838.0,0.0,-6.0,...,1851.0,1900.0,3.0,1925,-22.0,0.0,-2.0,1900-1959,2,0.0


In [15]:
print(f"It take {time_dask} to load dataset using Dask")

It take 0:00:00.005458 to load dataset using Dask
It take 0:00:00.011093 to load dataset using Dask


##### Modin

In [9]:
import modin.pandas

import os

os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask

from distributed import Client

client = Client()

t1 = datetime.now()

df = modin.pandas.read_csv(file_path, delimiter = config_data['inbound_delimiter'])

t2 = datetime.now()

time_modin = t2-t1

df.head()

2022-11-11 00:52:41,736 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/55/8329xk652qqcmwyx7rj48p6m0000gn/T/dask-worker-space/worker-_2lwkijg', purging
2022-11-11 00:52:41,736 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/55/8329xk652qqcmwyx7rj48p6m0000gn/T/dask-worker-space/worker-qh0iid2o', purging
2022-11-11 00:52:41,736 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/55/8329xk652qqcmwyx7rj48p6m0000gn/T/dask-worker-space/worker-edqjricw', purging
2022-11-11 00:52:41,737 - distributed.diskutils - INFO - Found stale lock file and directory '/var/folders/55/8329xk652qqcmwyx7rj48p6m0000gn/T/dask-worker-space/worker-a36w6bam', purging


Unnamed: 0,FlightDate,Airline,Origin,Dest,Cancelled,Diverted,CRSDepTime,DepTime,DepDelayMinutes,DepDelay,...,WheelsOff,WheelsOn,TaxiIn,CRSArrTime,ArrDelay,ArrDel15,ArrivalDelayGroups,ArrTimeBlk,DistanceGroup,DivAirportLandings
0,2021-03-03,SkyWest Airlines Inc.,SGU,PHX,False,False,724,714.0,0.0,-10.0,...,724.0,813.0,5.0,843,-25.0,0.0,-2.0,0800-0859,2,0.0
1,2021-03-03,SkyWest Airlines Inc.,PHX,SGU,False,False,922,917.0,0.0,-5.0,...,940.0,1028.0,3.0,1040,-9.0,0.0,-1.0,1000-1059,2,0.0
2,2021-03-03,SkyWest Airlines Inc.,MHT,ORD,False,False,1330,1321.0,0.0,-9.0,...,1336.0,1445.0,16.0,1530,-29.0,0.0,-2.0,1500-1559,4,0.0
3,2021-03-03,SkyWest Airlines Inc.,DFW,TRI,False,False,1645,1636.0,0.0,-9.0,...,1703.0,1955.0,7.0,2010,-8.0,0.0,-1.0,2000-2059,4,0.0
4,2021-03-03,SkyWest Airlines Inc.,PHX,BFL,False,False,1844,1838.0,0.0,-6.0,...,1851.0,1900.0,3.0,1925,-22.0,0.0,-2.0,1900-1959,2,0.0


In [10]:
print(f"It take {time_modin} to load dataset using Modin")

It take 0:00:21.324870 to load dataset using Modin


## Validation

In [11]:
#validate the header of the file
util.col_header_val(df, config_data)

column name and column length validation passed


1

In [12]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['flightdate', 'airline', 'origin', 'dest', 'cancelled', 'diverted',
       'crsdeptime', 'deptime', 'depdelayminutes', 'depdelay', 'arrtime',
       'arrdelayminutes', 'airtime', 'crselapsedtime', 'actualelapsedtime',
       'distance', 'year', 'quarter', 'month', 'dayofmonth', 'dayofweek',
       'marketing_airline_network', 'operated_or_branded_code_share_partners',
       'dot_id_marketing_airline', 'iata_code_marketing_airline',
       'flight_number_marketing_airline', 'operating_airline',
       'dot_id_operating_airline', 'iata_code_operating_airline',
       'tail_number', 'flight_number_operating_airline', 'originairportid',
       'originairportseqid', 'origincitymarketid', 'origincityname',
       'originstate', 'originstatefips', 'originstatename', 'originwac',
       'destairportid', 'destairportseqid', 'destcitymarketid', 'destcityname',
       'deststate', 'deststatefips', 'deststatename', 'destwac', 'depdel15',
       'departuredelaygroups',

In [13]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine



column name and column length validation passed
col validation passed


## Write file

In [16]:
df.to_csv(filename="data.gz", index=False, compression="gzip", sep="|")

['/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/00.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/01.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/02.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/03.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/04.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/05.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/06.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/07.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/08.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/09.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/10.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/11.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/12.part',
 '/Users/yujh01/Local Documents/Data Glacier/Week 6/data.gz/13.part',
 '/Users/yujh01/Loca

## Summary

In [17]:
%%writefile summary.txt
Total number of rows: 6311871
total number of columns: 61
file size: 334.6MB

Writing summary.txt
