In [49]:
BUCKET_NAME = "simple-vertex-ai-pipeline"

In [64]:
import pandas as pd
import numpy as np
import time
size = 10_000_000
# Generate dummy data
df = pd.DataFrame({
    'id': range(size),
    'value': np.random.rand(size),
    'category': np.random.choice(['A', 'B', 'C'], size=size)
})

# Save to CSV and Parquet
df.to_csv(f'gs://{BUCKET_NAME}/sample.csv', index=False)
df.to_parquet(f'gs://{BUCKET_NAME}/sample.partquet', index=False)
df

Unnamed: 0,id,value,category
0,0,0.369550,B
1,1,0.745123,B
2,2,0.333385,B
3,3,0.066190,A
4,4,0.757993,B
...,...,...,...
9999995,9999995,0.872623,B
9999996,9999996,0.026599,B
9999997,9999997,0.554922,A
9999998,9999998,0.194187,B


In [65]:
# CSV load time
start = time.time()
df_csv = pd.read_csv(f'gs://{BUCKET_NAME}/sample.csv')
print("Pandas CSV Load Time:", round(time.time() - start, 2), "seconds")

# Parquet load time
start = time.time()
df_parquet = pd.read_parquet(f'gs://{BUCKET_NAME}/sample.partquet')
print("Pandas Parquet Load Time:", round(time.time() - start, 2), "seconds")

Pandas CSV Load Time: 13.84 seconds
Pandas Parquet Load Time: 2.56 seconds


In [66]:
from pyspark.sql import SparkSession
import time

# Initialize Spark
spark = SparkSession.builder \
    .appName("BenchmarkExample") \
    .master("local[*]") \
    .getOrCreate()

# CSV load time
start = time.time()
df_csv = spark.read.option("header", True).csv(f'gs://{BUCKET_NAME}/sample.csv')
df_csv.count()  # trigger execution
print("Spark CSV Load Time:", round(time.time() - start, 2), "seconds")

# Parquet load time
start = time.time()
df_parquet = spark.read.parquet(f'gs://{BUCKET_NAME}/sample.partquet')
df_parquet.count()  # trigger execution
print("Spark Parquet Load Time:", round(time.time() - start, 2), "seconds")

25/04/25 04:13:58 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Spark CSV Load Time: 1.52 seconds
Spark Parquet Load Time: 0.61 seconds


In [67]:
# Pandas csv
start = time.time()
df_csv = pd.read_csv(f'gs://{BUCKET_NAME}/sample.csv')
df_csv = df_csv[df_csv['value'] > 0.5]
df_csv['value_squared'] = df_csv['value'] ** 2
result_csv = df_csv.groupby('category').agg({'value': 'mean'}).head(10)
pandas_time = round(time.time() - start, 2)
print("🐼 Pandas CSV Total Time:", pandas_time, "seconds")
result_csv

# Pandas Parquet
start = time.time()
df_csv = pd.read_parquet(f'gs://{BUCKET_NAME}/sample.partquet')
df_csv = df_csv[df_csv['value'] > 0.5]
df_csv['value_squared'] = df_csv['value'] ** 2
result_csv = df_csv.groupby('category').agg({'value': 'mean'}).head(10)
pandas_time = round(time.time() - start, 2)
print("🐼 Pandas CSV Total Time:", pandas_time, "seconds")
result_csv

🐼 Pandas CSV Total Time: 10.26 seconds
🐼 Pandas CSV Total Time: 3.05 seconds


Unnamed: 0_level_0,value
category,Unnamed: 1_level_1
A,0.750036
B,0.749853
C,0.749873


In [68]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pow
import time

spark = SparkSession.builder \
    .appName("Benchmark Spark") \
    .master("local[*]") \
    .getOrCreate()

# Benchmark: pyspark CSV
start = time.time()
df_csv = spark.read.option("header", True).csv(f'gs://{BUCKET_NAME}/sample.csv', inferSchema=True)
df_csv = df_csv.filter(col("value") > 0.5)
df_csv = df_csv.withColumn("value_squared", pow(col("value"), 2))
result_csv = df_csv.groupBy("category").avg("value")
result_csv.collect()  # Trigger action
print("Spark CSV Total Time:", round(time.time() - start, 2), "seconds")

# Benchmark: pypsark Parquet
start = time.time()
df_parquet = spark.read.parquet(f'gs://{BUCKET_NAME}/sample.partquet')
df_parquet = df_parquet.filter(col("value") > 0.5)
df_parquet = df_parquet.withColumn("value_squared", pow(col("value"), 2))
result_parquet = df_parquet.groupBy("category").avg("value")
result_parquet.collect()
print("Spark Parquet Total Time:", round(time.time() - start, 2), "seconds")

25/04/25 04:14:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Spark CSV Total Time: 9.97 seconds
Spark Parquet Total Time: 1.19 seconds


                                                                                

In [69]:
result_csv.show()



+--------+------------------+
|category|        avg(value)|
+--------+------------------+
|       B|0.7498531983885696|
|       C|0.7498731103293323|
|       A|0.7500361755404709|
+--------+------------------+



                                                                                

In [70]:
result_parquet.show()

+--------+------------------+
|category|        avg(value)|
+--------+------------------+
|       B| 0.749853198388569|
|       C|0.7498731103293311|
|       A|0.7500361755404714|
+--------+------------------+



                                                                                

In [None]:
## What is Lazy Execution

# Benchmark: pypsark Parquet
start = time.time()
df_csv = spark.read.option("header", True).csv(f'gs://{BUCKET_NAME}/sample.csv', inferSchema=True)
df_csv = df_csv.filter(col("value") > 0.5)
df_csv = df_csv.withColumn("value_squared", pow(col("value"), 2))
result_csv = df_csv.groupBy("category").avg("value")
lazy_time = time.time()
print("Time before collect (lazy plan only):", round(lazy_time - start, 2), "seconds")
result_csv.collect()
total_time = time.time()
print("Actual execution time (collect only):", round(total_time - lazy_time, 2), "seconds")
print("Total pipeline time:", round(total_time - start, 2), "seconds")

                                                                                

Time before collect (lazy plan only): 5.04 seconds


[Stage 172:>                                                        (0 + 8) / 8]