In [1]:
import os
import pandas as pd
from IPython.display import display
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, when # pyspark sql functions
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DateType, TimestampType, DecimalType # pyspark sql types
import random
from faker import Faker
import uuid
from dotenv import load_dotenv

# Read key/value pairs from a local .env file into the process environment
load_dotenv()

# SPARK SETUP
# The SparkSession is the entry point to any Spark functionality.
spark = (
    SparkSession.builder
    .appName("PySpark Synthetic")
    .getOrCreate()
)

# Faker instance shared by the synthetic-data generator functions in this cell
fake = Faker()

# Generate synthetic product data. This stands in for the raw data of the extract (ingestion) phase.
# It can be replaced with data warehouse or data lake credentials, which would mean we skip the extract phase.
def generate_synthetic_product_data(num_records=1000):
    """Build a list of fake product records.

    Args:
        num_records: How many products to generate.

    Returns:
        A tuple ``(records, product_ids)`` where ``records`` is a list of dicts
        with the keys product_id, product_name, product_category,
        product_price and product_description, and ``product_ids`` is the list
        of generated IDs in the same order as ``records``.
    """
    categories = ['Electronics', 'Clothing', 'Books', 'Home & Kitchen']

    # All IDs are created up front, before any other field is generated.
    product_ids = []
    for _ in range(num_records):
        product_ids.append(fake.uuid4())

    records = [
        {
            'product_id': pid,
            'product_name': fake.word(),
            'product_category': random.choice(categories),
            'product_price': round(random.uniform(10.0, 1000.0), 2),  # price rounded to 2 decimals
            'product_description': fake.sentence(),
        }
        for pid in product_ids
    ]
    return records, product_ids

# Generate synthetic payment data
def generate_synthetic_payment_data(num_records=1000, product_ids=None):
    """Build a list of fake payment (transaction) records.

    Each record references a product chosen at random, with replacement, from
    ``product_ids``. Because the choice is with replacement, the pool only has
    to be non-empty; it does not need to hold ``num_records`` IDs.

    Every record gets its own freshly generated transaction_id and
    customer_id.

    Args:
        num_records: How many payment records to generate.
        product_ids: Sequence of product IDs to draw ``product_id`` from.

    Returns:
        A list of dicts with the keys transaction_id, customer_id,
        customer_name, payment_amount, payment_method, transaction_date,
        country and product_id.

    Raises:
        ValueError: If ``product_ids`` is None, or is empty while at least one
            record is requested.
    """
    # An empty pool is only a problem when a product actually has to be drawn.
    if product_ids is None or (num_records > 0 and len(product_ids) == 0):
        raise ValueError("Not enough product IDs provided")

    payment_methods = ['Credit Card', 'Debit Card', 'PayPal', 'Bank Transfer']

    return [
        {
            'transaction_id': fake.uuid4(),
            'customer_id': fake.uuid4(),
            'customer_name': fake.name(),
            'payment_amount': round(random.uniform(10.0, 1000.0), 2),  # amount rounded to 2 decimals
            'payment_method': random.choice(payment_methods),
            'transaction_date': fake.date_this_year(),
            'country': fake.country(),
            'product_id': random.choice(product_ids),
        }
        for _ in range(num_records)
    ]

# Seed both random sources so a fresh "Restart & Run All" regenerates the same synthetic data.
# random.seed() drives random.choice()/random.uniform(); Faker.seed() drives the fake.* providers.
# NOTE(review): fake.date_this_year() presumably also depends on the current date, so
# transaction dates may still differ between runs on different days — confirm if that matters.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
Faker.seed(RANDOM_SEED)

# Number of product records and of payment records to generate
NUM_RECORDS = 1000

# Generate the product data first; the payment records reference these product IDs
product_data, product_ids = generate_synthetic_product_data(NUM_RECORDS)

# Convert product data to a Pandas DataFrame before converting to a Spark DataFrame
product_df = pd.DataFrame(product_data)
# Convert to Spark DataFrame
spark_product_df = spark.createDataFrame(product_df)

# Generate the payment data using the product IDs
payment_data = generate_synthetic_payment_data(NUM_RECORDS, product_ids)

# Convert payment data to a Pandas DataFrame before converting to a Spark DataFrame
payment_df = pd.DataFrame(payment_data)
# Convert to Spark DataFrame
spark_payment_df = spark.createDataFrame(payment_df)

# Repartition both DataFrames into 4 partitions
spark_product_df = spark_product_df.repartition(4)
spark_payment_df = spark_payment_df.repartition(4)

# Display schemas of both DataFrames for verification
spark_product_df.printSchema()
spark_payment_df.printSchema()

# Show the first few rows of both DataFrames for verification
spark_product_df.show(5)
spark_payment_df.show(5)


root
 |-- product_id: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- product_description: string (nullable = true)

root
 |-- transaction_id: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- payment_amount: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- transaction_date: date (nullable = true)
 |-- country: string (nullable = true)
 |-- product_id: string (nullable = true)

+--------------------+------------+----------------+-------------+--------------------+
|          product_id|product_name|product_category|product_price| product_description|
+--------------------+------------+----------------+-------------+--------------------+
|1d2fd6dc-403f-427...|        baby|     Electronics|       157.74|Third develop guy...|
|6b615674-ee9f-44c...|       order|        Clothing| 

In [None]:
# Display the first 10 records.
# display() on a Spark DataFrame only renders its schema repr unless eager evaluation is
# enabled, so the 10 rows are collected to pandas, which renders as a table.
display(spark_payment_df.limit(10).toPandas())


In [2]:
# Check the data types: list (column name, Spark SQL type name) pairs for the payment DataFrame
spark_payment_df.dtypes
# Spark's 'double' is a double-precision float; the Python floats in payment_amount are stored as this type

[('transaction_id', 'string'),
 ('customer_id', 'string'),
 ('customer_name', 'string'),
 ('payment_amount', 'double'),
 ('payment_method', 'string'),
 ('transaction_date', 'date'),
 ('country', 'string'),
 ('product_id', 'string')]

In [3]:

# Feature Engineering
# NOTE: `sum` imported below is pyspark.sql.functions.sum; it shadows Python's builtin sum() from this cell onward.
from pyspark.sql.functions import year, month, dayofweek, datediff, current_date, when, col, avg, sum, row_number, desc
from pyspark.sql.window import Window # Window is used for window functions and we need lag function for days since last transaction
from pyspark.sql.functions import lag

# Columns derived by this cell. They are dropped first so the cell can be re-run safely:
# without this, a second run would join customer_lifetime_value / preferred_payment_method
# again and leave duplicate column names. drop() ignores names that are not present,
# so this is a no-op on the first run.
_derived_feature_columns = [
    "transaction_year",
    "transaction_month",
    "transaction_day_of_week",
    "days_since_last_transaction",
    "is_high_value_transaction",
    "customer_lifetime_value",
    "preferred_payment_method",
    "is_preferred_payment_method",
]
spark_payment_df = spark_payment_df.drop(*_derived_feature_columns)

# 1. Extract time-based features: for each row, the year, month, and day of the week
spark_payment_df = spark_payment_df.withColumn("transaction_year", year("transaction_date"))
spark_payment_df = spark_payment_df.withColumn("transaction_month", month("transaction_date"))
spark_payment_df = spark_payment_df.withColumn("transaction_day_of_week", dayofweek("transaction_date"))

# 2. Days since the customer's previous transaction, as an activity signal.
# transaction_id is a tie-breaker so that rows sharing a date get a deterministic order.
# NOTE(review): generate_synthetic_payment_data draws a new customer_id for every record, so with
# that data each customer partition holds a single row and this column is null — confirm whether
# repeat customers were intended.
window_spec = Window.partitionBy("customer_id").orderBy("transaction_date", "transaction_id")
previous_transaction_date = lag("transaction_date").over(window_spec) # value from the previous row in the customer's window
spark_payment_df = spark_payment_df.withColumn("days_since_last_transaction",
                                               datediff("transaction_date", previous_transaction_date))

# 3. Binary flag for high-value transactions: 1 when the amount is above the overall average, else 0
avg_payment = spark_payment_df.select(avg("payment_amount")).first()[0]
spark_payment_df = spark_payment_df.withColumn("is_high_value_transaction",
                                               when(col("payment_amount") > avg_payment, 1).otherwise(0))

# 4. Customer lifetime value (CLV): the total payment amount per customer, joined back onto every row
clv_df = spark_payment_df.groupBy("customer_id").agg(sum("payment_amount").alias("customer_lifetime_value"))
spark_payment_df = spark_payment_df.join(clv_df, on="customer_id", how="left")

# 5. Binary flag for whether the row uses the customer's preferred (most used) payment method.
# Count each (customer, method) pair, rank the methods per customer by count, and keep rank 1.
# payment_method is a tie-breaker so that equally used methods resolve deterministically.
preferred_rank_window = Window.partitionBy("customer_id").orderBy(desc("count"), "payment_method")
preferred_payment_method = spark_payment_df.groupBy("customer_id", "payment_method").count() \
                                           .withColumn("rank", row_number().over(preferred_rank_window)) \
                                           .filter(col("rank") == 1) \
                                           .select("customer_id", "payment_method").withColumnRenamed("payment_method", "preferred_payment_method")
spark_payment_df = spark_payment_df.join(preferred_payment_method, on="customer_id", how="left") \
                                   .withColumn("is_preferred_payment_method",
                                               when(col("payment_method") == col("preferred_payment_method"), 1).otherwise(0))



In [4]:
# Ensure transaction_date carries no time component by casting it to Spark's date type
spark_payment_df = spark_payment_df.withColumn(
    "transaction_date",
    col("transaction_date").cast("date"),
)

In [5]:
# Check the values in transaction_date (prints the first 20 rows)
spark_payment_df.select("transaction_date").show(n=20, truncate=True)

+----------------+
|transaction_date|
+----------------+
|      2024-05-12|
|      2024-08-05|
|      2024-02-27|
|      2024-06-22|
|      2024-07-31|
|      2024-09-23|
|      2024-10-12|
|      2024-06-15|
|      2024-01-09|
|      2024-01-11|
|      2024-04-27|
|      2024-07-07|
|      2024-03-01|
|      2024-04-29|
|      2024-08-14|
|      2024-07-08|
|      2024-07-26|
|      2024-10-03|
|      2024-03-10|
|      2024-06-25|
+----------------+
only showing top 20 rows



In [6]:
# Check the product data (prints the first 20 rows, long values truncated)
spark_product_df.show(n=20, truncate=True)

+--------------------+------------+----------------+-------------+--------------------+
|          product_id|product_name|product_category|product_price| product_description|
+--------------------+------------+----------------+-------------+--------------------+
|1d2fd6dc-403f-427...|        baby|     Electronics|       157.74|Third develop guy...|
|6b615674-ee9f-44c...|       order|        Clothing|       714.54|Collection up for...|
|aa8192f6-9eed-40d...|     believe|     Electronics|       550.49|Admit ball war sh...|
|1ffd70aa-3e7b-4c8...|        trip|     Electronics|        801.9|Understand if the...|
|6ce3e2a9-8533-4bf...|     history|           Books|       208.92|Travel she alone ...|
|beda3173-208e-440...|     feeling|  Home & Kitchen|       231.41|Manager lawyer po...|
|8c480eb5-dda6-4b3...|        talk|     Electronics|       519.46|Wind including di...|
|0ca3b4c2-791c-453...|        thus|        Clothing|        123.5|Discuss picture o...|
|e86883ca-d671-4af...|      myse