In [0]:
import pyspark
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.functions import collect_list, concat_ws, udf ,lit, col, when, split, size, lower, explode
from pyspark.sql.types import *
from pyspark.sql import SparkSession, Row	
from pyspark.sql.types import MapType, StringType, StructType,StructField
from pyspark.sql.functions import sum as spark_sum

from pathlib import Path
import pyarrow.parquet as pq
import re
import string

import pandas as pd
import os
import numpy as np

import matplotlib as mpl
import matplotlib.pyplot as plt
import seaborn as sns

from wordcloud import WordCloud
from collections import Counter
from textwrap import wrap

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, lit
from pyspark.sql.types import DateType
from datetime import datetime, timedelta

## 1. df - joint table

In [0]:
dist_postcode = spark.read.format("delta").load(
    "abfss://sandbox@datastmktprodeuw.dfs.core.windows.net/contact_centre/projects/DistanceToStore/store_sector_distances"
)

### 1.1. postcode to nearest store

In [0]:
simplified_postcodes = dist_postcode.withColumn(
    "simplified_postcode",
    F.expr("substring(PostcodeSector, 1, length(PostcodeSector) - 1)"),
)


postcode_miles = simplified_postcodes.groupBy("simplified_postcode").agg(
    F.mean("Distance").alias("meanmilestostore")
)


postcode_miles = postcode_miles.select(
    F.col("simplified_postcode").alias("postcode"), "meanmilestostore"
)

In [0]:
postcode_miles.cache()

DataFrame[postcode: string, meanmilestostore: double]

In [0]:
#postcode_miles.display()

postcode,meanmilestostore
BT8,236.80891536862228
DL6,155.78741454470136
SE17,129.73093231311424
L19,130.80003264348525
NR19,147.77946118014324
NR8,156.08034276183295
TQ14,193.72790383592329
HS7,403.393563692687
IP15,168.30747120969934
CT17,180.855472691235


### 1.2. merging baskets_UK_3y & svoccust

In [0]:
svoccust = spark.table('marketingdata_prod.warehouse.svoccust')
baskets = spark.table('marketingdata_prod.warehouse.baskets_uk_3y')

In [0]:
svoccust_columns = [
    "accountnumberkey",
    "accountstartdate",
    "firstorderdate",
    "directmailoptin",
    "smsoptin",
    "telephoneoptin",
    "salesemailoptin",
    "gender",
    "age",
    "emailprefoffers",
    "lastcancelorderdate",
    "lastrequestdate",
    "client",
    "emailoptin",
    "postcodearea_full",
    "account_number",
    "countrycode"
]

svo = svoccust.select(*[col(column) for column in svoccust_columns])

svo.createOrReplaceTempView("svo")
#svo.display()
#print(svo.count())

In [0]:
baskets_columns = [
    "account_number",
    "basketid",
    "order_date",
    "returndate",
    "s740orderstakenqty",
    "s740orderstakenvalue",
    "s740despatchvalue",
    "s740returnsqty",
    "s740returnsvalue",
    "deladdressind",
    "division",
    "divisiondescription",
    "isocountrycode"
]

bsk = baskets.select(*[col(column) for column in baskets_columns])

bsk = (
    bsk.withColumnRenamed("account_number", "accountnumber")
    .withColumnRenamed("s740orderstakenqty", "orderqty")
    .withColumnRenamed("s740orderstakenvalue", "ordervalue")
    .withColumnRenamed("s740despatchvalue", "despatchvalue")
    .withColumnRenamed("s740returnsqty", "returnqty")
    .withColumnRenamed("s740returnsvalue", "returnvalue")
)

bsk.createOrReplaceTempView("bsk")
#bsk.display()
#print(bsk.count())

In [0]:
#bsk.limit(3).display()

In [0]:
bsk = bsk.withColumn("order_date", col("order_date").cast(DateType()))
two_years_ago = datetime.now() - timedelta(days=2 * 365)
bsk_1 = bsk.filter(col("order_date") >= lit(two_years_ago))

bsk_1 = bsk_1.withColumn(
    "delivery",
    when(col("deladdressind").isin(["S", "P"]), lit("Store"))
    .when(col("deladdressind") == "N", lit("Home"))
    .when(col("deladdressind") == "C", lit("Parcelshop"))
    .when(col("deladdressind").isin(["Y", "B"]), lit(None))
    .otherwise(lit(None)),
)

bsk_1 = bsk_1.filter(F.col("isocountrycode") == "GB")
svo_1 = svo.filter((F.col("client") == "NEXT") & (F.col("countrycode") == "GB"))

bsk_1 = bsk_1.drop("isocountrycode")
svo_1 = svo_1.drop("client", "countrycode")

# bsk_1.limit(25).display()
# svo_1.limit(25).display()

In [0]:
joined_df = bsk_1.join(
    svo,
    (bsk_1.accountnumber == svo.account_number)
    & (bsk_1.order_date >= svo.firstorderdate),
    how="inner",
)


df = joined_df.select(
    "accountnumber",
    "accountnumberkey",
    "order_date",
    "returndate",
    "delivery",
    "age",
    "gender",
    "postcodearea_full",
    "orderqty",
    "ordervalue",
    "returnqty",
    "returnvalue",
    "despatchvalue",
    "division",
    "divisiondescription",
    "emailoptin",
    "smsoptin",
    "telephoneoptin",
    "salesemailoptin",
)

In [0]:
df = df.fillna({"age": 0, "ordervalue": 0, "returnvalue": 0})

df = df.fillna({"gender": "Unknown", "postcodearea_full": "Unknown", "divisiondescription": "Unknown", "emailoptin": "Unknown", "smsoptin": "Unknown", "telephoneoptin": "Unknown", "salesemailoptin": "Unknown", "delivery": "Unknown"})

df = df.withColumn("delivery", when(col("delivery").isNull(), "Unknown").otherwise(col("delivery")))

In [0]:
df = df.filter(F.col("delivery") != "Unknown")
df = df.fillna(0)
df = df.filter((col("orderqty") > 0) & (col("ordervalue") > 0))

In [0]:
df.cache()

DataFrame[accountnumber: string, accountnumberkey: int, order_date: date, returndate: date, delivery: string, age: int, gender: string, postcodearea_full: string, orderqty: int, ordervalue: double, returnqty: int, returnvalue: double, despatchvalue: double, division: string, divisiondescription: string, emailoptin: string, smsoptin: string, telephoneoptin: string, salesemailoptin: string]

### 1.3. df

In [0]:
df.limit(15).display()

accountnumber,accountnumberkey,order_date,returndate,delivery,age,gender,postcodearea_full,orderqty,ordervalue,returnqty,returnvalue,despatchvalue,division,divisiondescription,emailoptin,smsoptin,telephoneoptin,salesemailoptin
A00A7676,64500804,2023-08-20,2023-09-13,Home,45,M,SR5,1,35.0,1,35.0,35.0,W,BRANDED,Y,N,N,Y
A00A7676,64500804,2024-05-17,1900-01-01,Home,45,M,SR5,1,19.0,0,0.0,19.0,K,MENSWEAR,Y,N,N,Y
A00A7676,64500804,2024-05-18,1900-01-01,Home,45,M,SR5,1,15.0,0,0.0,15.0,K,MENSWEAR,Y,N,N,Y
A00A7676,64500804,2024-05-18,1900-01-01,Home,45,M,SR5,1,15.0,0,0.0,15.0,K,MENSWEAR,Y,N,N,Y
A00A7676,64500804,2024-08-01,1900-01-01,Home,45,M,SR5,1,22.0,0,0.0,22.0,K,MENSWEAR,Y,N,N,Y
A00A7676,64500804,2024-08-01,1900-01-01,Home,45,M,SR5,1,14.0,0,0.0,14.0,W,BRANDED,Y,N,N,Y
A00A7676,64500804,2024-08-01,1900-01-01,Home,45,M,SR5,1,18.0,0,0.0,18.0,K,MENSWEAR,Y,N,N,Y
A00A7676,64500804,2024-08-01,1900-01-01,Home,45,M,SR5,1,20.0,0,0.0,20.0,W,BRANDED,Y,N,N,Y
A00A7676,64500804,2023-11-14,1900-01-01,Store,45,M,SR5,1,17.0,0,0.0,17.0,W,BRANDED,Y,N,N,Y
A00A7676,64500804,2023-01-01,1900-01-01,Store,45,M,SR5,1,62.0,0,0.0,62.0,K,MENSWEAR,Y,N,N,Y


In [0]:
df.write.mode("overwrite").saveAsTable("marketingdata_prod.ds_sandbox.df_original_yw")

## 2. df1 - for modelling

1. Group by: group the data by accountnumberkey.
2. Aggregations: Various aggregation functions (sum, min, max, etc.).
3. Joining: Results are joined back to the main dataframe to build the df1 dataframe.
3. Window Functions: Window functions (Window.partitionBy) are used to calculate order gaps and other sequence-based calculations.
4. Zero Values: handle cases where division by zero might occur.
5. Pivot and Aggregations: for counting delivery types and division descriptions.

### 2.1. Group by

In [0]:
grouped_df = df.groupBy("accountnumberkey")
df1 = grouped_df.agg(F.sum("orderqty").alias("totalorderqty"))
df1.cache()

DataFrame[accountnumberkey: int, totalorderqty: bigint]

### 2.2. Aggregations

In [0]:
df1 = grouped_df.agg(F.sum("orderqty").alias("totalorderqty"))
df1.cache()

df1 = df1.join(
    grouped_df.agg(
        F.sum("ordervalue").alias("totalordervalue"),
        (F.sum("ordervalue") / F.sum("orderqty")).alias("meanordervalue"),
    ),
    on="accountnumberkey",
)

df1 = df1.join(
    grouped_df.agg(F.sum("returnqty").alias("totalreturnqty")), on="accountnumberkey"
)

df1 = df1.join(
    grouped_df.agg(
        F.sum("returnvalue").alias("totalreturnvalue"),
        (F.sum("returnvalue") / F.sum("returnqty")).alias("meanreturnvalue"),
    ),
    on="accountnumberkey",
)

df1 = df1.withColumn(
    "meanreturnvalue",
    F.when(F.col("totalreturnqty") == 0, 0).otherwise(F.col("meanreturnvalue")),
)

### 2.3. Dates

#### 2.3.1. Order

In [0]:
if "firstorderdate" in df1.columns:
    df1 = df1.drop("firstorderdate")
if "lastorderdate" in df1.columns:
    df1 = df1.drop("lastorderdate")


df1 = df1.join(
    grouped_df.agg(
        F.min("order_date").alias("firstorderdate"),
        F.max("order_date").alias("lastorderdate"),
    ),
    on="accountnumberkey",
)

window_spec = Window.partitionBy("accountnumberkey").orderBy("order_date")


if "averageordergap" in df1.columns:
    df = df1.drop("averageordergap")

df = df.withColumn("previous_order_date", F.lag("order_date").over(window_spec))

df = df.withColumn("order_gap", F.datediff("order_date", "previous_order_date"))

df1 = df1.join(
    df.groupBy("accountnumberkey").agg(
        (F.sum("order_gap") / F.count("order_date")).alias("averageordergap")
    ),
    on="accountnumberkey",
)

#### 2.3.2. Return

In [0]:
df = df.withColumn(
    "return_gap",
    F.when(F.col("returnqty") != 0, F.datediff("returndate", "order_date")).otherwise(
        0
    ),
)

df1.drop("averagereturndates")
df1 = df1.join(
    df.groupBy("accountnumberkey").agg(
        (F.sum("return_gap") / F.sum("returnqty")).alias("averagereturndates")
    ),
    on="accountnumberkey",
)

df1 = df1.withColumn(
    "averagereturndates",
    F.when(F.col("totalreturnqty") == 0, 0).otherwise(F.col("averagereturndates")),
)

### 2.4. Joining

In [0]:
df1 = df1.join(
    df.groupBy("accountnumberkey", "age")
    .count()
    .orderBy(F.col("count").desc())
    .drop("count")
    .groupBy("accountnumberkey")
    .agg(F.first("age").alias("age")),
    on="accountnumberkey",
)

df1 = df1.join(
    df.groupBy("accountnumberkey", "gender")
    .count()
    .orderBy(F.col("count").desc())
    .drop("count")
    .groupBy("accountnumberkey")
    .agg(F.first("gender").alias("gender")),
    on="accountnumberkey",
)

df1 = df1.join(
    df.groupBy("accountnumberkey", "postcodearea_full")
    .count()
    .orderBy(F.col("count").desc())
    .drop("count")
    .groupBy("accountnumberkey")
    .agg(F.first("postcodearea_full").alias("postcodearea")),
    on="accountnumberkey",
)

### 2.5. Delivery count & percentage

In [0]:
df1 = df1.join(
    df.groupBy("accountnumberkey").pivot("delivery").count().na.fill(0),
    on="accountnumberkey",
)

df1 = df1.withColumn("homedelivery%", F.col("Home") / F.col("totalorderqty"))
df1 = df1.withColumn("storedelivery%", F.col("Store") / F.col("totalorderqty"))
df1 = df1.withColumn("parcelshopdelivery%", F.col("Parcelshop") / F.col("totalorderqty"))

### 2.6. Division proportion

In [0]:
df1.drop("Home", "Parcelshop", "Store")

DataFrame[accountnumberkey: int, totalorderqty: bigint, totalordervalue: double, meanordervalue: double, totalreturnqty: bigint, totalreturnvalue: double, meanreturnvalue: double, firstorderdate: date, lastorderdate: date, averageordergap: double, averagereturndates: double, age: int, gender: string, postcodearea: string, homedelivery%: double, storedelivery%: double, parcelshopdelivery%: double]

In [0]:
#ids = [1322, 1342, 3226]
d#f.filter(df.accountnumberkey.isin(ids)).display()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
#df1.limit(50).display()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
df1 = df1.drop("Home")
div_desc = (
    df.select("divisiondescription").distinct().rdd.flatMap(lambda x: x).collect()
)

for desc in div_desc:
    agg_df = df.groupBy("accountnumberkey").agg(
        F.sum(F.when(F.col("divisiondescription") == desc, 1).otherwise(0)).alias(desc)
    )

    df1 = df1.join(agg_df, on="accountnumberkey", how="left")
    df1 = df1.withColumn(f"{desc}%", F.col(desc) / F.col("totalorderqty"))

In [0]:
df1 = df1.drop(
    "Home",
    "WOMENSWEAR",
    "SPECIAL PURCHASE",
    "BRANDED",
    "BOYS & GIRLS",
    "Lipsy Childrenswear",
    "Lipsy Womenswear",
    "MENSWEAR",
    "Lipsy Beauty",
    "NO DIVISION",
    "Lipsy Menswear",
    "NO DIVISION%",
    "Unknown%",
    "Store",
    "Parcelshop", 
    "Home",
    "Unknown"
)

### 2.7. Output - df1, df2

In [0]:
from pyspark.sql.functions import col, trim, upper

postcode_miles = postcode_miles.withColumn("postcode", trim(upper(col("postcode"))))
df1 = df1.withColumn("postcodearea", trim(upper(col("postcodearea"))))

postcode_miles = postcode_miles.withColumn("postcode", col("postcode").cast("string"))
df1 = df1.withColumn("postcodearea", col("postcodearea").cast("string"))

df2 = df1.join(postcode_miles, df1["postcodearea"] == postcode_miles["postcode"], how="left")
#display(df2.select("meanmilestostore"))

In [0]:
df2.cache()

DataFrame[accountnumberkey: int, totalorderqty: bigint, totalordervalue: double, meanordervalue: double, totalreturnqty: bigint, totalreturnvalue: double, meanreturnvalue: double, firstorderdate: date, lastorderdate: date, averageordergap: double, averagereturndates: double, age: int, gender: string, postcodearea: string, homedelivery%: double, storedelivery%: double, parcelshopdelivery%: double, HOME%: double, WOMENSWEAR%: double, SPECIAL PURCHASE%: double, BRANDED%: double, BOYS & GIRLS%: double, Lipsy Childrenswear%: double, Lipsy Womenswear%: double, MENSWEAR%: double, Lipsy Beauty%: double, Lipsy Menswear%: double, postcode: string, meanmilestostore: double]

In [0]:
agg_df2 = df2.groupBy("postcode").agg(
    F.sum("meanordervalue").alias("sum_meanordervalue"),
    F.count("meanordervalue").alias("count_meanordervalue"),
)

agg_df2 = agg_df2.withColumn(
    "averageordervalue", F.col("sum_meanordervalue") / F.col("count_meanordervalue")
)

df2 = df2.join(
    agg_df2.select("postcode", "averageordervalue"), on="postcode", how="left"
)

if "meanordervalue_bypostcode" in df2.columns:
    df2 = df2.drop("meanordervalue_bypostcode")

df2 = df2.withColumnRenamed("meanmilestostore", "meanmilestostore_bypostcode")
df2 = df2.withColumnRenamed("averageordervalue", "meanordervalue_bypostcode")

df2 = df2.fillna(0)

In [0]:
#df2.display()

postcode,accountkey,totalorderqty,totalordervalue,meanordervalue,totalreturnqty,totalreturnvalue,meanreturnvalue,firstorderdate,lastorderdate,averageordergap,averagereturndates,age,gender,homedelivery%,storedelivery%,parcelshopdelivery%,HOME%,WOMENSWEAR%,SPECIAL PURCHASE%,BRANDED%,BOYS & GIRLS%,Lipsy Childrenswear%,Lipsy Womenswear%,MENSWEAR%,Lipsy Beauty%,Lipsy Menswear%,meanmilestostore_bypostcode,meanordervalue_bypostcode,meanordervalue_bypostcode.1
WF8,109622,161,5030.0,31.24223602484472,104,3522.0,33.86538461538461,2022-10-02,2024-06-02,3.782608695652174,7.615384615384615,55,F,0.5590062111801242,0.4409937888198758,0.0,0.0,0.4782608695652174,0.0,0.093167701863354,0.0062111801242236,0.0,0.2236024844720497,0.1614906832298136,0.0,0.0372670807453416,128.95741683769097,30.71140575300856,30.711405753008552
RG24,584850,137,4684.5,34.193430656934304,16,876.0,54.75,2022-08-13,2024-07-31,5.240875912408759,26.0,55,F,1.0,0.0,0.0,0.1897810218978102,0.1897810218978102,0.0,0.2043795620437956,0.0,0.0,0.1751824817518248,0.072992700729927,0.145985401459854,0.0218978102189781,130.88596947084034,30.002173533940056,30.00217353394006
M19,628016,239,7873.4,32.94309623430962,105,4129.45,39.32809523809524,2022-08-28,2024-06-22,2.778242677824268,6.419047619047619,60,F,1.0,0.0,0.0,0.1380753138075313,0.3305439330543933,0.00836820083682,0.2970711297071129,0.0543933054393305,0.0,0.1338912133891213,0.0167364016736401,0.00418410041841,0.0167364016736401,123.02622053857708,29.55380118645917,29.553801186459168
HS7,636106,29,871.0,30.03448275862069,9,294.0,32.666666666666664,2023-04-02,2024-06-30,15.689655172413794,24.11111111111111,54,F,1.0,0.0,0.0,0.2413793103448276,0.0344827586206896,0.0,0.2413793103448276,0.2068965517241379,0.0,0.0689655172413793,0.1724137931034483,0.0,0.0344827586206896,403.393563692687,28.87713335199044,28.877133351990437
BT8,964635,118,3760.5,31.86864406779661,54,2084.0,38.5925925925926,2022-08-14,2024-07-24,6.016949152542373,16.962962962962962,58,F,0.9830508474576272,0.0169491525423728,0.0,0.0084745762711864,0.2542372881355932,0.0,0.1779661016949152,0.1440677966101695,0.0,0.2372881355932203,0.1694915254237288,0.0,0.0084745762711864,236.80891536862228,34.969582916746575,34.96958291674657
TQ14,1100657,98,2072.5,21.147959183673468,42,852.5,20.297619047619047,2022-09-20,2024-07-03,6.653061224489796,9.714285714285714,65,F,0.1224489795918367,0.8775510204081632,0.0,0.2244897959183673,0.2755102040816326,0.0,0.0918367346938775,0.0612244897959183,0.0,0.1326530612244898,0.2142857142857142,0.0,0.0,193.72790383592329,31.911938320304728,31.91193832030475
BT8,1260494,102,1857.5,18.21078431372549,54,967.0,17.90740740740741,2022-09-27,2024-07-30,6.588235294117647,11.925925925925926,59,F,1.0,0.0,0.0,0.1176470588235294,0.6078431372549019,0.0588235294117647,0.0098039215686274,0.0490196078431372,0.0,0.0784313725490196,0.0196078431372549,0.0,0.0588235294117647,236.80891536862228,34.969582916746575,34.96958291674657
WF4,1338470,103,3329.0,32.320388349514566,79,2588.0,32.75949367088607,2022-08-04,2024-07-30,7.048543689320389,8.39240506329114,63,F,0.7378640776699029,0.2135922330097087,0.0485436893203883,0.0,0.5339805825242718,0.0,0.3398058252427184,0.0,0.0,0.029126213592233,0.0970873786407767,0.0,0.0,126.91568599720036,31.239673490155983,31.23967349015597
L19,1905179,109,3133.0,28.743119266055047,58,1804.0,31.103448275862068,2022-08-11,2024-07-18,6.486238532110092,19.603448275862068,58,F,0.5412844036697247,0.4587155963302752,0.0,0.018348623853211,0.5504587155963303,0.0,0.1651376146788991,0.0,0.0,0.1926605504587156,0.0,0.018348623853211,0.055045871559633,130.80003264348525,32.33520548557305,32.335205485573084
WF4,1931585,1,13.0,13.0,0,0.0,0.0,2022-12-21,2022-12-21,,0.0,52,F,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,126.91568599720036,31.239673490155983,31.23967349015597


## 3. EDA

In [0]:
df2.describe().display()

summary,postcode,accountkey,totalorderqty,totalordervalue,meanordervalue,totalreturnqty,totalreturnvalue,meanreturnvalue,averageordergap,averagereturndates,age,gender,homedelivery%,storedelivery%,parcelshopdelivery%,HOME%,WOMENSWEAR%,SPECIAL PURCHASE%,BRANDED%,BOYS & GIRLS%,Lipsy Childrenswear%,Lipsy Womenswear%,MENSWEAR%,Lipsy Beauty%,Lipsy Menswear%,meanmilestostore_bypostcode,meanordervalue_bypostcode
count,11875315,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0,11875315.0
mean,1170.142857142857,107694714.06097212,28.989130646218648,798.7441577364534,32.26690586069516,11.259191524603768,376.44484189766456,20.30828192747376,19.48520260619231,6.395121609679058,40.64757962209845,,0.4402899880209573,0.5431851459040565,0.0182944423447262,0.0890043381491059,0.1908549083409847,0.0030298682962218,0.1965012445441036,0.1991373322768918,0.0029085304422035,0.1325526141730234,0.1574806771840884,0.0136099585408134,0.0166900213448245,149.97865004737665,32.27701052257033
stddev,2795.2613729054287,50649750.92656896,71.27293095621155,2017.665144823483,27.318146389117555,39.05278773165186,1345.9346536698158,28.416527143529123,34.04918518258858,9.173726980585029,19.54634734524742,,0.4684289293211583,0.4695801465243606,0.1154366529217712,0.225606998729579,0.2862861839812287,0.0261065685791893,0.3038475213638086,0.3235419441037185,0.0338994713958824,0.2641947016659133,0.3076723866515054,0.0886257072935709,0.0778330155423526,48.48270597129282,2.86959755236659
min,,15.0,0.0,-190.0,-190.0,0.0,0.0,0.0,0.0,0.0,0.0,F,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.666666666666667
max,ZE3,169051866.0,23581.0,587475.4300000002,2500.0,10194.0,458488.0999999984,3050.0,363.0,406.0,106.0,X,9.0,7.0,4.0,5.0,4.0,2.0,9.0,6.5,2.0,5.0,5.0,4.0,2.0,1933.132559514483,210.0


In [0]:
df2.select(
    "totalorderqty",
    "totalordervalue",
    "meanordervalue",
    "totalreturnqty",
    "totalreturnvalue",
    "meanreturnvalue",
    "averageordergap",
    "averagereturndates",
    "age",
    "homedelivery%",
    "storedelivery%",
    "parcelshopdelivery%",
    "HOME%",
    "WOMENSWEAR%",
    "SPECIAL PURCHASE%",
    "BRANDED%",
    "BOYS & GIRLS%",
    "Lipsy Childrenswear%",
    "Lipsy Womenswear%",
    "MENSWEAR%",
    "Lipsy Beauty%",
    "Lipsy Menswear%",
    "meanmilestostore_bypostcode",
    "meanordervalue_bypostcode",
).summary().display()

summary,totalorderqty,totalordervalue,meanordervalue,totalreturnqty,totalreturnvalue,meanreturnvalue,averageordergap,averagereturndates,age,homedelivery%,storedelivery%,parcelshopdelivery%,HOME%,WOMENSWEAR%,SPECIAL PURCHASE%,BRANDED%,BOYS & GIRLS%,Lipsy Childrenswear%,Lipsy Womenswear%,MENSWEAR%,Lipsy Beauty%,Lipsy Menswear%,meanmilestostore_bypostcode,meanordervalue_bypostcode
count,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0,11857412.0
mean,28.96259672852727,800.1613373567584,32.41247880336169,11.270318261691504,376.80230135209626,20.336565343269065,19.534755273940437,6.403860796083386,40.63746355444173,0.4378641506005392,0.5438234065915324,0.0183124428079288,0.0865615766768801,0.1911611564074532,0.0030316541234553,0.1967496383016161,0.1992592859207538,0.002905172473418,0.1325642383858477,0.1575065669281407,0.0135465830054891,0.0167140444812013,149.98127855360954,32.27446714770442
stddev,71.22063199677586,2019.3386016672268,27.459859018276603,39.06548857721732,1346.340391718864,28.427942109263643,34.12082858554546,9.175645614460246,19.54595518356177,0.4671809740800902,0.4683200096844493,0.1153641256404149,0.2216722295737392,0.2859484896148547,0.0260627821920254,0.3034711583300982,0.3229313795417973,0.0338240284529999,0.2639034932787031,0.3069001062482174,0.0883318370984457,0.0777883283794105,48.48446620578226,3.50278293758388
min,1.0,0.4,0.1271014492753622,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0
25%,2.0,56.0,18.5,0.0,0.0,0.0,0.0,0.0,31.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,122.96811797498354,30.53428906155671
50%,6.0,159.0,26.363636363636363,1.0,24.0,14.0,5.921052631578948,3.0,42.0,0.125,0.7708333333333334,0.0,0.0,0.0,0.0,0.0526315789473684,0.0,0.0,0.0,0.0,0.0,0.0,132.60582186243295,31.775523567724598
75%,23.0,615.0,37.0,6.0,187.0,32.90384615384615,24.0,10.478260869565217,54.0,1.0,1.0,0.0,0.0378378378378378,0.3111111111111111,0.0,0.25,0.3333333333333333,0.0,0.1351351351351351,0.125,0.0,0.0,156.95179223128355,33.51609839372634
max,23581.0,587475.4300000002,2500.0,10194.0,458488.0999999984,3050.0,363.0,406.0,106.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1933.132559514483,113.45454545454544


### 3.1. df_model

In [0]:
df_model = df2.drop("postcodearea")
df_model.withColumnRenamed('averagereturndates', 'averagereturngap')

DataFrame[postcode: string, accountnumberkey: int, totalorderqty: bigint, totalordervalue: double, meanordervalue: double, totalreturnqty: bigint, totalreturnvalue: double, meanreturnvalue: double, firstorderdate: date, lastorderdate: date, averageordergap: double, averagereturngap: double, age: int, gender: string, homedelivery%: double, storedelivery%: double, parcelshopdelivery%: double, HOME%: double, WOMENSWEAR%: double, SPECIAL PURCHASE%: double, BRANDED%: double, BOYS & GIRLS%: double, Lipsy Childrenswear%: double, Lipsy Womenswear%: double, MENSWEAR%: double, Lipsy Beauty%: double, Lipsy Menswear%: double, meanmilestostore_bypostcode: double, meanordervalue_bypostcode: double]

In [0]:
df_model_new_column_names = [c.replace(" ", "_")
                    .replace(",", "_")
                    .replace(";", "_")
                    .replace("{", "_")
                    .replace("}", "_")
                    .replace("(", "_")
                    .replace(")", "_")
                    .replace("\n", "_")
                    .replace("\t", "_")
                    .replace("=", "_")
                    for c in df_model.columns]

df_model = df_model.select([col(c).alias(new_col) for c, new_col in zip(df_model.columns, df_model_new_column_names)])

df_model.write.mode("overwrite").saveAsTable("marketingdata_prod.ds_sandbox.df_model_yw")

## feature engineering

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml import Pipeline

indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(joined_df)
    for column in [
        "gender",
        "postcodearea_full",
        "delivery",
        "divisiondescription",
        "emailoptin",
        "smsoptin",
        "telephoneoptin",
        "salesemailoptin",
    ]
]

assembler = VectorAssembler(
    inputCols=[
        "age",
        "ordervalue",
        "returnvalue",
        "divisiondescription_index",
        "emailoptin_index",
        "smsoptin_index",
        "telephoneoptin_index",
        "salesemailoptin_index",
        "gender_index",
        "postcodearea_full_index",
    ],
    outputCol="features",
)

pipeline = Pipeline(stages=indexers + [assembler])

model_data = pipeline.fit(joined_df).transform(joined_df)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
summary_df = joined_1.describe().toPandas()
print(summary_df)

import pyspark.sql.functions as F

age_distribution = joined_1.groupBy("age").count().toPandas()

plt.figure(figsize=(10, 6))
sns.barplot(x='age', y='count', data=age_distribution)
plt.title('Age Distribution')
plt.xlabel('Age')
plt.ylabel('Count')
plt.xticks(rotation=90)
plt.show()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
joined_1.describe().display()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
def plot_age_distribution(df, chunk_size=1000):
    total_rows = df.count()
    for start in range(0, total_rows, chunk_size):
        chunk_df = df.limit(chunk_size).offset(start).toPandas()
        plt.figure(figsize=(10, 6))
        sns.histplot(chunk_df['age'], kde=True, bins=30)
        plt.title('Age Distribution')
        plt.xlabel('Age')
        plt.ylabel('Count')
        plt.show()

plot_age_distribution(joined_1)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
age_distribution = joined_1.groupBy("age").count()
age_distribution.show()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

---

In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler, PCA
from pyspark.ml import Pipeline

indexers = [
    StringIndexer(inputCol=column, outputCol=column + "_index").fit(joined_1)
    for column in ["gender", "postcodearea_full", "delivery", "divisiondescription"]
]

assembler = VectorAssembler(
    inputCols=[
        "age",
        "ordervalue",
        "returnvalue",
        "divisiondescription_index",
        "gender_index",
        "postcodearea_full_index",
    ],
    outputCol="features"
)


pca = PCA(k=5, inputCol="features", outputCol="pca_features")

pipeline = Pipeline(stages=indexers + [assembler, pca])

model_data = pipeline.fit(joined_1).transform(joined_1)

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

## modelling

In [0]:
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

train_data, test_data = model_data.randomSplit([0.8, 0.2], seed=42)

models = [
    ("RandomForest", RandomForestClassifier(labelCol="delivery_index", featuresCol="selectedFeatures", numTrees=100)),
    ("LogisticRegression", LogisticRegression(labelCol="delivery_index", featuresCol="selectedFeatures")),
    ("DecisionTree", DecisionTreeClassifier(labelCol="delivery_index", featuresCol="selectedFeatures"))
]

best_model = None
best_accuracy = 0
best_model_name = ""

for model_name, model in models:
    trained_model = model.fit(train_data)
    predictions = trained_model.transform(test_data)
    accuracy = MulticlassClassificationEvaluator(labelCol="delivery_index", predictionCol="prediction", metricName="accuracy").evaluate(predictions)
    print(f"{model_name} accuracy: {accuracy}")
    
    if accuracy > best_accuracy:
        best_model = trained_model
        best_accuracy = accuracy
        best_model_name = model_name

print(f"Best model: {best_model_name} with accuracy: {best_accuracy}")


com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL

In [0]:
new_predictions = best_model.transform(model_data)
new_predictions.select("accountnumberkey", "delivery", "prediction").show()

com.databricks.backend.common.rpc.CommandSkippedException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2(SequenceExecutionState.scala:105)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$2$adapted(SequenceExecutionState.scala:100)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:100)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:718)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:437)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:437)
	at com.databricks.spark.chauffeur.ChauffeurState.cancelExecution(ChauffeurState.scala:1266)
	at com.databricks.spark.chauffeur.ChauffeurState.$anonfun$process$1(ChauffeurState.scala:983)
	at com.databricks.logging.UsageL