In [0]:
# Read the raw e-commerce transactions from the bronze volume. No format is
# passed to the reader, so the session's default data source is used.
df = spark.read.load(path="/Volumes/dai/phase2/bronze/ecom_transactions")

# Preview the first 10 rows.
first_rows = df.head(10)
display(first_rows)

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,12/1/2010 8:26,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,12/1/2010 8:26,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,12/1/2010 8:26,3.39,17850,United Kingdom
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,12/1/2010 8:26,7.65,17850,United Kingdom
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,12/1/2010 8:26,4.25,17850,United Kingdom
536366,22633,HAND WARMER UNION JACK,6,12/1/2010 8:28,1.85,17850,United Kingdom
536366,22632,HAND WARMER RED POLKA DOT,6,12/1/2010 8:28,1.85,17850,United Kingdom
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,12/1/2010 8:34,1.69,13047,United Kingdom


In [0]:
# List the column names of the raw transactions DataFrame.
df.columns

['InvoiceNo',
 'StockCode',
 'Description',
 'Quantity',
 'InvoiceDate',
 'UnitPrice',
 'CustomerID',
 'Country']

In [0]:
# Print every column's name, data type and nullability for the raw DataFrame.
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: string (nullable = true)
 |-- CustomerID: string (nullable = true)
 |-- Country: string (nullable = true)



In [0]:
from pyspark.sql.functions import when, col, max, to_date, lit

In [0]:
# Convert the string columns that hold dates and numbers into proper types:
#   InvoiceDate -> date (parsed with the "M/d/yyyy H:mm" pattern; the result
#                  is a date column, so the time part is not retained)
#   Quantity    -> int
#   UnitPrice   -> double
#   CustomerID  -> int
df1 = (
    df
    .withColumn("InvoiceDate", to_date(col("InvoiceDate"), "M/d/yyyy H:mm"))
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    .withColumn("CustomerID", col("CustomerID").cast("int"))
)

In [0]:
# Print the schema of the converted DataFrame to check the new column types.
df1.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: date (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)



---

### As you can see, there is no target column in this dataset, so I will now create a target column named `is_high_valued`.

---

#### Creating Transactional Revenue

In [0]:
from pyspark.sql.functions import expr

# Revenue of a single row: number of units multiplied by the price per unit.
row_revenue = expr("Quantity * UnitPrice")

# Attach the per-row revenue as a new `total_price` column.
df_new = df1.withColumn("total_price", row_revenue)

# Preview the first 10 rows, now including `total_price`.
display(df_new.head(10))

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country,total_price
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01,2.55,17850,United Kingdom,15.3
536365,71053,WHITE METAL LANTERN,6,2010-12-01,3.39,17850,United Kingdom,20.34
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01,2.75,17850,United Kingdom,22.0
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01,3.39,17850,United Kingdom,20.34
536365,84029E,RED WOOLLY HOTTIE WHITE HEART.,6,2010-12-01,3.39,17850,United Kingdom,20.34
536365,22752,SET 7 BABUSHKA NESTING BOXES,2,2010-12-01,7.65,17850,United Kingdom,15.3
536365,21730,GLASS STAR FROSTED T-LIGHT HOLDER,6,2010-12-01,4.25,17850,United Kingdom,25.5
536366,22633,HAND WARMER UNION JACK,6,2010-12-01,1.85,17850,United Kingdom,11.1
536366,22632,HAND WARMER RED POLKA DOT,6,2010-12-01,1.85,17850,United Kingdom,11.1
536367,84879,ASSORTED COLOUR BIRD ORNAMENT,32,2010-12-01,1.69,13047,United Kingdom,54.08


---

## Aggregate to Customer Level Data:

Since my goal is to create a dataset with the target feature `is_high_valued`, I can drop the following features from my data:

- `StockCode`
- `Description`
- `UnitPrice`
- `Country`

In [0]:
from pyspark.sql.functions import sum, count

# Roll the per-row data up to one row per customer.
#
# Rows whose CustomerID is null are dropped before grouping: groupBy keeps
# null keys as a group of their own, so without this filter any rows that
# lack a customer id would be merged into a single extra "customer" row
# (CustomerID = null) in the customer-level table.
#
# NOTE: `sum`, `count` and `max` below are the Spark column aggregates
# imported from pyspark.sql.functions, not the Python builtins.
customer_df = (
    df_new
    .dropna(subset=["CustomerID"])
    .groupBy("CustomerID")
    .agg(
        # Total revenue across all of the customer's rows.
        sum("total_price").alias("total_spent"),
        # Number of the customer's rows that have a non-null InvoiceNo.
        count("InvoiceNo").alias("total_transactions"),
        # Total number of units across all of the customer's rows.
        sum("Quantity").alias("total_quantity"),
        # Latest InvoiceDate seen for the customer.
        max("InvoiceDate").alias("last_purchase_date"),
    )
)

In [0]:
# Preview the first 10 rows of the customer-level aggregate.
display(customer_df.head(10))

CustomerID,total_spent,total_transactions,total_quantity,last_purchase_date
17420,598.8300000000002,30,265,2011-10-20
16552,379.73,17,219,2011-04-11
17572,226.75,12,95,2011-09-29
15350,115.65,5,51,2010-12-01
12921,16389.740000000016,741,9454,2011-12-06
13090,8689.390000000003,161,2194,2011-12-01
14135,4690.31,134,3850,2011-12-08
12915,363.65,22,93,2011-07-14
17685,3191.5299999999997,130,1956,2011-11-23
17581,10736.109999999991,452,5861,2011-12-09


In [0]:
# Save the customer-level aggregate as a Delta table, replacing whatever is
# already stored at the target path.
(
    customer_df.write
    .format("delta")
    .mode("overwrite")
    .save("/Volumes/dai/phase2/bronze/customer_df_without_target")
)