In [None]:
!pip install pandas dask[complete] modin[ray] pyyaml

Collecting modin[ray]
  Downloading modin-0.28.0-py3-none-any.whl (1.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m8.1 MB/s[0m eta [36m0:00:00[0m
Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m11.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pandas
  Downloading pandas-2.2.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m24.5 MB/s[0m eta [36m0:00:00[0m
Collecting ray[default]!=2.5.0,>=1.13.0 (from modin[ray])
  Downloading ray-2.10.0-cp310-cp310-manylinux2014_x86_64.whl (65.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.1/65.1 MB[0m [31m7.0 MB/s[0m eta [36m0:00:00[0m
Collecting aiohttp-cors (from ray[default]!=2.5.0,>=1.13.0->m

In [None]:
import os
import re
import time

import dask.dataframe as dd
import modin.pandas as mpd
import pandas as pd
import yaml

In [None]:
def read_config_file(filepath):
    """Load a YAML configuration file and return its contents.

    Args:
        filepath: Path to the YAML file.

    Returns:
        The parsed document, which must be a mapping (the notebook indexes it
        by key, e.g. ``config_data['file_name']``).

    Raises:
        ValueError: if the file is not valid YAML, or its top level is not a
            mapping (e.g. an empty file, for which ``yaml.safe_load`` returns
            ``None``).
    """
    with open(filepath, 'r') as stream:
        try:
            config = yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            # Fail loudly: returning None here only defers the error to a confusing
            # "'NoneType' object is not subscriptable" at the first config lookup.
            raise ValueError(f"Invalid YAML in config file {filepath!r}: {exc}") from exc
    if not isinstance(config, dict):
        raise ValueError(
            f"Config file {filepath!r} must contain a YAML mapping, "
            f"got {type(config).__name__}"
        )
    return config

def col_header_val(df, table_config):
    """Normalize ``df``'s column names and validate them against the config.

    Each header is lower-cased, every non-word character is replaced with ``_``,
    leading/trailing ``_`` are stripped and runs of ``_`` are collapsed to one.
    The normalized names are written back to ``df.columns`` (``df`` is modified
    in place). The expected names in ``table_config['columns']`` go through the
    same normalization, so the config may list headers either raw or cleaned.

    Args:
        df: DataFrame whose columns should be validated.
        table_config: Mapping with a ``'columns'`` list of expected column names.

    Returns:
        True if the normalized names and the expected names contain exactly the
        same entries, including the same number of columns (so duplicated headers,
        or two headers that collapse to the same name, are rejected); else False.
    """
    def normalize(name):
        # r'\W' is the same class as '[^\w]'; raw string avoids an invalid-escape warning.
        cleaned = re.sub(r'\W', '_', str(name).lower()).strip('_')
        return re.sub(r'_+', '_', cleaned)

    df.columns = [normalize(col) for col in df.columns]
    expected_col = [normalize(col) for col in table_config['columns']]
    # Sorted-list equality checks names AND count; a plain set comparison ignores duplicates.
    if sorted(df.columns) == sorted(expected_col):
        print("Column name and column length validation passed")
        return True
    else:
        print("Column name and column length validation failed")
        return False

In [None]:
config_data = read_config_file("file.yaml")

csv_path = f"{config_data['file_name']}.csv"
output_path = 'processed_file.gz'

# 'time' is read as float64 explicitly.
# NOTE(review): presumably because Dask infers dtypes from a sample and later rows
# hold non-integer values — confirm.
df_dask = dd.read_csv(csv_path, delimiter=config_data['inbound_delimiter'],
                      dtype={'time': 'float64'})

# Validate against an empty frame carrying only the header: df_dask.columns is known
# lazily, so there is no need to compute (load) the entire file just to check names.
header = pd.DataFrame(columns=df_dask.columns)
if col_header_val(header, config_data):
    # col_header_val normalized header.columns in place; write the cleaned names out
    # (previously they were applied to a discarded temporary and lost).
    df_dask.columns = list(header.columns)
    df_dask.to_csv(output_path, sep=config_data['outbound_delimiter'], compression='gzip',
                   index=False, single_file=True)

    # len(df_dask) triggers another pass over the CSV to count rows.
    print(f"Total rows: {len(df_dask)}, Total columns: {len(df_dask.columns)}")
    print(f"Processed file size: {os.path.getsize(output_path)} bytes")

Column name and column length validation passed
Total rows: 12004808, Total columns: 12
Processed file size: 430286604 bytes


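Compare how long it takes to read the file with pandas, Dask, and Modin. Note that Dask's `read_csv` is lazy (it only builds a task graph), so its timing is comparable to the others only if the read is actually forced, e.g. with `.persist()` or `.compute()`.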

In [None]:
csv_path = f"{config_data['file_name']}.csv"
# Identical parse options for all three libraries so the comparison is like-for-like.
# The explicit 'time' dtype mirrors the ingestion cell; without it each library infers
# dtypes on its own.
# NOTE(review): the missing dtype is likely the cause of Modin's "Data types of
# partitions are different!" warning — confirm.
read_kwargs = {'delimiter': config_data['inbound_delimiter'], 'dtype': {'time': 'float64'}}


def timed_read(label, load):
    """Call ``load()``, print how long it took, and return ``(result, seconds)``.

    Uses ``time.perf_counter`` (monotonic, high resolution) rather than the
    wall-clock ``time.time``, which can jump during a measurement.
    """
    start = time.perf_counter()
    result = load()
    elapsed = time.perf_counter() - start
    print(f"Reading time with {label}: {elapsed} seconds")
    return result, elapsed


# NOTE(review): all three frames are held in memory at once; the logged run hit Ray OOM
# worker kills — consider `del df_pandas` before the Modin read if memory is tight.
df_pandas, pandas_time = timed_read("Pandas", lambda: pd.read_csv(csv_path, **read_kwargs))
# dd.read_csv alone is lazy and returns almost instantly; .persist() forces the file to
# actually be parsed so the measured time is comparable with pandas and Modin.
df_dask, dask_time = timed_read("Dask", lambda: dd.read_csv(csv_path, **read_kwargs).persist())
df_modin, modin_time = timed_read("Modin", lambda: mpd.read_csv(csv_path, **read_kwargs))


Reading time with Pandas: 49.59385704994202 seconds
Reading time with Dask: 0.021520376205444336 seconds


2024-04-04 09:52:48,398	INFO worker.py:1752 -- Started a local Ray instance.
[33m(raylet)[0m [2024-04-04 09:53:48,332 E 9654 9654] (raylet) node_manager.cc:2967: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 4c79afa08adec216a5fc78b7065a567ab73da1f39638e047970ec8f7, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


Reading time with Modin: 128.97150540351868 seconds


Data types of partitions are different! Please refer to the troubleshooting section of the Modin documentation to fix this issue.
