In [0]:
%spark.pyspark
from pyspark.sql import Window
from pyspark.sql import functions as F


# Small in-memory fixtures: users with their city, order lines, and a product catalogue.
user_rows = [
    ("u1", "Berlin"),
    ("u2", "Berlin"),
    ("u3", "Munich"),
    ("u4", "Hamburg"),
]
users = spark.createDataFrame(user_rows, ["user_id", "city"])

# One tuple per order line: (order_id, user_id, product_id, qty, price).
order_rows = [
    ("o1", "u1", "p1", 2, 10.0),
    ("o2", "u1", "p2", 1, 30.0),
    ("o3", "u2", "p1", 1, 10.0),
    ("o4", "u2", "p3", 5, 7.0),
    ("o5", "u3", "p2", 3, 30.0),
    ("o6", "u3", "p3", 1, 7.0),
    ("o7", "u4", "p1", 10, 10.0),
]
orders = spark.createDataFrame(
    order_rows, ["order_id", "user_id", "product_id", "qty", "price"]
)

product_rows = [
    ("p1", "Ring VOLA"),
    ("p2", "Ring POROG"),
    ("p3", "Ring TISHINA"),
]
products = spark.createDataFrame(product_rows, ["product_id", "product_name"])

# Preview all three inputs, in the same order as they were defined.
for frame in (users, orders, products):
    frame.show()

In [1]:
%spark.pyspark

# Revenue of each order line = quantity * unit price.
revenue_expr = F.col("qty") * F.col("price")
orders_with_revenue = orders.withColumn("revenue", revenue_expr)

orders_with_revenue.show()

In [2]:
%spark.pyspark

# Attach the user's city and the product name to every order line.
# Left joins keep order lines even when the user or product lookup has no match.
orders_with_city = orders_with_revenue.join(users, on="user_id", how="left")
orders_enriched = orders_with_city.join(products, on="product_id", how="left")

orders_enriched.show()

In [3]:
%spark.pyspark

# Metrics per (city, product_id, product_name):
#   orders_cnt  - number of orders (non-null order_id values)
#   qty_sum     - total qty
#   revenue_sum - total revenue
group_keys = ["city", "product_id", "product_name"]
metrics = [
    F.count("order_id").alias("orders_cnt"),
    F.sum("qty").alias("qty_sum"),
    F.sum("revenue").alias("revenue_sum"),
]
mart_agg = orders_enriched.groupBy(*group_keys).agg(*metrics)

mart_agg.show()

In [4]:
%spark.pyspark

# For each city keep the top-N products by revenue_sum, ranked with a window function.
TOP_N = 2

# product_id is a secondary sort key so that ties on revenue_sum are broken
# deterministically. Without it, row_number() assigns an arbitrary order to tied
# rows (e.g. p1 and p2 in Berlin both have revenue_sum 30.0), so the "top-2" set
# could change from run to run.
window_spec = (
    Window
    .partitionBy("city")
    .orderBy(F.desc("revenue_sum"), F.asc("product_id"))
)

mart_with_rank = mart_agg.withColumn("rank", F.row_number().over(window_spec))

mart_city_top_products = mart_with_rank.filter(F.col("rank") <= TOP_N)

mart_city_top_products.show()

In [5]:
%spark.pyspark

# Persist the mart to HDFS as Parquet, replacing any existing data at the path.
hdfs_path = "/tmp/sandbox_zeppelin/mart_city_top_products/"

mart_city_top_products.write.parquet(hdfs_path, mode="overwrite")

print(f"Записано в {hdfs_path}")

In [6]:
%spark.pyspark

# Persist the same mart to S3 (s3a connector) as Parquet, overwriting the target.
s3_path = "s3a://hw-1-yc/mart_city_top_products/"

mart_city_top_products.write.parquet(s3_path, mode="overwrite")

print(f"Записано в {s3_path}")


In [7]:
%spark.pyspark

# Sanity check: load the Parquet output back from HDFS and display it,
# sorted by city and then by rank within each city.
result = spark.read.parquet(hdfs_path)

result.sort(F.col("city"), F.col("rank")).show()