In [None]:
import os

from pyspark.sql.functions import col, to_date, split, add_months

In [None]:
# Config
# Credentials are read from the environment so that no secret is stored in the notebook.
# (On Databricks a secret scope is the usual alternative source for these two values.)
try:
    AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
    AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
except KeyError as missing:
    raise RuntimeError(
        f"Environment variable {missing.args[0]} must be set before running this notebook"
    ) from missing

# The bucket root ends with "/", so container paths must not start with one;
# the pieces are joined by plain string concatenation further down.
aws_bucket_name = "s3a://abc/"
aws_container_name = "bronze/csv/"
aws_file_name = "insurance_data.csv"

# Set AWS Access on the Hadoop configuration used by the s3a filesystem
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)

In [None]:

# Read Bronze csv. The header row supplies the column names; no schema is given,
# so columns are typed later in the Silver transform.
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load(f"{aws_bucket_name}{aws_container_name}{aws_file_name}")
)

# Delta table output path.
# No leading slash: aws_bucket_name already ends with "/", and the two are concatenated as-is.
# NOTE(review): this reassigns the variable the CSV read above depends on, so re-running
# this cell on its own would look for the CSV under the Delta folder. Re-run the config
# cell first.
aws_container_name = 'bronze/delta/'

# Write Delta table, replacing any previous Bronze snapshot
df.write.format('delta').mode('overwrite').save(f"{aws_bucket_name}{aws_container_name}")

In [None]:
# Load the Bronze Delta table from the path currently held in aws_container_name.
# The format is stated explicitly: a bare load() uses spark.sql.sources.default, which is
# not guaranteed to be Delta on every runtime, and the table was written as Delta.
df = spark.read.format("delta").load(f"{aws_bucket_name}{aws_container_name}")

# Check schema
df.printSchema()

# Display data (a single row is enough to eyeball the layout)
df.show(1)

In [None]:
# Split product_identifier on "_" once and promote selected parts to their own columns.
# NOTE(review): parts 0, 1 and 3 are used and part 2 is skipped - presumably it repeats
# frequency_type; confirm against the identifier format.
product_parts = split(col('product_identifier'), '_')
df2 = (
    df
    .withColumn('country', product_parts[0])
    .withColumn('vertical', product_parts[1])
    .withColumn('vehicle', product_parts[3])
)

# Cast correct data types - every column not listed here stays a string.
int_columns = (
    'user_id',
    'inception_policy_id',
    'adjustment_policy_id',
    'change_number',
    'flex_policy_earned_hours',
)
date_columns = (
    'effective_start_date',
    'effective_end_date',
    'ftp_start_date',
    'ftp_valid_until',
)
float_columns = ('transaction_gwp', 'policy_total_gwp')

df3 = df2
for column_name in int_columns:
    df3 = df3.withColumn(column_name, col(column_name).cast('int'))
for column_name in date_columns:
    # to_date already yields a date column, so no further cast is needed
    df3 = df3.withColumn(column_name, to_date(col(column_name), 'dd/MM/yyyy'))
for column_name in float_columns:
    df3 = df3.withColumn(column_name, col(column_name).cast('float'))

# Rename columns to the Silver naming convention (source name -> Silver name).
# "FTPValidUntil" was previously misspelled "FTPValidUnitl"; a Silver table written with
# the old name needs its schema replaced (Delta overwriteSchema) on the next overwrite.
silver_column_names = {
    "user_id": "UserID",
    "product_identifier": "ProductID",
    "frequency_type": "FrequencyType",
    "inception_policy_id": "InceptionPolicyID",
    "adjustment_policy_id": "AdjustmentPolicyID",
    "change_number": "PolicyChangeNumber",
    "transaction_type": "TransactionType",
    "effective_start_date": "EffectiveStartDate",
    "effective_end_date": "EffectiveEndDate",
    "ftp_start_date": "FTPStartDate",
    "ftp_valid_until": "FTPValidUntil",
    "flex_policy_earned_hours": "FlexPolicyHoursEarned",
    "transaction_gwp": "GrossWrittenPremium",
    "policy_total_gwp": "TotalGrossWrittenPremium",
    "country": "Country",
    "vertical": "Vertical",
    "vehicle": "Vehicle",
}

df4 = df3
for source_name, silver_name in silver_column_names.items():
    df4 = df4.withColumnRenamed(source_name, silver_name)

# Check schema
df4.printSchema()

In [None]:
# Delta table output path (later cells read the Silver table through this variable)
aws_container_name = 'silver/delta/'

# Write Silver Delta table.
# The table is rebuilt in full on every run, so its schema is replaced together with the
# data. Without overwriteSchema, Delta rejects the overwrite as soon as the transform's
# column names or types differ from what is already stored at this path.
(
    df4.write
    .format('delta')
    .mode('overwrite')
    .option('overwriteSchema', 'true')
    .save(f"{aws_bucket_name}{aws_container_name}")
)

In [None]:
%sql
-- Set variables
-- Location of the Silver Delta table. It uses the same s3a:// scheme as the PySpark cell
-- that writes the table, which is also the scheme the fs.s3a.* credentials are set for.
SET delta_source.var = 's3a://abc/silver/delta/';


In [None]:
%sql
-- Register the Silver Delta files as the delivery_transactions table.
-- Nothing happens when the table is already defined; the location comes from delta_source.var.
CREATE TABLE IF NOT EXISTS delivery_transactions
USING DELTA
LOCATION ${delta_source.var};

In [None]:
%sql
-- Number of distinct customers that purchased at least one Fixed policy.
-- A policy is treated as Fixed when its frequency type is 'short' or 'annual'.
SELECT COUNT(DISTINCT t.UserID) AS FixedPolicyCustomerCount
FROM delivery_transactions AS t
WHERE t.FrequencyType = 'short'
   OR t.FrequencyType = 'annual';

In [None]:
%sql
-- Average hourly premium by product_identifier for all the Flex products.
-- Each transaction contributes its own premium-per-hour ratio to the average.
-- NULLIF maps zero earned hours to NULL so that AVG skips the row; this keeps the query
-- from failing on division by zero when ANSI mode is enabled.
SELECT
  ProductID,
  AVG(GrossWrittenPremium / NULLIF(FlexPolicyHoursEarned, 0)) AS AvgHourlyPremium
FROM
  delivery_transactions
WHERE
  FrequencyType = 'flex'
GROUP BY
  ProductID;

In [None]:
%sql
-- Rolling monthly GWP generated by each product_identifier.
-- For every row, sums the premium of the same product's rows whose start date falls in the
-- one-month range ending at that row's start date.
SELECT
  ProductID,
  EffectiveStartDate,
  SUM(GrossWrittenPremium) OVER product_trailing_month AS RollingMonthlyGWP
FROM
  delivery_transactions
WINDOW product_trailing_month AS (
  PARTITION BY ProductID
  ORDER BY EffectiveStartDate
  RANGE BETWEEN INTERVAL 1 MONTH PRECEDING AND CURRENT ROW
)

In [None]:
%sql
-- Policies of all the customers that purchased a Fixed product and Cancelled,
-- listed by year and month of the effective start date.
-- UserID and ProductID are separate columns (a missing comma used to alias UserID as
-- ProductID and drop the real product column).
-- Fixed uses the same definition as the customer count query: 'short' or 'annual'.
SELECT
  UserID,
  ProductID,
  InceptionPolicyID,
  YEAR(EffectiveStartDate) AS Year,
  MONTH(EffectiveStartDate) AS Month,
  PolicyChangeNumber,
  TransactionType,
  GrossWrittenPremium
FROM
  delivery_transactions
WHERE
  FrequencyType IN ('short', 'annual')
  AND InceptionPolicyID IN (
    -- every policy that has at least one cancellation transaction
    SELECT
      InceptionPolicyID
    FROM
      delivery_transactions
    WHERE
      TransactionType = 'Cancellation'
  )
ORDER BY
  Year,
  Month,
  InceptionPolicyID,
  PolicyChangeNumber;

In [None]:
# Spark SQL has no recursive CTEs, so the list of months is built with PySpark instead.

# Reads the Silver table; expects aws_container_name to still hold the Silver path set by
# the Silver write cell. The Delta format is named explicitly rather than relying on the
# session's default data source.
df = spark.read.format("delta").load(f"{aws_bucket_name}{aws_container_name}")

df = df.withColumn("EffectiveStartDate", col("EffectiveStartDate").cast("date"))

# Earliest and latest effective start date in the table (both None when it has no rows)
min_max_dates = df.selectExpr(
    "min(EffectiveStartDate) as min_date",
    "max(EffectiveStartDate) as max_date",
).collect()[0]

min_date = min_max_dates["min_date"]
max_date = min_max_dates["max_date"]

# One date per month, starting at min_date and stepping a month at a time up to max_date.
# The earlier while-loop never advanced its cursor, so it ran forever and never filled
# date_list; sequence() produces the whole list in a single small job.
if min_date is None:
    date_list = []
else:
    month_rows = spark.range(1).selectExpr(
        f"explode(sequence(to_date('{min_date}'), to_date('{max_date}'), interval 1 month)) AS month_date"
    ).collect()
    date_list = [row["month_date"] for row in month_rows]

# TODO: cross join the month list with the distinct UserIDs
# TODO: window function to find the first occurrence of each UserID
