## Large-Scale CSV Data Processing and Validation Report


---



This report documents the process of generating, processing, validating, and exporting a large CSV dataset using Python in Google Colab. The goals are to compare how quickly several tools read the file, validate the column names against a schema stored in YAML, and export the cleaned data in a compressed, pipe-separated format.

## Step 1: Created a 2GB CSV File in Chunks

I generated a synthetic dataset with 50 million rows and 5 columns (`id`, `name`, `age`, `salary`, `city`) using NumPy and pandas. To avoid memory errors in Colab, I wrote the data in chunks of 1 million rows. The first chunk included the header, and the remaining chunks were appended without headers.

This process produced a large CSV file (`large_dataset_file.csv`) of approximately 2GB in size.


In [33]:
import pandas as pd
import numpy as np

# 🔁 Total rows and chunk size
total_rows = 50_000_000
chunk_size = 1_000_000

#  First chunk: write with headers
df = pd.DataFrame({
    'id': np.arange(0, chunk_size),
    'name': np.random.choice(['Alice', 'Bob', 'Charlie'], size=chunk_size),
    'age': np.random.randint(18, 70, size=chunk_size),
    'salary': np.random.uniform(30000, 120000, size=chunk_size),
    'city': np.random.choice(['London', 'New York', 'Delhi'], size=chunk_size)
})

df.to_csv('large_dataset_file.csv', index=False, mode='w')  # First write (with header)

# ⏳ Now write the rest in append mode
for i in range(1, total_rows // chunk_size):
    start = i * chunk_size
    df = pd.DataFrame({
        'id': np.arange(start, start + chunk_size),
        'name': np.random.choice(['Alice', 'Bob', 'Charlie'], size=chunk_size),
        'age': np.random.randint(18, 70, size=chunk_size),
        'salary': np.random.uniform(30000, 120000, size=chunk_size),
        'city': np.random.choice(['London', 'New York', 'Delhi'], size=chunk_size)
    })

    # Append to CSV without writing header again
    df.to_csv('large_dataset_file.csv', index=False, mode='a', header=False)

    print(f" Written chunk {i+1} of {total_rows // chunk_size}")


 Written chunk 2 of 50
 Written chunk 3 of 50
 Written chunk 4 of 50
 Written chunk 5 of 50
 Written chunk 6 of 50
 Written chunk 7 of 50
 Written chunk 8 of 50
 Written chunk 9 of 50
 Written chunk 10 of 50
 Written chunk 11 of 50
 Written chunk 12 of 50
 Written chunk 13 of 50
 Written chunk 14 of 50
 Written chunk 15 of 50
 Written chunk 16 of 50
 Written chunk 17 of 50
 Written chunk 18 of 50
 Written chunk 19 of 50
 Written chunk 20 of 50
 Written chunk 21 of 50
 Written chunk 22 of 50
 Written chunk 23 of 50
 Written chunk 24 of 50
 Written chunk 25 of 50
 Written chunk 26 of 50
 Written chunk 27 of 50
 Written chunk 28 of 50
 Written chunk 29 of 50
 Written chunk 30 of 50
 Written chunk 31 of 50
 Written chunk 32 of 50
 Written chunk 33 of 50
 Written chunk 34 of 50
 Written chunk 35 of 50
 Written chunk 36 of 50
 Written chunk 37 of 50
 Written chunk 38 of 50
 Written chunk 39 of 50
 Written chunk 40 of 50
 Written chunk 41 of 50
 Written chunk 42 of 50
 Written chunk 43 of 50


## Step 2: Reading the Large CSV File Using Different Methods

In this step, I tested how fast different tools can read the 2GB CSV file. I used four methods:

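- **Pandas**: the standard `read_csv`, which loads the whole file in a single process.
- **Dask**: builds a lazy, partitioned DataFrame and reads the partitions in parallel once a computation is triggered.
- **Modin with Ray**: a drop-in replacement for pandas that spreads the work across CPU cores using Ray.
- **Ray (Direct)**: runs a plain pandas read inside a single Ray remote task.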

For each one, I recorded how long it took and checked the shape of the data. This helped me understand which tools are better for working with large datasets.
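
The timing pattern repeated in the cells below can be wrapped in one small helper. This is a minimal sketch, not the code used for the measurements in this report: `time_call` is a hypothetical name, and it uses `time.perf_counter()` instead of `time.time()` because it is a monotonic clock intended for measuring intervals.

```python
import time
from typing import Any, Callable, Tuple


def time_call(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Tuple[Any, float]:
    """Run func(*args, **kwargs) and return (result, elapsed_seconds)."""
    start = time.perf_counter()  # monotonic clock, suited to measuring intervals
    result = func(*args, **kwargs)
    elapsed = time.perf_counter() - start
    return result, elapsed


# Hypothetical usage with one of the readers in this report:
#   df_pandas, secs = time_call(pd.read_csv, "large_dataset_file.csv")
#   print("Time Taken:", round(secs, 2), "seconds")
```

Using the same helper for every reader keeps the measurement identical across tools, so only the read itself differs.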


In [34]:
import os
file_size = os.path.getsize('large_dataset_file.csv') / (1024 * 1024 * 1024)
print(f" Final file size: {file_size:.2f} GB")


 Final file size: 2.02 GB


In [4]:
# Install Dask
!pip install dask -q

# Install Modin with Ray backend
!pip install "modin[ray]" -q



In [6]:
import pandas as pd
import time

start_time = time.time()

df_pandas = pd.read_csv('large_dataset_file.csv')

end_time = time.time()
print(" Pandas Read Done")
print(" Time Taken:", round(end_time - start_time, 2), "seconds")
print(" Shape:", df_pandas.shape)


 Pandas Read Done
 Time Taken: 37.43 seconds
 Shape: (50000000, 5)


In [8]:
import dask.dataframe as dd
import time

start_time = time.time()

# Read CSV with Dask (lazy)
df_dask = dd.read_csv('large_dataset_file.csv')

# Force computation
row_count = len(df_dask)             # Triggers reading
col_count = len(df_dask.columns)     # Already known

end_time = time.time()

print("Dask Read Done")
print("Time Taken:", round(end_time - start_time, 2), "seconds")
print(" Shape:", (row_count, col_count))


Dask Read Done
Time Taken: 34.09 seconds
 Shape: (50000000, 5)


In [9]:
import modin.pandas as mpd
import time

start_time = time.time()

df_modin = mpd.read_csv('large_dataset_file.csv')

end_time = time.time()
print("Modin Read Done")
print("Time Taken:", round(end_time - start_time, 2), "seconds")
print(" Shape:", df_modin.shape)


2025-07-26 12:13:24,910	INFO worker.py:1927 -- Started a local Ray instance.
(raylet) [2025-07-26 12:14:24,871 E 6291 6291] (raylet) node_manager.cc:3041: 1 Workers (tasks / actors) killed due to memory pressure (OOM), 0 Workers crashed due to other reasons at node (ID: 2e2e17d1ed017cdfab688b9eed651a95386d43fd01b38b20cf3a6371, IP: 172.28.0.12) over the last time period. To see more information about the Workers killed on this node, use `ray logs raylet.out -ip 172.28.0.12`
(raylet)
(raylet) Refer to the documentation on how to address the out of memory issue: https://docs.ray.io/en/latest/ray-core/scheduling/ray-oom-prevention.html. Consider provisioning more memory on this node or reducing task parallelism by requesting more CPUs per task. To adjust the kill threshold, set the environment variable `RAY_memory_usage_threshold` when starting Ray. To disable worker killing, set the environment variable `RAY_memory_monitor_refresh_ms` to zero.


Modin Read Done
Time Taken: 93.31 seconds
 Shape: (50000000, 5)


In [12]:
import ray
import time

# Initialize Ray if not already running
ray.init(ignore_reinit_error=True)

@ray.remote
def read_with_ray():
    import pandas as pd
    df = pd.read_csv("large_dataset_file.csv")
    return df.shape

start_time = time.time()
shape_ray = ray.get(read_with_ray.remote())
end_time = time.time()

time_ray = round(end_time - start_time, 2)

print("Ray (Direct) Read Done")
print("Time Taken:", time_ray, "seconds")
print(" Shape:", shape_ray)


2025-07-26 12:20:12,614	INFO worker.py:1765 -- Calling ray.init() again after it has already been called.


Ray (Direct) Read Done
Time Taken: 46.22 seconds
 Shape: (50000000, 5)


## Step 3: File Reading Time Comparison

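After testing all four methods (Pandas, Dask, Modin, and Ray), I created a table to compare how long each one took to read the 2GB CSV file. All methods returned the same number of rows and columns, but the times differed: Dask was fastest (34.09 s), followed by pandas (37.43 s) and the direct Ray task (46.22 s), while Modin was slowest (93.31 s). During the Modin run, Ray logged that one worker was killed due to memory pressure.

Note that the Dask figure times `len(df_dask)`, which counts rows without holding the whole DataFrame in memory, so it is not a like-for-like comparison with the pandas read. With that caveat, the table shows the performance differences I observed when working with a large dataset in Colab.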


In [13]:
import pandas as pd
from IPython.display import display

# Read times (seconds) recorded in the cells above
time_pandas = 37.43
time_dask = 34.09
time_modin = 93.31
# time_ray = captured from previous step

summary_df = pd.DataFrame({
    'Method': ['Pandas', 'Dask', 'Modin (Ray)', 'Ray (Direct)'],
    'Time Taken (s)': [time_pandas, time_dask, time_modin, time_ray],
    'Rows': [50000000]*4,
    'Columns': [5]*4,
    'Notes': [
        'Standard single-threaded read',
        'Lazy chunked read (parallel)',
        'Ray backend, memory overhead',
        'Used Ray remote worker directly'
    ]
})

display(summary_df)


| | Method | Time Taken (s) | Rows | Columns | Notes |
|---|---|---|---|---|---|
| 0 | Pandas | 37.43 | 50000000 | 5 | Standard single-threaded read |
| 1 | Dask | 34.09 | 50000000 | 5 | Lazy chunked read (parallel) |
| 2 | Modin (Ray) | 93.31 | 50000000 | 5 | Ray backend, memory overhead |
| 3 | Ray (Direct) | 46.22 | 50000000 | 5 | Used Ray remote worker directly |
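
To make the gaps easier to read, the times can be expressed relative to the pandas baseline. The sketch below only reuses the four times reported in the summary table; the `times` and `relative` names are introduced here for illustration.

```python
# Read times in seconds, copied from the summary table in this report.
times = {
    "Pandas": 37.43,
    "Dask": 34.09,
    "Modin (Ray)": 93.31,
    "Ray (Direct)": 46.22,
}

baseline = times["Pandas"]

# Ratio of each method's time to the pandas time (values below 1 are faster).
relative = {method: round(seconds / baseline, 2) for method, seconds in times.items()}

for method, ratio in relative.items():
    print(f"{method}: {ratio}x the pandas time")
```

On these numbers Dask took about 0.91x the pandas time, the direct Ray task about 1.23x, and Modin about 2.49x.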


## Step 4: Creating the YAML Schema

Here, I created a simple YAML file (`schema.yaml`) that stores the column names and the delimiter (`|`) I plan to use when saving the cleaned file. This helps make sure the structure stays consistent later during validation and export.


In [14]:
import pandas as pd

# Read just the header + small sample
df_sample = pd.read_csv('large_dataset_file.csv', nrows=5)
print(df_sample.columns)


Index(['id', 'name', 'age', 'salary', 'city'], dtype='object')


In [15]:
!pip install pyyaml -q


In [17]:
import yaml

# Define the schema
schema = {
    'delimiter': '|',    # You’ll use this later when writing the file
    'columns': ['id', 'name', 'age', 'salary', 'city']
}


In [18]:
# Save to schema.yaml
with open('schema.yaml', 'w') as file:
    yaml.dump(schema, file)


In [19]:
# Read back and display for confirmation
with open('schema.yaml', 'r') as file:
    print(file.read())


columns:
- id
- name
- age
- salary
- city
delimiter: '|'



## Step 5: Validating Column Names with YAML Schema

In this step, I loaded the YAML schema and compared it to the actual dataset to make sure the column names and count matched.

I used only the first few rows for this check and confirmed that the dataset has exactly the same number of columns and the same column names as defined in the YAML file.


In [20]:
import pandas as pd
import yaml


In [21]:
# Load schema.yaml
with open('schema.yaml', 'r') as file:
    schema = yaml.safe_load(file)

# Extract expected columns and delimiter
expected_columns = schema['columns']
expected_delimiter = schema['delimiter']

print("✅ Schema Loaded:")
print("Expected Columns:", expected_columns)
print("Expected Delimiter:", repr(expected_delimiter))


✅ Schema Loaded:
Expected Columns: ['id', 'name', 'age', 'salary', 'city']
Expected Delimiter: '|'


In [24]:
import pandas as pd

# ⚠️ Use the actual delimiter for this file (comma)
df = pd.read_csv('large_dataset_file.csv', sep=',', nrows=5)

# Get the column names from the file
actual_columns = df.columns.tolist()
print("📂 File Columns:", actual_columns)


📂 File Columns: ['id', 'name', 'age', 'salary', 'city']


In [26]:
# Column count check
if len(actual_columns) != len(expected_columns):
    print("Column count mismatch!")
    print(f"YAML: {len(expected_columns)} columns")
    print(f"File: {len(actual_columns)} columns")
else:
    print("Column count matches")

# Column names check
if actual_columns != expected_columns:
    print(" Column names do NOT match the YAML schema")
    print("Mismatches:")
    for i, (actual, expected) in enumerate(zip(actual_columns, expected_columns)):
        if actual != expected:
            print(f"  Column {i+1}: Expected '{expected}', got '{actual}'")
else:
    print("Column names match exactly")


Column count matches
Column names match exactly
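
The positional check above uses `zip`, which stops at the shorter list, so extra or missing columns are not listed by name when the counts differ. A possible complement, sketched here with a hypothetical `compare_columns` helper that is not part of the original notebook, reports differences by name as well as by order:

```python
from typing import Dict, List


def compare_columns(actual: List[str], expected: List[str]) -> Dict[str, object]:
    """Compare the file's columns against the schema's columns by name and by order."""
    return {
        # Columns listed in the schema but absent from the file
        "missing": [col for col in expected if col not in actual],
        # Columns present in the file but not listed in the schema
        "unexpected": [col for col in actual if col not in expected],
        # True only when names and order match exactly
        "same_order": actual == expected,
    }


# Hypothetical usage with the variables from the cells above:
#   report = compare_columns(actual_columns, expected_columns)
#   print(report)
```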


## Step 6: Save the Cleaned File as Pipe-Separated GZIP

After validating the column names, I saved the full dataset using `|` as the separator, matching the delimiter defined in the YAML schema, and compressed it with gzip.

The final file is named `cleaned_dataset_file.txt.gz`. Compressing it reduces the size on disk, and the pipe separator keeps it consistent with the schema.


In [27]:
import pandas as pd

# Read your original CSV file
df = pd.read_csv('large_dataset_file.csv')  # This file uses ',' as delimiter


In [28]:
# Save the DataFrame as a pipe-separated gzip file
df.to_csv('cleaned_dataset_file.txt.gz', sep='|', index=False, compression='gzip')
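
The cells above load all 50 million rows into memory before writing. Where memory is tight, the same conversion could be streamed row by row instead. The following is a sketch of that alternative using only the standard library, not the method used in this report; `csv_to_pipe_gzip` is a hypothetical helper, and it assumes the source file is UTF-8 and starts with a header row.

```python
import csv
import gzip


def csv_to_pipe_gzip(src_path: str, dst_path: str, delimiter: str = "|") -> int:
    """Stream a comma-separated file into a gzip-compressed, delimiter-separated file.

    Returns the number of data rows written (the header is not counted).
    Assumes the source file is UTF-8 and begins with a header row.
    """
    rows_written = 0
    with open(src_path, newline="", encoding="utf-8") as src, \
            gzip.open(dst_path, "wt", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src)
        writer = csv.writer(dst, delimiter=delimiter, lineterminator="\n")
        writer.writerow(next(reader))  # copy the header row first
        for row in reader:
            writer.writerow(row)       # values are passed through as text, unchanged
            rows_written += 1
    return rows_written


# Hypothetical usage with the file names from this report:
#   csv_to_pipe_gzip("large_dataset_file.csv", "cleaned_dataset_file.txt.gz")
```

Because the values are copied as text, this version does not re-serialize numbers the way a pandas round trip can, so the output bytes may differ slightly from the file produced above.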


## Step 7: Summary of the Final File

Here, I checked the final cleaned file to confirm everything looks correct. I printed the number of rows and columns of the DataFrame that was written, and calculated the output file size in both MB and GB.

The output file `cleaned_dataset_file.txt.gz` contains 50 million rows and 5 columns, and its size is around 704 MB after gzip compression, roughly a third of the 2.02 GB source file.


In [31]:
import pandas as pd
import os

# Get shape of the dataframe
rows, columns = df.shape
print(f"Total Rows: {rows}")
print(f"Total Columns: {columns}")



Total Rows: 50000000
Total Columns: 5


In [32]:
# Get the file size in bytes
file_path = 'cleaned_dataset_file.txt.gz'
file_size_bytes = os.path.getsize(file_path)

# Convert to MB and GB
file_size_mb = round(file_size_bytes / (1024 * 1024), 2)
file_size_gb = round(file_size_bytes / (1024 * 1024 * 1024), 2)

print(f"File Size: {file_size_mb} MB ({file_size_gb} GB)")


File Size: 704.36 MB (0.69 GB)
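
The row count above comes from the in-memory DataFrame, not from the compressed file itself. One way to confirm the count on disk without loading the data is to stream the gzip file and count lines. This is a sketch with a hypothetical `count_gzip_rows` helper; it assumes UTF-8 text and that no field contains an embedded newline, which holds for the synthetic columns used here.

```python
import gzip


def count_gzip_rows(path: str, has_header: bool = True) -> int:
    """Count data rows in a gzip-compressed text file by streaming it line by line.

    Assumes one record per line (no embedded newlines inside fields).
    """
    with gzip.open(path, "rt", encoding="utf-8") as handle:
        total_lines = sum(1 for _ in handle)
    # Subtract the header line when present, never going below zero
    return max(total_lines - 1, 0) if has_header else total_lines


# Hypothetical usage with the output file from this report:
#   print(count_gzip_rows("cleaned_dataset_file.txt.gz"))
```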


## Final Conclusion

In this project, I worked with a large synthetic dataset (50 million rows) to simulate a real-world data processing workflow. I compared different tools for reading large files, validated the data structure using a YAML schema, cleaned the data, and saved it in a compressed pipe-separated format.

The final file was saved as `cleaned_dataset_file.txt.gz` (~704 MB) with the column layout and delimiter defined in the schema. This end-to-end process demonstrates one way to handle a multi-gigabyte dataset with Python in a cloud notebook environment like Google Colab.
