In [1]:
# Bootstrap step: findspark.init() has to run before the first `pyspark` import
# in the cells below (presumably it locates the local Spark installation and
# makes `pyspark` importable from this kernel — confirm against the findspark docs).
import findspark
findspark.init()

In [2]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Spark configuration: local master, application name "sandbox".
# Extra properties can be added here with config.set("property", "value").
config = SparkConf()
config.setMaster("local")
config.setAppName("sandbox")

# SparkSession is the entry point for Spark SQL and the DataFrame API.
spark = (
    SparkSession.builder
    .config(conf=config)
    .getOrCreate()
)

# Handle to the SparkContext that backs the session.
sc = spark.sparkContext

22/05/19 00:55:01 WARN Utils: Your hostname, ubuntu-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.174.129 instead (on interface ens33)
22/05/19 00:55:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/05/19 00:55:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/19 00:55:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/05/19 00:55:06 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
22/05/19 00:55:06 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
22/05/19 00:55:06 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.
2

In [4]:
# Load the movies dataset (stored as Parquet on HDFS) into a DataFrame
# and preview the first five rows.
moviesParquetDf = spark.read.parquet(
    "hdfs://localhost:9000/application/movies"
)

moviesParquetDf.show(5)

                                                                                

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [5]:
# Load the ratings dataset (stored as Parquet on HDFS) into a DataFrame
# and preview the first five rows.
ratingsParquetDf = spark.read.parquet(
    "hdfs://localhost:9000/application/ratings"
)

ratingsParquetDf.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [6]:

from pyspark.sql.functions import col, desc, avg, count

# Popularity thresholds: a movie qualifies only if it has been rated by at
# least MIN_TOTAL_RATINGS users and its mean rating is at least MIN_AVG_RATING.
MIN_TOTAL_RATINGS = 100
MIN_AVG_RATING = 3.5

# Find the most popular movies:
#   1. aggregate the ratings per movie (mean rating and number of ratings),
#   2. keep only the movies that meet both popularity thresholds,
#   3. order the survivors by number of ratings, most-rated first.
# The filter runs before the sort so that the ordering is the final step of
# the pipeline and only qualifying rows are sorted. The chain is wrapped in
# parentheses instead of backslash continuations, so no trailing "\" can
# accidentally splice the following statement into the expression.
mostPopularMoviesDf = (
    ratingsParquetDf
    .groupBy("movieId")
    .agg(
        avg("rating").alias("avg_rating"),
        count("userId").alias("total_ratings"),
    )
    .filter(
        (col("total_ratings") >= MIN_TOTAL_RATINGS)
        & (col("avg_rating") >= MIN_AVG_RATING)
    )
    .sort(desc("total_ratings"))
)

mostPopularMoviesDf.show(10)



+-------+------------------+-------------+
|movieId|        avg_rating|total_ratings|
+-------+------------------+-------------+
|    356| 4.175304878048781|          328|
|    318| 4.429022082018927|          317|
|    296| 4.221311475409836|          305|
|    593| 4.201086956521739|          276|
|   2571|  4.26007326007326|          273|
|    260|             4.246|          250|
|    480|3.7637130801687766|          237|
|    110| 4.046610169491525|          236|
|    589| 4.018099547511312|          221|
|    527| 4.259174311926605|          218|
+-------+------------------+-------------+
only showing top 10 rows



                                                                                

In [10]:

# Attach the movie title to each popular movie: inner-join mostPopularMoviesDf
# with moviesParquetDf on matching movieId, keep the id, title and rating
# statistics, and order the result by number of ratings, most-rated first.
sameMovie = mostPopularMoviesDf.movieId == moviesParquetDf.movieId

popularMoviesDf = (
    mostPopularMoviesDf
    .join(moviesParquetDf, sameMovie)
    .select(moviesParquetDf.movieId, "title", "avg_rating", "total_ratings")
    .orderBy("total_ratings", ascending=False)
)

popularMoviesDf.show(20)



+-------+--------------------+------------------+-------------+
|movieId|               title|        avg_rating|total_ratings|
+-------+--------------------+------------------+-------------+
|    356| Forrest Gump (1994)| 4.175304878048781|          328|
|    318|Shawshank Redempt...| 4.429022082018927|          317|
|    296| Pulp Fiction (1994)| 4.221311475409836|          305|
|    593|Silence of the La...| 4.201086956521739|          276|
|   2571|  Matrix, The (1999)|  4.26007326007326|          273|
|    260|Star Wars: Episod...|             4.246|          250|
|    480|Jurassic Park (1993)|3.7637130801687766|          237|
|    110|   Braveheart (1995)| 4.046610169491525|          236|
|    589|Terminator 2: Jud...| 4.018099547511312|          221|
|    527|Schindler's List ...| 4.259174311926605|          218|
|   2959|   Fight Club (1999)| 4.325581395348837|          215|
|      1|    Toy Story (1995)|3.9369158878504673|          214|
|   1196|Star Wars: Episod...| 4.2333333

                                                                                

In [11]:
# Persist the popular-movies result to HDFS as Parquet, replacing any previous
# output at that path. No "header" option is set: it is a CSV option and has
# no meaning for Parquet, which stores the schema in the file metadata.
popularMoviesDf.write.mode("overwrite").parquet(
    "hdfs://localhost:9000/sandbox/most-popular-movies"
)


                                                                                