In [1]:
import os
import time

In [2]:
os.path.getsize("flight.csv")
file_path = 'flight.csv'

**Read in the data with Dask**

In [3]:
from dask import dataframe as dd
start = time.time()
dask_df = dd.read_csv(file_path, dtype={'totalTravelDistance': 'float64'})
end = time.time()
print("Read csv with dask: ",(end-start),"sec")
del dask_df


Read csv with dask:  0.016516923904418945 sec


**Read in the data with Modin**

In [4]:
os.environ["__MODIN_AUTOIMPORT_PANDAS__"] = "1"
import modin.pandas as pd
import ray
ray.shutdown()
ray.init()
start = time.time()
df = pd.read_csv(file_path)
end = time.time()
print("Read csv with modin and ray: ",(end-start),"sec")
del df

2023-08-03 15:43:36,593	INFO worker.py:1544 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
[2m[33m(raylet)[0m   aiogrpc.init_grpc_aio()


Read csv with modin and ray:  70.89477944374084 sec


**EXTRA: Read in the data with Polar**

In [5]:
import polars as pl
import time

start = time.time()
df = pl.read_csv(file_path)
end = time.time()

print("Read csv with Polars: ",(end-start),"sec")
del df

Read csv with Polars:  34.13512921333313 sec


**Read in the data with Pandas**

In [6]:
import pandas as pd
start = time.time()
dask_df = pd.read_csv(file_path)
end = time.time()
print("Read csv with pandas: ",(end-start),"sec")


Read csv with pandas:  74.97887182235718 sec


In [7]:
dask_df.head()

Unnamed: 0,legId,searchDate,flightDate,startingAirport,destinationAirport,fareBasisCode,travelDuration,elapsedDays,isBasicEconomy,isRefundable,...,segmentsArrivalTimeEpochSeconds,segmentsArrivalTimeRaw,segmentsArrivalAirportCode,segmentsDepartureAirportCode,segmentsAirlineName,segmentsAirlineCode,segmentsEquipmentDescription,segmentsDurationInSeconds,segmentsDistance,segmentsCabinCode
0,50c42c10658dfc36b84f4965190d016c,2022-04-16,2022-04-18,EWR,ORD,QAA0OHEN,PT2H29M,0,False,False,...,1650310440,2022-04-18T14:34:00.000-05:00,ORD,EWR,United,UA,Boeing 737-800,8940,720,coach
1,0689b6270162f9e3325e3c55fd9530fe,2022-04-17,2022-04-25,BOS,DFW,S7AHZNN3,PT4H28M,0,False,False,...,1650898200,2022-04-25T09:50:00.000-05:00,DFW,BOS,American Airlines,AA,Boeing 737-800,16080,1556,coach
2,df6855ce5c91936d0156dc5bed579f94,2022-04-16,2022-04-21,DFW,BOS,L0AZZNN1,PT3H40M,0,False,False,...,1650563160,2022-04-21T13:46:00.000-04:00,BOS,DFW,American Airlines,AA,Airbus A321,13200,1556,coach
3,5d1b3e32f51df4e50212d82758750150,2022-04-17,2022-04-21,IAD,SFO,UAA0AQEN,PT6H2M,1,False,False,...,1650615360,2022-04-22T01:16:00.000-07:00,SFO,IAD,United,UA,Boeing 737 MAX 9,21720,2426,coach
4,2a90ca19b2f416aa290c819972c31bc4,2022-04-16,2022-04-25,LGA,DFW,QAA0OKEN,PT6H15M,0,False,False,...,1650919200||1650925980,2022-04-25T15:40:00.000-05:00||2022-04-25T17:3...,IAH||DFW,LGA||IAH,United||United,UA||UA,Embraer 175 (Enhanced Winglets)||Airbus A319,15720||4380,1419||233,coach||coach


In [8]:
len(dask_df.index)

8213882

In [9]:
len(dask_df.columns)

27

In [10]:
dask_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8213882 entries, 0 to 8213881
Data columns (total 27 columns):
 #   Column                             Dtype  
---  ------                             -----  
 0   legId                              object 
 1   searchDate                         object 
 2   flightDate                         object 
 3   startingAirport                    object 
 4   destinationAirport                 object 
 5   fareBasisCode                      object 
 6   travelDuration                     object 
 7   elapsedDays                        int64  
 8   isBasicEconomy                     bool   
 9   isRefundable                       bool   
 10  isNonStop                          bool   
 11  baseFare                           float64
 12  totalFare                          float64
 13  seatsRemaining                     int64  
 14  totalTravelDistance                float64
 15  segmentsDepartureTimeEpochSeconds  object 
 16  segmentsDepartureT

As you can see Dask package has surpessed all other packages with great difference

In [11]:
# remove special character
dask_df.columns=dask_df.columns.str.replace('[#,@,&]','')



In [12]:
#To remove white space from columns
dask_df.columns = dask_df.columns.str.replace(' ', '')

In [13]:
data=dask_df.columns
data

Index(['legId', 'searchDate', 'flightDate', 'startingAirport',
       'destinationAirport', 'fareBasisCode', 'travelDuration', 'elapsedDays',
       'isBasicEconomy', 'isRefundable', 'isNonStop', 'baseFare', 'totalFare',
       'seatsRemaining', 'totalTravelDistance',
       'segmentsDepartureTimeEpochSeconds', 'segmentsDepartureTimeRaw',
       'segmentsArrivalTimeEpochSeconds', 'segmentsArrivalTimeRaw',
       'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode',
       'segmentsAirlineName', 'segmentsAirlineCode',
       'segmentsEquipmentDescription', 'segmentsDurationInSeconds',
       'segmentsDistance', 'segmentsCabinCode'],
      dtype='object')

In [19]:
%%writefile data.yaml
dataset_name: testfile
file_name: flight
file_path: flight.csv
file_type: csv
inbound_delimiter: ','
outbound_delimiter: '|'
skip_leading_rows: 1
table_name: edsurv
column_counts:
  baseFare: 8213882
  destinationAirport: 8213882
  elapsedDays: 8213882
  fareBasisCode: 8213882
  flightDate: 8213882
  isBasicEconomy: 8213882
  isNonStop: 8213882
  isRefundable: 8213882
  legId: 8213882
  searchDate: 8213882
  seatsRemaining: 8213882
  segmentsAirlineCode: 8213882
  segmentsAirlineName: 8213882
  segmentsArrivalAirportCode: 8213882
  segmentsArrivalTimeEpochSeconds: 8213882
  segmentsArrivalTimeRaw: 8213882
  segmentsCabinCode: 8213882
  segmentsDepartureAirportCode: 8213882
  segmentsDepartureTimeEpochSeconds: 8213882
  segmentsDepartureTimeRaw: 8213882
  segmentsDistance: 8213882
  segmentsDurationInSeconds: 8213882
  segmentsEquipmentDescription: 8058102
  startingAirport: 8213882
  totalFare: 8213882
  totalTravelDistance: 7604289
  travelDuration: 8213882
columns:
- legId
- searchDate
- flightDate
- startingAirport
- destinationAirport
- fareBasisCode
- travelDuration
- elapsedDays
- isBasicEconomy
- isRefundable
- isNonStop
- baseFare
- totalFare
- seatsRemaining
- totalTravelDistance
- segmentsDepartureTimeEpochSeconds
- segmentsDepartureTimeRaw
- segmentsArrivalTimeEpochSeconds
- segmentsArrivalTimeRaw
- segmentsArrivalAirportCode
- segmentsDepartureAirportCode
- segmentsAirlineName
- segmentsAirlineCode
- segmentsEquipmentDescription
- segmentsDurationInSeconds
- segmentsDistance
- segmentsCabinCode


Overwriting data.yaml


In [15]:
%%writefile utility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string) 
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility.py


In [20]:
# Read config file
import utility as util1
config_data = util1.read_config_file("data.yaml")

#inspecting data of config file
config_data

{'dataset_name': 'testfile',
 'file_name': 'flight',
 'file_path': 'flight.csv',
 'file_type': 'csv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'table_name': 'edsurv',
 'column_counts': {'baseFare': 8213882,
  'destinationAirport': 8213882,
  'elapsedDays': 8213882,
  'fareBasisCode': 8213882,
  'flightDate': 8213882,
  'isBasicEconomy': 8213882,
  'isNonStop': 8213882,
  'isRefundable': 8213882,
  'legId': 8213882,
  'searchDate': 8213882,
  'seatsRemaining': 8213882,
  'segmentsAirlineCode': 8213882,
  'segmentsAirlineName': 8213882,
  'segmentsArrivalAirportCode': 8213882,
  'segmentsArrivalTimeEpochSeconds': 8213882,
  'segmentsArrivalTimeRaw': 8213882,
  'segmentsCabinCode': 8213882,
  'segmentsDepartureAirportCode': 8213882,
  'segmentsDepartureTimeEpochSeconds': 8213882,
  'segmentsDepartureTimeRaw': 8213882,
  'segmentsDistance': 8213882,
  'segmentsDurationInSeconds': 8213882,
  'segmentsEquipmentDescription': 8058102,
  'startingAirport'

In [17]:
# read the file using config file
file_type = config_data['file_type']
source_file =  config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file,config_data['inbound_delimiter'])
df.head()



Unnamed: 0,legId,searchDate,flightDate,startingAirport,destinationAirport,fareBasisCode,travelDuration,elapsedDays,isBasicEconomy,isRefundable,...,segmentsArrivalTimeEpochSeconds,segmentsArrivalTimeRaw,segmentsArrivalAirportCode,segmentsDepartureAirportCode,segmentsAirlineName,segmentsAirlineCode,segmentsEquipmentDescription,segmentsDurationInSeconds,segmentsDistance,segmentsCabinCode
0,50c42c10658dfc36b84f4965190d016c,2022-04-16,2022-04-18,EWR,ORD,QAA0OHEN,PT2H29M,0,False,False,...,1650310440,2022-04-18T14:34:00.000-05:00,ORD,EWR,United,UA,Boeing 737-800,8940,720,coach
1,0689b6270162f9e3325e3c55fd9530fe,2022-04-17,2022-04-25,BOS,DFW,S7AHZNN3,PT4H28M,0,False,False,...,1650898200,2022-04-25T09:50:00.000-05:00,DFW,BOS,American Airlines,AA,Boeing 737-800,16080,1556,coach
2,df6855ce5c91936d0156dc5bed579f94,2022-04-16,2022-04-21,DFW,BOS,L0AZZNN1,PT3H40M,0,False,False,...,1650563160,2022-04-21T13:46:00.000-04:00,BOS,DFW,American Airlines,AA,Airbus A321,13200,1556,coach
3,5d1b3e32f51df4e50212d82758750150,2022-04-17,2022-04-21,IAD,SFO,UAA0AQEN,PT6H2M,1,False,False,...,1650615360,2022-04-22T01:16:00.000-07:00,SFO,IAD,United,UA,Boeing 737 MAX 9,21720,2426,coach
4,2a90ca19b2f416aa290c819972c31bc4,2022-04-16,2022-04-25,LGA,DFW,QAA0OKEN,PT6H15M,0,False,False,...,1650919200||1650925980,2022-04-25T15:40:00.000-05:00||2022-04-25T17:3...,IAH||DFW,LGA||IAH,United||United,UA||UA,Embraer 175 (Enhanced Winglets)||Airbus A319,15720||4380,1419||233,coach||coach


In [21]:
#validate the header of the file
util1.col_header_val(df,config_data)

column name and column length validation passed


1

In [22]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['legid', 'searchdate', 'flightdate', 'startingairport',
       'destinationairport', 'farebasiscode', 'travelduration', 'elapseddays',
       'isbasiceconomy', 'isrefundable', 'isnonstop', 'basefare', 'totalfare',
       'seatsremaining', 'totaltraveldistance',
       'segmentsdeparturetimeepochseconds', 'segmentsdeparturetimeraw',
       'segmentsarrivaltimeepochseconds', 'segmentsarrivaltimeraw',
       'segmentsarrivalairportcode', 'segmentsdepartureairportcode',
       'segmentsairlinename', 'segmentsairlinecode',
       'segmentsequipmentdescription', 'segmentsdurationinseconds',
       'segmentsdistance', 'segmentscabincode'],
      dtype='object')
columns of YAML are: ['legId', 'searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode', 'travelDuration', 'elapsedDays', 'isBasicEconomy', 'isRefundable', 'isNonStop', 'baseFare', 'totalFare', 'seatsRemaining', 'totalTravelDistance', 'segmentsDepartureTimeEpochSeconds', 'segment

In [24]:
if util1.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation passed
col validation passed


In [25]:
import gzip
import csv

from dask import dataframe as dd
df = dd.read_csv(file_path,delimiter=',')

# Write csv in gz format in pipe separated text file (|)
df.to_csv(file_path+'.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')



['c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\00.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\01.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\02.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\03.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\04.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\05.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\06.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\07.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\08.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\09.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\10.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\11.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\12.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\13.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\14.part',
 'c:\\Users\\Salih\\Desktop\\week 6\\flight.csv.gz\\15.part',
 'c:\\Us