In [3]:
%%pyspark
df = spark.read.load('abfss://wwi-02@asadatalakenm10djs.dfs.core.windows.net/top-products/*.parquet', format='parquet')
display(df.limit(10))

StatementMeta(SparkPool01, 5, 1, Finished, Available)

SynapseWidget(Synapse.DataFrame, 36f25d70-3d16-41ba-af33-babf83a7b21b)

In [5]:
topPurchases = df.select(
    "UserId", "ProductId",
    "ItemsPurchasedLast12Months", "IsTopProduct",
    "IsPreferredProduct")

# Populate a temporary view so we can query from SQL
topPurchases.createOrReplaceTempView("top_purchases")

topPurchases.show(100)

StatementMeta(SparkPool01, 5, 3, Finished, Available)

+------+---------+--------------------------+------------+------------------+
|UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
+------+---------+--------------------------+------------+------------------+
| 80332|     2098|                        31|        true|              true|
| 80332|     2098|                        31|        true|              true|
| 80332|     1687|                         4|        true|              true|
| 80332|     1687|                         4|        true|              true|
| 80332|     2470|                        94|        true|              true|
| 80332|     2470|                        94|        true|              true|
| 80332|      420|                        34|        true|              true|
| 80332|      420|                        34|        true|              true|
| 80451|     4885|                        96|        true|             false|
| 81349|     2960|                         3|        true|      

In [9]:
from pyspark.sql.functions import *

topPreferredProducts = (topPurchases
    .filter( col("IsTopProduct") == True)
    .filter( col("IsPreferredProduct") == True)
    .orderBy( col("ItemsPurchasedLast12Months").desc() ))

topPreferredProducts.show(100)

StatementMeta(SparkPool01, 5, 7, Finished, Available)

+------+---------+--------------------------+------------+------------------+
|UserId|ProductId|ItemsPurchasedLast12Months|IsTopProduct|IsPreferredProduct|
+------+---------+--------------------------+------------+------------------+
| 95450|       31|                        99|        true|              true|
| 83400|     4561|                        99|        true|              true|
| 95450|       31|                        99|        true|              true|
| 88684|     4425|                        99|        true|              true|
| 95450|       31|                        99|        true|              true|
| 94788|     3142|                        99|        true|              true|
| 95450|       31|                        99|        true|              true|
| 94788|     3142|                        99|        true|              true|
| 95450|       31|                        99|        true|              true|
| 82969|     3967|                        99|        true|      

In [6]:
%%sql

CREATE OR REPLACE TEMPORARY VIEW top_5_products
AS
    select UserId, ProductId, ItemsPurchasedLast12Months
    from (select *,
                row_number() over (partition by UserId order by ItemsPurchasedLast12Months desc) as seqnum
        from top_purchases
        ) a
    where seqnum <= 5 and IsTopProduct == true and IsPreferredProduct = true
    order by a.UserId

StatementMeta(SparkPool01, 5, 4, Finished, Available)

<Spark SQL result set with 0 rows and 0 fields>

In [7]:
top5Products = sqlContext.table("top_5_products")

top5Products.show(100)

StatementMeta(SparkPool01, 5, 5, Finished, Available)

+------+---------+--------------------------+
|UserId|ProductId|ItemsPurchasedLast12Months|
+------+---------+--------------------------+
| 80000|     3794|                        90|
| 80000|     3794|                        90|
| 80000|     3794|                        90|
| 80000|     3794|                        90|
| 80000|     3794|                        90|
| 80001|     3729|                        56|
| 80001|     3729|                        56|
| 80001|     3729|                        56|
| 80001|     3729|                        56|
| 80001|     3729|                        56|
| 80002|     4254|                        30|
| 80002|     4254|                        30|
| 80004|     1528|                        78|
| 80004|     1528|                        78|
| 80005|      660|                        72|
| 80005|      660|                        72|
| 80005|      660|                        72|
| 80005|      660|                        72|
| 80005|      660|                

In [10]:
print('before filter: ', topPreferredProducts.count(), ', after filter: ', top5Products.count())

StatementMeta(SparkPool01, 5, 8, Finished, Available)

before filter:  141085 , after filter:  54506


In [11]:
top5ProductsOverall = (top5Products.select("ProductId","ItemsPurchasedLast12Months")
    .groupBy("ProductId")
    .agg( sum("ItemsPurchasedLast12Months").alias("Total") )
    .orderBy( col("Total").desc() )
    .limit(5))

top5ProductsOverall.show()

StatementMeta(SparkPool01, 5, 9, Finished, Available)

+---------+-----+
|ProductId|Total|
+---------+-----+
|     1555| 9947|
|     4549| 8107|
|     2062| 8066|
|     2107| 8055|
|     2429| 7899|
+---------+-----+



In [12]:
import uuid

# Generate random GUID
runId = uuid.uuid4()

StatementMeta(SparkPool01, 5, 10, Finished, Available)

In [13]:
%%pyspark

top5ProductsOverall.write.parquet('abfss://wwi-02@asadatalakenm10djs.dfs.core.windows.net/top5-products/' + str(runId) + '.parquet')

StatementMeta(SparkPool01, 5, 11, Finished, Available)