===========================================


Title: 9.2 Exercises


Author: Chad Wood


Date: 12 Feb 2022


Modified By: Chad Wood


Description: This program demonstrates the use of Spark to perform clustering with K-Means and LDA, and collaborative filtering with ALS (scored with a regression evaluator), on sample data. Code implemented from sample sites, per instruction. References included.

=========================================== 

# Clustering with Spark

## Assignment 9

The following code initializes a Spark session that you use for the remainder of the assignment. 

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("DSC 400 Assignment 9") \
    .getOrCreate()

# TODO: Change these to point to versions on your local path

sample_kmeans_data_path = r'data\sample_kmeans_data.txt'
sample_lda_data_path = r'data\sample_lda_data.txt'
sample_movielens_data_path = r'data\sample_movielens_data.txt'

### Assignment 9.1

Run the PySpark version of the k-means clustering example found in [Apache Spark's k-means clustering documentation](https://spark.apache.org/docs/latest/ml-clustering.html#k-means). You can also find the code in [Apache Spark's GitHub repository](https://github.com/apache/spark/tree/master/examples/src/main/python/mllib).

The example references a `sample_kmeans_data.txt` file. You can find the file at https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_kmeans_data.txt. Put this file in your working path and change the following code to point to your local version of the file. 

```
dataset = spark.read.format("libsvm").load("data/sample_kmeans_data.txt")
```
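
For orientation, the `libsvm` reader expects each line to hold a label followed by `index:value` pairs with 1-based indices. A minimal plain-Python sketch of that layout follows; the helper `parse_libsvm_line` and the sample line are hypothetical and only illustrate the format, not Spark's own parser.

```python
def parse_libsvm_line(line):
    """Split one libsvm-style line into (label, {zero_based_index: value})."""
    tokens = line.split()
    label = float(tokens[0])
    features = {}
    for token in tokens[1:]:
        index, value = token.split(":")
        # libsvm indices start at 1; shift to 0-based for Python use
        features[int(index) - 1] = float(value)
    return label, features


# Hypothetical line, made up for illustration
label, features = parse_libsvm_line("1 1:0.1 2:0.1 3:0.1")
print(label, features)
```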

In [3]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

dataset = spark.read.format("libsvm").load(sample_kmeans_data_path)

# TODO: Implement the remainder of the code from the k-means clustering example

# Code implemented from https://spark.apache.org/docs/latest/ml-clustering.html#k-means

# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

# Make predictions
predictions = model.transform(dataset)

# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))

# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

Silhouette with squared euclidean distance = 0.9997530305375207
Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]
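
To make the reported score concrete, here is a plain-Python sketch of the silhouette coefficient using squared Euclidean distance. The six points and the helper names are assumptions: the points were picked so that their cluster means equal the centers printed above, and the sample file's actual contents are not reproduced here.

```python
def sq_dist(p, q):
    """Squared Euclidean distance between two equal-length points."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def mean_sq_dist(point, others):
    """Mean squared distance from `point` to every point in `others`."""
    return sum(sq_dist(point, o) for o in others) / len(others)


def silhouette(clusters):
    """Average silhouette over all points; `clusters` is a list of point lists."""
    scores = []
    for ci, cluster in enumerate(clusters):
        for pi, point in enumerate(cluster):
            own = cluster[:pi] + cluster[pi + 1:]
            if not own:
                # Singleton cluster: silhouette is conventionally 0
                scores.append(0.0)
                continue
            a = mean_sq_dist(point, own)
            b = min(mean_sq_dist(point, other)
                    for cj, other in enumerate(clusters) if cj != ci)
            scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


# Assumed toy data: two tight groups in 3-D
low = [[v, v, v] for v in (0.0, 0.1, 0.2)]
high = [[v, v, v] for v in (9.0, 9.1, 9.2)]
score = silhouette([low, high])
print(score)
```

For these assumed points the score comes out at roughly 0.99975, in line with the value printed above. A score this close to 1 indicates clusters that are tight and far apart.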


### Assignment 9.2

Run the PySpark version of the Latent Dirichlet Allocation (LDA) example found in [Apache Spark's LDA documentation](https://spark.apache.org/docs/latest/ml-clustering.html#latent-dirichlet-allocation-lda). You can also find the code in [Apache Spark's GitHub repository](https://github.com/apache/spark/tree/master/examples/src/main/python/mllib).

The example references a `sample_lda_libsvm_data.txt` file. You can find the file at https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_lda_libsvm_data.txt. Put this file in your working path and change the following code to point to your local version of the file.

```
dataset = spark.read.format("libsvm").load("data/sample_lda_data.txt")
```

In [4]:
from pyspark.ml.clustering import LDA

# Loads data.
dataset = spark.read.format("libsvm").load(sample_lda_data_path)

# TODO: Implement the remainder of the code from the LDA example

# Code implemented from https://spark.apache.org/docs/latest/ml-clustering.html#latent-dirichlet-allocation-lda

# Trains an LDA model.
lda = LDA(k=10, maxIter=10)
model = lda.fit(dataset)

ll = model.logLikelihood(dataset)
lp = model.logPerplexity(dataset)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(dataset)
transformed.show(truncate=False)

The lower bound on the log likelihood of the entire corpus: -796.5399751899204
The upper bound on perplexity: 3.0636152891920014
The topics described by their top-weighted terms:




+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+---------------------------------------------------------------+
|0    |[5, 10, 8] |[0.10142142863147144, 0.10067380726498025, 0.09966345558028378]|
|1    |[6, 1, 4]  |[0.10397121695942116, 0.10168853086587061, 0.09387262850459155]|
|2    |[7, 0, 9]  |[0.10585830481697496, 0.10328073972555847, 0.09961278145953019]|
|3    |[3, 5, 4]  |[0.10607015157178218, 0.10025508640011113, 0.09950695826225199]|
|4    |[4, 5, 1]  |[0.1625975422412709, 0.14053559661747037, 0.1358456725227939]  |
|5    |[10, 6, 9] |[0.2511233507213596, 0.15468631922794587, 0.12582321496738322] |
|6    |[5, 0, 7]  |[0.10727666672469126, 0.09553834984523368, 0.09510957588066214]|
|7    |[9, 0, 5]  |[0.10770202564363905, 0.10638992800158185, 0.09907350535796362]|
|8    |[7, 3, 5]  |[0.1088447905791633, 0.10266373799409191, 0.0949329361684
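
The two printed bounds are related. Assuming Spark reports log perplexity as the negative log-likelihood bound divided by the number of tokens in the corpus, dividing one by the other recovers the token count. The arithmetic below uses only the values printed above; the variable names are illustrative.

```python
# Values copied from the printed output above
log_likelihood = -796.5399751899204
log_perplexity = 3.0636152891920014

# Under the stated assumption: log_perplexity = -log_likelihood / token_count
implied_token_count = -log_likelihood / log_perplexity
print(round(implied_token_count, 6))
```

The result is very close to 260, which suggests the sample corpus holds about 260 tokens in total.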

### Assignment 9.3

Run the PySpark version of the collaborative filtering example found in [Apache Spark's collaborative filtering documentation](https://spark.apache.org/docs/latest/ml-collaborative-filtering.html). You can also find the code in [Apache Spark's GitHub repository](https://github.com/apache/spark/tree/master/examples/src/main/python/mllib).

The example references a `sample_movielens_ratings.txt` file. You can find the file at https://raw.githubusercontent.com/apache/spark/master/data/mllib/als/sample_movielens_ratings.txt. Put this file in your working path and change the following code to point to your local version of the file. 

```
lines = spark.read.text("data/mllib/als/sample_movielens_data.txt").rdd
```
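
Each line of the ratings file is expected to hold four fields separated by `::` (user, movie, rating, timestamp), which are the fields the example code reads. A plain-Python sketch of that split follows, using a hypothetical line and a hypothetical helper `parse_rating`; it is not the Spark code path.

```python
from typing import NamedTuple


class Rating(NamedTuple):
    user_id: int
    movie_id: int
    rating: float
    timestamp: int


def parse_rating(line):
    """Turn 'user::movie::rating::timestamp' into a Rating tuple."""
    user, movie, rating, timestamp = line.strip().split("::")
    return Rating(int(user), int(movie), float(rating), int(timestamp))


# Hypothetical line, made up for illustration
parsed = parse_rating("7::42::3.5::1424380312")
print(parsed)
```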

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

lines = spark.read.text(sample_movielens_data_path).rdd

# TODO: Implement the remainder of the code from the collaborative filtering example

# Code implemented from https://spark.apache.org/docs/latest/ml-collaborative-filtering.html

parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
# Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

Root-mean-square error = 1.6521732989246491
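
The printed metric is the root-mean-square error between predicted and actual ratings on the held-out split. Because `randomSplit` is called without a seed, the value will likely differ from run to run. A plain-Python sketch of the metric itself, with made-up ratings and a hypothetical helper `rmse_of`:

```python
import math


def rmse_of(actual, predicted):
    """Root-mean-square error between two equal-length sequences."""
    if len(actual) != len(predicted):
        raise ValueError("sequences must have the same length")
    squared_errors = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Made-up ratings for illustration only
actual = [4.0, 3.0, 5.0, 1.0]
predicted = [3.0, 3.0, 4.0, 3.0]
error = rmse_of(actual, predicted)
print(error)
```

Lower values mean the predictions sit closer to the observed ratings, and the error is expressed in the same units as the ratings themselves.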

