In [1]:
# Similar to SparkContext, for SparkSQL you need a SparkSession
from pyspark.sql import SparkSession
# Also all the functions (select, where, groupby) needs to be imported
from pyspark.sql.functions import *

In [2]:
# Get spark session
spark = SparkSession.builder.getOrCreate()

In [3]:
# read data into dataframe
ratings_df = spark.read.csv("/home/fieldengineer/Documents/courses/architect_big_data_solutions_with_spark-master/Datasets/movielens/ratings.csv", header=True)

### DataFrames Operations

In this part you will learn how to programmatically use the SQL capabilities of DataFrame. For the full list of documentation: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql

In [5]:
# You can use the select method to grab specific columns
ratings_df.select(['movieId','rating']).show()

+-------+------+
|movieId|rating|
+-------+------+
|     31|   2.5|
|   1029|   3.0|
|   1061|   3.0|
|   1129|   2.0|
|   1172|   4.0|
|   1263|   2.0|
|   1287|   2.0|
|   1293|   2.0|
|   1339|   3.5|
|   1343|   2.0|
|   1371|   2.5|
|   1405|   1.0|
|   1953|   4.0|
|   2105|   4.0|
|   2150|   3.0|
|   2193|   2.0|
|   2294|   2.0|
|   2455|   2.5|
|   2968|   1.0|
|   3671|   3.0|
+-------+------+
only showing top 20 rows



In [6]:
# see how ratings are in string
ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [9]:
from pyspark.sql.types import IntegerType 
ratings_df = ratings_df.withColumn('rating', ratings_df['rating'].cast(IntegerType()))

In [10]:
ratings_df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: string (nullable = true)



In [11]:
# You can change the Data type of any column by casting them to your desired data type
# First you have to import that data type from pyspark.sql.types
from pyspark.sql.types import IntegerType
# Then you can use withColumn() to apply / cast each row of the column (Notice how the square bracket annotation is used)
ratings_df = ratings_df.withColumn("rating", ratings_df['rating'].cast(IntegerType()))
# take a look at the schema now
ratings_df.select(['movieId','rating']).printSchema()

root
 |-- movieId: string (nullable = true)
 |-- rating: integer (nullable = true)



In [12]:
# You can use the filter() here to filter on a condition (just like we did with RDD!)
# For example we can check if there are any missing ratings 
ratings_df.filter(ratings_df.rating.isNull()).count()

0

In [13]:
# similar to filter you can also use where (from SQL syntax)
ratings_df.where(ratings_df.rating.isNull()).count()

0

### Group By
The GROUP BY statement is used with **aggregate functions (COUNT, MAX, MIN, SUM, AVG)** to group the result-set by one or more columns.

In [14]:
# For instance, we can group by the movieId over rating and aggregate over the average value and total reviews (very easily)
ratings_df.groupBy('movieId').agg(avg('rating').alias('avg_rating'), count('rating').alias('reviews')).show()

+-------+------------------+-------+
|movieId|        avg_rating|reviews|
+-------+------------------+-------+
|   2294| 3.150943396226415|     53|
|    296| 4.157407407407407|    324|
|   3210|               3.5|     52|
|  88140|3.4545454545454546|     22|
| 115713| 3.769230769230769|     26|
|  27317|3.6666666666666665|      6|
|   1090|3.8088235294117645|     68|
|   2088|              2.52|     25|
|   2136| 2.611111111111111|     18|
|   6240|               3.0|      2|
|   3959|             3.375|     16|
|   3606| 4.111111111111111|      9|
|   6731|3.1666666666666665|      6|
|  62912|2.6666666666666665|      3|
|  89864|3.5384615384615383|     13|
| 106022|               4.0|      1|
|  48738| 3.933333333333333|     15|
|   2069|3.7777777777777777|      9|
|   3414|               4.0|      4|
|   2162|2.5555555555555554|      9|
+-------+------------------+-------+
only showing top 20 rows



In [15]:
# We can also see the top 10 rated movies if they have been reviewed at least 50 times or more
ratings_sum_df = ratings_df.groupBy('movieId').agg(avg('rating').alias('avg_rating'), count('rating').alias('reviews'))
ratings_sum_df.filter(ratings_sum_df.reviews > 50).sort('avg_rating', ascending=False).limit(10).show()

+-------+------------------+-------+
|movieId|        avg_rating|reviews|
+-------+------------------+-------+
|    318| 4.405144694533762|    311|
|    858|             4.395|    200|
|    913| 4.338709677419355|     62|
|   1221| 4.303703703703704|    135|
|     50|4.2835820895522385|    201|
|   1252|              4.25|     76|
|    904| 4.217391304347826|     92|
|    527| 4.209016393442623|    244|
|   2019| 4.203703703703703|     54|
|   1203| 4.202702702702703|     74|
+-------+------------------+-------+



### User Defined Functions (UDF)
Similar to custom functions for Map, you can write user defined function to transform one or more columns. 
More about UDF on https://docs.databricks.com/spark/latest/spark-sql/udf-in-python.html

In [18]:
# Using UDF is a three step process. Before anything you will need to import the udf library
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

In [19]:
# If you can express your user defined function as lambda then you can register the UDF and define it in one line like below
# for example this UDF will tell me if I should watch a movie or not based on its average rating
watchable_udf = udf(lambda avg_rating: 'yes' if avg_rating > 3.5 else 'no', StringType())

In [20]:
# Otherwise you can first write your function
# as you can see here we have more flexibility
# I will write the function to also incorporate the total number of reviews
def watchable_udf(avg_rating, reviews):
  if avg_rating > 3.5 and reviews > 50:
    return 'yes'
  elif avg_rating > 3.5 and reviews < 50:
    return 'maybe'
  else:
    return 'no'
# and then register it as an UDF with the return type declared
watchable_udf = udf(watchable_udf, StringType())

In [21]:
# Now you can use withColumn to apply the UDF over every row and create a new column 'watchable'
ratings_sum_df = ratings_sum_df.withColumn('watchable', watchable_udf(ratings_sum_df.avg_rating,ratings_sum_df.reviews))

In [23]:
ratings_sum_df.show()

+-------+------------------+-------+---------+
|movieId|        avg_rating|reviews|watchable|
+-------+------------------+-------+---------+
|   2294| 3.150943396226415|     53|       no|
|    296| 4.157407407407407|    324|      yes|
|   3210|               3.5|     52|       no|
|  88140|3.4545454545454546|     22|       no|
| 115713| 3.769230769230769|     26|    maybe|
|  27317|3.6666666666666665|      6|    maybe|
|   1090|3.8088235294117645|     68|      yes|
|   2088|              2.52|     25|       no|
|   2136| 2.611111111111111|     18|       no|
|   6240|               3.0|      2|       no|
|   3959|             3.375|     16|       no|
|   3606| 4.111111111111111|      9|    maybe|
|   6731|3.1666666666666665|      6|       no|
|  62912|2.6666666666666665|      3|       no|
|  89864|3.5384615384615383|     13|    maybe|
| 106022|               4.0|      1|    maybe|
|  48738| 3.933333333333333|     15|    maybe|
|   2069|3.7777777777777777|      9|    maybe|
|   3414|    

### Joins
A JOIN clause is used to combine rows from two or more tables, based on a related column between them. Here are the a few basic types of joins explained:

* (INNER) JOIN: Returns records that have matching values in both tables
* LEFT (OUTER) JOIN: Return all records from the left table, and the matched records from the right table
* RIGHT (OUTER) JOIN: Return all records from the right table, and the matched records from the left table
* FULL (OUTER) JOIN: Return all records when there is a match in either left or right table

Spark Supports more than just basic joins however. With the latest spark you get: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti joins! Take a look in  https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#join for more details.

In [24]:
# lets use the movies csv file to make sense of the movies in our previous results
movies_df = spark.read.csv("/home/fieldengineer/Documents/courses/architect_big_data_solutions_with_spark-master/Datasets/movielens/movies.csv", header=True)
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [25]:
# we do an inner join to get more information about each movies
movie_ratings_sum_df = ratings_sum_df.join(movies_df, ratings_sum_df.movieId == movies_df.movieId)

In [26]:
# lets display a few results
movie_ratings_sum_df.select(['title','avg_rating','reviews','watchable']).show()

+--------------------+------------------+-------+---------+
|               title|        avg_rating|reviews|watchable|
+--------------------+------------------+-------+---------+
|         Antz (1998)| 3.150943396226415|     53|       no|
| Pulp Fiction (1994)| 4.157407407407407|    324|      yes|
|Fast Times at Rid...|               3.5|     52|       no|
|Captain America: ...|3.4545454545454546|     22|       no|
|   Ex Machina (2015)| 3.769230769230769|     26|    maybe|
|Audition (Ôdishon...|3.6666666666666665|      6|    maybe|
|      Platoon (1986)|3.8088235294117645|     68|      yes|
|       Popeye (1980)|              2.52|     25|       no|
|Nutty Professor, ...| 2.611111111111111|     18|       no|
| One Good Cop (1991)|               3.0|      2|       no|
|Time Machine, The...|             3.375|     16|       no|
|  On the Town (1949)| 4.111111111111111|      9|    maybe|
|Day of the Dead (...|3.1666666666666665|      6|       no|
|High School Music...|2.6666666666666665

### Challenge: Can you create a table of the highest rated movie per category?