In [0]:
%spark.pyspark from pyspark.sql import functions as F
users = spark.createDataFrame(
[ ("u1", "Berlin"),
("u2", "Berlin"),
("u3", "Munich"),
("u4", "Hamburg"), ],
["user_id", "city"] )
orders = spark.createDataFrame(
[ ("o1", "u1", "p1", 2, 10.0),
("o2", "u1", "p2", 1, 30.0),
("o3", "u2", "p1", 1, 10.0),
("o4", "u2", "p3", 5, 7.0),
("o5", "u3", "p2", 3, 30.0),
("o6", "u3", "p3", 1, 7.0),
("o7", "u4", "p1", 10, 10.0), ],
["order_id", "user_id", "product_id", "qty", "price"] )
products = spark.createDataFrame(
[ ("p1", "Ring VOLA"),
("p2", "Ring POROG"),
("p3", "Ring TISHINA"), ],
["product_id", "product_name"] )
users.show() 
orders.show() 
products.show()

![1.png](./pictures/1.png)

In [1]:
%spark.pyspark
# Add a "revenue" column to every order row: qty multiplied by price.
line_revenue = F.col("qty") * F.col("price")
orders = orders.withColumn("revenue", line_revenue)
orders.show()

![2.png](./pictures/second.png)

In [2]:
%spark.pyspark
# Build one wide table: users joined to their orders on "user_id",
# then joined to the product lookup on "product_id" (inner joins in both steps).
union_df = (
    users
    .join(orders, on="user_id", how="inner")
    .join(products, on="product_id", how="inner")
)
union_df.show()

![3.png](./pictures/3.png)

In [3]:
%spark.pyspark
# Aggregate the joined table per (city, product):
#   orders_cnt  - number of orders,
#   qty_sum     - total quantity,
#   revenue_sum - total revenue.
group_keys = ["city", "product_id", "product_name"]
metrics = [
    F.count("order_id").alias("orders_cnt"),
    F.sum("qty").alias("qty_sum"),
    F.sum("revenue").alias("revenue_sum"),
]
orders_cnt = union_df.groupBy(*group_keys).agg(*metrics)
orders_cnt.show()

![4.png](./pictures/four.png)

In [4]:
%spark.pyspark
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Keep only the two highest-revenue products in each city.
#
# Rows are numbered inside each city by descending revenue_sum. Revenue alone
# is not a total order: in the sample data Berlin's p1 and p2 both sum to 30.0,
# so row_number() could number them either way and the surviving "second"
# product could differ between runs. product_id is added as a secondary sort
# key so that ties are always broken the same way.
w = Window.partitionBy("city").orderBy(desc("revenue_sum"), F.asc("product_id"))
orders_cnt = (
    orders_cnt
    .withColumn("revenue_rank", row_number().over(w))  # helper column, dropped below
    .filter(F.col("revenue_rank") <= 2)
    .drop("revenue_rank")
)
orders_cnt.show()

![5.png](./pictures/five.png)

In [5]:
%spark.pyspark
# Save the result as parquet under hdfs_path, replacing whatever is already
# stored there, then read it back and print it to confirm the write.
hdfs_path = "/tmp/sandbox_zeppelin/mart_city_top_products/"
orders_cnt.write.parquet(hdfs_path, mode="overwrite")
mart_from_hdfs = spark.read.parquet(hdfs_path)
mart_from_hdfs.show()

![6.png](./pictures/six.png)

In [6]:
%spark.pyspark
# Save the same result under the "hadoop2" bucket via the s3a:// scheme,
# reusing the directory layout of hdfs_path (set in the previous cell),
# then read it back and print it to confirm the write.
s3_path = f"s3a://hadoop2{hdfs_path}"
orders_cnt.write.parquet(s3_path, mode="overwrite")
mart_from_s3 = spark.read.parquet(s3_path)
mart_from_s3.show()

![7.png](./pictures/six.png)

In [7]:
%spark.pyspark
