In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.conf import SparkConf
config = SparkConf()
# config.set("property", "value")
config.setMaster("local").setAppName("MovieLensSQL")

config.set("spark.local.dir", "/home/ubuntu/spark-temp")

# while using hive.metastore.warehouse.dir, we should not use spark warehouse dir

config.set("hive.metastore.uris", "thrift://localhost:9083")
config.set("hive.metastore.warehouse.dir", "hdfs://localhost:9000/user/hive/warehouse")


from pyspark.sql import SparkSession
# spark Session, entry point for Spark SQL, DataFrame
spark = SparkSession.builder\
                    .config(conf=config)\
                    .enableHiveSupport()\
                    .getOrCreate()

sc = spark.sparkContext

In [3]:
# hdfs dfs -ls /movies
# hdfs dfs -ls /ratings

In [3]:
# how to create schema programatically instead of using inferSchema
from pyspark.sql.types import StructType, LongType, StringType, IntegerType, DoubleType
# True is nullable, False is non nullable
movieSchema = StructType()\
                .add("movieId", IntegerType(), True)\
                .add("title", StringType(), True)\
                .add("genres", StringType(), True)

ratingSchema = StructType()\
                .add("userId", IntegerType(), True)\
                .add("movieId", IntegerType(), True)\
                .add("rating", DoubleType(), True)\
                .add("timestamp", LongType(), True)



In [4]:
# read movie data
# read using dataframe with defind schema
# we can use folder path - all csv in the folder read
# use file path, only that file read

# spark is session, entry point for data frame/sql

movieDf = spark.read.format("csv")\
                .option("header", True)\
                .schema(movieSchema)\
                .load("hdfs://localhost:9000/movies")

movieDf.printSchema()
movieDf.show(2)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



In [5]:

ratingDf = spark.read.format("csv")\
                .option("header", True)\
                .schema(ratingSchema)\
                .load("hdfs://localhost:9000/ratings")

ratingDf.printSchema()
ratingDf.show(2)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
+------+-------+------+---------+
only showing top 2 rows



In [6]:
# we haven't configured hive yet
spark.sql ("""
SHOW DATABASES
""").show()

+------------+
|databaseName|
+------------+
|    brandsdb|
|     default|
|     moviedb|
|  productsdb|
|    removeme|
|      testdb|
+------------+



In [7]:
# create a temp table/temp view from data frame
movieDf.createOrReplaceTempView("movies")
ratingDf.createOrReplaceTempView("ratings")

In [8]:
# two temp tables shall be listed from default 
spark.sql("SHOW TABLES").show()

+--------+----------+-----------+
|database| tableName|isTemporary|
+--------+----------+-----------+
| default|    brands|      false|
| default| employees|      false|
| default|employees2|      false|
| default|  payroles|      false|
|        |    movies|       true|
|        |   ratings|       true|
+--------+----------+-----------+



In [10]:
spark.sql("""
SELECT * FROM movies LIMIT 5
""").show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



In [11]:
spark.sql("""
SELECT * FROM ratings LIMIT 5
""").show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+



In [13]:
# spark sql returns data frame
df1 = spark.sql("select userId, rating from ratings")
df1.printSchema()
df1.rdd.take(3)

root
 |-- userId: integer (nullable = true)
 |-- rating: double (nullable = true)



[Row(userId=1, rating=4.0),
 Row(userId=1, rating=4.0),
 Row(userId=1, rating=4.0)]

In [14]:
# Rating Analytics for popular movies

popularMoviesDf = spark.sql("""
SELECT movieId, avg(rating) as avg_rating, count(userId) as total_ratings
FROM ratings
GROUP BY movieId
""")

popularMoviesDf.printSchema()
popularMoviesDf.show(5)

root
 |-- movieId: integer (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- total_ratings: long (nullable = false)

+-------+-----------------+-------------+
|movieId|       avg_rating|total_ratings|
+-------+-----------------+-------------+
|   1580|3.487878787878788|          165|
|   2366|             3.64|           25|
|   3175|             3.58|           75|
|   1088|3.369047619047619|           42|
|  32460|             4.25|            4|
+-------+-----------------+-------------+
only showing top 5 rows



In [9]:
# Rating Analytics for popular movies
# create a temporary view out of SQL SELECT statement
# CTAS - CREATE TABLE AS, the popular_movies shall have output analytical query
spark.sql("""
CREATE OR REPLACE TEMP VIEW popular_movies AS
SELECT movieId, avg(rating) as avg_rating, count(userId) as total_ratings
FROM ratings
GROUP BY movieId
HAVING avg_rating >= 3.5 AND total_ratings >= 100
""")

DataFrame[]

In [10]:
spark.sql("SHOW TABLES").show()

+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
| default|        brands|      false|
| default|     employees|      false|
| default|    employees2|      false|
| default|      payroles|      false|
|        |        movies|       true|
|        |popular_movies|       true|
|        |       ratings|       true|
+--------+--------------+-----------+



In [24]:
spark.sql("SELECT * FROM popular_movies").show(5)

+-------+------------------+-------------+
|movieId|        avg_rating|total_ratings|
+-------+------------------+-------------+
|    858|         4.2890625|          192|
|   1270| 4.038011695906433|          171|
|   1265| 3.944055944055944|          143|
|    588|3.7923497267759565|          183|
|    296| 4.197068403908795|          307|
+-------+------------------+-------------+
only showing top 5 rows



In [19]:
# do not run 
# spark.sql("DROP  VIEW popular_movies")

DataFrame[]

In [26]:
# join to get the movie titles
spark.sql("""
SELECT movies.movieId, title, avg_rating, total_ratings 
FROM popular_movies
INNER JOIN movies ON popular_movies.movieId = movies.movieId
ORDER BY avg_rating DESC
""").show()

+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    858|Godfather, The (1...|         4.2890625|          192|
|   2959|   Fight Club (1999)| 4.272935779816514|          218|
|   1221|Godfather: Part I...|  4.25968992248062|          129|
|  48516|Departed, The (2006)| 4.252336448598131|          107|
|   1213|   Goodfellas (1990)|              4.25|          126|
|    912|   Casablanca (1942)|              4.24|          100|
|  58559|Dark Knight, The ...| 4.238255033557047|          149|
|     50|Usual Suspects, T...| 4.237745098039215|          204|
|   1197|Princess Bride, T...| 4.232394366197183|          142|
|    260|Star Wars: Episod...| 4.231075697211155|          251|
|    527|Schindler's List ...|             4.225|          220|
|   1208|Apocalypse Now (1...| 4.2196261

In [13]:
# CREATE A PERMANENT MANAGED TABLE in HIVE CATALOG
# DATA SHALL BE STORED IN /user/hive/warehouse/moviedb.db/popular_movies
# CTAS - CREATE TABLE AS SELECT
# FIXME
spark.sql("""
CREATE OR REPLACE TEMP VIEW most_popular_movies AS 
SELECT movies.movieId, title, avg_rating, total_ratings 
FROM popular_movies
INNER JOIN movies ON popular_movies.movieId = movies.movieId
ORDER BY avg_rating DESC
""")

++
||
++
++



In [22]:
# CREATE A PERMANENT MANAGED TABLE in HIVE CATALOG
# DATA SHALL BE STORED IN /user/hive/warehouse/moviedb.db/popular_movies
# CTAS - CREATE TABLE AS SELECT
# FIXME
spark.sql("""
CREATE TABLE most_popular_movies5 AS 
SELECT movies.movieId, title, avg_rating, total_ratings 
FROM popular_movies
INNER JOIN movies ON popular_movies.movieId = movies.movieId
ORDER BY avg_rating DESC
""")

DataFrame[]

In [15]:
spark.sql("SELECT * FROM most_popular_movies").show(5)

+-------+--------------------+-----------------+-------------+
|movieId|               title|       avg_rating|total_ratings|
+-------+--------------------+-----------------+-------------+
|    318|Shawshank Redempt...|4.429022082018927|          317|
|    858|Godfather, The (1...|        4.2890625|          192|
|   2959|   Fight Club (1999)|4.272935779816514|          218|
|   1221|Godfather: Part I...| 4.25968992248062|          129|
|  48516|Departed, The (2006)|4.252336448598131|          107|
+-------+--------------------+-----------------+-------------+
only showing top 5 rows



In [19]:
# get dataframe from table/temp view

mostPopularMoviesDf = spark.table("most_popular_movies")

mostPopularMoviesDf.write\
                    .mode('overwrite')\
                    .saveAsTable("moviedb.most_popular_movies")

mostPopularMoviesDf.show(5)

+-------+--------------------+-----------------+-------------+
|movieId|               title|       avg_rating|total_ratings|
+-------+--------------------+-----------------+-------------+
|    318|Shawshank Redempt...|4.429022082018927|          317|
|    858|Godfather, The (1...|        4.2890625|          192|
|   2959|   Fight Club (1999)|4.272935779816514|          218|
|   1221|Godfather: Part I...| 4.25968992248062|          129|
|  48516|Departed, The (2006)|4.252336448598131|          107|
+-------+--------------------+-----------------+-------------+
only showing top 5 rows



In [20]:
spark.sql("SELECT * FROM moviedb.most_popular_movies").show(5)

+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|   4896|Harry Potter and ...|3.7616822429906542|          107|
|   1198|Raiders of the Lo...|            4.2075|          200|
|    293|Léon: The Profess...| 4.018796992481203|          133|
|   6539|Pirates of the Ca...| 3.778523489932886|          149|
|   4993|Lord of the Rings...| 4.106060606060606|          198|
+-------+--------------------+------------------+-------------+
only showing top 5 rows



In [23]:
spark.sql("SHOW TABLES IN moviedb").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| moviedb| most_popular_movies|      false|
| moviedb|most_popular_movies2|      false|
| moviedb|              movies|      false|
| moviedb|             ratings|      false|
| moviedb|             reviews|      false|
|        | most_popular_movies|       true|
|        |              movies|       true|
|        |      popular_movies|       true|
|        |             ratings|       true|
+--------+--------------------+-----------+



In [None]:
# On Hive cli

"""
select * from moviedb.most_popular_movies;
"""