# **Week 6: File ingestion and schema validation**

Data Glacier Virtual Internship

Submitted By: Mahima Sadananda

Date: 10/10/2024

## **Dataset**

`Combined_Flights_2018.csv` from the Flight Status Prediction dataset on Kaggle: https://www.kaggle.com/datasets/robikscube/flight-delay-dataset-20182022?select=Combined_Flights_2018.csv

## **Import Necessary Libraries**

In [1]:
import warnings
warnings.filterwarnings("ignore")

import os
import time

## **1. Reading Data Using Different Methods**

### **1.1 Pandas**

In [2]:
import pandas as pd

start = time.time()
df_pandas = pd.read_csv("/content/drive/MyDrive/Week6/Combined_Flights_2018.csv")
end = time.time()

print(f"Read Time using Pandas: {end - start} seconds")

ParserError: Error tokenizing data. C error: Calling read(nbytes) on source failed. Try engine='python'.

### **1.2 Dask**

In [4]:
import dask.dataframe as dd

start = time.time()
df_dask = dd.read_csv("/content/drive/MyDrive/Week6/Combined_Flights_2018.csv")
end = time.time()

print(f"Read Time using Dask: {end - start} seconds")

Read Time using Dask: 0.12508320808410645 seconds


### **1.3 Modin and Ray**

In [None]:
# Uncomment to install Modin with the Ray engine:
# !pip install "modin[ray]"

In [7]:
import modin.pandas as mpd

start = time.time()
df_modin = mpd.read_csv("/content/drive/MyDrive/Week6/Combined_Flights_2018.csv")
end = time.time()

print(f"Read Time using Modin: {end - start} seconds")

2024-10-10 22:18:15,383 INFO worker.py:1786 -- Started a local Ray instance.

(raylet) A worker died or was killed while executing a task by an unexpected system error. To troubleshoot the problem, check the logs for the dead worker. RayTask ID: 04e4a039984cef68a6f79e962cd449e88879783f01000000 Worker ID: ec73cf3f5086485731cca2de4ede20c18911ae27f12a6261f0666328 Node ID: 19bace899fa5e6dae47ad002ed9e497d2919708b3f9416c5b4967484 Worker IP address: 172.28.0.12 Worker port: 37163 Worker PID: 23360 Worker exit type: SYSTEM_ERROR Worker exit detail: Worker unexpectedly exits with a connection error code 2. End of file. There are some potential root causes. (1) The process is killed by SIGKILL by OOM killer due to high memory usage. (2) ray stop --force is called. (3) The worker is crashed unexpectedly due to SIGSEGV or other unexpected errors.

(raylet) [2024-10-10 22:20:15,281 E 23290 23290] (raylet) node_manager.cc:3065: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 19bace899fa5e6dae47ad002ed9e497d2919708b3f9416c5b4967484, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
(raylet) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.
(raylet) [2024-10-10 22:21:15,283 E 23290 23290] (raylet) node_m

Read Time using Modin: 231.2152135372162 seconds


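**Conclusion:** Pandas failed with a `ParserError` while reading the file, Dask returned in 0.125 seconds, and Modin on Ray took 231.215 seconds, during which Ray killed at least one worker for running out of memory. **Dask** was the most time-efficient in this run, with one caveat: `dd.read_csv` is lazy, so its timing mostly reflects setting up the read (sampling the file and building the task graph) rather than loading all 5,689,512 rows, and it is not directly comparable to Modin's full read.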
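
A small timing helper can make comparisons like this more uniform. It records either the elapsed time or the exception for each reader, and expects each reader to load the data fully inside the timed call. The sketch below uses only the standard library. `time_read`, `read_with_csv_module`, and the synthetic sample file are hypothetical names introduced for illustration and are not part of the original notebook.

```python
import csv
import os
import tempfile
import time


def time_read(label, reader):
    """Run reader() and report elapsed seconds, or the error if it fails.

    `reader` is any zero-argument callable that fully loads the data.
    """
    start = time.perf_counter()
    try:
        result = reader()
        error = None
    except Exception as exc:  # record the failure instead of stopping the benchmark
        result = None
        error = f"{type(exc).__name__}: {exc}"
    elapsed = time.perf_counter() - start
    return {"label": label, "seconds": elapsed, "error": error, "result": result}


def read_with_csv_module(path):
    """Stand-in reader for the demo: loads all rows with the stdlib csv module."""
    with open(path, newline="") as handle:
        return list(csv.reader(handle))


# Small synthetic file so the sketch runs anywhere (not the Kaggle dataset).
sample_path = os.path.join(tempfile.mkdtemp(), "sample.csv")
with open(sample_path, "w", newline="") as handle:
    csv.writer(handle).writerows([["Origin", "Dest"], ["ABY", "ATL"], ["ATL", "ABY"]])

ok = time_read("csv module", lambda: read_with_csv_module(sample_path))
failed = time_read("missing file", lambda: read_with_csv_module(sample_path + ".nope"))

for run in (ok, failed):
    status = run["error"] or f"{run['seconds']:.6f} seconds"
    print(f"{run['label']}: {status}")
```

In the notebook, the readers would be callables such as `lambda: pd.read_csv(path)` or `lambda: mpd.read_csv(path)`. For Dask, the callable would need to trigger computation, for example `lambda: len(dd.read_csv(path))`, so that the measured time covers an actual pass over the file.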


## **2. Basic Validation**

In [8]:
# making a copy
df = df_modin.copy()

In [12]:
# printing top 5 rows
df.head()

| | FlightDate | Airline | Origin | Dest | Cancelled | Diverted | CRSDepTime | DepTime | DepDelayMinutes | DepDelay | ... | WheelsOff | WheelsOn | TaxiIn | CRSArrTime | ArrDelay | ArrDel15 | ArrivalDelayGroups | ArrTimeBlk | DistanceGroup | DivAirportLandings |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2018-01-23 | Endeavor Air Inc. | ABY | ATL | False | False | 1202 | 1157.0 | 0.0 | -5.0 | ... | 1211.0 | 1249.0 | 7.0 | 1304 | -8.0 | 0.0 | -1.0 | 1300-1359 | 1 | 0.0 |
| 1 | 2018-01-24 | Endeavor Air Inc. | ABY | ATL | False | False | 1202 | 1157.0 | 0.0 | -5.0 | ... | 1210.0 | 1246.0 | 12.0 | 1304 | -6.0 | 0.0 | -1.0 | 1300-1359 | 1 | 0.0 |
| 2 | 2018-01-25 | Endeavor Air Inc. | ABY | ATL | False | False | 1202 | 1153.0 | 0.0 | -9.0 | ... | 1211.0 | 1251.0 | 11.0 | 1304 | -2.0 | 0.0 | -1.0 | 1300-1359 | 1 | 0.0 |
| 3 | 2018-01-26 | Endeavor Air Inc. | ABY | ATL | False | False | 1202 | 1150.0 | 0.0 | -12.0 | ... | 1207.0 | 1242.0 | 11.0 | 1304 | -11.0 | 0.0 | -1.0 | 1300-1359 | 1 | 0.0 |
| 4 | 2018-01-27 | Endeavor Air Inc. | ABY | ATL | False | False | 1400 | 1355.0 | 0.0 | -5.0 | ... | 1412.0 | 1448.0 | 11.0 | 1500 | -1.0 | 0.0 | -1.0 | 1500-1559 | 1 | 0.0 |


In [9]:
# checking df info
df.info()

<class 'modin.pandas.dataframe.DataFrame'>
RangeIndex: 5689512 entries, 0 to 5689511
Data columns (total 61 columns):
 #   Column                                   Dtype  
---  ------                                   -----  
 0   FlightDate                               object 
 1   Airline                                  object 
 2   Origin                                   object 
 3   Dest                                     object 
 4   Cancelled                                bool   
 5   Diverted                                 bool   
 6   CRSDepTime                               int64  
 7   DepTime                                  float64
 8   DepDelayMinutes                          float64
 9   DepDelay                                 float64
 10  ArrTime                                  float64
 11  ArrDelayMinutes                          float64
 12  AirTime                                  float64
 13  CRSElapsedTime                           float64
 14  ActualElapsed

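In [10]:
# remove every character that is not a letter or digit from the column names
df.columns = df.columns.str.replace('[^A-Za-z0-9]+', '', regex=True)

In [11]:
# remove spaces from the column names (a safeguard; the regex above already strips them)
df.columns = df.columns.str.replace(' ', '')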
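
The same cleaning rule can be tried on a plain list of names to see what it does, including the case where two different headers collapse to the same cleaned name. This is a minimal sketch using the standard `re` module. `clean_column_names`, `find_duplicates`, and the sample headers are hypothetical and only illustrate the regex used above.

```python
import re
from collections import Counter


def clean_column_names(columns):
    """Strip every character that is not an ASCII letter or digit."""
    return [re.sub(r"[^A-Za-z0-9]+", "", str(name)) for name in columns]


def find_duplicates(names):
    """Return names that occur more than once, in sorted order."""
    counts = Counter(names)
    return sorted(name for name, count in counts.items() if count > 1)


# Hypothetical raw headers showing spaces, underscores and punctuation.
raw = ["Flight Date", " Dep_Delay ", "Origin(City)", "Unnamed: 0"]
cleaned = clean_column_names(raw)
print(cleaned)  # ['FlightDate', 'DepDelay', 'OriginCity', 'Unnamed0']

# Two distinct headers can become identical after cleaning.
collided = clean_column_names(["Dep Delay", "Dep_Delay", "Origin"])
print(find_duplicates(collided))  # ['DepDelay']
```

Checking for duplicates after cleaning is worthwhile because the later schema comparison relies on column names being unique.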

## **3. YAML**

### **3.1 Creating YAML File**

In [13]:
# column names
df.columns.tolist()

['FlightDate',
 'Airline',
 'Origin',
 'Dest',
 'Cancelled',
 'Diverted',
 'CRSDepTime',
 'DepTime',
 'DepDelayMinutes',
 'DepDelay',
 'ArrTime',
 'ArrDelayMinutes',
 'AirTime',
 'CRSElapsedTime',
 'ActualElapsedTime',
 'Distance',
 'Year',
 'Quarter',
 'Month',
 'DayofMonth',
 'DayOfWeek',
 'MarketingAirlineNetwork',
 'OperatedorBrandedCodeSharePartners',
 'DOTIDMarketingAirline',
 'IATACodeMarketingAirline',
 'FlightNumberMarketingAirline',
 'OperatingAirline',
 'DOTIDOperatingAirline',
 'IATACodeOperatingAirline',
 'TailNumber',
 'FlightNumberOperatingAirline',
 'OriginAirportID',
 'OriginAirportSeqID',
 'OriginCityMarketID',
 'OriginCityName',
 'OriginState',
 'OriginStateFips',
 'OriginStateName',
 'OriginWac',
 'DestAirportID',
 'DestAirportSeqID',
 'DestCityMarketID',
 'DestCityName',
 'DestState',
 'DestStateFips',
 'DestStateName',
 'DestWac',
 'DepDel15',
 'DepartureDelayGroups',
 'DepTimeBlk',
 'TaxiOut',
 'WheelsOff',
 'WheelsOn',
 'TaxiIn',
 'CRSArrTime',
 'ArrDelay',
 'Ar
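
The cell that writes `config.yaml` is not shown here. As a minimal sketch, assuming the file only needs a top-level `columns` list (the key read back in 3.3), the YAML text can be produced with plain string formatting. `columns_to_yaml`, the shortened column list, and the temporary output path are hypothetical. The original notebook may have created the file differently.

```python
import os
import tempfile


def columns_to_yaml(columns):
    """Render a top-level `columns:` list as YAML text.

    Names are double-quoted so values such as "No" or "2018" stay strings.
    Assumes names contain no quotes or backslashes (true after the cleaning step).
    """
    lines = ["columns:"]
    lines += [f'  - "{name}"' for name in columns]
    return "\n".join(lines) + "\n"


# Stand-in for df.columns.tolist(); the real list has 61 entries.
columns = ["FlightDate", "Airline", "Origin", "Dest", "Cancelled", "Diverted"]

# Hypothetical output location; the notebook reads its config from Google Drive.
config_path = os.path.join(tempfile.mkdtemp(), "config.yaml")
with open(config_path, "w", encoding="utf-8") as handle:
    handle.write(columns_to_yaml(columns))

with open(config_path, encoding="utf-8") as handle:
    print(handle.read())
```

If PyYAML is available, `yaml.safe_dump({"columns": columns}, handle)` is a common alternative that handles quoting automatically.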

### **3.2 Open YAML File**

In [16]:
import yaml

with open("/content/drive/MyDrive/Week6/config.yaml") as file:
    config = yaml.safe_load(file)

### **3.3 Validate Ingested YAML File**

In [17]:
if list(df.columns) == config["columns"]:
    print("Validation Successful :)")
else:
    print("Validation Failed :(")

Validation Successful :)
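
The equality check above gives a yes/no answer. When validation fails, it helps to know which columns are missing, which are unexpected, and whether only the order differs. The following is a minimal sketch of such a report. `compare_columns` and the sample lists are hypothetical names for illustration.

```python
def compare_columns(actual, expected):
    """Return a report describing how `actual` differs from `expected`."""
    actual, expected = list(actual), list(expected)
    missing = [name for name in expected if name not in actual]
    unexpected = [name for name in actual if name not in expected]
    return {
        "valid": actual == expected,   # same names in the same order
        "missing": missing,            # in the config but not in the data
        "unexpected": unexpected,      # in the data but not in the config
        # same names with the same counts, but in a different order
        "order_differs": sorted(actual) == sorted(expected) and actual != expected,
    }


# Hypothetical column lists for illustration.
expected = ["FlightDate", "Airline", "Origin", "Dest"]
print(compare_columns(["FlightDate", "Airline", "Origin", "Dest"], expected))
print(compare_columns(["FlightDate", "Origin", "Airline", "Tail"], expected))
```

In the notebook this would be called as `compare_columns(df.columns, config["columns"])`, and the `valid` field corresponds to the original equality test.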


### **3.4 Create Textfile in GZ format with Pipe Separator**

In [19]:
df_subset = df.iloc[:2000]  # selecting first 2000 rows
df_subset.to_csv("output_file.csv.gz", sep="|", compression="gzip", index=False)

### **3.5 File Summary**

In [20]:
# number of rows in the full DataFrame
len(df)

5689512

In [21]:
# no. of columns
len(df.columns)

61

In [22]:
# size in bytes of the gzipped 2,000-row subset written in 3.4
os.path.getsize("output_file.csv.gz")

82059
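
The row and column counts above describe the full DataFrame, while the size is that of the 2,000-row file written in 3.4. To report all three figures for the written file itself, the file can be read back with the standard `gzip` and `csv` modules. This is a minimal sketch. `summarize_pipe_gz` and the tiny sample file are hypothetical and stand in for `output_file.csv.gz`.

```python
import csv
import gzip
import os
import tempfile


def summarize_pipe_gz(path):
    """Count data rows and columns in a pipe-separated gzip file and report its size."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as handle:
        reader = csv.reader(handle, delimiter="|")
        header = next(reader)
        n_rows = sum(1 for _ in reader)  # data rows, header excluded
    return {
        "rows": n_rows,
        "columns": len(header),
        "size_bytes": os.path.getsize(path),
    }


# Build a tiny stand-in file; in the notebook the path would be "output_file.csv.gz".
sample_path = os.path.join(tempfile.mkdtemp(), "sample.csv.gz")
with gzip.open(sample_path, "wt", newline="", encoding="utf-8") as handle:
    writer = csv.writer(handle, delimiter="|")
    writer.writerow(["FlightDate", "Origin", "Dest"])
    writer.writerows([["2018-01-23", "ABY", "ATL"], ["2018-01-24", "ABY", "ATL"]])

summary = summarize_pipe_gz(sample_path)
print(summary)
```

Reading the file back also confirms that the pipe separator and gzip compression were applied as intended.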