# DataFrames - PySpark

Example analysis of the MovieLens data set, demonstrating common data manipulation steps with PySpark.

In [1]:
import datetime

In [2]:
import findspark
# Locate the local Spark installation so that the pyspark imports in the next cell resolve.
findspark.init()

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col

### Create a Spark session

Local-mode Spark with two worker threads

In [4]:
# Build (or reuse) a session named "movielens" that runs Spark locally on two threads.
spark = (
    SparkSession.builder
    .master("local[2]")
    .appName("movielens")
    .getOrCreate()
)

### Read data from CSV

Movie dimension table

In [5]:
# Load the movie dimension table, treating the first line of the file as the header.
# No schema is supplied or inferred, so every column is read as a string.
movies = spark.read.option("header", "true").csv("data/ml-20m/movies.csv")

In [6]:
# Show column names and types; all three columns are strings because the CSV was read without a schema.
movies.printSchema()

root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [7]:
# Preview the first 10 rows; show() truncates long values (note the "..." in title and genres).
movies.show(10)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing top 10 rows



In [8]:
# Same preview as a pandas DataFrame: only the 10-row slice is converted, and values are displayed untruncated.
movies.limit(10).toPandas()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy
5,6,Heat (1995),Action|Crime|Thriller
6,7,Sabrina (1995),Comedy|Romance
7,8,Tom and Huck (1995),Adventure|Children
8,9,Sudden Death (1995),Action
9,10,GoldenEye (1995),Action|Adventure|Thriller


Ratings fact table

In [9]:
# Load the ratings fact table, treating the first line of the file as the header.
# No schema is supplied or inferred, so every column (rating and timestamp included) is read as a string.
ratings = spark.read.option("header", "true").csv("data/ml-20m/ratings.csv")

In [10]:
# Preview the first 10 ratings; `timestamp` holds Unix epoch seconds.
ratings.show(10)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      2|   3.5|1112486027|
|     1|     29|   3.5|1112484676|
|     1|     32|   3.5|1112484819|
|     1|     47|   3.5|1112484727|
|     1|     50|   3.5|1112484580|
|     1|    112|   3.5|1094785740|
|     1|    151|   4.0|1094785734|
|     1|    223|   4.0|1112485573|
|     1|    253|   4.0|1112484940|
|     1|    260|   4.0|1112484826|
+------+-------+------+----------+
only showing top 10 rows



### Add a column

Convert the Unix timestamp (seconds since the epoch) to a formatted date-time string

In [11]:
# Add a human-readable `time` column next to the raw epoch seconds.
# from_unixtime renders the value as a 'yyyy-MM-dd HH:mm:ss' string in the session time zone.
ratingsWithTime = ratings.withColumn(
    "time",
    F.from_unixtime(col("timestamp")),
)
ratingsWithTime.show(10)

+------+-------+------+----------+-------------------+
|userId|movieId|rating| timestamp|               time|
+------+-------+------+----------+-------------------+
|     1|      2|   3.5|1112486027|2005-04-03 00:53:47|
|     1|     29|   3.5|1112484676|2005-04-03 00:31:16|
|     1|     32|   3.5|1112484819|2005-04-03 00:33:39|
|     1|     47|   3.5|1112484727|2005-04-03 00:32:07|
|     1|     50|   3.5|1112484580|2005-04-03 00:29:40|
|     1|    112|   3.5|1094785740|2004-09-10 04:09:00|
|     1|    151|   4.0|1094785734|2004-09-10 04:08:54|
|     1|    223|   4.0|1112485573|2005-04-03 00:46:13|
|     1|    253|   4.0|1112484940|2005-04-03 00:35:40|
|     1|    260|   4.0|1112484826|2005-04-03 00:33:46|
+------+-------+------+----------+-------------------+
only showing top 10 rows



### Filter

Only include ratings that were made in 2009

In [13]:
# Keep only the ratings made during calendar year 2009 (in the session time zone).
# `time` is a 'yyyy-MM-dd HH:mm:ss' string, so it is compared against string bounds in the
# same format using a half-open range [2009-01-01, 2010-01-01).
# between() with datetime.date bounds is inclusive at the top and depends on implicit
# string/date coercion: Spark versions that cast the string to a date for the comparison
# would also let through every rating made on 2010-01-01.
ratingsFiltered = ratingsWithTime.filter(
    (col("time") >= "2009-01-01 00:00:00") & (col("time") < "2010-01-01 00:00:00")
)
ratingsFiltered.show(10)

+------+-------+------+----------+-------------------+
|userId|movieId|rating| timestamp|               time|
+------+-------+------+----------+-------------------+
|    11|      1|   4.5|1230858821|2009-01-02 01:13:41|
|    11|     10|   2.5|1230858959|2009-01-02 01:15:59|
|    11|     19|   3.5|1230783704|2009-01-01 04:21:44|
|    11|     32|   5.0|1230783095|2009-01-01 04:11:35|
|    11|     39|   4.5|1230859032|2009-01-02 01:17:12|
|    11|     65|   2.0|1230856649|2009-01-02 00:37:29|
|    11|    110|   4.0|1230853748|2009-01-01 23:49:08|
|    11|    145|   3.0|1230785947|2009-01-01 04:59:07|
|    11|    150|   5.0|1230785343|2009-01-01 04:49:03|
|    11|    153|   3.5|1230858914|2009-01-02 01:15:14|
+------+-------+------+----------+-------------------+
only showing top 10 rows



### Group by/Aggregation

Group by movieId and aggregate to find the total number of ratings and their average value

In [14]:
# One row per movie: the mean of its ratings and how many ratings it received,
# sorted so the highest average comes first.
averageRatings = (
    ratingsFiltered
    .groupBy("movieId")
    .agg(
        F.mean("rating").alias("averageRating"),
        F.count("rating").alias("count"),
    )
    .orderBy(F.desc("averageRating"))
)
averageRatings.show(10)

+-------+-------------+-----+
|movieId|averageRating|count|
+-------+-------------+-----+
|   7989|          5.0|    1|
|  72714|          5.0|    1|
|   7887|          5.0|    1|
|  72647|          5.0|    1|
|     59|          5.0|    1|
|   6179|          5.0|    1|
|   5306|          5.0|    1|
|   5149|          5.0|    1|
|   9016|          5.0|    1|
|   8824|          5.0|    1|
+-------+-------------+-----+
only showing top 10 rows



### Filter again

Only include movies with more than 100 ratings

In [15]:
# Drop movies whose average rests on too few ratings: keep those with strictly more than 100.
averageRatingsFiltered = averageRatings.where(col("count") > 100)
# Preview left disabled:
#averageRatingsFiltered.show(10)

### Join

Join with the movie dimension table

In [16]:
# Attach title and genres from the movie dimension table. The inner join on movieId keeps
# only movies present on both sides; the result is re-sorted by average rating, best first.
averageRatingsWithMovie = (
    averageRatingsFiltered
    .join(movies, "movieId", "inner")
    .orderBy(F.desc("averageRating"))
)
averageRatingsWithMovie.show(10)

+-------+------------------+-----+--------------------+--------------------+
|movieId|     averageRating|count|               title|              genres|
+-------+------------------+-----+--------------------+--------------------+
|    318| 4.439954250857796| 2623|Shawshank Redempt...|         Crime|Drama|
|   2959| 4.313030638612034| 2709|   Fight Club (1999)|Action|Crime|Dram...|
|    296|  4.28169014084507| 2556| Pulp Fiction (1994)|Comedy|Crime|Dram...|
|   6016| 4.273633998265395| 1153|City of God (Cida...|Action|Adventure|...|
|   5618| 4.265720081135902|  986|Spirited Away (Se...|Adventure|Animati...|
|     50| 4.259036144578313| 1826|Usual Suspects, T...|Crime|Mystery|Thr...|
|    926|  4.25531914893617|  141|All About Eve (1950)|               Drama|
|   1201|4.2485337243401755|  682|Good, the Bad and...|Action|Adventure|...|
|    858| 4.246623446785521| 1851|Godfather, The (1...|         Crime|Drama|
|  44555| 4.224397590361446|  664|Lives of Others, ...|Drama|Romance|Thr...|

### Filter again

Only include movies in the Western genre

In [17]:
# `genres` is a '|'-separated list, so match "Western" anywhere inside the string.
topWesterns = averageRatingsWithMovie.where(
    col("genres").like("%Western%")
)
# Preview left disabled:
#topWesterns.show(10)

### Chain into a single query and cache result

In [18]:
%%time
combinedQuery = ratings.withColumn("time", F.from_unixtime("timestamp"))\
                       .filter(col("time").between(datetime.date(2009,1,1), datetime.date(2010,1,1)))\
                       .groupBy("movieId")\
                       .agg(F.mean("rating").alias("averageRating"), F.count("rating").alias("count"))\
                       .filter(col("count") > 100)\
                       .join(movies, on="movieId", how="inner")\
                       .filter(col("genres").like("%Western%"))\
                       .orderBy(col("averageRating").desc())\
                       .cache()
combinedQuery.show(10)

+-------+------------------+-----+--------------------+--------------------+
|movieId|     averageRating|count|               title|              genres|
+-------+------------------+-----+--------------------+--------------------+
|   1201|4.2485337243401755|  682|Good, the Bad and...|Action|Adventure|...|
|   1209| 4.163732394366197|  284|Once Upon a Time ...|Action|Drama|Western|
|   1254| 4.134615384615385|  130|Treasure of the S...|Action|Adventure|...|
|   1304| 4.041533546325879|  313|Butch Cassidy and...|      Action|Western|
|   3681|  4.03584229390681|  279|For a Few Dollars...|Action|Drama|Thri...|
|   1266| 4.031890660592255|  439|   Unforgiven (1992)|       Drama|Western|
|  26649|3.9615384615384617|  195|Lonesome Dove (1989)|Adventure|Drama|W...|
|   2951|3.9610591900311527|  321|Fistful of Dollar...|      Action|Western|
|    714|3.9166666666666665|  156|     Dead Man (1995)|Drama|Mystery|Wes...|
|  56782|3.9119278779472952|  721|There Will Be Blo...|       Drama|Western|

In [19]:
%%time
combinedQuery = ratings.withColumn("time", F.from_unixtime("timestamp"))\
                       .filter(col("time").between(datetime.date(2009,1,1), datetime.date(2010,1,1)))\
                       .groupBy("movieId")\
                       .agg(F.mean("rating").alias("averageRating"), F.count("rating").alias("count"))\
                       .filter(col("count") > 100)\
                       .join(movies, on="movieId", how="inner")\
                       .filter(col("genres").like("%Western%"))\
                       .orderBy(col("averageRating").desc())\
                       .cache()
combinedQuery.show(10)

+-------+------------------+-----+--------------------+--------------------+
|movieId|     averageRating|count|               title|              genres|
+-------+------------------+-----+--------------------+--------------------+
|   1201|4.2485337243401755|  682|Good, the Bad and...|Action|Adventure|...|
|   1209| 4.163732394366197|  284|Once Upon a Time ...|Action|Drama|Western|
|   1254| 4.134615384615385|  130|Treasure of the S...|Action|Adventure|...|
|   1304| 4.041533546325879|  313|Butch Cassidy and...|      Action|Western|
|   3681|  4.03584229390681|  279|For a Few Dollars...|Action|Drama|Thri...|
|   1266| 4.031890660592255|  439|   Unforgiven (1992)|       Drama|Western|
|  26649|3.9615384615384617|  195|Lonesome Dove (1989)|Adventure|Drama|W...|
|   2951|3.9610591900311527|  321|Fistful of Dollar...|      Action|Western|
|    714|3.9166666666666665|  156|     Dead Man (1995)|Drama|Mystery|Wes...|
|  56782|3.9119278779472952|  721|There Will Be Blo...|       Drama|Western|

### Execution plan

In [21]:
# Print the extended execution plan (the logical plans as well as the physical plan) for the chained query.
combinedQuery.explain(True)

== Parsed Logical Plan ==
'Sort ['averageRating DESC NULLS LAST], true
+- AnalysisBarrier
      +- Filter genres#12 LIKE %Western%
         +- Project [movieId#45, averageRating#331, count#334L, title#11, genres#12]
            +- Join Inner, (movieId#45 = movieId#10)
               :- Filter (count#334L > cast(100 as bigint))
               :  +- Aggregate [movieId#45], [movieId#45, avg(cast(rating#46 as double)) AS averageRating#331, count(rating#46) AS count#334L]
               :     +- Filter ((time#318 >= cast(14245 as string)) && (time#318 <= cast(14610 as string)))
               :        +- Project [userId#44, movieId#45, rating#46, timestamp#47, from_unixtime(cast(timestamp#47 as bigint), yyyy-MM-dd HH:mm:ss, Some(Europe/London)) AS time#318]
               :           +- Relation[userId#44,movieId#45,rating#46,timestamp#47] csv
               +- Relation[movieId#10,title#11,genres#12] csv

== Analyzed Logical Plan ==
movieId: string, averageRating: double, count: bigint, tit

In [22]:
# Stop the Spark session now that the analysis is finished.
spark.stop()