### Basic Spark imports to set up a session

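The SparkSession is the entry point to Spark's DataFrame API. It gives the entire application its context (configuration and the underlying SparkContext), through which the driver (master) coordinates work on the worker nodes.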

In [None]:
# Build (or reuse) the SparkSession: the object used below for reading and processing data.
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName('PySpark Practice')
    # getOrCreate() returns an existing session if one was created before and was not closed
    .getOrCreate()
)

In [None]:
# To get some info about the spark session that is currently running.
# Evaluating the bare name as the cell's last expression makes the notebook display its summary.
spark

### Read dataset and define schemas

Define a schema for each of the three `.dat` files.

In [None]:
# Folder (relative path) containing the users/ratings/movies .dat files read below.
dataset_folder = "TwitterMoviesLatest-11OCT23"

In [None]:
# Paths of the three '::'-separated data files inside the dataset folder.
userdata = f"{dataset_folder}/users.dat"
ratingdata = f"{dataset_folder}/ratings.dat"
moviedata = f"{dataset_folder}/movies.dat"

In [None]:
from pyspark.sql import types

# Explicit schemas give every column of the '::'-separated files a name and a type
# when they are read below. Type mapping used here: text -> StringType,
# whole numbers -> IntegerType, epoch timestamp -> DoubleType.
# Relies on `types` imported from pyspark.sql in the line above.
# NOTE(review): the users file is read as (serial, user_id). If users.dat is really
# laid out as <rating user id>::<twitter id>, then 'serial' is the key that ratings
# refer to, and 'user_id' is not. Confirm against the dataset's README.
userdata_schema = (
    types.StructType()
    .add('serial', types.IntegerType())
    .add('user_id', types.StringType())
)

moviedata_schema = (
    types.StructType()
    .add('movie_id', types.StringType())
    .add('movie_title_year', types.StringType())
    .add('movie_genre', types.StringType())
)

ratingdata_schema = (
    types.StructType()
    .add('user_id', types.StringType())
    .add('movie_id', types.StringType())
    .add('rating', types.IntegerType())
    .add('timestamp', types.DoubleType())
)

In [None]:
# Load users.dat with its explicit schema, using "::" as the field separator.
userdata_df = spark.read.schema(userdata_schema).option("sep", "::").csv(userdata)

In [None]:
# Load movies.dat with its explicit schema, using "::" as the field separator.
moviedata_df = spark.read.schema(moviedata_schema).option("sep", "::").csv(moviedata)

In [None]:
# Load ratings.dat with its explicit schema, using "::" as the field separator.
ratingdata_df = spark.read.schema(ratingdata_schema).option("sep", "::").csv(ratingdata)

In [None]:
# Check schema information (column names, types, nullability) of the users DataFrame
userdata_df.printSchema()

In [None]:
# Peek at the first five user rows
userdata_df.show(n=5)

In [None]:
# Column names and types of the movies DataFrame
moviedata_df.printSchema()

In [None]:
# Peek at the first five movie rows
moviedata_df.show(n=5)

In [None]:
# Column names and types of the ratings DataFrame
ratingdata_df.printSchema()

In [None]:
# Peek at the first five rating rows
ratingdata_df.show(n=5)

### Transform datasets

Some columns pack several values into one field (`movie_title (year)`, `genre|genre|genre`, epoch timestamps, etc.). Here we convert them into a flatter layout. We also don't want to drop null values right away (it may be better to impute them).

In [None]:
# This helps us split columns on patterns
from pyspark.sql.functions import split

In [None]:
# Look at the raw packed columns (untruncated) before flattening them
moviedata_df.select("movie_title_year", "movie_genre").show(n=5, truncate=False)

In [None]:
# Split movie_title_year on " (" (a space followed by an opening parenthesis).
# split() takes a regex pattern, so "(" must be escaped as "\("; the doubled
# backslash in the Python literal below produces that single regex backslash.
split_movie_title_year = split(moviedata_df["movie_title_year"], " \\(")

In [None]:
# Movie title: movie_title_year with its trailing " (...)" group (the year) removed.
# Taking item 0 of a split on " (" truncated titles that themselves contain
# parentheses, e.g. "Birdman or (The Unexpected Virtue of Ignorance) (2014)"
# became "Birdman or". Stripping only the LAST parenthesised group keeps the
# full title. Rows without such a group keep their full text, as before.
from pyspark.sql.functions import regexp_replace
title = regexp_replace(moviedata_df["movie_title_year"], r" \([^()]*\)\s*$", "")

In [None]:
# Release year: the text inside the LAST " (...)" group of movie_title_year.
# The split-based version took the FIRST " (" group, so a title containing
# parentheses yielded part of the title instead of the year.
from pyspark.sql.functions import regexp_extract, when
year_intermediate = regexp_extract(moviedata_df["movie_title_year"], r" \(([^()]*)\)\s*$", 1)
# regexp_extract returns "" when the pattern does not match. Map that to null,
# matching the old behaviour for titles without a "(year)" suffix.
year = when(year_intermediate != "", year_intermediate)

In [None]:
# Attach the parsed title and year as new columns (the original packed column is kept for now)
moviedata_df2 = (
    moviedata_df
    .withColumn("movie_title", title)
    .withColumn("movie_year", year)
)

In [None]:
# Preview the parsed title/year columns without the packed source column
moviedata_df2.drop("movie_title_year").show(n=5, truncate=False)

In [None]:
# Turn the "genre|genre|genre" string into an array column; "|" is a regex
# metacharacter, so it is escaped.
moviedata_df2 = moviedata_df2.withColumn(
    "movie_genres",
    split(moviedata_df2.movie_genre, r"\|"),
)

In [None]:
# Preview the flattened movie columns, hiding both packed source columns
moviedata_df2.drop("movie_genre", "movie_title_year").show(n=5, truncate=False)

In [None]:
# Keep only the flattened movie columns, replacing the raw moviedata_df
moviedata_df = moviedata_df2.select("movie_id", "movie_title", "movie_year", "movie_genres")

In [None]:
# Preview the final movies DataFrame
moviedata_df.show(n=5, truncate=False)

In [None]:
# Release the intermediate DataFrame and the helper Column expressions used to build moviedata_df
del moviedata_df2, split_movie_title_year, title, year_intermediate, year

### Merging the data

Join the ratings with the movie and user data.

In [None]:
# Inner-join each rating with its movie (on movie_id) and then with its user row (on user_id).
# NOTE(review): ratings.user_id is matched against the users file's SECOND column.
# If users.dat is laid out as <rating user id>::<twitter id>, the matching key is
# 'serial' instead. Confirm before trusting this join.
merged_df = (
    ratingdata_df
    .join(moviedata_df, on=["movie_id"], how="inner")
    .join(userdata_df, on=["user_id"], how="inner")
)
merged_df = merged_df.drop("serial")

In [None]:
# Preview the joined data
merged_df.show(n=5)

In [None]:
# Column names and types after the joins
merged_df.printSchema()

In [None]:
# Convert the epoch timestamp column into a proper Spark timestamp.
from pyspark.sql.functions import to_timestamp
# withColumn with an existing column name replaces that column in place.
# This removes the need for the add-temp-column / drop / rename sequence.
# The column keeps its position; the reorder cell further down selects an
# explicit column order anyway.
merged_df = merged_df.withColumn("timestamp", to_timestamp(merged_df["timestamp"]))

In [None]:
# Preview the data with the converted timestamp
merged_df.show(n=5)

In [None]:
# Put the columns in a fixed order (movie_id is not kept)
merged_df = merged_df.select(
    "movie_title", "movie_year", "movie_genres", "user_id", "rating", "timestamp"
)

In [None]:
# Preview the reordered data
merged_df.show(n=5)

In [None]:
# Newest ratings first
merged_df = merged_df.orderBy(merged_df.timestamp.desc())

In [None]:
# Mark merged_df for caching so the grouped stats below can reuse it instead of
# recomputing the joins and the sort. cache() is lazy: data is stored on the first action.
merged_df.cache()

### Grouped stats

In [None]:
# Number of ratings plus min, max and average rating per movie.
# The functions module is imported under an alias. The previous
# `from pyspark.sql.functions import sum, avg, max, min, mean, count`
# shadowed Python's built-in sum/max/min for every later cell.
from pyspark.sql import functions as F

# Group by title AND year: different movies can share a title (remakes such as
# two films called "Halloween"), and grouping by title alone merged their ratings.
movieratingstats_df = merged_df.groupBy("movie_title", "movie_year").agg(
    F.count("rating").alias("num_ratings"),
    F.min("rating").alias("min_rating"),
    F.max("rating").alias("max_rating"),
    F.avg("rating").alias("avg_rating"),
)

In [None]:
# Preview the per-movie rating statistics
movieratingstats_df.show(n=5, truncate=False)

In [None]:
# Check across different time periods: replace each rating's timestamp with the
# calendar year it was made in, most recent years first.
from pyspark.sql.functions import year
movieratingyearstats_df = (
    merged_df
    .withColumn("year", year("timestamp"))
    .drop("timestamp")
    .sort("year", ascending=False)
)

In [None]:
# Preview ratings tagged with their year
movieratingyearstats_df.show(n=5, truncate=False)

In [None]:
# Check to see how movies performed in terms of ratings over time

### User-movie stats

This is a little harder to do given the pricing limits of the Twitter API: I can make only 25 user-profile requests every 24 hours, which is pathetic!

In [None]:
# Join with usernames and build user profiles based on what they rated.
# The notebook only echoes a cell's LAST expression, so print the distinct-ID count
# explicitly. The previous bare `.count()` ran a Spark job and discarded its result.
num_unique_user_ids = userdata_df.select("user_id").distinct().count()
print(f"{num_unique_user_ids} unique user IDs")  # an earlier run recorded 65771
# NOTE(review): merged_df was already built by joining userdata_df, before this
# deduplication. Reassigning userdata_df here does not change merged_df. If
# userdata_df has duplicate user_id rows, each matching rating already appears once
# per duplicate in merged_df. Move this dropDuplicates before the join to fix that.
userdata_df = userdata_df.dropDuplicates(["user_id"])

In [None]:
# Check to see if users' recommendations/tastes changed over time and how