In [1]:
import findspark

In [2]:
findspark.init()

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, lower, explode, instr,col, when, contains
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
import warnings
warnings.filterwarnings('ignore')



In [4]:
# Create a SparkSession
spark = SparkSession.builder \
    .appName("Movielens Data Wrangling") \
    .getOrCreate()


In [5]:
# Read the MovieLens movies file into a DataFrame, taking column names from the
# header row and letting Spark infer the column types.
# A forward slash is used as the separator: the backslash form only resolved on
# Windows and depended on "\m" being an unrecognised (warning-raising) escape.
df = spark.read.csv("ml-25m/movies.csv", header=True, inferSchema=True)

In [6]:
df.columns

['movieId', 'title', 'genres']

In [7]:
df.count() # Returns the number of rows in this DataFrame.

62423

In [8]:
# Show the DataFrame schema
df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [9]:
# Show the first three rows of the DataFrame
df.show(3)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



In [10]:
# Collect every raw genres string to the driver, then preview the first ten.
genres_values = [value for row in df.select("genres").collect() for value in row]
genres_values[:10]

['Adventure|Animation|Children|Comedy|Fantasy',
 'Adventure|Children|Fantasy',
 'Comedy|Romance',
 'Comedy|Drama|Romance',
 'Comedy',
 'Action|Crime|Thriller',
 'Comedy|Romance',
 'Adventure|Children',
 'Action',
 'Action|Adventure|Thriller']

In [11]:
def split_column_and_add(df, col_name, out_col="genres_lr_arr"):
    """Add an array column holding the lower-cased, '|'-separated parts of a column.

    Args:
        df: Spark DataFrame that contains ``col_name``.
        col_name: Name of the string column to lower-case and split.
        out_col: Name of the array column to add; defaults to "genres_lr_arr".

    Returns:
        A new DataFrame with ``out_col`` added. ``df`` itself is not modified.
    """
    # split() takes a regular expression, so the pipe has to be escaped. The raw
    # string keeps the backslash literal instead of relying on an invalid escape.
    return df.withColumn(out_col, split(lower(df[col_name]), r"\|"))

In [12]:
df_1 = split_column_and_add(df, "genres")

In [13]:
df_1.select("genres_lr_arr").show(5,truncate=False)

+-------------------------------------------------+
|genres_lr_arr                                    |
+-------------------------------------------------+
|[adventure, animation, children, comedy, fantasy]|
|[adventure, children, fantasy]                   |
|[comedy, romance]                                |
|[comedy, drama, romance]                         |
|[comedy]                                         |
+-------------------------------------------------+
only showing top 5 rows



In [14]:
all_words = df_1.select(explode("genres_lr_arr").alias("word"))

In [15]:
all_words.show(5)
all_words.count()

+---------+
|     word|
+---------+
|adventure|
|animation|
| children|
|   comedy|
|  fantasy|
+---------+
only showing top 5 rows



112307

In [16]:
distinct_words = all_words.select("word").distinct()

In [17]:
distinct_words.show()

+------------------+
|              word|
+------------------+
|             crime|
|              imax|
|           fantasy|
|       documentary|
|            action|
|         animation|
|           mystery|
|            horror|
|         film-noir|
|           musical|
|         adventure|
|             drama|
|(no genres listed)|
|           western|
|          children|
|               war|
|           romance|
|          thriller|
|            sci-fi|
|            comedy|
+------------------+



In [18]:
distinct_words.count()

20

In [19]:
# Preview instr(): it yields the 1-based position of `category` inside the
# lower-cased genres string, or 0 when the substring is absent.
category = "adventure"
# The selected column name is derived from `category` (not hard-coded), so the
# cell keeps working when a different genre is chosen.
(
    df_1
    .withColumn("genres_" + category, instr(lower(col("genres")), category))
    .select("genres", "genres_" + category)
    .show()
)

+--------------------+----------------+
|              genres|genres_adventure|
+--------------------+----------------+
|Adventure|Animati...|               1|
|Adventure|Childre...|               1|
|      Comedy|Romance|               0|
|Comedy|Drama|Romance|               0|
|              Comedy|               0|
|Action|Crime|Thri...|               0|
|      Comedy|Romance|               0|
|  Adventure|Children|               1|
|              Action|               0|
|Action|Adventure|...|               8|
|Comedy|Drama|Romance|               0|
|       Comedy|Horror|               0|
|Adventure|Animati...|               1|
|               Drama|               0|
|Action|Adventure|...|               8|
|         Crime|Drama|               0|
|       Drama|Romance|               0|
|              Comedy|               0|
|              Comedy|               0|
|Action|Comedy|Cri...|               0|
+--------------------+----------------+
only showing top 20 rows



In [20]:
# Collapse the instr() position into a 0/1 membership flag for `category`.
# The test is "> 0 -> 1, otherwise 0": with the inverse form
# ("== 0 -> 0, otherwise 1") a null genres value makes the comparison null,
# falls through to otherwise(), and would be flagged as a member.
(
    df_1
    .withColumn(
        "genres_" + category,
        when(instr(lower(col("genres")), category) > 0, 1).otherwise(0),
    )
    .select("genres", "genres_" + category)
    .show()
)

+--------------------+----------------+
|              genres|genres_adventure|
+--------------------+----------------+
|Adventure|Animati...|               1|
|Adventure|Childre...|               1|
|      Comedy|Romance|               0|
|Comedy|Drama|Romance|               0|
|              Comedy|               0|
|Action|Crime|Thri...|               0|
|      Comedy|Romance|               0|
|  Adventure|Children|               1|
|              Action|               0|
|Action|Adventure|...|               1|
|Comedy|Drama|Romance|               0|
|       Comedy|Horror|               0|
|Adventure|Animati...|               1|
|               Drama|               0|
|Action|Adventure|...|               1|
|         Crime|Drama|               0|
|       Drama|Romance|               0|
|              Comedy|               0|
|              Comedy|               0|
|Action|Comedy|Cri...|               0|
+--------------------+----------------+
only showing top 20 rows



In [21]:
# Genre tokens to one-hot encode. The order here is the order in which the
# indicator columns are created. The "(no genres listed)" placeholder that
# appears among the distinct values is not part of this list.
categories = [
    "crime",
    "imax",
    "fantasy",
    "documentary",
    "action",
    "animation",
    "mystery",
    "horror",
    "film-noir",
    "musical",
    "adventure",
    "drama",
    "western",
    "children",
    "war",
    "romance",
    "thriller",
    "sci-fi",
    "comedy",
]


In [22]:
# Build one 0/1 indicator column per genre, named "genres_<genre>".
# - All columns are added in a single select() rather than one withColumn()
#   call per genre, which would add a projection to the plan on every call.
# - The comprehension variable is `genre`, so the `category` variable used by
#   the earlier preview cells is not overwritten.
# - The flag is "> 0 -> 1, otherwise 0" so that a null genres value yields 0
#   for every genre instead of falling through to 1.
oneHotDF = df_1.select(
    "*",
    *[
        when(instr(lower(col("genres")), genre) > 0, 1)
        .otherwise(0)
        .alias("genres_" + genre)
        for genre in categories
    ]
)

In [23]:
oneHotDF.show(1)

+-------+----------------+--------------------+--------------------+------------+-----------+--------------+------------------+-------------+----------------+--------------+-------------+----------------+--------------+----------------+------------+--------------+---------------+----------+--------------+---------------+-------------+-------------+
|movieId|           title|              genres|       genres_lr_arr|genres_crime|genres_imax|genres_fantasy|genres_documentary|genres_action|genres_animation|genres_mystery|genres_horror|genres_film-noir|genres_musical|genres_adventure|genres_drama|genres_western|genres_children|genres_war|genres_romance|genres_thriller|genres_sci-fi|genres_comedy|
+-------+----------------+--------------------+--------------------+------------+-----------+--------------+------------------+-------------+----------------+--------------+-------------+----------------+--------------+----------------+------------+--------------+---------------+----------+-------

In [24]:
oneHotDF.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- genres_lr_arr: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- genres_crime: integer (nullable = false)
 |-- genres_imax: integer (nullable = false)
 |-- genres_fantasy: integer (nullable = false)
 |-- genres_documentary: integer (nullable = false)
 |-- genres_action: integer (nullable = false)
 |-- genres_animation: integer (nullable = false)
 |-- genres_mystery: integer (nullable = false)
 |-- genres_horror: integer (nullable = false)
 |-- genres_film-noir: integer (nullable = false)
 |-- genres_musical: integer (nullable = false)
 |-- genres_adventure: integer (nullable = false)
 |-- genres_drama: integer (nullable = false)
 |-- genres_western: integer (nullable = false)
 |-- genres_children: integer (nullable = false)
 |-- genres_war: integer (nullable = false)
 |-- genres_romance: integer (nullable = false)
 |-- genres_thriller:

In [25]:
# Keep only the per-genre indicator columns. The token array column shares the
# "genres_" prefix, so it is excluded by name.
featurecols = oneHotDF.select(
    [
        name
        for name in oneHotDF.columns
        if name.startswith("genres_") and name != "genres_lr_arr"
    ]
)

In [26]:
featurecols.printSchema()

root
 |-- genres_crime: integer (nullable = false)
 |-- genres_imax: integer (nullable = false)
 |-- genres_fantasy: integer (nullable = false)
 |-- genres_documentary: integer (nullable = false)
 |-- genres_action: integer (nullable = false)
 |-- genres_animation: integer (nullable = false)
 |-- genres_mystery: integer (nullable = false)
 |-- genres_horror: integer (nullable = false)
 |-- genres_film-noir: integer (nullable = false)
 |-- genres_musical: integer (nullable = false)
 |-- genres_adventure: integer (nullable = false)
 |-- genres_drama: integer (nullable = false)
 |-- genres_western: integer (nullable = false)
 |-- genres_children: integer (nullable = false)
 |-- genres_war: integer (nullable = false)
 |-- genres_romance: integer (nullable = false)
 |-- genres_thriller: integer (nullable = false)
 |-- genres_sci-fi: integer (nullable = false)
 |-- genres_comedy: integer (nullable = false)



In [27]:
len(featurecols.columns)

19

In [28]:
# Names of the per-genre indicator columns in oneHotDF: everything prefixed
# "genres_" except the token array column.
featurecols = [
    name
    for name in oneHotDF.columns
    if name.startswith("genres_") and name != "genres_lr_arr"
]

# Assembler that packs those columns into a single vector column "features".
assembler = VectorAssembler(outputCol="features", inputCols=featurecols)

In [29]:
# Run the assembler over the one-hot DataFrame; the result carries the
# additional "features" vector column alongside the existing columns.
dataset = assembler.transform(oneHotDF)
# Preview the assembled DataFrame.
dataset.show()

+-------+--------------------+--------------------+--------------------+------------+-----------+--------------+------------------+-------------+----------------+--------------+-------------+----------------+--------------+----------------+------------+--------------+---------------+----------+--------------+---------------+-------------+-------------+--------------------+
|movieId|               title|              genres|       genres_lr_arr|genres_crime|genres_imax|genres_fantasy|genres_documentary|genres_action|genres_animation|genres_mystery|genres_horror|genres_film-noir|genres_musical|genres_adventure|genres_drama|genres_western|genres_children|genres_war|genres_romance|genres_thriller|genres_sci-fi|genres_comedy|            features|
+-------+--------------------+--------------------+--------------------+------------+-----------+--------------+------------------+-------------+----------------+--------------+-------------+----------------+--------------+----------------+--------

In [30]:
# Clustering settings: the number of clusters and a fixed random seed so that
# repeated runs are reproducible.
k = 5
seed = 42
# Configure the K-means estimator with those settings.
kmeans = KMeans(k=k, seed=seed)
# Fit the model on the assembled dataset.
model = kmeans.fit(dataset)
# Assign every row of the dataset to a cluster.
predictions = model.transform(dataset)

In [31]:
# Evaluate the clustering by computing the Silhouette score.
# The score ranges from -1 to 1; a high value indicates that an object is well
# matched to its own cluster and poorly matched to neighbouring clusters.
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")

Silhouette with squared euclidean distance = 0.3333159398549893


In [32]:
# Print a header followed by each cluster centre on its own line.
centers = model.clusterCenters()
print("Cluster Centers: ", *centers, sep="\n")

Cluster Centers: 
[0.03162055 0.00240591 0.06306926 0.00601478 0.08455061 0.01185771
 0.13163774 0.99518818 0.00103111 0.00360887 0.02302801 0.1345592
 0.00584293 0.00343702 0.00498367 0.01666953 0.36707338 0.16343014
 0.12648221]
[0.65772707 0.0044451  0.01022374 0.00340791 0.54482146 0.00963106
 0.11853608 0.01748407 0.02993036 0.00266706 0.08001185 0.63668692
 0.01644688 0.00385242 0.04237665 0.03333827 0.48125648 0.05126685
 0.09038376]
[0.0177252  0.00157468 0.01744256 0.21447894 0.06137199 0.04211249
 0.02325675 0.         0.00145355 0.02176283 0.00189769 0.
 0.03734808 0.00076715 0.01562563 0.10324222 0.06153349 0.04380829
 0.38741067]
[1.81128896e-02 2.02190396e-02 2.80117944e-01 3.15922494e-03
 2.91912384e-01 3.31718618e-01 2.80117944e-02 5.89721988e-03
 4.21229992e-04 3.32771693e-02 5.96461668e-01 3.91743892e-02
 2.71693345e-02 4.90311710e-01 2.10614996e-02 7.39258635e-02
 3.09604044e-02 1.65543387e-01 3.56781803e-01]
[8.40707965e-03 7.86627335e-04 2.62045231e-02 1.08161259e-