In [None]:
from pyspark.storagelevel import StorageLevel
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Reading the Sales data from the Cosmos DB landing zone

In [None]:
# Connection settings for the Cosmos DB "Sales" container (landing zone).
# NOTE(review): replace the placeholders with values loaded from a secret
# store / environment at run time rather than pasting the account key here.
sales_options = {
    "spark.cosmos.accountEndpoint": "<accountURL>",
    "spark.cosmos.accountKey": "<accountkey>",
    "spark.cosmos.database": "DF",
    "spark.cosmos.container": "Sales",
}

# Load the container through the Cosmos OLTP connector and preview it.
df = spark.read.format("cosmos.oltp").options(**sales_options).load()

df.show()

+-----+--------+-----+------------+--------------------+-------------+-----+-------+----------+
|stock|store_id|price|promo_type_1|                  id|         date|sales|revenue|product_id|
+-----+--------+-----+------------+--------------------+-------------+-----+-------+----------+
|  3.0|   S0020| 33.9|        PR14|5546b31e-e931-48c...|1483315200000|  0.0|    0.0|     P0005|
|  5.0|   S0050| 33.9|        PR14|51b90b77-3e5b-44c...|1483315200000|  0.0|    0.0|     P0005|
|  5.0|   S0050| 6.25|        PR14|b73ba1a1-aec5-42d...|1483315200000|  0.0|    0.0|     P0001|
|  4.0|   S0102| 33.9|        PR14|50944570-f72c-412...|1483315200000|  0.0|    0.0|     P0005|
|  7.0|   S0102| 6.25|        PR14|d0e233f8-281a-421...|1483315200000|  0.0|    0.0|     P0001|
|  5.0|   S0050|  2.6|        PR14|bd4f4b5f-818a-470...|1483315200000|  0.0|    0.0|     P0015|
| 45.0|   S0020| 1.95|        PR14|8c478eb0-e80a-4da...|1483315200000|  0.0|    0.0|     P0024|
|  5.0|   S0086| 2.45|        PR14|50fd2

### Checking the inferred data types and casting where necessary

In [None]:
# Show the schema Spark inferred from the Cosmos documents (here `date`
# comes back as a long) so the required casts can be decided.
df.printSchema()

root
 |-- stock: double (nullable = true)
 |-- store_id: string (nullable = true)
 |-- price: double (nullable = true)
 |-- promo_type_1: string (nullable = true)
 |-- id: string (nullable = false)
 |-- date: long (nullable = true)
 |-- sales: double (nullable = true)
 |-- revenue: double (nullable = true)
 |-- product_id: string (nullable = true)



In [None]:
# `date` holds epoch milliseconds (long). Divide by 1000 to get seconds,
# format them with from_unixtime, then cast the resulting string to DATE.
# NOTE(review): from_unixtime renders in the Spark session time zone, so the
# calendar date depends on spark.sql.session.timeZone — confirm it is UTC.
epoch_seconds = col("date") / 1000
df_converted = df.withColumn("date", from_unixtime(epoch_seconds).cast("date"))

# The Cosmos document `id` is not used anywhere later in this notebook.
df_final = df_converted.drop("id")


### Optimisations

In [None]:
# Co-locate rows by product and cache the result: the sales frame is scanned
# several times below (duplicate check, count, null check, medians).
# `repartition` returns a NEW DataFrame, so its result must be reassigned;
# calling it as a bare statement leaves `df_final` unchanged.
# NOTE(review): the later joins broadcast the dimension tables, so this
# shuffle may not pay for itself — consider persisting without repartitioning.
df_final = df_final.repartition('product_id').persist(StorageLevel.MEMORY_ONLY)
df_final

Out[5]: DataFrame[stock: double, store_id: string, price: double, promo_type_1: string, date: date, sales: double, revenue: double, product_id: string]

### Checking Duplicates.

In [None]:
# Group on every column; any group with more than one row is an exact
# duplicate record (`id` was dropped earlier, so it cannot make otherwise
# identical rows look distinct).
duplicates = (
    df_final
    .groupBy(*df_final.columns)
    .count()
    .where(col("count") > 1)
)

# Number of distinct duplicated rows (not the number of surplus copies).
duplicates.count()

Out[6]: 0

### Count of Records

In [None]:
# Total number of sales records after the date conversion and `id` drop.
df_final.count()

Out[7]: 2143520

### Checking for nulls and handling them

In [None]:
# One aggregate row: for each column, how many rows are null in it.
null_counts = df_final.select(*[
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in df_final.columns
])
null_counts.show()

+-----+--------+-----+------------+----+-----+-------+----------+
|stock|store_id|price|promo_type_1|date|sales|revenue|product_id|
+-----+--------+-----+------------+----+-----+-------+----------+
|19005|       0|62705|           0|   0|19005|  19005|         0|
+-----+--------+-----+------------+----+-----+-------+----------+



In [None]:
# Numeric columns eligible for median imputation.
numeric_cols = [
    field.name for field in df_final.schema.fields
    if isinstance(field.dataType, (DoubleType, FloatType, IntegerType))
]

# A single approxQuantile call over all numeric columns computes every median
# in one pass instead of one Spark job per column. Nulls are ignored; a column
# that is entirely null yields an empty list, which is skipped (left unfilled)
# rather than raising IndexError on `[0]`.
medians = df_final.approxQuantile(numeric_cols, [0.5], 0.01) if numeric_cols else []
median_dict = {c: q[0] for c, q in zip(numeric_cols, medians) if q}

df_cleaned = df_final.fillna(median_dict)

df_cleaned.orderBy(col('date').desc()).show()


+-----+--------+-----+------------+----------+-----+-------+----------+
|stock|store_id|price|promo_type_1|      date|sales|revenue|product_id|
+-----+--------+-----+------------+----------+-----+-------+----------+
|  9.0|   S0020| 49.9|        PR14|2019-11-08|  0.0|    0.0|     P0005|
|  9.0|   S0040|10.95|        PR14|2019-11-08|  0.0|    0.0|     P0001|
|  9.0|   S0031| 13.9|        PR14|2019-11-08|  0.0|    0.0|     P0007|
|  9.0|   S0110|119.9|        PR14|2019-11-08|  0.0|    0.0|     P0012|
|  9.0|   S0048|10.95|        PR14|2019-11-08|  0.0|    0.0|     P0001|
|  9.0|   S0090| 49.9|        PR14|2019-11-08|  0.0|    0.0|     P0005|
|  9.0|   S0071| 3.25|        PR14|2019-11-08|  0.0|    0.0|     P0018|
|  9.0|   S0110| 49.9|        PR14|2019-11-08|  0.0|    0.0|     P0005|
|  9.0|   S0023|10.95|        PR14|2019-11-08|  0.0|    0.0|     P0001|
|  9.0|   S0024| 13.9|        PR14|2019-11-08|  0.0|    0.0|     P0007|
|  9.0|   S0031|119.9|        PR14|2019-11-08|  0.0|    0.0|    

In [None]:
# Cache the imputed frame for the checks and transformations below.
# `repartition` returns a NEW DataFrame, so reassign it; a bare call is a
# no-op and only the un-repartitioned plan would be cached.
df_cleaned = df_cleaned.repartition('product_id').persist(StorageLevel.MEMORY_ONLY)
df_cleaned

Out[10]: DataFrame[stock: double, store_id: string, price: double, promo_type_1: string, date: date, sales: double, revenue: double, product_id: string]

In [None]:
# Re-check nulls in the numeric (imputed) columns. Names and types are taken
# from df_cleaned itself, the frame being checked, so the check stays correct
# even if the cleaning step changes the schema.
cleaned_numeric_cols = [
    field.name for field in df_cleaned.schema.fields
    if isinstance(field.dataType, (DoubleType, FloatType, IntegerType))
]
null_counts = df_cleaned.select([sum(col(c).isNull().cast("int")).alias(c) for c in cleaned_numeric_cols])
null_counts.show()

+-----+-----+-----+-------+
|stock|price|sales|revenue|
+-----+-----+-----+-------+
|    0|    0|    0|      0|
+-----+-----+-----+-------+



In [None]:
# Put the keys first, then measures, then attributes.
sales_column_order = [
    "product_id", "store_id", "date",
    "sales", "revenue", "stock", "price", "promo_type_1",
]
df_cleaned = df_cleaned.select(*[col(name) for name in sales_column_order])

### Creating new columns from the date

In [None]:
# Calendar attributes derived from `date`, as (output column, Spark function).
# dayofweek follows Spark's convention: 1 = Sunday ... 7 = Saturday.
date_parts = [
    ("day_id", dayofmonth),
    ("month_id", month),
    ("year_id", year),
    ("day_of_week", dayofweek),
    ("business_quarter", quarter),
]

# Drop previously derived copies first so re-running this cell does not append
# duplicate (ambiguous) columns; drop() is a no-op for names that are absent.
df_cleaned = df_cleaned.drop(*[name for name, _fn in date_parts]).select(
    "*",
    *[fn(col("date")).alias(name) for name, fn in date_parts],
)

In [None]:
# Preview the cleaned sales frame with the derived calendar columns.
df_cleaned.show()

+----------+--------+----------+-----+-------+-----+-----+------------+------+--------+-------+-----------+----------------+
|product_id|store_id|      date|sales|revenue|stock|price|promo_type_1|day_id|month_id|year_id|day_of_week|business_quarter|
+----------+--------+----------+-----+-------+-----+-----+------------+------+--------+-------+-----------+----------------+
|     P0005|   S0020|2017-01-02|  0.0|    0.0|  3.0| 33.9|        PR14|     2|       1|   2017|          2|               1|
|     P0005|   S0050|2017-01-02|  0.0|    0.0|  5.0| 33.9|        PR14|     2|       1|   2017|          2|               1|
|     P0001|   S0050|2017-01-02|  0.0|    0.0|  5.0| 6.25|        PR14|     2|       1|   2017|          2|               1|
|     P0005|   S0102|2017-01-02|  0.0|    0.0|  4.0| 33.9|        PR14|     2|       1|   2017|          2|               1|
|     P0001|   S0102|2017-01-02|  0.0|    0.0|  7.0| 6.25|        PR14|     2|       1|   2017|          2|               1|


### Reading Product Data from the Landing Zone

In [None]:
# Connection settings for the Cosmos DB "Product" container.
# NOTE(review): load the endpoint/key from a secret store instead of inlining.
product_options = {
    "spark.cosmos.accountEndpoint": "<endpointURL>",
    "spark.cosmos.accountKey": "<accountkey>",
    "spark.cosmos.database": "DF",
    "spark.cosmos.container": "Product",
}

product_df = spark.read.format("cosmos.oltp").options(**product_options).load()
product_df.printSchema()

root
 |-- hierarchy5_id: string (nullable = true)
 |-- product_depth: string (nullable = true)
 |-- hierarchy1_id: string (nullable = true)
 |-- id: string (nullable = false)
 |-- product_id: string (nullable = true)
 |-- product_length: string (nullable = true)
 |-- hierarchy4_id: string (nullable = true)
 |-- product_width: string (nullable = true)
 |-- cluster_id: string (nullable = true)
 |-- hierarchy2_id: string (nullable = true)
 |-- hierarchy3_id: string (nullable = true)



In [None]:
# Physical dimensions arrive as strings; cast them to double.
# NOTE(review): values that are not numeric become null on cast (with ANSI mode
# off) — some of the nulls counted below may originate here.
dimension_columns = ["product_length", "product_depth", "product_width"]
hierarchy_columns = [
    "hierarchy1_id", "hierarchy2_id", "hierarchy3_id",
    "hierarchy4_id", "hierarchy5_id",
]

product_df = product_df.select(
    col("product_id"),
    *[col(name).cast("double").alias(name) for name in dimension_columns],
    col("cluster_id"),
    *[col(name) for name in hierarchy_columns],
)

### Checking for nulls and handling them

In [None]:
# Per-column null counts for the product dimension.
null_count_exprs = [
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in product_df.columns
]
null_counts = product_df.select(*null_count_exprs)
null_counts.show()

+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|product_id|product_length|product_depth|product_width|cluster_id|hierarchy1_id|hierarchy2_id|hierarchy3_id|hierarchy4_id|hierarchy5_id|
+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|         0|            18|           16|           16|        50|            0|            0|            0|            0|            0|
+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+



In [None]:
# Impute missing product dimensions with their approximate medians (relative
# error 0.01). One approxQuantile call over all three columns computes them in
# a single pass; nulls are ignored, and a column that is entirely null yields
# an empty list, which is left unfilled instead of raising IndexError.
dimension_cols = ['product_length', 'product_depth', 'product_width']
median_length, median_depth, median_width = (
    quantiles[0] if quantiles else None
    for quantiles in product_df.approxQuantile(dimension_cols, [0.5], 0.01)
)

# fillna only accepts int/float/str/bool values, so skip missing medians.
fill_values = {
    column_name: median
    for column_name, median in zip(dimension_cols, (median_length, median_depth, median_width))
    if median is not None
}
# NOTE(review): this is the literal string 'NaN', not a floating-point NaN;
# a clearer sentinel such as 'unknown' may be preferable for consumers.
fill_values['cluster_id'] = 'NaN'

product_df = product_df.fillna(fill_values)


In [None]:
# Confirm the product imputation left no nulls in any column.
null_counts = product_df.select(*[
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in product_df.columns
])
null_counts.show()

+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|product_id|product_length|product_depth|product_width|cluster_id|hierarchy1_id|hierarchy2_id|hierarchy3_id|hierarchy4_id|hierarchy5_id|
+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+
|         0|             0|            0|            0|         0|            0|            0|            0|            0|            0|
+----------+--------------+-------------+-------------+----------+-------------+-------------+-------------+-------------+-------------+



### Reading Store Data from the Landing Zone

In [None]:
# Connection settings for the Cosmos DB "Store" container.
# NOTE(review): load the endpoint/key from a secret store instead of inlining.
store_options = {
    "spark.cosmos.accountEndpoint": "<endpointURL>",
    "spark.cosmos.accountKey": "<accountkey>",
    "spark.cosmos.database": "DF",
    "spark.cosmos.container": "Store",
}

store_df = spark.read.format("cosmos.oltp").options(**store_options).load()

In [None]:
# Keep only the store attributes needed for the join.
store_columns = ["store_id", "storetype_id", "store_size", "city_id"]
store_df = store_df.select(*map(col, store_columns))

In [None]:
# Per-column null counts for the store dimension.
store_null_exprs = [
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in store_df.columns
]
null_counts = store_df.select(*store_null_exprs)
null_counts.show()

+--------+------------+----------+-------+
|store_id|storetype_id|store_size|city_id|
+--------+------------+----------+-------+
|       0|           0|         0|      0|
+--------+------------+----------+-------+



### Joining the three datasets into a single DataFrame

In [None]:
# Enrich each sales row with its product and store attributes. The dimension
# tables are broadcast (assumed small) to avoid shuffling the sales data.
# NOTE(review): inner joins silently drop sales rows whose product_id/store_id
# has no match in the dimension tables — confirm that is intended.
product_sales_df = df_cleaned.join(broadcast(product_df), on='product_id', how='inner')
joined_df = product_sales_df.join(broadcast(store_df), on='store_id', how='inner')

# Preview only: toPandas() collects every row to the driver, and the sales
# frame alone has ~2.1M rows, so limit before converting.
joined_df.limit(20).toPandas()

### Writing the Cleaned Data to Azure Blob Storage

In [None]:
# Register the storage-account key so Spark can authenticate against the
# cs0storage0acc Blob endpoint used by the wasbs:// output path.
# NOTE(review): load the key from a secret store/environment instead of inlining it.
spark.conf.set("fs.azure.account.key.cs0storage0acc.blob.core.windows.net", "<accountkey>")

In [None]:
# Target: container `transformationoutput` in storage account `cs0storage0acc`,
# accessed via the wasbs scheme. Spark writes Parquet as a directory of part
# files, so `output.parquet` will be a folder, not a single file.
output_path = "wasbs://transformationoutput@cs0storage0acc.blob.core.windows.net/output.parquet"

In [None]:
# Persist the joined dataset as Parquet, replacing anything already at output_path.
joined_df.write.parquet(output_path, mode='overwrite')