# Task
Find out the data ingestion efficiency of reading this file and give a summary of findings.

data file used:
"us_congestion_2016_2022_sample_2m.csv" - 737 MB

## Data loading - Pandas

Load the "us_congestion_2016_2022_sample_2m.csv" file into a pandas DataFrame and measure the loading time.


In [1]:
import pandas as pd
import time

# Record the start time
start_time = time.time()

# Load the data using pandas
try:
    df_pandas = pd.read_csv('us_congestion_2016_2022_sample_2m.csv')
except FileNotFoundError:
    print("Error: 'us_congestion_2016_2022_sample_2m.csv' not found.")
    pandas_loading_time = -1 # Indicate an error
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    pandas_loading_time = -1 # Indicate an error
else:
    # Record the end time
    end_time = time.time()

    # Calculate the loading time
    pandas_loading_time = end_time - start_time

    # Print the shape of the DataFrame (optional)
    print(f"Pandas DataFrame shape: {df_pandas.shape}")

loading_times = {"pandas": pandas_loading_time}
print(f"Pandas Loading Time: {pandas_loading_time:.4f} seconds")

Pandas DataFrame shape: (2000000, 30)
Pandas Loading Time: 29.3114 seconds


## Data loading - Modin

Load the "us_congestion_2016_2022_sample_2m.csv" file into a Modin DataFrame (using Ray as the backend) and measure the loading time.


In [2]:
import modin.pandas as pd
import time
import ray

# Initialize Ray (if not already running)
try:
    ray.init()
except RuntimeError:
    print("Ray is already initialized.")

# Record the start time
start_time = time.time()

# Load the data using Modin
try:
    df_modin = pd.read_csv('us_congestion_2016_2022_sample_2m.csv')
except FileNotFoundError:
    print("Error: 'us_congestion_2016_2022_sample_2m.csv' not found.")
    modin_loading_time = -1
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    modin_loading_time = -1
else:
    # Record the end time
    end_time = time.time()

    # Calculate the loading time
    modin_loading_time = end_time - start_time

    # Print the shape of the DataFrame
    print(f"Modin DataFrame shape: {df_modin.shape}")

    print(f"Modin Loading Time: {modin_loading_time:.4f} seconds")
    loading_times["modin"] = modin_loading_time

ModuleNotFoundError: No module named 'modin'

In [3]:
!pip install modin[ray]

Collecting modin[ray]
  Downloading modin-0.32.0-py3-none-any.whl.metadata (17 kB)
Collecting ray!=2.5.0,>=2.1.0 (from modin[ray])
  Downloading ray-2.44.1-cp311-cp311-manylinux2014_x86_64.whl.metadata (19 kB)
Downloading ray-2.44.1-cp311-cp311-manylinux2014_x86_64.whl (68.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m68.1/68.1 MB[0m [31m10.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading modin-0.32.0-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m55.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: modin, ray
Successfully installed modin-0.32.0 ray-2.44.1


In [4]:
import modin.pandas as pd
import time
import ray

# Initialize Ray (if not already running)
try:
    ray.init()
except RuntimeError:
    print("Ray is already initialized.")

# Record the start time
start_time = time.time()

# Load the data using Modin
try:
    df_modin = pd.read_csv('us_congestion_2016_2022_sample_2m.csv')
except FileNotFoundError:
    print("Error: 'us_congestion_2016_2022_sample_2m.csv' not found.")
    modin_loading_time = -1
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    modin_loading_time = -1
else:
    # Record the end time
    end_time = time.time()

    # Calculate the loading time
    modin_loading_time = end_time - start_time

    # Print the shape of the DataFrame
    print(f"Modin DataFrame shape: {df_modin.shape}")

    print(f"Modin Loading Time: {modin_loading_time:.4f} seconds")
    loading_times["modin"] = modin_loading_time

2025-04-12 21:32:10,978	INFO worker.py:1852 -- Started a local Ray instance.


Modin DataFrame shape: (2000000, 30)
Modin Loading Time: 34.1174 seconds


## Data loading - Dask

Load the "us_congestion_2016_2022_sample_2m.csv" file into a Dask DataFrame and measure the loading time.


In [5]:
import dask.dataframe as dd
import time

# Record the start time
start_time = time.time()

# Load the data using Dask
try:
    df_dask = dd.read_csv('us_congestion_2016_2022_sample_2m.csv')
except FileNotFoundError:
    print("Error: 'us_congestion_2016_2022_sample_2m.csv' not found.")
    dask_loading_time = -1
except Exception as e:
    print(f"An unexpected error occurred: {e}")
    dask_loading_time = -1
else:
    # Record the end time
    end_time = time.time()

    # Calculate the loading time
    dask_loading_time = end_time - start_time

    # Print the shape of the Dask DataFrame
    print(f"Dask DataFrame shape: {df_dask.shape}")

    # Print the loading time
    print(f"Dask Loading Time: {dask_loading_time:.4f} seconds")

    # Add the loading time to the loading_times dictionary
    loading_times["dask"] = dask_loading_time

Dask DataFrame shape: (<dask_expr.expr.Scalar: expr=ArrowStringConversion(frame=FromMapProjectable(ce62c67)).size() // 30, dtype=int64>, 30)
Dask Loading Time: 0.0289 seconds


## Data analysis

Compare the data loading times for pandas, Modin, and Dask.


In [6]:
# Analyze loading times and identify the fastest library
fastest_library = None
fastest_time = float('inf')

print("Library Loading Times:")
print("----------------------")
for library, loading_time in loading_times.items():
    print(f"{library}: {loading_time:.4f} seconds")
    if loading_time != -1 and loading_time < fastest_time:
        fastest_time = loading_time
        fastest_library = library

if fastest_library:
    print(f"\nFastest loading library: {fastest_library} ({fastest_time:.4f} seconds)")
else:
    print("\nNo valid loading times found for comparison.")

Library Loading Times:
----------------------
pandas: 29.3114 seconds
modin: 34.1174 seconds
dask: 0.0289 seconds

Fastest loading library: dask (0.0289 seconds)


## Summary:

* **Data ingestion efficiency of reading the file using pandas, Modin, and Dask?**  Dask was the fastest, loading the file in approximately 0.0289 seconds. Pandas took about 29.3114 seconds, and Modin took around 34.1174 seconds.

* **Dask's superior performance:** Dask loaded the 2 million-row CSV file significantly faster (0.0289 seconds) compared to pandas (29.3114 seconds) and Modin (34.1174 seconds).
* **Delayed computation in Dask:** Dask's reported DataFrame shape indicates that the row count is a delayed computation, highlighting its lazy evaluation approach.  The actual row count is not computed until a computation is performed on the DataFrame.

* **Prioritize Dask for large datasets:** For large CSV files like the one analyzed, Dask offers a substantial performance advantage in data loading, making it the preferred choice for initial data ingestion.
* **Investigate Dask's compute time:** While Dask excels at loading data quickly, investigate the performance of subsequent computations on the Dask DataFrame compared to pandas or Modin.  The initial load time is just one part of the overall data analysis workflow.


# Task

Perform basic validation on data columns on the same data file: For example: remove special character, white spaces from the column name and any other if relevant.

# Files written:
*   yamlutility.py - consists of functions to call to read the YAML config file and for other various column validations
*   data_schema.yaml - YAML file

In [7]:
%%writefile yamlutility.py
import logging
import os
import subprocess
import yaml
import pandas as pd
import datetime
import gc
import re


################
# File Reading #
################

def read_config_file(filepath):
    with open(filepath, 'r') as stream:
        try:
            return yaml.safe_load(stream)
        except yaml.YAMLError as exc:
            logging.error(exc)


def replacer(string, char):
    pattern = char + '{2,}'
    string = re.sub(pattern, char, string)
    return string

def col_header_val(df,table_config):
    '''
    replace whitespaces in the column
    and standardized column names
    '''
    df.columns = df.columns.str.lower()
    df.columns = df.columns.str.replace('[^\w]','_',regex=True)
    df.columns = list(map(lambda x: x.strip('_'), list(df.columns)))
    df.columns = list(map(lambda x: replacer(x,'_'), list(df.columns)))
    expected_col = list(map(lambda x: x.lower(),  table_config['columns']))
    expected_col.sort()
    df.columns =list(map(lambda x: x.lower(), list(df.columns)))
    df = df.reindex(sorted(df.columns), axis=1)
    if len(df.columns) == len(expected_col) and list(expected_col)  == list(df.columns):
        print("column name and column length validation passed")
        return 1
    else:
        print("column name and column length validation failed")
        mismatched_columns_file = list(set(df.columns).difference(expected_col))
        print("Following File columns are not in the YAML file",mismatched_columns_file)
        missing_YAML_file = list(set(expected_col).difference(df.columns))
        print("Following YAML columns are not in the file uploaded",missing_YAML_file)
        logging.info(f'df columns: {df.columns}')
        logging.info(f'expected columns: {expected_col}')
        return 0

Writing yamlutility.py


In [8]:
%%writefile data_schema.yaml
file_type: csv
file_name: us_congestion_2016_2022_sample_2m
read_separator: ','
write_separator: ','
columns:
    - id
    - time
    - speed
    - travel_time
    - length
    - ratio
    - congestion_level
    - geometry
    - origin
    - destination
    - segment_id
    - city_code
    - road
    - direction
    - year
    - month
    - day
    - hour
    - minute
    - dayofweek

Writing data_schema.yaml


In [10]:
# Read config file
import yamlutility as util
config_data = util.read_config_file("data_schema.yaml")

In [15]:
config_data['read_separator']

','

In [18]:
# read the file using config file
file_type = config_data['file_type']
source_file = "./" + config_data['file_name'] + f'.{file_type}'
#print("",source_file)
df = pd.read_csv(source_file, sep= config_data['read_separator'])
df.head()

Unnamed: 0,ID,Severity,Start_Lat,Start_Lng,StartTime,EndTime,Distance(mi),DelayFromTypicalTraffic(mins),DelayFromFreeFlowSpeed(mins),Congestion_Speed,...,Temperature(F),WindChill(F),Humidity(%),Pressure(in),Visibility(mi),WindDir,WindSpeed(mph),Precipitation(in),Weather_Event,Weather_Conditions
0,C-14344128,2,39.191032,-120.81974,2016-12-20T19:19:00.000-05:00,2016-12-20T19:33:47.000-05:00,1.4,2.58,2.6,Moderate,...,54.0,,30.0,30.15,10.0,North,3.5,,,Clear
1,C-32285069,0,41.736015,-87.721565,2018-11-16T17:18:00.000-05:00,2018-11-16T18:08:28.000-05:00,0.73,0.42,1.0,Slow,...,39.0,31.8,70.0,30.06,10.0,West,11.5,,,Overcast
2,C-14213642,0,32.519043,-93.741096,2021-02-18T20:32:00.000-05:00,2021-02-18T21:21:32.000-05:00,1.8,1.0,2.0,Moderate,...,30.0,30.0,79.0,30.2,10.0,WNW,3.0,0.0,,Fair
3,C-29674072,0,40.730564,-74.001709,2020-11-13T08:06:00.000-05:00,2020-11-13T08:48:22.000-05:00,1.42,1.0,2.0,Slow,...,48.0,48.0,93.0,29.92,1.0,VAR,3.0,0.04,,Light Rain
4,C-24044478,1,33.758331,-118.238533,2017-08-24T09:54:00.000-04:00,2017-08-24T11:13:19.000-04:00,2.6,4.9,6.92,Slow,...,66.9,,79.0,29.89,9.0,Calm,,,,Overcast


In [19]:
#validate the header of the file
util.col_header_val(df,config_data)

column name and column length validation failed
Following File columns are not in the YAML file ['weatherstation_airportcode', 'distance_mi', 'description', 'winddir', 'windspeed_mph', 'state', 'precipitation_in', 'start_lng', 'windchill_f', 'start_lat', 'localtimezone', 'humidity', 'temperature_f', 'visibility_mi', 'weather_event', 'starttime', 'weather_conditions', 'congestion_speed', 'county', 'delayfromtypicaltraffic_mins', 'street', 'city', 'zipcode', 'endtime', 'severity', 'weathertimestamp', 'pressure_in', 'delayfromfreeflowspeed_mins', 'country']
Following YAML columns are not in the file uploaded ['travel_time', 'direction', 'speed', 'minute', 'hour', 'destination', 'segment_id', 'time', 'geometry', 'origin', 'city_code', 'length', 'congestion_level', 'month', 'dayofweek', 'year', 'day', 'road', 'ratio']


0

In [20]:
print("columns of files are:" ,df.columns)
print("columns of YAML are:" ,config_data['columns'])

columns of files are: Index(['id', 'severity', 'start_lat', 'start_lng', 'starttime', 'endtime',
       'distance_mi', 'delayfromtypicaltraffic_mins',
       'delayfromfreeflowspeed_mins', 'congestion_speed', 'description',
       'street', 'city', 'county', 'state', 'country', 'zipcode',
       'localtimezone', 'weatherstation_airportcode', 'weathertimestamp',
       'temperature_f', 'windchill_f', 'humidity', 'pressure_in',
       'visibility_mi', 'winddir', 'windspeed_mph', 'precipitation_in',
       'weather_event', 'weather_conditions'],
      dtype='object')
columns of YAML are: ['id', 'time', 'speed', 'travel_time', 'length', 'ratio', 'congestion_level', 'geometry', 'origin', 'destination', 'segment_id', 'city_code', 'road', 'direction', 'year', 'month', 'day', 'hour', 'minute', 'dayofweek']


In [21]:
if util.col_header_val(df,config_data)==0:
    print("validation failed")
    # write code to reject the file
else:
    print("col validation passed")
    # write the code to perform further action
    # in the pipleine

column name and column length validation failed
Following File columns are not in the YAML file ['weatherstation_airportcode', 'distance_mi', 'description', 'winddir', 'windspeed_mph', 'state', 'precipitation_in', 'start_lng', 'windchill_f', 'start_lat', 'localtimezone', 'humidity', 'temperature_f', 'visibility_mi', 'weather_event', 'starttime', 'weather_conditions', 'congestion_speed', 'county', 'delayfromtypicaltraffic_mins', 'street', 'city', 'zipcode', 'endtime', 'severity', 'weathertimestamp', 'pressure_in', 'delayfromfreeflowspeed_mins', 'country']
Following YAML columns are not in the file uploaded ['travel_time', 'direction', 'speed', 'minute', 'hour', 'destination', 'segment_id', 'time', 'geometry', 'origin', 'city_code', 'length', 'congestion_level', 'month', 'dayofweek', 'year', 'day', 'road', 'ratio']
validation failed


# Task
Write the file in pipe separated text file (|) in gz format.

Output file: output.txt.gz

In [26]:
import gzip
from io import BytesIO

def write_to_gzip(df, output_file):
    """
    Writes a DataFrame to a pipe-separated text file in gz format.
    """
    # Load the data using Dask
    from dask import dataframe as dd
    df = dd.read_csv('us_congestion_2016_2022_sample_2m.csv', delimiter=",", dtype={'Weather_Event': 'object'})

   # Write the DataFrame to a temporary CSV file first using compute()
    temp_file = "temp.csv"
    df.to_csv(temp_file, sep='|', index=False, encoding='utf-8', single_file=True) # Write to a single file

    # Then, compress the temp file to gzip
    with open(temp_file, 'rb') as f_in, gzip.open(output_file, 'wb') as f_out:
        f_out.writelines(f_in)

    # Remove the temporary file to cleanup
    import os
    os.remove(temp_file)


# Example usage:
write_to_gzip(df, 'output.txt.gz')

# Task
Create a summary of the file:



*   Total number of rows
*   Total number of columns
*   File size

In [27]:
import os

def file_summary(file_path):
    """
    Generates a summary of a file, including row count, column count, and file size.
    """
    df = pd.read_csv(file_path, sep='|')  # Assume pipe separator for reading
    num_rows = df.shape[0]
    num_cols = df.shape[1]
    file_size = os.path.getsize(file_path)

    summary = {
        "Total rows": num_rows,
        "Total columns": num_cols,
        "File size (bytes)": file_size
    }

    return summary

# Example usage:
summary = file_summary('output.txt.gz')
print(summary)

{'Total rows': 2000000, 'Total columns': 30, 'File size (bytes)': 218272822}
