In [64]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql import types

# Initialize (or reuse) the Spark session for this MovieLens ratings analysis.
# The app name is what the Spark UI shows, so it should describe this notebook.
spark = SparkSession.builder.appName("MovieLensPopularMovies").getOrCreate()

# Render a DataFrame as a table when it is the last expression of a cell,
# showing at most 10 rows and truncating each value to 30 characters.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 10)
spark.conf.set("spark.sql.repl.eagerEval.truncate", 30)

In [65]:
# Only the first three pipe-separated fields of u.item are declared; the
# resulting DataFrame contains just these three columns.
movie_item_schema = types.StructType([
    types.StructField("movie_id", types.IntegerType(), False),
    types.StructField("movie_name", types.StringType(), False),
    types.StructField("release_date", types.StringType(), True),
])

# MovieLens 100k's u.item is Latin-1 (ISO-8859-1) encoded, while Spark's CSV
# reader decodes UTF-8 by default; without this option, titles containing
# accented characters are decoded incorrectly.
movie_item = (
    spark.read
    .option("sep", "|")
    .option("encoding", "ISO-8859-1")
    .schema(movie_item_schema)
    .csv("../data/ml-100k/u.item")
)
movie_item

movie_id,movie_name,release_date
1,Toy Story (1995),01-Jan-1995
2,GoldenEye (1995),01-Jan-1995
3,Four Rooms (1995),01-Jan-1995
4,Get Shorty (1995),01-Jan-1995
5,Copycat (1995),01-Jan-1995
6,Shanghai Triad (Yao a yao y...,01-Jan-1995
7,Twelve Monkeys (1995),01-Jan-1995
8,Babe (1995),01-Jan-1995
9,Dead Man Walking (1995),01-Jan-1995
10,Richard III (1995),22-Jan-1996


In [66]:
# Turn release_date text such as "01-Jan-1995" into a proper DateType column.
release_date_parsed = func.to_date(func.col("release_date"), "d-MMM-yyyy")
movie_item = movie_item.withColumn("release_date", release_date_parsed)
movie_item

movie_id,movie_name,release_date
1,Toy Story (1995),1995-01-01
2,GoldenEye (1995),1995-01-01
3,Four Rooms (1995),1995-01-01
4,Get Shorty (1995),1995-01-01
5,Copycat (1995),1995-01-01
6,Shanghai Triad (Yao a yao y...,1995-01-01
7,Twelve Monkeys (1995),1995-01-01
8,Babe (1995),1995-01-01
9,Dead Man Walking (1995),1995-01-01
10,Richard III (1995),1996-01-22


In [67]:
# Attach a broadcast hint to the movie catalogue so joins against it can ship
# the whole table to every executor, then expose it to Spark SQL as "movie".
movie_item_b = func.broadcast(movie_item)
movie_item_b.createOrReplaceTempView(name="movie")

In [68]:
# u.data: one rating per tab-separated line with four nullable columns.
# NOTE: despite its name, `movies` holds individual rating rows, not movies.
schema = types.StructType(
    [
        types.StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("user_id", types.IntegerType()),
            ("movie_id", types.IntegerType()),
            ("rating", types.IntegerType()),
            ("timestamp", types.LongType()),
        )
    ]
)
movies = (
    spark.read
    .option("sep", "\t")
    .schema(schema)
    .csv("../data/ml-100k/u.data")
)
movies

user_id,movie_id,rating,timestamp
196,242,3,881250949
186,302,3,891717742
22,377,1,878887116
244,51,2,880606923
166,346,1,886397596
298,474,4,884182806
115,265,2,881171488
253,465,5,891628467
305,451,3,886324817
6,86,3,883603013


In [69]:
# Expose the rating rows to Spark SQL and rank movies by how many ratings they received.
movies.createOrReplaceTempView("rating")

# movie_id is a secondary sort key so that movies tied on num_ratings are ordered
# deterministically; without it, LIMIT 10 may keep a different tied movie on each run.
top_10_popular_movies = spark.sql("""
SELECT movie_id, COUNT(*) AS num_ratings
FROM rating
GROUP BY movie_id
ORDER BY num_ratings DESC, movie_id ASC
LIMIT 10
""")
top_10_popular_movies

movie_id,num_ratings
50,583
258,509
100,508
181,507
294,485
286,481
288,478
1,452
300,431
121,429


In [70]:
# Attach title and release date to each of the top-10 movies.
# A join does not guarantee that the input's row order is kept, so the
# popularity ranking (with movie_id as tiebreaker) is re-applied explicitly.
top_10_popular_movies.createOrReplaceTempView("top_ten_movies")
top_10_popular_movies = spark.sql("""
SELECT t.movie_id, m.movie_name, m.release_date, t.num_ratings
FROM top_ten_movies t
JOIN movie m ON m.movie_id = t.movie_id
ORDER BY t.num_ratings DESC, t.movie_id ASC
""")
top_10_popular_movies

movie_id,movie_name,release_date,num_ratings
50,Star Wars (1977),1977-01-01,583
258,Contact (1997),1997-07-11,509
100,Fargo (1996),1997-02-14,508
181,Return of the Jedi (1983),1997-03-14,507
294,Liar Liar (1997),1997-03-21,485
286,"English Patient, The (1996)",1996-11-15,481
288,Scream (1996),1996-12-20,478
1,Toy Story (1995),1995-01-01,452
300,Air Force One (1997),1997-01-01,431
121,Independence Day (ID4) (1996),1996-07-03,429


In [71]:
# Stop the Spark session created at the top of the notebook. DataFrames and temp
# views defined above cannot be evaluated after this point.
spark.stop()