In [243]:
import pyspark
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [244]:
sc.stop()

In [245]:
sc = SparkContext("local","first app")

In [246]:
sqlContext = SQLContext(sc)

In [247]:
from pyspark.sql.types import *

In [248]:
#structure of the dataframe is defined
schema = StructType([StructField("userId", IntegerType(), True),StructField("movieId", IntegerType(), True),StructField("rating", FloatType(), True),StructField("timestamp", StringType(), True)])

In [249]:
df = sqlContext.read.csv("/Users/feli-v/Downloads/ml-25m/ratings.csv",header='true',schema=schema)

In [250]:
df.count()

25000095

In [251]:
df.dtypes

[('userId', 'int'),
 ('movieId', 'int'),
 ('rating', 'float'),
 ('timestamp', 'string')]

In [252]:
train,test = df.randomSplit([0.9,0.1], seed=12345)

In [253]:
train.count()

22500609

In [254]:
test.count()

2499486

In [255]:
#save the training data into training.csv 
train.write.csv('training_data', header=True)

In [256]:
import glob
import pandas as pd
path="/Users/feli-v/ipython-in-depth/training_data"
filenames = glob.glob(path + "/*.csv")
dftr = []
for filename in filenames:
    dftr.append(pd.read_csv(filename))
training_frame = pd.concat(dftr, ignore_index=True)

In [257]:
training_frame.to_csv('training.csv',index=False,header='true')

In [258]:
test.write.csv('testing_data', header=True)

In [259]:
import glob
import pandas as pd
path ="/Users/feli-v/ipython-in-depth/testing_data"
filenames = glob.glob(path + "/*.csv")
dfte = []
for filename in filenames:
    dfte.append(pd.read_csv(filename))
testing_frame = pd.concat(dfte, ignore_index=True)

In [260]:
testing_frame.to_csv('testing.csv',index=False,header='true')

In [261]:
training_frame.head(5)

Unnamed: 0,userId,movieId,rating,timestamp
0,97673,166528,4.0,1494434933
1,97673,167746,3.5,1514760811
2,97673,168248,4.0,1514760796
3,97673,168250,4.5,1500224509
4,97673,168252,4.0,1498927028


In [262]:
testing_frame.head(5)

Unnamed: 0,userId,movieId,rating,timestamp
0,1,4144,5.0,1147868898
1,1,4308,3.0,1147868534
2,1,5952,4.0,1147868053
3,1,7327,3.5,1147868855
4,1,7820,2.5,1147878050


In [263]:
train_rating=train.rdd.map(lambda w:w[2])

In [264]:

train_rating.take(5)

[5.0, 3.5, 5.0, 5.0, 3.5]

In [265]:
count_rating = train_rating.count()

In [266]:
sum_rating = train_rating.sum()

In [267]:
avg_rating = sum_rating/count_rating

In [268]:
print(avg_rating)

3.5339388591659895


In [269]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [270]:
#implementing ALS model on training dataset
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(train)

In [271]:
test_data = test['userId','movieId']

In [272]:
test_data.head(2)

[Row(userId=1, movieId=4144), Row(userId=1, movieId=4308)]

In [273]:
# predict thbe rating
predictions = model.transform(test)

In [274]:
predictions

DataFrame[userId: int, movieId: int, rating: float, timestamp: string, prediction: float]

In [275]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [276]:
rmse = evaluator.evaluate(predictions)

In [277]:
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8072459003251546
