In [1]:
import time 

import os

# Location of the source CSV. Set the ITINERARIES_CSV environment variable to
# point at your own copy; the default is the original author's local checkout.
file_path = os.environ.get('ITINERARIES_CSV', 'C:/Users/poong/Documents/GitHub/week6/itineraries.csv')


# File reading

## File reading using Dask 

In [2]:
import dask.dataframe as dd

# perf_counter is a monotonic, high-resolution clock intended for timing code;
# time.time() can jump if the system clock is adjusted.
start_time = time.perf_counter()
# dd.read_csv is lazy (note the sub-second time for an 82M-row file): it builds a
# task graph, and the file is actually parsed only when a result is computed,
# e.g. len(dask_df) further down. This time is therefore not comparable with the
# eager pandas/Modin reads below.
dask_df = dd.read_csv(file_path)
end_time = time.perf_counter()

# Elapsed time
elapsed_time = end_time - start_time

print(f"Time taken to read the file using Dask: {elapsed_time:.2f} seconds")


Time taken to read the file using Dask: 0.06 seconds


## File reading using Pandas 

In [None]:
import pandas as pd

# Time an eager read: pandas parses the whole file into memory in this call.
# perf_counter is a monotonic clock intended for measuring durations.
start_time_pandas = time.perf_counter()
pandas_df = pd.read_csv(file_path)
end_time_pandas = time.perf_counter()

# Calculate the elapsed time
elapsed_time_pandas = end_time_pandas - start_time_pandas

print(f"Time taken to read the file using Pandas: {elapsed_time_pandas:.2f} seconds")


## File reading using Modin 

In [None]:
# Alias Modin as `mpd`, not `pd`: importing it as `pd` silently replaced the
# plain-pandas `pd` bound in the previous cell for every later cell.
import modin.pandas as mpd

# finding time (perf_counter: monotonic clock intended for measuring durations)
start_time_modin = time.perf_counter()
modin_df = mpd.read_csv(file_path)
end_time_modin = time.perf_counter()

# Calculate the elapsed time
elapsed_time_modin = end_time_modin - start_time_modin

print(f"Time taken to read the file using Modin: {elapsed_time_modin:.2f} seconds")


## File reading using Ray 

In [None]:
import ray
import ray.data

ray.init(ignore_reinit_error=True)

# `ray.dataframe` is not part of Ray (importing it fails); Ray's CSV reader is
# ray.data.read_csv, which returns a ray.data.Dataset rather than a DataFrame.
# NOTE(review): recent Ray releases execute Datasets lazily, so this may time only
# planning; call ray_df.materialize() inside the timed region to force the read —
# confirm for the installed Ray version.

# finding time (perf_counter: monotonic clock intended for measuring durations)
start_time_ray = time.perf_counter()
ray_df = ray.data.read_csv(file_path)
end_time_ray = time.perf_counter()

# Calculate the elapsed time
elapsed_time_ray = end_time_ray - start_time_ray

print(f"Time taken to read the file using Ray: {elapsed_time_ray:.2f} seconds")


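As shown above, Dask reported the shortest read time, so it is the method chosen. Note, however, that `dd.read_csv` is lazy: it only builds a task graph, and the file is actually parsed later, when a result such as `len(dask_df)` is computed. The Dask timing is therefore not a like-for-like comparison with the eager Pandas and Modin reads.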

In [3]:
columns = dask_df.columns.tolist()
column_count = len(columns)
data_types = dask_df.dtypes
# len() on a Dask DataFrame forces a computation over the whole file to count rows.
row_count = len(dask_df)

In [5]:
# information about itineraries.csv: row count, column count, column names, dtypes
for summary_item in (row_count, column_count, columns, data_types):
    print(summary_item)

82138753
27
['legId', 'searchDate', 'flightDate', 'startingAirport', 'destinationAirport', 'fareBasisCode', 'travelDuration', 'elapsedDays', 'isBasicEconomy', 'isRefundable', 'isNonStop', 'baseFare', 'totalFare', 'seatsRemaining', 'totalTravelDistance', 'segmentsDepartureTimeEpochSeconds', 'segmentsDepartureTimeRaw', 'segmentsArrivalTimeEpochSeconds', 'segmentsArrivalTimeRaw', 'segmentsArrivalAirportCode', 'segmentsDepartureAirportCode', 'segmentsAirlineName', 'segmentsAirlineCode', 'segmentsEquipmentDescription', 'segmentsDurationInSeconds', 'segmentsDistance', 'segmentsCabinCode']
legId                                 object
searchDate                            object
flightDate                            object
startingAirport                       object
destinationAirport                    object
fareBasisCode                         object
travelDuration                        object
elapsedDays                            int64
isBasicEconomy                          bool
isRef

## Removing special characters and white spaces from column names

In [7]:
import dask.dataframe as dd
df = dd.read_csv(file_path)

# Remove white space (a literal match, so regex=False), then remove the special
# characters #, @, & and commas. regex=True must be explicit: pandas 2.x treats
# the pattern as a literal string by default (which would make this a no-op),
# and pandas 1.x warned that the default was changing.
df.columns = (
    df.columns
    .str.replace(' ', '', regex=False)
    .str.replace(r'[#@&,]', '', regex=True)
)

  df.columns=df.columns.str.replace('[#,@,&]','')


# File validation

## Creating yaml file 

In [2]:
%%writefile config.yaml

# Ingestion settings for the itineraries dataset, loaded in the notebook via
# utility.read_config_file("config.yaml").
# Extension appended to the source path when the file is read.
file_type: csv
dataset_name: itineraries
# NOTE(review): presumably the data file's base name (the notebook reads
# itineraries.csv) — confirm.
file_name: itineraries
table_name: itinerary_data
# Separator passed to pandas.read_csv when the source file is loaded.
inbound_delimiter: ","
# NOTE(review): presumably the separator for files written out — confirm.
outbound_delimiter: "|"
# NOTE(review): presumably the number of header rows to skip; pandas.read_csv
# already treats the first row as the header by default — confirm how this is used.
skip_leading_rows: 1
# Expected header columns. utility.col_header_val compares these with the file's
# header case-insensitively and ignores column order.
columns:
  - legId
  - searchDate
  - flightDate
  - startingAirport
  - destinationAirport
  - fareBasisCode
  - travelDuration
  - elapsedDays
  - isBasicEconomy
  - isRefundable
  - isNonStop
  - baseFare
  - totalFare
  - seatsRemaining
  - totalTravelDistance
  - segmentsDepartureTimeEpochSeconds
  - segmentsDepartureTimeRaw
  - segmentsArrivalTimeEpochSeconds
  - segmentsArrivalTimeRaw
  - segmentsArrivalAirportCode
  - segmentsDepartureAirportCode
  - segmentsAirlineName
  - segmentsAirlineCode
  - segmentsEquipmentDescription
  - segmentsDurationInSeconds
  - segmentsDistance
  - segmentsCabinCode


Overwriting config.yaml


## Creating utility.py

In [4]:
%%writefile utility.py

import logging
import re

import yaml

def read_config_file(filepath):
    """Load a YAML configuration file and return its parsed contents.

    Uses ``yaml.safe_load``, which builds only plain Python objects (dicts,
    lists, strings, numbers). The previous ``yaml.Loader`` can construct
    arbitrary Python objects from tagged YAML, which is unsafe for any file
    that is not fully trusted.

    Args:
        filepath: Path to the YAML file.

    Returns:
        The parsed document (a mapping for config.yaml), or None when the
        file is empty or cannot be parsed. Parse errors are logged, not raised.
    """
    with open(filepath, 'r', encoding='utf-8') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)
            return None

def _normalize_column_name(name):
    """Lower-case a column name, map non-word characters to '_', and trim '_'."""
    return re.sub(r'[^\w]', '_', str(name).lower()).strip('_')


def col_header_val(df, table_config):
    """Check that a DataFrame's header matches the columns listed in the config.

    The frame's column names are normalized IN PLACE: lower-cased, every
    non-word character replaced by '_', and leading/trailing '_' stripped. The
    expected names from ``table_config['columns']`` get the same normalization.
    The two are then compared as sorted lists, so column order is ignored but
    column counts and duplicates are not.

    Args:
        df: DataFrame whose ``columns`` are validated (and renamed in place).
        table_config: Mapping with a ``'columns'`` list of expected names.

    Returns:
        1 if the normalized names match; otherwise 0, after printing and
        logging the differences.
    """
    df.columns = [_normalize_column_name(col) for col in df.columns]

    # Compare sorted name lists directly. The previous df.reindex copied the
    # entire frame (82M rows for itineraries.csv) just to reorder headers, and
    # raised ValueError whenever normalization produced duplicate names.
    actual_col = sorted(df.columns)
    expected_col = sorted(_normalize_column_name(col) for col in table_config['columns'])

    if actual_col == expected_col:
        print("Column name and column length validation passed")
        return 1

    print("Column name and column length validation failed")
    # Sort the differences so the report is stable between runs.
    mismatched_columns_file = sorted(set(actual_col).difference(expected_col))
    print("Following File columns are not in the YAML file:", mismatched_columns_file)
    missing_YAML_file = sorted(set(expected_col).difference(actual_col))
    print("Following YAML columns are not in the file uploaded:", missing_YAML_file)
    logging.info(f'df columns: {actual_col}')
    logging.info(f'expected columns: {expected_col}')
    return 0


Writing utility.py


## Reading configuration file 

In [6]:
import importlib

import utility as util

# %%writefile rewrites utility.py on disk, but the kernel caches imported modules;
# reload so re-running this cell picks up the latest version of the file.
util = importlib.reload(util)

# Reading the Configuration File
config_data = util.read_config_file("config.yaml")

# read_config_file logs and returns None when the YAML cannot be parsed; fail
# here with a clear message instead of a TypeError in a later cell.
if config_data is None:
    raise ValueError("config.yaml could not be loaded; see the logged YAML error")

## Reading the File Using Configuration

In [None]:
file_type = config_data['file_type']
# Build the file name from the configuration (file_name + file_type) so this cell
# follows config.yaml. Update the directory to match your local checkout.
source_file = 'C:/Users/poong/Documents/GitHub/week6/' + config_data['file_name'] + '.' + file_type

# Re-import plain pandas explicitly; an earlier cell may have bound `pd` to another library.
import pandas as pd
# In pandas 2.x every read_csv argument after the path is keyword-only, so the
# delimiter must be passed as sep= (passing it positionally raises TypeError).
df = pd.read_csv(source_file, sep=config_data['inbound_delimiter'])

## Validating the Header of the File

In [None]:
# col_header_val returns 1 on a header match and 0 otherwise.
validation_result = util.col_header_val(df, config_data)
print("Validation passed" if validation_result != 0 else "Validation failed")

## Displaying Column Information

In [None]:
# Show the file's (normalized) header next to the names listed in config.yaml.
for heading, names in (
    ("Columns of the file are:", df.columns),
    ("Columns from the YAML configuration are:", config_data['columns']),
):
    print(heading, names)

## Writing CSV in Gzip Format Using Dask

In [None]:
from dask import dataframe as dd

# One partition, because the result is written as a single gzip file below.
df_dask = dd.from_pandas(df, npartitions=1)

# Write the data gzip-compressed, using the outbound delimiter from config.yaml.
# The output path is the file whose size is checked at the end of the notebook.
# index=False keeps the pandas RangeIndex out of the output columns.
df_dask.to_csv(
    'C:/Users/poong/Documents/GitHub/week6/itineraries.csv.gz',
    single_file=True,
    compression='gzip',
    sep=config_data['outbound_delimiter'],
    index=False,
)


## Counting Files in Gzip Folder

In [None]:
import os

gzip_folder_path = 'C:/Users/poong/Documents/GitHub/week6/'
# Keep only directory entries with a .gz suffix, in os.listdir order.
gzip_files = list(filter(lambda name: name.endswith('.gz'), os.listdir(gzip_folder_path)))

# number of files and their names
print("Number of files in the gzip folder:", len(gzip_files))
print("List of gzip files:", gzip_files)

In [None]:
# file size, in bytes, of the gzip-compressed output (displayed as the cell result)
gzip_output_file = 'C:/Users/poong/Documents/GitHub/week6/itineraries.csv.gz'
os.path.getsize(gzip_output_file)