In [1]:
# SparkContext is already defined as sc
HDFS = 'hdfs://scut0:9000/ml-100k/'

In [2]:
movies = sc.textFile(HDFS+'u.item')
print(movies.first())

1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0


# Extracting features from the MovieLens dataset

## Extracting movie genre labels

In [3]:
genres = sc.textFile(HDFS+'u.genre')
print(genres.take(5))

[u'unknown|0', u'Action|1', u'Adventure|2', u'Animation|3', u"Children's|4"]


In [4]:
def id2GenreText(line):
    text, idx = line.strip().split('|')
    return (idx, text)

genresMapping = genres.filter(lambda line: len(line) > 0).map(id2GenreText).collectAsMap()
print(genresMapping)

{u'11': u'Horror', u'10': u'Film-Noir', u'13': u'Mystery', u'12': u'Musical', u'15': u'Sci-Fi', u'14': u'Romance', u'17': u'War', u'16': u'Thriller', u'18': u'Western', u'1': u'Action', u'0': u'unknown', u'3': u'Animation', u'2': u'Adventure', u'5': u'Comedy', u'4': u"Children's", u'7': u'Documentary', u'6': u'Crime', u'9': u'Fantasy', u'8': u'Drama'}


In [5]:
def getMovieGenreText(indices, genresMapping):
    texts = []
    for i in xrange(len(indices)):
        if indices[i] == '1':
            texts.append(genresMapping[str(i)])
    return tuple(texts)

titlesAndGenres = movies.map(lambda line : line.strip().split('|')).\
                       map(lambda fields:(int(fields[0]), (fields[1], getMovieGenreText(fields[5:], genresMapping))))

print(titlesAndGenres.first())

(1, (u'Toy Story (1995)', (u'Animation', u"Children's", u'Comedy')))


## Training the recommendation model

In [6]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

# avoid the case when iteration is large, stackoverflow error occurs
# see, https://stackoverflow.com/questions/31484460/spark-gives-a-stackoverflowerror-when-training-using-als
sc.setCheckpointDir('hdfs://scut0:9000/checkpoint')

# extract rating data
data = sc.textFile(HDFS+'u.data')
dataFields = data.map(lambda line: line.split())
ratings = dataFields.map(lambda fields: Rating(fields[0], fields[1], fields[2]))

# parameters
rank = 200
iterations = 50
lambda_ = 0.01

# train model and validate with MSE
# referer: https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS
ALSModel = ALS.train(ratings, rank, iterations, lambda_)

In [7]:
from pyspark.mllib.linalg import Vectors
movieFactors = ALSModel.productFeatures().map(lambda line: (line[0], Vectors.dense(line[1])))
movieVectors = movieFactors.map(lambda factor:factor[1])
userFactors = ALSModel.userFeatures().map(lambda line: (line[0], Vectors.dense(line[1])))
userVectors = userFactors.map(lambda feature: feature[1])                                          

## Normalization

In [8]:
from pyspark.mllib.linalg.distributed import RowMatrix
movieMatrix = RowMatrix(movieVectors)
movieMatrixSummary = movieMatrix.computeColumnSummaryStatistics()
userMatrix = RowMatrix(userVectors)
userMatrixSummary = userMatrix.computeColumnSummaryStatistics()
print('movie factors mean', movieMatrixSummary.mean())
print('movie factors variance', movieMatrixSummary.variance())
print('movie factors mean', userMatrixSummary.mean())
print('movie factors variance', userMatrixSummary.variance())

('movie factors mean', array([ 0.11809382, -0.13237481, -0.46428199, -0.07625494,  0.06556419,
       -0.11278825, -0.2739639 , -0.0847419 , -0.16477472,  0.08392202,
       -0.01721492,  0.18962527,  0.26780778,  0.22818298,  0.09100991,
       -0.05390393, -0.26537003,  0.07825395,  0.21351414, -0.0052318 ,
       -0.19581692,  0.20449573, -0.0519071 ,  0.04844744,  0.04228252,
        0.05602511,  0.13238836,  0.18402514,  0.46892526,  0.4472949 ,
       -0.04606717, -0.02975684,  0.21997183, -0.07276207, -0.11343684,
        0.20983819,  0.11016285, -0.0480332 , -0.09460587,  0.05803705,
       -0.06033211,  0.08083551, -0.05254183,  0.03882401, -0.3982243 ,
       -0.04410023,  0.18310318, -0.00816594,  0.04608939,  0.00518365,
       -0.04198804,  0.0213554 , -0.19914427, -0.00421197,  0.16454437,
        0.05091756,  0.06234672,  0.16551899,  0.15863412,  0.13749641,
        0.0016159 , -0.1042815 , -0.0037928 ,  0.0656051 , -0.1398644 ,
        0.14840497, -0.13846924,  0.00692

# Training a clustering model

## Training a clustering model on the MovieLens dataset

In [9]:
from pyspark.mllib.clustering import KMeans
numClusters = 5
numIterations = 10
numRuns = 100

movieClusterModel = KMeans.train(movieVectors, numClusters, numRuns)
userClusterModel = KMeans.train(userVectors, numClusters, numRuns)

## Making predictions using a clustering model

In [10]:
movie1 = movieVectors.first()
movieCluster = movieClusterModel.predict(movie1)
print(movieCluster)

2


In [11]:
predictions = movieClusterModel.predict(movieVectors)
print(predictions.take(10))

[2, 2, 3, 2, 1, 4, 4, 2, 1, 4]


## Interpreting cluster predictions on the MovieLens dataset

In [12]:
titleAndFactors = titlesAndGenres.join(movieFactors)
print(titleAndFactors.first())

(1540, ((u'Amazing Panda Adventure, The (1995)', (u'Adventure', u"Children's")), DenseVector([0.234, -0.2199, -0.5585, 0.2199, -0.0496, 0.0459, -0.4296, -0.0749, -0.0467, 0.0349, -0.1858, 0.1342, 0.0666, 0.2118, -0.0681, 0.1427, -0.5221, 0.0186, -0.0265, -0.221, -0.216, 0.1894, -0.1392, 0.2243, -0.0098, 0.0178, 0.0451, -0.0718, 0.3665, 0.3728, -0.075, -0.0225, 0.0587, 0.0456, 0.2062, 0.1332, 0.3274, 0.0906, -0.0385, -0.0546, 0.0008, -0.0454, -0.0351, 0.0521, -0.2825, 0.2131, 0.182, 0.0201, -0.1137, -0.0585, -0.1132, 0.1397, 0.004, -0.2206, 0.2745, 0.1547, 0.0268, 0.0687, 0.1379, 0.0902, 0.0335, -0.2444, -0.0138, 0.1482, -0.1185, 0.1155, -0.0707, -0.1499, -0.2697, 0.1059, -0.1956, -0.065, 0.2094, -0.1402, 0.0313, 0.1448, 0.0099, -0.0958, -0.0858, 0.1401, -0.0989, 0.0866, 0.235, -0.2522, -0.0274, 0.143, -0.0521, 0.0342, 0.0183, -0.2108, 0.2349, 0.0166, 0.0751, 0.0947, -0.256, 0.011, 0.0009, 0.0605, 0.0853, -0.1887, 0.0327, 0.0699, -0.2483, 0.1679, 0.1062, -0.0733, 0.2747, 0.2443, 0.0097,

In [13]:
import numpy as np
def clusterMovie(record):
    id = record[0]
    title, genres = record[1][0]
    v1 = record[1][1]
    pred = movieClusterModel.predict(v1)
    v2 = movieClusterModel.clusterCenters[pred]
    dist = np.power(v1 - v2, 2).sum()
    return (id, title, ' '.join(genres), pred, dist)
clusteredMovies = titleAndFactors.map(clusterMovie)
print(clusteredMovies.first())

(1540, u'Amazing Panda Adventure, The (1995)', u"Adventure Children's", 3, 3.0427364600596549)


In [14]:
clusterAssignments = clusteredMovies.groupBy(lambda r:r[3]).collectAsMap()
print(clusterAssignments)

{0: <pyspark.resultiterable.ResultIterable object at 0x7f5169bb1550>, 1: <pyspark.resultiterable.ResultIterable object at 0x7f5169c19390>, 2: <pyspark.resultiterable.ResultIterable object at 0x7f5169bb1d90>, 3: <pyspark.resultiterable.ResultIterable object at 0x7f5169c0add0>, 4: <pyspark.resultiterable.ResultIterable object at 0x7f5169bb6b10>}


In [15]:
for k, v in clusterAssignments.items():
    print('cluster {0}'.format(k))
    sortedMovie = sc.parallelize(list(v)).sortBy(lambda r:r[4])
    for record in sortedMovie.take(10):
        print(record)
    print('\n')

cluster 0
(439, u'Amityville: A New Generation (1993)', u'Horror', 0, 0.24283231812956452)
(437, u"Amityville 1992: It's About Time (1992)", u'Horror', 0, 0.24283231812956452)
(858, u'Amityville: Dollhouse (1996)', u'Horror', 0, 0.24876935180846005)
(1659, u'Getting Away With Murder (1996)', u'Comedy', 0, 0.27014749792514103)
(1678, u"Mat' i syn (1997)", u'Drama', 0, 0.27357245255136109)
(1308, u'Babyfever (1994)', u'Comedy Drama', 0, 0.276010587124352)
(599, u'Police Story 4: Project S (Chao ji ji hua) (1993)', u'Action', 0, 0.2763828655522782)
(1408, u'Gordy (1995)', u'Comedy', 0, 0.28295344087708019)
(784, u'Beyond Bedlam (1993)', u'Drama Horror', 0, 0.28398255013035545)
(1621, u'Butterfly Kiss (1995)', u'Thriller', 0, 0.29291435336393407)


cluster 1
(1124, u'Farewell to Arms, A (1932)', u'Romance War', 1, 2.4876754639703824)
(617, u'Blue Angel, The (Blaue Engel, Der) (1930)', u'Drama', 1, 2.5302594012278812)
(608, u'Spellbound (1945)', u'Mystery Romance Thriller', 1, 2.58937102314

# Evaluating the performance of clustering models

Internal evaluation refers to the case where the same data used to train the model is used for evaluation. 
External evaluation refers to using data external to the training data for
evaluation purposes

## Internal evaluation metrics

Common internal evaluation metrics include the **within cluster sum of squared errors**(WCSS) we covered earlier (which is
exactly the K-means objective function), the Davies-Bouldin index, the Dunn Index,
and the silhouette coeffcient. All these measures tend to reward clusters where
elements within a cluster are relatively close together, while elements in different
clusters are relatively far away from each other.

See:https://en.wikipedia.org/wiki/Cluster_analysis#Internal_evaluation

## External evaluation metrics

Since clustering can be thought of as unsupervised classifcation, if we have some
form of labeled (or partially labeled) data available, we could use these labels to
evaluate a clustering model. We can make predictions of clusters (that is, the class
labels) using the model and evaluate the predictions against the true labels using
metrics similar to some that we saw for classifcation evaluation (that is, based on
true positive and negative and false positive and negative rates).
These include the Rand measure, F-measure, Jaccard index, and others.

See： https://en.wikipedia.org/wiki/Cluster_analysis#External_evaluation

## Computing performance metrics on the MovieLens dataset

In [16]:
movieCost = movieClusterModel.computeCost(movieVectors)
userCost = userClusterModel.computeCost(userVectors)
print('WCSS for movies', movieCost)
print('WCSS for users', userCost)

('WCSS for movies', 7225.765996352291)
('WCSS for users', 4330.977086110632)


# Tuning parameters for clustering models

## Selecting K through cross-validation

In [31]:
trainMovies, testMovies = movieVectors.randomSplit([0.6, 0.4], seed = 123)

numRuns = 20
K = [2, 3, 4, 6]
for k in K:
    loss = KMeans.train(trainMovies, k, numRuns).computeCost(testMovies)
    print('{0} clusters, WCSS {1}'.format(k, loss))

2 clusters, WCSS 3185.05663453
3 clusters, WCSS 3064.85163573
4 clusters, WCSS 3015.1151521
6 clusters, WCSS 2940.19717096
