In [1]:
import pandas as pd
import numpy as np

# Generate a large synthetic employee-churn CSV, one chunk at a time, so that
# only a single chunk of rows is held in memory at once.

# Seed NumPy's global RNG so the generated data is reproducible
np.random.seed(42)

# 9 chunks x 5 million rows = 45 million rows in total
chunk_size = 5_000_000
num_chunks = 9

# Open the output once and stream every chunk into the same handle.
# newline='' disables universal-newline translation, as pandas requires when
# to_csv is given an already-open file object (otherwise line endings can be
# doubled on platforms whose os.linesep is '\r\n').
with open('employee_churn_large_2gb.csv', 'w', newline='') as f:
    for chunk in range(num_chunks):
        print(f"Generating chunk {chunk + 1} of {num_chunks}")

        # Random column values for this chunk. The order of the RNG calls is
        # kept fixed so the seeded output stays the same.
        # NOTE(review): np.random.randint excludes the upper bound, so e.g. the
        # two *_Score columns take values 1-4 and Age 22-64 - confirm intended.
        data = {
            'Employee_ID': np.arange(chunk * chunk_size + 1, (chunk + 1) * chunk_size + 1),  # Unique, continues across chunks
            'Age': np.random.randint(22, 65, size=chunk_size),
            'Gender': np.random.choice(['Male', 'Female'], size=chunk_size),
            'Years_At_Company': np.random.randint(1, 35, size=chunk_size),
            'Last_Promotion': np.random.randint(0, 10, size=chunk_size),
            'Salary': np.random.randint(30_000, 200_000, size=chunk_size),
            'Ethnicity': np.random.choice(['White', 'Black', 'Asian', 'Hispanic', 'Other'], size=chunk_size),
            'Department': np.random.choice(['Sales', 'IT', 'HR', 'Marketing', 'Finance', 'Operations'], size=chunk_size),
            'Number_of_Projects': np.random.randint(1, 10, size=chunk_size),
            'Work_Life_Balance_Score': np.random.randint(1, 5, size=chunk_size),
            'Job_Satisfaction_Score': np.random.randint(1, 5, size=chunk_size),
            'left': np.random.choice([0, 1], size=chunk_size)
        }

        df_chunk = pd.DataFrame(data)

        # Emit the header row only with the first chunk. No mode argument is
        # needed: the handle is already open and positioned after the
        # previously written rows.
        df_chunk.to_csv(f, index=False, header=(chunk == 0))

print("Larger employee churn dataset created and saved as 'employee_churn_large_2gb.csv'")


Generating chunk 1 of 9
Generating chunk 2 of 9
Generating chunk 3 of 9
Generating chunk 4 of 9
Generating chunk 5 of 9
Generating chunk 6 of 9
Generating chunk 7 of 9
Generating chunk 8 of 9
Generating chunk 9 of 9
Larger employee churn dataset created and saved as 'employee_churn_large_2gb.csv'


In [2]:
import os

# Report how large the generated CSV is on disk, in units of 1024**3 bytes
csv_size_bytes = os.stat('employee_churn_large_2gb.csv').st_size
file_size = csv_size_bytes / (1024 ** 3)
print(f"File size: {file_size:.2f} GB")

File size: 2.12 GB


In [3]:
!pip install dask




In [None]:
import dask.dataframe as dd
import time

# Measure time taken to open the CSV file using Dask
start_time = time.time()

# Use Dask's own reader (dd.read_csv). Calling pandas' read_csv here would load
# the whole file eagerly and return a pandas DataFrame, on which the
# `.compute()` call below fails because shape[0] is then a plain int.
df_dask = dd.read_csv('employee_churn_large_2gb.csv')

end_time = time.time()

# NOTE: dd.read_csv is lazy, so this figure covers setting up the read, not
# parsing all rows; the row count below is what forces a pass over the data.
print(f"Dask read time: {end_time - start_time:.2f} seconds")

# Now let's inspect the shape (number of rows and columns).
# The row count is lazy and must be computed; the column count is known up front.
rows = df_dask.shape[0].compute()
columns = df_dask.shape[1]

print(f"Shape of the dataframe: {rows} rows, {columns} columns")


Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



In [5]:
!pip install modin[ray] ray


Collecting ray
  Downloading ray-2.35.0-cp310-cp310-manylinux2014_x86_64.whl.metadata (16 kB)
Collecting modin[ray]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting pandas<2.3,>=2.2 (from modin[ray])
  Downloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Downloading ray-2.35.0-cp310-cp310-manylinux2014_x86_64.whl (65.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.0/65.0 MB[0m [31m9.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.0/13.0 MB[0m [31m67.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading modin-0.32.0-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m42.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pandas, modin, ray
  Attempting uninstall: pandas
    Fo

In [7]:
!pip install --upgrade pandas



In [9]:
!pip uninstall pandas -y


Found existing installation: pandas 2.2.2
Uninstalling pandas-2.2.2:
  Successfully uninstalled pandas-2.2.2


In [10]:
!pip install pandas==2.2.2


Collecting pandas==2.2.2
  Using cached pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (19 kB)
Using cached pandas-2.2.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (13.0 MB)
Installing collected packages: pandas
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
cudf-cu12 24.4.1 requires pandas<2.2.2dev0,>=2.0, but you have pandas 2.2.2 which is incompatible.
google-colab 1.0.0 requires pandas==2.1.4, but you have pandas 2.2.2 which is incompatible.[0m[31m
[0mSuccessfully installed pandas-2.2.2


In [11]:
!pip install modin[ray] ray




In [1]:
import modin.pandas as mpd
import time

# Time how long Modin (imported as mpd) needs to load the large CSV file.
start_time = time.time()
df_modin = mpd.read_csv('employee_churn_large_2gb.csv')
end_time = time.time()

# Report the wall-clock duration and the resulting (rows, columns) shape
modin_seconds = end_time - start_time
print(f"Modin read time: {modin_seconds:.2f} seconds")
print(f"Shape of the dataframe: {df_modin.shape}")


2024-09-14 22:20:37,621	INFO worker.py:1783 -- Started a local Ray instance.
[33m(raylet)[0m [2024-09-14 22:22:37,536 E 6728 6728] (raylet) node_manager.cc:3064: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: c84c994dd496b0f46cc3caa8392b83679f6973f7bd8fa83a0dbaebaa, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
[33m(raylet)[0m 
[33m(raylet)[0m Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


Modin read time: 208.17 seconds
Shape of the dataframe: (45000000, 12)


In [2]:
import pandas as pd
import time

# Number of rows pandas pulls from the file per batch (1 million)
chunk_size = 1_000_000

start_time = time.time()

# Iterate the chunked reader to exhaustion, collecting every batch in a list
chunks = list(pd.read_csv('employee_churn_large_2gb.csv', chunksize=chunk_size))

# Stack the batches row-wise into one DataFrame
df_pandas = pd.concat(chunks, axis=0)

end_time = time.time()

pandas_seconds = end_time - start_time
print(f"Pandas (with chunking) read time: {pandas_seconds:.2f} seconds")
print(f"Shape of the dataframe: {df_pandas.shape}")


Pandas (with chunking) read time: 72.63 seconds
Shape of the dataframe: (45000000, 12)


In [3]:
# Normalise the column labels: drop every run of characters that is not a
# letter, digit or underscore, then trim surrounding whitespace.
disallowed_chars = '[^A-Za-z0-9_]+'
cleaned_columns = df_dask.columns.str.replace(disallowed_chars, '', regex=True).str.strip()
df_dask.columns = cleaned_columns

# Show the resulting labels for a visual check
print("Cleaned column names:", df_dask.columns.tolist())


Cleaned column names: ['Employee_ID', 'Age', 'Gender', 'Years_At_Company', 'Last_Promotion', 'Salary', 'Ethnicity', 'Department', 'Number_of_Projects', 'Work_Life_Balance_Score', 'Job_Satisfaction_Score', 'left']


In [2]:
import dask.dataframe as dd

# Open the CSV as a Dask DataFrame
df_dask = dd.read_csv('employee_churn_large_2gb.csv')

# Pull the first rows and print them as a quick sanity check
first_rows = df_dask.head()
print(first_rows)


Dask dataframe query planning is disabled because dask-expr is not installed.

You can install it with `pip install dask[dataframe]` or `conda install dask`.
This will raise in a future version.



   Employee_ID  Age  Gender  Years_At_Company  Last_Promotion  Salary  \
0            1   60    Male                 3               6   96346   
1            2   50  Female                 6               3   54229   
2            3   36  Female                32               7  166233   
3            4   64  Female                34               9   88052   
4            5   29  Female                28               3  109972   

  Ethnicity  Department  Number_of_Projects  Work_Life_Balance_Score  \
0     White          IT                   8                        1   
1     Black  Operations                   4                        3   
2     Other     Finance                   6                        2   
3     Other  Operations                   1                        4   
4     White          IT                   4                        1   

   Job_Satisfaction_Score  left  
0                       3     0  
1                       4     1  
2                       2 

In [4]:
!pip install pyyaml





In [5]:
import yaml

# Describe the data file: its name, its field separator and the current
# column labels of df_dask.
schema = dict(
    file_name='employee_churn_large_2gb.csv',
    separator=',',
    columns=df_dask.columns.tolist(),
)

# Persist that description as YAML
with open('schema.yml', 'w') as schema_out:
    yaml.dump(schema, stream=schema_out)

print("YAML schema file created as 'schema.yml'")


YAML schema file created as 'schema.yml'


In [6]:
# Read the stored schema back from disk
with open('schema.yml', 'r') as yaml_file:
    yaml_schema = yaml.safe_load(yaml_file)

# Column labels recorded in the schema file
yaml_columns = yaml_schema['columns']

# The check passes only when the DataFrame's labels equal the recorded ones,
# in the same order.
columns_match = list(df_dask.columns) == yaml_columns
print("Column validation passed." if columns_match else "Column validation failed.")


Column validation passed.


In [7]:
# Write the DataFrame as a single gzip-compressed, pipe-separated file.
# NOTE(review): with single_file=True the '*' in the name does not appear to be
# expanded - the size check later in this notebook opens this exact name, so
# the path is kept unchanged here. A plain name such as
# 'employee_churn_large_pipe.csv.gz' would be clearer, but must then be changed
# in both places together.
output_path = 'employee_churn_large_pipe_*.csv'
df_dask.to_csv(output_path, sep='|', index=False, compression='gzip', single_file=True)

# Report the path that was actually passed to to_csv; the earlier message named
# 'employee_churn_large_pipe.gz', which this cell never writes.
print(f"File written in pipe-separated format and compressed as '{output_path}'")


File written in pipe-separated format and compressed as 'employee_churn_large_pipe.gz'


In [8]:
import os

# The row count of a Dask DataFrame is lazy and has to be computed; the column
# count is available immediately.
lazy_row_count, total_columns = df_dask.shape
total_rows = lazy_row_count.compute()

# On-disk size of the compressed output, in units of 1024**3 bytes
compressed_bytes = os.stat('employee_churn_large_pipe_*.csv').st_size
file_size = compressed_bytes / (1024 ** 3)

# Print the summary, one figure per line
print(
    f"Total rows: {total_rows}",
    f"Total columns: {total_columns}",
    f"File size: {file_size:.2f} GB",
    sep="\n",
)


Total rows: 45000000
Total columns: 12
File size: 0.55 GB
