<a href="https://colab.research.google.com/github/apurwasontakke/Data-Ingestion-Pipeline/blob/main/Data_Ingestion_Pipeline_and_Schema_Validation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Section 1: Setup and Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

file_path = '/content/drive/My Drive/Data/chess_games.csv'



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Section 2: Install Required Libraries
!pip install dask modin[all] ray pyyaml




In [None]:
# Section 3: Read the File Using Different Methods
import pandas as pd
import dask.dataframe as dd
import modin.config as cfg
import modin.pandas as mpd
import yaml
import os
import time

cfg.Engine.put('dask')  # Set Modin to use Dask


In [None]:
# Pandas
start_time = time.time()
df_pandas = pd.read_csv(file_path)
pandas_time = time.time() - start_time
print(f"Pandas read time: {pandas_time} seconds")

Pandas read time: 110.25692129135132 seconds


In [None]:
# Dask
start_time = time.time()
df_dask = dd.read_csv(file_path)
dask_time = time.time() - start_time
print(f"Dask read time: {dask_time} seconds")



Dask read time: 0.3614833354949951 seconds


In [None]:
# Section 4: Basic Validation on Data Columns
df_pandas.columns = df_pandas.columns.str.replace('[^A-Za-z0-9]+', '_').str.strip()

# Section 5: Create a YAML File
schema = {
    'separator': ',',
    'columns': list(df_pandas.columns)
}

with open('schema.yaml', 'w') as file:
    yaml.dump(schema, file)


In [None]:
# Section 6: Validate the Ingested File with YAML
with open('schema.yaml', 'r') as file:
    schema = yaml.safe_load(file)

assert len(df_pandas.columns) == len(schema['columns']), "Column count does not match"
assert all([a == b for a, b in zip(df_pandas.columns, schema['columns'])]), "Column names do not match"



In [None]:
# Section 7: Write the File in Pipe Separated Text File in gz Format
df_pandas.to_csv('output_file.txt.gz', sep='|', index=False, compression='gzip')

# Section 8: Create a Summary of the File
total_rows, total_columns = df_pandas.shape
file_size = os.path.getsize('output_file.txt.gz')

summary = {
    'total_rows': total_rows,
    'total_columns': total_columns,
    'file_size': file_size
}

print(summary)


{'total_rows': 6256184, 'total_columns': 15, 'file_size': 1500375680}


In [None]:
# Section 8: Create a Summary of the File
total_rows, total_columns = df_pandas.shape
file_size = os.path.getsize('output_file.txt.gz')

summary = {
    'total_rows': total_rows,
    'total_columns': total_columns,
    'file_size': file_size
}

print(summary)


{'total_rows': 6256184, 'total_columns': 15, 'file_size': 1500375680}


In [13]:
# Section 9: Presenting Findings on Computational Efficiency
efficiency = {
    'pandas_time': pandas_time,
    'dask_time': dask_time,

}

print(efficiency)

{'pandas_time': 110.25692129135132, 'dask_time': 0.3614833354949951}


In [14]:
import os

# List all files in the current directory
files_in_directory = os.listdir('.')
print(files_in_directory)

# Check if 'schema.yaml' exists in the current directory
if 'schema.yaml' in files_in_directory:
    print("schema.yaml is saved in the current directory.")
else:
    print("schema.yaml is not found in the current directory.")


['.config', 'drive', 'schema.yaml', 'output_file.txt.gz', 'sample_data']
schema.yaml is saved in the current directory.


In [15]:
from google.colab import files

# Download schema.yaml
files.download('schema.yaml')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>