To compare time and memory consumption when processing CSV files.

In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, max, min, length
from datetime import date
from datetime import datetime
import psutil
import time
import os

SMALL DATASET<br>
Dataset: 'Nutrition_Physical_Activity_and_Obesity.csv'<br>
Rows: 104,272<br>
Columns: 33<br>
Size: 34,934Kb<br>
<br>
Data.gov. (2023, December 8). U.S. Department of Health & Human Services - Nutrition, Physical Activity, and Obesity - Behavioral Risk Factor Surveillance System. Retrieved December 1, 2024, from https://catalog.data.gov/dataset/nutrition-physical-activity-and-obesity-behavioral-risk-factor-surveillance-system

Monitor PANDAS processing (Small dataset)

In [2]:
# To set the processing start
print(f"PANDAS TIME AND MEMORY STATS: START => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

# Function to get memory usage
def get_memory_usage():
    process=psutil.Process(os.getpid())
    return process.memory_info().rss/1024/1024

# To track memory before loading
mem_before_load=get_memory_usage()

# To load the dataset
start_time=time.time()
df=pd.read_csv("nutrition-physical-activity.csv")
load_time=time.time()-start_time
mem_after_load = get_memory_usage()

# To process for-loop to create 1000000-size list
start_time=time.time()
new_list = []
range_size = range(1, 1000001)
for n in range_size:
    new_list.insert(n, n*2)
filter_time=time.time()-start_time
mem_after_filter=get_memory_usage()

# To calculate the range of Data_Value
start_time=time.time()
max_value=df['Data_Value'].max()
min_value=df['Data_Value'].min()
avg_value=df['Data_Value'].mean()
range_value=max_value - min_value
agg_time=time.time()-start_time
mem_after_agg=get_memory_usage()

# To print results
print(f"Pandas: Data loaded: {load_time:.2f} seconds, Memory: {mem_after_load-mem_before_load:.2f} MB")
print(f"Pandas: Filter operation: {filter_time:.2f} seconds, Memory: {mem_after_filter-mem_after_load:.2f} MB")
print(f"Pandas: Aggregation: {agg_time:.2f} seconds, Memory: {mem_after_agg-mem_after_filter:.2f} MB\n")

print(f"PANDAS TIME AND MEMORY STATS: END => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

PANDAS TIME AND MEMORY STATS: START => (2024-12-05 - 21:02:28)

Pandas: Data loaded: 0.35 seconds, Memory: 27.90 MB
Pandas: Filter operation: 0.12 seconds, Memory: 40.62 MB
Pandas: Aggregation: 0.00 seconds, Memory: 0.10 MB

PANDAS TIME AND MEMORY STATS: END => (2024-12-05 - 21:02:29)



Monitor SPARK processing (Small dataset)

In [3]:
# To set the processing start
print(f"SPARK TIME AND MEMORY STATS: START => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

# Function to get memory usage
def get_memory_usage():
    process=psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # Convert bytes to MB

# To start Spark session
spark = SparkSession.builder.appName("Spark Memory Monitor").getOrCreate()

# To track memory before loading
mem_before_load = get_memory_usage()

# To load the dataset
start_time = time.time()
df = spark.read.csv("nutrition-physical-activity.csv", header=True, inferSchema=True)
load_time = time.time() - start_time
mem_after_load = get_memory_usage()

df_filtered = df.filter(col("Data_Value").isNotNull())

# To process for-loop to create 1,000,000-size list
start_time=time.time()
new_list = []
range_size = range(1, 1000001)
for n in range_size:
    new_list.insert(n, n*2)  
filter_time=time.time()-start_time
mem_after_filter=get_memory_usage()

# To calculate the range of Data_Value
start_time=time.time()
agg_values = df_filtered.agg(
    max("Data_Value").alias("max_value"),
    min("Data_Value").alias("min_value"),
    avg("Data_Value").alias("avg_value")
).collect()[0]
range_value=agg_values.max_value - agg_values.min_value
agg_time=time.time()-start_time
mem_after_agg=get_memory_usage()

# To print results
print(f"Spark: Data loaded: {load_time:.2f} seconds, Memory: {mem_after_load-mem_before_load:.2f} MB")
print(f"Spark: Filter operation: {filter_time:.2f} seconds, Memory: {mem_after_filter-mem_after_load:.2f} MB")
print(f"Spark: Aggregation: {agg_time:.2f} seconds, Memory: {mem_after_agg-mem_after_filter:.2f} MB\n")

print(f"SPARK TIME AND MEMORY STATS: END => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

SPARK TIME AND MEMORY STATS: START => (2024-12-05 - 21:02:37)

Spark: Data loaded: 10.20 seconds, Memory: -26.24 MB
Spark: Filter operation: 0.16 seconds, Memory: 2.86 MB
Spark: Aggregation: 1.90 seconds, Memory: 0.46 MB

SPARK TIME AND MEMORY STATS: END => (2024-12-05 - 21:03:14)



BIG DATASET<br>
Dataset: 'yellow_tripdata_2015-01.csv'<br>
Rows: 12,748.986<br>
Columns: 19<br>
Size: 1,939,419Kb<br>
<br>
https://www.kaggle.com/datasets/elemento/nyc-yellow-taxi-trip-data

Monitor PANDAS processing (Big dataset)

In [4]:
# To set the processing start
print(f"PANDAS TIME AND MEMORY STATS: START => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

# Function to get memory usage
def get_memory_usage():
    process=psutil.Process(os.getpid())
    return process.memory_info().rss/1024/1024

# To track memory before loading
mem_before_load=get_memory_usage()

# To load the dataset
start_time=time.time()
df=pd.read_csv("yellow_tripdata_2015-01.csv", quotechar='"') #, quoting=csv.QUOTE_MINIMAL)
load_time=time.time()-start_time
mem_after_load = get_memory_usage()

# To process for-loop to create 10,000,000-size list
start_time=time.time()
new_list = []
range_size = range(1, 10000001) # 10,000,000
for n in range_size:
    new_list.insert(n, n*2)
filter_time=time.time()-start_time
mem_after_filter=get_memory_usage()

# To calculate the range of Data_Value
start_time=time.time()
# max_value=df['trip_distance'].str.len().max()
# min_value=df['trip_distance'].str.len().min()
max_value=df['trip_distance'].max()
min_value=df['trip_distance'].min()
range_value=max_value - min_value
agg_time=time.time()-start_time
mem_after_agg=get_memory_usage()

# To print results
print(f"Pandas: Data loaded: {load_time:.2f} seconds, Memory: {mem_after_load-mem_before_load:.2f} MB")
print(f"Pandas: Filter operation: {filter_time:.2f} seconds, Memory: {mem_after_filter-mem_after_load:.2f} MB")
print(f"Pandas: Aggregation: {agg_time:.2f} seconds, Memory: {mem_after_agg-mem_after_filter:.2f} MB\n")

print(f"PANDAS TIME AND MEMORY STATS: END => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

PANDAS TIME AND MEMORY STATS: START => (2024-12-05 - 21:18:39)

Pandas: Data loaded: 44.46 seconds, Memory: 2359.93 MB
Pandas: Filter operation: 1.39 seconds, Memory: 393.59 MB
Pandas: Aggregation: 0.08 seconds, Memory: 99.46 MB

PANDAS TIME AND MEMORY STATS: END => (2024-12-05 - 21:19:25)



Monitor SPARK processing (Big dataset)

In [5]:
# To set the processing start
print(f"SPARK TIME AND MEMORY STATS: START => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

# Function to get memory usage
def get_memory_usage():
    process=psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024  # Convert bytes to MB

# To start Spark session
spark = SparkSession.builder.appName("Spark Memory Monitor").getOrCreate()

# To track memory before loading
mem_before = get_memory_usage()

# To load the dataset
start_time = time.time()
df = spark.read.csv("yellow_tripdata_2015-01.csv", quote='"', header=True, inferSchema=True)
load_time = time.time() - start_time
mem_after_load = get_memory_usage()

df_filtered = df.filter(col('trip_distance').isNotNull())

# To process for-loop to create 1,000,000-size list
start_time=time.time()
new_list = []
range_size = range(1, 1000001)
for n in range_size:
    new_list.insert(n, n*2)  
filter_time=time.time()-start_time
mem_after_filter=get_memory_usage()

# To calculate the range of Data_Value
start_time=time.time()
agg_values = df_filtered.agg(
    max('trip_distance').alias("max_value"),
    min('trip_distance').alias("min_value")
).collect()[0] 
range_value=agg_values.max_value - agg_values.min_value
agg_time=time.time()-start_time
mem_after_agg=get_memory_usage()

# To print results
print(f"Spark: Data loaded: {load_time:.2f} seconds, Memory: {mem_after_load-mem_before:.2f} MB")
print(f"Spark: Filter operation: {filter_time:.2f} seconds, Memory: {mem_after_filter-mem_after_load:.2f} MB")
print(f"Spark: Aggregation: {agg_time:.2f} seconds, Memory: {mem_after_agg-mem_after_filter:.2f} MB\n")

print(f"SPARK TIME AND MEMORY STATS: END => ({date.today()} - {datetime.now().strftime('%H:%M:%S')})\n")

SPARK TIME AND MEMORY STATS: START => (2024-12-05 - 21:26:11)

Spark: Data loaded: 21.07 seconds, Memory: -2567.06 MB
Spark: Filter operation: 0.27 seconds, Memory: -406.32 MB
Spark: Aggregation: 6.60 seconds, Memory: 3.35 MB

SPARK TIME AND MEMORY STATS: END => (2024-12-05 - 21:26:40)

