# File Ingestion and Schema Validation

## Tasks Performed

1. Read the file using different methods

2. Perform basic validation of the loaded data

3. Create a YAML file and generate a summary of the file

## Dataset

I found the dataset (2.21 GB) on Kaggle. You can find it here: [Combined_Flights_2021](https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022?resource=download&select=Combined_Flights_2021.csv).

*To run this notebook, download the dataset and change the file path to point to your local copy.*


## 1. Read the File Using Different Methods

In [64]:
import warnings
warnings.filterwarnings("ignore")

### 1.1 Reading the File with Pandas

In [65]:
import pandas as pd
import time

start_time = time.time()
df_pandas = pd.read_csv("Combined_Flights_2021.csv")
end_time = time.time()

print(f"Pandas read time: {end_time - start_time} seconds")

Pandas read time: 22.055590867996216 seconds


### 1.2 Reading the File with Dask

In [66]:
import dask.dataframe as dd
import time

start_time = time.time()
df_dask = dd.read_csv("Combined_Flights_2021.csv")
end_time = time.time()

print(f"Dask read time: {end_time - start_time} seconds")

Dask read time: 0.18443703651428223 seconds


### 1.3 Reading the File with Ray

In [67]:
import ray
import ray.data
import time

ray.init(ignore_reinit_error=True)


start_time = time.time()

df_ray = ray.data.read_csv("Combined_Flights_2021.csv")

end_time = time.time()

print(f"Ray read time: {end_time - start_time} seconds")


2024-08-07 23:35:30,234	INFO worker.py:1614 -- Calling ray.init() again after it has already been called.


Ray read time: 0.708920955657959 seconds


### Conclusion

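Dask returned fastest (about 0.18 s, versus 0.71 s for Ray and 22.06 s for Pandas). These numbers are not directly comparable, though: `dd.read_csv` is lazy and only sets up the computation, and Ray Data also defers most of the reading in recent versions, whereas `pd.read_csv` parses the whole file into memory. The timings therefore measure setup cost for Dask and Ray, not a full read.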
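
To see how much a deferred read can flatter a timing, the sketch below uses pandas alone: it times an eager `pd.read_csv` call against `pd.read_csv(..., chunksize=...)`, which returns a reader object right away and parses the data only when it is consumed. The helper `time_call`, the temporary file, and the chunk size are assumptions made for this example and are not part of the notebook.

```python
import os
import tempfile
import time

import pandas as pd


def time_call(fn):
    """Run fn() and return (result, elapsed_seconds)."""
    start = time.perf_counter()
    result = fn()
    return result, time.perf_counter() - start


# Build a small throwaway CSV so the example is self-contained.
tmp_dir = tempfile.mkdtemp()
csv_path = os.path.join(tmp_dir, "sample.csv")
pd.DataFrame({"a": range(10_000), "b": range(10_000)}).to_csv(csv_path, index=False)

# Eager: the whole file is parsed inside the timed call.
df_eager, eager_s = time_call(lambda: pd.read_csv(csv_path))

# Deferred: only a reader object is created; the bulk of the file is not parsed yet.
reader, setup_s = time_call(lambda: pd.read_csv(csv_path, chunksize=1_000))

# Forcing the work: iterate over every chunk and count the rows.
n_rows, consume_s = time_call(lambda: sum(len(chunk) for chunk in reader))

print(f"eager read:      {eager_s:.4f} s ({len(df_eager)} rows)")
print(f"deferred setup:  {setup_s:.4f} s")
print(f"deferred + read: {setup_s + consume_s:.4f} s ({n_rows} rows)")
```

For the lazy readers in this notebook, a fairer comparison would time an operation that forces a full pass over the file, for example `time_call(lambda: len(df_dask))`.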

## 2. Basic Validation

In [68]:
df = df_dask.copy()
df.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 61 entries, FlightDate to DivAirportLandings
dtypes: bool(2), float64(19), int64(22), string(18)

In [69]:
len(df.index)

6311871

In [70]:
len(df.columns)

61

In [71]:
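# Strip special characters; regex=True is required in pandas >= 2.0,
# where str.replace matches the pattern literally by default
df.columns = df.columns.str.replace('[#,@,&]', '', regex=True)
# Remove spaces from column names
df.columns = df.columns.str.replace(' ', '')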
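
Whether the first call treats `'[#,@,&]'` as a character class depends on the pandas version: since pandas 2.0, `str.replace` matches literally unless `regex=True` is passed. The sketch below shows the difference on a few made-up column names (they are not from the dataset).

```python
import pandas as pd

# Hypothetical column names containing the characters to strip.
raw = pd.Index(["Tail#Number", "Dep Time", "Origin&Dest", "Flight@ID"])

# With regex=True the pattern is a character class: any of '#', ',', '@', '&'.
cleaned = raw.str.replace(r"[#,@&]", "", regex=True).str.replace(" ", "")

# With regex=False the whole pattern is searched for as a literal string,
# which none of these names contain, so nothing changes.
literal = raw.str.replace("[#,@,&]", "", regex=False)

print(list(cleaned))
print(list(literal))
```

None of the dataset's column names appear to contain these characters, which is why the names are unchanged after this cell.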

In [72]:
data=df.columns
data

Index(['FlightDate', 'Airline', 'Origin', 'Dest', 'Cancelled', 'Diverted',
       'CRSDepTime', 'DepTime', 'DepDelayMinutes', 'DepDelay', 'ArrTime',
       'ArrDelayMinutes', 'AirTime', 'CRSElapsedTime', 'ActualElapsedTime',
       'Distance', 'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek',
       'Marketing_Airline_Network', 'Operated_or_Branded_Code_Share_Partners',
       'DOT_ID_Marketing_Airline', 'IATA_Code_Marketing_Airline',
       'Flight_Number_Marketing_Airline', 'Operating_Airline',
       'DOT_ID_Operating_Airline', 'IATA_Code_Operating_Airline',
       'Tail_Number', 'Flight_Number_Operating_Airline', 'OriginAirportID',
       'OriginAirportSeqID', 'OriginCityMarketID', 'OriginCityName',
       'OriginState', 'OriginStateFips', 'OriginStateName', 'OriginWac',
       'DestAirportID', 'DestAirportSeqID', 'DestCityMarketID', 'DestCityName',
       'DestState', 'DestStateFips', 'DestStateName', 'DestWac', 'DepDel15',
       'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOu

In [73]:
# Check missing values (the result is lazy in Dask; call .compute() to see the counts)
df.isnull().sum().value_counts()

Dask Series Structure:
npartitions=1
    int64
      ...
Name: count, dtype: int64
Dask Name: value-counts-agg, 10 graph layers
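
The output above describes a lazy Dask computation, not the counts themselves. A minimal sketch of a helper that returns per-column missing counts for either a pandas or a Dask DataFrame is shown here; `missing_counts` is a hypothetical name, and the tiny frame reuses a few column names from the dataset with made-up values.

```python
import pandas as pd


def missing_counts(frame):
    """Return a pandas Series of missing-value counts for columns that have any."""
    counts = frame.isnull().sum()
    # Dask objects are lazy and expose .compute(); pandas results are already concrete.
    if hasattr(counts, "compute"):
        counts = counts.compute()
    return counts[counts > 0].sort_values(ascending=False)


# Illustrative data only; the values are not taken from the real dataset.
sample = pd.DataFrame(
    {
        "DepTime": [1123.0, None, 1530.0, None],
        "ArrTime": [1310.0, None, 1745.0, 2010.0],
        "Origin": ["SGU", "PHX", "DEN", "ORD"],
    }
)

print(missing_counts(sample))
```

Applied to the notebook's DataFrame, `missing_counts(df)` would trigger a full pass over the file, so it can take noticeably longer than the lazy call.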

## 3. Create YAML File

In [74]:
# Get the header names
df.columns.tolist()

['FlightDate',
 'Airline',
 'Origin',
 'Dest',
 'Cancelled',
 'Diverted',
 'CRSDepTime',
 'DepTime',
 'DepDelayMinutes',
 'DepDelay',
 'ArrTime',
 'ArrDelayMinutes',
 'AirTime',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'Distance',
 'Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'Marketing_Airline_Network',
 'Operated_or_Branded_Code_Share_Partners',
 'DOT_ID_Marketing_Airline',
 'IATA_Code_Marketing_Airline',
 'Flight_Number_Marketing_Airline',
 'Operating_Airline',
 'DOT_ID_Operating_Airline',
 'IATA_Code_Operating_Airline',
 'Tail_Number',
 'Flight_Number_Operating_Airline',
 'OriginAirportID',
 'OriginAirportSeqID',
 'OriginCityMarketID',
 'OriginCityName',
 'OriginState',
 'OriginStateFips',
 'OriginStateName',
 'OriginWac',
 'DestAirportID',
 'DestAirportSeqID',
 'DestCityMarketID',
 'DestCityName',
 'DestState',
 'DestStateFips',
 'DestStateName',
 'DestWac',
 'DepDel15',
 'DepartureDelayGroups',
 'DepTimeBlk',
 'TaxiOut',
 'WheelsOff',
 'WheelsOn',
 'TaxiIn',
 'CRS

In [75]:
import yaml

columns = [
    'FlightDate', 'Airline', 'Origin', 'Dest', 'Cancelled', 'Diverted',
    'CRSDepTime', 'DepTime', 'DepDelayMinutes', 'DepDelay', 'ArrTime',
    'ArrDelayMinutes', 'AirTime', 'CRSElapsedTime', 'ActualElapsedTime',
    'Distance', 'Year', 'Quarter', 'Month', 'DayofMonth', 'DayOfWeek',
    'MarketingAirlineNetwork', 'OperatedorBrandedCodeSharePartners',
    'DOTIDMarketingAirline', 'IATACodeMarketingAirline',
    'FlightNumberMarketingAirline', 'OperatingAirline', 'DOTIDOperatingAirline',
    'IATACodeOperatingAirline', 'TailNumber', 'FlightNumberOperatingAirline',
    'OriginAirportID', 'OriginAirportSeqID', 'OriginCityMarketID',
    'OriginCityName', 'OriginState', 'OriginStateFips', 'OriginStateName',
    'OriginWac', 'DestAirportID', 'DestAirportSeqID', 'DestCityMarketID',
    'DestCityName', 'DestState', 'DestStateFips', 'DestStateName', 'DestWac',
    'DepDel15', 'DepartureDelayGroups', 'DepTimeBlk', 'TaxiOut', 'WheelsOff',
    'WheelsOn', 'TaxiIn', 'CRSArrTime', 'ArrDelay', 'ArrDel15',
    'ArrivalDelayGroups', 'ArrTimeBlk', 'DistanceGroup', 'DivAirportLandings'
]

# Data to be written to the YAML file
data = {'columns': columns}

# Write data to a YAML file
with open('columns.yaml', 'w') as file:
    yaml.dump(data, file, default_flow_style=False)

print("YAML file 'columns.yaml' created.")


YAML file 'columns.yaml' created.
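
Note that the hand-typed list drops the underscores that the DataFrame columns still contain (for example `'TailNumber'` versus `'Tail_Number'`), so an exact equality check against `df.columns` cannot succeed. A minimal sketch of a comparison that reports which names differ, rather than a bare pass/fail, follows; `compare_columns` is a hypothetical helper, and the two short lists are excerpts of the names shown above.

```python
def compare_columns(actual, expected):
    """Describe how two column lists differ."""
    actual_set, expected_set = set(actual), set(expected)
    return {
        "missing_from_data": [c for c in expected if c not in actual_set],
        "unexpected_in_data": [c for c in actual if c not in expected_set],
        "same_names": actual_set == expected_set,
        "same_order": list(actual) == list(expected),
    }


# Excerpts: names as they appear in the DataFrame vs. in the hand-typed list.
actual = ["FlightDate", "Airline", "Tail_Number", "Operating_Airline"]
expected = ["FlightDate", "Airline", "TailNumber", "OperatingAirline"]

report = compare_columns(actual, expected)
for key, value in report.items():
    print(f"{key}: {value}")

# One possible fix (an assumption about intent): normalise both sides the same way.
normalised = [name.replace("_", "") for name in actual]
print(compare_columns(normalised, expected)["same_order"])
```

Alternatively, the YAML list could be generated from `df.columns.tolist()` directly so that it cannot drift from the data.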


In [77]:
import yaml
import gzip
import os

# Step 1: Read the YAML configuration file
with open("columns.yaml") as file:
    config = yaml.safe_load(file)

# Step 2: Validate DataFrame structure against YAML
if list(df.columns) == config["columns"]:
    print("Column validation passed")
else:
    print("Column validation failed. Kindly check the column names and order.")

# Step 3: Write DataFrame to a gzipped, pipe separated text file
df_subset = df.head(2000)  # selecting the first 2000 rows
df_subset.to_csv("output.csv.gz", sep="|", compression="gzip", index=False)

# Step 4: Create and print a summary of the file
# Note: the row count covers the full dataset, while the file size is that of the 2000-row sample
summary = {
    "Total number of rows": len(df),
    "Total number of columns": len(df.columns),
    "File size (bytes)": os.path.getsize("output.csv.gz")
}

print("File Summary:", summary)


Column validation failed. Kindly check the column names and order.
File Summary: {'Total number of rows': 6311871, 'Total number of columns': 61, 'File size (bytes)': 112945}
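
The summary above combines the row count of the full dataset with the size of the 2000-row sample file. If the goal is to describe the file that was actually written, one option is to read it back; the sketch below does so with the standard library only. `summarize_gz` is a hypothetical helper, and the small file it writes stands in for `output.csv.gz`.

```python
import csv
import gzip
import os
import tempfile


def summarize_gz(path, sep="|"):
    """Count data rows and columns in a gzipped, delimited text file."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter=sep)
        header = next(reader)
        n_rows = sum(1 for _ in reader)
    return {
        "Total number of rows": n_rows,
        "Total number of columns": len(header),
        "File size (bytes)": os.path.getsize(path),
    }


# Write a tiny pipe-separated file so the example runs on its own.
# The rows are illustrative and not copied from the dataset.
demo_path = os.path.join(tempfile.mkdtemp(), "demo.csv.gz")
with gzip.open(demo_path, "wt", newline="", encoding="utf-8") as handle:
    writer = csv.writer(handle, delimiter="|")
    writer.writerow(["FlightDate", "Airline", "Origin"])
    writer.writerow(["2021-03-03", "Example Air", "SGU"])
    writer.writerow(["2021-03-03", "Example Air", "PHX"])

summary = summarize_gz(demo_path)
print(summary)
```

Reading the file back also serves as a basic check that the pipe separator and gzip compression were applied as intended.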
