In [11]:
from pyspark.sql import functions as F
import time
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.metadata import Metadata
import pandas as pd
from pyspark.sql import SparkSession

path = './materials/hw_10/'
df = pd.read_csv(path + 'electronic_devices.csv')
df['addons'] = df['addons'].fillna('')
df['addons'] = df['addons'].apply(lambda x: len(x.split(',')))


In [12]:
metadata = Metadata.detect_from_dataframe(
    data=df,
    table_name='electronic_devices')

synthesizer = GaussianCopulaSynthesizer(metadata)
synthesizer.fit(df)

synthetic_data = synthesizer.sample(num_rows=1_000_000)
synthetic_data.to_csv(path + 'generated_electronic_devices.csv', index=False)



In [13]:
spark = SparkSession.builder \
    .appName("Electronic Devices Analysis") \
    .getOrCreate()

spark.sparkContext.setCheckpointDir(path)

df_spark = spark.read.csv(
    path + 'generated_electronic_devices.csv', header=True, inferSchema=True)

filtered_data = df_spark.filter(
    (df_spark.purchase_date == '2024-09-03') & (df_spark.addons == 1))

df = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
)

df_collected = df.collect()

                                                                                

In [21]:
start_time = time.time()
result_no_cache = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
).collect()
no_cache_duration = time.time() - start_time
print(f"Без кэша {no_cache_duration}")

Без кэша 0.11723947525024414


In [26]:
filtered_data.cache()
start_time = time.time()
result_with_cache = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
).collect()
cache_duration = time.time() - start_time
print(f"С кэшем {cache_duration}")

С кэшем 0.11908555030822754


24/12/17 02:22:15 WARN CacheManager: Asked to cache already cached data.


In [27]:
filtered_data.checkpoint() 
start_time = time.time()
result_with_checkpoint = filtered_data.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('price_difference'),
    (F.max('total_price') - F.min('total_price')).alias('order_difference')
).collect()
checkpoint_duration = time.time() - start_time
print(f"С контрольной точкой {checkpoint_duration}")

С контрольной точкой 0.10404419898986816
