In [14]:
#pip install -U ipywidgets

In [15]:
#!pip install pandas dask modin[ray] PyYAML

In [20]:
#pip install psutil

In [21]:
#pip install --upgrade ray

In [1]:
import pandas as pd
import modin.pandas as mpd
import ray
import time
import dask.dataframe as dd
import os
import re
import yaml
import psutil

In [2]:
ray.init(ignore_reinit_error=True)

2024-05-26 00:04:27,523	INFO worker.py:1749 -- Started a local Ray instance.


0,1
Python version:,3.11.7
Ray version:,2.20.0


In [17]:
# Step 1: Take any CSV/Text File of 2+ GB

In [4]:
file='C:/DDrive/Dataset/Data.json'

In [22]:
#pip install -U ipywidgets

In [23]:
# Step 2: Read the File (Present Approach of Reading the File)

In [8]:
df = pd.read_json(file,lines=True)

In [9]:
df.head(3)

Unnamed: 0,user id,name,review count,yelping since,useful,funny,cool,elite,friends,fans,...,compliment more,compliment profile,compliment cute,compliment list,compliment note,compliment plain,compliment cool,compliment funny,compliment writer,compliment photos
0,m1evbVj4Q-BcO-Xu5VfvEw,Kevin,49,2010-05-20 23:23:59,67,30,23,,"txGhbn4nZZwCG6QKyU4oug, oMFqDgXcBaOz-V4mK0z1tw...",3,...,1,0,0,0,1,1,1,1,1,1
1,yOO5Dj4CU-UJsnVxsSpVPg,Lowie,265,2010-08-07 00:07:59,383,144,235,20122013,"GY6tKv0ZD8wuHCZKN165og, PWrQ5EH-XdX8gaORnfbWKg...",19,...,5,4,2,2,7,21,17,17,14,3
2,wEE-YMx5pmSuagLtNxMPKA,Stephanie,2000,2007-08-16 13:41:49,38666,20280,28814,"2009,2010,2011,2012,2013,2014,2015,2016,2017,2...","SmYoYCF3RVg5gaHeejQFhA, qQ_IOstGA56cnBQ_LxvQXg...",1761,...,168,112,107,64,1637,7431,2954,2954,812,691


In [26]:
# Step 3: Different Methods of File Reading and Present Findings in Terms of Computational Efficiency

In [5]:
# Measure time and memory usage for different libraries
def measure_efficiency(read_func, file_path, is_ray=False):
    start_time = time.time()
    if is_ray:
        df = read_func(file_path)
        memory_usage = df.size_bytes()  # Memory usage in bytes for Ray
    else:
        df = read_func(file_path)
        if hasattr(df, 'compute'):
            df = df.compute()  # For Dask
        memory_usage = df.memory_usage(deep=True).sum() if isinstance(df, pd.DataFrame) else psutil.Process().memory_info().rss
    end_time = time.time()
    return end_time - start_time, memory_usage

# Pandas
pandas_time, pandas_memory = measure_efficiency(lambda file: pd.read_json(file, lines=True), file)

# Dask
dask_time, dask_memory = measure_efficiency(lambda file: dd.read_json(file, lines=True), file)

# Modin
modin_time, modin_memory = measure_efficiency(lambda file: mpd.read_json(file, lines=True), file)

# Ray
ray_time, ray_memory = measure_efficiency(ray.data.read_json, file, is_ray=True)

# Display results
efficiency_comparison = {
    'Library': ['Pandas', 'Dask', 'Modin', 'Ray'],
    'Time (seconds)': [pandas_time, dask_time, modin_time, ray_time],
    'Memory Usage (bytes)': [pandas_memory, dask_memory, modin_memory, ray_memory]
}

print(pd.DataFrame(efficiency_comparison))

  Library  Time (seconds)  Memory Usage (bytes)
0  Pandas      194.939918            3445657444
1    Dask      372.973100            2955097788
2   Modin      124.084744             189960192
3     Ray        9.045848            3347825977


In [None]:
#Step 4: Perform Basic Validation on Data Columns

In [12]:
# Remove special characters and whitespace from column names
df.columns = [re.sub(r'\W+', '', col).strip() for col in df.columns]
print("Cleaned column names:", df.columns)

# Save the cleaned DataFrame
df.to_csv('C:/DDrive/Data/cleaned_large_file.csv', index=False)

Cleaned column names: Index(['userid', 'name', 'reviewcount', 'yelpingsince', 'useful', 'funny',
       'cool', 'elite', 'friends', 'fans', 'averagestars', 'complimenthot',
       'complimentmore', 'complimentprofile', 'complimentcute',
       'complimentlist', 'complimentnote', 'complimentplain', 'complimentcool',
       'complimentfunny', 'complimentwriter', 'complimentphotos'],
      dtype='object')


In [None]:
Step 5: Create a YAML File with Schema Information

In [14]:
# Generate schema based on the columns
schema = {
    'input_separator': ',',
    'output_separator': '|',
    'columns': list(df.columns)
}

# Write the schema to a YAML file
with open('C:/DDrive/Data/schema.yaml', 'w') as file:
    yaml.dump(schema, file)

print("Schema written to schema.yaml")

Schema written to schema.yaml


In [None]:
# Step 6: Validate Number of Columns and Column Names of Ingested File with YAML

In [16]:
# Load the YAML schema
with open('C:/DDrive/Data/schema.yaml', 'r') as file:
    schema = yaml.safe_load(file)

# Read the CSV file using the specified separator
df = pd.read_csv('C:/DDrive/Data/cleaned_large_file.csv', sep=schema['input_separator'])

# Validate the number of columns
assert len(df.columns) == len(schema['columns']), f"Column count does not match. Expected {len(schema['columns'])}, got {len(df.columns)}."+üoh

# Validate the column names
assert all([col in df.columns for col in schema['columns']]), "Column names do not match."

print("Validation successful!")



Validation successful!


In [None]:
# Step 7: Write the File in Pipe-Separated Text Format with Gzip Compression

In [None]:
df.to_csv('C:/DDrive/Data/cleaned_large_file_pipe_separated.csv.gz', sep='|', compression='gzip', index=False)

In [None]:
# Step 8: Create a Summary of the File

In [19]:
num_rows, num_cols = df.shape
file_size = os.path.getsize('C:/DDrive/Data/cleaned_large_file_pipe_separated.csv.gz')

summary = {
    'Total number of rows': num_rows,
    'Total number of columns': num_cols,
    'File size (bytes)': file_size
}
print(summary)

{'Total number of rows': 1986595, 'Total number of columns': 22, 'File size (bytes)': 1243473485}
