In [4]:
from pyspark.sql.functions import split, explode, col, udf
from pyspark.sql.types import *
from pyspark.sql import SparkSession

In [5]:
import os

spark = SparkSession.builder.appName('temps-demo').getOrCreate()

# Azure Blob Storage connection settings.
# The account key is a secret: never hard-code it in a notebook (outputs and history get
# shared/committed). Provide it through the environment instead; on Databricks a secret
# scope (dbutils.secrets.get) is an alternative. A key that was ever committed in plain
# text should be rotated.
container_name = os.environ.get("AZURE_STORAGE_CONTAINER", "datasource")
storage_account_name = os.environ.get("AZURE_STORAGE_ACCOUNT", "sghuierdatabricks")
storage_account_access_key = os.environ.get("AZURE_STORAGE_ACCOUNT_KEY")
if not storage_account_access_key:
  raise RuntimeError(
      "Set the AZURE_STORAGE_ACCOUNT_KEY environment variable to the storage account access key")
spark.conf.set(
    "fs.azure.account.key." + storage_account_name + ".blob.core.windows.net",
    storage_account_access_key)

In [6]:
# Helpers shared by both CSV loads below (previously copy-pasted per file).
def _blob_csv_location(file_name):
  """Return the wasbs:// URL of `file_name` in `container_name` on `storage_account_name`."""
  return "wasbs://{}@{}.blob.core.windows.net/{}".format(
      container_name, storage_account_name, file_name)


def _read_csv_with_header(location):
  """Read a CSV file whose first row is a header, letting Spark infer the column types."""
  return (spark.read.format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .load(location))


# Get storage data locations
ratingsLocation = _blob_csv_location("ratings.csv")
moviesLocation = _blob_csv_location("movies.csv")
# Get ratings and movies data
ratings = _read_csv_with_header(ratingsLocation)
movies = _read_csv_with_header(moviesLocation)

In [7]:
# Preview the first 20 rows of movies; string cells longer than 20 characters are truncated
movies.show(n=20, truncate=True)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [8]:
# Preview the first 20 rows of the raw ratings, as loaded from CSV
ratings.show(n=20, truncate=True)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [9]:
# Replace the integer `timestamp` column with a calendar `reviewDate` column:
# first cast the int to Timestamp (treated as seconds since the Unix epoch),
# then cast that Timestamp to Date.
# NOTE(review): the Date cast is evaluated in the Spark session time zone
# (spark.sql.session.timeZone), so reviewDate can differ from the UTC date —
# e.g. 964982703 is 2000-07-30 in UTC but is shown as 2000-07-31 in the output below.
# Guarded so the cell can be re-run: after the first run `timestamp` has already been
# dropped and `ratings` holds the converted frame.
if "timestamp" in ratings.columns:
  ratingsTemp = ratings.withColumn("ts", col("timestamp").cast("Timestamp"))
  ratings = (ratingsTemp
             .withColumn("reviewDate", col("ts").cast("Date"))
             .drop("ts", "timestamp"))

In [10]:
# Preview the first 20 ratings after the timestamp -> reviewDate conversion
ratings.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating|reviewDate|
+------+-------+------+----------+
|     1|      1|   4.0|2000-07-31|
|     1|      3|   4.0|2000-07-31|
|     1|      6|   4.0|2000-07-31|
|     1|     47|   5.0|2000-07-31|
|     1|     50|   5.0|2000-07-31|
|     1|     70|   3.0|2000-07-31|
|     1|    101|   5.0|2000-07-31|
|     1|    110|   4.0|2000-07-31|
|     1|    151|   5.0|2000-07-31|
|     1|    157|   5.0|2000-07-31|
|     1|    163|   5.0|2000-07-31|
|     1|    216|   5.0|2000-07-31|
|     1|    223|   3.0|2000-07-31|
|     1|    231|   5.0|2000-07-31|
|     1|    235|   4.0|2000-07-31|
|     1|    260|   5.0|2000-07-31|
|     1|    296|   3.0|2000-07-31|
|     1|    316|   3.0|2000-07-31|
|     1|    333|   5.0|2000-07-31|
|     1|    349|   4.0|2000-07-31|
+------+-------+------+----------+
only showing top 20 rows



In [11]:

# Plain Python function, wrapped as a Spark UDF (user-defined function) below,
# that extracts the year a movie was made from its title.
def titleToYear(title):
  """Extract the release year from a movie title such as "Toy Story (1995)".

  Parses the text between the last "(" and the last ")" of the title as an int.

  Args:
    title: the movie title; may be None for a missing value.

  Returns:
    The parsed year as an int, or None when the title is not a string, has no
    "(" ... ")" pair in that order, or the bracketed text is not an integer
    (e.g. a year range such as "(2016-2017)").
  """
  # Best-effort by design: any unparseable value becomes None (a null in Spark)
  # rather than failing the whole Spark job.
  if not isinstance(title, str):
    return None
  open_idx = title.rfind("(")
  close_idx = title.rfind(")")
  # Without this check a title with no parentheses (e.g. "2001") sliced as
  # title[0:-1] and produced a bogus year.
  if open_idx == -1 or close_idx <= open_idx:
    return None
  try:
    return int(title[open_idx + 1:close_idx])
  except ValueError:
    return None
# Register titleToYear as a Spark UDF producing an IntegerType column
titleToYearUdf = udf(titleToYear, IntegerType())
# Add the movieYear column, parsed from the "(year)" suffix of the title
movies = movies.withColumn("movieYear", titleToYearUdf(movies.title))
# Explode the pipe-separated 'genres' string into one row per genre.
# split() takes a (Java) regex, so the pipe must be escaped; the raw string keeps the
# backslash literal instead of relying on Python's invalid-escape-sequence fallback.
# A null 'genres' value yields no rows from explode.
movies_denorm = movies.withColumn("genre", explode(split("genres", r"\|"))).drop("genres")
# Inner-join ratings with the per-genre movie rows on movieId: each rating is repeated
# once per genre of its movie, and ratings whose movieId is absent from movies are dropped.
ratings_denorm = ratings.alias('a').join(movies_denorm.alias('b'), 'movieId', 'inner')

In [12]:
# Preview the first 20 rows of the joined, one-row-per-genre ratings
ratings_denorm.show(n=20, truncate=True)

+-------+------+------+----------+--------------------+---------+---------+
|movieId|userId|rating|reviewDate|               title|movieYear|    genre|
+-------+------+------+----------+--------------------+---------+---------+
|      1|     1|   4.0|2000-07-31|    Toy Story (1995)|     1995|  Fantasy|
|      1|     1|   4.0|2000-07-31|    Toy Story (1995)|     1995|   Comedy|
|      1|     1|   4.0|2000-07-31|    Toy Story (1995)|     1995| Children|
|      1|     1|   4.0|2000-07-31|    Toy Story (1995)|     1995|Animation|
|      1|     1|   4.0|2000-07-31|    Toy Story (1995)|     1995|Adventure|
|      3|     1|   4.0|2000-07-31|Grumpier Old Men ...|     1995|  Romance|
|      3|     1|   4.0|2000-07-31|Grumpier Old Men ...|     1995|   Comedy|
|      6|     1|   4.0|2000-07-31|         Heat (1995)|     1995| Thriller|
|      6|     1|   4.0|2000-07-31|         Heat (1995)|     1995|    Crime|
|      6|     1|   4.0|2000-07-31|         Heat (1995)|     1995|   Action|
|     47|   