In [2]:
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType

In [3]:
# Resolve input/output paths relative to the notebook's working directory:
# the project root is two directory levels above the CWD and the data lives
# in <root>/data. os.path is used instead of splitting on "\\" so the paths
# are also correct on non-Windows systems (where "\\" is not a separator).
path_wd   = os.getcwd()
path_root = os.path.dirname(os.path.dirname(path_wd))

# Trailing '' keeps a final separator, matching the old '...\\data\\' form,
# so any `path_data + '<file>'` concatenation elsewhere still works.
path_data = os.path.join(path_root, 'data', '')

fname = os.path.join(path_data, 'Task 1 - Data_Eng_Spark_Test_07072023.csv')
fname_out = os.path.join(path_data, 'Task 1 - Data_Eng_Spark_Test_07072023_processed.csv')

In [4]:
# Create the Spark session for this ETL run; getOrCreate() reuses a session
# that is already active in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("ETLProcess")
    .getOrCreate()
)


In [5]:
# Load the raw CSV: the first line holds the column names, and Spark infers
# the column types from the data (this costs an extra pass over the file).
raw_data = spark.read.option("header", True).option("inferSchema", True).csv(fname)

## Analysis: distribution of `amount`

In [6]:
# Five-number summary of `amount` using Spark's approximate quantiles
# (relative error 0.001). The 25/50/75 values printed below are the source of
# the hard-coded bin edges and outlier threshold in the Features section.
# NOTE(review): if a transaction spans several attr_name/attr_value rows, these
# statistics are weighted per row rather than per transaction -- confirm.
percentiles = [0, 25, 50, 75, 100]
quantile_probs = [p / 100.0 for p in percentiles]
percentile_values = raw_data.approxQuantile("amount", quantile_probs, 0.001)
percentiles_dict = dict(
    (f"{p}th Percentile", v) for p, v in zip(percentiles, percentile_values)
)

print("Percentiles for 'amount':")
for label in percentiles_dict:
    print(f"{label}: {percentiles_dict[label]}")

Percentiles for 'amount':
0th Percentile: 0.62
25th Percentile: 137.9
50th Percentile: 1853.0
75th Percentile: 10171.7
100th Percentile: 140813.5


In [7]:
# Central tendency of `amount`: the approximate median (same 0.001 relative
# error as the percentile cell) next to the mean. In the captured output the
# mean is well above the median, i.e. the distribution is right-skewed.
median_amount = raw_data.approxQuantile("amount", [0.5], 0.001)[0]
mean_amount = raw_data.select(F.avg("amount")).first()[0]

print("Median Amount:", median_amount)
print("Mean Amount:", mean_amount)

Median Amount: 1853.0
Mean Amount: 6461.863049024913


## Feature engineering

In [8]:
# Time-based features derived from transaction_ts and customer_enroll_ts.
raw_data = (
    raw_data
    # Day of week (per Spark docs: 1 = Sunday ... 7 = Saturday) and month number
    .withColumn("transaction_day_of_week", F.dayofweek("transaction_ts"))
    .withColumn("transaction_month", F.month("transaction_ts"))
    # Days from enrolment to the transaction (datediff(end, start))
    .withColumn("time_elapsed_since_enroll", F.datediff("transaction_ts", "customer_enroll_ts"))
)

In [9]:
# Bin transaction amounts into coarse buckets. The edges approximate the
# 25th / 50th / 75th percentiles printed in the analysis section
# (137.9, 1853.0, 10171.7). The rule below uses only the first two edges:
#   low:    amount <= 137
#   medium: 137 < amount <= 1853
#   high:   amount > 1853
# A null amount matches no branch and stays null. Previously .otherwise()
# silently labelled missing amounts as "high".
# NOTE(review): bin_edges[2] and bin_edges[3] are unused; confirm whether a
# separate bucket above ~10171 was intended.
bin_edges = [137, 1853, 10171, float("inf")]  # Define bin edges
bin_labels = ["low", "medium", "high"]  # Define bin labels
amount_col = F.col("amount")
raw_data = raw_data.withColumn(
    "amount_bin",
    # when() branches are evaluated in order, so the "medium" test no longer
    # needs to re-check `amount > bin_edges[0]`.
    F.when(amount_col <= bin_edges[0], bin_labels[0])
     .when(amount_col <= bin_edges[1], bin_labels[1])
     .when(amount_col > bin_edges[1], bin_labels[2])
)

In [10]:
# Customer-level aggregates (transaction count, mean and total amount),
# joined back onto every row.
# NOTE(review): no customer id column is visible, so customer_enroll_ts acts as
# the customer key. Customers enrolled at the same instant are merged; confirm.
# The attr_name / attr_value columns suggest the data may be in long format
# (one row per transaction attribute). To aggregate per transaction rather than
# per attribute row, repeated transaction rows are dropped first. When every
# transaction appears once, this is a no-op.
transactions_per_customer = raw_data.dropDuplicates(
    ["customer_enroll_ts", "transaction_id", "transaction_ts", "amount"]
)
customer_aggregations = transactions_per_customer.groupBy("customer_enroll_ts").agg(
    F.count("transaction_id").alias("transaction_count"),
    F.mean("amount").alias("mean_amount_per_customer"),
    F.sum("amount").alias("total_amount_per_customer"),
)
# Left join keeps every original row; rows with a null key get null aggregates.
raw_data = raw_data.join(customer_aggregations, on="customer_enroll_ts", how="left")

In [11]:
# Flag "outlier" amounts (1) versus the rest (0) with a provisional cut-off.
# Null amounts fall through to the otherwise() branch and are flagged 0.
# NOTE(review): 10171 is roughly the 75th percentile printed above, so about a
# quarter of rows would be flagged. This is far looser than a typical outlier
# rule such as Q3 + 1.5 * IQR; revisit before relying on this feature.
amount_outlier_threshold = 10171
is_outlier = F.col("amount") > amount_outlier_threshold
raw_data = raw_data.withColumn("amount_outlier", F.when(is_outlier, 1).otherwise(0))

In [12]:
raw_data.show(n=20, truncate=True)  # preview the first rows with all engineered columns

+-------------------+--------------+-------------------+--------+--------------------+--------------------+------+-----------------------+-----------------+-------------------------+----------+-----------------+------------------------+-------------------------+--------------+
| customer_enroll_ts|transaction_id|     transaction_ts|  amount|           attr_name|          attr_value|target|transaction_day_of_week|transaction_month|time_elapsed_since_enroll|amount_bin|transaction_count|mean_amount_per_customer|total_amount_per_customer|amount_outlier|
+-------------------+--------------+-------------------+--------+--------------------+--------------------+------+-----------------------+-----------------+-------------------------+----------+-----------------+------------------------+-------------------------+--------------+
|2022-03-21 03:35:50|           171|2022-03-21 03:35:50|   35.74|     external.result|                   A|     0|                      2|                3|          

In [12]:
# Outputting file: Spark writes a *directory* named fname_out containing one
# part-*.csv file per partition (each with a header row), replacing any
# previous output at that path.
raw_data.write.mode("overwrite").option("header", True).csv(fname_out)