In [17]:
import pandas as pd
import os
from dask import dataframe as d
import time

In [22]:
#Reading in data with pandas and saving the time of installtion
begining = time.time()
dddf = pd.read_csv("C:/Users/roger/OneDrive/Documents/Combined_Flights_2021.csv")
finish = time.time()
total = finish-begining

In [3]:
# !pip install dask

In [23]:
#Reading in data with dask and saving the time of installation
start = time.time()
dask_df = d.read_csv("C:/Users/roger/OneDrive/Documents/Combined_Flights_2021.csv")
end  = time.time()
total2 = end-start

In [24]:
#Evaluating timed results
print(f'The time it took to read in the data using pandas was {total}')
print(f'The time it took to read in the data using dask was {total2}')

The time it took to read in the data using pandas was 56.294713497161865
The time it took to read in the data using dask was 0.8365993499755859


We see that loading in the data with dask was far more superior to pandas

In [25]:
dask_df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 61 entries, FlightDate to DivAirportLandings
dtypes: object(18), bool(2), float64(19), int64(22)

In [6]:
number_of_rows = len(dask_df)
number_of_columns = len(dask_df.columns)
file_size = os.path.getsize("C:/Users/roger/OneDrive/Documents/Combined_Flights_2021.csv")
print(f'Number of rows is {number_of_rows}')
print(f'Number of columns is {number_of_columns}')
print(f'File size is {file_size} in bytes')

Number of rows is 6311871
Number of columns is 61
File size is 2214293351 in bytes


In [34]:
# Dict_1 = {'Number_of_Rows': [number_of_rows], 'Number_of_Columns': [number_of_columns], "File_Size(in Bytes)": [file_size]}
# df = pd.DataFrame.from_dict(Dict_1)

In [7]:
#Removing any white spaces within the column names
dask_df.columns = dask_df.columns.str.replace(' ', '')
# Removing a variety of special Characters that could be in column name
dask_df.columns=dask_df.columns.str.replace('[#,@,_]','')

In [8]:
dask_df.columns

Index(['FlightDate', 'Airline', 'Origin', 'Dest', 'Cancelled', 'Diverted',
       'CRSDepTime', 'DepTime', 'DepDelayMinutes', 'DepDelay', 'ArrTime',
       'ArrDelayMinutes', 'AirTime', 'CRSElapsedTime', 'ActualElapsedTime',
       'Distance', 'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek',
       'MarketingAirlineNetwork', 'OperatedorBrandedCodeSharePartners',
       'DOTIDMarketingAirline', 'IATACodeMarketingAirline',
       'FlightNumberMarketingAirline', 'OperatingAirline',
       'DOTIDOperatingAirline', 'IATACodeOperatingAirline', 'TailNumber',
       'FlightNumberOperatingAirline', 'OriginAirportID', 'OriginAirportSeqID',
       'OriginCityMarketID', 'OriginCityName', 'OriginState',
       'OriginStateFips', 'OriginStateName', 'OriginWac', 'DestAirportID',
       'DestAirportSeqID', 'DestCityMarketID', 'DestCityName', 'DestState',
       'DestStateFips', 'DestStateName', 'DestWac', 'DepDel15',
       'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff',
       'Wh

Generating py for functions to be used throughout notebook

In [9]:
%%writefile utility_file.py

import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime 
import gc
import re

def read_config_file(filepath):
    with open(filepath, 'r') as f:
        try:
            return yaml.safe_load(f)
        except yaml.YAMLError as exc:
            logging.error(exc)

def col_header_val(df,table_config):
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Overwriting utility_file.py


Creating a yaml file

In [10]:
%%writefile airline.yaml
file_type: csv
dataset_name: file
file_name: Combined_Flights_2021
table_name: edsurv
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns: 
    - FlightDate
    - Airline
    - Origin
    - Dest
    - Cancelled
    - Diverted
    - CRSDepTime
    - DepTime
    - DepDelayMinutes
    - DepDelay
    - ArrTime
    - ArrDelayMinutes
    - AirTime
    - CRSElapsedTime
    - ActualElapsedTime
    - Distance
    - Year
    - Quarter
    - Month
    - DayofMonth
    - DayOfWeek
    - MarketingAirlineNetwork
    - OperatedorBrandedCodeSharePartners
    - DOTIDMarketingAirline
    - IATACodeMarketingAirline
    - FlightNumberMarketingAirline
    - OperatingAirline
    - DOTIDOperatingAirline
    - IATACodeOperatingAirline
    - TailNumber
    - FlightNumberOperatingAirline
    - OriginAirportID
    - OriginAirportSeqID
    - OriginCityMarketID
    - OriginCityName
    - OriginState
    - OriginStateFips
    - OriginStateName
    - OriginWac
    - DestAirportID
    - DestAirportSeqID
    - DestCityMarketID
    - DestCityName
    - DestState
    - DestStateFips
    - DestStateName
    - DestWac
    - DepDel15
    - DepartureDelayGroups
    - DepTimeBlk
    - TaxiOut
    - WheelsOff
    - WheelsOn
    - TaxiIn
    - CRSArrTime
    - ArrDelay
    - ArrDel15
    - ArrivalDelayGroups
    - ArrTimeBlk
    - DistanceGroup
    - DivAirportLandings

Overwriting airline.yaml


In [11]:
import utility_file as util
data = util.read_config_file("airline.yaml")
data

{'file_type': 'csv',
 'dataset_name': 'file',
 'file_name': 'Combined_Flights_2021',
 'table_name': 'edsurv',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['FlightDate',
  'Airline',
  'Origin',
  'Dest',
  'Cancelled',
  'Diverted',
  'CRSDepTime',
  'DepTime',
  'DepDelayMinutes',
  'DepDelay',
  'ArrTime',
  'ArrDelayMinutes',
  'AirTime',
  'CRSElapsedTime',
  'ActualElapsedTime',
  'Distance',
  'Year',
  'Quarter',
  'Month',
  'DayofMonth',
  'DayOfWeek',
  'MarketingAirlineNetwork',
  'OperatedorBrandedCodeSharePartners',
  'DOTIDMarketingAirline',
  'IATACodeMarketingAirline',
  'FlightNumberMarketingAirline',
  'OperatingAirline',
  'DOTIDOperatingAirline',
  'IATACodeOperatingAirline',
  'TailNumber',
  'FlightNumberOperatingAirline',
  'OriginAirportID',
  'OriginAirportSeqID',
  'OriginCityMarketID',
  'OriginCityName',
  'OriginState',
  'OriginStateFips',
  'OriginStateName',
  'OriginWac',
  'DestAirportID',
  'DestAirportS

In [12]:
# read the file using config file
file_type = data['file_type']
source_file = "C:/Users/roger/OneDrive/Documents/" + data['file_name'] + f'.{file_type}'

In [13]:
#Testing the validation
util.col_header_val(dddf,data)

column name and column length validation failed
Following File columns are not in the YAML file ['marketing_airline_network', 'operating_airline', 'dot_id_operating_airline', 'dot_id_marketing_airline', 'operated_or_branded_code_share_partners', 'iata_code_marketing_airline', 'flight_number_operating_airline', 'iata_code_operating_airline', 'flight_number_marketing_airline', 'tail_number']
Following YAML columns are not in the file uploaded ['iatacodemarketingairline', 'operatingairline', 'iatacodeoperatingairline', 'dotidoperatingairline', 'flightnumberoperatingairline', 'flightnumbermarketingairline', 'tailnumber', 'marketingairlinenetwork', 'dotidmarketingairline', 'operatedorbrandedcodesharepartners']


0

In [32]:
# !pip uninstall pandas
# !pip install pandas==1.1.5
#Had to revert to older version of pandas to complete the to csv conversion

In [26]:
import csv
import gzip

dask_df.to_csv('airline.csv.gz',
          sep='|',
          header=True,
          index=False,
          quoting=csv.QUOTE_ALL,
          compression='gzip',
          quotechar='"',
          doublequote=True,
          line_terminator='\n')

['C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\00.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\01.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\02.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\03.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\04.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\05.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\06.part',
 'C:\\Users\\roger\\OneDrive\\Documents\\Data_Glacier_Week_6\\File_ingestion_and_schema_validation\\airline.csv.gz\\07

In [31]:
total_size = os.path.getsize("C:/Users/roger/OneDrive/Documents/Data_Glacier_Week_6/File_ingestion_and_schema_validation/airline.csv.gz")
total_length = len(os.listdir("C:/Users/roger/OneDrive/Documents/Data_Glacier_Week_6/File_ingestion_and_schema_validation/airline.csv.gz"))
print(f'The total size of the gz file is {total_size}')
print(f'The total length of the directory is {total_length}')

The total size of the gz file is 12288
The total length of the directory is 34
