In [10]:
# Import necessary libraries
import pandas as pd
from sdv.single_table import GaussianCopulaSynthesizer
from sdv.metadata import SingleTableMetadata

# Load the original dataset
original_data = pd.read_csv('electronic_devices.csv')

# Create a metadata object
metadata = SingleTableMetadata()
metadata.detect_from_dataframe(original_data)

# Create a synthesizer model with the metadata
synthesizer = GaussianCopulaSynthesizer(metadata)

# Fit the model on the original data
synthesizer.fit(original_data)

# Generate synthetic data with 1 million rows
synthetic_data = synthesizer.sample(num_rows=1000000)

synthetic_data.to_csv('synthetic_data.csv', index=False)
# Save or display the synthetic data (optional)
print(synthetic_data.head())



   customer_id  age  gender loyalty_member product_type      sku  rating  \
0        15061   50  Female             No       Tablet   TBL345       4   
1        11716   55  Female             No   Headphones   SWT567       5   
2        18435   35    Male             No   Smartwatch   SWT567       4   
3        13696   46  Female             No   Smartphone  SKU1005       5   
4         1180   65  Female             No   Smartphone  SKU1002       3   

  order_status payment_method  total_price  unit_price  quantity  \
0    Completed           Cash      6754.24     1135.00         6   
1    Cancelled           Cash      3235.81      927.27         9   
2    Completed    Credit Card      3258.52      533.55         4   
3    Cancelled         Paypal       867.60      913.77        10   
4    Cancelled         PayPal       691.18      136.06         2   

  purchase_date shipping_type  \
0    2024-05-02       Express   
1    2024-01-01      Standard   
2    2024-08-26      Standard   
3 

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import size, split, col

# Initialize Spark session
spark = SparkSession.builder \
    .appName("Electronic Devices Analysis") \
    .getOrCreate()

# Assuming synthetic_data is already a PySpark DataFrame
# If synthetic_data is a Pandas DataFrame, convert it to PySpark DataFrame first
# df = spark.createDataFrame(synthetic_data)

# Read the synthetic data into a DataFrame (if not already done)
df = spark.read.csv('synthetic_data.csv', header=True, inferSchema=True)

# Filter for records on '2024-09-03' and with exactly one additional purchase
filtered_df = df.filter(
    (col('purchase_date') == '2024-09-03') & 
    (size(split(col('addons'), ',')) == 1)  # Count addons after splitting by comma
)

# Show the filtered results (optional)
filtered_df.show()

+-----------+---+------+--------------+------------+-------+------+------------+--------------+-----------+----------+--------+-------------+-------------+-----------------+----------+
|customer_id|age|gender|loyalty_member|product_type|    sku|rating|order_status|payment_method|total_price|unit_price|quantity|purchase_date|shipping_type|           addons|addons_cnt|
+-----------+---+------+--------------+------------+-------+------+------------+--------------+-----------+----------+--------+-------------+-------------+-----------------+----------+
|      15470| 28|  Male|            No|  Smartphone|SKU1005|     2|   Cancelled|        Paypal|    1622.39|   1095.89|       8|   2024-09-03|    Overnight|     Impulse Item|       0.0|
|      15560| 47|  Male|           Yes|      Laptop| LTP123|     2|   Completed|        PayPal|    11396.8|   1138.94|       2|   2024-09-03|      Express|Extended Warranty|      4.38|
|      16142| 24|  Male|            No|  Headphones| TBL345|     5|   Compl

In [13]:
from pyspark.sql import functions as F

# Calculate min and max prices grouped by gender and age
price_diff_df = filtered_df.groupBy('gender', 'age').agg(
    (F.max('unit_price') - F.min('unit_price')).alias('unit_price_diff'),
    (F.max('total_price') - F.min('total_price')).alias('total_price_diff')
)

# Collect the results to driver
results = price_diff_df.collect()

# Display results (optional)
for row in results:
    print(row)

Row(gender='Female', age=47, unit_price_diff=651.4399999999999, total_price_diff=6053.91)
Row(gender='Female', age=65, unit_price_diff=993.0900000000001, total_price_diff=6348.18)
Row(gender='Male', age=21, unit_price_diff=925.6200000000001, total_price_diff=5303.549999999999)
Row(gender='Female', age=52, unit_price_diff=972.4900000000001, total_price_diff=9981.5)
Row(gender='Male', age=52, unit_price_diff=1006.99, total_price_diff=11309.09)
Row(gender='Female', age=76, unit_price_diff=785.1000000000001, total_price_diff=10247.06)
Row(gender='Male', age=24, unit_price_diff=767.1, total_price_diff=6367.75)
Row(gender='Female', age=23, unit_price_diff=711.6499999999999, total_price_diff=6685.509999999999)
Row(gender='Male', age=64, unit_price_diff=988.81, total_price_diff=9127.83)
Row(gender='Male', age=68, unit_price_diff=909.28, total_price_diff=6483.9400000000005)
Row(gender='Male', age=74, unit_price_diff=905.54, total_price_diff=8002.9800000000005)
Row(gender='Female', age=49, unit_

In [14]:
import time

# Measure time without caching
start_time = time.time()
price_diff_df.collect()  # Perform action to trigger computation
end_time = time.time()
print(f"Execution time without caching: {end_time - start_time} seconds")

# Cache the DataFrame and measure again
filtered_df.cache()
start_time = time.time()
price_diff_df.collect()  # Perform action to trigger computation after caching
end_time = time.time()
print(f"Execution time with caching: {end_time - start_time} seconds")

# Optionally, you can also try checkpointing for performance measurement.
spark.sparkContext.setCheckpointDir("checkpoints")
filtered_df.checkpoint()
start_time = time.time()
price_diff_df.collect()  # Perform action to trigger computation after checkpointing
end_time = time.time()
print(f"Execution time with checkpointing: {end_time - start_time} seconds")

Execution time without caching: 0.025639772415161133 seconds
Execution time with caching: 0.025003671646118164 seconds
Execution time with checkpointing: 0.025121450424194336 seconds
