# Recommendation System from Transaction History

The purpose of this notebook is to build a recommendation system from transaction history. The completed model suggests items for each user present in the database. The data was obtained from Kaggle at the following URL: https://www.kaggle.com/datasets/vipin20/transaction-data

The program uses Spark through its Python API (PySpark) as the engine to process a relatively large volume of data (over 1 million records). However, Spark runs only in local mode in this notebook, so it cannot fully use the power of a distributed system and is therefore limited for computationally intensive processes.

SparkSession is imported to start programming with Spark.

In [1]:
from pyspark.sql import SparkSession

## 1. CSV File Reading

pyspark.sql.types is imported to construct the DataFrame schema manually instead of relying on inferSchema.

In [2]:
from pyspark.sql.types import *

In [3]:
transaction_schema = StructType([StructField('UserID', StringType(), True),
                                StructField('TransactionID', StringType(), True),
                                StructField('TransactionTime', StringType(), True),
                                StructField('ItemCode', StringType(), True),
                                StructField('ItemDescription', StringType(), True),
                                StructField('NumberOfItemPurchased', IntegerType(), True),
                                StructField('CostPerItem', DoubleType(), True),
                                StructField('Country', StringType(), True)])

In [4]:
spark = (SparkSession
 .builder
 .appName("Transaction")
 .getOrCreate())

In [5]:
df = spark.read.csv('transaction_data.csv', header = True, schema = transaction_schema)

After inspection, it is reasonable to conclude that "-1", "?", and "??" are used to represent null values in selected columns in the dataset.

In [6]:
df = df.replace("-1",None,['UserID','TransactionID', 'TransactionTime','ItemCode'])
df = df.replace(["?", "??"],None,['ItemDescription','Country'])

In [7]:
from pyspark.sql.functions import lower, col, to_timestamp

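In the dataset, ItemDescription is mostly recorded in uppercase; to keep it uniform, the column is converted to lowercase. TransactionTime is also converted to its correct data type, timestamp. The legacy time parser policy is enabled because Spark 3's default datetime parser does not accept the day-of-week letter E in parsing patterns.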

In [8]:
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

df = df.withColumn("ItemDescription", lower(col('ItemDescription')))
df = df.withColumn("TransactionTime", to_timestamp(col('TransactionTime'), "EEE MMM dd HH:mm:ss zzz yyyy"))
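To make the pattern concrete, the sketch below maps it onto Python's datetime.strptime directives (EEE to %a, MMM to %b, dd to %d, HH:mm:ss to %H:%M:%S, yyyy to %Y). The sample string is hypothetical, shaped only to fit the pattern, and the time-zone token is simply dropped, so the result is a naive datetime. That is an assumption made for illustration; Spark's legacy parser interprets the zone instead.

```python
from datetime import datetime

# Spark pattern used above: "EEE MMM dd HH:mm:ss zzz yyyy"
# Python equivalent with the zone field removed (dropped in this sketch).
PY_FORMAT = "%a %b %d %H:%M:%S %Y"


def parse_transaction_time(raw: str) -> datetime:
    """Parse a string like 'Sat Feb 02 12:50:00 IST 2019' into a naive datetime.

    Assumption: exactly six whitespace-separated fields, the fifth being a
    zone abbreviation that is discarded here.
    """
    fields = raw.split()
    if len(fields) != 6:
        raise ValueError(f"unexpected TransactionTime format: {raw!r}")
    day_name, month_name, day, clock, _zone, year = fields
    return datetime.strptime(f"{day_name} {month_name} {day} {clock} {year}", PY_FORMAT)


print(parse_transaction_time("Sat Feb 02 12:50:00 IST 2019"))  # 2019-02-02 12:50:00
```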

## 2. Null Handling

In [9]:
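from pyspark.sql.functions import when, count

# Fraction of nulls per column; the row count is computed once and reused
total_rows = df.count()
df.select([(count(when(col(c).isNull(), c)) / total_rows).alias(c) for c in df.columns]).show()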

+-----------------+-------------+---------------+-------------------+--------------------+---------------------+-----------+-------+
|           UserID|TransactionID|TransactionTime|           ItemCode|     ItemDescription|NumberOfItemPurchased|CostPerItem|Country|
+-----------------+-------------+---------------+-------------------+--------------------+---------------------+-----------+-------+
|0.249266943342886|          0.0|            0.0|0.00515953785598689|0.002782755038207522|                  0.0|        0.0|    0.0|
+-----------------+-------------+---------------+-------------------+--------------------+---------------------+-----------+-------+



The null fractions show that roughly 25% of UserID values are missing. As the aim of the notebook is to study buyer behavior, rows with unknown users would distort the analysis considerably. Imputing this feature may introduce unwanted biases; therefore, these rows are set aside from the dataset for further study.

Since ItemCode and ItemDescription should be related, we check how many distinct descriptions each ItemCode has.

In [10]:
from pyspark.sql.functions import countDistinct
df.groupBy('ItemCode').agg(countDistinct('ItemDescription')).show()

+--------+-------------------------------+
|ItemCode|count(DISTINCT ItemDescription)|
+--------+-------------------------------+
|  443604|                              1|
|  493542|                              2|
|  456057|                              1|
|  492618|                              1|
| 1529052|                              1|
|  455070|                              1|
|  449841|                              2|
|  467271|                              1|
|  489468|                              1|
| 1893108|                              1|
|  485457|                              2|
|  493122|                              1|
|  463554|                              1|
|  434007|                              1|
|  458388|                              1|
| 1660407|                              1|
|  494277|                              2|
|    1512|                              0|
|  489993|                              1|
|  472143|                              1|
+--------+-------------------------------+

The output shows that several ItemCode values map to more than one description, and some map to none at all. Therefore, ItemCode cannot be mapped reliably to ItemDescription or vice versa. As the proportion of missing values in these two columns is low (about 0.5% and 0.3%), these rows can be ignored in the following process.
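The check above can be reproduced on plain Python records. This hypothetical sketch counts the distinct non-null descriptions per ItemCode (nulls are ignored, as countDistinct does) and reports the codes that map to zero or to several descriptions, which is what rules out a one-to-one mapping. The sample rows and descriptions are made up for illustration.

```python
from collections import defaultdict


def ambiguous_item_codes(rows):
    """Return {item_code: n_distinct_descriptions} for codes whose count != 1.

    rows: iterable of (item_code, item_description); description may be None.
    """
    descriptions = defaultdict(set)
    for code, desc in rows:
        entry = descriptions[code]  # register the code even if its description is null
        if desc is not None:
            entry.add(desc)
    return {code: len(d) for code, d in descriptions.items() if len(d) != 1}


sample = [
    ("443604", "description a"),
    ("493542", "description b"),
    ("493542", "description b (variant)"),
    ("1512", None),
]
print(ambiguous_item_codes(sample))  # {'493542': 2, '1512': 0}
```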

In [11]:
new_df = df.na.drop()

In [12]:
# Rows removed by na.drop(), kept aside for further study (the other columns contain no nulls)
null_rows = df.filter(col('UserID').isNull() | col('ItemCode').isNull() | col('ItemDescription').isNull())

## 3. Data Accuracy Handling

Summary statistics of the two numerical columns are displayed below.

In [13]:
new_df.select('NumberOfItemPurchased', 'CostPerItem').summary().show()

+-------+---------------------+------------------+
|summary|NumberOfItemPurchased|       CostPerItem|
+-------+---------------------+------------------+
|  count|               810086|            810086|
|   mean|    36.30636747209555| 8.218449646086775|
| stddev|    747.5887214570251|2665.3132368669712|
|    min|              -242985|               0.0|
|    25%|                    6|              1.73|
|    50%|                   15|               2.7|
|    75%|                   36|              5.18|
|    max|               242985|        1696285.44|
+-------+---------------------+------------------+



In [14]:
new_df.filter(col('CostPerItem') == 0).count()

66

As we can see, the minimum value of CostPerItem is 0. The number of such records (66) is negligible compared with the size of the dataset. Although a CostPerItem of 0 may have several causes, such as gifts or vouchers, the exact reasons are unclear, and these values may produce unexpected results later in the transformation. Therefore, these records are excluded from the dataset.

In [15]:
new_df = new_df.filter(col('CostPerItem') > 0)

Another noticeable trait is that some values of NumberOfItemPurchased are negative.

In [16]:
new_df.where((col('NumberOfItemPurchased') < 0)).show()

+------+-------------+-------------------+--------+--------------------+---------------------+-----------+--------------+
|UserID|TransactionID|    TransactionTime|ItemCode|     ItemDescription|NumberOfItemPurchased|CostPerItem|       Country|
+------+-------------+-------------------+--------+--------------------+---------------------+-----------+--------------+
|300909|      6015757|2018-05-29 22:14:00|  466452|four hook  white ...|                   -3|        2.9|United Kingdom|
|319683|      6036228|2018-06-16 16:28:00|  470883|regency cakestand...|                  -24|       17.6|United Kingdom|
|321531|      5925150|2018-02-24 20:05:00|  446418|victorian sewing ...|                   -3|      15.12|United Kingdom|
|260715|      6165940|2018-09-30 16:26:00|  488061|treasure tin gymk...|                   -3|       2.88|     Australia|
|274869|      6004240|2018-05-19 17:35:00|  470883|regency cakestand...|                  -15|      15.12|United Kingdom|
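+------+-------------+-------------------+--------+--------------------+---------------------+-----------+--------------+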

These negative quantities can be interpreted as returned orders. To record the status of each transaction, a TransactionCancel column is added: 1 when the quantity is negative and 0 otherwise.

In [17]:
new_df = new_df.withColumn('TransactionCancel', (col('NumberOfItemPurchased') < 0).cast('integer'))

## 4. Duplication Handling

Duplicate records are removed from the dataset.

In [18]:
new_df = new_df.dropDuplicates()

## 5. Outlier Handling

In this notebook, outliers in the numerical columns are detected using the interquartile range (IQR) rule. Although this is a straightforward method, more sophisticated procedures (clustering, Isolation Forest, etc.) may detect these points more reliably.

In [19]:
def iqr_outlier_treatment(dataframe, column, factor=1.5):
    quantiles = dataframe.approxQuantile(column, [0.25, 0.75], 0.01)
    q1, q3 = quantiles[0], quantiles[1]
    iqr = q3 - q1

    # Define the upper and lower bounds for outliers
    lower_bound = q1 - factor * iqr
    upper_bound = q3 + factor * iqr

    # Filter outliers and update the DataFrame
    dataframe = dataframe.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))

    return dataframe

There are several ways to treat outliers, including transformation and imputation. In this notebook, the outlying rows are simply removed, as their number is considered negligible.

In [20]:
new_df = iqr_outlier_treatment(new_df, 'NumberOfItemPurchased')
new_df = iqr_outlier_treatment(new_df, 'CostPerItem')
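As a rough worked example of the fences that iqr_outlier_treatment computes, take the quartiles from the summary table above (Q1 = 6 and Q3 = 36 for NumberOfItemPurchased; Q1 = 1.73 and Q3 = 5.18 for CostPerItem). These are only approximations: approxQuantile runs on the filtered, de-duplicated data with a 1% relative error, so the real thresholds may differ slightly. The helper names below are hypothetical.

```python
from typing import List, Tuple


def iqr_bounds(q1: float, q3: float, factor: float = 1.5) -> Tuple[float, float]:
    """Return the (lower, upper) fences of the IQR rule: Q1 - k*IQR and Q3 + k*IQR."""
    iqr = q3 - q1
    return q1 - factor * iqr, q3 + factor * iqr


def filter_iqr(values: List[float], q1: float, q3: float, factor: float = 1.5) -> List[float]:
    """Keep values inside the fences, inclusive, like iqr_outlier_treatment does."""
    lower, upper = iqr_bounds(q1, q3, factor)
    return [v for v in values if lower <= v <= upper]


# NumberOfItemPurchased: IQR = 30, fences = (-39.0, 81.0)
print(iqr_bounds(6, 36))
# CostPerItem: IQR = 3.45, fences approximately (-3.445, 10.355)
print(iqr_bounds(1.73, 5.18))
# Small returns such as -24 or -3 survive; extreme quantities do not.
print(filter_iqr([-242985, -24, -3, 15, 81, 100, 242985], 6, 36))  # [-24, -3, 15, 81]
```

With these approximate quartiles, the lower fence for CostPerItem is negative and zero prices were already removed, so for that column the rule acts only as an upper cut-off.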

## 6. Exploration of ItemCode in each month

In [21]:
new_df = new_df.withColumn('Inventory Value', col('NumberOfItemPurchased') * col('CostPerItem'))

In [22]:
from pyspark.sql.functions import month, year, sum  # note: shadows Python's built-in sum

# Total quantity and total value of each item per calendar month
(new_df
 .groupBy('ItemCode',
          month('TransactionTime').alias('month'),
          year('TransactionTime').alias('year'))
 .agg(sum('NumberOfItemPurchased').alias('TotalPurchasedItems'),
      sum('Inventory Value').alias('Total Prices'))
 .show())

+--------+-----+----+-------------------+------------------+
|ItemCode|month|year|TotalPurchasedItems|      Total Prices|
+--------+-----+----+-------------------+------------------+
|  482349|   10|2018|               1044|           2098.44|
|  476910|    5|2018|                300|             174.0|
|  451878|    4|2018|                111|             64.38|
|  481131|    4|2018|                144|            587.52|
|  491883|    1|2019|                111|            574.98|
|  493815|    1|2019|                333|            959.04|
|  471891|    2|2019|                 75|363.96000000000004|
|  469455|    6|2018|                663|            782.34|
|  485268|   11|2018|                330|           2676.54|
|  463575|    2|2018|                294|            650.88|
|  467103|    5|2018|                444|1012.3199999999999|
|  470085|   11|2018|               1335|3820.7999999999997|
|  456855|    9|2018|                345|2798.6400000000003|
+--------+-----+----+-------------------+------------------+

## 7. Exploration of Users' Transaction Behavior over a 30-Day Window

In [23]:
new_df.createOrReplaceTempView("df")

new_df = spark.sql(
    """SELECT *, sum(NumberOfItemPurchased) OVER (
        PARTITION BY UserID
        ORDER BY TransactionTime
        RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW
     ) AS NumberOfItemsPurchased_30days
     FROM df""")
new_df.show()

+------+-------------+-------------------+--------+--------------------+---------------------+-----------+-----------+-----------------+-----------------+-----------------------------+
|UserID|TransactionID|    TransactionTime|ItemCode|     ItemDescription|NumberOfItemPurchased|CostPerItem|    Country|TransactionCancel|  Inventory Value|NumberOfItemsPurchased_30days|
+------+-------------+-------------------+--------+--------------------+---------------------+-----------+-----------+-----------------+-----------------+-----------------------------+
|259455|      6076939|2018-07-21 17:49:00| 1528842|rose scent candle...|                   18|       5.87|    Bahrain|                0|           105.66|                          240|
|259455|      6076939|2018-07-21 17:49:00|  476679|roses regency tea...|                   18|       4.08|    Bahrain|                0|            73.44|                          240|
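+------+-------------+-------------------+--------+--------------------+---------------------+-----------+-----------+-----------------+-----------------+-----------------------------+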
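For each row, the window in the query above sums the quantities of the same user whose TransactionTime lies between 29 days before the current row's timestamp and the timestamp itself. Because it is a RANGE frame, rows sharing the current timestamp (for example several items in one transaction) are all included, which is why the first rows in the output share the same total. The plain-Python sketch below reproduces these semantics on made-up rows; rolling_window_sum is a hypothetical helper. Note that on a timestamp column INTERVAL 29 DAYS PRECEDING covers 29 × 24 hours of elapsed time, so INTERVAL 30 DAYS may be closer to a literal 30-day window.

```python
from datetime import datetime, timedelta


def rolling_window_sum(rows, days=29):
    """rows: list of (user_id, timestamp, quantity).

    For each row, sum the quantities of the same user with
    timestamp in [current - days, current], ties included (RANGE semantics).
    O(n^2), intended only as an illustration.
    """
    window = timedelta(days=days)
    totals = []
    for user, ts, _ in rows:
        totals.append(sum(q for u, t, q in rows if u == user and ts - window <= t <= ts))
    return totals


sample_rows = [
    ("u1", datetime(2018, 7, 21, 17, 49), 18),
    ("u1", datetime(2018, 7, 21, 17, 49), 18),  # same transaction time: a peer row
    ("u1", datetime(2018, 7, 31, 10, 0), 6),    # still within 29 days of the first two
    ("u1", datetime(2018, 9, 15, 9, 0), 3),     # earlier rows have left the window
    ("u2", datetime(2018, 7, 21, 17, 49), 5),   # different user, separate partition
]
print(rolling_window_sum(sample_rows))  # [36, 36, 42, 3, 5]
```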

## 8. Recommendation System

Several approaches could be used to build a recommendation system for this dataset: collaborative filtering, content-based filtering, customer segmentation, and deep learning, to name a few. The primary model in this notebook is Alternating Least Squares (ALS), available in PySpark's MLlib, which performs collaborative filtering: it uses user-item interactions to recommend items that similar users have purchased.

This approach is well suited because:
* ALS is designed to work well with sparse data, which is common in recommendation systems.
* It is implemented in PySpark, so it can take advantage of distributed computing when a cluster is available.
* It does not require extensive feature engineering.
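To illustrate the idea behind ALS, the sketch below factorizes a small rating matrix into user and item factors by alternately solving a ridge regression for every user (item factors fixed) and for every item (user factors fixed). It is a simplified, dense NumPy version written for illustration only: Spark's implementation is distributed, partitions the data into blocks, and scales the regularization term by the number of ratings per user or item, none of which is reproduced here. The matrix, rank, and regularization value are made up.

```python
import numpy as np


def als(R, mask, rank=2, reg=0.1, iters=20, seed=0):
    """Explicit-feedback ALS on a dense matrix.

    R:    (n_users, n_items) ratings; entries where mask is False are ignored.
    mask: boolean array of the same shape, True where a rating is observed.
    Returns user factors U (n_users x rank) and item factors V (n_items x rank).
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(iters):
        # Item factors fixed: each user vector solves (Vu^T Vu + reg*I) u = Vu^T r_u
        for u in range(n_users):
            Vu = V[mask[u]]
            U[u] = np.linalg.solve(Vu.T @ Vu + reg * eye, Vu.T @ R[u, mask[u]])
        # User factors fixed: each item vector solves the symmetric problem
        for i in range(n_items):
            Ui = U[mask[:, i]]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg * eye, Ui.T @ R[mask[:, i], i])
    return U, V


def objective(R, mask, U, V, reg):
    """Squared error on observed entries plus an L2 penalty on all factors."""
    err = (R - U @ V.T)[mask]
    return float(np.sum(err ** 2) + reg * (np.sum(U ** 2) + np.sum(V ** 2)))


# Toy matrix: 0 means "no interaction observed"
R = np.array([[5, 3, 0, 1],
              [4, 0, 0, 1],
              [1, 1, 0, 5],
              [1, 0, 0, 4],
              [0, 1, 5, 4]], dtype=float)
mask = R > 0
U, V = als(R, mask)
print(np.round(U @ V.T, 2))  # reconstructed matrix, including unobserved cells
```

Each half-step solves its subproblem exactly, so the regularized objective never increases from one iteration to the next; this is what makes the alternating scheme simple to run in parallel over users or items.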

In [24]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

The total number of purchased items for each user-item pair is used as the rating in ALS. Since these totals can be zero or negative (when returns offset purchases), which is not ideal for ALS, non-positive totals are replaced with 0.1 to represent a very small preference.

In [25]:
# Prepare the data

als_data = new_df.groupby("UserID","ItemCode").agg(sum("NumberOfItemPurchased").alias("TotalPurchased"))
als_data = als_data.select(
    col("UserID").cast("integer"),
    col("ItemCode").cast("integer"),
    col("TotalPurchased").cast("float")
)

# Adjust ratings based on Total Purchased Items
als_data = als_data.withColumn(
    "AdjustedRating",
    when(col("TotalPurchased") > 0, col("TotalPurchased"))
    .otherwise(0.1)  # Small positive value when the net quantity is zero or negative (returns)
)

als_data.show()

+------+--------+--------------+--------------+
|UserID|ItemCode|TotalPurchased|AdjustedRating|
+------+--------+--------------+--------------+
|294840|  456855|           9.0|           9.0|
|287889|  485163|         108.0|         108.0|
|337029|  486297|           6.0|           6.0|
|325353| 1734600|           9.0|           9.0|
|313131|  451878|          36.0|          36.0|
|283437|  474432|          12.0|          12.0|
|274659|  442344|         150.0|         150.0|
|297276|  475881|         120.0|         120.0|
|329091|  474201|          36.0|          36.0|
|340284| 1764609|          24.0|          24.0|
|373653|  489678|           3.0|           3.0|
|282555|  452319|           6.0|           6.0|
|277452|  490371|           3.0|           3.0|
|375543|  490392|           6.0|           6.0|
|342930|  489321|           3.0|           3.0|
|346983|  494886|          36.0|          36.0|
|268422|  481383|          36.0|          36.0|
|378504|  444465|          24.0|          24.0|
+------+--------+--------------+--------------+
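The rating preparation can be mirrored in plain Python: quantities are summed per (UserID, ItemCode) pair, and any total that is not strictly positive is replaced with 0.1, as the when(...).otherwise(0.1) expression does. The helper name and the rows below are hypothetical.

```python
from collections import defaultdict


def adjusted_ratings(rows, floor=0.1):
    """rows: iterable of (user_id, item_code, quantity).

    Returns {(user_id, item_code): rating}, where rating is the summed
    quantity, or `floor` when that sum is <= 0.
    """
    totals = defaultdict(float)
    for user, item, qty in rows:
        totals[(user, item)] += qty
    return {key: (total if total > 0 else floor) for key, total in totals.items()}


sample_rows = [
    ("u1", "i1", 9),     # plain purchase
    ("u2", "i2", 120),
    ("u2", "i2", -12),   # partial return: net 108
    ("u3", "i3", 3),
    ("u3", "i3", -3),    # full return: net 0, replaced by the floor
]
print(adjusted_ratings(sample_rows))
```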

The dataset is split into a training set (80%) and a test set (20%).

In [26]:
# Split the data
(training, test) = als_data.randomSplit([0.8, 0.2])

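Due to limited computational power, maxIter is set to a small number (12). coldStartStrategy is set to "drop" so that test rows whose user or item never appears in the training set, and therefore cannot be scored, are removed instead of producing NaN predictions.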

In [28]:
# Build the recommendation model
als = ALS(maxIter=12, regParam=0.8, userCol="UserID", itemCol="ItemCode",
          ratingCol="AdjustedRating", coldStartStrategy="drop")

model = als.fit(training)
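The "drop" strategy matters because randomSplit can place some users or items only in the test set. ALS learns no factors for them, so with the default strategy their predictions would be NaN and so would the RMSE. The hypothetical sketch below, on plain (user, item) pairs, separates the test pairs that can be scored from the cold-start ones that "drop" removes.

```python
def scorable_pairs(train_pairs, test_pairs):
    """Split test (user, item) pairs into scorable and cold-start pairs.

    A pair is scorable only if its user AND its item both appear in training.
    """
    train_users = {u for u, _ in train_pairs}
    train_items = {i for _, i in train_pairs}
    kept, dropped = [], []
    for user, item in test_pairs:
        target = kept if (user in train_users and item in train_items) else dropped
        target.append((user, item))
    return kept, dropped


train = [("u1", "a"), ("u1", "b"), ("u2", "a")]
test = [("u2", "b"), ("u3", "a"), ("u1", "c")]
kept, dropped = scorable_pairs(train, test)
print(kept)     # [('u2', 'b')]
print(dropped)  # [('u3', 'a'), ('u1', 'c')]
```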

Root mean squared error (RMSE) is used to evaluate the model, since the predicted ratings are the model's target.

In [29]:
# Evaluate the model
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="AdjustedRating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print(f"ALS RMSE: {rmse}")

ALS RMSE: 38.58740921112539
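For reference, the RMSE reported by RegressionEvaluator is the square root of the mean squared difference between observed and predicted ratings. A minimal plain-Python version, with made-up numbers, is:

```python
import math


def rmse(actual, predicted):
    """Root mean squared error: sqrt(mean((actual - predicted) ** 2))."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    return math.sqrt(sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual))


# One large miss (108 vs 60) dominates the two small ones
print(rmse([9.0, 108.0, 36.0], [12.0, 60.0, 30.0]))
```

Because errors are squared, a few user-item pairs with very large totals can dominate the score, which is worth keeping in mind when reading an RMSE of about 38.6 against ratings that range from 0.1 to well over 100.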


Top-5 recommendations are generated for every distinct user in the test set, and a sample is displayed.

In [33]:
# Generate recommendations
user_recs = model.recommendForUserSubset(test.select('UserID').distinct(),5)

# Show some recommendations
print("Sample recommendations:")
user_recs.show(truncate = False)

Sample recommendations:
+------+----------------------------------------------------------------------------------------------------------------+
|UserID|recommendations                                                                                                 |
+------+----------------------------------------------------------------------------------------------------------------+
|259350|[{434028, 201.45451}, {1776516, 160.93117}, {1071294, 124.70993}, {466137, 111.27782}, {451899, 102.37344}]     |
|259434|[{1894494, 160.00542}, {358764, 137.56435}, {1768452, 123.26226}, {997878, 122.450905}, {487893, 119.37599}]    |
|259455|[{1071294, 75.6262}, {1782459, 75.436844}, {1783845, 73.60211}, {476553, 71.720436}, {316176, 65.726814}]       |
|259665|[{434028, 217.74918}, {1776516, 208.9491}, {1071294, 118.79203}, {357252, 106.77239}, {736995, 99.60228}]       |
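|259770|[{358764, 139.78383}, {1783446, 135.78299}, {458430, 128.63123}, {468279, 118.69232}, {1777125, 117.97449}]     |
+------+----------------------------------------------------------------------------------------------------------------+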
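Each recommendation score is the dot product of a user's factor vector with an item's factor vector, and the k highest-scoring items are returned. The NumPy sketch below shows this ranking on made-up factors. Spark's recommendForUserSubset scores every item, so items a user has already bought may appear among the recommendations; the optional exclude mask here is an extra step added for illustration and is not part of the Spark API.

```python
import numpy as np


def top_k(U, V, user_idx, k=5, exclude=None):
    """Return [(item_index, score), ...] for the k best items of one user.

    Scores are dot products of U[user_idx] with each row of V. `exclude` is an
    optional boolean mask over items (e.g. already purchased) to skip.
    """
    scores = V @ U[user_idx]
    if exclude is not None:
        scores = np.where(exclude, -np.inf, scores)
    order = np.argsort(-scores, kind="stable")[:k]
    return [(int(i), float(scores[i])) for i in order if np.isfinite(scores[i])]


U = np.array([[1.0, 0.5]])   # one user, rank 2
V = np.array([[0.5, 0.0],    # item 0
              [2.0, 0.0],    # item 1
              [1.0, 0.0]])   # item 2
print(top_k(U, V, 0, k=2))                                           # [(1, 2.0), (2, 1.0)]
print(top_k(U, V, 0, k=2, exclude=np.array([False, True, False])))  # [(2, 1.0), (0, 0.5)]
```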


As we can see, the RMSE of the model is considerably large. To improve the model, the number of iterations can be increased or the regularization factor can be tuned.

As the number of features in this dataset is small, feature extraction can be performed to increase the information available to the model. When the database is expanded further, more context can be included. For example, user profiles can be added to study users' preferences, TransactionTime can be used to identify seasonal products, or Country can serve as background information.

## Stopping the Session

In [34]:
spark.stop()