In [8]:
import os
import numpy as np
from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns
get_ipython().magic('matplotlib inline')

from pyspark.mllib.recommendation import ALS
from pyspark.ml.recommendation import ALS as mlals
from pyspark.ml.evaluation import RegressionEvaluator

import math
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.evaluation import RegressionEvaluator

In [9]:
spark = SparkSession \
    .builder \
    .appName("Recom") \
    .config("spark.recom.demo", "1") \
    .getOrCreate()

In [10]:
ratings_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("train.csv")

In [11]:
ratings_df

DataFrame[10: int, 11: int, 5: int]

In [12]:
ratings_df.show(5)

+---+----+---+
| 10|  11|  5|
+---+----+---+
|  1|  13|  4|
|  1|  30|  5|
|  1|6011|  2|
|  1|6017|  4|
|  1|6027|  2|
+---+----+---+
only showing top 5 rows



In [13]:
test_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("test1.csv")

In [14]:
test_df

DataFrame[1: int, 6081: int]

In [15]:
test_df.show(5)

+---+----+
|  1|6081|
+---+----+
|  1|6247|
|  2|   1|
|  2|  23|
|  2|  33|
|  2|6365|
+---+----+
only showing top 5 rows



In [16]:
seed = 5 #Random seed for initial matrix factorization model. A value of None will use system time as the seed.
iterations = 10
regularization_parameter = 0.1 #run for different lambdas - e.g. 0.01
ranks = [4, 8, 12] #number of features
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

In [17]:
# Let us traing our dataset and check the best rank with lowest RMSE
# predictAll method of the ALS takes only RDD format and hence we need to convert our dataframe into RDD
# df.rdd will automatically converts Dataframe into RDD

for rank in ranks:
    model = ALS.train(ratings_df, rank, seed=seed, iterations=iterations,
                      lambda_=regularization_parameter)
    predictions = model.predictAll(test_df.rdd).map(lambda r: ((r[0], r[1]), r[2]))

In [18]:
predictions.take(10)

[((1911, 13244), 2.686858193761662),
 ((2843, 15120), 0.5319647587397218),
 ((1863, 9384), 2.8524570219453107),
 ((2736, 2334), 2.3466822447918014),
 ((1154, 2334), 2.060899496350277),
 ((1333, 2334), 1.7013525563923948),
 ((8552, 10502), 3.1966633329495346),
 ((5119, 10502), 4.746823441263004),
 ((860, 14554), 3.2445437481146935),
 ((2124, 9336), 3.3383123111104194)]

In [19]:
type(predictions)

pyspark.rdd.PipelinedRDD

In [20]:
def toCSVLine(data):
  return ','.join(str(d) for d in data)

In [21]:
lines = predictions.map(toCSVLine)

In [23]:
lines.saveAsTextFile('/Users/arshiya/Documents/CMPE256_format1.csv')