In [135]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructType, IntegerType, DateType, StructField

In [136]:
csv_path = '.'

In [137]:
spark = SparkSession.builder \
    .appName("spark_first") \
    .getOrCreate()

In [138]:
import pandas as pd
from sdv.metadata import Metadata
df = pd.read_csv('electronic_devices.csv')
metadata = Metadata.detect_from_dataframe(
    data=df,
    table_name='electronic_devices`')

In [139]:
from sdv.single_table import GaussianCopulaSynthesizer
model = GaussianCopulaSynthesizer(metadata)
model.fit(df)
synthetic_data = model.sample(num_rows=1000_000)




In [140]:
print(synthetic_data.dtypes)

customer_id         int64
age                 int64
gender             object
loyalty_member     object
product_type       object
sku                object
rating              int64
order_status       object
payment_method     object
total_price       float64
unit_price        float64
quantity            int64
purchase_date      object
shipping_type      object
addons             object
addons_cnt        float64
dtype: object


In [141]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType
synthetic_data['purchase_date'] = pd.to_datetime(synthetic_data['purchase_date']).dt.date
# Определение схемы для Spark DataFrame
schema = StructType([
    StructField("customer_id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("loyalty_member", StringType(), True),
    StructField("product_type", StringType(), True),
    StructField("sku", StringType(), True),
    StructField("rating", IntegerType(), True),
    StructField("order_status", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("total_price", FloatType(), True),
    StructField("unit_price", FloatType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("purchase_date", DateType(), True),
    StructField("shipping_type", StringType(), True),
    StructField("addons", StringType(), True),
    StructField("addons_cnt", FloatType(), True)
])

# Создание Spark DataFrame с заданной схемой
spark_df = spark.createDataFrame(synthetic_data, schema=schema)


In [None]:
from pyspark.sql import functions as F

df = spark_df\
    .filter(F.col('purchase_date') == '2024-09-03') \
    .filter(F.size(F.split(F.col('addons'), ',')) == 1)

df.show(6)


+-----------+---+------+--------------+------------+-------+------+------------+--------------+-----------+----------+--------+-------------+-------------+-----------------+----------+
|customer_id|age|gender|loyalty_member|product_type|    sku|rating|order_status|payment_method|total_price|unit_price|quantity|purchase_date|shipping_type|           addons|addons_cnt|
+-----------+---+------+--------------+------------+-------+------+------------+--------------+-----------+----------+--------+-------------+-------------+-----------------+----------+
|      15470| 28|  Male|            No|  Smartphone|SKU1005|     2|   Cancelled|        Paypal|    1622.39|   1095.89|       8|   2024-09-03|    Overnight|     Impulse Item|       0.0|
|      15560| 47|  Male|           Yes|      Laptop| LTP123|     2|   Completed|        PayPal|    11396.8|   1138.94|       2|   2024-09-03|      Express|Extended Warranty|      4.38|
|      16142| 24|  Male|            No|  Headphones| TBL345|     5|   Compl

In [143]:
df.groupBy("gender")\
.agg(
    (F.max("unit_price") - F.min("unit_price")).alias("unit_pirce_dif"),
    (F.max("total_price") - F.min("total_price")).alias("total_pirce_dif")
).show()


+------+--------------+---------------+
|gender|unit_pirce_dif|total_pirce_dif|
+------+--------------+---------------+
|  Male|    1015.11005|       11374.84|
|Female|       1014.99|      10741.601|
+------+--------------+---------------+



In [144]:
df.groupBy("age")\
.agg(
    (F.max("unit_price") - F.min("unit_price")).alias("unit_pirce_dif"),
    (F.max("total_price") - F.min("total_price")).alias("total_pirce_dif")
)\
.sort(F.asc("age"))\
.show(100)


+---+--------------+---------------+
|age|unit_pirce_dif|total_pirce_dif|
+---+--------------+---------------+
| 19|        208.13|        6506.31|
| 20|        950.02|        9512.53|
| 21|        959.83|      8043.6797|
| 22|         925.7|        8398.29|
| 23|     994.89996|       10353.96|
| 24|     817.80994|         8870.5|
| 25|        909.15|        7423.88|
| 26|        960.39|        8046.99|
| 27|        968.24|      5952.8896|
| 28|     881.80994|        7913.52|
| 29|     802.47003|        6004.44|
| 30|    1002.94006|        6220.28|
| 31|        914.87|      7761.2397|
| 32|        978.87|       9138.069|
| 33|     975.69995|        5403.81|
| 34|         870.9|        6496.31|
| 35|        987.95|        8250.41|
| 36|     947.42993|       10653.34|
| 37|      989.2399|      11063.149|
| 38|    1012.79004|        9304.54|
| 39|        935.21|        6650.85|
| 40|     958.97003|        9449.84|
| 41|     987.63007|      6998.9697|
| 42|        887.38|        8852.94|
|

In [145]:
aggr_data = df.groupBy("age",
"gender",
)\
.agg(
    (F.max("unit_price") - F.min("unit_price")).alias("unit_pirce_dif"),
    (F.max("total_price") - F.min("total_price")).alias("total_pirce_dif")
)\
.sort(F.asc("age"))
aggr_data.show()


+---+------+--------------+---------------+
|age|gender|unit_pirce_dif|total_pirce_dif|
+---+------+--------------+---------------+
| 19|Female|        208.13|      5245.4004|
| 19|  Male|         112.5|        6506.31|
| 20|Female|     908.81995|        9512.53|
| 20|  Male|     269.27002|        3729.55|
| 21|  Male|     925.62006|        5303.55|
| 21|Female|     372.84998|        6107.91|
| 22|  Male|        812.39|       577.1201|
| 22|Female|         925.7|        8398.29|
| 23|Female|      711.6499|        6685.51|
| 23|  Male|     993.24005|       10017.35|
| 24|  Male|         767.1|        6367.75|
| 24|Female|     803.41003|         8870.5|
| 25|  Male|     859.98004|        6842.95|
| 25|Female|        909.15|        7423.88|
| 26|Female|     299.46997|      5998.2803|
| 26|  Male|        960.39|      7849.9795|
| 27|Female|     934.07007|      5952.8896|
| 27|  Male|     843.18994|        3275.03|
| 28|  Male|        844.48|        7761.63|
| 28|Female|     881.64996|     

In [146]:
collected_data = aggr_data.collect()
for row in collected_data:
    print(row)

Row(age=19, gender='Female', unit_pirce_dif=208.1300048828125, total_pirce_dif=5245.400390625)
Row(age=19, gender='Male', unit_pirce_dif=112.5, total_pirce_dif=6506.31005859375)
Row(age=20, gender='Female', unit_pirce_dif=908.8199462890625, total_pirce_dif=9512.5302734375)
Row(age=20, gender='Male', unit_pirce_dif=269.27001953125, total_pirce_dif=3729.550048828125)
Row(age=21, gender='Male', unit_pirce_dif=925.6200561523438, total_pirce_dif=5303.5498046875)
Row(age=21, gender='Female', unit_pirce_dif=372.8499755859375, total_pirce_dif=6107.91015625)
Row(age=22, gender='Male', unit_pirce_dif=812.3900146484375, total_pirce_dif=577.1201171875)
Row(age=22, gender='Female', unit_pirce_dif=925.7000122070312, total_pirce_dif=8398.2900390625)
Row(age=23, gender='Female', unit_pirce_dif=711.64990234375, total_pirce_dif=6685.509765625)
Row(age=23, gender='Male', unit_pirce_dif=993.2400512695312, total_pirce_dif=10017.349609375)
Row(age=24, gender='Male', unit_pirce_dif=767.0999755859375, total_p

# Измерение скорости

### No cache

In [None]:
import time
def speed_test(func):
    def wrapps(*args, **kwargs):
        start_time = time.time()
        func(*args, **kwargs)
        print(str("%s секунд" % (time.time() - start_time)))
        return 
    return wrapps
@speed_test
def spark_test_aggregration(df):
    df = df\
        .filter(F.col('purchase_date') == '2024-09-03') \
        .filter(F.size(F.split(F.col('addons'), ',')) == 1)
    aggr_df = df.groupBy('gender', 'age') \
    .agg(
        (F.max('unit_price') - F.min('unit_price')).alias('price_diff'),
        (F.max('total_price') - F.min('total_price')).alias('order_price_diff')
    )
    # aggr_df.show(5)
    return aggr_df
spark_test_aggregration(spark_df)
df.unpersist()


0.05283808708190918 секунд


DataFrame[customer_id: int, age: int, gender: string, loyalty_member: string, product_type: string, sku: string, rating: int, order_status: string, payment_method: string, total_price: float, unit_price: float, quantity: int, purchase_date: date, shipping_type: string, addons: string, addons_cnt: float]

### Cached

In [None]:
spark_df.cache()
spark_test_aggregration(spark_df)
spark_df.unpersist()


0.04671454429626465 секунд


DataFrame[customer_id: int, age: int, gender: string, loyalty_member: string, product_type: string, sku: string, rating: int, order_status: string, payment_method: string, total_price: float, unit_price: float, quantity: int, purchase_date: date, shipping_type: string, addons: string, addons_cnt: float]

In [None]:
from pyspark import StorageLevel
spark_df.persist(StorageLevel.MEMORY_AND_DISK)
spark_df.count()
spark_test_aggregration(spark_df)
spark_df.unpersist()

0.02849102020263672 секунд


DataFrame[customer_id: int, age: int, gender: string, loyalty_member: string, product_type: string, sku: string, rating: int, order_status: string, payment_method: string, total_price: float, unit_price: float, quantity: int, purchase_date: date, shipping_type: string, addons: string, addons_cnt: float]

### Checkpoint

In [None]:
# Set checkpoint directory
spark.sparkContext.setCheckpointDir("checkpoints/")

In [None]:
spark_df.checkpoint()
spark_test_aggregration(spark_df)