In [0]:
%spark.pyspark
from pyspark.sql import functions as F
# Demo dimension table: one row per user together with the user's city.
user_columns = ["user_id", "city"]
user_rows = [
    ("u1", "Berlin"),
    ("u2", "Berlin"),
    ("u3", "Munich"),
    ("u4", "Hamburg"),
]
users = spark.createDataFrame(user_rows, schema=user_columns)
# Demo fact table: one row per order line, referencing a user and a product.
# qty is an integer count and price a float; both types are inferred from the tuples.
order_columns = ["order_id", "user_id", "product_id", "qty", "price"]
order_rows = [
    ("o1", "u1", "p1", 2, 10.0),
    ("o2", "u1", "p2", 1, 30.0),
    ("o3", "u2", "p1", 1, 10.0),
    ("o4", "u2", "p3", 5, 7.0),
    ("o5", "u3", "p2", 3, 30.0),
    ("o6", "u3", "p3", 1, 7.0),
    ("o7", "u4", "p1", 10, 10.0),
]
orders = spark.createDataFrame(order_rows, schema=order_columns)
# Demo dimension table: maps each product id to its display name.
product_columns = ["product_id", "product_name"]
product_rows = [
    ("p1", "Ring VOLA"),
    ("p2", "Ring POROG"),
    ("p3", "Ring TISHINA"),
]
products = spark.createDataFrame(product_rows, schema=product_columns)
# Print the three input tables, in the order users, orders, products.
for input_frame in (users, orders, products):
    input_frame.show()


In [1]:
%spark.pyspark
# Total revenue over all orders: the sum of qty * price across every row.
# F.sum is the Spark SQL aggregate. A bare `sum` is Python's built-in unless
# something star-imported pyspark.sql.functions, and the built-in fails on a
# Column with "Column is not iterable".
revenue = orders.select(F.sum(orders["qty"] * orders["price"]))
revenue.show()

In [2]:
%spark.pyspark
# Add a per-row "revenue" column (qty * price) to orders.
# withColumn replaces an existing column of the same name, so re-running
# this cell leaves the frame unchanged.
line_revenue = orders["qty"] * orders["price"]
orders = orders.withColumn("revenue", line_revenue)
orders.show()

In [3]:
%spark.pyspark
# Enrich every order with the buyer's city and the product's name.
#
# * The broadcast hint belongs on the small lookup tables (users, products),
#   not on the orders fact table, so Spark ships the small side to every
#   executor instead of the large one.
# * F.broadcast is referenced through the module alias imported in the first
#   cell; a bare `broadcast` is undefined on a fresh interpreter.
# * Dropping "city" / "product_name" first is a no-op on the first run and
#   makes the cell safe to re-run: otherwise a second run would add duplicate
#   columns and later groupBy("city") calls would become ambiguous.
orders = (
    orders
    .drop("city", "product_name")
    .join(F.broadcast(users), "user_id")
    .join(F.broadcast(products), "product_id")
)

orders.show()

In [4]:
%spark.pyspark
import pyspark.sql.functions as F
# Aggregates shared by every breakdown below, defined once so the four
# reports cannot drift apart:
#   orders_cnt  - number of rows with a non-null order_id
#   qty_sum     - total quantity
#   revenue_sum - total of the "revenue" column
order_aggregates = [
    F.count("order_id").alias("orders_cnt"),
    F.sum("qty").alias("qty_sum"),
    F.sum("revenue").alias("revenue_sum"),
]

# Totals per city.
metric_city = orders.groupBy("city").agg(*order_aggregates)
metric_city.show()

# Totals per product id.
metric_product_id = orders.groupBy("product_id").agg(*order_aggregates)
metric_product_id.show()

# Totals per product name.
metric_product_name = orders.groupBy("product_name").agg(*order_aggregates)
metric_product_name.show()

# Totals per (city, product) pair; product_name is carried along with product_id.
metric = orders.groupBy("city", "product_id", "product_name").agg(*order_aggregates)
metric.show()

In [5]:
%spark.pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc
# Keep the TOP_N_PER_CITY best-selling products (by revenue_sum) in each city.
TOP_N_PER_CITY = 2

# Rank products inside each city by revenue, highest first. product_id is a
# secondary sort key so that rows with equal revenue_sum (in the sample data,
# Berlin's p1 and p2 both total 30.0) get the same rank on every run; with
# revenue_sum alone, row_number() may pick either of the tied rows.
funk = Window.partitionBy("city").orderBy(desc("revenue_sum"), "product_id")

top2 = (
    metric
    .withColumn("rn", row_number().over(funk))
    .filter(F.col("rn") <= TOP_N_PER_CITY)
    .drop("rn")  # the rank is only a helper column, not part of the result
)
top2.show()

In [6]:
%spark.pyspark
# Write the per-city top products as parquet to HDFS.
# mode="overwrite" replaces whatever already exists at the target path.
path = "hdfs:///tmp/sandbox_zeppelin/mart_city_top_products/"
top2.write.parquet(path, mode="overwrite")

In [7]:
%spark.pyspark
# Read the parquet files back from `path` and print them.
mart_readback = spark.read.parquet(path)
mart_readback.show()