In [1]:
!pip install dask[complete] modin[all] ray pandas pyyaml

Collecting ray
  Downloading ray-2.47.0-cp311-cp311-manylinux2014_x86_64.whl.metadata (20 kB)
Collecting modin[all]
  Downloading modin-0.33.1-py3-none-any.whl.metadata (17 kB)
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.4.4-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Collecting modin-spreadsheet>=0.1.0 (from modin[all])
  Downloading modin_spreadsheet-0.1.2-py2.py3-none-any.whl.metadata (587 bytes)
Collecting dataframe-api-compat>=0.2.7 (from modin[all])
  Downloading dataframe_api_compat-0.2.7-py3-none-any.whl.metadata (1.6 kB)
Collecting jupyter>=1.0.0 (from modin-spreadsheet>=0.1.0->modin[all])
  Downloading jupyter-1.1.1-py2.py3-none-any.whl.metadata (2.0 kB)
Collecting jupyterlab (from jupyter>=1.0.0->modin-spreadsheet>=0.1.0->modin[all])
  Downloading jupyterlab-4.4.3-py3-none-any.whl.metadata (16 kB)
Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets>=7.0.0->modin-spreadsheet>=0.1.0->modin[all])
  Downloading jedi-0.19.2-

In [2]:
# Download ~2.5 GB of data from NYC Taxi data
!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet

# Convert to CSV to simulate a raw format
import pandas as pd
df = pd.read_parquet("yellow_tripdata_2023-01.parquet")
df.to_csv("yellow_tripdata.csv", index=False)


--2025-06-12 11:44:55--  https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2023-01.parquet
Resolving d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)... 65.8.245.50, 65.8.245.171, 65.8.245.51, ...
Connecting to d37ci6vzurychx.cloudfront.net (d37ci6vzurychx.cloudfront.net)|65.8.245.50|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 47673370 (45M) [application/x-www-form-urlencoded]
Saving to: ‘yellow_tripdata_2023-01.parquet’


2025-06-12 11:44:56 (80.6 MB/s) - ‘yellow_tripdata_2023-01.parquet’ saved [47673370/47673370]



In [11]:
import pandas as pd
import time

start = time.time()
df_pandas = pd.read_csv("yellow_tripdata.csv")
end = time.time()
print("Pandas read time:", end - start)




Pandas read time: 9.657840251922607


In [12]:
import dask.dataframe as dd

start = time.time()
df_dask = dd.read_csv("yellow_tripdata.csv")
df_dask.head()  # Trigger computation
end = time.time()
print("Dask read time:", end - start)


Dask read time: 2.04052472114563


In [15]:
import ray
import modin.pandas as mpd

ray.init()
start = time.time()
df_modin = mpd.read_csv("yellow_tripdata.csv")
end = time.time()
print("Modin read time:", end - start)


2025-06-12 11:51:00,050	INFO worker.py:1917 -- Started a local Ray instance.


ValueError: An application is trying to access a Ray object whose owner is unknown(00ffffffffffffffffffffffffffffffffffffff0100000003e1f505). Please make sure that all Ray objects you are trying to access are part of the current Ray session. Note that object IDs generated randomly (ObjectID.from_random()) or out-of-band (ObjectID.from_binary(...)) cannot be passed as a task argument because Ray does not know which task created them. If this was not how your object ID was generated, please file an issue at https://github.com/ray-project/ray/issues/

In [14]:
ray.shutdown()

In [16]:


@ray.remote
def load_file():
    import pandas as pd
    return pd.read_csv("yellow_tripdata.csv")

start = time.time()
df_ray = ray.get(load_file.remote())
end = time.time()
print("Ray read time:", end - start)




Ray read time: 13.204901933670044


In [17]:
def clean_columns(df):
    df.columns = (
        df.columns.str.strip()
        .str.lower()
        .str.replace(r"[^a-zA-Z0-9_]", "_", regex=True)
    )
    return df

df_clean = clean_columns(df_pandas)


In [18]:
import yaml

schema = {
    'separator': '|',
    'columns': list(df_clean.columns)
}

with open("schema.yaml", "w") as f:
    yaml.dump(schema, f)


In [19]:
with open("schema.yaml") as f:
    yaml_schema = yaml.safe_load(f)

assert list(df_clean.columns) == yaml_schema['columns'], "Column names do not match!"
print("Validation successful!")


Validation successful!


In [20]:
df_clean.to_csv("cleaned_data.txt.gz", sep="|", index=False, compression="gzip")


In [21]:
import os
import gzip

rows, cols = df_clean.shape
file_size = os.path.getsize("cleaned_data.txt.gz") / (1024 * 1024)  # in MB

print(f"Total Rows: {rows}")
print(f"Total Columns: {cols}")
print(f"Compressed File Size: {file_size:.2f} MB")


Total Rows: 3066766
Total Columns: 19
Compressed File Size: 53.37 MB
