<a href="https://colab.research.google.com/github/guipantiga/CRMAnalysis/blob/main/Collaborative_Filtering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Customer Relationship Management (CRM) - Collaborative Filtering using Spark

Collaborative filtering (CF) is a technique used by recommender systems. Collaborative filtering has two senses, a narrow one and a more general one.

In the newer, narrower sense, collaborative filtering is a method of making automatic predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption of the collaborative filtering approach is that if a person A has the same opinion as a person B on an issue, A is more likely to have B's opinion on a different issue than that of a randomly chosen person.

### About

*   Data source: https://www.kaggle.com/datasets/jihyeseo/online-retail-data-set-from-uci-ml-repo



In [1]:
%%capture
!pip install pyspark

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
import plotly.express as px

In [3]:
# Import
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.dataframe import DataFrame
from pyspark.ml.feature import QuantileDiscretizer, Bucketizer
from pyspark.sql import SparkSession
# Start (or reuse) a local Spark session registered under the app name "Colab"
spark = (
    SparkSession.builder
    .master("local")
    .appName("Colab")
    .getOrCreate()
)

# Switch this session to the LEGACY time parser policy; the date handling
# further down is written against that parser.
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

# Leave the session as the last expression so the notebook renders its summary
spark

In [4]:
# Load the raw Online Retail CSV (first row is the header, file is Latin-1 encoded)
rawDF = spark.read.csv(
    path='/content/data.csv',
    header=True,
    encoding="ISO-8859-1",
)

In [5]:
# Clean the raw Online Retail rows and aggregate them to one row per
# customer / country / invoice / product / transaction date.
# Stages, in order:
#   1. Filtering
#   2. Cleaning and formatting
#   3. Renaming columns
#   4. Creating columns
#   5. Aggregating

df = (rawDF
      # 1. Keep rows with a known customer, positive quantity and price, a date
      #    and a non-blank description.
      .filter(
          (F.col("CustomerID").isNotNull())
          & (F.col('Quantity') > 0)
          & (F.col('UnitPrice') > 0)
          & (F.col('InvoiceDate').isNotNull())
          & (F.col('Description').isNotNull())
          & (F.col('Description') != " ")
          # Stripping every non-letter must leave nothing, i.e. only stock codes
          # that contain no letters at all are kept.
          & (F.trim(F.regexp_replace(F.col('StockCode'), '[^a-zA-Z]+', '')) == "")
          )
      # 2. Normalise the description: runs of non-alphanumerics become one space,
      #    then trim and upper-case.
      .withColumn('Description', F.upper(F.trim(F.regexp_replace('Description', '[^a-zA-Z0-9]+', ' '))))
      # Take the first 10 characters of InvoiceDate, left-pad with '0' to width 10,
      # parse as MM/dd/yyyy and re-emit as a 'yyyy-MM-dd' string.
      # NOTE(review): single-digit months/days appear to rely on the lenient LEGACY
      # time parser configured for the session - confirm before changing that policy.
      .withColumn('transactionDate',
                  F.date_format(
                      F.to_date(
                          F.lpad(F.col('InvoiceDate').substr(1,10),10,'0'), 'MM/dd/yyyy')
                      , 'yyyy-MM-dd'
                      )
                  )
      # 3. Rename to the notebook's column vocabulary and cast to concrete types.
      .select(
          F.col('CustomerID').cast('bigint').alias('customerID'),
          F.col('Country').cast('string').alias('additionalSegmentation'),
          F.col('InvoiceNo').cast('string').alias('invoiceNo'),
          F.col('Description').cast('string').alias('productDescription'),
          F.col('transactionDate'),
          F.col('Quantity').cast('bigint').alias('productQuantity'),
          F.col('UnitPrice').cast('double').alias('productPrice'),
          )
      # 4. Line value = unit price * quantity, rounded to 2 decimals by the cast.
      .withColumn("monetaryValue",(F.col('productPrice')*F.col('productQuantity')).cast('decimal(19,2)'))
      # 5. Collapse duplicate lines of the same product on the same invoice.
      .groupBy('customerID','additionalSegmentation','invoiceNo','productDescription','transactionDate')
      .agg(
          F.sum('productQuantity').cast('bigint').alias('productQuantityValue'),
          F.sum('productPrice').cast('double').alias('productPriceValue'),
          F.sum('monetaryValue').cast('double').alias('monetaryValue'),
          )
      # Separate dates into year, month and day.
      # "yyyy" is the calendar year. The previous "Y" pattern is the week-based
      # year, which can differ from the calendar year around 1 January.
      .withColumn("transactionDateYear", F.date_format(F.col("transactionDate"), "yyyy"))
      .withColumn("transactionDateMonth", F.date_format(F.col("transactionDate"), "MM"))
      .withColumn("transactionDateDay", F.date_format(F.col("transactionDate"), "dd"))
      # Descriptions made only of punctuation end up empty after cleaning; drop them.
      .filter(
          (F.col('productDescription').isNotNull())
          & (~F.col('productDescription').isin(""," "))
          )
      ).repartition(20).sortWithinPartitions(['customerID','transactionDate'])

# Implicit value

Note that we don't have an explicit rating, such as the "like" button on Facebook, so I am going to create one based on the assumptions I made about the dataset. This choice is very important because it dramatically changes the outcome, but it also gives the business expert the flexibility to plug in whichever value is most important.

In [27]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import MinMaxScaler

# The average monetary value of each product is used as its ranking signal.
avgValuePerProduct = df.groupBy('productDescription').agg(
    F.avg('monetaryValue').alias('monetaryValue')
)

# MinMaxScaler operates on vector columns, so wrap the single numeric column first.
assembler = VectorAssembler(inputCols=["monetaryValue"], outputCol="monetaryValueAssembler")
assembledProducts = assembler.transform(avgValuePerProduct)

# Fit the scaler (this computes the summary statistics and yields a MinMaxScalerModel),
# then rescale the feature into the scaler's [min, max] range.
scaler = MinMaxScaler(inputCol="monetaryValueAssembler", outputCol="rankedScaled")
scalerModel = scaler.fit(assembledProducts)
scaledProducts = scalerModel.transform(assembledProducts)

# Give every product a sequential integer ID, numbered in productDescription order.
# The IDs depend on the current product list: if that list changes, everything
# from this point on has to be re-run.
productOrder = Window.orderBy("productDescription")
dfDescription = scaledProducts.withColumn('productID', F.row_number().over(productOrder))

In [29]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import lit, udf
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

def ith_(v, i):
    """Return element ``i`` of the indexable ``v`` as a Python float.

    Args:
        v: An indexable container (e.g. a vector or list); may be None.
        i: Position of the element to extract.

    Returns:
        ``float(v[i])``, or None when ``v`` is None, ``i`` is out of range,
        or the element cannot be converted to a float.
    """
    if v is None:
        return None
    try:
        return float(v[i])
    except (ValueError, TypeError, IndexError):
        # Best-effort extraction: a bad element yields a null instead of an
        # exception (which, inside a UDF, would abort the whole job).
        return None

# Wrap the element extractor as a Spark UDF that returns a double
ith = udf(ith_, DoubleType())

# One seed for the train/test split and for ALS, so that the reported RMSE and the
# recommendations do not change from run to run.
SEED = 42

# Attach productID and the scaled ranking (first element of the rankedScaled vector)
# to every transaction row, and add an int userID column for ALS.
dfCollab = (df
            .join(
                dfDescription.select('productDescription','productID',ith("rankedScaled", lit(0)).cast('decimal(19,5)').alias('rankID'))
                , on = ['productDescription']
                , how = 'inner'
                )
            .withColumn('userID', F.col('customerID').cast('int'))
            )

(training, test) = dfCollab.randomSplit([0.8, 0.2], seed=SEED)

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="productID", ratingCol="rankID", implicitPrefs=True,
          coldStartStrategy="drop", seed=SEED)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
# NOTE(review): with implicitPrefs=True the predictions are preference scores rather
# than reconstructed rankID values, so this RMSE is probably not a meaningful quality
# measure - consider a ranking metric instead.
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rankID",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 3 product recommendations for each user
userRecs = model.recommendForAllUsers(3)
# Generate top 3 user recommendations for each product
productRecs = model.recommendForAllItems(3)

# Generate top 10 product recommendations for a specified set of users
users = dfCollab.select(als.getUserCol()).distinct().limit(10)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of product
products = dfCollab.select(als.getItemCol()).distinct().limit(10)
productsSubSetRecs = model.recommendForItemSubset(products, 10)

Root-mean-square error = 2.0634193690818936


In [30]:
# Inspect the per-user recommendations; a bare DataFrame expression shows its
# column names and types, not its rows.
userRecs

DataFrame[userID: int, recommendations: array<struct<productID:int,rating:float>>]

In [45]:
# Lets get the first item that we recommend for each customer
transformedDf = (userRecs
                 .withColumn("recommendedStruct", F.col('recommendations').getItem(0))
                 .select('userID',
                         'recommendedStruct',
                         F.col("recommendedStruct.productID").alias("productID"),
                         )
                 .join(
                    dfDescription
                    , on = ['productID']
                    , how = 'left'
                 )
                 .select(F.col('userID').alias('customerID'),F.col('productDescription').alias('recommendedProduct'))
                 )

In [47]:
# Print the first 10 recommendations without truncating long product names
transformedDf.show(n=10, truncate=False)

+------+------------------------------+
|userID|recommendedProduct            |
+------+------------------------------+
|12346 |CHILLI LIGHTS                 |
|12347 |REGENCY CAKESTAND 3 TIER      |
|12348 |PAPER CHAIN KIT 50 S CHRISTMAS|
|12349 |REGENCY CAKESTAND 3 TIER      |
|12350 |HAND OVER THE CHOCOLATE SIGN  |
|12352 |REGENCY CAKESTAND 3 TIER      |
|12353 |GIN TONIC DIET METAL SIGN     |
|12354 |LUNCH BAG RED RETROSPOT       |
|12355 |REGENCY CAKESTAND 3 TIER      |
|12356 |REGENCY CAKESTAND 3 TIER      |
+------+------------------------------+
only showing top 10 rows

