# Creating Golden Layer Tables from Silver Layer Tables, for Use in a Star Schema or as Aggregated KPI Tables

## Importing Packages and Config Files

In [0]:
import pandas as pd
import re
import os
import glob
import logging
import requests
import time
import datetime
import numpy as np
# import pyspark as spark
from pyspark.sql import DataFrame
from pyspark.sql import SQLContext
from pyspark.sql.functions import countDistinct,md5,concat,when,col
from pyspark.sql.types import IntegerType,BooleanType,DateType,StringType,VarcharType,LongType,DecimalType,FloatType,StructField,StructType
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql import Row
import dlt
from pyspark.sql import functions as F, Window
import csv 
from pyspark import SparkContext,SQLContext,SparkConf,StorageLevel

In [0]:
# Getting Variables and Dataframes from config notebook
# %run executes the config notebook inline, in this notebook's namespace. Names used below but never
# defined in this notebook presumably come from it (TODO confirm): golden_layer_read_table_names_list,
# second_layer_schema_name, golden_layer_col_names, third_layer_schema_name, location_name.
# NOTE(review): absolute, per-user workspace path; "congif.py" looks like a typo of "config.py" but it
# must match the real file name, so it is left as-is.
%run /Workspace/Users/cihangiray.oner@gmail.com/congif.py

### Creating the Day Parameter for General KPI Calculations

In [0]:
# Register the look-back dropdown, then read the current selection into `day_parameter`.
# The selected string is interpolated as N into "current_date() - interval N days" filters below.
# The value returned by dropdown() itself is not needed.
dbutils.widgets.dropdown("day_parameter", "2", ["2", "8", "15", "31"])
day_parameter = dbutils.widgets.get("day_parameter")

### Reading Tables from Silver Layer

In [0]:
def check_if_table_exists(read_table_name,read_schema_name):
    """
    Return whether hive_metastore.<read_schema_name>.<read_table_name> exists in the Spark catalog.

    Args:
        read_table_name (str): Name of the table to look up.
        read_schema_name (str): Schema (database) expected to contain the table.

    Returns:
        bool: True if the table exists, False otherwise.
    """
    qualified_name = f"hive_metastore.{read_schema_name}.{read_table_name}"
    return spark.catalog.tableExists(qualified_name)


def read_table(dataframe_name,schema_name,table_name,columns_to_read,filter_value = "N/A",fill_value = "N/A"):
    """
    Read hive_metastore.<schema_name>.<table_name> into a Spark DataFrame.

    Args:
        dataframe_name: Fallback value returned unchanged when the table does not exist. The read
            loop in this notebook passes the table name here, so a missing table yields a plain
            string instead of a DataFrame (the "Read table not found" message is printed).
        schema_name (str): The name of the schema to read the table from.
        table_name (str): The name of the table to read data from.
        columns_to_read (str | list): Column(s) passed to DataFrame.select, e.g. "*".
        filter_value (str): Currently unused; kept so existing calls keep working.
        fill_value (str): Value passed to DataFrame.na.fill. Per the PySpark docs, a string value
            only fills nulls in string-typed columns.

    Returns:
        pyspark.sql.DataFrame: The selected, null-filled table, or `dataframe_name` unchanged if the
        table does not exist.
    """
    qualified_name = f"hive_metastore.{schema_name}.{table_name}"
    # Ask the catalog directly instead of going through the global check_if_table_exists helper,
    # so this lookup cannot be redirected by a later cell that redefines that global with a
    # different signature.
    if not spark.catalog.tableExists(qualified_name):
        print(f"Read table not found {qualified_name}")
        return dataframe_name
    return spark.table(qualified_name).select(columns_to_read).na.fill(f'{fill_value}')

In [0]:
# Load every silver-layer table and bind it to a notebook global with the same name
# (e.g. login_df_silver_layer_managed_table).
for silver_table_name in golden_layer_read_table_names_list:
    print(silver_table_name)
    globals()[silver_table_name] = read_table(
        silver_table_name, second_layer_schema_name, silver_table_name, "*"
    )

in_app_purchase_df_silver_layer_managed_table
login_df_silver_layer_managed_table
multiplayer_battle_df_silver_layer_managed_table
new_user_df_silver_layer_managed_table
session_started_df_silver_layer_managed_table
ship_transaction_df_silver_layer_managed_table
user_id_df_silver_layer_managed_table


### Renaming Dimension Tables (the Fact Table Is Created and Named in a Later Section)

In [0]:
# Derive a "d_<name>" alias for every silver table, e.g.
# "login_df_silver_layer_managed_table" -> "d_login", and remember the aliases for the write loop.
golden_layer_table_names = []
for silver_df_name in golden_layer_read_table_names_list:
    dim_table_name = ("d_" + silver_df_name).replace("_df_silver_layer_managed_table", "")
    golden_layer_table_names.append(dim_table_name)
    globals()[dim_table_name] = globals()[silver_df_name]

### Rename Columns of Tables

In [0]:
def rename_columns(dataframe_name,columns_to_rename):
    """
    Rename DataFrame columns from a flat list of alternating old/new column names.

    `columns_to_rename` is read in pairs: ["PLANT", "PLANT_CODE", "DESCRIPTION", "OPERATION"]
    renames PLANT -> PLANT_CODE, then DESCRIPTION -> OPERATION. In this notebook the lists are
    the values of `golden_layer_col_names`.

    Args:
        dataframe_name (pyspark.sql.DataFrame): The DataFrame to rename columns on. Despite the
            parameter name, this is the DataFrame itself, not its name.
        columns_to_rename (list[str]): Alternating old and new column names. A trailing unpaired
            entry is ignored.

    Returns:
        pyspark.sql.DataFrame: The DataFrame with renamed columns. Pairs are applied in order;
        per PySpark's withColumnRenamed, a pair whose old name is absent is a no-op.

    Example:
        >>> df = spark.createDataFrame([(1, "John"), (2, "Jane")], ["id", "name"])
        >>> rename_columns(df, ["id", "user_id", "name", "user_name"])
        DataFrame[user_id: bigint, user_name: string]
    """
    old_names = columns_to_rename[0::2]
    new_names = columns_to_rename[1::2]
    # zip stops at the shorter slice, so an odd trailing entry is skipped.
    for old_name, new_name in zip(old_names, new_names):
        dataframe_name = dataframe_name.withColumnRenamed(old_name, new_name)
    return dataframe_name

In [0]:
# Apply the configured column renames to each golden-layer DataFrame, replacing the global in place.
for target_df_name, rename_pairs in golden_layer_col_names.items():
    renamed_frame = rename_columns(globals()[target_df_name], rename_pairs)
    globals()[target_df_name] = renamed_frame

### Creating the Fact Table from the Data Model: SESSION_STARTED Left-Joined with MULTIPLAYER_BATTLE_STARTED, IN_APP_PURCHASE_LOG_SERVER and SHIP_TRANSACTION_LOG on the "Join_key" Column

In [0]:
# Fact table: d_session_started drives the join; battle, purchase and ship-transaction rows are
# attached with left joins on the shared "Join_key" column.
# NOTE(review): if Join_key is not unique in a joined table, the left joins fan out (several rows
# per session), which inflates row counts and SUM-based KPIs built on this frame — confirm the grain.
f_multi_ships = (
    d_session_started
    .join(d_multiplayer_battle, 'Join_key', "left")
    .join(d_in_app_purchase, 'Join_key', how="left")
    .join(d_ship_transaction, 'Join_key', how="left")
)

# Register the fact table for the golden-layer write loop only once, so re-running this cell
# does not queue f_multi_ships to be written twice.
if 'f_multi_ships' not in golden_layer_table_names:
    golden_layer_table_names.append('f_multi_ships')

## Writing Dataframes into Hive Metastore Golden Layer Schema

In [0]:
def create_schema(third_layer_schema_name,location_name):
    """
    Create the schema `<location_name>.<third_layer_schema_name>` if it does not exist yet.

    Args:
        third_layer_schema_name (str): Name of the schema to create.
        location_name (str): Catalog the schema is created in.

    Returns:
        pyspark.sql.DataFrame: The (empty) result of the CREATE SCHEMA statement.
    """
    return spark.sql(f"CREATE SCHEMA IF NOT EXISTS  {location_name}.{third_layer_schema_name}")


def _golden_table_name(third_layer_schema_name, table_name):
    """Return the fully qualified name of the golden-layer managed table for `table_name`."""
    return f"hive_metastore.{third_layer_schema_name}.{table_name}_golden_layer_managed_table"


def check_if_golden_table_exists(third_layer_schema_name, table_name):
    """
    Return True if the golden-layer managed table for `table_name` exists in the Spark catalog.

    This check was previously a second definition of `check_if_table_exists`. That silently
    replaced the silver-layer helper of the same name (reversed argument order, extra table-name
    suffix) for every caller that ran afterwards.

    Args:
        third_layer_schema_name (str): Schema of the golden-layer table.
        table_name (str): Base table name (without the "_golden_layer_managed_table" suffix).

    Returns:
        bool: True if the table exists, False otherwise.
    """
    return spark.catalog.tableExists(_golden_table_name(third_layer_schema_name, table_name))


def write_to_managed_table(df, table_name, third_layer_schema_name, location_name,partition_cols = None, mode = "overwrite"):
    """
    Write a DataFrame to a Delta managed table named
    hive_metastore.<third_layer_schema_name>.<table_name>_golden_layer_managed_table.

    The schema is created first if needed. An existing table is overwritten or appended to
    according to `mode`; a missing table is created.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to write to the table.
        table_name (str): Base name of the target table.
        third_layer_schema_name (str): The schema name of the target table.
        location_name (str): Catalog name the schema is created in.
        partition_cols (list[str] | None): Columns to partition by; None or [] means unpartitioned.
        mode (str, optional): Spark save mode, "overwrite" by default.
    """
    # None instead of a shared mutable [] default.
    if partition_cols is None:
        partition_cols = []
    # NOTE(review): the schema is created in `location_name`, but the table is always written to
    # hive_metastore; the two only agree when location_name == "hive_metastore" — confirm in config.
    create_schema(third_layer_schema_name,location_name)
    target_table = _golden_table_name(third_layer_schema_name, table_name)
    if check_if_golden_table_exists(third_layer_schema_name, table_name):
        print(f"Table exists on {target_table}")
        if mode == "overwrite":
            print(f"Overwriting all transactions on managed table {target_table}")
        else:
            print(f"Appending all transactions on managed table {target_table}")
    else:
        print(f"Writing to managed table {target_table}")
    # Every branch used to issue the same write. For a table that does not exist yet, all Spark
    # save modes create it, so a single call covers both cases.
    (
        df.write.format("delta")
        .partitionBy(partition_cols)
        .option("overwriteSchema", "true")
        .option("delta.columnMapping.mode", "name")
        .mode(mode)
        .saveAsTable(target_table)
    )

In [0]:
# Persist every golden-layer frame listed in golden_layer_table_names. d_login is deliberately
# skipped. The fact table is written partitioned on SESSION_EVENT_TIMESTAMP so that date-filtered
# (direct) queries can prune partitions.
# NOTE(review): if SESSION_EVENT_TIMESTAMP is a full timestamp rather than a date, this yields one
# partition per distinct value — consider partitioning on a derived date column.
for golden_df_name in golden_layer_table_names:
    print(golden_df_name)
    if golden_df_name == "f_multi_ships":
        write_to_managed_table(globals()[golden_df_name], golden_df_name, third_layer_schema_name, location_name, ["SESSION_EVENT_TIMESTAMP"])
    elif golden_df_name != "d_login":
        write_to_managed_table(globals()[golden_df_name], golden_df_name, third_layer_schema_name, location_name)

d_in_app_purchase
Table exists on hive_metastore.third_layer_pipeline.d_in_app_purchase_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.d_in_app_purchase_golden_layer_managed_table
d_login
d_multiplayer_battle
Table exists on hive_metastore.third_layer_pipeline.d_multiplayer_battle_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.d_multiplayer_battle_golden_layer_managed_table
d_new_user
Table exists on hive_metastore.third_layer_pipeline.d_new_user_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.d_new_user_golden_layer_managed_table
d_session_started
Table exists on hive_metastore.third_layer_pipeline.d_session_started_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.d_session_started_golden_layer_managed_table
d_ship_transaction
Table exists on h

## Requested Analytical Queries

### General KPIs:
 - metrics: Active Users, New Users, Revenue, Spenders (Buyers), ARPU, ARPPU, 1-Day Retention Rate, 3-Day Retention Rate, 7-Day Retention Rate, 7-Day Conversion Rate
 - period: Daily, Weekly, Monthly (selected via the `day_parameter` widget)

In [0]:
# Active Users: distinct SESSION_USER_IDs in the fact table whose SESSION_EVENT_TIMESTAMP is strictly
# after current_date() minus day_parameter days. The prefiltered frame f_multi_ships_a_users is
# reused by the ARPPU and retention cells.

period_start = F.expr(f'current_date() - interval {day_parameter} days')
f_multi_ships_a_users = f_multi_ships.where(F.col('SESSION_EVENT_TIMESTAMP') > period_start)
kpi_active_user_count = f_multi_ships_a_users.agg(
    countDistinct(col("SESSION_USER_ID")).alias("Active_User_Number")
)

In [0]:
# New Users: distinct USER_USER_IDs in d_new_user whose USER_EVENT_TIMESTAMP is strictly after
# current_date() minus day_parameter days.

registration_period_start = F.expr(f'current_date() - interval {day_parameter} days')
d_new_user_n_users = d_new_user.where(F.col('USER_EVENT_TIMESTAMP') > registration_period_start)
kpi_new_user_count = d_new_user_n_users.agg(
    countDistinct(col("USER_USER_ID")).alias("New_User_Number")
)

In [0]:
# Revenue Sum: SUM(IN_APP_USD_COST) over fact rows in the day_parameter window, cast to decimal(12,2).
# The period frame is rebuilt here with the same filter as the Active Users cell.

revenue_period_start = F.expr(f'current_date() - interval {day_parameter} days')
f_multi_ships_a_users = f_multi_ships.where(F.col('SESSION_EVENT_TIMESTAMP') > revenue_period_start)
kpi_revenue_sum = f_multi_ships_a_users.agg(
    F.sum(col("IN_APP_USD_COST")).cast('decimal(12,2)').alias("Total_Revenue")
)

In [0]:
# Spender Users: number of d_user_id rows with USER_IS_SPENDER set (all time, not limited by
# day_parameter). Counts rows, so this assumes one row per user in d_user_id — TODO confirm.

is_spender = col("USER_IS_SPENDER") == True  # noqa: E712 — Spark column comparison, not a Python bool test
kpi_spenders_count = (
    d_user_id
    .where(is_spender)
    .agg(count(col("USER_IS_SPENDER")).alias("Spender_User_Number"))
)

In [0]:
# ARPU (Average Revenue Per User) for the day_parameter window:
# Total_Revenue / Active_User_Number, stored as a one-row string column "Arpu_Value".

period_revenue = kpi_revenue_sum.first()['Total_Revenue']
active_user_number = kpi_active_user_count.first()['Active_User_Number']
# SUM is NULL when no purchases fall in the window, and zero active users would raise on division;
# store a NULL KPI in those cases instead of aborting the notebook.
if period_revenue is None or not active_user_number:
    arpu_value = None
else:
    arpu_value = str(period_revenue / active_user_number)
kpi_arpu = [arpu_value]
schema = StructType([StructField("Arpu_Value", StringType(), True)])
kpi_arpu_df = spark.createDataFrame([(arpu_value,)], schema)

In [0]:
# ARPPU (Average Revenue Per Paying User) for the day_parameter window:
# Total_Revenue divided by the distinct users who paid (IN_APP_USD_COST > 0) in the same prefiltered
# frame f_multi_ships_a_users. The numerator is restricted to the window, so the all-time
# kpi_spenders_count is not used as the denominator. Dividing by a literal 1 would simply store
# total revenue.
# The variable keeps its name because the written table name is derived from it.

paying_user_number = (
    f_multi_ships_a_users
    .agg(countDistinct(F.when(col("IN_APP_USD_COST") > 0, col("IN_APP_USER_ID"))).alias("Paying_User_Number"))
    .first()['Paying_User_Number']
)
period_revenue = kpi_revenue_sum.first()['Total_Revenue']
# SUM is NULL when no purchases fall in the window, and zero paying users would raise on division;
# store a NULL KPI in those cases instead of aborting the notebook.
if period_revenue is None or not paying_user_number:
    arppu_value = None
else:
    arppu_value = str(period_revenue / paying_user_number)
kpi_arrpu = [arppu_value]
schema = StructType([StructField("Arppu_Value", StringType(), True)])
kpi_arrpu_df = spark.createDataFrame([(arppu_value,)], schema)

In [0]:
# 1 DAY RETENTION RATE (UNIQUE USER_ID Counts from fact table who plays a multi game  (Meaning active users in last n day session openers) // New User Number) (time period has taken back from prefiltered dataframe named as f_multi_ships_a_users  + 1 day)

one_day_active_session_user = f_multi_ships_a_users.withColumn('maxdate',F.max('MULTIPLAYER_EVENT_TIMESTAMP').over(Window.orderBy(F.lit(1)))).filter('MULTIPLAYER_EVENT_TIMESTAMP >= maxdate - interval 1 days').drop('maxdate')
one_day_active_session_user = one_day_active_session_user.agg(countDistinct(col("MULTIPLAYER_USER_ID")).alias("Last_1_Day_Game_Players"))                        
kpi_one_day_ret_rate = [str((one_day_active_session_user.first()['Last_1_Day_Game_Players'])/(kpi_new_user_count.first()['New_User_Number']))]
rdd = sc.parallelize(kpi_one_day_ret_rate)
rdd = rdd.map(lambda x:[x])
schema = StructType([StructField("kpi_one_day_ret_rate", StringType(), True)])
kpi_one_day_ret_rate = spark.createDataFrame(rdd,schema)

In [0]:
#3 DAY RETENTION RATE (UNIQUE USER_ID Counts from fact table who plays a multi game (Meaning active users in last n day session openers) // New User Number) (time period has taken back from prefiltered dataframe named as f_multi_ships_a_users  + 3 day)

three_day_active_session_user = f_multi_ships_a_users.withColumn('maxdate',F.max('MULTIPLAYER_EVENT_TIMESTAMP').over(Window.orderBy(F.lit(1)))).filter('MULTIPLAYER_EVENT_TIMESTAMP >= maxdate - interval 3 days').drop('maxdate')
three_day_active_session_user = three_day_active_session_user.agg(countDistinct(col("MULTIPLAYER_USER_ID")).alias("Last_3_Days_Game_Players"))                         
kpi_three_day_ret_rate = [str((three_day_active_session_user.first()['Last_3_Days_Game_Players'])/(kpi_new_user_count.first()['New_User_Number']))]
rdd = sc.parallelize(kpi_three_day_ret_rate)
rdd = rdd.map(lambda x:[x])
schema = StructType([StructField("kpi_three_day_ret_rate", StringType(), True)])
kpi_three_day_ret_rate = spark.createDataFrame(rdd,schema)

In [0]:
# 7 DAY RETENTION RATE (UNIQUE USER_ID Counts from fact table who plays a multi game (Meaning active users in last n day session openers) // New User Number) (time period has taken back from prefiltered dataframe named as f_multi_ships_a_users  + 7 day)

seven_day_active_session_user = f_multi_ships_a_users.withColumn('maxdate',F.max('MULTIPLAYER_EVENT_TIMESTAMP').over(Window.orderBy(F.lit(1)))).filter('MULTIPLAYER_EVENT_TIMESTAMP >= maxdate - interval 7 days').drop('maxdate')
seven_day_active_session_user = seven_day_active_session_user.agg(countDistinct(col("MULTIPLAYER_USER_ID")).alias("Last_7_Days_Game_Players"))                   
kpi_seven_day_ret_rate = [str((seven_day_active_session_user.first()['Last_7_Days_Game_Players'])/(kpi_new_user_count.first()['New_User_Number']))]
rdd = sc.parallelize(kpi_seven_day_ret_rate)
rdd = rdd.map(lambda x:[x])
schema = StructType([StructField("kpi_seven_day_ret_rate", StringType(), True)])
kpi_seven_day_ret_rate = spark.createDataFrame(rdd,schema)

In [0]:
# 7 DAY CONVERSION RATE (UNIQUE User ID Counts from fact table who purchases an item (Meaning Buyers in last n day ) // New User Number) (time period has taken back from prefiltered dataframe named as f_multi_ships_a_users  + 7 day)

seven_day_buyer_user_count = f_multi_ships_a_users.withColumn('maxdate',F.max('IN_APP_EVENT_TIMESTAMP').over(Window.orderBy(F.lit(1)))).filter('IN_APP_EVENT_TIMESTAMP >= maxdate - interval 7 days').drop('maxdate')
seven_day_buyer_user_count = seven_day_buyer_user_count.agg(countDistinct(col("IN_APP_USER_ID")).alias("Last_7_Days_item_buyers"))
kpi_seven_day_conv_rate = [str((seven_day_buyer_user_count.first()['Last_7_Days_item_buyers'])/(kpi_new_user_count.first()['New_User_Number']))]
rdd = sc.parallelize(kpi_seven_day_conv_rate)
rdd = rdd.map(lambda x:[x]) 
schema = StructType([StructField("kpi_seven_day_conv_rate", StringType(), True)])
kpi_seven_day_conv_rate = spark.createDataFrame(rdd,schema)

### Ships saturation:
 - ships owned by every user on every day
 - daily ship popularity
 (see the comments on the SHIP_TRANSACTION_LOG table for additional details)


In [0]:
# Ships owned by every user every day: row count per (session user, ship-transaction timestamp,
# ship name). Rows whose ship name is NULL or the string 'null' are excluded.
# NOTE(review): grouping uses the raw SHIP_TRANS_EVENT_TIMESTAMP; if it carries a time component
# this is per event rather than per day — confirm the column type.

has_ship_name = F.col('SHIP_TRANS_SHIP_NAME') != 'null'
kpi_ships_popularity = (
    f_multi_ships
    .where(has_ship_name)
    .groupBy("SESSION_USER_ID", "SHIP_TRANS_EVENT_TIMESTAMP", "SHIP_TRANS_SHIP_NAME")
    .count()
)

In [0]:
##### daily ship popularity: per SHIP_TRANS_EVENT_TIMESTAMP, rank ship names by transaction count
##### (dense rank, most frequent first). Non-zero SHIP_TRANS_SC_AMOUNT rows are treated as purchases,
##### zero-amount rows as sales.
# NOTE(review): the sold-ships frame also names its rank column "purchase_rank"; kept for schema compatibility.


def _daily_ship_rank(amount_condition):
    """Count rows per (event timestamp, ship name) matching `amount_condition` and dense-rank them per timestamp."""
    daily_counts = (
        f_multi_ships
        .filter(F.col('SHIP_TRANS_SHIP_NAME') != 'null')
        .filter(amount_condition)
        .select("SHIP_TRANS_EVENT_TIMESTAMP", "SHIP_TRANS_SHIP_NAME", "SHIP_TRANS_SC_AMOUNT")
        .groupBy("SHIP_TRANS_EVENT_TIMESTAMP", "SHIP_TRANS_SHIP_NAME")
        .count()
    )
    per_timestamp = Window.partitionBy(daily_counts["SHIP_TRANS_EVENT_TIMESTAMP"]).orderBy(desc("count"))
    return daily_counts.withColumn("purchase_rank", dense_rank().over(per_timestamp))


# Purchased ships ranks: daily purchased ship name by amount rank
kpi_ships_daily_popularity_purchased = _daily_ship_rank(F.col('SHIP_TRANS_SC_AMOUNT') != 0)

# Sold ships ranks: daily sold ship name by amount rank
kpi_ships_daily_popularity_sold = _daily_ship_rank(F.col('SHIP_TRANS_SC_AMOUNT') == 0)


### User transactions overview:
 - amount of battles, logins, days since registration before first purchase
 - daily/weekly/monthly revenue per user

In [0]:
# amount of battles, logins, days since registration before first purchase
# For every new user with at least one purchase:
#   * Registration_Date   - earliest USER_EVENT_TIMESTAMP of that user in d_new_user
#   * First_Purchase_Date - earliest IN_APP_EVENT_TIMESTAMP of that user in the fact table
# then count the sessions and battles that started before the first purchase.

cond = [(f_multi_ships.IN_APP_USER_ID == d_new_user.USER_USER_ID)]

# One row per purchasing new user. A plain groupBy/min replaces the earlier window-inside-groupBy
# followed by dropDuplicates(["USER_USER_ID"]), which kept an arbitrary registration row per user.
new_user_fact_join = (
    f_multi_ships.join(d_new_user, on=cond, how="inner")
    .groupBy("USER_USER_ID")
    .agg(
        F.min("USER_EVENT_TIMESTAMP").alias("Registration_Date"),
        F.min("IN_APP_EVENT_TIMESTAMP").alias("First_Purchase_Date"),
    )
)

cond = [(f_multi_ships.SESSION_USER_ID == new_user_fact_join.USER_USER_ID)]

f_multi_ships_reg_purchase_date = (
    f_multi_ships.join(new_user_fact_join, on=cond, how="inner")
    .select("USER_USER_ID","SESSION_SESSION_ID","First_Purchase_Date","Registration_Date","MULTIPLAYER_BATTLE_ID","SESSION_EVENT_TIMESTAMP")
    .where(col("SESSION_EVENT_TIMESTAMP") < col("First_Purchase_Date"))
)

# The fact table is a chain of left joins, so one session or battle can appear on several rows.
# Count distinct ids, not rows. The result column keeps the name "count" produced by .count() before.
kpi_login_amounts_since_reg = (
    f_multi_ships_reg_purchase_date
    .groupBy("USER_USER_ID")
    .agg(countDistinct(col("SESSION_SESSION_ID")).alias("count"))
)

kpi_battle_amounts_since_reg = (
    f_multi_ships_reg_purchase_date
    .where(col("MULTIPLAYER_BATTLE_ID").isNotNull())
    .groupBy("USER_USER_ID")
    .agg(countDistinct(col("MULTIPLAYER_BATTLE_ID")).alias("count"))
)

# Days between registration and first purchase: a seconds difference divided by 86400, so it can be fractional.
timeDiff = ((unix_timestamp('First_Purchase_Date', "yyyy-MM-dd") - unix_timestamp('Registration_Date', "yyyy-MM-dd"))/86400)

kpi_days_since_reg = f_multi_ships_reg_purchase_date.withColumn("Day_Amount_From_Registration", timeDiff).select("USER_USER_ID","Day_Amount_From_Registration").dropDuplicates()


In [0]:
# daily/weekly/monthly revenue per user: SUM(IN_APP_USD_COST) per SESSION_USER_ID, highest first.
# The look-back offsets (2, 8, 31) mirror the day_parameter dropdown values: rows strictly after
# current_date() minus that many days are kept.
# NOTE(review): the sum runs over fact-table rows; if the left joins fan out, one purchase can be
# counted more than once — confirm the grain of Join_key.


def _revenue_per_session_user(days_back, revenue_alias):
    """Sum IN_APP_USD_COST per SESSION_USER_ID over fact rows in the look-back window, ordered descending."""
    window_start = F.expr(f'current_date() - interval {days_back} days')
    return (
        f_multi_ships
        .where(F.col('SESSION_EVENT_TIMESTAMP') > window_start)
        .groupBy("SESSION_USER_ID")
        .agg(F.sum(col("IN_APP_USD_COST")).alias(revenue_alias))
        .orderBy(F.desc(revenue_alias))
    )


kpi_f_multi_ships_1_day = _revenue_per_session_user(2, "Revenue_1_day")

kpi_f_multi_ships_7_day = _revenue_per_session_user(8, "Revenue_7_day")

kpi_f_multi_ships_30_day = _revenue_per_session_user(31, "Revenue_30_day")

### Battle analysis
 - new users' participation in battles on days 1/3/7/14 since registration
 - battle participation by active users


In [0]:
# 1 / 3 / 7 / 14 day battle participation of new users.
# For each window, keep fact rows whose SESSION_EVENT_TIMESTAMP is strictly after current_date()
# minus (N + 1) days. Inner-join them to d_new_user on the battle user id, then count, per new user,
# the grouped (user, battle user, registration timestamp, battle timestamp) rows.
# NOTE(review): the windows are anchored on today's date, not on each user's registration date, so
# this is not "N days since registration" as the section heading says — confirm the intended definition.


def _new_user_battle_participation(days_back):
    """Return (USER_USER_ID, Battle_Participation_Number) for new users with battles in the look-back window."""
    recent_facts = f_multi_ships.filter(
        F.col('SESSION_EVENT_TIMESTAMP') > F.expr(f'current_date() - interval {days_back} days')
    )
    joined = recent_facts.join(
        d_new_user, on=[(recent_facts.MULTIPLAYER_USER_ID == d_new_user.USER_USER_ID)], how="inner"
    )
    per_new_user = Window.partitionBy(joined["USER_USER_ID"])
    group_cols = ["USER_USER_ID", "MULTIPLAYER_USER_ID", "USER_EVENT_TIMESTAMP", "MULTIPLAYER_EVENT_TIMESTAMP"]
    participation = (
        joined.select(*group_cols)
        .groupBy(*group_cols)
        .agg(count(col("MULTIPLAYER_USER_ID")).over(per_new_user).alias("Battle_Participation_Number"))
    )
    return participation.select("USER_USER_ID", "Battle_Participation_Number").dropDuplicates()


kpi_new_user_fact_join_battle_1 = _new_user_battle_participation(2)
kpi_new_user_fact_join_battle_3 = _new_user_battle_participation(4)
kpi_new_user_fact_join_battle_7 = _new_user_battle_participation(8)
kpi_new_user_fact_join_battle_14 = _new_user_battle_participation(15)

In [0]:
# battle participation by active users: per SESSION_USER_ID, the number of grouped
# (session user, battle user, session timestamp, battle timestamp) rows whose MULTIPLAYER_USER_ID is not null.
# The intermediate result gets its own name. Reassigning `f_multi_ships` here would replace the fact
# table with this projection, breaking any later re-run of earlier KPI cells or of the golden write loop.

window_item = Window.partitionBy(f_multi_ships["SESSION_USER_ID"])
battle_group_cols = ["SESSION_USER_ID","MULTIPLAYER_USER_ID","SESSION_EVENT_TIMESTAMP","MULTIPLAYER_EVENT_TIMESTAMP"]

active_user_battles = (
    f_multi_ships.select(*battle_group_cols)
    .groupBy(*battle_group_cols)
    .agg(count(col("MULTIPLAYER_USER_ID")).over(window_item).alias("Battle_Participation_Number"))
)

kpi_active_user_join_battle = active_user_battles.select("SESSION_USER_ID","Battle_Participation_Number").dropDuplicates()

### Writing KPIs tables to third layer schema

In [0]:
def list_dataframes():
    """
    Return the names of all notebook globals whose value is a Spark DataFrame.

    Returns:
        list[str]: Global variable names, in the order they appear in globals().
    """
    frame_names = []
    for global_name, global_value in globals().items():
        if isinstance(global_value, DataFrame):
            frame_names.append(global_name)
    return frame_names

# Write every global DataFrame whose name contains "kpi" to the golden layer; the variable name
# becomes the table-name prefix.
kpi_frame_names = [frame_name for frame_name in list_dataframes() if "kpi" in frame_name]
for kpi_frame_name in kpi_frame_names:
    print(kpi_frame_name)
    write_to_managed_table(globals()[kpi_frame_name], kpi_frame_name, third_layer_schema_name, location_name)

kpi_active_user_count
Table exists on hive_metastore.third_layer_pipeline.kpi_active_user_count_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.kpi_active_user_count_golden_layer_managed_table
kpi_revenue_sum
Table exists on hive_metastore.third_layer_pipeline.kpi_revenue_sum_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.kpi_revenue_sum_golden_layer_managed_table
kpi_arpu_df
Table exists on hive_metastore.third_layer_pipeline.kpi_arpu_df_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.kpi_arpu_df_golden_layer_managed_table
kpi_new_user_count
Table exists on hive_metastore.third_layer_pipeline.kpi_new_user_count_golden_layer_managed_table
Overwriting all transactions on managed table hive_metastore.third_layer_pipeline.kpi_new_user_count_golden_layer_managed_table
kpi_spenders_count
Table exists on hive_m

### KPIs can also be added to workflows as Delta Live Tables (materialized views). Some examples are shown below; however, this approach is not used in this task's solution.

In [0]:
# Active Users

# @dlt.table
# def f_multi_ships_fact_table_read():
#   return spark.table("hive_metastore.third_layer_pipeline.f_multi_ships_golden_layer_managed_table")


# @dlt.table(
#   comment="A table containing active users count."
# )
# def active_users_count():
#   return (
#     dlt.read('f_multi_ships_fact_table_read')
#     .agg(countDistinct(col("SESSION_USER_ID")).alias("Active_User_Number"))
#   )





In [0]:
# New Users

# @dlt.table
# def d_new_users_table_read():
#   return spark.table("hive_metastore.third_layer_pipeline.d_new_user_golden_layer_managed_table")

# @dlt.table(
#   comment="A table containing new users count."
# )
# def new_users_count():
#   return (
#     dlt.read('d_new_users_table_read')
#     .agg(countDistinct(col("USER_USER_ID")).alias("New_User_Number"))
#   )