In [2]:
# Spark Session is the entry point for all Spark functionality 
# Thorugh the Spark Session you are able to read data, 
# create DataFrames and transform it using Structured API's like pyspark.
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("tsTest").getOrCreate()

In [3]:
### Movies View Creation
print("\n ----> Movies View Creation")
movies_schema = "TITLE STRING, DURATION_MINS FLOAT,ORIGINAL_LANGUAGE STRING,SIZE_MB DOUBLE"

df_movies = spark.read.option("header","true").schema(movies_schema).csv("data-dump/movies.csv")
df_movies.printSchema()
df_movies.show(5)
df_movies.createOrReplaceTempView("movies")

### Users View Creation
print("\n ----> Users View Creation")
df_users = spark.read.option("header","true").csv("data-dump/users.csv")
df_users.show(5)
df_users.createOrReplaceTempView("users")


### Streams View Cration
print("\n ----> Streams View Creation")
streams_schema = "MOVIE_TITLE STRING, USER_EMAIL STRING,SIZE_MB DOUBLE,START_AT STRING,END_AT STRING"

df_streams_1 = spark.read.option("header","true").schema(streams_schema).csv("data-dump/streams.csv")

df_streams_2 = (df_streams_1 #### Transformation 
.withColumn('SIZE_MB', col('size_mb').astype("DOUBLE")) 
.withColumn('START_AT', to_timestamp(col("start_at"),"yyyy-MM-dd'T'HH:mm:ss.SSSZZZ")) 
.withColumn('END_AT', to_timestamp(col("end_at"),"yyyy-MM-dd'T'HH:mm:ss.SSSZZZ"))
)

df_streams_2.printSchema()
df_streams_2.show(5)
df_streams_2.createOrReplaceTempView("streams")


### Authors View Creation
print("\n ----> Authors View Creation")
df_authors = spark.read.option("header","true").json("data-dump/authors.json")

df_authors_2 = (df_authors # transform
.withColumn('died_at', to_timestamp(col("died_at"),"yyyy-MM-dd'T'HH:mm:ss.SSSZZZ"))
) 
df_authors_2.show(5)
df_authors_2.createOrReplaceTempView("authors")


### Books View Creation
print("\n ----> Books View Creation")
df_books = spark.read.option("header","true").json("data-dump/books.json")
df_books.show(5)
df_books.createOrReplaceTempView("books")


### Reviews View Creation
print("\n ----> Reviews View Creation")
df_reviews = spark.read.option("header","true").json("data-dump/reviews.json")
df_reviews.show(5)
df_reviews.createOrReplaceTempView("reviews")


 ----> Movies View Creation
root
 |-- TITLE: string (nullable = true)
 |-- DURATION_MINS: float (nullable = true)
 |-- ORIGINAL_LANGUAGE: string (nullable = true)
 |-- SIZE_MB: double (nullable = true)

+----------------+-------------+-----------------+-------+
|           TITLE|DURATION_MINS|ORIGINAL_LANGUAGE|SIZE_MB|
+----------------+-------------+-----------------+-------+
|The Great Escape|        113.0|           Korean|  876.0|
|   The Third Man|        129.0|           French| 1857.0|
| Cinema Paradiso|        102.0|          Russian|  676.0|
|        Scarface|        112.0|         Japanese| 1236.0|
|         Vertigo|        146.0|          Italian|  649.0|
+----------------+-------------+-----------------+-------+
only showing top 5 rows


 ----> Users View Creation
+----------+---------+--------------------+
|first_name|last_name|               email|
+----------+---------+--------------------+
|      Eddy|   Kirlin|karine@bode-rogah...|
|     Elvia|     Conn|janet_haag@cri

### What percentage of the streamed movies are based on books?

In [122]:
sf_distinct_movies = spark.sql("select distinct MOVIE_TITLE as moviesCount from streams").distinct().count()

In [33]:
spark.sql("select count(distinct book) as reviewsBooksCount from reviews").show(1)

+-----------------+
|reviewsBooksCount|
+-----------------+
|              102|
+-----------------+



In [125]:
q1 = """
SELECT m.title, r.movie
FROM movies as m
LEFT JOIN reviews as r on m.title = r.movie
WHERE r.book is not null
"""
movies_based_on_books_on_sf = spark.sql(q1).distinct().count()
movies_based_on_books_on_sf/sf_distinct_movies

0.9340659340659341

### How many users were watching "Unforgiven" on Christmas morning (between 7 and 12 am on December 25)?

In [120]:
q2 = """
SELECT START_AT
FROM streams
WHERE 
MOVIE_TITLE = 'Unforgiven'
AND HOUR(START_AT) BETWEEN 7 AND 12 
AND DATE(START_AT) = '2021-12-25'
"""
spark.sql(q2).count()

1

### How many movies based on books written by Singaporeans authors were streamed that month?

In [63]:
q3 = """
SELECT distinct movie_title
FROM streams as s
LEFT JOIN reviews as r on s.MOVIE_TITLE = r.movie
left join books b on r.book = b.name
left join authors a on  b.author = a.name
WHERE 
month(s.START_AT) = 12
and year(s.START_AT) = 2021
and a.nationality = 'Singaporeans'
"""
spark.sql(q3).count()

3

### What's the average streaming duration?

In [93]:
q4 = """
WITH 
duration as
(
SELECT (replace(split((END_AT - START_AT),' ')[2],"'","")) diff
FROM streams 
WHERE
month(start_at) = 12
),
duration_timed as
(SELECT 
         AVG(HOUR(DIFF)) * 60 *60 as hour_avg_in_seconds
        ,AVG(MINUTE(DIFF)) * 60 as minute_avg_in_seconds
        ,AVG(SECOND(DIFF)) as second_avg_in_seconds
FROM duration )

SELECT ((hour_avg_in_seconds / 3600) + (minute_avg_in_seconds / 3600) + second_avg_in_seconds / 3600) as avg_streaming_duration
FROM duration_timed
"""
spark.sql(q4).show(100)

+----------------------+
|avg_streaming_duration|
+----------------------+
|    12.041684094613812|
+----------------------+



#### What's the median streaming size in gigabytes?

In [71]:
q5 = """
select  percentile_approx(size_mb, 0.5) as size_in_mb_median
from streams
"""

q6 = """
select *
from streams
"""
spark.sql(q5).show()
spark.sql(q6).describe().show()

+-----------------+
|size_in_mb_median|
+-----------------+
|  935.43929651322|
+-----------------+

+-------+------------+--------------------+-------------------+
|summary| MOVIE_TITLE|          USER_EMAIL|            SIZE_MB|
+-------+------------+--------------------+-------------------+
|  count|        9652|                9652|               9652|
|   mean|        null|                null| 1122.7224951748287|
| stddev|        null|                null|   824.739344337668|
|    min|12 Angry Men|abdul.casper@hage...|0.13392509085694668|
|    max|      WALL·E|zana_leffler@powl...|  3783.921923474418|
+-------+------------+--------------------+-------------------+



#### How many users watched at least 50% of any movie in the last week of the month (7 days)?

In [5]:
q7 = """
 
with 
stream_duration as
(
select 
s.*
, (
  hour(replace(split((END_AT - START_AT),' ')[2],"'",""))*60 
+ minute(replace(split((END_AT - START_AT),' ')[2],"'","")) 
+ second(replace(split((END_AT - START_AT),' ')[2],"'",""))/60

)
 duration
from streams s
where
month(start_at) = 12
and day(start_at) between 25 and 31
),
user_duration as 
(select user_email, date(start_at),weekofyear(start_at), duration/duration_mins as rate
from stream_duration sd
left join movies m on sd.movie_title = m.title
--where user_email like '%rodrick_bergnaum%'
)
select count(distinct user_email) as users_above_50_percent from user_duration where rate between 0.5 and 1
"""
spark.sql(q7).show(100)

+----------------------+
|users_above_50_percent|
+----------------------+
|                    65|
+----------------------+

