In [1]:
%%pyspark
df = spark.read.load('abfss://wwi-02@asadatalakeic6nl8d.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/Day=20191231/sale-small-20191231-snappy.parquet', format='parquet')
display(df.limit(10))

datalake = 'asadatalakeic6nl8d'

StatementMeta(SparkPool01, 4, 1, Finished, Available)

SynapseWidget(Synapse.DataFrame, e66f26d4-b688-416c-b37b-55cd736d7fd3)

In [2]:
# Print the column names, data types and nullability of the DataFrame held in `df`.
df.printSchema()

StatementMeta(SparkPool01, 4, 2, Finished, Available)

root
 |-- TransactionId: string (nullable = true)
 |-- CustomerId: integer (nullable = true)
 |-- ProductId: short (nullable = true)
 |-- Quantity: byte (nullable = true)
 |-- Price: decimal(38,18) (nullable = true)
 |-- TotalAmount: decimal(38,18) (nullable = true)
 |-- TransactionDate: integer (nullable = true)
 |-- ProfitAmount: decimal(38,18) (nullable = true)
 |-- Hour: byte (nullable = true)
 |-- Minute: byte (nullable = true)
 |-- StoreId: short (nullable = true)



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# The wildcard import above rebinds Python builtins such as sum() and round()
# to their Spark column equivalents. It is kept because later cells call
# col(), count(), sum() and round() unqualified. This cell goes through the
# explicit F namespace so its aggregations do not rely on that shadowing.
from pyspark.sql import functions as F

# Per (TransactionDate, ProductId): total profit, mean quantity rounded to
# 4 decimal places, and total quantity, ordered by transaction date.
profitByDateProduct = (
    df.groupBy("TransactionDate", "ProductId")
    .agg(
        F.sum("ProfitAmount").alias("(sum)ProfitAmount"),
        F.round(F.avg("Quantity"), 4).alias("(avg)Quantity"),
        F.sum("Quantity").alias("(sum)Quantity"),
    )
    .orderBy("TransactionDate")
)
# Render only the first 100 aggregated rows.
display(profitByDateProduct.limit(100))

StatementMeta(SparkPool01, 4, 3, Finished, Available)

SynapseWidget(Synapse.DataFrame, 2ac4c67e-d014-4f5c-af38-78697a5b1603)

In [4]:
# Rebinds `df`: from here on it holds the JSON documents found under
# online-user-profiles-02, not the sales parquet rows loaded earlier.
# NOTE(review): 'inferSchema' is documented as a CSV reader option; the JSON
# reader infers the schema on its own, so this option may be a no-op - confirm.
user_profiles_glob = 'abfss://wwi-02@' + datalake + '.dfs.core.windows.net/online-user-profiles-02/*.json'
profile_reader = spark.read.option('inferSchema', 'true')
# multiLine=True is passed straight to the JSON reader.
df = profile_reader.json(user_profiles_glob, multiLine=True)

df.printSchema()

StatementMeta(SparkPool01, 4, 4, Finished, Available)

root
 |-- topProductPurchases: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- itemsPurchasedLast12Months: long (nullable = true)
 |    |    |-- productId: long (nullable = true)
 |-- visitorId: long (nullable = true)



In [5]:
# Create (or replace) a temporary view called user_profiles so the DataFrame
# in `df` can be queried by name from SQL cells.
df.createOrReplaceTempView("user_profiles")

StatementMeta(SparkPool01, 4, 5, Finished, Available)

In [6]:
%%sql
-- Preview the first 10 rows of the user_profiles temporary view.

SELECT * FROM user_profiles LIMIT 10

StatementMeta(SparkPool01, 4, 6, Finished, Available)

<Spark SQL result set with 10 rows and 2 fields>

In [7]:
from pyspark.sql.functions import udf, explode

# explode() yields one output row per element of the topProductPurchases
# array; visitorId is repeated next to each element.
purchase_item = explode('topProductPurchases').alias('topProductPurchases_flat')
flat = df.select('visitorId', purchase_item)
flat.show(100)

StatementMeta(SparkPool01, 4, 7, Finished, Available)

+---------+------------------------+
|visitorId|topProductPurchases_flat|
+---------+------------------------+
|   117000|              [13, 3623]|
|   117000|               [5, 2321]|
|   117001|               [93, 713]|
|   117001|              [19, 2144]|
|   117001|              [30, 1094]|
|   117001|              [82, 3223]|
|   117001|              [42, 3328]|
|   117001|              [62, 2926]|
|   117001|              [63, 2651]|
|   117001|               [39, 341]|
|   117001|              [85, 4841]|
|   117001|              [67, 4289]|
|   117001|              [42, 1264]|
|   117001|              [43, 3608]|
|   117001|               [14, 504]|
|   117001|              [97, 2649]|
|   117001|              [44, 2873]|
|   117001|               [7, 4491]|
|   117001|              [70, 3623]|
|   117001|               [4, 4035]|
|   117001|              [45, 3576]|
|   117002|               [25, 478]|
|   117002|              [15, 2515]|
|   117002|               [34, 209]|
|

In [11]:
# Lift the two struct fields of topProductPurchases_flat into top-level
# columns next to visitorId, then order the rows by visitor.
purchase_columns = [
    'visitorId',
    'topProductPurchases_flat.productId',
    'topProductPurchases_flat.itemsPurchasedLast12Months',
]
topPurchases = flat.select(*purchase_columns).orderBy('visitorId')

topPurchases.show(100)

StatementMeta(SparkPool01, 4, 11, Finished, Available)

+---------+---------+--------------------------+
|visitorId|productId|itemsPurchasedLast12Months|
+---------+---------+--------------------------+
|    80000|     3270|                         5|
|    80000|     1330|                        78|
|    80000|       93|                        86|
|    80000|     4745|                        38|
|    80000|     2481|                        63|
|    80000|     4206|                        21|
|    80000|     2488|                        31|
|    80000|     4136|                        30|
|    80000|      290|                        83|
|    80000|     2859|                        54|
|    80000|     3794|                        90|
|    80000|      102|                        42|
|    80000|     4198|                        92|
|    80000|     4074|                        48|
|    80000|      291|                        49|
|    80000|     1362|                        18|
|    80000|     2069|                        93|
|    80000|     3122

In [12]:
# Let's order by the number of items purchased in the last 12 months.
# orderBy with a plain column name sorts ascending, so the smallest counts come first.
sortedTopPurchases = topPurchases.orderBy("itemsPurchasedLast12Months")

# Render only the first 100 rows.
display(sortedTopPurchases.limit(100))

StatementMeta(SparkPool01, 4, 12, Finished, Available)

SynapseWidget(Synapse.DataFrame, 970349d1-87dd-45ae-aae2-8e1f4f755243)

In [15]:
# Rebind sortedTopPurchases with the largest 12-month item counts first.
by_items_desc = col("itemsPurchasedLast12Months").desc()
sortedTopPurchases = topPurchases.orderBy(by_items_desc)

display(sortedTopPurchases.limit(100))

StatementMeta(SparkPool01, 4, 15, Finished, Available)

SynapseWidget(Synapse.DataFrame, 0f9d72b1-553a-4b30-97f1-56f08346ecef)

In [16]:
# Count how many purchase rows each visitor has, ordered by visitorId.
visitor_ids = sortedTopPurchases.select("visitorId")
rows_per_visitor = visitor_ids.groupBy("visitorId").agg(count("*").alias("total"))
groupedTopPurchases = rows_per_visitor.orderBy("visitorId")

display(groupedTopPurchases.limit(100))

StatementMeta(SparkPool01, 4, 16, Finished, Available)

SynapseWidget(Synapse.DataFrame, fd5f5b0a-174c-45b8-aeef-ae690dfaf79a)

In [17]:
# Total itemsPurchasedLast12Months per visitor, ordered by visitorId.
# `sum` here is the Spark aggregate brought in by the wildcard import of
# pyspark.sql.functions, not the Python builtin.
visitor_items = sortedTopPurchases.select("visitorId", "itemsPurchasedLast12Months")
items_per_visitor = visitor_items.groupBy("visitorId").agg(
    sum("itemsPurchasedLast12Months").alias("totalItemsPurchased")
)
groupedTopPurchases = items_per_visitor.orderBy("visitorId")

display(groupedTopPurchases.limit(100))

StatementMeta(SparkPool01, 4, 17, Finished, Available)

SynapseWidget(Synapse.DataFrame, e4dc7f71-90b8-4519-9bb9-02819800bb8a)

In [18]:
# Create a temporary view for top purchases so we can load from Scala.
# The %%spark cell that follows reads it back with "select * from top_purchases".
topPurchases.createOrReplaceTempView("top_purchases")

StatementMeta(SparkPool01, 4, 18, Finished, Available)

In [19]:
%%spark
// Make sure the name of the dedicated SQL pool (SQLPool01 below) matches the name of your SQL pool.
// Read every row of the top_purchases temp view into a Scala DataFrame.
val df = spark.sqlContext.sql("select * from top_purchases")
// Write the rows to table wwi.TopPurchases in SQLPool01.
// NOTE(review): Constants.INTERNAL presumably requests an internal (managed) table - confirm against the connector docs.
df.write.synapsesql("SQLPool01.wwi.TopPurchases", Constants.INTERNAL)

StatementMeta(SparkPool01, 4, 20, Finished, Available)

df: org.apache.spark.sql.DataFrame = [visitorId: bigint, productId: bigint ... 1 more field]


In [20]:
# Load all parquet files one folder level below Year=2019/Quarter=Q4/Month=12
# (the */*.parquet glob), then preview the first 10 rows.
dfsales = spark.read.load('abfss://wwi-02@' + datalake + '.dfs.core.windows.net/sale-small/Year=2019/Quarter=Q4/Month=12/*/*.parquet', format='parquet')
display(dfsales.limit(10))

StatementMeta(SparkPool01, 4, 21, Finished, Available)

SynapseWidget(Synapse.DataFrame, e69b1ca2-802f-4359-8d88-d669502e85c9)

In [21]:
%%spark
// Make sure the name of the SQL pool (SQLPool01 below) matches the name of your SQL pool.
// Read the wwi.TopPurchases table from the SQL pool into a DataFrame.
val df2 = spark.read.synapsesql("SQLPool01.wwi.TopPurchases")
// createOrReplaceTempView (instead of createTempView) keeps this cell re-runnable:
// createTempView throws when a view named top_purchases_sql already exists in the session.
df2.createOrReplaceTempView("top_purchases_sql")

// Return the first 10 rows as a quick check of what was read.
df2.head(10)

StatementMeta(SparkPool01, 4, 22, Finished, Available)

df2: org.apache.spark.sql.DataFrame = [visitorId: bigint, productId: bigint ... 1 more field]
res7: Array[org.apache.spark.sql.Row] = Array([81820,1815,73], [99313,1640,66], [81820,2442,45], [99313,4296,37], [81821,3292,36], [99313,1566,39], [81821,1126,40], [99313,3765,40], [81822,793,14], [99313,303,19])


In [22]:
# Pick up the top_purchases_sql temp view registered by the Scala cell.
# spark.table() replaces the legacy sqlContext.table() entry point and matches
# the `spark` session object used everywhere else in this notebook.
dfTopPurchasesFromSql = spark.table("top_purchases_sql")

display(dfTopPurchasesFromSql.limit(10))

StatementMeta(SparkPool01, 4, 23, Finished, Available)

SynapseWidget(Synapse.DataFrame, bc117b05-5df2-48e6-b441-30fc21ef1d2d)

In [29]:
# Match December sales rows to top-purchase rows on both customer and product.
join_condition = (
    (dfsales.CustomerId == dfTopPurchasesFromSql.visitorId)
    & (dfsales.ProductId == dfTopPurchasesFromSql.productId)
)
inner_join = dfsales.join(dfTopPurchasesFromSql, join_condition)

# Keep only the columns the aggregation needs. productId is qualified with the
# view name because both sides of the join carry a product id column.
joined_columns = inner_join.select(
    "CustomerId",
    "TotalAmount",
    "Quantity",
    "itemsPurchasedLast12Months",
    "top_purchases_sql.productId",
)

# `sum` and `round` are the Spark functions from the wildcard import of
# pyspark.sql.functions, not the Python builtins.
# NOTE(review): itemsPurchasedLast12Months is summed once per matching sales
# row, so it grows with the number of December sales - confirm this is intended.
inner_join_agg = (
    joined_columns
    .groupBy(["CustomerId", "top_purchases_sql.productId"])
    .agg(
        sum("TotalAmount").alias("TotalAmountDecember"),
        sum("Quantity").alias("TotalQuantityDecember"),
        sum("itemsPurchasedLast12Months").alias("TotalItemsPurchasedLast12Months"),
    )
    .withColumn("TotalAmountDecember", round(col("TotalAmountDecember"), 2))
    .orderBy("CustomerId")
)

display(inner_join_agg.limit(100))

StatementMeta(SparkPool01, 4, 30, Finished, Available)

SynapseWidget(Synapse.DataFrame, 946ba556-e2ac-4cf5-b2c2-80a921e7d224)