In [8]:
import sys
import os
from datetime import datetime

import boto3
import botocore

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, flatten, explode, collect_list, concat_ws
from pyspark.sql.functions import (year, month, dayofmonth, hour, weekofyear, date_format,
                                   to_timestamp, from_unixtime)

def create_spark_session():
    """Create spark session"""

    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark

In [10]:
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate()

In [12]:
sc.setLogLevel("WARN")


In [9]:
spark = create_spark_session()

spark

<pyspark.sql.session.SparkSession object at 0x7f11248dc2e8>

In [2]:
# set project S3 bucket
bucket = "akyk-dend-capstone"

# input data path in s3 
tmdb_dir = "tmdb-data"
ml_dir = "ml-latest-201908"

#  output data path in s3
output_dir = "result"

update_prefix = ""

## Load data

In [8]:
def check_s3(client, bucket, prefix):
    """Check whether the given s3 path exist or not
    client: s3 client
    bucket: s3 bucket to check
    prefix: Returns `True` when found item which matches to the `prefix`    
    """    
    res = client.list_objects(
        Bucket=bucket,
        Prefix=prefix
    )
    
    return "Contents" in res

def merge_dataset(df_latest, df_update, client, bucket, tmp_path, latest_path):
    """Merge latest data and additional data
    df_latest: Dataframe of latest dataset
    df_update: Dataframe of addition dataset
    client: s3 client
    bucket: Project s3 bucket
    tmp_path: Temporary path name used to save merged data
    latest_path: Path of latest data is saved
    """
    
    df_latest.createOrReplaceTempView("data_latest")
    df_update.createOrReplaceTempView("data_update")
    
    # Data merge is done by a following steps
    # 1. Filter latest data with id which included in update data
    # 2. Concate update data
    df_merged = spark.sql("""
select
    *
from
    data_latest
where id not in (select id from data_update)

union

select
    *
from
    data_update

""")
    
    df_merged.persist()
    print("merged dataset")
    print("size: ",df_merged.count())    
    print("Write merged data to: ", latest_path)
    df_merged.write.mode('overwrite').parquet(f"s3a://{bucket}/{latest_path}")
    
    return df_merged

## Merge latest TMDB data with additional data 

the following process merge the latest TMDB data with additional TMDB data which uploaded in update folder in project s3 bucket.
The merged data is saved as the latest TMDB date and previous latest data is overwritten.

In [9]:
def merge_latest(spark, bucket, tmdb_dir, update_prefix):

    s3client = boto3.client('s3')
    
    latest_name = "latest.parquet"    
    tmp_path = f"{tmdb_dir}/tmp.parquet"
    latest_path = f"{tmdb_dir}/{latest_name}"
    has_latest = check_s3(s3client, bucket,  latest_path)
    has_update =  check_s3(s3client, bucket, f"{tmdb_dir}/update/{update_prefix}")

    if has_latest:
        print("Load latest data")
        df_tmdb_latest = spark.read.parquet(f"s3a://{bucket}/{tmdb_dir}/{latest_name}").persist()
        print("size: ", df_tmdb_latest.count())
    if has_update:
        ## update data are uploaded as json files
        print("Load update files")
        df_tmdb_update = spark.read.json(f"s3a://{bucket}/{tmdb_dir}/update/{update_prefix}*").persist()
        print("size: ", df_tmdb_update.count())

    sys.stdout.flush()
    
    if has_latest and has_update:
        print("merge data")        
        df_tmdb_latest = merge_dataset(df_tmdb_latest, df_tmdb_update, s3client, bucket, tmp_path, latest_path)

    elif has_latest and not has_update:
        print("no merge")    
        pass

    elif (not has_latest) and has_update:
        print("Any latest data is not found. Use update data as latest")
        df_tmdb_latest = df_tmdb_update

        write_path = f"s3a://{bucket}/{tmdb_dir}/{latest_name}"

        print("Write to: ", write_path)
        df_tmdb_latest.write.mode('overwrite').parquet(write_path)

    else:

        raise OSError("no dataset found")

    return df_tmdb_latest

In [11]:
spark = create_spark_session()
df_tmdb = merge_latest(spark,  bucket, tmdb_dir, update_prefix)

print("tmdb data count: ", df_tmdb.count())

df_tmdb.printSchema()

Load latest data
size:  57267
Load update files
size:  57267
merge data
merged dataset
size:  57267
Write merged data to:  tmdb-data/latest.parquet
tmdb data count:  57267
root
 |-- adult: boolean (nullable = true)
 |-- backdrop_path: string (nullable = true)
 |-- belongs_to_collection: struct (nullable = true)
 |    |-- backdrop_path: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- poster_path: string (nullable = true)
 |-- budget: long (nullable = true)
 |-- genres: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- keywords: struct (nullable = true)
 |    |-- keywords: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: long (nullable = true)
 |    |

In [12]:
print("read links.csv")
df_link = spark.read.csv(f"s3a://{bucket}/{ml_dir}/links.csv", header=True)
df_link.printSchema()

print("read movies.csv")
df_ml_movie = spark.read.csv(f"s3a://{bucket}/{ml_dir}/movies.csv", header=True)
df_ml_movie.printSchema()


print("read ratings.csv")
df_ml_rate = spark.read.csv(f"s3a://{bucket}/{ml_dir}/ratings.csv", header=True)
df_ml_rate.printSchema()

print("create temp views")
df_tmdb.withColumn("release_date", to_timestamp("release_date", "yyyy-MM-dd")).createOrReplaceTempView("tmdb_data")
df_link.createOrReplaceTempView("link_data")
df_ml_movie.createOrReplaceTempView("ml_movie_data")

read links.csv
root
 |-- movieId: string (nullable = true)
 |-- imdbId: string (nullable = true)
 |-- tmdbId: string (nullable = true)

read movies.csv
root
 |-- movieId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

read ratings.csv
root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)
 |-- timestamp: string (nullable = true)

create temp views

## Movie data

In [119]:
df_movie = spark.sql("""
select 
    ln.movieId as ml_movie_id
    , tmdb.id as tmdb_movie_id
    , ml.genres as ml_genre
    , year(release_date) as release_year
    , month(release_date) as release_month
    , dayofmonth(release_date) as release_day
    , tmdb.budget as budget
    , tmdb.revenue as revenue
    , tmdb.title as title
    , vote_average as vote_average
    , vote_count as vote_count
    from link_data as ln
    inner join tmdb_data as tmdb on ln.tmdbId = tmdb.id
    inner join ml_movie_data as ml on ml.movieId = ln.movieId
""")

df_movie.printSchema()

root
 |-- ml_movie_id: string (nullable = true)
 |-- tmdb_movie_id: long (nullable = true)
 |-- ml_genre: string (nullable = true)
 |-- release_year: integer (nullable = true)
 |-- release_month: integer (nullable = true)
 |-- release_day: integer (nullable = true)
 |-- budget: long (nullable = true)
 |-- revenue: long (nullable = true)
 |-- title: string (nullable = true)
 |-- vote_average: double (nullable = true)
 |-- vote_count: long (nullable = true)

In [123]:
df_movie.write.mode('overwrite').parquet(f"s3a://{BUCKET}/{OUTPUT_DIR}/movies.parquet")

## Keywords

In [90]:
df_tmdb.select("id", explode("keywords.keywords").alias("kw") ).printSchema()

root
 |-- id: long (nullable = true)
 |-- kw: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)

In [112]:
df_keywords = spark.sql("""
select
    id
    , concat_ws(' ', collect_list(keyword)) as keywords
from (
    select
        id
        , keyword.name as keyword
    from (
    select 
        id
        , explode(keywords.keywords) as keyword
        from tmdb_data
        ) as tmp
    ) as tmp2
group by id
""")

df_keywords.printSchema()

df_keywords.limit(5).show()


root
 |-- id: long (nullable = true)
 |-- keywords: string (nullable = false)

+----+--------------------+
|  id|            keywords|
+----+--------------------+
|  26|berlin germany ga...|
| 964|paris france oper...|
|1677|black people loss...|
|1697|loss of loved one...|
|1950|poker sport las v...|
+----+--------------------+

In [114]:
df_keywords.createOrReplaceTempView("keyword_data")

## Movie for search

In [117]:
df_search = spark.sql("""
select
    tmdb_data.id as tmdb_id
    , title
    , keyword_data.keywords
    , tagline
    , overview
from tmdb_data
inner join keyword_data on keyword_data.id = tmdb_data.id
""")

df_search.printSchema()
df_search.limit(5).show()

root
 |-- tmdb_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- keywords: string (nullable = false)
 |-- tagline: string (nullable = true)
 |-- overview: string (nullable = true)

In [None]:
df_search.write.mode('overwrite').parquet(f"s3a://{BUCKET}/{OUTPUT_DIR}/movie_for_search.parquet")

## Rating data

In [129]:
df_ml_rate.limit(5).show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+

In [130]:
df_ml_rate.withColumn("ts", from_unixtime("timestamp")).limit(5).show()

+------+-------+------+----------+-------------------+
|userId|movieId|rating| timestamp|                 ts|
+------+-------+------+----------+-------------------+
|     1|    307|   3.5|1256677221|2009-10-27 21:00:21|
|     1|    481|   3.5|1256677456|2009-10-27 21:04:16|
|     1|   1091|   1.5|1256677471|2009-10-27 21:04:31|
|     1|   1257|   4.5|1256677460|2009-10-27 21:04:20|
|     1|   1449|   4.5|1256677264|2009-10-27 21:01:04|
+------+-------+------+----------+-------------------+

In [131]:
# recreate log_data view
df_ml_rate.withColumn("ts", from_unixtime("timestamp")).createOrReplaceTempView("ml_rate_data")


In [134]:
# extract columns to create time table
df_rating_time = spark.sql("""
SELECT DISTINCT 
    timestamp
    , year(ts) as year
    , month(ts) as month
    , cast(date_format(ts, "dd") as INTEGER) as day
    , cast(date_format(ts, "HH") as INTEGER) as hour
    , weekofyear(ts) as week 
    , dayofweek(ts) as weekday
FROM ml_rate_data
""")

df_rating_time.printSchema()
df_rating_time.limit(5).show()

root
 |-- timestamp: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- weekday: integer (nullable = true)

+----------+----+-----+---+----+----+-------+
| timestamp|year|month|day|hour|week|weekday|
+----------+----+-----+---+----+----+-------+
|1493407021|2017|    4| 28|  19|  17|      6|
|1493406667|2017|    4| 28|  19|  17|      6|
|1493405824|2017|    4| 28|  18|  17|      6|
|1493401234|2017|    4| 28|  17|  17|      6|
|1117764448|2005|    6|  3|   2|  22|      6|
+----------+----+-----+---+----+----+-------+

In [136]:
df_rating_time.groupby("year").count().show()

+----+-------+
|year|  count|
+----+-------+
|2003| 895529|
|2007|1067537|
|2018| 994970|
|2015|1718546|
|2006|1181392|
|2013| 624558|
|1997| 339290|
|2014| 575015|
|2004|1168738|
|1996| 817334|
|1998| 148473|
|2012| 774572|
|2009| 967277|
|2016|1862618|
|1995|      2|
|2001| 651806|
|2005|1710357|
|2000| 936235|
|2010| 958032|
|2011| 820000|
+----+-------+
only showing top 20 rows

In [138]:
df_rating_time.write.mode('overwrite').partitionBy("year").parquet(f"s3a://{BUCKET}/{OUTPUT_DIR}/rating_time.parquet")

In [5]:
spark = create_spark_session()
df_result_movie = spark.read.parquet(f"s3a://{bucket}/{output_dir}/movies.parquet").persist()
df_result_rating_time = spark.read.parquet(f"s3a://{bucket}/{output_dir}/rating_time.parquet").persist()
df_result_search = spark.read.parquet(f"s3a://{bucket}/{output_dir}/movie_for_search.parquet").persist()


df_ml_movie =  spark.read.csv(f"s3a://{bucket}/{ml_dir}/movies.csv").persist()


In [7]:
# check result file not empty
assert df_result_movie.count() > 0, "result movie data is empty"
assert df_result_rating_time.count() > 0, "result rating time data is empty"
assert df_result_search.count() > 0, "result search data is empty"

print("All result data are not empty")

In [None]:
# check 
assert df_result_movie.count() > 0, "result movie data is empty"
assert df_result_rating_time.count() > 0, "result rating time data is empty"
assert df_result_search.count() > 0, "result search data is empty"

print("All result data are not empty")