In [None]:
!pip install pyspark

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

In [4]:
import findspark
# NOTE(review): presumably this locates a local Spark installation and makes
# pyspark importable (see the findspark docs). pyspark is already imported in
# the previous cell, so confirm whether this call is still needed.
findspark.init()

In [5]:
# Create (or reuse) the notebook's Spark session, requesting 16g of driver
# memory and a 600s network timeout.
spark = (
    SparkSession.builder
    .appName("RecommendationSystem")
    .config("spark.driver.memory", "16g")
    .config("spark.network.timeout", "600s")
    .getOrCreate()
)

# Load Data

In [6]:
# Leading descriptive columns of u.item.
info_fields = [
    StructField("movie_id", IntegerType(), True),
    StructField("movie_title", StringType(), True),
    StructField("release_date", StringType(), True),
    StructField("video_release_date", StringType(), True),
    StructField("imdb_url", StringType(), True),
]

# One integer flag column per genre, in file order.
genre_columns = [
    "unknown", "action", "adventure", "animation", "children", "comedy",
    "crime", "documentary", "drama", "fantasy", "film_noir", "horror",
    "musical", "mystery", "romance", "sci_fi", "thriller", "war", "western",
]

movieSchema = StructType(
    info_fields
    + [StructField(genre, IntegerType(), True) for genre in genre_columns]
)

# Local-filesystem alternative to the HDFS path below: "ml-100k/u.item"
movie_df = (
    spark.read.format("csv")
    .schema(movieSchema)
    .option("delimiter", "|")
    # NOTE(review): the MovieLens 100K u.item file is Latin-1 encoded, not
    # UTF-8; without this option accented movie titles are decoded
    # incorrectly. Confirm the HDFS copy has not been re-encoded.
    .option("encoding", "ISO-8859-1")
    .load("hdfs://localhost:9000/user/vvd09/data/u.item")
)

# Data Processing

In [12]:
from pyspark.sql.functions import when, col, array

# Genre flag columns: everything after the five leading descriptive columns
# (movie_id, movie_title, release_date, video_release_date, imdb_url).
# The intermediate genres_df previously built here was overwritten by the next
# cell without ever being read, so it has been removed.
genre_name_list = movie_df.columns[5:]

In [13]:
from pyspark.sql.functions import when, col, array, array_remove

# Build one array of genre names per movie. Each flag column contributes its
# own name when the flag is 1 and an empty-string placeholder otherwise; the
# placeholders are then stripped from the array. A string placeholder keeps
# every array element the same type, instead of mixing strings with an integer
# 0 and relying on implicit coercion to match '0'.
NO_GENRE = ""
genre_labels = [
    when(col(col_name) == 1, col_name).otherwise(NO_GENRE)
    for col_name in genre_name_list
]
genres_df = movie_df.withColumn("genres", array_remove(array(genre_labels), NO_GENRE))

In [14]:
# Keep only the identifying columns and the derived genre list.
genres_df = genres_df.select(["movie_id", "movie_title", "genres"])

In [15]:
# Preview the first five movies with their genre lists.
genres_df.show(n=5)

+--------+-----------------+--------------------+
|movie_id|      movie_title|              genres|
+--------+-----------------+--------------------+
|       1| Toy Story (1995)|[animation, child...|
|       2| GoldenEye (1995)|[action, adventur...|
|       3|Four Rooms (1995)|          [thriller]|
|       4|Get Shorty (1995)|[action, comedy, ...|
|       5|   Copycat (1995)|[crime, drama, th...|
+--------+-----------------+--------------------+
only showing top 5 rows



# Select Features

In [16]:
# Import Libraries
from pyspark.ml.feature import HashingTF, IDF, CountVectorizer
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import Normalizer

# Start from the base columns so this cell can be re-run: the transformers
# below refuse to write an output column that already exists.
genres_base_df = genres_df.select("movie_id", "movie_title", "genres")

# Convert genres to a feature vector using TF-IDF.
# CountVectorizer learns an exact genre -> index vocabulary from the data.
# The previous HashingTF(numFeatures=19) hashed 19 genre names into 19
# buckets, so distinct genres could collide and be treated as the same feature.
count_vectorizer = CountVectorizer(inputCol="genres", outputCol="rawFeatures")
tf_df = count_vectorizer.fit(genres_base_df).transform(genres_base_df)

idf = IDF(inputCol="rawFeatures", outputCol="feature")
tfidf_df = idf.fit(tf_df).transform(tf_df)

# Normalize the feature vectors (Normalizer default is the L2 norm), so the
# dot product of two "norm" vectors is their cosine similarity.
normalizer = Normalizer(inputCol="feature", outputCol="norm")
genres_df = normalizer.transform(tfidf_df)

# Find Similarity

In [17]:
rec_movie_id = 420 # target movie
target_movie = genres_df.filter(f"movie_id == {rec_movie_id}").first()
# .first() returns None when no row matches; fail here with a clear message
# rather than later inside the RDD lambda.
if target_movie is None:
    raise ValueError(f"movie_id {rec_movie_id} not found in genres_df")

# Capture only the target's normalized vector in the closure, not the whole Row.
target_vec = target_movie["norm"]

# Score every movie by the dot product of its normalized vector with the
# target's. Only the columns the lambda reads are projected before
# dropping to the RDD API.
similarity_scores = (
    genres_df.select("movie_id", "movie_title", "norm").rdd
    .map(lambda row: (row["movie_id"], row["movie_title"], float(row["norm"].dot(target_vec))))
    .toDF(["movie_id", "movie_title", "similarity"])
)

In [18]:
# Rank every movie except the target by similarity and keep the ten best.
other_movies = similarity_scores.filter(similarity_scores["movie_id"] != rec_movie_id)
top10 = other_movies.orderBy("similarity", ascending=False).limit(10)
top10.show()

+--------+--------------------+------------------+
|movie_id|         movie_title|        similarity|
+--------+--------------------+------------------+
|     432|     Fantasia (1940)|0.9999999999999999|
|     588|Beauty and the Be...|0.9999999999999999|
|     473|James and the Gia...|0.9999999999999999|
|      71|Lion King, The (1...|0.9999999999999999|
|     596|Hunchback of Notr...|0.9999999999999999|
|     418|   Cinderella (1950)|0.9999999999999999|
|     103|All Dogs Go to He...|0.9999999999999999|
|      99|Snow White and th...|0.9999999999999999|
|     501|        Dumbo (1941)|0.9999999999999999|
|     538|    Anastasia (1997)|0.9999999999999999|
+--------+--------------------+------------------+



In [20]:
# Look up and print the target movie's title, then its genre list.
target_title_rows = (
    movie_df.where(movie_df["movie_id"] == rec_movie_id)
    .select("movie_title")
    .collect()
)
print("Target Movie: ", target_title_rows[0][0])

target_genre_rows = (
    genres_df.where(genres_df["movie_id"] == rec_movie_id)
    .select("genres")
    .collect()
)
print("Target movie genres: ", target_genre_rows[0][0])

Target Movie:  Alice in Wonderland (1951)
Target movie genres:  ['animation', 'children', 'musical']


In [25]:
# Genre lists of the recommended movies, for comparison with the target's.
top10_with_genres = top10.join(genres_df, on="movie_id")
top10_with_genres.select(genres_df["genres"]).collect()

[Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical']),
 Row(genres=['animation', 'children', 'musical'])]

In [1]:
#movie_df.select("movie_id", "movie_title").orderBy("movie_title").show(50)

In [None]:
#176