In [0]:
# Configuring access to data lake
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "77a60835-4d15-4fbd-b8c2-f1635c31ea7b",
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="akv-secret-scope-kpmg",key="adls2-secret"),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/d4e8a2a5-0301-4a81-9187-ecaaec70bd74/oauth2/token"}

# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://kpmgdata@sadatakpmg.dfs.core.windows.net/",
  mount_point = "/mnt/kpmg",
  extra_configs = configs)

In [0]:
dbutils.fs.ls("/mnt/kpmg/")

In [0]:
import pandas as pd
from pyspark.sql.functions import *

file_path = 'dbfs:/mnt/kpmg/decase3.xlsx'
#xls = pd.ExcelFile(file_path)

customersDF = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dataAddress", "'Customers'!A1") \
    .load(file_path)


customersDF = customersDF.select(col("customer_name"), col("customer_id"), col("gender"),to_date(col("customer_dob"),"yyyy-MM-dd").alias("date_of_birth"))


transactionsDF = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dataAddress", "'Transactions'!A1") \
    .load(file_path)

transactionsDF = transactionsDF.select(col("transaction_id"), col("customer_id"), col("product_id"),col("store_id"),col("offer_id"),col("sales_amount"),to_date(col("transaction_date"),"yyyy-MM-dd").alias("date_transaction"))


offersDF = spark.read.format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("dataAddress", "'Offers'!A1") \
    .load(file_path)

offersDF = offersDF.select(col("customer_id"), col("offer_id"),to_date(col("start_date"),"yyyy-MM-dd").alias("date_start"), to_date(col("end_date"),"yyyy-MM-dd").alias("date_end"))

#df1 = pd.read_excel(xls, 'Sheet1')
#df2 = pd.read_excel(xls, 'Sheet2')

In [0]:
customersDF.show()
transactionsDF.show()
offersDF.show()
print(customersDF.count())
print(transactionsDF.count())
print(offersDF.count())

In [0]:
# Define the input and output formats and paths and the table name.
write_format = 'delta'
save_path_customers = '/tmp/delta/customers_'
save_path_transactions = '/tmp/delta/transactions_'
save_path_offers = '/tmp/delta/offers_'
customers_table_name = 'default.customers'
transactions_table_name = 'default.transactions'
offers_table_name = 'default.offers'

# Write the data to its target.
customersDF.write \
  .format(write_format) \
  .save(save_path_customers)

transactionsDF.write \
  .format(write_format) \
  .save(save_path_transactions)

offersDF.write \
  .format(write_format) \
  .save(save_path_offers)

In [0]:
# Create the tables.
spark.sql("CREATE TABLE " + customers_table_name + " USING DELTA LOCATION '" + save_path_customers + "'")
spark.sql("CREATE TABLE " + transactions_table_name + " USING DELTA LOCATION '" + save_path_transactions + "'")
spark.sql("CREATE TABLE " + offers_table_name + " USING DELTA LOCATION '" + save_path_offers + "'")

In [0]:
%sql
SELECT * FROM default.customers LIMIT 20

customer_name,customer_id,gender,date_of_birth
xxxx,1187667.0,F,1931-03-03
xxxx,1188705.0,F,1964-02-05
xxxx,881223.0,M,1944-10-18
xxxx,882197.0,F,1947-05-18
xxxx,882210.0,M,1970-06-05
xxxx,882213.0,M,1970-12-09
xxxx,882224.0,F,1964-02-18
xxxx,882227.0,M,1962-06-26
xxxx,883208.0,M,1936-05-31
xxxx,883215.0,F,1958-06-08


In [0]:
%sql
SELECT * FROM default.transactions LIMIT 20

transaction_id,customer_id,product_id,store_id,offer_id,sales_amount,date_transaction
11853308.0,869573.0,74069.0,909.0,7001370897.0,18.247134428708367,2020-03-14
5632404.0,878676.0,267295.0,1262.0,2000113555.0,44.99822854898147,2020-01-26
15234583.0,891142.0,82960.0,134.0,7001373027.0,4.305763018322082,2020-03-18
6325000.0,1182772.0,82715.0,232.0,466868.0,104.42142779423318,2020-03-31
5733971.0,891142.0,326249.0,113.0,7001367548.0,0.6616945952324758,2020-03-03
31902256.0,1183811.0,834549.0,1190.0,7001353021.0,6.046884484913408,2020-01-30
46852570.0,1148716.0,447800.0,1159.0,7001376606.0,0.4539566770879856,2020-04-07
10158740.0,1194597.0,52330.0,1413.0,7001369991.0,2.2828148289699977,2020-03-10
11437378.0,869606.0,14051.0,694.0,2000124326.0,567.2491424097929,2020-03-25
43238293.0,1188274.0,69226.0,1079.0,463064.0,139.55251784011787,2020-02-15


In [0]:
%sql
SELECT * FROM default.offers LIMIT 20

customer_id,offer_id,date_start,date_end
1182119.0,2000110064.0,2019-12-19,2019-12-29
1189159.0,2000106346.0,2019-11-20,2019-12-08
1277982.0,7001364884.0,2020-02-18,2020-02-19
887155.0,2000141345.0,2020-04-22,2020-05-10
1192911.0,2000126114.0,2020-03-18,2020-04-05
1315484.0,7001355964.0,2020-01-08,2020-01-08
878657.0,7001372786.0,2020-03-17,2020-03-17
1184093.0,7001363638.0,2020-02-11,2020-02-11
875870.0,2000110399.0,2019-12-18,2020-01-05
1184093.0,7001361302.0,2020-01-28,2020-01-28


In [0]:
'''
1. Write a query that gives us an aggregated table that provides:
o Customer name,
o Customer id,
o Customer’s age (in years),
o Total sales per customer,  # numero de compras/transacoes de um cliente
o Number of offers received,  # numero de ofertas/promocoes recebidas pelo cliente
o Number of offers redeemed,  # numero de ofertas/promocoes que o cliente actually comprou
o Number of visits for each day of week (Mon-Sun), e.g. one column per weekday.  # quantas vezes o cliente foi num determinado dia da semana
'''

In [0]:
%sql
CREATE TABLE default.aggregated_table
AS 
(SELECT C.customer_name,
       C.customer_id,
       (SELECT FLOOR(DATEDIFF(current_date(), C.date_of_birth)/365)) AS customer_age,
       
       (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id
        GROUP BY T.customer_id) AS sales_per_customer,
        
        (SELECT COUNT(distinct(T.offer_id))
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id
        GROUP BY T.customer_id) AS offers_received,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND T.offer_id IS NOT null
        GROUP BY T.customer_id) AS offers_redeemed,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 0
        GROUP BY T.customer_id) AS monday_visit,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 1
        GROUP BY T.customer_id) AS tuesday_visit,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 2
        GROUP BY T.customer_id) AS wednesday_visit,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 3
        GROUP BY T.customer_id) AS thursday_visit,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 4
        GROUP BY T.customer_id) AS friday_visit,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 5
        GROUP BY T.customer_id) AS saturday_visit,
        
        (SELECT COUNT(T.transaction_id) 
        FROM default.transactions T
        WHERE T.customer_id = C.customer_id AND WEEKDAY(T.date_transaction) = 6
        GROUP BY T.customer_id) AS sunday_visit

FROM default.customers C);

num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * FROM default.aggregated_table;

customer_name,customer_id,customer_age,sales_per_customer,offers_received,offers_redeemed,monday_visit,tuesday_visit,wednesday_visit,thursday_visit,friday_visit,saturday_visit,sunday_visit
xxxx,883393.0,51,8,4,5,2,0,1,0,0,2,3
xxxx,883425.0,69,59,19,51,0,41,0,2,6,6,4
xxxx,883449.0,65,38,7,30,4,0,0,1,1,2,30
xxxx,1493093.0,80,0,0,0,0,0,0,0,0,0,0
xxxx,1494110.0,90,93,11,76,0,15,10,67,1,0,0
xxxx,1318820.0,73,0,0,0,0,0,0,0,0,0,0
xxxx,1318848.0,65,4,2,2,0,1,0,0,2,0,1
xxxx,1320012.0,78,69,23,53,0,0,68,0,0,1,0
xxxx,1320018.0,62,4,2,3,1,0,0,3,0,0,0
xxxx,1189884.0,72,56,24,47,1,28,2,16,9,0,0


In [0]:
'''
Additionally, write a query that tells us on which day of week that customers of the same age group (0-9 years, 10-19 years, etc.) visits the stores most often.
'''

In [0]:
%sql
SELECT t.range AS customer_range, sum(monday_visit) AS monday_visits, sum(tuesday_visit) AS tuesday_visits, sum(wednesday_visit) AS wednesday_visits, sum(thursday_visit) AS thursday_visits, sum(friday_visit) AS friday_visits, sum(saturday_visit) AS saturday_visits, sum(sunday_visit) AS sunday_visits
FROM (
  SELECT monday_visit, tuesday_visit, wednesday_visit, thursday_visit, friday_visit, saturday_visit, sunday_visit, 
  CASE 
    WHEN customer_age BETWEEN  0 AND  9 THEN '0-9'
    WHEN customer_age BETWEEN 10 AND 19 THEN '10-19'
    WHEN customer_age BETWEEN 20 AND 29 THEN '20-29'
    WHEN customer_age BETWEEN 30 AND 39 THEN '30-39'
    WHEN customer_age BETWEEN 40 AND 49 THEN '40-49'
    WHEN customer_age BETWEEN 50 AND 59 THEN '50-59'
    WHEN customer_age BETWEEN 60 AND 69 THEN '60-69'
    WHEN customer_age BETWEEN 70 AND 79 THEN '70-79'
    WHEN customer_age BETWEEN 80 AND 89 THEN '80-89'
    ELSE '90-100' END AS RANGE
  FROM default.aggregated_table) t
GROUP BY t.range

customer_range,monday_visits,tuesday_visits,wednesday_visits,thursday_visits,friday_visits,saturday_visits,sunday_visits
60-69,892,3883,2020,2638,1637,1051,725
80-89,418,3304,1649,1919,1168,502,86
40-49,374,426,308,577,469,400,169
90-100,66,788,488,334,454,505,69
70-79,1342,7871,3852,3210,2463,833,609
50-59,1198,1207,1478,1783,2360,1456,726


In [0]:
%sql
SELECT b.customer_range, GREATEST(b.monday_visits, b.tuesday_visits, b.wednesday_visits, b.thursday_visits, b.friday_visits, b.saturday_visits, b.sunday_visits) AS max_visits,
  CASE GREATEST(b.monday_visits, b.tuesday_visits, b.wednesday_visits, b.thursday_visits, b.friday_visits, b.saturday_visits, b.sunday_visits)
  WHEN b.monday_visits THEN 'monday'
  WHEN b.tuesday_visits THEN 'tuesday'
  WHEN b.wednesday_visits THEN 'wednesday'
  WHEN b.thursday_visits THEN 'thursday'
  WHEN b.friday_visits THEN 'friday'
  WHEN b.saturday_visits THEN 'saturday'
  ELSE 'sunday' END AS day
FROM (
SELECT t.range AS customer_range, sum(monday_visit) AS monday_visits, sum(tuesday_visit) AS tuesday_visits, sum(wednesday_visit) AS wednesday_visits, sum(thursday_visit) AS thursday_visits, sum(friday_visit) AS friday_visits, sum(saturday_visit) AS saturday_visits, sum(sunday_visit) AS sunday_visits
FROM (
  SELECT monday_visit, tuesday_visit, wednesday_visit, thursday_visit, friday_visit, saturday_visit, sunday_visit, 
  CASE 
    WHEN customer_age BETWEEN  0 AND  9 THEN '0-9'
    WHEN customer_age BETWEEN 10 AND 19 THEN '10-19'
    WHEN customer_age BETWEEN 20 AND 29 THEN '20-29'
    WHEN customer_age BETWEEN 30 AND 39 THEN '30-39'
    WHEN customer_age BETWEEN 40 AND 49 THEN '40-49'
    WHEN customer_age BETWEEN 50 AND 59 THEN '50-59'
    WHEN customer_age BETWEEN 60 AND 69 THEN '60-69'
    WHEN customer_age BETWEEN 70 AND 79 THEN '70-79'
    WHEN customer_age BETWEEN 80 AND 89 THEN '80-89'
    ELSE '90-100' END AS RANGE
  FROM default.aggregated_table) t
GROUP BY t.range) AS b ORDER BY b.customer_range;

customer_range,max_visits,day
40-49,577,thursday
50-59,2360,friday
60-69,3883,tuesday
70-79,7871,tuesday
80-89,3304,tuesday
90-100,788,tuesday
