In [1]:
%%writefile testutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing testutility.py


In [2]:
%%writefile file.yaml
file_type: csv
dataset_name: flight_dataset
file_name: flight_delay_dataset
inbound_delimiter: ","
outbound_delimiter: "|"
skip_leading_rows: 1
columns:
  - OP_CARRIER_FL_NUM
  - FL_DATE
  - OP_CARRIER
  - DEST


Writing file.yaml


In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
import testutility

config_data = testutility.read_config_file("file.yaml")

In [5]:
config_data

{'file_type': 'csv',
 'dataset_name': 'flight_dataset',
 'file_name': 'flight_delay_dataset',
 'inbound_delimiter': ',',
 'outbound_delimiter': '|',
 'skip_leading_rows': 1,
 'columns': ['OP_CARRIER_FL_NUM', 'FL_DATE', 'OP_CARRIER', 'DEST']}

In [7]:
!pip install modin


Collecting modin
  Downloading modin-0.25.1-py3-none-any.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m10.2 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pandas<2.2,>=2.1 (from modin)
  Downloading pandas-2.1.4-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (12.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m12.3/12.3 MB[0m [31m55.0 MB/s[0m eta [36m0:00:00[0m
Collecting tzdata>=2022.1 (from pandas<2.2,>=2.1->modin)
  Downloading tzdata-2023.3-py2.py3-none-any.whl (341 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m341.8/341.8 kB[0m [31m38.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: tzdata, pandas, modin
  Attempting uninstall: pandas
    Found existing installation: pandas 1.5.3
    Uninstalling pandas-1.5.3:
      Successfully uninstalled pandas-1.5.3
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are ins

In [8]:
!pip install ray

Collecting ray
  Downloading ray-2.8.1-cp310-cp310-manylinux2014_x86_64.whl (62.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m62.6/62.6 MB[0m [31m9.0 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ray
Successfully installed ray-2.8.1


In [6]:
import time
import pandas as pd
import dask.dataframe as dd
import modin as mpd
import ray as ray

In [10]:
import pandas as pd
df_sample = pd.read_csv("/content/drive/MyDrive/flight_delay_dataset.csv",delimiter=',')
df_sample.head()

Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27
0,2018-01-01,UA,2429,EWR,DEN,1517,1512.0,-5.0,15.0,1527.0,...,268.0,250.0,225.0,1605.0,,,,,,
1,2018-01-01,UA,2427,LAS,SFO,1115,1107.0,-8.0,11.0,1118.0,...,99.0,83.0,65.0,414.0,,,,,,
2,2018-01-01,UA,2426,SNA,DEN,1335,1330.0,-5.0,15.0,1345.0,...,134.0,126.0,106.0,846.0,,,,,,
3,2018-01-01,UA,2425,RSW,ORD,1546,1552.0,6.0,19.0,1611.0,...,190.0,182.0,157.0,1120.0,,,,,,
4,2018-01-01,UA,2424,ORD,ALB,630,650.0,20.0,13.0,703.0,...,112.0,106.0,83.0,723.0,,,,,,


In [3]:
def read_with_pandas(file_path):
    start_time = time.time()
    df = pd.read_csv(file_path)
    elapsed_time = time.time() - start_time
    return df, elapsed_time
def read_with_dask(file_path):
    start_time = time.time()
    ddf = dd.read_csv(file_path, dtype={'CANCELLATION_CODE': 'object'})
    df = ddf.compute()
    elapsed_time = time.time() - start_time
    return df, elapsed_time
def read_with_modin(file_path):
    start_time = time.time()
    # Use pd.read_csv instead of mpd.read_csv
    df = pd.read_csv(file_path)
    elapsed_time = time.time() - start_time
    return df, elapsed_time



In [7]:
pandas_df, pandas_time = read_with_pandas(file_path)
dask_df, dask_time = read_with_dask(file_path)
modin_df, modin_time = read_with_modin(file_path)


# Display the time taken for each library
print(f"Pandas Time: {pandas_time} seconds")
print(f"Dask Time: {dask_time} seconds")
print(f"Modin Time: {modin_time} seconds")


Pandas Time: 24.88537311553955 seconds
Dask Time: 22.188930988311768 seconds
Modin Time: 25.02906060218811 seconds


In [None]:
file_type = config_data['file_type']
source_file = "/content/drive/MyDrive/" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file)
df.head()

Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27
0,2018-01-01,UA,2429,EWR,DEN,1517,1512.0,-5.0,15.0,1527.0,...,268.0,250.0,225.0,1605.0,,,,,,
1,2018-01-01,UA,2427,LAS,SFO,1115,1107.0,-8.0,11.0,1118.0,...,99.0,83.0,65.0,414.0,,,,,,
2,2018-01-01,UA,2426,SNA,DEN,1335,1330.0,-5.0,15.0,1345.0,...,134.0,126.0,106.0,846.0,,,,,,
3,2018-01-01,UA,2425,RSW,ORD,1546,1552.0,6.0,19.0,1611.0,...,190.0,182.0,157.0,1120.0,,,,,,
4,2018-01-01,UA,2424,ORD,ALB,630,650.0,20.0,13.0,703.0,...,112.0,106.0,83.0,723.0,,,,,,


In [None]:
#validate the header of the file
testutility.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['air_time', 'diverted', 'arr_time', 'origin', 'unnamed_27', 'cancelled', 'crs_dep_time', 'crs_arr_time', 'late_aircraft_delay', 'dep_delay', 'weather_delay', 'carrier_delay', 'actual_elapsed_time', 'dep_time', 'nas_delay', 'distance', 'taxi_out', 'cancellation_code', 'security_delay', 'wheels_on', 'crs_elapsed_time', 'wheels_off', 'arr_delay', 'taxi_in']
Following YAML columns are not in the file uploaded []


0

In [None]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])


columns of files are: Index(['fl_date', 'op_carrier', 'op_carrier_fl_num', 'origin', 'dest',
       'crs_dep_time', 'dep_time', 'dep_delay', 'taxi_out', 'wheels_off',
       'wheels_on', 'taxi_in', 'crs_arr_time', 'arr_time', 'arr_delay',
       'cancelled', 'cancellation_code', 'diverted', 'crs_elapsed_time',
       'actual_elapsed_time', 'air_time', 'distance', 'carrier_delay',
       'weather_delay', 'nas_delay', 'security_delay', 'late_aircraft_delay',
       'unnamed_27'],
      dtype='object')
columns of YAML are: ['OP_CARRIER_FL_NUM', 'FL_DATE', 'OP_CARRIER', 'DEST']


In [None]:
if testutility.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['air_time', 'diverted', 'arr_time', 'origin', 'unnamed_27', 'cancelled', 'crs_dep_time', 'crs_arr_time', 'late_aircraft_delay', 'dep_delay', 'weather_delay', 'carrier_delay', 'actual_elapsed_time', 'dep_time', 'nas_delay', 'distance', 'taxi_out', 'cancellation_code', 'security_delay', 'wheels_on', 'crs_elapsed_time', 'wheels_off', 'arr_delay', 'taxi_in']
Following YAML columns are not in the file uploaded []
validation failed


In [None]:
pd.read_csv("/content/drive/MyDrive/test_data.csv")

Unnamed: 0,FL_DATE,OP_CARRIER,OP_CARRIER_FL_NUM,ORIGIN,DEST,CRS_DEP_TIME,DEP_TIME,DEP_DELAY,TAXI_OUT,WHEELS_OFF,...,CRS_ELAPSED_TIME,ACTUAL_ELAPSED_TIME,AIR_TIME,DISTANCE,CARRIER_DELAY,WEATHER_DELAY,NAS_DELAY,SECURITY_DELAY,LATE_AIRCRAFT_DELAY,Unnamed: 27
0,2018-01-01,UA,2429,EWR,DEN,1517,1512.0,-5.0,15.0,1527.0,...,268.0,250.0,225.0,1605.0,,,,,,
1,2018-01-01,UA,2427,LAS,SFO,1115,1107.0,-8.0,11.0,1118.0,...,99.0,83.0,65.0,414.0,,,,,,
2,2018-01-01,UA,2426,SNA,DEN,1335,1330.0,-5.0,15.0,1345.0,...,134.0,126.0,106.0,846.0,,,,,,
3,2018-01-01,UA,2425,RSW,ORD,1546,1552.0,6.0,19.0,1611.0,...,190.0,182.0,157.0,1120.0,,,,,,
4,2018-01-01,UA,2424,ORD,ALB,630,650.0,20.0,13.0,703.0,...,112.0,106.0,83.0,723.0,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
7213441,2018-12-31,AA,1815,DCA,CLT,1534,1530.0,-4.0,20.0,1550.0,...,100.0,99.0,72.0,331.0,,,,,,
7213442,2018-12-31,AA,1816,CLT,DFW,1751,1757.0,6.0,18.0,1815.0,...,181.0,176.0,148.0,936.0,,,,,,
7213443,2018-12-31,AA,1817,CLT,MEM,2015,2010.0,-5.0,36.0,2046.0,...,112.0,128.0,88.0,511.0,,,,,,
7213444,2018-12-31,AA,1818,CLT,RDU,1300,1323.0,23.0,11.0,1334.0,...,50.0,41.0,26.0,130.0,,,,,,


In [None]:
import csv
import gzip

def write_pipe_delimited_gzip(file_path, data):
    with gzip.open(file_path, 'wt', encoding='utf-8') as gzipped_file:
        writer = csv.writer(gzipped_file, delimiter='|')
        for row in data:
            writer.writerow(row)

# Example data
data_to_write = [
    ['FL_DATE','OP_CARRIER','OP_CARRIER_FL_NUM','ORIGIN','DEST'],
    ['2018-01-01','UA','2429','EWR','DEN'],
    ['2018-01-01','UA','2427','LAS','SFO']

]

# Specify the file path for the gzipped file
gzipped_file_path = '/content/drive/MyDrive/test_data_pipe_delimited.gz'

# Call the function to write the data to the gzipped file
write_pipe_delimited_gzip(gzipped_file_path, data_to_write)


In [3]:
import yaml
import pandas as pd

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            print(exc)

def validate_columns(df, table_config):
    expected_columns = set(map(str.lower, table_config['columns']))
    actual_columns = set(map(str.lower, df.columns))

    if expected_columns != actual_columns:
        #print("Validation failed: Column names do not match.")
        print(f"Expected columns: {expected_columns}")
        print(f"Actual columns: {actual_columns}")
        return False

    print("Validation passed: Column names match.")
    return True


# Specify the path to your YAML configuration file
yaml_file_path = '/content/file.yaml'

# Specify the path to your large CSV file
csv_file_path = '/content/drive/MyDrive/flight_delay_dataset.csv'

# Read YAML configuration
config = read_config_file(yaml_file_path)

# Read CSV file
df = pd.read_csv(csv_file_path)

# Validate columns
validation_result = validate_columns(df, config)


Expected columns: {'op_carrier_fl_num', 'dest', 'fl_date', 'op_carrier'}
Actual columns: {'taxi_in', 'late_aircraft_delay', 'crs_elapsed_time', 'arr_delay', 'distance', 'unnamed: 27', 'op_carrier', 'crs_arr_time', 'fl_date', 'dep_delay', 'crs_dep_time', 'nas_delay', 'cancellation_code', 'dep_time', 'arr_time', 'wheels_on', 'air_time', 'taxi_out', 'diverted', 'wheels_off', 'op_carrier_fl_num', 'dest', 'security_delay', 'origin', 'cancelled', 'weather_delay', 'carrier_delay', 'actual_elapsed_time'}


In [4]:
import os

def create_file_summary(df, file_path):
    # Get file size
    file_size = os.path.getsize(file_path) / (1024 * 1024)  # Size in MB

    # Get total number of rows and columns
    total_rows, total_columns = df.shape

    # Print summary
    print(f"File Summary:")
    print(f"Total Number of Rows: {total_rows}")
    print(f"Total Number of Columns: {total_columns}")
    print(f"File Size: {file_size:.2f} MB")

# Call the function to create the file summary
create_file_summary(df, csv_file_path)


File Summary:
Total Number of Rows: 7213446
Total Number of Columns: 28
File Size: 851.62 MB
