In [39]:
# Imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col
from pyspark.sql.types import IntegerType

# Create spark session
spark = SparkSession.builder.getOrCreate()

# Read in database
df = spark.read.text('trust.txt')
df1 = spark.read.text('ratings.txt')

# Name columns
df = df.withColumn('Truster', split(df['value'], ' ').getItem(0)) \
       .withColumn('Trustee', split(df['value'], ' ').getItem(1)) \
       .withColumn('Trust Value', split(df['value'], ' ').getItem(2))

df1 = df1.withColumn('Truster 1', split(df1['value'], ' ').getItem(0)) \
        .withColumn('Movie', split(df1['value'], ' ').getItem(1)) \
        .withColumn('Rating', split(df1['value'], ' ').getItem(2))

# Drop original column
df = df.drop(col('value'))
df1 = df1.drop(col('value'))

# Join dataframes
FilmTrust = df.join(df1,df['Truster'] == df1['Truster 1'])

# Drop duplicate column
FilmTrust = FilmTrust.drop(col('Truster 1'))

# Makes columns integer
FilmTrust = FilmTrust.withColumn("Truster", FilmTrust["Truster"].cast(IntegerType()))
FilmTrust = FilmTrust.withColumn("Movie", FilmTrust["Movie"].cast(IntegerType()))
FilmTrust = FilmTrust.withColumn("Rating", FilmTrust["Rating"].cast(IntegerType()))

# Drop null
FilmTrust = FilmTrust.na.drop() 

# Drop duplicates
FilmTrust = FilmTrust.dropDuplicates()

# Drop Outliers
FilmTrust = FilmTrust.filter('Truster<1509')

FilmTrust.show()

+-------+-------+-----------+-----+------+
|Truster|Trustee|Trust Value|Movie|Rating|
+-------+-------+-----------+-----+------+
|      6|   1192|          1|    7|     3|
|     29|   1106|          1|  265|     3|
|     29|    403|          1|  277|     0|
|     29|   1350|          1|  290|     3|
|     29|    509|          1|  295|     3|
|     29|   1350|          1|  298|     3|
|     36|   1229|          1|  219|     3|
|     63|   1524|          1|   11|     4|
|     79|    355|          1|  386|     2|
|     79|   1072|          1|  393|     4|
|     79|    782|          1|  212|     0|
|     79|   1153|          1|  450|     1|
|     79|    188|          1|  504|     0|
|     83|    208|          1|  255|     2|
|    110|   1491|          1|  205|     3|
|    116|    738|          1|   12|     3|
|    129|     89|          1|  213|     3|
|    141|    965|          1|   10|     4|
|    146|   1355|          1|    7|     4|
|    146|     23|          1|  250|     4|
+-------+--

In [64]:
from pyspark.sql.functions import asc, desc, count, when, isnan

# Summary of rating values
FilmTrust.select('Rating').summary().show()

# Top rated movies
FilmTrust.select(FilmTrust.Movie,FilmTrust.Rating).sort(FilmTrust.Rating.desc()).show()

# Lower rated movies
FilmTrust.select(FilmTrust.Movie,FilmTrust.Rating).sort(FilmTrust.Rating.asc()).show()

# Number of ratings per user
FilmTrust.groupBy('Truster').count().show()

# Number of ratings per movie
FilmTrust.groupBy('Movie').count().show()

+-------+------------------+
|summary|            Rating|
+-------+------------------+
|  count|             69639|
|   mean|2.7181033616220796|
| stddev|1.0252497865060926|
|    min|                 0|
|    25%|                 2|
|    50%|                 3|
|    75%|                 3|
|    max|                 4|
+-------+------------------+

+-----+------+
|Movie|Rating|
+-----+------+
|  821|     4|
|   69|     4|
|  213|     4|
|   11|     4|
|  236|     4|
|   10|     4|
|  918|     4|
|  250|     4|
|  282|     4|
|  813|     4|
|  432|     4|
|  241|     4|
|  241|     4|
|  442|     4|
|    9|     4|
|    7|     4|
|  214|     4|
|  817|     4|
|  842|     4|
|  148|     4|
+-----+------+
only showing top 20 rows

+-----+------+
|Movie|Rating|
+-----+------+
|  267|     0|
| 1318|     0|
|    9|     0|
|  212|     0|
|  739|     0|
|  694|     0|
|  584|     0|
|   70|     0|
|  584|     0|
|    1|     0|
|    6|     0|
| 1311|     0|
|  909|     0|
|  734|     0|
|  216|   

In [70]:
# Imports
from pyspark.ml.evaluation import RegressionEvaluator 
from pyspark.ml.recommendation import ALS

# Split data
train_data, test_data = FilmTrust.randomSplit([0.8, 0.2])

# Build recommender model
als = ALS(maxIter=5, regParam=0.01, userCol='Truster', itemCol='Movie', ratingCol='Rating') 

# Fit
model = als.fit(train_data)

# Evaluate
predictions = model.transform(test_data)
predictions.show()

# Drop null
predictions = predictions.na.drop() 

# RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating",predictionCol="prediction") 
rmse = evaluator.evaluate(predictions) 
print("Root-mean-square error = " + str(rmse))

+-------+-------+-----------+-----+------+------------+
|Truster|Trustee|Trust Value|Movie|Rating|  prediction|
+-------+-------+-----------+-----+------+------------+
|      2|    966|          1|   13|     2|   1.9987769|
|      7|   1510|          1|    2|     4|   2.8954399|
|      7|   1510|          1|    3|     0|   1.4503899|
|      7|   1510|          1|  213|     1|   2.4761174|
|      7|   1510|          1|  215|     3|   1.6636865|
|     12|    234|          1|    4|     3|   2.4782534|
|     12|    234|          1|   12|     4|    4.560299|
|     15|   1239|          1|    2|     4|   3.9954674|
|     16|    301|          1|    2|     4|    3.677348|
|     16|    301|          1|    4|     2|   2.3871377|
|     16|    301|          1|  219|     3|   3.0826468|
|     16|    509|          1|    4|     2|   2.3871377|
|     26|   1179|          1|  214|     0|0.0053533614|
|     27|   1513|          1|    3|     1|   1.4925051|
|     27|   1513|          1|    5|     2|   1.2