In [1]:
import os
import logging
import boto3

import pyspark
from pyspark.sql import SparkSession
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row


In [2]:
logging.basicConfig(format="[%(levelname)s] [%(asctime)s] %(message)s",
                    level=logging.WARN,
                    datefmt="%d/%m/%y %H:%M:%S",
                    encoding="utf-8")

AWS_ENDPOINT = "http://minio-webserver:9000"
AWS_ACCESS_KEY_ID = "U6dHDkvTv3CdrviA"
AWS_SECRET_ACCESS_KEY = "o8LTVjrVtNuzOv7DjGOxhN0HQDeksEej"
AWS_REGION ="us-east-1"

SPARK_APP_NAME= "projeto-imdb-notebook"
SPARK_MASTER_URL="spark://spark-master:7077"

BUCKET_ANALYTICS="projeto-imdb-analytics"
BUCKET_STAGE="projeto-imdb-stage"


def obterJars():
    dir = "/notebooks/spark-jars"
    jars = os.listdir(dir)
    stringJars = ""

    for jar in jars:
        logging.info(f"{dir}/{jar}")
        stringJars += f"{dir}/{jar},"

    return stringJars[:-1]

def salvarDataFrameBancoDados(df,tabela):
    df.write.format("jdbc")\
    .option("url", "jdbc:postgresql://postgres-server:5432/projeto_imdb") \
    .option("driver", "org.postgresql.Driver").option("dbtable", tabela) \
    .option("user", "postgres").option("password", "postgres").save(mode="overwrite")

    print(f"{df.count()} registros salvos na tabela {tabela}]")

In [4]:
#spark_client.stop()
conf = pyspark.SparkConf()
conf.setAppName(SPARK_APP_NAME)
conf.setMaster(SPARK_MASTER_URL)

conf.set("spark.hadoop.fs.s3a.endpoint", AWS_ENDPOINT) \
        .set("spark.hadoop.fs.s3a.endpoint.region", AWS_REGION) \
        .set("spark.hadoop.fs.s3a.access.key",AWS_ACCESS_KEY_ID) \
        .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY) \
        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", False) \
        .set("spark.hadoop.com.amazonaws.services.s3.enableV2", True) \
        .set("spark.jars", obterJars()) \
        .set("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "replace") \
        .set("spark.hadoop.fs.s3a.fast.upload", True) \
        .set("spark.hadoop.fs.s3a.path.style.access", True) \
        .set("spark.sql.sources.commitProtocolClass",
             "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol") \
        .set("spark.executor.memory", "2g") \
        .set("spark.driver.memory", "2g") \
        .set("spark.cores.max", "4") 
        # .set("spark.hadoop.fs.s3a.committer.name", "magic") \
        # .set("spark.hadoop.fs.s3a.bucket.all.committer.magic.enabled", "true")


sc = pyspark.SparkContext(conf=conf).getOrCreate()
sc.setLogLevel("warn")
spark_client = SparkSession(sc)
spark_client

23/07/25 19:41:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Criar as tabelas temporárias a partir dos arquivos *.parquet

In [5]:
dfNameBasics = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/name_basics", header=True)

dfNameBasics.createOrReplaceTempView("name_basics")
dfNameBasics.printSchema()

23/07/25 19:43:53 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

root
 |-- nconst: string (nullable = true)
 |-- primaryName: string (nullable = true)
 |-- birthYear: string (nullable = true)
 |-- deathYear: string (nullable = true)
 |-- primaryProfession: string (nullable = true)
 |-- knownForTitles: string (nullable = true)



In [6]:
dfAkas = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/title_akas", header=True)

dfAkas.createOrReplaceTempView("title_akas")
dfAkas.printSchema()

[Stage 1:>                                                          (0 + 1) / 1]

root
 |-- titleId: string (nullable = true)
 |-- ordering: string (nullable = true)
 |-- title: string (nullable = true)
 |-- region: string (nullable = true)
 |-- language: string (nullable = true)
 |-- types: string (nullable = true)
 |-- attributes: string (nullable = true)
 |-- isOriginalTitle: string (nullable = true)



                                                                                

In [7]:
dfTitleBasics = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/title_basics", header=True)

dfTitleBasics.createOrReplaceTempView("title_basics")
dfTitleBasics.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- titleType: string (nullable = true)
 |-- primaryTitle: string (nullable = true)
 |-- originalTitle: string (nullable = true)
 |-- isAdult: string (nullable = true)
 |-- startYear: string (nullable = true)
 |-- endYear: string (nullable = true)
 |-- runtimeMinutes: string (nullable = true)
 |-- genres: string (nullable = true)



In [8]:
dfTitleCrew = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/title_crew", header=True)

dfTitleCrew.createOrReplaceTempView("title_crew")
dfTitleCrew.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- directors: string (nullable = true)
 |-- writers: string (nullable = true)



In [9]:
dfTitleEpisode = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/title_episode", header=True)

dfTitleEpisode.createOrReplaceTempView("title_episode")
dfTitleEpisode.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- parentTconst: string (nullable = true)
 |-- seasonNumber: string (nullable = true)
 |-- episodeNumber: string (nullable = true)



In [10]:
dfTitlePrincipals = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/title_principals", header=True)

dfTitlePrincipals.createOrReplaceTempView("title_principals")
dfTitlePrincipals.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- ordering: string (nullable = true)
 |-- nconst: string (nullable = true)
 |-- category: string (nullable = true)
 |-- job: string (nullable = true)
 |-- characters: string (nullable = true)



In [11]:
dfRatings = spark_client \
        .read \
        .parquet(f"s3a://{BUCKET_ANALYTICS}/title_ratings", header=True)

dfRatings.createOrReplaceTempView("title_ratings")
dfRatings.printSchema()

root
 |-- tconst: string (nullable = true)
 |-- averageRating: string (nullable = true)
 |-- numVotes: string (nullable = true)



# Extrair dados da região do Brasil para análise

In [12]:
#Títulos que possuem lançamentos no Brasil
sqlTitulosBR = """
    select distinct titleId
    from title_akas 
    where 1=1
    and region='BR' 
    --group by titleId having count(*)=1
    order by titleId
"""

dfTitulosBR = spark_client.sql(sqlTitulosBR)
dfTitulosBR.createOrReplaceTempView("title_br")
print(f"Existem {dfTitulosBR.count()} titulos que foram lançados no Brasil")




Existem 111159 titulos que foram lançados no Brasil


                                                                                

In [13]:
sql = """
    select ta.* 
    from title_akas ta 
        inner join title_br tbr 
        on ta.titleId=tbr.titleId
    where 1=1
    and region='BR' 
"""
dfAkasBr = spark_client.sql(sql)
salvarDataFrameBancoDados(dfAkasBr, "title_akas_br")

                                                                                

119471 registros salvos na tabela title_akas_br]


In [14]:
sql = """
    select tb.* 
    from title_basics tb 
        inner join title_br tbr 
        on tb.tconst=tbr.titleId
"""
dfBasicsBr = spark_client.sql(sql)
salvarDataFrameBancoDados(dfBasicsBr, "title_basics_br")



111109 registros salvos na tabela title_basics_br]


                                                                                

In [15]:
sql = """
    select tc.* 
    from title_crew tc 
        inner join title_br tbr 
        on tc.tconst=tbr.titleId
"""
dfCrewBr = spark_client.sql(sql)
salvarDataFrameBancoDados(dfCrewBr, "title_crew_br")



111118 registros salvos na tabela title_crew_br]


                                                                                

In [16]:
sql = """
    select te.* 
    from title_episode te 
        inner join title_br tbr 
        on te.tconst=tbr.titleId
"""
dfEpisodeBr = spark_client.sql(sql)
salvarDataFrameBancoDados(dfEpisodeBr, "title_episode_br")

                                                                                

5868 registros salvos na tabela title_episode_br]


                                                                                

In [17]:
sql = """
    select tp.* 
    from title_principals tp 
        inner join title_br tbr 
        on tp.tconst=tbr.titleId
"""
dfPrincipalsBr = spark_client.sql(sql)
salvarDataFrameBancoDados(dfPrincipalsBr, "title_principals_br")



930936 registros salvos na tabela title_principals_br]


                                                                                

In [18]:
sql = """
    select tr.* 
    from title_ratings tr 
        inner join title_br tbr 
        on tr.tconst=tbr.titleId
"""
dfRatingsBr = spark_client.sql(sql)
salvarDataFrameBancoDados(dfRatingsBr, "title_ratings_br")

[Stage 95:>                                                         (0 + 1) / 1]

81679 registros salvos na tabela title_ratings_br]


                                                                                

In [19]:
dfAkasBr.info()

AttributeError: 'DataFrame' object has no attribute 'info'

23/07/25 21:12:58 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: Master removed our application: KILLED
23/07/25 21:12:58 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exiting due to error from cluster scheduler: Master removed our application: KILLED
	at org.apache.spark.errors.SparkCoreErrors$.clusterSchedulerError(SparkCoreErrors.scala:291)
	at org.apache.spark.scheduler.TaskSchedulerImpl.error(TaskSchedulerImpl.scala:978)
	at org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.dead(StandaloneSchedulerBackend.scala:165)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint.markDead(StandaloneAppClient.scala:263)
	at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$receive$1.applyOrElse(StandaloneAppClient.scala:170)
	at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
	at org.apache.spark.rpc.netty.Inbox.proce