## Dask

In [None]:
import dask
import dask.dataframe as dd
import pandas as pd

# Cap DataFrame reprs at 10 rows so displayed outputs stay short.
pd.options.display.max_rows = 10



In [None]:
%%time
# Build a Dask DataFrame from the CSV; columns 0-2 are merged into one parsed 'Date' column.
# NOTE(review): dd.read_csv is lazy and head() normally pulls rows from the first partition only,
# so this timing is probably not a full-file read — confirm before comparing it with the eager readers.
dask_df = dd.read_csv('/content/flights.csv', parse_dates={'Date':[0, 1, 2]})
dask_df.head()

CPU times: user 2.99 s, sys: 812 ms, total: 3.81 s
Wall time: 3.88 s


Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,0,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,0,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,0,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,0,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,0,0


## Pandas

In [None]:
%%time
# Load the same CSV with plain pandas; columns 0-2 are merged into one parsed 'Date' column.
# low_memory=False: parse the file in one pass rather than in chunks (see the pandas read_csv docs),
# which avoids mixed-dtype inference at the cost of more memory while parsing.
pandas_df = pd.read_csv('/content/flights.csv', low_memory=False, parse_dates={'Date':[0, 1, 2]})
pandas_df.head()

CPU times: user 7.62 s, sys: 2.12 s, total: 9.73 s
Wall time: 9.76 s


Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,0,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,0,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,0,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,0,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,0,0


## Modin

In [None]:
!pip install modin

In [None]:
import modin.pandas as pdm

In [None]:
# Start a distributed Client with default settings.
# NOTE(review): presumably this gives Modin a Dask cluster to execute on — confirm which engine Modin selects.
from distributed import Client
client = Client()

In [None]:
%%time
# Same read through Modin's pandas-compatible API, timed for comparison with the pandas and Dask cells.
# Columns 0-2 are merged into one parsed 'Date' column, as in the other readers.
modin_df = pdm.read_csv('/content/flights.csv', parse_dates={'Date':[0, 1, 2]})
modin_df.head()

CPU times: user 739 ms, sys: 239 ms, total: 977 ms
Wall time: 9.51 s


Unnamed: 0,Date,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,...,AirTime,ArrDelay,DepDelay,Origin,Dest,Distance,TaxiIn,TaxiOut,Cancelled,Diverted
0,1990-01-01,1,1621.0,1540,1747.0,1701,US,33,,86.0,...,,46.0,41.0,EWR,PIT,319.0,,,0,0
1,1990-01-02,2,1547.0,1540,1700.0,1701,US,33,,73.0,...,,-1.0,7.0,EWR,PIT,319.0,,,0,0
2,1990-01-03,3,1546.0,1540,1710.0,1701,US,33,,84.0,...,,9.0,6.0,EWR,PIT,319.0,,,0,0
3,1990-01-04,4,1542.0,1540,1710.0,1701,US,33,,88.0,...,,9.0,2.0,EWR,PIT,319.0,,,0,0
4,1990-01-05,5,1549.0,1540,1706.0,1701,US,33,,77.0,...,,5.0,9.0,EWR,PIT,319.0,,,0,0


In [None]:
# Show the column labels of the loaded frame (these are what the schema is checked against below).
modin_df.columns

Index(['Date', 'DayOfWeek', 'DepTime', 'CRSDepTime', 'ArrTime', 'CRSArrTime',
       'UniqueCarrier', 'FlightNum', 'TailNum', 'ActualElapsedTime',
       'CRSElapsedTime', 'AirTime', 'ArrDelay', 'DepDelay', 'Origin', 'Dest',
       'Distance', 'TaxiIn', 'TaxiOut', 'Cancelled', 'Diverted'],
      dtype='object')

In [None]:
# Print a summary of the frame's columns and dtypes.
modin_df.info()

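Dask returns first here (3.88 s wall time), but `dd.read_csv` is lazy and `head()` only needs to parse enough of the file to show the first rows, so that figure is not a full read and is not directly comparable. Among the eager readers, Modin (9.51 s wall time) is faster than Pandas (9.76 s) as expected, though only marginally on this file.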

# Schema Validation

In [None]:
import yaml

In [None]:
# Parse the YAML schema file into plain Python objects.
# Later cells read schema['fields'] and use each entry's 'name' and 'type' keys.
with open("/content/schema.yaml", "r") as file:
    schema = yaml.safe_load(file)

In [None]:
# Column names declared in the schema, in schema order.
c_names = [field['name'] for field in schema['fields']]

In [None]:
# Display the column names taken from the schema.
c_names

In [None]:
# Number of columns in the loaded frame.
modin_df.shape[1]

21

In [None]:
# Column labels actually present in the loaded frame, in frame order.
column_names = list(modin_df.columns)

In [None]:
# Display the column names taken from the frame.
column_names

In [None]:
# Column validation: the schema's names must equal the frame's names, order included.
if c_names == column_names:
  print("Columns are validated.")
else:
  print("Validation failed!")
  # Spell out what differs so the mismatch can be fixed without diffing the two lists by hand.
  missing = [col for col in c_names if col not in column_names]
  unexpected = [col for col in column_names if col not in c_names]
  if missing:
    print(f"  In schema but not in data: {missing}")
  if unexpected:
    print(f"  In data but not in schema: {unexpected}")
  if not missing and not unexpected:
    print("  Same column names, but their order or multiplicity differs.")

Columns are validated


In [None]:
# Collect the frame's per-column dtypes into a Series, in column order.
df_type_list = pdm.Series(modin_df.dtypes.to_list())

In [None]:
# Show the dtypes found in the frame.
print(df_type_list)

In [None]:
# Declared type of each schema field, in schema order.
schema_tl = [field['type'] for field in schema['fields']]
  

In [None]:
# Show the types declared in the schema.
print(schema_tl)

In [None]:
# Type validation: normalise both sides to plain strings so dtype objects can be
# compared with the type names written in the YAML schema.
df_type_list = [str(item) for item in df_type_list]
schema_tl = [str(item) for item in schema_tl]
if df_type_list == schema_tl:
  print("Types are validated.")
else:
  print("Validation failed!")
  # Report every disagreement so the offending column can be found by position.
  if len(df_type_list) != len(schema_tl):
    print(f"  Data has {len(df_type_list)} dtypes, schema declares {len(schema_tl)}.")
  for position, (actual, expected) in enumerate(zip(df_type_list, schema_tl)):
    if actual != expected:
      print(f"  Position {position}: data has {actual!r}, schema expects {expected!r}")

Types are validated.


## Write the file as a pipe-separated (|) text file in gz format

In [None]:
# Dask alternative, kept disabled for reference.
# NOTE(review): single_file=True should make Dask write one file rather than one per partition — confirm.
#dask_df.to_csv('flights.csv.gz', sep='|', compression='gzip', index=False, single_file=True)

# Write the Modin frame as a gzip-compressed, pipe-delimited file, without the index column.
# The relative path means the file lands in the current working directory.
modin_df.to_csv('flights.csv.gz', sep='|', compression='gzip', index=False)

In [None]:
# Disabled round-trip reads of the compressed output; uncomment one to check that the file loads back.
# Note that the Modin line would rebind modin_df, and the Dask line would rebind dask_df.
# modin_df = pdm.read_csv('/content/flights.csv.gz', sep='|', compression='gzip')
# dask_df = dd.read_csv('flights.csv.gz', sep='|', compression='gzip')

## File Summary

In [29]:
import os

# Size of the source CSV on disk: raw bytes, then decimal megabytes.
file_size = os.stat('/content/flights.csv').st_size
file_size_mb = file_size / 1_000_000

# Report the frame's dimensions followed by the file size, one fact per line.
print(
  f"The data has {len(modin_df)} rows.",
  f"The data has {len(modin_df.columns)} columns.",
  f"The size of the file is {file_size_mb:.2f} MB",
  sep="\n",
)

The data has 2611892 rows.
The data has 21 columns.
The size of the file is 244.99 MB
