# kmeans
Using the Spark machine learning library (Spark MLlib), use k-means to cluster the
movies based on the ratings given by the users; that is, use the item-user matrix from
the itemusermat file provided as input to your program.

The itemusermat file contains the ratings given to each movie by the users in Matrix
format.

The file contains the ratings by users for 1000 movies.

Each line contains the movie's ID followed by the list of ratings given by the users.

A rating of 0 is used for entries where the user did not rate a movie.

Below, we show an example of the format of Itemusermat file with the item-user
matrix. 

Note that here, user 1 did not rate movie 2, so we use a rating of 0.

items|user1|user2|
------|----|-----|
movie1|4|3|
movie2|0|2|


In [None]:
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.ml.clustering import KMeans
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

# Cluster movies with k-means on their per-user rating vectors, then write up
# to five movies per cluster (with title and genre fields) to "kmeans_Output".

# Pass the configuration to getOrCreate so the master/app name set here are
# actually used when this cell is the one that creates the context.
conf = SparkConf().setMaster("local").setAppName("kmeans")
sc = SparkContext.getOrCreate(conf)
spark = SparkSession.builder.getOrCreate()

# Item-user matrix: the first field of each line is the movie id, the
# remaining fields are that movie's ratings. split() without an argument
# tolerates repeated/trailing whitespace; lines with no ratings (including
# blank lines) are skipped because they cannot form a feature vector.
# Ratings are parsed to floats here, on the executors, rather than relying on
# an implicit string -> float conversion inside Vectors.dense.
ratingRows = sc.textFile("itemusermat")\
    .map(lambda line: line.split())\
    .filter(lambda fields: len(fields) > 1)\
    .map(lambda fields: (fields[0],
                         Vectors.dense([float(v) for v in fields[1:]])))
# print(ratingRows.top(2))

# Create dataframe directly from the RDD (no collect() round-trip through the
# driver).
matrix = spark.createDataFrame(ratingRows, ["id", "features"])
# print(matrix.show())

# Train k-means
kmeans = KMeans(k=10, seed=1234)
model = kmeans.fit(matrix)

# Predict: (movie id as int, cluster index)
transformed = model.transform(matrix)
# print(transformed.show())
predictionIndex = transformed.rdd.map(lambda row: (int(row.id), row.prediction))
# print(predictionIndex.top(20))

# Join with movieInfo: movies.dat lines are "::"-separated; the first three
# fields are used as (movie id, (second field, third field)).
movieInfo = sc.textFile("movies.dat").map(lambda x: x.split("::"))\
                                    .map(lambda x: (int(x[0]), (x[1], x[2])))

# Re-key by cluster: (cluster, (cluster, movie id, second field, third field))
joined = movieInfo.join(predictionIndex)\
                    .map(lambda x: (x[1][1], (x[1][1], x[0], x[1][0][0], x[1][0][1])))

# Keep at most five movies per cluster and order the rows by cluster index.
# The collected list is small (<= 5 rows per cluster), so it is kept on the
# driver as `result` and written out from there.
result = joined.groupByKey()\
                .flatMap(lambda x: list(x[1])[0:5])\
                .sortBy(lambda x: x[0])\
                .collect()
sc.parallelize(result).saveAsTextFile("kmeans_Output")


# ALS model.
Use collaborative filtering to evaluate the accuracy of the ALS model.

For this program, you will use ratings.dat file, which contains:

User id :: movie id :: ratings :: timestamp.

In [None]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import numpy as np
import math
from pyspark.mllib.recommendation import ALS
# Train ALS recommenders at several ranks on a 70/30 split of ratings.dat and
# report the mean squared error of each on the held-out ratings.
spark = SparkSession.builder \
        .appName("recommend") \
        .getOrCreate()

raw = spark.sparkContext.textFile("ratings.dat")
# Lines are "uid::mid::rating::timestamp"; keep (uid, mid, rating) and parse
# them once here so every later stage works with numeric values.
data = raw.map(lambda line: line.split("::"))\
                .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2])))

tranData, testData = data.randomSplit([7, 3], seed=1011)
tranData.cache()

# (user, movie) pairs to ask the model about.
testDataRdd = testData.map(lambda x: (x[0], x[1]))
testDataRdd.cache()

# Held-out ratings keyed by (user, movie). This does not depend on the rank,
# so it is built once and cached instead of being rebuilt in every iteration.
actualRatings = testData.map(lambda r: ((r[0], r[1]), r[2]))
actualRatings.cache()

seed = 1234
iterations = 10
regularization = 0.1
ranks = [8, 10, 12, 15]
# One MSE per entry of `ranks`, in the same order. Appending keeps this list
# in step with `ranks` if the candidate ranks are changed.
errors = []

minErr = float('inf')
bestRank = -1

for rank in ranks:
    model = ALS.train(tranData, rank, seed=seed, iterations=iterations,
            lambda_=regularization)
    # Predicted ratings keyed by (user, movie).
    predictions = model.predictAll(testDataRdd).map(lambda r: ((r[0], r[1]), r[2]))
    # ((user, movie), (actual, predicted)) for pairs that received a prediction.
    ratesPreds = actualRatings.join(predictions)
    error = ratesPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
    errors.append(error)
    print('Rank %s MSE : %s'%(rank, error))
    if error < minErr:
        minErr = error
        bestRank = rank

print("The best rank : %s" % bestRank) 