## Read Me
### This notebook creates the "top_item" data mart

## Importing Required Packages

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, dense_rank, desc
from pyspark.sql.window import Window

## Getting Helper Functions from Helper Notebook

In [0]:
%run ./helper_notebook

## Defining Variables Used in the Loops Below
### These variables could also be read from the helper notebook, but they are hard-coded here for readability

In [0]:
## Schema name (Tables to be saved into)
schema_name = "gold_layer"

## Metastore_name
catalog_name = "hive_metastore"

## External file location name (Azure Storage Account)
external_storage = "merkletaskstorage"

## Azure Blob containers (in the storage account) to read from; each is loaded into a DataFrame of the same name
dataframe_list = ["items","events"]

## Mapping from DataFrame name to the Azure Blob container and table name used for the write operation
naming_conversion_dict = {
    "items": "dimitems",
    "events": "factevents",
    "event_view_item_by_year": "eventviewitembyyear",
    "item_view_numbers_by_year": "itemviewnumbersbyyear",
    "most_used_platform_by_year_rank": "mostusedplatformbyyearrank",
}

# Secret scope
sas_key_scope = "BlobStorage4"

# Secret key name
sas_key_name = "BLB_Strg_Access_KEY"

# Partition column name (applied only when writing the events table)
partition_cols = 'event_time_year'
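
To make the role of these variables concrete, the sketch below shows how the loops further down combine them into a mount point, a `wasbs://` source URL, and a SAS configuration key. The function name `build_storage_paths` and its `suffix` parameter are illustrative assumptions; the notebook builds these strings inline and the helper notebook may not contain anything like this.

```python
def build_storage_paths(container, storage_account, suffix):
    """Return the strings the mount logic derives from a container name.

    Hypothetical helper for illustration; the notebook builds these inline.
    """
    host = f"{storage_account}.blob.core.windows.net"
    return {
        "mount_point": f"/mnt/{container}{suffix}",
        "source": f"wasbs://{container}@{host}/",
        "sas_conf_key": f"fs.azure.sas.{container}.{host}",
    }


paths = build_storage_paths("items", "merkletaskstorage", "_raw_2")
print(paths["mount_point"])
print(paths["source"])
```

Keeping the string construction in one place would avoid repeating the same f-strings in the read and write loops.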

## Reading Ingested Raw Data from Silver Layer External Location

In [0]:
for container in dataframe_list:
    # Create or use existing mount point
    mount_point = f"/mnt/{container}_raw_2"
    already_mounted = any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts())
    if not already_mounted:
        spark.conf.set(f"fs.azure.sas.{container}.{external_storage}.blob.core.windows.net",
                       dbutils.secrets.get(scope = sas_key_scope, key = sas_key_name))
        
        dbutils.fs.mount(
            source = f"wasbs://{container}@{external_storage}.blob.core.windows.net/",
            mount_point = mount_point,
            extra_configs = {
                f"fs.azure.sas.{container}.{external_storage}.blob.core.windows.net": dbutils.secrets.get(scope = sas_key_scope, key = sas_key_name)
            }
        )
    # Read the Delta files from the mount point and bind the DataFrame to a variable named after the container
    vars()[container] = read_delta_azure_file(mount_point, container)
    # Debug print of the loaded DataFrame's shape (rows, columns)
    shape_df = (vars()[container].count(),len(vars()[container].columns))
    message = 'Dataframe read from silver layer pipeline schema read completed. Total {} rows & columns loaded into dataframe from {} table'.format(shape_df,container)
    print(message)

Dataframe read from silver layer pipeline schema read completed. Total (2198, 7) rows & columns loaded into dataframe from items table
Dataframe read from silver layer pipeline schema read completed. Total (853640, 8) rows & columns loaded into dataframe from events table
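
The "create or use existing mount point" check in the loop above can be isolated and exercised without a cluster. The following is a minimal sketch under stated assumptions: `MountInfo`, `ensure_mounted`, `fake_mounts`, and `fake_mount` are hypothetical names, and the in-memory list only imitates the `mountPoint` attribute that the notebook reads from `dbutils.fs.mounts()`.

```python
from collections import namedtuple

# Stand-in for the objects returned by dbutils.fs.mounts(); only the
# mountPoint attribute used by the notebook is modelled here.
MountInfo = namedtuple("MountInfo", ["mountPoint", "source"])


def ensure_mounted(mount_point, source, list_mounts, do_mount):
    """Mount `source` at `mount_point` unless it is already mounted.

    Returns True if a new mount was created, False if one already existed.
    """
    if any(m.mountPoint == mount_point for m in list_mounts()):
        return False
    do_mount(source, mount_point)
    return True


# In-memory fake of the mount table, for demonstration only.
fake_mounts = [MountInfo("/mnt/items_raw_2", "wasbs://items@example/")]


def fake_mount(source, mount_point):
    fake_mounts.append(MountInfo(mount_point, source))


created_items = ensure_mounted(
    "/mnt/items_raw_2", "wasbs://items@example/", lambda: fake_mounts, fake_mount
)
created_events = ensure_mounted(
    "/mnt/events_raw_2", "wasbs://events@example/", lambda: fake_mounts, fake_mount
)
created_again = ensure_mounted(
    "/mnt/events_raw_2", "wasbs://events@example/", lambda: fake_mounts, fake_mount
)
print(created_items, created_events, created_again)
```

On a cluster, `list_mounts` would be `dbutils.fs.mounts` and `do_mount` a small wrapper around `dbutils.fs.mount` that also passes `extra_configs`. Running the check twice creates no second mount, which is what makes the cell safe to re-run.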


## Creating Requested Views on Top Item Datamart

###  Total number of item views in a particular year

In [0]:
# Filtering events dataframe by event_name col. for view_item events
events_view_item = events.filter(events.event_name == "view_item")

# Inner join between events_view_item and items on the item id, keeping only views of items that exist in the items table
event_item_join_df = events_view_item.join(items,events_view_item.sub_event_name_value == items.id)

# Selecting only needed columns
event_item_join_df_filter = event_item_join_df.select("event_time_year","id")

# Group by on year column and counting rows to get total views on a year
event_view_item_by_year = event_item_join_df_filter.groupBy("event_time_year").count()

# Column aliasing
event_view_item_by_year = event_view_item_by_year.select(col("event_time_year").alias("Year"),col("count").alias("Total Item View"))

# Ordering by Year as descending
event_view_item_by_year = event_view_item_by_year.orderBy(col("Year").desc())
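
The same filter, inner join, and count can be traced on a few rows in plain Python, which may help when checking the Spark result by hand. The rows below are made up for illustration. The column names mirror the ones used above, and the join is modelled as a set lookup, which assumes item ids are unique in the items table.

```python
from collections import Counter

# Made-up sample rows; column names mirror the ones used in the notebook.
events_rows = [
    {"event_name": "view_item", "sub_event_name_value": "A", "event_time_year": 2020},
    {"event_name": "view_item", "sub_event_name_value": "B", "event_time_year": 2020},
    {"event_name": "view_item", "sub_event_name_value": "A", "event_time_year": 2021},
    {"event_name": "add_to_cart", "sub_event_name_value": "A", "event_time_year": 2021},
    {"event_name": "view_item", "sub_event_name_value": "Z", "event_time_year": 2021},
]
item_ids = {"A", "B"}  # ids present in the items table; "Z" is not

# Filter to view_item events and keep only events whose item id exists (inner join)
joined = [
    e for e in events_rows
    if e["event_name"] == "view_item" and e["sub_event_name_value"] in item_ids
]

# Count per year and sort by year descending
views_by_year = Counter(e["event_time_year"] for e in joined)
result = sorted(views_by_year.items(), reverse=True)
print(result)  # [(2021, 1), (2020, 2)]
```

The `add_to_cart` event and the view of the unknown item `Z` are both dropped, so 2021 ends up with a single view.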

###  Rank of an item based on number of views in a particular year

In [0]:
# Keep only the columns needed for the calculation (id is unique per item)
event_item_join_df_filter = event_item_join_df.select("event_time_year","id")

# Group by year and item id, then count rows to get the views per item per year
year_item_group_by = event_item_join_df_filter.groupBy("event_time_year","id").count()

# Window partitioned by year (ranking restarts for each year), ordered by view count descending
window_item = Window.partitionBy(year_item_group_by["event_time_year"]).orderBy(desc("count"))

# Add the rank column by applying dense_rank over the window
item_view_numbers_by_year = year_item_group_by.withColumn("item_views",dense_rank().over(window_item))

# Column aliasing
item_view_numbers_by_year = item_view_numbers_by_year.select(col("event_time_year").alias("Year"),col("count").alias("Total Item View"),col("id").alias("Item Id"),col("item_views").alias("View Ranking"))
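
Because the ranking uses `dense_rank`, items with equal view counts share a rank and the next distinct count gets the next consecutive rank. The pure-Python sketch below reproduces that behaviour for one year. The function name `dense_rank_desc` and the sample counts are illustrative only.

```python
def dense_rank_desc(counts):
    """Map each key to its dense rank, ordering by count descending.

    Equal counts share a rank and the next distinct count gets rank + 1.
    """
    distinct_desc = sorted(set(counts.values()), reverse=True)
    rank_of = {value: i + 1 for i, value in enumerate(distinct_desc)}
    return {key: rank_of[value] for key, value in counts.items()}


# Made-up view counts for a single year
views_2020 = {"item_a": 10, "item_b": 10, "item_c": 7, "item_d": 3}
ranks = dense_rank_desc(views_2020)
print(ranks)  # {'item_a': 1, 'item_b': 1, 'item_c': 2, 'item_d': 3}
```

With Spark's `rank()` instead, the same counts would be ranked 1, 1, 3, 4, leaving a gap after the tie.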

### The most used platform in a particular year

In [0]:

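# Keep only the columns needed for the calculation
event_item_join_df_filter = event_item_join_df.select("event_time_year","platform")

# Group by year and platform and count rows (item views per platform per year)
most_used_platform_by_year = event_item_join_df_filter.groupBy("event_time_year","platform").count()

# Window partitioned by year, ordered by count descending
window_item = Window.partitionBy(most_used_platform_by_year["event_time_year"]).orderBy(desc("count"))

# Rank the platforms within each year
most_used_platform_by_year_rank = most_used_platform_by_year.withColumn("platform_usage",dense_rank().over(window_item))

# Keep only the top-ranked platform(s) for each year
most_used_platform_by_year_rank = most_used_platform_by_year_rank.filter(most_used_platform_by_year_rank.platform_usage == 1)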

# Column aliasing
most_used_platform_by_year_rank = most_used_platform_by_year_rank.select(col("event_time_year").alias("Year"),col("platform").alias("Most Used Platform"),col("count").alias("Total Platform Usage"))

# Ordering by Year as descending
most_used_platform_by_year_rank = most_used_platform_by_year_rank.orderBy(col("Year").desc())
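
One consequence of filtering on `dense_rank() == 1` is that a year with tied platforms yields more than one row. The sketch below illustrates this in plain Python. The platform names and counts are invented for the example and are not taken from the data.

```python
from collections import defaultdict

# Made-up (year, platform, count) rows, shaped like the groupBy result above
platform_counts = [
    (2020, "web", 50),
    (2020, "android", 50),
    (2020, "ios", 20),
    (2021, "ios", 70),
    (2021, "web", 30),
]

# Highest count per year
max_per_year = defaultdict(int)
for year, _, count in platform_counts:
    max_per_year[year] = max(max_per_year[year], count)

# Keep every platform whose count equals the yearly maximum (dense_rank == 1)
top_platforms = sorted(
    (year, platform)
    for year, platform, count in platform_counts
    if count == max_per_year[year]
)
print(top_platforms)  # [(2020, 'android'), (2020, 'web'), (2021, 'ios')]
```

If exactly one row per year is required, one option would be `row_number()` over a window with an additional tie-breaking sort column.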

## Saving DataFrames to gold layer storage
### In this layer the DataFrames are saved as managed tables in addition to external tables, to demonstrate that both options work

### External table on external location save

In [0]:
for ex_df_name,new_df_name in naming_conversion_dict.items():
    print(ex_df_name,new_df_name)
    # Saving the transformed DataFrames into gold layer storage as external tables under their new names
    new_df_name_str = new_df_name
    vars()[new_df_name] = globals()[ex_df_name]
    # printing read file size for debug
    shape_df = (vars()[new_df_name].count(),len(vars()[new_df_name].columns))
    print('Dataframes read on gold_layer_pipeline. Total {} rows & columns loaded into dataframe for {} dataframe`s gold layer schema write'.format(shape_df,new_df_name_str))
    # Create or use existing mount point
    mount_point = f"/mnt/{new_df_name_str}_silver"
    already_mounted = any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts())
    if not already_mounted:
        spark.conf.set(f"fs.azure.sas.{new_df_name_str}.{external_storage}.blob.core.windows.net",
                       dbutils.secrets.get(scope = sas_key_scope, key = sas_key_name))
        
        dbutils.fs.mount(
            source = f"wasbs://{new_df_name_str}@{external_storage}.blob.core.windows.net/",
            mount_point = mount_point,
            extra_configs = {
                f"fs.azure.sas.{new_df_name_str}.{external_storage}.blob.core.windows.net": dbutils.secrets.get(scope = sas_key_scope, key = sas_key_name)
            }
        )
    # Writing the DataFrames to external Azure storage in Delta format
    if ex_df_name == "events":
        vars()[new_df_name].write.format("delta").partitionBy(partition_cols).option("delta.columnMapping.mode", "name").mode("overwrite")\
        .option("path", f"{mount_point}").saveAsTable(new_df_name_str)
    else:
        vars()[new_df_name].write.format("delta").option("delta.columnMapping.mode", "name").mode("overwrite")\
        .option("path", f"{mount_point}").saveAsTable(new_df_name_str)

items dimitems
Dataframes read on gold_layer_pipeline. Total (2198, 7) rows & columns loaded into dataframe for dimitems dataframe`s gold layer schema write
events factevents
Dataframes read on gold_layer_pipeline. Total (853640, 8) rows & columns loaded into dataframe for factevents dataframe`s gold layer schema write
event_view_item_by_year eventviewitembyyear
Dataframes read on gold_layer_pipeline. Total (6, 2) rows & columns loaded into dataframe for eventviewitembyyear dataframe`s gold layer schema write
item_view_numbers_by_year itemviewnumbersbyyear
Dataframes read on gold_layer_pipeline. Total (13093, 4) rows & columns loaded into dataframe for itemviewnumbersbyyear dataframe`s gold layer schema write
most_used_platform_by_year_rank mostusedplatformbyyearrank
Dataframes read on gold_layer_pipeline. Total (6, 3) rows & columns loaded into dataframe for mostusedplatformbyyearrank dataframe`s gold layer schema write
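
The write above sets `delta.columnMapping.mode` to `name`, presumably because the aliased columns such as `Total Item View` contain spaces. The sketch below checks column names against the character set that Delta Lake reports as invalid when column mapping is off, and shows renaming as an alternative. The exact character set is quoted from memory of the Delta Lake error message and should be treated as an assumption. `needs_column_mapping` and `sanitize` are hypothetical helpers.

```python
# Characters rejected in column names when column mapping is not enabled
# (taken from the Delta Lake error message; treat the exact set as an assumption)
INVALID_CHARS = set(" ,;{}()\n\t=")


def needs_column_mapping(column_names):
    """Return the column names that contain at least one invalid character."""
    return [name for name in column_names if INVALID_CHARS & set(name)]


def sanitize(name):
    """Alternative to column mapping: replace invalid characters with '_'."""
    return "".join("_" if ch in INVALID_CHARS else ch for ch in name)


columns = ["Year", "Total Item View", "Item Id", "View Ranking"]
offending = needs_column_mapping(columns)
sanitized = [sanitize(c) for c in columns]
print(offending)
print(sanitized)
```

Renaming keeps the tables readable by clients that do not support column mapping, at the cost of less presentation-friendly column names.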


## Saving DataFrames as managed tables in the Hive metastore's gold layer schema

In [0]:
for ex_df_name,new_df_name in naming_conversion_dict.items():
    print(ex_df_name,new_df_name)
    # Saving created views and data frames into gold layer storage as managed table by changing their names
    new_df_name_str = new_df_name
    vars()[new_df_name] = globals()[ex_df_name]
    # printing read file size for debug
    shape_df = (vars()[new_df_name].count(),len(vars()[new_df_name].columns))
    print('Dataframes read on gold_layer_pipeline. Total {} rows & columns loaded into dataframe for {} dataframe`s gold layer schema write'.format(shape_df,new_df_name_str))
    # writing dataframes by using function gathered from helper notebook
    write_to_managed_table(vars()[new_df_name],new_df_name_str,schema_name,catalog_name)

items dimitems
Dataframes read on gold_layer_pipeline. Total (2198, 7) rows & columns loaded into dataframe for dimitems dataframe`s gold layer schema write
Table exists on hive_metastore.gold_layer.dimitems_gold_layer_managed_table
Overwriting all transactions on managed table hive_metastore.gold_layer.dimitems_gold_layer_managed_table
events factevents
Dataframes read on gold_layer_pipeline. Total (853640, 8) rows & columns loaded into dataframe for factevents dataframe`s gold layer schema write
Table exists on hive_metastore.gold_layer.factevents_gold_layer_managed_table
Overwriting all transactions on managed table hive_metastore.gold_layer.factevents_gold_layer_managed_table
event_view_item_by_year eventviewitembyyear
Dataframes read on gold_layer_pipeline. Total (6, 2) rows & columns loaded into dataframe for eventviewitembyyear dataframe`s gold layer schema write
Table exists on hive_metastore.gold_layer.eventviewitembyyear_gold_layer_managed_table
Overwriting all transactions o
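
The log lines above suggest that `write_to_managed_table` derives the fully qualified table name from the catalog, schema, and table name. The helper itself lives in `helper_notebook` and is not shown here, so the following is only a sketch of the naming pattern as inferred from the printed output. `managed_table_name` is a hypothetical function.

```python
def managed_table_name(catalog, schema, table):
    """Build the fully qualified name seen in the log output above.

    The pattern is inferred from the printed messages; the real helper
    in helper_notebook may construct it differently.
    """
    return f"{catalog}.{schema}.{table}_{schema}_managed_table"


name = managed_table_name("hive_metastore", "gold_layer", "dimitems")
print(name)  # hive_metastore.gold_layer.dimitems_gold_layer_managed_table
```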
