In [1]:
import glob
import os
import matplotlib.pyplot as plt
import numpy as np

from pyspark.sql import SparkSession, Row
from datetime import datetime
from decouple import config
from datetime import datetime
from matplotlib.ticker import FormatStrFormatter, StrMethodFormatter

In [2]:
# Spark classpath: every file under the jars folder, comma-separated for `spark.jars`.
mypath = "/opt/spark_jars"
files = glob.glob(mypath + '/*')
my_jars = ",".join(files)

# S3-compatible (MinIO) credentials come from the environment / .env via decouple's config().
aws_access_key, aws_secret_key = (
    config(setting) for setting in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
)
aws_endpoint_url = "http://dataeng-minio-nginx:9000"

# Today's date selects the partition folder that the loading cells read:
# year=YYYY/month=MM/day=DD.
current_time = datetime.now()
current_date = current_time.strftime("%Y-%m-%d")
year, month, day = (current_time.strftime(pattern) for pattern in ("%Y", "%m", "%d"))
current_folder = f"year={year}/month={month}/day={day}"

# Sanity check: list the jars that were joined into `my_jars` for the Spark classpath.
!ls /opt/spark_jars

aws-java-sdk-1.12.400.jar	    httpclient5-5.2.1.jar
aws-java-sdk-bundle-1.12.231.jar    jets3t-0.9.4.jar
aws-java-sdk-core-1.12.400.jar	    joda-time-2.12.2.jar
aws-java-sdk-dynamodb-1.12.400.jar  postgresql-42.6.0.jar
aws-java-sdk-kms-1.12.400.jar	    slf4j-api-2.0.6.jar
aws-java-sdk-s3-1.12.400.jar	    slf4j-reload4j-2.0.6.jar
hadoop-aws-3.3.1.jar


In [3]:
# Create (or, on re-run, reuse via getOrCreate) the Spark session. It ships the local
# jars and connects to the master URL taken from the SPARK_URL setting.
spark = (
    SparkSession.builder
    .appName("JupyterLabDataExtraction")
    .config("spark.jars", my_jars)
    .master(config('SPARK_URL'))
    .getOrCreate()
)

# Point the Hadoop S3A connector at the MinIO endpoint: plain HTTP, path-style bucket
# URLs, and the credentials loaded in the config cell.
s3a_settings = {
    "fs.s3a.endpoint": aws_endpoint_url,
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.access.key": aws_access_key,
    "fs.s3a.secret.key": aws_secret_key,
    "fs.s3a.path.style.access": "true",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
for s3a_key, s3a_value in s3a_settings.items():
    hadoop_conf.set(s3a_key, s3a_value)

23/07/20 19:42:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [None]:
# Loading title_ratings
# The bucket comes from decouple's config(), like every other setting. A missing
# AWS_S3_IMDB_BUCKET therefore raises instead of silently building "s3a://None/...".
# Parquet is self-describing, so no CSV options (sep/header) are passed.
foldername = f"title_ratings/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_rating_raw_df = spark.read.parquet(fileurl)
title_rating_raw_df.createOrReplaceTempView("title_ratings")
# title_rating_raw_df.printSchema(), title_rating_raw_df.count(), title_rating_raw_df.take(2)

In [None]:
# Loading title_akas: the names of each title per region and language
# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"title_akas/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_akas_raw_df = spark.read.parquet(fileurl)
# title_akas_raw_df.printSchema(), title_akas_raw_df.count(), title_akas_raw_df.take(2)

In [None]:
# Keep only the title names whose region is "BR" (Brazil) and expose them to
# Spark SQL under the view name `title_akas_br`.
title_akas_br_raw_df = title_akas_raw_df.filter(title_akas_raw_df["region"] == "BR")
title_akas_br_raw_df.createOrReplaceTempView("title_akas_br")
# title_akas_br_raw_df.count(), title_akas_br_raw_df.show(2)

In [None]:
# Attach ratings to every BR title name. The LEFT JOIN keeps names that have no
# rating; their tconst/averageRating/numVotes columns come back NULL.
sql = """
    select
        *
    from
        title_akas_br ta
    left join
        title_ratings tr on ta.titleId = tr.tconst
"""
akas_and_rating_df = spark.sql(sql)

# Register the joined result so later SQL cells can query it.
akas_and_rating_df.createOrReplaceTempView("titles_akas_ratings_br")
# akas_and_rating_df.count(), akas_and_rating_df.show(2)

In [None]:
# Top 10 BR title names by number of votes.
# numVotes is stored as a string in the ratings data, so it is cast to INT for both
# the output and the ordering. Ordering the raw string would be lexicographic
# ("9" > "10000"). Titles without a rating (NULL) sort last under DESC.
sql_more_voteds = """
    select
        tr.title,
        cast(tr.numVotes as int) as numVotes
    from
        titles_akas_ratings_br tr
    order by
        cast(tr.numVotes as int) desc
    limit 10
"""
more_votes_df = spark.sql(sql_more_voteds).toPandas()


In [None]:
# Display the (at most 10) most-voted BR titles returned by the SQL query.
more_votes_df.iloc[:10]

In [None]:
# Same top-10 ranking via the DataFrame API, with averageRating as the tie-breaker.
# Both columns are strings in the source data, so they are cast to numbers first.
# Sorting the raw strings would rank "9" above "10000" and "9.5" above "10.0".
# ascending=False sorts descending, with NULLs (titles without a rating) last.
py_more_voteds_pd_df = (
    akas_and_rating_df
    .select(
        "title",
        akas_and_rating_df.numVotes.cast("int").alias("numVotes"),
        akas_and_rating_df.averageRating.cast("double").alias("averageRating"),
    )
    .sort("numVotes", "averageRating", ascending=False)
    .limit(10)
    .select("title", "numVotes")
    .toPandas()
)
# py_more_voteds_pd_df.head(10)

In [None]:
# Bar chart of the 10 most-voted BR title names.
fig, ax = plt.subplots(figsize=(10, 6))
# Coerce to float so bar heights are numeric even if numVotes arrives as strings
# (its type in the raw ratings data).
ax.bar(py_more_voteds_pd_df.title, py_more_voteds_pd_df.numVotes.astype(float))
ax.set_title("Top 10 most-voted titles (region BR)")
ax.set_xlabel("Title")
ax.set_ylabel("Number of votes")
ax.yaxis.set_major_formatter(StrMethodFormatter('{x:,.0f}'))
plt.setp(ax.get_xticklabels(), rotation=30, ha='right')
plt.show()

In [None]:
# Loading title_basics
# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"title_basics/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_basics_raw_df = spark.read.parquet(fileurl)
# title_basics_raw_df.createOrReplaceTempView("title_basics")
# title_basics_raw_df.printSchema(), title_basics_raw_df.count(), title_basics_raw_df.take(2)

In [None]:
# Keep 1,000,000 rows of title_basics, taken in descending tconst order.
# NOTE(review): tconst is a string column, so this order is lexicographic
# (e.g. "tt9999999" sorts above "tt10000000"). Confirm this is the intended subset.
new_title_basics_row_df = title_basics_raw_df.orderBy(title_basics_raw_df["tconst"].desc()).limit(1_000_000)

In [None]:
# Write the title_basics sample to PostgreSQL table `title_basics` in overwrite mode.
# Connection settings are read through decouple's config(), so credentials can live
# in the environment / .env instead of the notebook. The defaults are the local
# dev database values this cell used before.
(
    new_title_basics_row_df.write.format("jdbc")
    .option("url", config("POSTGRES_JDBC_URL", default="jdbc:postgresql://dataeng-postgres:5432/awari_imdb"))
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", "title_basics")
    .option("user", config("POSTGRES_USER", default="postgres"))
    .option("password", config("POSTGRES_PASSWORD", default="postgres"))
    .mode("overwrite")
    .save()
)

In [None]:
# Intentionally a no-op: every cell below still needs the Spark session.
# It is shut down with `spark.stop()` in the last cell. SparkSession has no
# `close()` method, so calling `spark.close()` here would raise AttributeError.

In [None]:
# Loading name_basics: people (directors, actors, everyone in the cast and crew, etc.)
# root
#  |-- nconst: string (nullable = true)
#  |-- primaryName: string (nullable = true)
#  |-- birthYear: string (nullable = true)
#  |-- deathYear: string (nullable = true)
#  |-- primaryProfession: string (nullable = true)
#  |-- knownForTitles: string (nullable = true)
# count: ~12,685,224 records

module = "name_basics"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
name_basics_raw_df = spark.read.parquet(fileurl)
name_basics_raw_df.createOrReplaceTempView("name_basics")
# name_basics_raw_df.printSchema(), name_basics_raw_df.count(), name_basics_raw_df.show(2)

In [None]:
# Loading title_akas: catalog of title names in other regions and languages
# root
#  |-- titleId: string (nullable = true)
#  |-- ordering: string (nullable = true)
#  |-- title: string (nullable = true)
#  |-- region: string (nullable = true)
#  |-- language: string (nullable = true)
#  |-- types: string (nullable = true)
#  |-- attributes: string (nullable = true)
#  |-- isOriginalTitle: string (nullable = true)
# count: ~36,602,195 records

module = "title_akas"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_akas_raw_df = spark.read.parquet(fileurl)
title_akas_raw_df.createOrReplaceTempView("title_akas")
# title_akas_raw_df.printSchema(), title_akas_raw_df.count(), title_akas_raw_df.show(2)

In [None]:
# Loading title_basics: all titles with their basic fields
# root
#  |-- tconst: string (nullable = true)
#  |-- titleType: string (nullable = true)
#  |-- primaryTitle: string (nullable = true)
#  |-- originalTitle: string (nullable = true)
#  |-- isAdult: string (nullable = true)
#  |-- startYear: string (nullable = true)
#  |-- endYear: string (nullable = true)
#  |-- runtimeMinutes: string (nullable = true)
#  |-- genres: string (nullable = true)
# count: ~10,006,917 records

module = "title_basics"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_basics_raw_df = spark.read.parquet(fileurl)
title_basics_raw_df.createOrReplaceTempView("title_basics")
# title_basics_raw_df.printSchema(), title_basics_raw_df.count(), title_basics_raw_df.show(2)

In [None]:
# Loading title_crew: links each title to its directors and writers (people in name_basics)
# root
#  |-- tconst: string (nullable = true)
#  |-- directors: string (nullable = true)
#  |-- writers: string (nullable = true)
# count: ~10,006,917 records

module = "title_crew"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_crew_raw_df = spark.read.parquet(fileurl)
title_crew_raw_df.createOrReplaceTempView("title_crew")
# title_crew_raw_df.printSchema(), title_crew_raw_df.count(), title_crew_raw_df.show(2)

In [None]:
# Loading title_episode: links each episode to its parent title (e.g. a TV series)
# together with its season and episode numbers
# root
#  |-- tconst: string (nullable = true)
#  |-- parentTconst: string (nullable = true)
#  |-- seasonNumber: string (nullable = true)
#  |-- episodeNumber: string (nullable = true)
# count: ~7,612,350 records

module = "title_episode"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_episode_raw_df = spark.read.parquet(fileurl)
title_episode_raw_df.createOrReplaceTempView("title_episode")
# title_episode_raw_df.printSchema(), title_episode_raw_df.count(), title_episode_raw_df.show(2)

In [None]:
# Loading title_principals: the main people credited on each title (e.g. director, lead actors)
# root
#  |-- tconst: string (nullable = true)
#  |-- ordering: string (nullable = true)
#  |-- nconst: string (nullable = true)
#  |-- category: string (nullable = true)
#  |-- job: string (nullable = true)
#  |-- characters: string (nullable = true)
# count: ~57,165,969 records

module = "title_principals"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_principals_raw_df = spark.read.parquet(fileurl)
title_principals_raw_df.createOrReplaceTempView("title_principals")
# title_principals_raw_df.printSchema(), title_principals_raw_df.count(), title_principals_raw_df.show(2)

In [4]:
# Loading title_ratings: average rating and vote count per title
# root
#  |-- tconst: string (nullable = true)
#  |-- averageRating: string (nullable = true)
#  |-- numVotes: string (nullable = true)
# count: ~1,329,800 records

module = "title_ratings"

# The bucket comes from decouple's config(), so a missing AWS_S3_IMDB_BUCKET raises
# instead of yielding "s3a://None/...". Parquet needs no CSV options (sep/header).
foldername = f"{module}/{current_folder}"
fileurl = f"s3a://{config('AWS_S3_IMDB_BUCKET')}/datalake/{foldername}/data"
title_ratings_raw_df = spark.read.parquet(fileurl)
title_ratings_raw_df.createOrReplaceTempView("title_ratings")
# title_ratings_raw_df.printSchema(), title_ratings_raw_df.count(), title_ratings_raw_df.show(2)

23/07/20 19:42:54 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


                                                                                

root
 |-- tconst: string (nullable = true)
 |-- averageRating: string (nullable = true)
 |-- numVotes: string (nullable = true)



                                                                                

+---------+-------------+--------+
|   tconst|averageRating|numVotes|
+---------+-------------+--------+
|tt0000001|          5.7|    1987|
|tt0000002|          5.8|     265|
+---------+-------------+--------+
only showing top 2 rows



                                                                                

(None, 1331252, None)

In [None]:
# Record count of every IMDb dataset, plotted from largest to smallest.
# Each .count() runs a full Spark job over the dataset, so this cell is slow.
modules = ['name_basics', 'title_akas', 'title_basics', 'title_crew', 'title_episode', 'title_principals', 'title_ratings']
totals = [
    name_basics_raw_df.count(),
    title_akas_raw_df.count(),
    title_basics_raw_df.count(),
    title_crew_raw_df.count(),
    title_episode_raw_df.count(),
    title_principals_raw_df.count(),
    title_ratings_raw_df.count(),
]

# Rank by count, descending. Equal counts fall back to the module name, also descending.
# Plain sorted() keeps the labels as str, unlike a numpy bytes-dtype array.
ranked = sorted(zip(modules, totals), key=lambda pair: (pair[1], pair[0]), reverse=True)
ranked_modules = [name for name, _ in ranked]
ranked_totals = [total for _, total in ranked]

fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(ranked_modules, ranked_totals)
ax.set_title("Number of records per IMDb dataset")
ax.set_xlabel("Dataset")
ax.set_ylabel("Records")
ax.yaxis.set_major_formatter(StrMethodFormatter('{x:,.0f}'))
plt.setp(ax.get_xticklabels(), rotation=30, ha='right')
plt.show()

In [None]:
# Stop the Spark session (and its underlying SparkContext) once all work is done.
spark.stop()