In [0]:
%python
# Set up and connect to Azure Data Lake Store Gen2

access_key = dbutils.widgets.get("access_key")
storage_account = dbutils.widgets.get("storage_account")
input_container_name = dbutils.widgets.get("input_container_name")
devices_input_path = dbutils.widgets.get("devices_input_path")
revenue_input_path = dbutils.widgets.get("revenue_input_path")
crm_input_path = dbutils.widgets.get("crm_input_path")

#contruct the connection string and the filepath based on the input parameters
spark.conf.set(f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", access_key)

file_path_crm = f"abfss://{input_container_name}@{storage_account}.dfs.core.windows.net/{crm_input_path}"
file_path_devices = f"abfss://{input_container_name}@{storage_account}.dfs.core.windows.net/{devices_input_path}"
file_path_revenue = f"abfss://{input_container_name}@{storage_account}.dfs.core.windows.net/{revenue_input_path}"

# Read the data from Azure Data Lake Store Gen2 and convert to pandas dataframe from Spark
crm_df = spark.read.format("csv").option("header", "true").load(file_path_crm)
devices_df = spark.read.format("csv").option("header", "true").load(file_path_devices)
revenue_df = spark.read.format("csv").option("header", "true").load(file_path_revenue)

In [0]:
# Print the schema of each raw input, in the order crm, devices, revenue.
for frame in (crm_df, devices_df, revenue_df):
    frame.printSchema()

root
 |-- msisdn: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- year_of_birth: string (nullable = true)
 |-- system_status: string (nullable = true)
 |-- mobile_type: string (nullable = true)
 |-- value_segment: string (nullable = true)

root
 |-- msisdn: string (nullable = true)
 |-- imei_tac: string (nullable = true)
 |-- brand_name: string (nullable = true)
 |-- model_name: string (nullable = true)
 |-- os_name: string (nullable = true)
 |-- os_vendor: string (nullable = true)

root
 |-- msisdn: string (nullable = true)
 |-- week_number: string (nullable = true)
 |-- revenue_usd: string (nullable = true)



In [0]:
# Fix up the columns whose loaded (string) type does not match their content.
from pyspark.sql.functions import col, lower, when

# CRM: birth year becomes an integer; gender is lower-cased so that values
# differing only by case compare equal.
crm_df = (
    crm_df
    .withColumn("year_of_birth", col("year_of_birth").cast("int"))
    .withColumn("gender", lower(col("gender")))
)

# Revenue: the amount becomes a double and the week number an integer.
revenue_df = (
    revenue_df
    .withColumn("revenue_usd", col("revenue_usd").cast("double"))
    .withColumn("week_number", col("week_number").cast("int"))
)

In [0]:
# Tag customers whose gender is null as "not available" so that the later
# null-dropping step does not discard them for that reason alone.
crm_df = crm_df.na.fill("not available", subset=["gender"])

In [0]:
# Remove exact duplicate CRM rows, then drop every row that still contains a null
# in any column.
crm_df = crm_df.dropDuplicates().dropna(how="any")

# NOTE(review): gender values are not validated here. The recorded describe()
# output in this notebook shows min "." and max "u" for gender, so values outside
# ["male", "female", "not available"] appear to exist. A filter such as
#   crm_df.filter(col("gender").isin(["male", "female", "not available"]))
# was previously left disabled in this cell; confirm whether it should be applied.

In [0]:
# Devices and revenue get the same cleaning: keep distinct rows only, then discard
# any row that has a null in at least one column.
devices_df = devices_df.distinct().na.drop()
revenue_df = revenue_df.distinct().na.drop()


In [0]:
# Aggregate revenue per customer (msisdn): total revenue and the number of
# week_number records, then bucket each customer into a revenue segment and an
# IoT usage segment.

per_customer_totals = revenue_df.groupBy("msisdn").agg(
    {"revenue_usd": "sum", "week_number": "count"}
)
revenue_by_cus = (
    per_customer_totals
    .withColumnRenamed("sum(revenue_usd)", "total_revenue")
    .withColumnRenamed("count(week_number)", "weeks_count")
)

# Conditions are evaluated top-down and the first match wins.
total_revenue_col = col("total_revenue")
revenue_segment_expr = (
    when(total_revenue_col >= 150, "High Revenue")
    .when(total_revenue_col >= 50, "Medium Revenue")
    .when(total_revenue_col > 0, "Low Revenue")
    .otherwise("No Revenue")
)

# There is no otherwise() branch here: a row matching none of the conditions
# gets a null iot_usage_segment.
weeks_count_col = col("weeks_count")
iot_usage_segment_expr = (
    when(weeks_count_col >= 10, "Heavy Usage")
    .when(weeks_count_col >= 6, "Moderate Usage")
    .when(weeks_count_col > 0, "Minimal Usage")
)

revenue_by_cus = (
    revenue_by_cus
    .withColumn("revenue_segment", revenue_segment_expr)
    .withColumn("iot_usage_segment", iot_usage_segment_expr)
)

In [0]:
# Summary statistics for the CRM DataFrame.
# NOTE(review): the output recorded for this cell includes total_revenue,
# weeks_count, revenue_segment and iot_usage_segment, which crm_df only has after
# the join with revenue_by_cus. The cell was therefore last run after that join;
# a fresh top-to-bottom run will show the pre-join columns only.
crm_summary = crm_df.describe()
display(crm_summary)

summary,msisdn,gender,year_of_birth,system_status,mobile_type,value_segment,total_revenue,weeks_count,revenue_segment,iot_usage_segment
count,122021,122021,122021.0,122021,122021,122021,122021.0,122021.0,122021,122021
mean,,,1979.38641709214,,,,142.4900290319377,12.260709222183069,,
stddev,,,11.260475298477388,,,,40.68470932301853,2.885685097298892,,
min,00000b6bff59445804c19425bf61971a,.,1900.0,ACTIVE,Postpaid,Tier_3,2.881891544877948,1.0,High Revenue,Heavy Usage
max,ffffeefeca215c9f82deedaa6d66f683,u,2017.0,SUSPEND,Prepaid,Tier_3,435.62988544386656,14.0,Medium Revenue,Moderate Usage


In [0]:
# Attach the per-customer revenue aggregates and segments to the CRM data.
# This is an inner join on msisdn, so only customers present in both DataFrames
# are kept. crm_df is reassigned, so the cell is not meant to be re-run on its own.
crm_df = crm_df.join(revenue_by_cus, on="msisdn", how="inner")

In [0]:
# Persist the curated CRM dataset to Azure Data Lake as a Delta table.
# The destination container and path come from widgets; the storage account is
# the one read in the first cell.
output_container_name = dbutils.widgets.get("output_container_name")
output_path = dbutils.widgets.get("output_path")

output_uri = f"abfss://{output_container_name}@{storage_account}.dfs.core.windows.net/{output_path}"

# "overwrite" mode replaces whatever already exists at the destination.
curated_writer = crm_df.write.format("delta").mode("overwrite")
curated_writer.save(output_uri)