# Modules

In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from time import time

# Data

## Data users

In [2]:
data_users= pd.read_csv("users.csv")

In [3]:
data_users.head()

Unnamed: 0,UserName,GameId,GameName,Status,UserRanking,Ranking
0,gandalfgrismx,148494,"1,2,3! Now you see me...",1,7.0,6.06084
1,gandalfgrismx,316377,7 Wonders (Second Edition),1,,7.95069
2,gandalfgrismx,155987,Abyss,1,10.0,7.32909
3,gandalfgrismx,21569,Adigma,1,7.0,7.52585
4,gandalfgrismx,31260,Agricola,1,10.0,7.90633


# Preprocesing

## Users

In [4]:
prepo_users= data_users.copy()

### Create id user

In [5]:
dc_userId={key:value for (value,key) in enumerate(list(prepo_users["UserName"].unique()))}

### Missings

In [6]:
ls_userRating=[]
ls_userId=[]
for index,row in prepo_users.iterrows():
    rating= row["UserRanking"]
    name= row["UserName"]
    if(pd.isna(rating)):
        ls_userRating.append(row["Ranking"])
    else:
        ls_userRating.append(rating)
    ls_userId.append(dc_userId[name])
prepo_users["UserRanking"]= ls_userRating
prepo_users["UserId"] = ls_userId

In [7]:
prepo_users.sample(5)

Unnamed: 0,UserName,GameId,GameName,Status,UserRanking,Ranking,UserId
1395,marianoth,148228,Splendor,1,8.0,7.43173,9
837,dvtlpzrc,193558,The Oracle of Delphi,1,8.0,7.29245,6
2068,Earth_Worm_Jim,121408,Trains,1,7.14211,7.14211,12
2909,Ramssesmex,242684,Reavers of Midgard,1,7.4172,7.4172,19
3732,KaeruGames,196340,Yokohama,1,9.0,7.82911,24


In [8]:
prepo_users["Status"].value_counts()/len(prepo_users)

1    0.959539
0    0.040461
Name: Status, dtype: float64

In [9]:
df_users= prepo_users[["UserId", "GameId", "UserRanking", "Status"]].copy()

# Recomendation system

## Pyspark setup

### Session

In [10]:
sc = pyspark.SparkContext(appName ="clase_datos_masivos")

22/11/11 08:21:28 WARN Utils: Your hostname, LAPTOP-7NHMLFJ5 resolves to a loopback address: 127.0.1.1; using 172.28.217.209 instead (on interface eth0)
22/11/11 08:21:28 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/11/11 08:21:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [11]:
# local[*] usará todos los núcleos disponibles 
spark = SparkSession.builder\
                    .master('local[*]') \
                    .appName('first_spark') \
                    .getOrCreate()

In [12]:
sc.defaultParallelism

8

### Get data

In [13]:
py_users= spark.createDataFrame(df_users)

  for column, series in pdf.iteritems():
  for column, series in pdf.iteritems():


In [14]:
py_users.show()

[Stage 0:>                                                          (0 + 1) / 1]

+------+------+-----------+------+
|UserId|GameId|UserRanking|Status|
+------+------+-----------+------+
|     0|148494|        7.0|     1|
|     0|316377|    7.95069|     1|
|     0|155987|       10.0|     1|
|     0| 21569|        7.0|     1|
|     0| 31260|       10.0|     1|
|     0|161970|    7.66002|     1|
|     0|124742|       10.0|     1|
|     0| 17329|        7.0|     1|
|     0| 25643|        9.0|     1|
|     0| 12005|        6.0|     1|
|     0|230802|        8.0|     1|
|     0|287954|        9.0|     1|
|     0|302388|        8.0|     1|
|     0| 27225|        7.0|     1|
|     0|  2453|        8.0|     1|
|     0|231696|        6.0|     1|
|     0|332386|        8.0|     1|
|     0|184921|        8.0|     1|
|     0|   822|        8.0|     1|
|     0|164127|        9.0|     1|
+------+------+-----------+------+
only showing top 20 rows



                                                                                

In [15]:
py_users= py_users.select(py_users.UserId.cast("integer"), py_users.GameId.cast("integer"), py_users.UserRanking.cast("double"), py_users.Status.cast("integer"))
py_users.printSchema()

root
 |-- UserId: integer (nullable = true)
 |-- GameId: integer (nullable = true)
 |-- UserRanking: double (nullable = true)
 |-- Status: integer (nullable = true)



In [16]:
py_users.createOrReplaceTempView("Users")

## Train test

In [17]:
# Una predicción que NO está
query = " select * from Users where Status =1"
py_own= spark.sql(query)

In [18]:
(train, test) = py_own.randomSplit([0.8, 0.2], seed = 202012)

## ALS

In [19]:
als= ALS(userCol="UserId", itemCol="GameId", ratingCol= "UserRanking", coldStartStrategy='drop')
model= als.fit(train)

                                                                                

22/11/11 08:21:59 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/11/11 08:21:59 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/11/11 08:21:59 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


                                                                                

In [20]:
predictions= model.transform(test)

In [21]:
predictions.show()

                                                                                

+------+------+-----------+------+----------+
|UserId|GameId|UserRanking|Status|prediction|
+------+------+-----------+------+----------+
|     1|128475|        6.0|     1| 4.2141476|
|     1|169786|       10.0|     1|  7.514125|
|     1|176458|        7.0|     1| 4.1195335|
|     1|193308|        8.0|     1|   5.28733|
|     1|223770|        8.0|     1| 6.5889993|
|     1|234190|        6.0|     1|   5.44687|
|     1|246912|        7.0|     1|  7.111369|
|     1|291457|        8.5|     1|  6.589264|
|     3|    13|      8.049|     1|  7.836399|
|     3|   188|      6.529|     1|  5.880723|
|     3|  7688|       4.19|     1| 5.9795923|
|     3| 13823|    6.71614|     1|  6.752316|
|     3| 14612|      1.701|     1| 1.5968853|
|     3| 19996|    6.64698|     1| 5.5200057|
|     3| 26033|    6.49244|     1|  7.249144|
|     3| 26033|      7.449|     1|  7.249144|
|     3|100901|      8.989|     1| 6.9153695|
|     3|129622|      8.119|     1|  6.601458|
|     3|130792|      9.129|     1|

In [22]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="UserRanking", predictionCol="prediction")

In [23]:
RMSE_1 = evaluator.evaluate(predictions)
print (RMSE_1)

1.689408479063479


### Search grid

In [24]:
als_grid = ALS(userCol="UserId", itemCol="GameId", ratingCol="UserRanking", nonnegative = False, implicitPrefs = False, coldStartStrategy="drop")

In [25]:
param_grid = ParamGridBuilder() \
            .addGrid(als_grid.rank, [10, 20, 50, 100]) \
            .addGrid(als_grid.maxIter, [10, 15, 20, 25]) \
            .addGrid(als_grid.regParam, [.01, .05, .1, 1, 10]) \
            .build()

In [26]:
evaluator_grid = RegressionEvaluator(metricName="rmse", labelCol="UserRanking", predictionCol="prediction")

In [27]:
cv = CrossValidator(estimator=als_grid, estimatorParamMaps=param_grid, evaluator=evaluator_grid, numFolds=5)

start_time = time()

model_grid = cv.fit(train)

end_time = time()
elapsed_time = end_time - start_time
print("Elapsed time: %.10f seconds." % elapsed_time)

### Best model

In [28]:
best_model = model_grid.bestModel

NameError: name 'model_grid' is not defined

In [None]:
print("  Rank:", best_model._java_obj.parent().getRank())
print("  MaxIter:", best_model._java_obj.parent().getMaxIter())
print("  RegParam:", best_model._java_obj.parent().getRegParam())

In [29]:
als= ALS(userCol="UserId", itemCol="GameId", ratingCol= "UserRanking",coldStartStrategy='drop', rank=20, maxIter=25, regParam=0.1)
best_model= als.fit(train)

In [30]:
test_best= best_model.transform(test)

In [31]:
test_best.show()

+------+------+-----------+------+----------+
|UserId|GameId|UserRanking|Status|prediction|
+------+------+-----------+------+----------+
|     1|128475|        6.0|     1|  6.103056|
|     1|169786|       10.0|     1|  8.753156|
|     1|176458|        7.0|     1| 4.9189773|
|     1|193308|        8.0|     1|  5.723008|
|     1|223770|        8.0|     1|  7.035811|
|     1|234190|        6.0|     1|  7.300802|
|     1|246912|        7.0|     1|  7.158746|
|     1|291457|        8.5|     1| 7.9315968|
|     3|    13|      8.049|     1|  7.859281|
|     3|   188|      6.529|     1| 6.7911835|
|     3|  7688|       4.19|     1| 5.9626737|
|     3| 13823|    6.71614|     1|  7.013989|
|     3| 14612|      1.701|     1| 1.5923666|
|     3| 19996|    6.64698|     1|  6.322235|
|     3| 26033|    6.49244|     1| 7.1928277|
|     3| 26033|      7.449|     1| 7.1928277|
|     3|100901|      8.989|     1|  7.130616|
|     3|129622|      8.119|     1|  7.228336|
|     3|130792|      9.129|     1|

### Validation data

In [32]:
query = " select * from Users where Status =0"
py_want= spark.sql(query)

In [33]:
validation= best_model.transform(py_want)

In [37]:
validation.show()

+------+------+-----------+------+----------+
|UserId|GameId|UserRanking|Status|prediction|
+------+------+-----------+------+----------+
|     3|223321|     7.7548|     0|  7.375931|
|     3|328479|    7.34765|     0| 5.4891925|
|    19|180263|    7.95149|     0|  8.139436|
|    19|172818|    7.38501|     0|  6.298144|
|    19| 90419|    7.29918|     0| 7.2850313|
|    19|137408|    7.45685|     0|  6.354576|
|    19|124742|    7.87582|     0| 7.6993775|
|    19|205637|    8.15033|     0|  7.965737|
|    19|   760|    7.41117|     0| 6.9036164|
|    19|123123|    7.85888|     0|  8.589381|
|    19| 89409|    7.68587|     0| 6.6604757|
|    19| 34219|    7.20377|     0|  6.469095|
|    19| 90137|    7.38292|     0| 6.1256604|
|    19|224517|    8.65432|     0|   8.64801|
|    19| 43111|    7.66854|     0|  6.534974|
|    19|   478|    7.07254|     0|  6.754976|
|    19|123499|    7.17567|     0| 6.1565228|
|    19|224037|     7.4921|     0|  4.306654|
|    19| 39463|    7.53478|     0|

In [38]:
RMSE = evaluator.evaluate(validation)

In [39]:
print(RMSE)

1.1806893531506077


### Users Factors

In [41]:
best_model.userFactors.collect()

[Row(id=0, features=[0.3536931574344635, -0.5811689496040344, -0.3402414917945862, 1.7530345916748047, 0.46533674001693726, -0.06366005539894104, -0.688411295413971, -0.1699032187461853, -0.19235379993915558, 0.30084583163261414, 0.40537309646606445, 0.015324448235332966, 0.26758354902267456, 0.8794505596160889, -0.22862465679645538, -0.05299420282244682, 0.2697645127773285, 0.28585100173950195, 0.13467884063720703, -0.6639338731765747]),
 Row(id=10, features=[0.12741589546203613, -0.21891352534294128, -0.07230743020772934, 1.2686548233032227, 0.44414904713630676, -0.05033480003476143, -0.5686956644058228, 0.4052482843399048, -0.3169593811035156, 0.5619913935661316, 0.764437198638916, 0.20037110149860382, 0.0578824020922184, 0.757807195186615, -0.4014401137828827, -0.15014894306659698, -0.05341615900397301, 0.47835028171539307, -0.0024034143425524235, -0.9035914540290833]),
 Row(id=20, features=[0.25406163930892944, -0.5000564455986023, -0.07294382154941559, 1.2242987155914307, 0.29401