In [2]:
from pyspark.sql import functions as F
import time
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.metadata import Metadata
import pandas as pd
from pyspark.sql import SparkSession

path = '/Users/maximgolovchenko/Documents/University/HSE/PythonForDataEngineering/Topic_10/'
df = pd.read_csv(path + 'electronic_devices.csv')
df['addons'] = df['addons'].fillna('')
df['addons'] = df['addons'].apply(lambda x: len(x.split(',')))

metadata = Metadata.detect_from_dataframe(
    data=df,
    table_name='electronic_devices')

synthesizer = GaussianCopulaSynthesizer(metadata)
synthesizer.fit(df)

synthetic_data = synthesizer.sample(num_rows=1_000_000)
synthetic_data.to_csv(path + 'generated_electronic_devices.csv', index=False)

spark = SparkSession.builder \
    .appName("Electronic Devices Analysis") \
    .getOrCreate()

spark.sparkContext.setCheckpointDir(path)


df_spark = spark.read.csv(
    path + 'generated_electronic_devices.csv', header=True, inferSchema=True)

filtered_data = df_spark.filter(
    (df_spark.purchase_date == '2024-09-03') & (df_spark.addons == 1))

result = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
)

result_collected = result.collect()

start_time = time.time()
result_no_cache = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
).collect()
no_cache_duration = time.time() - start_time

filtered_data.cache()
start_time = time.time()
result_with_cache = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
).collect()
cache_duration = time.time() - start_time

filtered_data.checkpoint()
start_time = time.time()
result_with_checkpoint = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
).collect()
checkpoint_duration = time.time() - start_time

print(f"Duration without caching: {no_cache_duration} seconds")
print(f"Duration with caching: {cache_duration} seconds")
print(f"Duration with checkpointing: {checkpoint_duration} seconds")

24/12/02 18:20:43 WARN CacheManager: Asked to cache already cached data.        


Duration without caching: 0.1847553253173828 seconds
Duration with caching: 0.1622939109802246 seconds
Duration with checkpointing: 0.16279101371765137 seconds
