#### Rex Gayas
#### Week 9 Exercise 9.2 Spring 2024
#### DSC400-T301 Big Data, Technology, and Algo (2245-1)
#### Clustering with Spark

Assignment 9

In [10]:
from pyspark.sql import SparkSession

# Initialize Spark Session for this assignment
spark = SparkSession \
    .builder \
    .appName("DSC 400 Assignment 9") \
    .getOrCreate()

# Paths to the sample datasets in the Spark repository
# (the cells below download their own copies instead of using these)
sample_kmeans_data_path = "data/mllib/sample_kmeans_data.txt"
sample_lda_data_path = "data/mllib/sample_lda_libsvm_data.txt"
sample_movielens_data_path = "data/mllib/als/sample_movielens_ratings.txt"


Assignment 9.1

In [11]:
from pyspark.sql import SparkSession
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import requests

# Download a file over HTTP and save it locally, failing loudly on HTTP errors
def download_data(url, local_filename):
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open(local_filename, 'wb') as f:
        f.write(response.content)

# Download k-means data
data_url = "https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_kmeans_data.txt"
local_data_path = "sample_kmeans_data.txt"
download_data(data_url, local_data_path)

# Initialize Spark Session
spark = SparkSession \
    .builder \
    .appName("KMeans Example") \
    .getOrCreate()

# Load and parse data
data = spark.read.format("libsvm").load(local_data_path)

# Trains a k-means model
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(data)

# Evaluate clustering by computing the Silhouette score
evaluator = ClusteringEvaluator()

silhouette = evaluator.evaluate(model.transform(data))
print(f"Silhouette with squared euclidean distance = {silhouette}")

# Shows the result
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
    print(center)

# Stop the Spark session
spark.stop()


Silhouette with squared euclidean distance = 0.9997530305375207
Cluster Centers: 
[9.1 9.1 9.1]
[0.1 0.1 0.1]


Utilized Apache Spark’s MLlib to perform k-means clustering on a dataset downloaded from a URL. The file was fetched with the Python “requests” library and loaded into a Spark DataFrame using the "libsvm" format. A k-means model was then configured with 2 clusters and a fixed random seed of 1; the initialization mode was left at its default, "k-means||" (a parallel variant of k-means++), rather than random initialization. After the model was trained, PySpark's “ClusteringEvaluator” computed the silhouette score to assess clustering quality.
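To make the fitting step concrete, here is a minimal single-machine sketch of Lloyd's k-means iteration: assign each point to its nearest center, then move each center to the mean of its assigned points, and repeat until nothing changes. It is not Spark's implementation (Spark distributes this work and seeds the centers with k-means||); the sketch simply starts from two given centers. The six 3-D points are an assumption, chosen to resemble the sample file as implied by the cluster centers printed above, and the starting centers are hypothetical.

```python
# Minimal sketch of Lloyd's k-means on one machine (not Spark's implementation).
# Assumption: six 3-D points resembling data/mllib/sample_kmeans_data.txt.
points = [
    (0.0, 0.0, 0.0), (0.1, 0.1, 0.1), (0.2, 0.2, 0.2),
    (9.0, 9.0, 9.0), (9.1, 9.1, 9.1), (9.2, 9.2, 9.2),
]


def squared_distance(p, q):
    """Squared Euclidean distance between two equal-length tuples."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def lloyd_kmeans(points, centers, max_iter=20):
    """Alternate assignment and update steps until the centers stop changing."""
    centers = [tuple(c) for c in centers]
    labels = []
    for _ in range(max_iter):
        # Assignment step: index of the nearest center for every point.
        labels = [
            min(range(len(centers)), key=lambda j: squared_distance(p, centers[j]))
            for p in points
        ]
        # Update step: move each center to the mean of its assigned points
        # (a center with no assigned points stays where it is).
        new_centers = []
        for j, old in enumerate(centers):
            members = [p for p, lab in zip(points, labels) if lab == j]
            if members:
                new_centers.append(tuple(sum(col) / len(members) for col in zip(*members)))
            else:
                new_centers.append(old)
        if new_centers == centers:
            break
        centers = new_centers
    return centers, labels


# Hypothetical starting centers: the first and last points.
centers, labels = lloyd_kmeans(points, [points[0], points[-1]])
print(labels)
for c in centers:
    print([round(x, 3) for x in c])
```

Starting from the two extreme points, the sketch converges after a single update to centers of about [0.1, 0.1, 0.1] and [9.1, 9.1, 9.1], which agree with the Spark centers above up to cluster order.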
The output shows a silhouette score of approximately 0.99975. Silhouette scores range from -1 to 1, and a value this close to 1 means every point is far closer to its own cluster than to the other one, so the two clusters are compact and well separated. The cluster centers, [9.1, 9.1, 9.1] and [0.1, 0.1, 0.1], are the centroids of the two clusters: each is the mean of all points assigned to that cluster.
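The silhouette score can be computed by hand to see what the evaluator measures. For each point, a is its mean distance to the other members of its own cluster, b is its smallest mean distance to the members of any other cluster, and its score is (b - a) / max(a, b); the overall score is the mean over all points. The sketch below uses squared Euclidean distance, which is the default distance measure of “ClusteringEvaluator”. The six points and their cluster labels are assumptions, chosen to resemble the sample file and the two clusters found by k-means.

```python
# Minimal sketch of the silhouette score using squared Euclidean distance,
# the distance measure ClusteringEvaluator uses by default.
# Assumption: six 3-D points resembling data/mllib/sample_kmeans_data.txt,
# labeled by the two clusters k-means found.
points = [
    (0.0, 0.0, 0.0), (0.1, 0.1, 0.1), (0.2, 0.2, 0.2),
    (9.0, 9.0, 9.0), (9.1, 9.1, 9.1), (9.2, 9.2, 9.2),
]
labels = [0, 0, 0, 1, 1, 1]


def squared_distance(p, q):
    """Squared Euclidean distance between two equal-length tuples."""
    return sum((a - b) ** 2 for a, b in zip(p, q))


def silhouette(points, labels):
    """Mean over points of s = (b - a) / max(a, b); needs at least two clusters."""
    scores = []
    for i, p in enumerate(points):
        # a: mean distance to the other members of p's own cluster.
        own = [squared_distance(p, q) for j, q in enumerate(points)
               if j != i and labels[j] == labels[i]]
        if not own:
            scores.append(0.0)  # a point alone in its cluster scores 0 by convention
            continue
        a = sum(own) / len(own)
        # b: smallest mean distance to the members of any other cluster.
        b = min(
            sum(squared_distance(p, q) for q, lab in zip(points, labels) if lab == c)
            / labels.count(c)
            for c in set(labels) if c != labels[i]
        )
        scores.append((b - a) / max(a, b))
    return sum(scores) / len(scores)


score = silhouette(points, labels)
print(f"Silhouette = {score:.5f}")  # prints Silhouette = 0.99975
```

With these assumed points the hand computation gives about 0.99975, in line with the value reported by Spark: within-cluster squared distances are at most 0.12, while between-cluster ones are above 230.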


Assignment 9.2

In [12]:
import requests
from pyspark.sql import SparkSession
from pyspark.ml.clustering import LDA

# Download a file over HTTP and save it locally, failing loudly on HTTP errors
def download_data(url, local_filename):
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open(local_filename, 'wb') as f:
        f.write(response.content)

# Download LDA data
data_url = "https://raw.githubusercontent.com/apache/spark/master/data/mllib/sample_lda_libsvm_data.txt"
local_data_path = "sample_lda_libsvm_data.txt"
download_data(data_url, local_data_path)

# Initialize Spark Session
spark = SparkSession \
    .builder \
    .appName("LDA Example") \
    .getOrCreate()

# Load and parse data
data = spark.read.format("libsvm").load(local_data_path)

# Train an LDA model with 3 topics for 10 iterations
lda = LDA(k=3, maxIter=10)
model = lda.fit(data)

# Describe each topic by its 3 highest-weighted terms
transformed = model.transform(data)
topics = model.describeTopics(maxTermsPerTopic=3)
print("Topics described by their top-weighted terms:")
topics.show(truncate=False)

# Show each document's topic distribution
print("Transformed document (topic distributions):")
transformed.show(truncate=False)

# Stop the Spark session
spark.stop()


Topics described by their top-weighted terms:
+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+---------------------------------------------------------------+
|0    |[3, 8, 2]  |[0.09914672539329934, 0.09517768603815174, 0.09401299772489345]|
|1    |[6, 10, 9] |[0.22164169772396472, 0.17481235649818788, 0.10521607779173928]|
|2    |[4, 0, 5]  |[0.1550551021060799, 0.14768849157620403, 0.14561601537206648] |
+-----+-----------+---------------------------------------------------------------+

Transformed document (topic distributions):
+-----+---------------------------------------------------------------+--------------------------------------------------------------+
|label|features                                                       |topicDistribution                                             |
+-----+--------------------------------------------

A Latent Dirichlet Allocation (LDA) model with k = 3 topics and 10 iterations was fitted to the sample LDA dataset in PySpark to identify the main topics in the documents. Each topic is described by its three highest-weighted terms; because the sample data contains term indices rather than words, the terms appear as vocabulary indices (for example, topic 1 is dominated by terms 6, 10 and 9). The transformed output gives each document's topic distribution, a vector of proportions that shows how much each topic contributes to that document. Together, these outputs describe the thematic structure of the corpus and can be used to categorize and summarize documents by their content.
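The termIndices and termWeights columns can be read as follows: for each topic, the term weights are normalized to sum to 1 and the highest-weighted term indices are kept, in descending order. The sketch below reproduces that ranking on a small hypothetical topic-term weight matrix; its values are invented for illustration and are not taken from the fitted model, and the function is our reading of what “describeTopics” reports rather than Spark's code.

```python
# Hypothetical topic-term weights: rows are topics, columns are term indices.
# These numbers are invented for illustration; they are not the fitted model's values.
topic_term_weights = [
    [2.0, 1.0, 4.0, 3.0],
    [0.5, 3.5, 1.0, 5.0],
]


def describe_topics(weights, max_terms_per_topic):
    """For each topic, L1-normalize its term weights and keep the top terms."""
    described = []
    for topic, row in enumerate(weights):
        total = sum(row)
        normalized = [w / total for w in row]
        # Sort term indices by descending normalized weight and keep the top ones.
        top = sorted(range(len(row)), key=lambda t: normalized[t], reverse=True)
        top = top[:max_terms_per_topic]
        described.append((topic, top, [normalized[t] for t in top]))
    return described


for topic, term_indices, term_weights in describe_topics(topic_term_weights, 3):
    print(topic, term_indices, [round(w, 3) for w in term_weights])
```

Here topic 0 is described by terms [2, 3, 0] with weights [0.4, 0.3, 0.2], the same shape as one row of the table above. Because the weights are normalized over the whole vocabulary, the top three weights of a topic need not sum to 1.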

Assignment 9.3

In [13]:
import requests
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Download a file over HTTP and save it locally, failing loudly on HTTP errors
def download_data(url, local_filename):
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with open(local_filename, 'wb') as f:
        f.write(response.content)

# Download the dataset
data_url = "https://raw.githubusercontent.com/apache/spark/master/data/mllib/als/sample_movielens_ratings.txt"
local_data_path = "sample_movielens_ratings.txt"
download_data(data_url, local_data_path)

# Initialize Spark Session
spark = SparkSession \
    .builder \
    .appName("Collaborative Filtering Example") \
    .getOrCreate()

# Load and parse data into DataFrame
lines = spark.read.text(local_data_path).rdd
parts = lines.map(lambda row: row.value.split("::"))
ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                     rating=float(p[2]), timestamp=int(p[3])))
ratings = spark.createDataFrame(ratingsRDD)

# Split data into training (80%) and test (20%) sets.
# No seed is passed, so the split, and hence the RMSE, differs between runs.
(training, test) = ratings.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop", implicitPrefs=False)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
userRecs.show()

# Stop the Spark session
spark.stop()


Root-mean-square error = 1.6518418668707993
+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    20|[{22, 4.673408}, ...|
|    10|[{46, 3.9353182},...|
|     0|[{72, 4.232644}, ...|
|     1|[{62, 3.9370146},...|
|    21|[{29, 4.984131}, ...|
|    11|[{18, 5.2808976},...|
|    12|[{28, 5.610158}, ...|
|    22|[{88, 5.0526786},...|
|     2|[{7, 5.622658}, {...|
|    13|[{93, 3.916977}, ...|
|     3|[{51, 5.0403833},...|
|    23|[{46, 5.456856}, ...|
|     4|[{93, 3.9825988},...|
|    24|[{90, 5.155915}, ...|
|    14|[{52, 5.0879784},...|
|     5|[{55, 4.4657507},...|
|    15|[{46, 4.8242826},...|
|    25|[{96, 4.108087}, ...|
|    26|[{75, 6.6279454},...|
|     6|[{25, 5.0263276},...|
+------+--------------------+
only showing top 20 rows



Implemented a collaborative filtering model using the Alternating Least Squares (ALS) algorithm in PySpark. The model was trained on the sample MovieLens ratings file, in which each line holds a user ID, movie ID, rating and timestamp separated by "::". The ratings were split 80/20 into training and test sets, the ALS model (maxIter=5, regParam=0.01) was fitted to the training set, and its predictions on the test set were evaluated with the Root Mean Square Error (RMSE). Because coldStartStrategy="drop" is set, test rows whose user or movie did not appear in the training set are dropped rather than producing NaN predictions.

The output shows an RMSE of approximately 1.652, meaning the predictions deviate from the actual ratings by about 1.65 rating points on a root-mean-square basis. Lower RMSE values indicate better performance. Because randomSplit is called without a seed, the split, and therefore the RMSE, varies from run to run. The output also lists the top 10 movie recommendations for each user (only the first 20 users are shown), each as a (movieId, predicted rating) pair derived from that user's ratings history. Predicted scores are not capped (e.g., 6.63 for user 26), because explicit-feedback ALS predicts a rating as the unconstrained dot product of the user and item latent-factor vectors.
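In explicit-feedback ALS, a predicted rating is the dot product of a user's latent-factor vector and a movie's latent-factor vector, and RMSE is the square root of the mean squared difference between predicted and actual ratings. The sketch below illustrates both steps, together with the effect of coldStartStrategy="drop", using small hypothetical factor vectors and ratings. All values, including the rank of 2, are invented for illustration and do not come from the fitted model; the alternating least-squares fitting of the factors themselves is not shown.

```python
import math

# Hypothetical rank-2 latent factors; invented for illustration only.
user_factors = {0: [1.2, 0.8], 1: [0.5, 1.5]}
item_factors = {10: [2.0, 1.0], 20: [1.0, 2.0]}

# Hypothetical held-out ratings as (userId, movieId, rating).
# User 2 has no learned factors, mimicking a user absent from the training set.
test_ratings = [(0, 10, 3.0), (0, 20, 4.0), (1, 10, 2.0), (2, 20, 5.0)]


def predict(user, item):
    """Predicted rating = dot product of user and item factors (not clipped)."""
    return sum(u * v for u, v in zip(user_factors[user], item_factors[item]))


def rmse_with_cold_start_drop(ratings):
    """Skip pairs with unknown users or items (like coldStartStrategy="drop"), then compute RMSE."""
    squared_errors = [
        (predict(u, i) - r) ** 2
        for u, i, r in ratings
        if u in user_factors and i in item_factors
    ]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


rmse = rmse_with_cold_start_drop(test_ratings)
print(f"RMSE = {rmse:.4f}")
```

Only the three pairs with known factors contribute; their predictions are 3.2, 2.8 and 2.5, so the squared errors are 0.04, 1.44 and 0.25, and the RMSE is the square root of their mean.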