In [1]:
import os
import subprocess
def module(*args):
    """Run an Environment Modules command and apply its result to this kernel.

    Arguments may be given as separate strings, e.g. ``module('load', 'pkg')``,
    or as one list, e.g. ``module(['load', 'pkg'])``. At least one argument is
    required (the first one is inspected to decide which form was used).

    ``/usr/bin/modulecmd python <args>`` is run and whatever it writes to stdout
    is executed as Python source in this function's scope. Module globals are
    visible there, so statements it presumably emits such as
    ``os.environ[...] = ...`` change the running process.
    """
    argv = args[0] if isinstance(args[0], list) else list(args)
    command = ['/usr/bin/modulecmd', 'python'] + argv
    # stderr is not piped, so the second value from communicate() is always None.
    stdout_bytes, _unused_stderr = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()
    # NOTE(review): exec() trusts modulecmd's output completely; acceptable only
    # because the binary path is fixed, not user-supplied.
    exec(stdout_bytes)
# Make a Java 8 JDK available through Environment Modules, then point PySpark's
# Python workers at the interpreter of the jupyter-spark conda environment.
JDK_MODULE = 'apps/java/jdk1.8.0_102/binary'
module('load', JDK_MODULE)
os.environ['PYSPARK_PYTHON'] = '{}/.conda/envs/jupyter-spark/bin/python'.format(os.environ['HOME'])

## Start of code

In [2]:
#Initialize pyspark
import pyspark

from pyspark.sql import SparkSession

# Local Spark session with two worker threads; `sc` is its underlying SparkContext
# (used later for sc.emptyRDD()).
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("COM6012: Assignment 1 - Q2")
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Load the MovieLens 20M ratings. The first CSV line supplies the column names and
# Spark infers each column's type from the data.
ratings = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Data/ml-20m/ratings.csv")
)

In [4]:
# Five equally weighted folds for 5-fold cross-validation; the fixed seed makes the split repeatable.
data_split = ratings.randomSplit(weights=[0.2] * 5, seed=3233)

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

#Method for running ALS operations
def perform_ALS(training, test, rp=0.1, mi=10):
    """Fit an ALS recommender on ``training`` and score it on ``test``.

    Args:
        training: ratings DataFrame with ``userId``, ``movieId`` and ``rating`` columns.
        test: held-out ratings DataFrame with the same columns.
        rp: ALS ``regParam`` (regularisation strength).
        mi: ALS ``maxIter`` (number of alternating iterations).

    Returns:
        ``(rmse, mae, model)``: RMSE and MAE of the predictions on ``test``, and the
        fitted ALS model.
    """
    # coldStartStrategy="drop" discards test rows whose user or movie was unseen in
    # training (their prediction would be NaN and poison both metrics).
    als = ALS(maxIter=mi, regParam=rp, userCol="userId", itemCol="movieId",
              ratingCol="rating", coldStartStrategy="drop")
    model = als.fit(training)

    evaluator_r = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    evaluator_m = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

    # Both evaluators scan the same predictions; cache them so the ALS transform
    # (and the joins behind it) is computed once rather than once per metric.
    predictions = model.transform(test).cache()
    try:
        rmse = evaluator_r.evaluate(predictions)
        mae = evaluator_m.evaluate(predictions)
    finally:
        # Release the cached rows even if an evaluation fails.
        predictions.unpersist()

    return rmse, mae, model

In [None]:
from pyspark.sql.types import StructType, IntegerType, DoubleType, StructField

#Method to create empty dataframe
def create_empty_df():
    """Return an empty DataFrame with columns userId, movieId, rating, timestamp.

    All four fields are nullable; userId, movieId and timestamp are IntegerType and
    rating is DoubleType.
    """
    column_types = (
        ("userId", IntegerType()),
        ("movieId", IntegerType()),
        ("rating", DoubleType()),
        ("timestamp", IntegerType()),
    )
    empty_schema = StructType([StructField(name, data_type) for name, data_type in column_types])
    return spark.createDataFrame(sc.emptyRDD(), empty_schema)

#### 2A - Performing five-fold cross validation

In [None]:
rmse_da, rmse_na, mae_da, mae_na, model_d = [], [], [], [], []

# Five-fold cross-validation: each fold in turn is the test set and the union of the
# other folds is the training set. Each fold is scored twice: with perform_ALS's
# defaults ("_da" lists, models kept in model_d) and with rp=0.5, mi=2 ("_na" lists).
for fold_idx, test_data in enumerate(data_split):
    training_data = create_empty_df()
    for other_idx, other_fold in enumerate(data_split):
        if other_idx != fold_idx:
            training_data = training_data.union(other_fold)

    print("[*] Performing ALS for fold: ", fold_idx + 1)
    default_rmse, default_mae, default_model = perform_ALS(training_data, test_data)
    custom_rmse, custom_mae, _ = perform_ALS(training_data, test_data, rp=0.5, mi=2)

    rmse_da.append(default_rmse)
    rmse_na.append(custom_rmse)
    mae_da.append(default_mae)
    mae_na.append(custom_mae)
    model_d.append(default_model)
    

[*] Performing ALS for fold:  1
[*] Performing ALS for fold:  2
[*] Performing ALS for fold:  3
[*] Performing ALS for fold:  5


#### 2B - RMSE and MAE values for five-fold cross validation

In [None]:
import numpy as np

# Per-fold scores, then their mean and standard deviation (np.std defaults to the
# population form, ddof=0). "_da" lists hold the default ALS runs, "_na" the custom ones.
per_fold_lines = (
    ("RMSE values: ", rmse_da),
    ("MAE values: ", mae_da),
    ("RMSE with custom options - values: ", rmse_na),
    ("MAE with custom options - values:  ", mae_na),
)
for prefix, scores in per_fold_lines:
    print(prefix + ", ".join(map(str, scores)))

mean_lines = (
    ("Mean of RMSE ", rmse_da),
    ("Mean of MAE  ", mae_da),
    ("Mean of RMSE with custom options - values:  ", rmse_na),
    ("Mean of MAE with custom options - values:  ", mae_na),
)
for label, scores in mean_lines:
    print(label, np.mean(scores))

std_lines = (
    ("Standard Deviation of RMSE ", rmse_da),
    ("Standard Deviation of MAE ", mae_da),
    ("Standard Deviation of RMSE with custom options - values: ", rmse_na),
    ("Standard Deviation of MAE with custom options - values: ", mae_na),
)
for label, scores in std_lines:
    print(label, np.std(scores))

RMSE values: 0.8064884240564395, 0.8059774588491122, 0.8072957942454753, 0.8062745627609654, 0.8067585740423929
MAE values: 0.6269468884422882, 0.6262886457008252, 0.6273757782009896, 0.6262157412542964, 0.626908928562934
RMSE with custom options - values: 1.3566787585699531, 1.3777952968865996, 1.372110291632093, 1.4288687200916226, 1.3873552846179906
MAE with custom options - values:  1.165281779098615, 1.1860035720320061, 1.1797459733772688, 1.233657008631722, 1.1949690147307974
Mean of RMSE  0.806558962791
Mean of MAE   0.626747196432
Mean of RMSE with custom options - values:   1.38456167036
Mean of MAE with custom options - values:   1.19193146957
Standard Deviation of RMSE  0.000448725120441
Standard Deviation of MAE  0.000436775136752
Standard Deviation of RMSE with custom options - values:  0.0242866067973
Standard Deviation of MAE with custom options - values:  0.0229959978235


In [13]:
from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT

# Convert an array "features" column into an ML Vector column (KMeans needs Vectors).
# The array is wrapped in a struct shaped like VectorUDT's SQL representation
# (a dense vector only needs "type" and "values"), serialised with to_json, and later
# parsed back with from_json(json_vec, schema).
# Technique from https://stackoverflow.com/questions/42138482/how-do-i-convert-an-array-i-e-list-column-to-vector
DENSE_VECTOR_TYPE = 1  # VectorUDT type code: 1 is dense, 0 is sparse
dense_vector = struct(
    lit(DENSE_VECTOR_TYPE).alias("type"),
    col("features").alias("values"),
).alias("v")
json_vec = to_json(struct(dense_vector))

schema = StructType([StructField("v", VectorUDT())])

#### 2C - Finding top 5 tags from top 3 largest clusters using K-Means

In [14]:
# Load the genome tables used to label the K-Means clusters:
# genome-scores (movieId, tagId, relevance) and genome-tags (tagId, tag).
# One reader, configured once, serves both CSV files.
csv_reader = spark.read.format("csv").option("header", "true").option("inferSchema", "true")
movie_tag = csv_reader.load("Data/ml-20m/genome-scores.csv")
tag_tag = csv_reader.load("Data/ml-20m/genome-tags.csv")


In [19]:
from pyspark.sql.functions import count, col, desc, sum, rank

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
# For each fold's ALS model: cluster the movie factors with K-Means, keep the most
# populous clusters, and total the genome-tag relevance of the movies inside them.
# The estimator is identical for every fold, so it is built once outside the loop.
kmeans = KMeans(k=20, seed=3233)


def _largest_cluster_tags(als_model, n_clusters=3):
    """Return tag relevance totals for the ``n_clusters`` largest K-Means clusters.

    Args:
        als_model: fitted ALS model; its ``itemFactors`` (columns ``id``, ``features``)
            are clustered with the module-level ``kmeans`` estimator.
        n_clusters: how many of the largest clusters (by movie count) to keep.

    Returns:
        DataFrame with columns ``cluster_id``, ``tagId``, ``tag``, ``r_sum`` sorted by
        ``r_sum`` descending, where ``r_sum`` is the summed ``relevance`` from
        ``movie_tag`` over the movies of that cluster.
    """
    # KMeans needs a Vector column: round-trip the factor array through JSON using
    # the json_vec / schema expressions defined earlier.
    movie_factors = (als_model.itemFactors
                     .withColumn("parsed_features", from_json(json_vec, schema).getItem("v"))
                     .select(col("parsed_features").alias("features"), col("id")))
    kmeans_model = kmeans.fit(movie_factors.select("features"))
    movie_pred = kmeans_model.transform(movie_factors)

    largest_clusters = (movie_pred.select("prediction")
                        .groupBy("prediction")
                        .agg(count("prediction").alias("n_pred"))
                        .sort(desc("n_pred"))
                        .limit(n_clusters))
    cluster_movies = movie_pred.join(largest_clusters, ["prediction"])

    # NOTE: `sum` is pyspark.sql.functions.sum (the import above shadows the builtin).
    tag_totals = (cluster_movies.alias("mc")
                  .join(movie_tag.alias("mt"), col("mc.id") == col("mt.movieId"))
                  .select(col("mt.tagId"), col("mt.relevance"), col("mc.prediction"))
                  .groupBy("mc.prediction", "mt.tagId")
                  .agg(sum("mt.relevance").alias("r_sum")))

    # No sort before the join (the join discards order; only the final sort matters)
    # and no extra tagId filter (the join condition already enforces it).
    return (tag_totals.alias("tags")
            .join(tag_tag.alias("tt"), col("tags.tagId") == col("tt.tagId"))
            .select(col("tags.prediction").alias("cluster_id"), col("tt.tagId"),
                    col("tt.tag"), col("r_sum"))
            .sort(desc("r_sum")))


all_t5_tags = []
for als_model in model_d:
    all_t5_tags.append(_largest_cluster_tags(als_model))
    all_t5_tags[-1].show()
    
    

+----------+-----+------------------+-----------------+
|cluster_id|tagId|               tag|            r_sum|
+----------+-----+------------------+-----------------+
|         7|  742|          original|830.0850000000004|
|         7|  972|      storytelling|654.5745000000003|
|         7|  452|   good soundtrack|650.7422500000004|
|         7|  936| social commentary|618.2425000000002|
|         7|  468|      great ending|614.0780000000003|
|         7|  646|            mentor|606.3747499999998|
|         7|  323|             drama|591.1667500000002|
|         7|  302|          dialogue|572.1697500000002|
|         7|  554|       interesting|571.2022500000002|
|         7|  465|      great acting|570.4087499999999|
|         7|  240|           complex|569.2529999999997|
|         7| 1008|             talky|555.8592499999997|
|         7|  640|       melancholic|552.1002500000002|
|         7| 1091|visually appealing|546.5605000000002|
|         7|  169|         brutality|546.3347499

In [20]:
from pyspark.sql.window import Window

# Keep the five highest-relevance tags inside each cluster. rank() gives tied r_sum
# values the same rank, so a cluster may show more than five rows when there are ties.
for cluster_tags in all_t5_tags:
    by_relevance = Window.partitionBy(cluster_tags['cluster_id']).orderBy(cluster_tags['r_sum'].desc())
    ranked_tags = cluster_tags.select('*', rank().over(by_relevance).alias('rank'))
    ranked_tags.filter(col('rank') <= 5).show()

+----------+-----+-----------------+------------------+----+
|cluster_id|tagId|              tag|             r_sum|rank|
+----------+-----+-----------------+------------------+----+
|        16|  742|         original| 443.9157500000002|   1|
|        16|  646|           mentor|319.86100000000016|   2|
|        16|  468|     great ending|           303.499|   3|
|        16|  302|         dialogue|294.24799999999993|   4|
|        16|  188|      catastrophe|268.07975000000016|   5|
|         7|  742|         original| 830.0850000000004|   1|
|         7|  972|     storytelling| 654.5745000000003|   2|
|         7|  452|  good soundtrack| 650.7422500000004|   3|
|         7|  936|social commentary| 618.2425000000002|   4|
|         7|  468|     great ending| 614.0780000000003|   5|
|        18|  742|         original|           509.652|   1|
|        18|  646|           mentor|421.34300000000013|   2|
|        18|  468|     great ending|404.00949999999983|   3|
|        18|  445|      

In [None]:
# all_t5_tags is a plain Python list holding one DataFrame per fold, so calling
# .show() on the list raises AttributeError; show each fold's DataFrame instead.
for fold_number, fold_tags in enumerate(all_t5_tags, start=1):
    print("Fold", fold_number)
    fold_tags.show()