### Spark DataFrame API with Python
- **Goal**: Illustrate some methods of the ***Spark DataFrame API***
- **Data used**: MovieLens 1M Dataset https://grouplens.org/datasets/movielens/

In [2]:
spark.version

##### Import needed libraries

In [4]:
from pyspark.sql import SparkSession
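# Import only the functions used below; a wildcard import would shadow Python builtins such as sum and max
from pyspark.sql.functions import avg, col, count, desc, explode, split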

##### Load ***"movies.csv"*** file as moviesDF

In [6]:
moviesDF = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/FileStore/tables/movies.csv")
    .cache()
)

##### Explore moviesDF

In [8]:
moviesDF.printSchema()

In [9]:
moviesDF.dtypes

In [10]:
moviesDF.show(10)

In [11]:
moviesDF.count()

##### Load ***"ratings.csv"*** file as ratingsDF

In [13]:
ratingsDF = (
    spark.read.format("csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("/FileStore/tables/ratings.csv")
    .cache()
)

##### Explore ratingsDF

In [15]:
ratingsDF.printSchema()

In [16]:
ratingsDF.show(5)

In [17]:
ratingsDF.count()

##### Validate that ***userId*** and ***movieId*** combination is unique in ratingsDF

In [19]:
ratingsDF.groupBy("movieId","userId").count().filter("count != 1").show()
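
The check above groups the rows by the key pair and keeps only the groups whose size differs from 1, so an empty result means every (movieId, userId) pair is unique. As a minimal plain-Python sketch of the same logic (this is not Spark code; the sample rows and the `find_duplicate_keys` helper are hypothetical and only serve as illustration):

```python
from collections import Counter


def find_duplicate_keys(rows):
    """Return {(movieId, userId): n} for every key pair that does not occur exactly once."""
    counts = Counter((row["movieId"], row["userId"]) for row in rows)
    return {key: n for key, n in counts.items() if n != 1}


# Hypothetical sample rows; the last one repeats the (movieId=1, userId=10) pair.
sample_ratings = [
    {"userId": 10, "movieId": 1, "rating": 4.0},
    {"userId": 11, "movieId": 1, "rating": 3.5},
    {"userId": 10, "movieId": 2, "rating": 5.0},
    {"userId": 10, "movieId": 1, "rating": 2.0},
]

duplicates = find_duplicate_keys(sample_ratings)
print(duplicates)  # an empty dict would mean the key pair is unique
```

On the real data, an empty table from `show()` plays the role of the empty dict here.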

##### Find the top 10 movies by average rating. Consider only movies with at least 100 ratings. Show the movieId, title, average rating and rating count columns, ordered by average rating in decreasing order.

In [21]:
a = ratingsDF.groupBy("movieId").agg(
  count("movieId").alias("count"), 
  avg("rating").alias("avg_rating"))

a.show(5)
a.count()

In [22]:
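# Join with the movie titles, keep movies with at least 100 ratings, and rank by average rating
(
    a.alias("t1")
    .join(moviesDF.alias("t2"), col("t1.movieId") == col("t2.movieId"))
    .filter("count >= 100")
    .orderBy(desc("avg_rating"))
    .select("t1.movieId", "title", "avg_rating", "count")
    .limit(10)
    .show()
)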
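
The ranking follows four steps: aggregate the ratings per movie, join with the titles, keep the movies that have enough ratings, then sort by average rating and take the first rows. The question asks for movies with at least 100 ratings, which is an inclusive threshold (`>=`). A minimal plain-Python sketch of these steps on toy data (not Spark code; `top_movies`, the sample data and the small `min_count` are hypothetical):

```python
from collections import defaultdict


def top_movies(ratings, titles, min_count, n):
    """Rank movies by mean rating, keeping only those with at least min_count ratings."""
    by_movie = defaultdict(list)
    for movie_id, rating in ratings:
        by_movie[movie_id].append(rating)

    rows = [
        (movie_id, titles[movie_id], sum(vals) / len(vals), len(vals))
        for movie_id, vals in by_movie.items()
        # Inner-join behaviour: skip movies without a title; inclusive count threshold.
        if movie_id in titles and len(vals) >= min_count
    ]
    rows.sort(key=lambda row: row[2], reverse=True)  # highest average first
    return rows[:n]


# Hypothetical (movieId, rating) pairs and titles.
sample_ratings = [(1, 5.0), (1, 4.0), (2, 5.0), (3, 3.0), (3, 4.0), (3, 5.0)]
sample_titles = {1: "Movie A", 2: "Movie B", 3: "Movie C"}

top = top_movies(sample_ratings, sample_titles, min_count=2, n=10)
print(top)
```

In this toy data "Movie B" has the highest single rating but is dropped because it has only one rating, which is the reason for the minimum-count filter.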

##### Answer the same question using SQL

Show the temporary views for the current Spark session

In [25]:
spark.sql("show tables").show()

In [26]:
moviesDF.createOrReplaceTempView("movies")
ratingsDF.createOrReplaceTempView("ratings")
spark.sql("show tables").show()

In [27]:
spark.sql("""
SELECT t1.movieId, t1.title, avg(t2.rating) avg_rating, count(1) rating_count
FROM movies t1 join ratings t2 on t1.movieId = t2.movieId
GROUP BY t1.movieId, t1.title
HAVING rating_count >= 100
ORDER BY avg_rating desc
LIMIT 10
""").show()

##### Find average rating of each genre

In [29]:
genre_avg_rating = (
    ratingsDF.alias("t1")
    .join(moviesDF.alias("t2"), col("t1.movieId") == col("t2.movieId"))
    # One output row per (rating, genre): split the pipe-separated genres and explode them
    .select("rating", explode(split("genres", r"\|")).alias("genre"))
    .groupBy("genre")
    .agg(count("genre").alias("count"), avg("rating").alias("avg_rating"))
    .sort(desc("avg_rating"))
)

genre_avg_rating.show(5)
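
The `explode(split(...))` step turns one rating of a multi-genre movie into one row per genre, so that rating contributes to the average of every genre the movie belongs to. A minimal plain-Python sketch of that behaviour (not Spark code; `genre_averages` and the sample rows are hypothetical):

```python
from collections import defaultdict


def genre_averages(rated_movies):
    """Return (genre, count, avg_rating) tuples sorted by avg_rating, highest first."""
    totals = defaultdict(lambda: [0.0, 0])  # genre -> [sum of ratings, number of ratings]
    for rating, genres in rated_movies:
        # Equivalent of split + explode: one contribution per genre of the movie.
        for genre in genres.split("|"):
            totals[genre][0] += rating
            totals[genre][1] += 1
    result = [(genre, n, total / n) for genre, (total, n) in totals.items()]
    result.sort(key=lambda row: row[2], reverse=True)
    return result


# Hypothetical (rating, genres) rows, as they would look after the join.
sample = [(4.0, "Comedy|Romance"), (2.0, "Comedy"), (5.0, "Drama")]

averages = genre_averages(sample)
print(averages)
```

Because of this fan-out, the per-genre counts add up to more than the number of ratings whenever movies carry several genres.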

##### Plot average rating of each genre

In [31]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

Convert the Spark DataFrame to a pandas DataFrame

In [33]:
df = genre_avg_rating.toPandas()
df.head()

In [34]:
df.plot(x="genre", y="avg_rating", kind="bar", title="Barplot of avg rating by genre")