In [16]:
import time
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import yaml
import os

In [3]:
# Path to the 2021 combined-flights CSV. Set the FLIGHTS_CSV environment
# variable to point elsewhere instead of editing this machine-specific default.
file_path = os.environ.get('FLIGHTS_CSV', '/Users/vaweng02/Desktop/Combined_Flights_2021.csv')


def _timed(label, load):
    """Call ``load()`` once, print the elapsed time, and return it.

    Uses ``time.perf_counter`` (monotonic, high resolution) rather than
    ``time.time``, which can jump if the system clock is adjusted mid-run.

    Returns:
        A ``(result, elapsed_seconds)`` tuple, where ``result`` is whatever
        ``load()`` returned.
    """
    start = time.perf_counter()
    result = load()
    elapsed = time.perf_counter() - start
    print(f"Time taken by {label}:", elapsed, "seconds")
    return result, elapsed


def _read_with_dask():
    """Build the Dask frame and force it to be materialised.

    ``dd.read_csv`` on its own only builds a lazy task graph, so ``.compute()``
    is needed for the timing to include actually parsing the file. The computed
    result is discarded; the lazy Dask frame is returned.
    """
    lazy = dd.read_csv(file_path)
    lazy.compute()
    return lazy


# Method 1: Pandas
df_pandas, time_pandas = _timed("Pandas", lambda: pd.read_csv(file_path))

# Method 2: Dask
df_dask, time_dask = _timed("Dask", _read_with_dask)

# Method 3: Modin
df_modin, time_modin = _timed("Modin", lambda: mpd.read_csv(file_path))

# Conclusion: Dask showed the fastest computational speed in this run.
# Caveats: each reader is timed only once; later reads may benefit from the OS
# file cache warmed by earlier ones; and the Modin timing appears to include
# starting a local Ray instance (its startup log is printed during that step).

Time taken by Pandas: 61.989914894104004 seconds
Time taken by Dask: 34.21029996871948 seconds



    import ray
    ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})

2023-04-09 20:40:06,367	INFO worker.py:1553 -- Started a local Ray instance.
[2m[33m(raylet)[0m   aiogrpc.init_grpc_aio()


Time taken by Modin: 47.518561124801636 seconds


[2m[36m(raylet)[0m Spilled 2057 MiB, 16 objects, write throughput 294 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


In [6]:
# Perform basic validation on data columns

df = pd.read_csv(file_path)

# Normalise column names:
#   1. strip leading/trailing whitespace first, so it is removed rather than
#      being turned into leading/trailing underscores by step 2;
#   2. collapse every run of non-alphanumeric characters into one underscore;
#   3. lowercase.
# regex=True is required: since pandas 2.0, Series.str.replace treats the
# pattern as a literal string by default, so without it step 2 is a no-op.
df.columns = (
    df.columns
    .str.strip()
    .str.replace(r'[^a-zA-Z0-9]+', '_', regex=True)
    .str.lower()
)

# check for missing values in each column
missing_values = df.isnull().sum()

# check for fully duplicated rows
duplicates = df[df.duplicated()]

# Report the findings so the validation result is actually visible.
print("Columns with missing values:")
print(missing_values[missing_values > 0])
print(f"Duplicate rows: {len(duplicates)}")



In [7]:
# Snapshot the normalised column labels as a plain Python list and print them
# for a quick visual check of the renaming above.
all_cols = df.columns.to_list()
print(all_cols)

['flightdate', 'airline', 'origin', 'dest', 'cancelled', 'diverted', 'crsdeptime', 'deptime', 'depdelayminutes', 'depdelay', 'arrtime', 'arrdelayminutes', 'airtime', 'crselapsedtime', 'actualelapsedtime', 'distance', 'year', 'quarter', 'month', 'dayofmonth', 'dayofweek', 'marketing_airline_network', 'operated_or_branded_code_share_partners', 'dot_id_marketing_airline', 'iata_code_marketing_airline', 'flight_number_marketing_airline', 'operating_airline', 'dot_id_operating_airline', 'iata_code_operating_airline', 'tail_number', 'flight_number_operating_airline', 'originairportid', 'originairportseqid', 'origincitymarketid', 'origincityname', 'originstate', 'originstatefips', 'originstatename', 'originwac', 'destairportid', 'destairportseqid', 'destcitymarketid', 'destcityname', 'deststate', 'deststatefips', 'deststatename', 'destwac', 'depdel15', 'departuredelaygroups', 'deptimeblk', 'taxiout', 'wheelsoff', 'wheelson', 'taxiin', 'crsarrtime', 'arrdelay', 'arrdel15', 'arrivaldelaygroups'

In [12]:
# Validate the ingested file's column count and column names against the
# schema in file.yaml, which must contain a top-level `columns` list.
# (yaml is imported in the imports cell at the top of the notebook.)

with open('file.yaml', 'r', encoding='utf-8') as f:
    schema = yaml.safe_load(f)

# safe_load returns None for an empty file and may return a list or scalar,
# so check it is a mapping before testing for the 'columns' key.
if not isinstance(schema, dict) or 'columns' not in schema:
    print("Error: Invalid schema file. 'columns' key is missing.")
elif not isinstance(schema['columns'], list):
    print("Error: Invalid schema file. 'columns' value should be a list.")
else:
    expected_cols = schema['columns']
    actual_cols = list(df.columns)

    if len(actual_cols) != len(expected_cols):
        print("Error: Number of columns in the file doesn't match the schema.")
        print(f"  file has {len(actual_cols)} columns, schema expects {len(expected_cols)}")
        # Show which names differ so the mismatch can be fixed.
        missing = [c for c in expected_cols if c not in actual_cols]
        extra = [c for c in actual_cols if c not in expected_cols]
        if missing:
            print(f"  in schema but not in file: {missing}")
        if extra:
            print(f"  in file but not in schema: {extra}")
    else:
        mismatch_count = 0
        for i, (col, expected_col) in enumerate(zip(actual_cols, expected_cols)):
            if col != expected_col:
                print(f"Error: Column {i+1} doesn't match the schema.")
                print(f"  found {col!r}, expected {expected_col!r}")
                mismatch_count += 1
        # Make success explicit; otherwise a pass looks like no output at all.
        if mismatch_count == 0:
            print(f"Schema validation passed: all {len(actual_cols)} columns match.")

In [17]:
# Export only a 1,000-row sample, since compressing the full frame takes too
# long. Output is pipe-delimited, gzip-compressed, and has no index column.
subset_df = df.head(1000)
subset_df.to_csv(
    'output_file.csv.gz',
    sep='|',
    compression='gzip',
    index=False,
)

In [18]:
# Summarise the export. The gzip file holds only the sampled rows (subset_df),
# whereas df is the full dataset, so report both row counts instead of
# implying the file contains every row.
file_size = os.path.getsize('output_file.csv.gz')

num_rows = len(df)
num_rows_written = len(subset_df)
num_cols = len(df.columns)

print(f"Total number of rows: {num_rows}")
print(f"Rows written to output_file.csv.gz: {num_rows_written}")
print(f"Total number of columns: {num_cols}")
print(f"File size: {file_size} bytes")

Total number of rows: 6311871
Total number of columns: 61
File size: 57758 bytes
