## CosmosDB/Mongo API Sales Data Processing with Synapse Link

The purpose of this notebook is to read the **Synapse Link Sales data** and
produce **aggregated totals by Customer**, then write these aggregated totals
to both **Azure Storage and Azure PostgreSQL**.

### Programming Logic in this Notebook, by Cell

- Define common PySpark functions
- Load the Synapse Link Sales data into a DataFrame
- Select just the pertinent columns for the aggregation calculation
- Aggregate Sales by Customer
- Pass the aggregated DataFrame to Spark/Scala as a TempView
- Write the aggregated DataFrame to Azure Storage as CSV
- Write the aggregated DataFrame to an Azure PostgreSQL table
- Write the list of installed Python packages to Azure Storage as CSV
- Write a small hard-coded DataFrame to an Azure PostgreSQL table

### Links 

- https://github.com/Azure-Samples/Synapse/tree/main/Notebooks/PySpark

In [None]:
# Define common PySpark functions used in this Notebook

def write_df_to_csv_blob(df, out_csv,
                         blob_account_name='cjoakimstorage',
                         blob_container_name='retail',
                         blob_relative_path='demo',
                         linked_service_name='SecondaryAzureBlobStorage'):
    """Write a DataFrame to Azure Blob Storage as CSV.

    The keyword defaults are the values this helper previously hard-coded,
    so existing two-argument calls write to the same location as before.
    Relies on the notebook-provided ``spark`` and ``mssparkutils`` globals.

    Args:
        df: The DataFrame to write.
        out_csv: Name of the CSV output; appended to the relative path.
        blob_account_name: Storage account name used in the wasbs URL.
        blob_container_name: Container name used in the wasbs URL.
        blob_relative_path: Path inside the container under which
            ``out_csv`` is written (joined with a single '/').
        linked_service_name: Name passed to
            ``mssparkutils.credentials.getConnectionStringOrCreds`` to
            obtain the storage credential.

    Returns:
        None. The wasbs path and CSV path are printed, followed by
        'written' once the write call returns.
    """
    # The credential is deliberately never printed.
    blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(
        linked_service_name)

    wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (
        blob_container_name, blob_account_name, blob_relative_path)

    # Register the credential under the per-container SAS key so that
    # Spark can access the wasbs path.
    spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (
        blob_container_name, blob_account_name), blob_sas_token)

    csv_path = '{}/{}'.format(wasbs_path, out_csv)

    print('wasbs_path: ' + wasbs_path)
    print('csv_path:   ' + csv_path)

    # Coalesce to one partition so the output is one CSV file;
    # any existing output at csv_path is overwritten.
    df.coalesce(1).write.csv(csv_path, mode='overwrite', header='true')
    print('written')


In [None]:
# Load the Synapse Link Sales data into a DataFrame.
# Read the documents in the sales container whose _ts (timestamp) value
# is greater than a minimum; the "sale" doctype filter is applied in a
# later cell, after the struct columns are unpacked.

# The documents in CosmosDB OLTP data look like this:
# { 
#     "_id" : ObjectId("6200059edbf78e1f05346e70"), 
#     "pk" : "1", 
#     "id" : "d6167c84-024a-4ecd-9c95-55f0005615d0", 
#     "sale_id" : NumberInt(1), 
#     "doctype" : "sale", 
#     "date" : "2021-01-01", 
#     "dow" : "fri", 
#     "customer_id" : NumberInt(3275), 
#     "store_id" : NumberInt(61), 
#     "item_count" : NumberInt(3), 
#     "total_cost" : 2049.71
# }

# Above document from this query in Studio 3T:
#   db.getCollection("sales").find({doctype:"sale"})

from pyspark.sql.functions import col
import pyspark.sql.functions as F 

# Exclusive lower bound applied to each document's _ts value
min_doc_timestamp = 1635168000

# Configure a reader for the "sales" container, using the cosmos.olap
# format and the CosmosMongoDemoDB linked service
sales_reader = (
    spark.read
    .format("cosmos.olap")
    .option("spark.synapse.linkedService", "CosmosMongoDemoDB")
    .option("spark.cosmos.container", "sales")
)

# Load the raw data, keeping only rows with _ts above the bound
df_sales_raw = sales_reader.load().where(col("_ts") > min_doc_timestamp)

# Show a small sample and the schema as loaded
display(df_sales_raw.limit(3))
df_sales_raw.printSchema()


In [None]:
# Select just the pertinent columns for the aggregation calculation.
# Unpack the struct columns with attrname.* syntax.

# The attributes needed for the aggregation; these are also the final
# column names of the unpacked DataFrame
new_column_names = [
    'doctype', 'date', 'customer_id', 'item_count', 'total_cost']

# Expand each of those struct columns with the attrname.* syntax
df_sales_unpacked = df_sales_raw.select(
    *[col('{}.*'.format(name)) for name in new_column_names])

display(df_sales_unpacked.limit(3))
df_sales_unpacked.printSchema()

# Apply the plain column names, then keep only the "sale" documents
df_sales_renamed = df_sales_unpacked.toDF(*new_column_names)
df_sales = df_sales_renamed.where(col("doctype") == "sale")

display(df_sales.limit(3))
df_sales.printSchema()

# Report the row and column counts
row_count = df_sales.count()
column_count = len(df_sales.columns)
print('df_sales, shape: {} x {}'.format(row_count, column_count))


In [None]:
# Aggregate Sales by Customer 

import pyspark.sql.functions as F 

# Aggregate expressions computed for each customer_id group; _id and pk
# both carry the customer_id value
customer_totals = [
    F.first('customer_id').alias('_id'),
    F.first('customer_id').alias('pk'),
    F.count("customer_id").alias('order_count'),
    F.sum("total_cost").alias("total_dollar_amount"),
    F.sum("item_count").alias("total_item_count"),
]

# One row per customer, ordered by ascending customer_id
df_customer_aggregated = (
    df_sales
    .groupBy("customer_id")
    .agg(*customer_totals)
    .sort("customer_id", ascending=True)
)

display(df_customer_aggregated.limit(4))
df_customer_aggregated.printSchema()

# Register the result as the "CustomerAggSales" temp view so that it can
# be read by name from Spark/Scala
df_customer_aggregated.createOrReplaceTempView("CustomerAggSales")

print('df_customer_aggregated, shape: {} x {}'.format(
    df_customer_aggregated.count(),
    len(df_customer_aggregated.columns)))


In [None]:
# Write the df_customer_aggregated to Azure Blob Storage as CSV 

from pyspark.sql import SparkSession
from pyspark.sql.types import *

import pyspark.sql.functions as F 

# Destination in Azure Blob Storage for the aggregated sales CSV
linked_service_name = 'SecondaryAzureBlobStorage'
blob_account_name   = 'cjoakimstorage'
blob_container_name = 'synapse'
blob_relative_path  = 'retail/sales/mongo/'

# Obtain the storage credential from the linked service
blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(
    linked_service_name)

# Register the credential under the per-container SAS key so that Spark
# can access the blob container remotely
sas_conf_key = 'fs.azure.sas.%s.%s.blob.core.windows.net' % (
    blob_container_name, blob_account_name)
spark.conf.set(sas_conf_key, blob_sas_token)

# blob_relative_path already ends with '/', so the output name is
# appended directly
wasbs_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (
    blob_container_name, blob_account_name, blob_relative_path)
csv_path = '{}{}'.format(wasbs_path, 'sales_by_customer_csv')

print('wasbs_path: ' + wasbs_path)
print('csv_path:   ' + csv_path)

# Coalesce to one partition so the output is one CSV file, with a header
# row, replacing any previous output at csv_path
single_partition_df = df_customer_aggregated.coalesce(1)
single_partition_df.write \
    .mode('overwrite') \
    .option('header', 'true') \
    .csv(csv_path)
print('csv data written to azure storage blob')


In [None]:
%%spark 

// Target PostgreSQL table for the aggregated customer sales
val table    = "public.customer_sales"

// JDBC settings are read from spark.conf.
// server and database are read but not referenced by the write below.
val connStr  = spark.conf.get("spark.azurepg.jdbc.connstring")
val driver   = spark.conf.get("spark.azurepg.jdbc.driver")
val server   = spark.conf.get("spark.azurepg.jdbc.server")
val database = spark.conf.get("spark.azurepg.jdbc.database")
val user     = spark.conf.get("spark.azurepg.jdbc.user")
val password = spark.conf.get("spark.azurepg.jdbc.pass")

// Options handed to the JDBC writer
val jdbcOptions = Map(
  "url"      -> connStr,
  "driver"   -> driver,
  "dbtable"  -> table,
  "user"     -> user,
  "password" -> password
)

// Load the "CustomerAggSales" temp view into a DataFrame
val df_temp_view = spark.read.table("CustomerAggSales")

// Write the DataFrame to the target table over JDBC, in overwrite mode
df_temp_view.write
  .format("jdbc")
  .options(jdbcOptions)
  .mode("overwrite")
  .save()


In [None]:
%%pyspark

# Obtain the list of Installed Packages in this Spark cluster,
# and write it to blob storage.

import pkg_resources
# One single-element row per entry in pkg_resources.working_set, holding
# the entry's string form (presumably "name version" -- see column name).
pkg_list = [[str(d)] for d in pkg_resources.working_set]

# Sort once, on the string form. The earlier extra sort of the working
# set itself was redundant because the rows were re-sorted here anyway.
sorted_pkg_list = sorted(pkg_list)

columns = ['package_and_version']
df_pkg = spark.createDataFrame(sorted_pkg_list, columns)

# Show a small sample and the schema
display(df_pkg.limit(3))
df_pkg.printSchema()

# Write the package list to blob storage via the shared helper
write_df_to_csv_blob(df_pkg, 'installed_packages')


In [None]:
%%spark 

//import spark.implicits._

// Target PostgreSQL table for the sample rows
val table    = "public.packages"

// JDBC settings are read from spark.conf.
// server is read but not referenced below; database is only used in the
// final log line.
val connStr  = spark.conf.get("spark.azurepg.jdbc.connstring")
val driver   = spark.conf.get("spark.azurepg.jdbc.driver")
val server   = spark.conf.get("spark.azurepg.jdbc.server")
val database = spark.conf.get("spark.azurepg.jdbc.database")
val user     = spark.conf.get("spark.azurepg.jdbc.user")
val pass     = spark.conf.get("spark.azurepg.jdbc.pass")

// Options handed to the JDBC writer
val jdbcOptions = Map(
  "url"      -> connStr,
  "driver"   -> driver,
  "dbtable"  -> table,
  "user"     -> user,
  "password" -> pass
)

// A three-row, single-column DataFrame of hard-coded strings
val sampleRows = Seq("this", "that", "the other")
val df_hardcoded = sampleRows.toDF("package_and_version")

// Write the DataFrame to the target table over JDBC, in overwrite mode
df_hardcoded.write
  .format("jdbc")
  .options(jdbcOptions)
  .mode("overwrite")
  .save()

println("df written to database: " + database + ", table: " + table)
