In [None]:
import subprocess
import os

# Report which Java installation JAVA_HOME points to, as a sanity check
# before the Spark session is created further down.
java_home = os.environ.get('JAVA_HOME')
print("JAVA_HOME:", java_home or 'Not set')

if java_home:
    java_bin = os.path.join(java_home, 'bin', 'java')
    try:
        result = subprocess.run([java_bin, '-version'],
                                capture_output=True, text=True)
    except OSError as exc:
        # JAVA_HOME is set but holds no runnable java binary (missing file,
        # no execute permission, ...): report it instead of raising.
        print(f"\nCould not run {java_bin}: {exc}")
    else:
        print("\nProject JAVA_HOME Java version:")
        # The version banner is read from stderr.
        print(result.stderr)

In [None]:
from pyspark.sql import SparkSession

# Build the Spark session used by the rest of the notebook (or reuse an
# existing one, per getOrCreate).
spark = (
    SparkSession.builder
    .appName("local-pyspark")
    .getOrCreate()
)

In [None]:
import pandas as pd

# Load the raw CSV export into a pandas DataFrame and list the dtype pandas
# assigned to each column.
csv_path = "./nyc-yellow-taxi-trip-records-january-2024/nyc_tlc_yellow_2024_01.csv"
taxi_df = pd.read_csv(csv_path)

print(taxi_df.dtypes)

In [None]:
# Write the pandas frame out as a Parquet file next to the CSV, then preview
# the first five rows.
parquet_path = "./nyc-yellow-taxi-trip-records-january-2024/nyc_tlc_yellow_2024_01.parquet"
taxi_df.to_parquet(parquet_path)

taxi_df.head(n=5)

In [None]:
import pyarrow.parquet as pq

# Check generated statistics in parquet file: walk every row group and print
# the statistics stored for each of its column chunks.
pf = pq.ParquetFile("./nyc-yellow-taxi-trip-records-january-2024/nyc_tlc_yellow_2024_01.parquet")
print(f"Row groups: {pf.num_row_groups}")
for i in range(pf.num_row_groups):
    rg = pf.metadata.row_group(i)

    print(f"\n--- Row Group {i} ({rg.num_rows:,} rows) ---")

    for j in range(rg.num_columns):
        print(f"\n--- Column {j} statistics ---")
        chunk = rg.column(j)
        stats = chunk.statistics
        # `statistics` is None when the file holds no statistics for this
        # chunk; otherwise call to_dict() so the values are printed rather
        # than the bound-method repr.
        if stats is None:
            print("No statistics available")
        else:
            print(stats.to_dict())

In [None]:
# Time for benchmarking
import time

def timer(func, df, *args, **kwargs):
    """Call ``func(df, *args, **kwargs)`` and report how long it took.

    The wall-clock duration is printed in milliseconds (two decimals) and
    the value returned by ``func`` is passed back to the caller unchanged.
    """
    began = time.perf_counter()
    outcome = func(df, *args, **kwargs)
    elapsed = 1000 * (time.perf_counter() - began)
    print(f"Time: {elapsed:.2f} ms")
    return outcome

In [None]:
# Load the Parquet file through the Spark session as a Spark DataFrame.
taxi_pq_df = spark.read.parquet(
    "./nyc-yellow-taxi-trip-records-january-2024/nyc_tlc_yellow_2024_01.parquet"
)

### Benchmark Test #1: efficiency gain by using cache

In [None]:
from pyspark import StorageLevel

# Keep only rides with more than one passenger. This is a lazy transformation:
# nothing is executed until an action is called on the result.
df_filtered = taxi_pq_df.where(taxi_pq_df["passenger_count"] > 1)

In [None]:
# Show the current storage level and the cached flag of the filtered frame.
print(df_filtered.storageLevel, df_filtered.is_cached, sep="\n")

In [None]:
# Time a single count() on the filtered frame before it is persisted; the
# action is what triggers real execution of the lazy filter.
start = time.perf_counter()
result = df_filtered.count()
end = time.perf_counter()

elapsed_ms = (end - start) * 1000
print(f"Count WITHOUT cache: {elapsed_ms:.2f} ms")

# Untimed reference count of the whole data set, for comparison.
total_rides = taxi_pq_df.count()
print(f"{total_rides} total rides - {result} rides with multiple passengers")

In [None]:
# Test: mark the filtered frame for in-memory persistence and confirm that
# its storage level / cached flag changed.
df_filtered.persist(StorageLevel.MEMORY_ONLY)
print(df_filtered.storageLevel, df_filtered.is_cached, sep="\n")

# Caching is LAZY, just like filtering: the first count after persist() is
# the action that fills the cache, so the measured time is
# count-time + caching-time.
print("Count + cache:")
result = timer(lambda frame: frame.count(), df_filtered)

In [None]:
# Repeat the count: this run should show the speed gain from using the cache.
print("Count FROM cache:")
result = timer(lambda frame: frame.count(), df_filtered)

# Key takeaways: 

* Cache modes (storage levels) differ in where the data is kept (in memory, on disk, or both) and in how many replicas are stored (1 or 2)
* Caching/persisting is a LAZY operation: data only gets cached when an ACTION triggers it
* Speed gains are minimal for small datasets

### Benchmark #2: Partitions

In [None]:
# Read the Parquet file again and keep rides with more than one passenger.
df_partitions = (
    spark.read
    .parquet("./nyc-yellow-taxi-trip-records-january-2024/nyc_tlc_yellow_2024_01.parquet")
    .where("passenger_count > 1")
)

print(f"Partitions: {df_partitions.rdd.getNumPartitions()}")

# Check partition sizes: one row count per partition.
partition_sizes = df_partitions.rdd.glom().map(len)
partition_sizes.collect()

In [None]:
# Label the timing with the frame's actual partition count rather than a
# fixed number, which is only correct on the machine it was written on.
print(f"Count with {df_partitions.rdd.getNumPartitions()} partitions:")
result = timer(lambda df: df.count(), df_partitions)

In [None]:
# Redistribute the filtered rows over 12 partitions.
df_repartitioned = df_partitions.repartition(12)

# Check partition sizes: one row count per partition.
repartitioned_sizes = df_repartitioned.rdd.glom().map(len)
repartitioned_sizes.collect()

In [None]:
# Time the same count on the repartitioned frame.
print("Count with 12 partitions:")
result = timer(lambda frame: frame.count(), df_repartitioned)

### Key Takeaways

* Not all partitions necessarily get used: after `repartition(12)` every partition got 50k rows, whereas before that some partitions were empty
* The Spark UI shows execution timing. 12 partitions = 12 tasks; my PC has 6 cores, and I can see that only 6 tasks ran concurrently, as expected
* This means you can process as many partitions in parallel as you have available CPU cores.

# TO DO:

### Check efficiency gains with larger datasets

### Benchmark #3: Aggregations (groupBy, agg)

check shuffle stage

### Execution plans

`df.explain("extended")`