# PySpark para Filtragem Colaborativa com dados Implicitos
## ALS

In [0]:
import pandas as pd
from pyspark.sql.functions import col, explode
from pyspark import SparkContext

In [0]:
from pyspark.sql import SparkSession
sc = SparkContext
spark = SparkSession.builder.appName('Recommendations').getOrCreate()

In [0]:
compras = spark\
        .read.format("csv")\
        .option("inferSchema", "True")\
        .option("header", "True")\
        .csv("/FileStore/tables/recomendacao/retail.csv")

In [0]:
compras.show(10)

+---+----------+---------+--------------------+--------+
|_c0|CustomerID|StockCode|         Description|Quantity|
+---+----------+---------+--------------------+--------+
|  0|   17850.0|   85123A|WHITE HANGING HEA...|       6|
|  1|   17850.0|    71053| WHITE METAL LANTERN|       6|
|  2|   17850.0|   84406B|CREAM CUPID HEART...|       8|
|  3|   17850.0|   84029G|KNITTED UNION FLA...|       6|
|  4|   17850.0|   84029E|RED WOOLLY HOTTIE...|       6|
|  5|   17850.0|    22752|SET 7 BABUSHKA NE...|       2|
|  6|   17850.0|    21730|GLASS STAR FROSTE...|       6|
|  7|   17850.0|    22633|HAND WARMER UNION...|       6|
|  8|   17850.0|    22632|HAND WARMER RED P...|       6|
|  9|   13047.0|    84879|ASSORTED COLOUR B...|      32|
+---+----------+---------+--------------------+--------+
only showing top 10 rows



In [0]:
compras.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)



In [0]:
compras = compras.\
    withColumn('CustomerID', col('CustomerID').cast('integer'))

In [0]:
# Conta o número total de compras no dataset
numerador = compras.select("Quantity").count()

# Conta o número de usuários e filmes distintos 
num_users = compras.select("CustomerID").distinct().count()
num_movies = compras.select("StockCode").distinct().count()

# O denominador vai ser o produto do número de filmes pelo número de usuários únicos
denominador = num_users * num_movies

# Divide o numerador pelo denominador
sparsity = (1.0 - (numerador *1.0)/denominador)*100
print("Esse dataframe é ", "%.2f" % sparsity + "% empty.")

Esse dataframe é  96.96% empty.


In [0]:
item_code = compras.select('StockCode').distinct().coalesce(1)

In [0]:
from pyspark.sql.functions import col, monotonically_increasing_id

In [0]:
item_code = item_code.withColumn('item_id', monotonically_increasing_id()).persist()

In [0]:
item_code.show(5)

+---------+-------+
|StockCode|item_id|
+---------+-------+
|    22728|      0|
|    21889|      1|
|   90210B|      2|
|    21259|      3|
|    21894|      4|
+---------+-------+
only showing top 5 rows



In [0]:
data = compras.join(item_code, how='inner', on='StockCode')

In [0]:
data.show(10)

+---------+---+----------+--------------------+--------+-------+
|StockCode|_c0|CustomerID|         Description|Quantity|item_id|
+---------+---+----------+--------------------+--------+-------+
|   85123A|  0|     17850|WHITE HANGING HEA...|       6|   3269|
|    71053|  1|     17850| WHITE METAL LANTERN|       6|    193|
|   84406B|  2|     17850|CREAM CUPID HEART...|       8|   2819|
|   84029G|  3|     17850|KNITTED UNION FLA...|       6|    280|
|   84029E|  4|     17850|RED WOOLLY HOTTIE...|       6|   1968|
|    22752|  5|     17850|SET 7 BABUSHKA NE...|       2|   2123|
|    21730|  6|     17850|GLASS STAR FROSTE...|       6|   3816|
|    22633|  7|     17850|HAND WARMER UNION...|       6|    145|
|    22632|  8|     17850|HAND WARMER RED P...|       6|    991|
|    84879|  9|     13047|ASSORTED COLOUR B...|      32|   1100|
+---------+---+----------+--------------------+--------+-------+
only showing top 10 rows



In [0]:
data = data.na.drop()

In [0]:
data.count()

Out[31]: 406829

In [0]:
users = data.select("CustomerID").distinct()

In [0]:
itens = data.select("item_id").distinct()

In [0]:
cross_join = users.crossJoin(itens).join(data, ["CustomerID", "item_id"], "left").fillna(0)

In [0]:
cross_join.show(10)

+----------+-------+---------+----+-----------+--------+
|CustomerID|item_id|StockCode| _c0|Description|Quantity|
+----------+-------+---------+----+-----------+--------+
|     15100|    193|     null|null|       null|    null|
|     15100|    991|     null|null|       null|    null|
|     15100|   3816|     null|null|       null|    null|
|     15100|   1100|     null|null|       null|    null|
|     15100|   2123|     null|null|       null|    null|
|     15100|   2819|     null|null|       null|    null|
|     15100|    145|     null|null|       null|    null|
|     15100|   1968|     null|null|       null|    null|
|     15100|    280|     null|null|       null|    null|
|     15100|   2998|     null|null|       null|    null|
+----------+-------+---------+----+-----------+--------+
only showing top 10 rows



#Construindo ALS

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

In [0]:
# Create test and train set
(train, test) = data.randomSplit([0.8, 0.2], seed = 1234)

# Create ALS model
als = ALS(userCol="CustomerID", itemCol="item_id", ratingCol="Quantity", rank = 10, maxIter = 5, regParam = .05, alpha = 20, nonnegative = True, implicitPrefs = True, coldStartStrategy="drop")

# Confirm that a model called "als" was created
type(als)

Out[15]: pyspark.ml.recommendation.ALS

In [0]:
#Fit modelo ALS
model = als.fit(train)

In [0]:
#Gerando previsões no conjunto de teste
predictions = model.transform(test)

In [0]:
predictions.show(10)

+---------+-----+----------+--------------------+--------+-------+-----------+
|StockCode|  _c0|CustomerID|         Description|Quantity|item_id| prediction|
+---------+-----+----------+--------------------+--------+-------+-----------+
|    20685|32141|     15727|DOORMAT RED RETRO...|       1|    458|  1.0362738|
|    20717|32144|     15727|STRAWBERRY SHOPPE...|      10|   3229|  0.9386058|
|    20726|30305|     16503|  LUNCH BAG WOODLAND|      10|   2371|  0.9527782|
|    21054|56153|     16503|NURSE'S BAG SOFT TOY|       2|   1911| 0.39572912|
|    21056|56145|     16503|DOCTOR'S BAG SOFT...|       2|   3103|  0.5546424|
|    21198|55949|     15447|WHITE HEART CONFE...|      12|   3177|0.033664763|
|    21243|30315|     16503|PINK  POLKADOT PL...|       8|   3515| 0.93237853|
|    21244|30316|     16503|BLUE POLKADOT PLATE |       8|   2974| 0.87706995|
|    21558|30307|     16503|SKULL LUNCH BOX W...|       6|   2007| 0.86224145|
|    21715|56164|     16503|GIRLS VINTAGE TIN...|   

In [0]:
def ROEM(predictions, userCol = "cpf", itemCol = "cod_item", ratingCol = "qtd_efetiva"):
    #Creates table that can be queried
    predictions.createOrReplaceTempView("predictions")

    #Sum of total number of plays of all songs
    denominator = predictions.groupBy().sum(ratingCol).collect()[0][0]

    #Calculating rankings of songs predictions by user
    spark.sql("SELECT " + userCol + " , " + ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + userCol + " ORDER BY prediction DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")

    #Multiplies the rank of each song by the number of plays and adds the products together
    numerator = spark.sql('SELECT SUM(' + ratingCol + ' * rank) FROM rankings').collect()[0][0]

    performance = numerator/denominator

    return performance

In [0]:
ROEM(predictions, userCol = "CustomerID", itemCol = "item_id", ratingCol = "Quantity")

Out[20]: 0.44660756991058115

In [0]:
def ROEM_cv(ratings_df, userCol = "cpf", itemCol = "cod_item", ratingCol = "qtd_efetiva", ranks = [10, 50, 100, 150, 200], maxIters = [10, 25, 50, 100, 200, 400], regParams = [.05, .1, .15], alphas = [10, 40, 80, 100]):

  from pyspark.sql.functions import rand
  from pyspark.ml.recommendation import ALS

  ratings_df = ratings_df.orderBy(rand()) #Shuffling to ensure randomness

  #Building train and validation test sets
  train, test = ratings_df.randomSplit([0.8, 0.2], seed = 20)

  #Building 5 folds within the training set.
  test1, test2, test3, test4, test5 = train.randomSplit([0.2, 0.2, 0.2, 0.2, 0.2], seed = 1)
  train1 = test2.union(test3).union(test4).union(test5)
  train2 = test3.union(test4).union(test5).union(test1)
  train3 = test4.union(test5).union(test1).union(test2)
  train4 = test5.union(test1).union(test2).union(test3)
  train5 = test1.union(test2).union(test3).union(test4)
  

  #Creating variables that will be replaced by the best model's hyperparameters for subsequent printing
  best_validation_performance = 9999999999999
  best_rank = 0
  best_maxIter = 0
  best_regParam = 0
  best_alpha = 0
  best_model = 0
  best_predictions = 0

  #Looping through each combindation of hyperparameters to ensure all combinations are tested.
  for r in ranks:
    for mi in maxIters:
      for rp in regParams:
        for a in alphas:
          #Create ALS model
          als = ALS(rank = r, maxIter = mi, regParam = rp, alpha = a, userCol=userCol, itemCol=itemCol, ratingCol=ratingCol,
                    coldStartStrategy="drop", nonnegative = True, implicitPrefs = True)

          #Fit model to each fold in the training set
          model = als.fit(train)
          model1 = als.fit(train1)
          model2 = als.fit(train2)
          model3 = als.fit(train3)
          model4 = als.fit(train4)
          model5 = als.fit(train5)

          #Generating model's predictions for each fold in the test set
          predictions1 = model1.transform(test1)
          predictions2 = model2.transform(test2)
          predictions3 = model3.transform(test3)
          predictions4 = model4.transform(test4)
          predictions5 = model5.transform(test5)

          #Expected percentile rank error metric function
          def ROEM(predictions, userCol = "cpf", itemCol = "cod_item", ratingCol = "qtd_efetiva"):
              #Creates table that can be queried
              predictions.createOrReplaceTempView("predictions")

              #Sum of total number of plays of all songs
              denominator = predictions.groupBy().sum(ratingCol).collect()[0][0]

              #Calculating rankings of songs predictions by user
              spark.sql("SELECT " + userCol + " , " + ratingCol + " , PERCENT_RANK() OVER (PARTITION BY " + userCol + " ORDER BY prediction DESC) AS rank FROM predictions").createOrReplaceTempView("rankings")

              #Multiplies the rank of each song by the number of plays and adds the products together
              numerator = spark.sql('SELECT SUM(' + ratingCol + ' * rank) FROM rankings').collect()[0][0]

              performance = numerator/denominator

              return performance

          #Calculating expected percentile rank error metric for the model on each fold's prediction set
          performance1 = ROEM(predictions1)
          performance2 = ROEM(predictions2)
          performance3 = ROEM(predictions3)
          performance4 = ROEM(predictions4)
          performance5 = ROEM(predictions5)

          #Printing the model's performance on each fold
          print ("Model Parameters: ", "Rank:", r, "  MaxIter:", mi,"RegParam:", rp,"Alpha: ", a)
          print("Test Percent Rank Errors: ", performance)
           #performance, performance2, performance3, performance4, performance5

          #Validating the model's performance on the validation set
          validation_model = als.fit(train)
          validation_predictions = validation_model.transform(test)
          validation_performance = ROEM(validation_predictions)

          #Printing model's final expected percentile ranking error metric
          print("Validation Percent Rank Error: ", validation_performance)
          print(" ")

          #Filling in final hyperparameters with those of the best-performing model
          if validation_performance < best_validation_performance:
            best_validation_performance = validation_performance
            best_rank = r
            best_maxIter = mi
            best_regParam = rp
            best_alpha = a
            best_model = validation_model
            best_predictions = validation_predictions

  #Printing best model's expected percentile rank and hyperparameters
  print ("**Best Model** ")
  print ("  Percent Rank Error: ", best_validation_performance)
  print ("  Rank: ", best_rank)
  print ("  MaxIter: ", best_maxIter)
  print ("  RegParam: ", best_regParam)
  print ("  Alpha: ", best_alpha)
  return best_model, best_predictions