## Assignment Summary

**File Used: token_info_onchain_divers_v2.csv (~700MB)**

**Libraries Compared: Pandas, Dask, Modin (Ray)**

**Schema validated using YAML**

**Output written as pipe-separated .gz file**

**Final Output Size: ~202 MB**

**Successfully handled file ingestion, transformation, and export using scalable methods**

## ✅ Step 1: Mount Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


## ⏱️ Step 2: Install Required Libraries

In [None]:
!pip install "modin[ray]" -q
!pip install pyyaml -q

## ⏱️ Step 3: Imports and Setup

In [None]:
import pandas as pd
import dask.dataframe as dd
import modin.pandas as mpd
import ray
import time
import yaml
import os

## Step 4: Initialize Ray

In [None]:
ray.shutdown()
ray.init(ignore_reinit_error=True)

2025-05-18 17:27:55,972	INFO worker.py:1888 -- Started a local Ray instance.


Python version: 3.11.12
Ray version: 2.46.0


## 📁 Step 5: Set File Path

In [None]:
file_path = '/content/drive/MyDrive/archive/token_info_onchain_divers_v2.csv'
print("✅ File exists:", os.path.exists(file_path))

✅ File exists: True


## Step 6: Clean Column Name Function

In [None]:
def clean_column_names(df):
    # Trim surrounding whitespace, then replace every non-word character with "_"
    df.columns = df.columns.str.strip().str.replace(r'\W', '_', regex=True)
    return df
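
To see what the cleaning rule does, here is a minimal sketch that applies the same strip-then-replace logic to plain strings using the standard `re` module. The helper name `clean_name` and the sample column names are made up for illustration and are not taken from the dataset.

```python
import re

def clean_name(name):
    # Same rule as clean_column_names: trim whitespace, then replace
    # every character that is not a letter, digit, or underscore.
    return re.sub(r'\W', '_', name.strip())

# Hypothetical column names, not from the actual CSV
sample = [' token address ', 'price-usd', 'volume(24h)', 'symbol']
cleaned = [clean_name(c) for c in sample]
print(cleaned)
```

Note that a trailing symbol such as `)` becomes a trailing underscore, so `volume(24h)` turns into `volume_24h_`.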

## 🔹 Step 7: Read with Pandas

In [None]:
start = time.time()
df_pandas = pd.read_csv(file_path)
df_pandas = clean_column_names(df_pandas)
end = time.time()
print(f"✅ Pandas load time: {end - start:.2f} seconds")

✅ Pandas load time: 25.17 seconds


## 🔹 Step 8: Read with Dask

In [None]:
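import re

start = time.time()
df_dask = dd.read_csv(file_path, assume_missing=True)
# Same cleaning rule as clean_column_names: strip, then replace non-word characters
df_dask = df_dask.rename(columns=lambda x: re.sub(r'\W', '_', x.strip()))
df_dask_head = df_dask.head()  # Dask is lazy: head() reads only the first partition, not the whole file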
end = time.time()
print(f"✅ Dask load time: {end - start:.2f} seconds")

✅ Dask load time: 7.22 seconds


## 🔹 Step 9: Read with Modin + Ray

In [None]:
start = time.time()
df_modin = mpd.read_csv(file_path)
df_modin = clean_column_names(df_modin)
end = time.time()
print(f"✅ Modin (Ray) load time: {end - start:.2f} seconds")

✅ Modin (Ray) load time: 31.29 seconds
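
Steps 7 to 9 repeat the same start/end timing pattern. One possible way to reduce the repetition is a small context manager that stores each measurement in a dictionary. This is only a sketch: the names `timed` and `timings` are hypothetical, and the summation stands in for a `read_csv` call.

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, results):
    # Record the wall-clock seconds spent inside the with-block under `label`.
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

timings = {}
with timed('demo', timings):
    total = sum(range(100_000))  # stand-in for a read_csv call

print(f"demo: {timings['demo']:.4f} seconds")
```

Collecting the results in one dictionary would also make it easy to print the three load times side by side.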


## Step 10: YAML Schema Creation from Pandas DF

In [None]:
schema = {'separator' : ',',
          'columns' : list(df_pandas.columns)}

In [None]:
yaml_path = '/content/schema.yaml'

with open(yaml_path, 'w') as f:
    yaml.dump(schema, f)

print("✅ YAML schema saved.")

✅ YAML schema saved.


## Step 11: Validate Columns

In [None]:
with open(yaml_path) as f:
    loaded_schema = yaml.safe_load(f)

In [None]:
# List equality already implies the same number of columns, in the same order
if list(df_pandas.columns) == loaded_schema['columns']:
    print("✅ Schema validation passed.")
else:
    print("❌ Schema validation failed.")

✅ Schema validation passed.
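
The check above only reports pass or fail. If the validation ever failed, it would help to know which columns differ. The following is a sketch of one way to report that; the function name `diff_columns` and the sample names are hypothetical.

```python
def diff_columns(expected, actual):
    # Names the schema expects but the data lacks
    missing = [c for c in expected if c not in actual]
    # Names present in the data but absent from the schema
    extra = [c for c in actual if c not in expected]
    # Whether the names common to both appear in the same order
    shared_expected = [c for c in expected if c in actual]
    shared_actual = [c for c in actual if c in expected]
    return {
        'missing': missing,
        'extra': extra,
        'same_order': shared_expected == shared_actual,
    }

# Hypothetical column names for illustration
report = diff_columns(['token', 'price', 'volume'], ['price', 'token', 'supply'])
print(report)
```

In this notebook it could be called as `diff_columns(loaded_schema['columns'], list(df_pandas.columns))`.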


## Step 12: Export to Pipe-Separated gzipped File

In [None]:
output_path = '/content/drive/MyDrive/archive/processed_output.txt.gz'

df_pandas.to_csv(output_path, sep='|', index=False, compression='gzip')
print(f"✅ File written to {output_path}")

✅ File written to /content/drive/MyDrive/archive/processed_output.txt.gz
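
A quick way to sanity-check an export like this is to stream the gzipped file back and compare the header and row count with the source DataFrame. The sketch below uses only the standard library and a tiny stand-in file. The helper name `read_header_and_count` and the demo data are assumptions, not part of the original notebook.

```python
import csv
import gzip
import os
import tempfile

def read_header_and_count(path, sep='|'):
    # Stream the gzipped file in text mode without loading it into memory.
    # Returns the header as a list and the number of data rows.
    with gzip.open(path, 'rt', newline='', encoding='utf-8') as f:
        reader = csv.reader(f, delimiter=sep)
        header = next(reader)
        n_rows = sum(1 for _ in reader)
    return header, n_rows

# Tiny stand-in file with hypothetical data
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.txt.gz')
with gzip.open(demo_path, 'wt', newline='', encoding='utf-8') as f:
    writer = csv.writer(f, delimiter='|')
    writer.writerows([['token', 'price'], ['abc', '1.5'], ['xyz', '2.0']])

header, n_rows = read_header_and_count(demo_path)
print(header, n_rows)
```

Applied to `output_path`, the returned header should match `list(df_pandas.columns)` and the row count should match `df_pandas.shape[0]`.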


## Step 13: File Summary

In [None]:
output_size = os.path.getsize(output_path) / (1024 * 1024)

print("📊 Summary:")
print(f"- Rows: {df_pandas.shape[0]}")
print(f"- Columns: {df_pandas.shape[1]}")
print(f"- Output file size: {output_size:.2f} MB")

📊 Summary:
- Rows: 1168712
- Columns: 22
- Output file size: 202.56 MB
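
As a rough worked calculation, the reported sizes imply the compressed output is a bit under a third of the input. The input figure is the approximate "~700MB" from the summary, and the two numbers may not use exactly the same unit (the output is computed with 1024 * 1024 bytes per MB), so treat the result as an estimate.

```python
input_mb = 700.0     # approximate, from the summary ("~700MB")
output_mb = 202.56   # from the printed file summary

ratio = output_mb / input_mb
print(f"Output is about {ratio:.0%} of the input size "
      f"({input_mb / output_mb:.1f}x smaller)")
```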
