# Data Glacier Week 6: Data Ingestion Pipeline
## Zakaria Arshad
## Submitted on March 4, 2023

In [None]:
!pip install "dask[complete]"
!pip install modin[all]
!pip install ray[all]

In [41]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import numpy as np
import os
%env RAY_memory_monitor_refresh_ms=0
import ray
import time
import yaml
import gzip

env: RAY_memory_monitor_refresh_ms=0


In [3]:
start_time = time.time()

chunksize = 100000
dfs = []

for chunk in pd.read_csv('/content/drive/MyDrive/Data Glacier/Week 6/chess_games.csv', chunksize=chunksize):
    dfs.append(chunk)

df_p = pd.concat(dfs, ignore_index=True)
end_time = time.time()

pd_time_taken = end_time - start_time

In [4]:
start_time = time.time()

df_d = dd.read_csv('/content/drive/MyDrive/Data Glacier/Week 6/chess_games.csv')

end_time = time.time()

dash_time_taken = end_time - start_time

In [None]:
start_time = time.time()

df_r = ray.data.read_csv("/content/drive/MyDrive/Data Glacier/Week 6/chess_games.csv")

end_time = time.time()

ray_time_taken = end_time - start_time

In [15]:
print(f"Pandas time taken: {pd_time_taken:.2f} seconds")
print(f"Dash time taken: {dash_time_taken:.2f} seconds")
print(f"Ray time taken: {ray_time_taken:.2f} seconds")

Pandas time taken: 102.35 seconds
Dash time taken: 0.02 seconds
Ray time taken: 10.09 seconds


# Computational Efficiency

The most efficient method was using **Dash**, taking .02 seconds. The second most efficient was **Ray**, with 10.09 seconds. Finally, **Pandas** took far longer, with 102.35 seconds.

In [24]:
df_p.head(5)

Unnamed: 0,Event,White,Black,Result,UTCDate,UTCTime,WhiteElo,BlackElo,WhiteRatingDiff,BlackRatingDiff,ECO,Opening,TimeControl,Termination,AN
0,Classical,eisaaaa,HAMID449,1-0,2016.06.30,22:00:01,1901,1896,11.0,-11.0,D10,Slav Defense,300+5,Time forfeit,1. d4 d5 2. c4 c6 3. e3 a6 4. Nf3 e5 5. cxd5 e...
1,Blitz,go4jas,Sergei1973,0-1,2016.06.30,22:00:01,1641,1627,-11.0,12.0,C20,King's Pawn Opening: 2.b3,300+0,Normal,1. e4 e5 2. b3 Nf6 3. Bb2 Nc6 4. Nf3 d6 5. d3 ...
2,Blitz tournament,Evangelistaizac,kafune,1-0,2016.06.30,22:00:02,1647,1688,13.0,-13.0,B01,Scandinavian Defense: Mieses-Kotroc Variation,180+0,Time forfeit,1. e4 d5 2. exd5 Qxd5 3. Nf3 Bg4 4. Be2 Nf6 5....
3,Correspondence,Jvayne,Wsjvayne,1-0,2016.06.30,22:00:02,1706,1317,27.0,-25.0,A00,Van't Kruijs Opening,-,Normal,1. e3 Nf6 2. Bc4 d6 3. e4 e6 4. Nf3 Nxe4 5. Nd...
4,Blitz tournament,kyoday,BrettDale,0-1,2016.06.30,22:00:02,1945,1900,-14.0,13.0,B90,"Sicilian Defense: Najdorf, Lipnitsky Attack",180+0,Time forfeit,1. e4 c5 2. Nf3 d6 3. d4 cxd4 4. Nxd4 Nf6 5. N...


In [26]:
df_d.info()

<class 'dask.dataframe.core.DataFrame'>
Columns: 15 entries, Event to AN
dtypes: object(11), float64(2), int64(2)

In [28]:
row_num = len(df_d.index)
print(f"The number of rows is {row_num}.")

The number of rows is 6256184.


In [29]:
col_num = len(df_d.columns)
print(f"The number of columns is {col_num}.")

The number of columns is 15.


# Column Validation
Removing all special characters and whitespace that may exist in column names.

In [None]:
df_d.columns = df_d.columns.str.replace('[!,@,#,$,%,^,&,*]', '')
df_d.columns = df_d.columns.str.replace(' ', '')

In [33]:
df_d_columns = df_d.columns
df_d_columns

Index(['Event', 'White', 'Black', 'Result', 'UTCDate', 'UTCTime', 'WhiteElo',
       'BlackElo', 'WhiteRatingDiff', 'BlackRatingDiff', 'ECO', 'Opening',
       'TimeControl', 'Termination', 'AN'],
      dtype='object')

# Validation Process using YAML

In [34]:
import yaml

column_names = df_d.columns.tolist()

with open("columns.yaml", "w") as file:
    yaml.dump(column_names, file)

The above code created a YAML file and wrote the column names to the file columns.yaml.

In [35]:
with open("columns.yaml", "r") as file:
    expected_columns = yaml.safe_load(file)

actual_columns = df_d.columns.tolist()

if expected_columns != actual_columns:
    raise ValueError("Column names do not match YAML file.")

expected_num_cols = len(expected_columns)
actual_num_cols = len(actual_columns)

if expected_num_cols != actual_num_cols:
    raise ValueError("Number of columns does not match YAML file.")

No errors were raised, thus we have validated that the columns are the same.

In [40]:
df_d.to_csv('output_file.csv.gz', sep='|', compression='gzip', index=False)

['/content/output_file.csv.gz/00.part',
 '/content/output_file.csv.gz/01.part',
 '/content/output_file.csv.gz/02.part',
 '/content/output_file.csv.gz/03.part',
 '/content/output_file.csv.gz/04.part',
 '/content/output_file.csv.gz/05.part',
 '/content/output_file.csv.gz/06.part',
 '/content/output_file.csv.gz/07.part',
 '/content/output_file.csv.gz/08.part',
 '/content/output_file.csv.gz/09.part',
 '/content/output_file.csv.gz/10.part',
 '/content/output_file.csv.gz/11.part',
 '/content/output_file.csv.gz/12.part',
 '/content/output_file.csv.gz/13.part',
 '/content/output_file.csv.gz/14.part',
 '/content/output_file.csv.gz/15.part',
 '/content/output_file.csv.gz/16.part',
 '/content/output_file.csv.gz/17.part',
 '/content/output_file.csv.gz/18.part',
 '/content/output_file.csv.gz/19.part',
 '/content/output_file.csv.gz/20.part',
 '/content/output_file.csv.gz/21.part',
 '/content/output_file.csv.gz/22.part',
 '/content/output_file.csv.gz/23.part',
 '/content/output_file.csv.gz/24.part',


In [42]:
filename = 'output_file.csv.gz'

In [49]:
file_size = os.path.getsize(filename)
print(f"The file size is {file_size} bytes.")
print(f"The number of rows is {row_num}.")
print(f"The number of columns is {col_num}.")

The file size is 4096 bytes.
The number of rows is 6256184.
The number of columns is 15.
