In [1]:
%%pyspark
df = spark.read.load('abfss://wwi-02@asadatalakeg3xbtcv.dfs.core.windows.net/top-products/*.parquet', format='parquet')
display(df.limit(10))

StatementMeta(SparkPool01, 8, 1, Finished, Available)

SynapseWidget(Synapse.DataFrame, 76759456-3a39-4407-8852-62235c2bf41e)

In [2]:
topPurchases = df.select(
    "UserId", "ProductId",
    "ItemsPurchasedLast12Months", "IsTopProduct",
    "IsPreferredProduct")

# Populate a temporary view so we can query from SQL
topPurchases.createOrReplaceTempView("top_purchases")

topPurchases.show(100)

StatementMeta(SparkPool01, 8, 2, Finished, Available)

+------+---------+--------------------------+------------+------------------+
|UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
+------+---------+--------------------------+------------+------------------+
|   148|     2717|                      null|       false|              true|
|   148|     4002|                      null|       false|              true|
|   148|     1716|                      null|       false|              true|
|   148|     4520|                      null|       false|              true|
|   148|      951|                      null|       false|              true|
|   148|     1817|                      null|       false|              true|
|   463|     2634|                      null|       false|              true|
|   463|     2795|                      null|       false|              true|
|   471|     1946|                      null|       false|              true|
|   471|     4431|                      null|       false|      

In [3]:
from pyspark.sql.functions import *

topPreferredProducts = (topPurchases
    .filter( col("IsTopProduct") == True)
    .filter( col("IsPreferredProduct") == True)
    .orderBy( col("ItemsPurchasedLast12Months").desc() ))

topPreferredProducts.show(100)

StatementMeta(SparkPool01, 8, 3, Finished, Available)

+------+---------+--------------------------+------------+------------------+
|UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
+------+---------+--------------------------+------------+------------------+
| 90779|     4086|                        99|        true|              true|
| 90619|     1557|                        99|        true|              true|
| 90779|     4086|                        99|        true|              true|
| 85007|      521|                        99|        true|              true|
| 90779|     4086|                        99|        true|              true|
| 89537|     1473|                        99|        true|              true|
| 89457|      677|                        99|        true|              true|
| 89537|     1473|                        99|        true|              true|
| 89457|      677|                        99|        true|              true|
| 89537|     1473|                        99|        true|      

In [4]:
%%sql

CREATE OR REPLACE TEMPORARY VIEW top_5_products
AS
    select UserId, ProductId, ItemsPurchasedLast12Months
    from (select *,
                row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
        from top_purchases
        ) a
    where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
    order by a.UserId

StatementMeta(SparkPool01, 8, 4, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [5]:
top5Products = sqlContext.table("top_5_products")

top5Products.show(100)

StatementMeta(SparkPool01, 8, 5, Finished, Available)

+------+---------+--------------------------+
|UserId|ProductId|ItemsPurchasedLast12Months|
+------+---------+--------------------------+
| 80000|     2069|                        93|
| 80000|     2069|                        93|
| 80000|     2069|                        93|
| 80000|     2069|                        93|
| 80000|     2069|                        93|
| 80001|     1812|                        93|
| 80001|     1812|                        93|
| 80001|     1812|                        93|
| 80001|     1812|                        93|
| 80001|     1812|                        93|
| 80002|     4987|                        88|
| 80002|     3190|                        92|
| 80002|     3190|                        92|
| 80002|     1256|                        90|
| 80002|     1256|                        90|
| 80003|      295|                        91|
| 80003|      638|                        97|
| 80003|      638|                        97|
| 80003|      638|                

In [6]:
print('before filter: ', topPreferredProducts.count(), ', after filter: ', top5Products.count())

StatementMeta(SparkPool01, 8, 6, Finished, Available)

before filter:  930572 , after filter:  79333


In [7]:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
    .groupBy("ProductId")
    .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
    .orderBy( col("Total").desc() )
    .limit(5))

top5ProductsOverall.show()

StatementMeta(SparkPool01, 8, 7, Finished, Available)

+---------+-----+
|ProductId|Total|
+---------+-----+
|     2107| 4538|
|     3459| 4233|
|      347| 4148|
|     2746| 4067|
|     1262| 3974|
+---------+-----+



In [8]:
import uuid

# Generate random GUID
runId = uuid.uuid4()

StatementMeta(SparkPool01, 8, 8, Finished, Available)

In [9]:
%%pyspark

top5ProductsOverall.write.parquet('abfss://wwi-02@asadatalakeg3xbtcv.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')

StatementMeta(SparkPool01, 8, 9, Finished, Available)