# ALS Recommendation
### Data import

In [1]:
# tell jupyter where pyspark is
import findspark
findspark.init()

In [17]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.regression import LinearRegression
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.ml.feature import IndexToString, StringIndexer
import pandas as pd


In [3]:
df_r = pd.read_csv('./rating.csv',index_col=None)

In [4]:
df_ = pd.read_csv('./rating_recommendation.csv',index_col=False)

In [5]:
df_.head()

Unnamed: 0,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOBBMDR12A8C13253B,2
0,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOBXHDL12A81C204C0,1
1,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOCNMUH12A6D4F6E6D,1
2,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SODXRTY12AB0180F3B,1
3,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOHQWYZ12A6D4FA701,1
4,b80344d063b5ccb3212f76538f3d9e43d87dca9e,SOIYTOA12A6D4F9A23,1


### Feature selection

In [6]:
SparkConf().setMaster("master").setAppName("ALSExample")
spark = SparkSession.builder.config(conf=SparkConf())

In [7]:
# Load data as RDD, then transform it to DataFrame format
lines = spark.read.text("./rating_recommendation.csv").rdd
parts = lines.map(lambda row: row.value.split(","))
ratingsRDD = parts.map(lambda p: Row(userId=str(p[0]), songID=str(p[1]),
                                     play_count=int(p[2])))

In [8]:
rating = spark.createDataFrame(ratingsRDD)

In [9]:
indexer = StringIndexer(inputCol="userId",outputCol = "userId_digtal")
indexed = indexer.fit(rating).transform(rating)
indexed.show(5)

+----------+------------------+--------------------+-------------+
|play_count|            songID|              userId|userId_digtal|
+----------+------------------+--------------------+-------------+
|         2|SOBBMDR12A8C13253B|b80344d063b5ccb32...|     197276.0|
|         1|SOBXHDL12A81C204C0|b80344d063b5ccb32...|     197276.0|
|         1|SOCNMUH12A6D4F6E6D|b80344d063b5ccb32...|     197276.0|
|         1|SODXRTY12AB0180F3B|b80344d063b5ccb32...|     197276.0|
|         1|SOHQWYZ12A6D4FA701|b80344d063b5ccb32...|     197276.0|
+----------+------------------+--------------------+-------------+
only showing top 5 rows



In [10]:
indexed.show(3)

+----------+------------------+--------------------+-------------+
|play_count|            songID|              userId|userId_digtal|
+----------+------------------+--------------------+-------------+
|         2|SOBBMDR12A8C13253B|b80344d063b5ccb32...|     197276.0|
|         1|SOBXHDL12A81C204C0|b80344d063b5ccb32...|     197276.0|
|         1|SOCNMUH12A6D4F6E6D|b80344d063b5ccb32...|     197276.0|
+----------+------------------+--------------------+-------------+
only showing top 3 rows



In [11]:
feature = StringIndexer(inputCol="songID",outputCol="songId_digtal")
indexed2 = feature.fit(rating).transform(indexed)
indexed2.show(5)

+----------+------------------+--------------------+-------------+-------------+
|play_count|            songID|              userId|userId_digtal|songId_digtal|
+----------+------------------+--------------------+-------------+-------------+
|         2|SOBBMDR12A8C13253B|b80344d063b5ccb32...|     197276.0|       1175.0|
|         1|SOBXHDL12A81C204C0|b80344d063b5ccb32...|     197276.0|         70.0|
|         1|SOCNMUH12A6D4F6E6D|b80344d063b5ccb32...|     197276.0|       5007.0|
|         1|SODXRTY12AB0180F3B|b80344d063b5ccb32...|     197276.0|        165.0|
|         1|SOHQWYZ12A6D4FA701|b80344d063b5ccb32...|     197276.0|       2603.0|
+----------+------------------+--------------------+-------------+-------------+
only showing top 5 rows



In [12]:
(training, test) = indexed2.randomSplit([0.8, 0.2])

In [13]:
# Build the recommendation model using ALS on the training data

als = ALS(maxIter=10, regParam=0.3, userCol="userId_digtal", itemCol="songId_digtal", ratingCol="play_count",
          coldStartStrategy="drop")
model = als.fit(training)

In [14]:
# Make predictions and output a RMSE to check the precision
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="play_count",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 12.952059997398477


In [15]:
userRecs = model.recommendForAllUsers(10)
userRecs.show(5)

+-------------+--------------------+
|userId_digtal|     recommendations|
+-------------+--------------------+
|          148|[[9825, 625.0377]...|
|          463|[[9835, 68.95866]...|
|          471|[[9835, 319.26242...|
|          496|[[9835, 253.67091...|
|          833|[[9835, 218.59013...|
+-------------+--------------------+
only showing top 5 rows



In [16]:
# Recommend 5 users for top 6 songs
users = indexed2.select(als.getUserCol()).distinct().limit(6)
userSubsetRecs = model.recommendForUserSubset(users, 6)

## Deal with Output

In [18]:
matrix = [[row[0]]+row[1] for row in song_]
result = []
for row in matrix:
    tmp = [row[0]]
    for j in row[1:]:
        tmp.append(j[0])
    result.append(tmp)

In [19]:
sc =SparkContext(conf=spark)
row = Row('user_ID','song_ID1','song_ID2','song_ID3','song_ID4','song_ID5','song_ID6')
df_output = SparkContext.parallelize([row(result[0][i],result[1][i],result[2][i],result[3][i], \
                                           result[4][i],result[5][i]) for i in range(5)]).toDF() 

In [20]:
## get their original id
converter = IndexToString(inputCol="user_ID", outputCol="user_id")
converted = converter.transform(df_test)
for x in range(6):
    converter = IndexToString(inputCol="song_ID"+str(x+1), outputCol="song_id"+str(x+1))
    converted = converter.transform(df_test)   

In [21]:
final_result=converted[['user_id']+['song_id'+str(i) for i in range(6)]]
final_result.to_csv('./CF_Recommendation.csv',index=False)

In [25]:
present = pd.read_csv('./CF_Recommendation.csv',index_col=False)
present.head()

Unnamed: 0,user_id,song_id1,song_id2,song_id3,song_id4,song_id5,song_id6
0,d6589314c0a9bcbca4fee0c93b14bc402363afea,SOFCQWJ12AB0185041,SOWCKVR12A8C142411,SOKLRPJ12A8C13C3FE,SOIIUWE12A8C132FCB,SOFNZIF12AC3DF5590,SOIOZHO12AB017FE5E
1,3f152d355d53865a2ca27ac5ceeffb7ebaea0a26,SOVDSJC12A58A7A271,SOAXGDH12A8C13F8A1,SOAUWYT12A81C206F1,SOXWYZP12AF72A42A6,SOIJAMG12A8AE47E21,SOPAYPV12AB017DB0C
2,b61afb42335287239bd40e1dea50d849cbf8a9a9,SOIOZHO12AB017FE5E,SOOXXCA12A8C139214,SOBOAFP12A8C131F36,SOAVJKT12AB0185C6D,SOPUCYA12A8C13A694,SOKLRPJ12A8C13C3FE
3,884209a41deb55df792f074bccf8af1c1c31768b,SOMGIYR12AB0187973,SOIJAMG12A8AE47E21,SOWKEUN12AF72AB837,SOHFJAQ12AB017E4AF,SOXWYZP12AF72A42A6,SOIOZHO12AB017FE5E
4,9b887e10a4711486085c4fae2d2599fc0d2c484d,SOKLRPJ12A8C13C3FE,SOMRYYN12A6310F0F3,SOTLVSV12A8C136E8D,SOLLNTU12A6701CFDC,SOBOAFP12A8C131F36,SOHFJAQ12AB017E4AF
