In [0]:
from pyspark.sql.types import *

# Field delimiter passed as `sep` when reading the "::"-separated files.
separator = "::"

# Column layout for the movies file; every column is declared nullable.
movies_schema = (StructType()
  .add("id", IntegerType(), True)
  .add("title", StringType(), True)
  .add("genres", StringType(), True)
)

In [0]:
# Load the headerless movies file with the explicit schema and "::" delimiter.
movies_df = (spark.read
  .schema(movies_schema)
  .options(header = False, sep = separator)
  .csv("/FileStore/movies.dat")
)

In [0]:
# Preview the first five movie rows.
movies_df.show(n = 5)

In [0]:
import matplotlib.pyplot as plt
from wordcloud import WordCloud, STOPWORDS, ImageColorGenerator

# Bring the title column to the driver as a list of Row objects.
movie_titles = movies_df.select("title").collect()

# Iterate over the Rows directly instead of indexing by position, and skip
# null titles: the column is nullable and str.join raises TypeError on None.
movie_titles_list = [row["title"] for row in movie_titles if row["title"] is not None]

# One space-separated string is the input format WordCloud.generate expects.
movie_titles_corpus = " ".join(movie_titles_list)

# Build a 50-word cloud from the titles, ignoring the library's stop words.
wordcloud = WordCloud(stopwords = STOPWORDS,
                      background_color = "lightgrey",
                      colormap = "hot",
                      max_words = 50,
                      # collocations = False,
                     ).generate(movie_titles_corpus)

# Render the cloud as an image without axis ticks.
plt.figure(figsize = (10, 8))
plt.imshow(wordcloud, interpolation = "bilinear")
plt.axis("off")
plt.show()

In [0]:
# Column layout for the poster file: a movie id and a poster string, both nullable.
posters_schema = StructType([
  StructField(column_name, column_type, True)
  for column_name, column_type in (
    ("id", IntegerType()),
    ("poster", StringType()),
  )
])

In [0]:
# Load the headerless poster CSV (default comma delimiter) with the explicit schema.
posters_df = (spark.read
  .schema(posters_schema)
  .option("header", False)
  .csv("/FileStore/movie_poster.csv")
)

In [0]:
# Attach the poster column to each movie by id.
# Inner join (the default): movies with no poster row are dropped from movies_df.
# NOTE(review): this rebinds movies_df, so re-running the cell joins the posters again.
movies_df = movies_df.join(posters_df, on = ["id"], how = "inner")

In [0]:
# Preview five rows of the movies DataFrame, including the poster column.
movies_df.show(n = 5)

In [0]:
# Column layout for the users file; every column is declared nullable.
# zip_code is kept as a string rather than parsed as a number.
users_schema = (StructType()
  .add("id", IntegerType(), True)
  .add("gender", StringType(), True)
  .add("age", IntegerType(), True)
  .add("occupation_id", IntegerType(), True)
  .add("zip_code", StringType(), True)
)

In [0]:
# Load the headerless users file with the explicit schema and "::" delimiter.
users_df = (spark.read
  .schema(users_schema)
  .option("header", False)
  .option("sep", separator)
  .csv("/FileStore/users.dat")
)

In [0]:
# Preview the first five user rows.
users_df.show(n = 5)

In [0]:
# Number of users per gender value.
users_df.groupBy("gender").count().show()

In [0]:
# (occupation_id, occupation) pairs; the id is the position of the label in
# this list, starting at 0.
occupations = list(enumerate([
  "other",
  "academic/educator",
  "artist",
  "clerical/admin",
  "college/grad student",
  "customer service",
  "doctor/health care",
  "executive/managerial",
  "farmer",
  "homemaker",
  "K-12 student",
  "lawyer",
  "programmer",
  "retired",
  "sales/marketing",
  "scientist",
  "self-employed",
  "technician/engineer",
  "tradesman/craftsman",
  "unemployed",
  "writer",
]))

# Column layout for the occupation lookup table; both columns are nullable.
occupations_schema = (StructType()
  .add("occupation_id", IntegerType(), True)
  .add("occupation", StringType(), True)
)

In [0]:
# Turn the in-memory occupation pairs into a DataFrame lookup table.
occupations_df = spark.createDataFrame(occupations, schema = occupations_schema)

In [0]:
# Replace the numeric occupation_id with its label: inner-join the lookup
# table, then drop the id column that is no longer needed.
# NOTE(review): this rebinds users_df; re-running the cell fails on the join
# because occupation_id has already been dropped.
users_df = (users_df
  .join(occupations_df, on = ["occupation_id"], how = "inner")
  .drop("occupation_id")
)

In [0]:
# Preview five user rows, now carrying the occupation label.
users_df.show(n = 5)

In [0]:
# Users per occupation, most common first; 21 rows covers every label in the
# occupation lookup list.
(users_df
  .groupBy("occupation")
  .count()
  .sort("count", ascending = False)
  .show(n = 21)
)

In [0]:
# Column layout for the ratings file: four nullable integer columns.
ratings_schema = StructType([
  StructField(column_name, IntegerType(), True)
  for column_name in ("user_id", "movie_id", "rating", "timestamp")
])

In [0]:
# Load the headerless ratings file with the explicit schema and "::" delimiter.
ratings_df = (spark.read
  .schema(ratings_schema)
  .options(header = False, sep = separator)
  .csv("/FileStore/ratings.dat")
)

In [0]:
# Preview the first five rating rows.
ratings_df.show(n = 5)

In [0]:
# Summary statistics for the id and rating columns (timestamp is left out).
ratings_df.describe(["user_id", "movie_id", "rating"]).show()

In [0]:
# Count ratings per distinct value on the cluster and collect only those few
# summary rows, instead of pulling every rating row to the driver just to bin
# it. Null ratings (the column is nullable) are dropped because they cannot
# be placed on the numeric axis.
rating_counts = (ratings_df
  .dropna(subset = ["rating"])
  .groupBy("rating")
  .count()
  .collect()
)

# Use item access for "count": attribute access (row.count) would resolve to
# the tuple method that Row inherits, not the column.
rating_values = [row["rating"] for row in rating_counts]
rating_frequencies = [row["count"] for row in rating_counts]

# Weighting each distinct rating by its frequency draws the same histogram as
# passing the full list of individual ratings.
plt.hist(
  rating_values,
  weights = rating_frequencies,
  edgecolor = "white",
  color = "#32B5C9",
  rwidth = 0.9,
  bins = [0.5, 1.5, 2.5, 3.5, 4.5, 5.5]
)

plt.ylabel("Frequency")
plt.xlabel("Rating")
plt.show()

In [0]:
# Random 70/30 split of the ratings; the fixed seed keeps the split repeatable.
train, test = ratings_df.randomSplit(weights = [0.7, 0.3], seed = 123)

In [0]:
from pyspark.ml.recommendation import ALS

# Configure ALS over the ratings columns.
als = ALS(
  # Which columns hold the user, the item and the observed rating.
  userCol = "user_id",
  itemCol = "movie_id",
  ratingCol = "rating",
  # Training hyperparameters.
  maxIter = 5,
  regParam = 0.01,
  seed = 0,
  # "drop" removes rows whose prediction would be NaN from transform() output.
  coldStartStrategy = "drop"
)

# Train on the training split only.
model = als.fit(dataset = train)

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

# Score the held-out split with the trained model.
predictions = model.transform(dataset = test)

# Preview five scored rows.
predictions.show(n = 5)

In [0]:
# Evaluator comparing the "prediction" column against the true "rating" by RMSE.
# NOTE(review): the name `re` is also the name of the stdlib regex module; it is
# left unchanged here so anything else referring to `re` keeps working.
re = RegressionEvaluator(
  metricName = "rmse",
  labelCol = "rating",
  predictionCol = "prediction"
)

# Root-mean-square error over the scored test rows.
rmse = re.evaluate(dataset = predictions)
print(rmse)

In [0]:
# Preview five rows of learned user factors; use show(5, False) to show the whole column.
model.userFactors.show(n = 5)

In [0]:
# Preview five rows of learned item factors; use show(5, False) to show the whole column.
model.itemFactors.show(n = 5)

In [0]:
# Keep handles on the model's user and item factor DataFrames.
user_factors_df, item_factors_df = model.userFactors, model.itemFactors

In [0]:
import array, binascii

def vector_to_hex(vector):
  """Encode a sequence of numbers as a hex string of packed floats.

  Each element is stored as an array type-code "f" float in the machine's
  native byte order; the raw bytes are returned as a lowercase hex string.
  An empty sequence yields an empty string.
  """
  packed_floats = array.array("f", vector)
  return packed_floats.tobytes().hex()

# Import `udf` explicitly: no visible cell imports it (the star import from
# pyspark.sql.types does not provide it), so without this line the cell only
# works where the notebook runtime happens to inject the name.
from pyspark.sql.functions import udf

# Wrap the plain Python function as a Spark UDF that returns a string. The
# name is rebound on purpose: the cells below call vector_to_hex("features")
# on DataFrame columns.
vector_to_hex = udf(vector_to_hex, StringType())

# Also register the UDF under the same name so Spark SQL can call it.
spark.udf.register("vector_to_hex", vector_to_hex)

In [0]:
# Add a "factors" column holding the hex-encoded "features" vector of each user.
user_factors_df = user_factors_df.withColumn("factors", vector_to_hex("features"))

# Same encoding for the item (movie) factors.
item_factors_df = item_factors_df.withColumn("factors", vector_to_hex("features"))

In [0]:
# Combine user attributes with their hex-encoded factors by id (inner join),
# then drop the raw "features" array so only the hex string remains.
users = (users_df
  .join(user_factors_df, on = ["id"], how = "inner")
  .drop("features")
)

In [0]:
# Preview five rows of the combined users table.
users.show(n = 5)

In [0]:
# Combine movie attributes with their hex-encoded factors by id (inner join),
# then drop the raw "features" array so only the hex string remains.
movies = (movies_df
  .join(item_factors_df, on = ["id"], how = "inner")
  .drop("features")
)

In [0]:
%run ./Setup

In [0]:
# Session settings for the SingleStore Spark connector.
# `cluster` and `password` are not defined in the visible cells; presumably
# they come from the ./Setup notebook run just before this cell (confirm).
singlestore_settings = {
  "spark.datasource.singlestore.ddlEndpoint": cluster,
  "spark.datasource.singlestore.user": "admin",
  "spark.datasource.singlestore.password": password,
  "spark.datasource.singlestore.disablePushdown": "false",
}

# Apply the settings in the order listed above.
for setting_name, setting_value in singlestore_settings.items():
  spark.conf.set(setting_name, setting_value)

In [0]:
# Write the users table to SingleStore with LZ4 load compression.
# Save mode "ignore": if the target table already exists the write is skipped,
# so re-running the notebook does not refresh previously stored rows.
users.write.save(
  "recommender_db.users",
  format = "singlestore",
  mode = "ignore",
  loadDataCompression = "LZ4"
)

In [0]:
# Write the movies table to SingleStore with LZ4 load compression.
# Save mode "ignore": if the target table already exists the write is skipped,
# so re-running the notebook does not refresh previously stored rows.
movies.write.save(
  "recommender_db.movies",
  format = "singlestore",
  mode = "ignore",
  loadDataCompression = "LZ4"
)