In [8]:
#pip install boto3


In [36]:
import sys
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, monotonically_increasing_id
import boto3

# Inicializar o SparkSession
spark = SparkSession.builder \
    .appName("Transformação TRT") \
    .getOrCreate()


# Caminho de origem dos dados
source_file_tmdb = f"./TRT/TMDB/Fantasy/dt=2024-04-12/"
source_file_movie = f"./TRT/Movies/movies"

# Carregar dados do arquivo Parquet
df_tmdb = spark.read.parquet(source_file_tmdb)
df_movie = spark.read.parquet(source_file_movie)

#####
# Verificar se a coluna 'data_coleta_tmdb' está presente em df_tmdb
print("Esquema de df_tmdb após a junção:")
df_tmdb.printSchema()

# Verificar se 'data_coleta_tmdb' está presente em df_tmdb
print("Verificar se 'data_coleta_tmdb' está presente em df_tmdb:")
df_tmdb.show()
#######

# junção dataframes
df_movie = df_movie.withColumnRenamed("imdb_id", "movie_imdb_id")

df_geral = df_movie.join(df_tmdb, df_movie.movie_imdb_id == df_tmdb.imdb_id, 'inner')


# Verificar se a coluna 'data_coleta_tmdb' está presente em df_tmdb
print("Esquema de df_geral após a junção:")
df_geral.printSchema()

# Verificar se 'data_coleta_tmdb' está presente em df_tmdb
print("Verificar se 'data_coleta_tmdb' está presente em df_geral:")
df_geral.show()


# Removendo linhas duplicadas com base na coluna imdb_id para garantir que seja uma chave primária
df_geral = df_geral.dropDuplicates(["movie_imdb_id"])

# Criando uma view temporária a partir do DataFrame geral
df_geral.createOrReplaceTempView('df_geral_view')

# Visualizar o esquema do DataFrame df_geral
df_geral.printSchema()

Esquema de df_tmdb após a junção:
root
 |-- anoLancamento_tmdb: long (nullable = true)
 |-- id_genero: long (nullable = true)
 |-- adulto: boolean (nullable = true)
 |-- orcamento: integer (nullable = true)
 |-- id_tmdb: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- popularidade: double (nullable = true)
 |-- receita: integer (nullable = true)
 |-- duracao: integer (nullable = true)
 |-- titulo: string (nullable = true)
 |-- votacao_media: double (nullable = true)
 |-- contagem_de_voto: integer (nullable = true)
 |-- data_coleta_tmdb: date (nullable = true)

Verificar se 'data_coleta_tmdb' está presente em df_tmdb:
+------------------+---------+------+---------+-------+---------+------------+----------+-------+--------------------+-------------+----------------+----------------+
|anoLancamento_tmdb|id_genero|adulto|orcamento|id_tmdb|  imdb_id|popularidade|   receita|duracao|              titulo|votacao_media|contagem_de_voto|data_coleta_tmdb|
+------------------+--

In [37]:
# Definir as consultas para as tabelas


df_data_query = """
    SELECT DISTINCT anoLancamento_tmdb AS Ano_Lancamento 
    FROM df_geral_view
"""


df_titulo_query = """
    SELECT tituloPrincipal AS Titulo, 
        movie_imdb_id as titulo_imdb_id
    FROM df_geral_view
"""

df_adulto_query = """
    SELECT adulto AS Adulto, 
        movie_imdb_id as adulto_imdb_id
    FROM df_geral_view
"""

# Criar DataFrames usando as consultas
df_data = spark.sql(df_data_query)
df_titulo = spark.sql(df_titulo_query)
df_adulto = spark.sql(df_adulto_query)


df_data.printSchema()
df_titulo.printSchema()
df_adulto.printSchema()

root
 |-- Ano_Lancamento: long (nullable = true)

root
 |-- Titulo: string (nullable = true)
 |-- titulo_imdb_id: string (nullable = true)

root
 |-- Adulto: boolean (nullable = true)
 |-- adulto_imdb_id: string (nullable = true)



In [40]:
# Criar uma janela para particionar os dados por nada, resultando em uma partição única
# Definir a ordem na janela
window_data = Window.orderBy("Ano_Lancamento")
window_titulo = Window.orderBy("titulo_imdb_id")
window_adulto = Window.orderBy("adulto_imdb_id")

# Adicionar uma nova coluna de ID em cada DataFrame
# Adicionar uma nova coluna de ID em cada DataFrame
df_data = df_data.withColumn("id_tabela_data", concat(lit("dtg"), row_number().over(window_data)))
df_titulo = df_titulo.withColumn("id_tabela_titulo", concat(lit("tig"), row_number().over(window_titulo)))
df_adulto = df_adulto.withColumn("id_tabela_adulto", concat(lit("adg"), row_number().over(window_adulto)))

# Converter id_adulto para string
df_adulto = df_adulto.withColumn("id_tabela_adulto", col("id_tabela_adulto").cast("string"))
df_data = df_data.withColumn("id_tabela_data", col("id_tabela_data").cast("string"))
df_titulo = df_titulo.withColumn("id_tabela_titulo", col("id_tabela_titulo").cast("string"))


df_data.show()
df_data.describe().show()
df_titulo.show()
df_titulo.describe().show()
df_adulto.show()
df_adulto.describe().show()

+--------------+--------------+
|Ano_Lancamento|id_tabela_data|
+--------------+--------------+
|          2001|          dtg1|
|          2002|          dtg2|
|          2003|          dtg3|
|          2004|          dtg4|
|          2005|          dtg5|
|          2006|          dtg6|
|          2007|          dtg7|
|          2008|          dtg8|
|          2009|          dtg9|
|          2010|         dtg10|
|          2011|         dtg11|
|          2012|         dtg12|
|          2013|         dtg13|
|          2014|         dtg14|
|          2015|         dtg15|
+--------------+--------------+

+-------+----------------+--------------+
|summary|  Ano_Lancamento|id_tabela_data|
+-------+----------------+--------------+
|  count|              15|            15|
|   mean|          2008.0|          NULL|
| stddev|4.47213595499958|          NULL|
|    min|            2001|          dtg1|
|    max|            2015|          dtg9|
+-------+----------------+--------------+

+-----------

In [44]:
df_fato_query = """
    SELECT DISTINCT
        df_geral_view.movie_imdb_id as fato_imdb_id, 
        orcamento, 
        receita, 
        tempoMinutos, 
        popularidade, 
        contagem_de_voto, 
        votacao_media, 
        data_coleta_tmdb,
        anoLancamento_tmdb,
        adulto
    FROM df_geral_view
"""
df_fato = spark.sql(df_fato_query)


#df_fato.printSchema()



df_fato = df_fato.join(df_data.select('id_tabela_data', 'Ano_Lancamento'),
                       df_fato.anoLancamento_tmdb==df_data.Ano_Lancamento, 'inner')
#df_fato = df_fato.drop('anoLancamento_tmdb', 'Ano_Lancamento')
                                    

df_fato = df_fato.join(df_titulo.select('id_tabela_titulo', 'titulo_imdb_id'),
                       df_fato.fato_imdb_id==df_titulo.titulo_imdb_id, 'inner')
#df_fato = df_fato.drop('titulo_imdb_id')


df_fato = df_fato.join(df_adulto.select('id_tabela_adulto', 'adulto_imdb_id'),
                       df_fato.fato_imdb_id==df_adulto.adulto_imdb_id, 'inner')
#df_fato = df_fato.drop('adulto', 'Adulto')



df_temp = df_fato.toPandas()
drop_colunas = ['anoLancamento_tmdb', 'Ano_Lancamento', 'titulo_imdb_id', 'adulto', 'adulto_imdb_id']
df_temp = df_temp.drop(drop_colunas, axis=1)
df_fato = spark.createDataFrame(df_temp)

# Remover colunas redundantes após o join
#df_fato = df_fato_sagas.drop("anoLancamento_tmdb", "Ano_Lancamento", "adulto", "ts_imdb_id", "as_imdb_id")


df_fato.orderBy('fato_imdb_id').show
df_fato.printSchema()

root
 |-- fato_imdb_id: string (nullable = true)
 |-- orcamento: long (nullable = true)
 |-- receita: long (nullable = true)
 |-- tempoMinutos: string (nullable = true)
 |-- popularidade: double (nullable = true)
 |-- contagem_de_voto: long (nullable = true)
 |-- votacao_media: double (nullable = true)
 |-- data_coleta_tmdb: date (nullable = true)
 |-- id_tabela_data: string (nullable = true)
 |-- id_tabela_titulo: string (nullable = true)
 |-- id_tabela_adulto: string (nullable = true)



In [5]:
####################################################
# Print e mostrar o DataFrame df_fato
#print(df_fato)
df_fato.show()
df_fato.describe().show()

# Contar o número total de linhas em df_fato
total_linhas_fato = df_fato.count()

# Mostrar o número total de linhas em df_fato
print("Número total de linhas em df_fato:", total_linhas_fato)

# Print e mostrar o DataFrame df_data
#print(df_data)
df_data.show()
df_data.describe().show()

# Contar o número total de linhas em df_data
total_linhas_data = df_data.count()

# Mostrar o número total de linhas em df_data
print("Número total de linhas em df_data:", total_linhas_data)

# Print e mostrar o DataFrame df_titulo
#print(df_titulo)
df_titulo.show()
df_titulo.describe().show()

# Contar o número total de linhas em df_titulo
total_linhas_titulo = df_titulo.count()

# Mostrar o número total de linhas em df_titulo
print("Número total de linhas em df_titulo:", total_linhas_titulo)

# Print e mostrar o DataFrame df_adulto
#print(df_adulto)
df_adulto.show()
df_adulto.describe().show()

# Contar o número total de linhas em df_adulto
total_linhas_adulto = df_adulto.count()

# Mostrar o número total de linhas em df_adulto
print("Número total de linhas em df_adulto:", total_linhas_adulto)

+------------+---------+---------+------------+---------+------------+----------------+-------------+----------------+-------+---------+---------+
|fato_imdb_id|orcamento|  receita|tempoMinutos|id_genero|popularidade|contagem_de_voto|votacao_media|data_coleta_tmdb|id_data|id_titulo|id_adulto|
+------------+---------+---------+------------+---------+------------+----------------+-------------+----------------+-------+---------+---------+
|   tt0035423| 48000000| 76019048|         118|       14|      30.262|            1281|          6.3|      2024-04-12|     14|      227|      227|
|   tt0120667|100000000|333535934|         106|       14|       51.39|            9009|        5.783|      2024-04-12|     12|      228|      228|
|   tt0120804| 33000000|103000000|         100|      878|       38.21|            6063|        6.615|      2024-04-12|     10|      381|      381|
|   tt0121765|120000000|649398328|         142|      878|      47.459|           12751|        6.559|      2024-04-12|

In [6]:
########################################################################################


# Caminho de saída para os dados no formato Parquet
output_path = "./Refined"

# Escrever DataFrames como tabelas no AWS Glue
def salvar_tabela(df, tabela_nome):
    df.write.format("parquet") \
        .mode("overwrite") \
        .save(output_path + "/" + tabela_nome)


# Salvando as tabelas
salvar_tabela(df_fato, "tabela_fato")
salvar_tabela(df_data, "tabela_data")
salvar_tabela(df_titulo, "tabela_titulo")
salvar_tabela(df_adulto, "tabela_adulto")

In [34]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, concat, lit

# Caminho de origem dos dados
source_file_tmdb = f"./TRT/TMDB/Fantasy/dt=2024-04-12/"
source_file_movie = f"./TRT/Movies/movies"

# Carregar dados do arquivo Parquet
df_tmdb = spark.read.parquet(source_file_tmdb)
df_movie = spark.read.parquet(source_file_movie)


df_movie = df_movie.withColumnRenamed("imdb_id", "movie_imdb_id")



# junção dataframes
df_geral = df_movie.join(df_tmdb, df_movie.movie_imdb_id == df_tmdb.imdb_id, 'inner')


# Remover linhas duplicadas com base na coluna movie_imdb_id para garantir que seja uma chave primária
df_geral = df_geral.dropDuplicates(["movie_imdb_id"])

# Criando uma view temporária a partir do DataFrame geral
df_geral.createOrReplaceTempView('df_geral_view')

# Visualizar o esquema do DataFrame df_geral
df_geral.printSchema()

# Definir as consultas para as tabelas
ids_sagas = ['tt0241527', 'tt0295297', 'tt0304141', 'tt0330373', 'tt0373889', 'tt0417741', 'tt0926084', 'tt1201607', 'tt1392170', 'tt1951264', 'tt1951265', 'tt1951266', 'tt1099212', 'tt1259571', 'tt1324999', 'tt1325004', 'tt1673434']



ids_sagas_str = ", ".join([f"'{id}'" for id in ids_sagas])
df_sagas_data_query = f"""
    SELECT anoLancamento_tmdb AS Ano_Lancamento 
    FROM df_geral_view
    WHERE movie_imdb_id IN ({ids_sagas_str})
"""


# Montar a parte da consulta SQL para buscar os títulos das sagas específicas
ids_sagas_str = ", ".join([f"'{id}'" for id in ids_sagas])
df_sagas_titulo_query = f"""
    SELECT tituloPrincipal AS Titulo, 
        movie_imdb_id as ts_imdb_id
    FROM df_geral_view
    WHERE movie_imdb_id IN ({ids_sagas_str})
"""


# Montar a parte da consulta SQL para buscar os títulos das sagas específicas
ids_sagas_str = ", ".join([f"'{id}'" for id in ids_sagas])
df_sagas_adulto_query = f"""
    SELECT adulto AS Adulto, 
        movie_imdb_id as as_imdb_id
    FROM df_geral_view
    WHERE movie_imdb_id IN ({ids_sagas_str})
"""


# Criar DataFrames usando as consultas
df_sagas_data = spark.sql(df_sagas_data_query)
df_sagas_titulo = spark.sql(df_sagas_titulo_query)
df_sagas_adulto = spark.sql(df_sagas_adulto_query)



# Criar uma janela para particionar os dados por nada, resultando em uma partição única
# Definir a ordem na janela
window_data = Window.orderBy("Ano_Lancamento")
window_titulo = Window.orderBy("ts_imdb_id")
window_adulto = Window.orderBy("as_imdb_id")



# Adicionar uma nova coluna de ID em cada DataFrame
# Adicionar uma nova coluna de ID em cada DataFrame
df_sagas_data = df_sagas_data.withColumn("id_tabela_data", concat(lit("dt"), row_number().over(window_data)))
df_sagas_titulo = df_sagas_titulo.withColumn("id_tabela_titulo", concat(lit("ti"), row_number().over(window_titulo)))
df_sagas_adulto = df_sagas_adulto.withColumn("id_tabela_adulto", concat(lit("ad"), row_number().over(window_adulto)))


# Converter id_adulto para string
df_sagas_data = df_sagas_data.withColumn("id_tabela_data", col("id_tabela_data").cast("string"))
df_sagas_titulo = df_sagas_titulo.withColumn("id_tabela_titulo", col("id_tabela_titulo").cast("string"))
df_sagas_adulto = df_sagas_adulto.withColumn("id_tabela_adulto", col("id_tabela_adulto").cast("string"))

# Montar a parte da consulta SQL para buscar os dados das sagas específicas
ids_sagas_str = ", ".join([f"'{id}'" for id in ids_sagas])
df_fato_sagas_query = f"""
    SELECT DISTINCT
        df_geral_view.movie_imdb_id as fs_movie_imdb_id, 
        orcamento, 
        receita, 
        tempoMinutos, 
        popularidade, 
        contagem_de_voto, 
        votacao_media, 
        data_coleta_tmdb,
        anoLancamento_tmdb
    FROM df_geral_view
    WHERE df_geral_view.movie_imdb_id IN ({ids_sagas_str})
"""


df_fato_sagas = spark.sql(df_fato_sagas_query)


df_fato_sagas.printSchema()



# Joining and dropping redundant columns
df_fato_sagas = df_fato_sagas.join(df_sagas_data.select("id_tabela_data", "Ano_Lancamento"),
                                   df_fato_sagas.anoLancamento_tmdb == df_sagas_data.Ano_Lancamento, "inner")
#df_fato_sagas = df_fato_sagas.drop("anoLancamento_tmdb", "Ano_Lancamento")


df_fato_sagas = df_fato_sagas.join(df_sagas_titulo.select('id_tabela_titulo', 'ts_imdb_id'),
                       df_fato_sagas.fs_movie_imdb_id==df_sagas_titulo.ts_imdb_id, 'inner')
#df_fato_sagas = df_fato_sagas.drop('ts_imdb_id')


df_fato_sagas = df_fato_sagas.join(df_sagas_adulto.select('id_tabela_adulto', 'as_imdb_id'),
                       df_fato_sagas.fs_movie_imdb_id==df_sagas_adulto.as_imdb_id, 'inner')
#df_fato = df_fato.drop('adulto', 'Adulto')

# Remover colunas redundantes após o join
df_fato_sagas = df_fato_sagas.drop("anoLancamento_tmdb", "Ano_Lancamento", "adulto", "ts_imdb_id", "as_imdb_id")

# Eliminar linhas duplicadas
df_fato_sagas = df_fato_sagas.dropDuplicates(["fs_movie_imdb_id"])


df_fato_sagas.orderBy('fs_movie_imdb_id').show
df_fato_sagas.printSchema()


# Caminho de saída para os dados no formato Parquet
output_path = "./Refined"

# Escrever DataFrames como tabelas no AWS Glue
def salvar_tabela(df, tabela_nome):
    df.write.format("parquet") \
        .mode("overwrite") \
        .save(output_path + "/" + tabela_nome)



# Salvando as tabelas
salvar_tabela(df_fato_sagas, "tabela_fato_sagas")
salvar_tabela(df_sagas_data, "tabela_sagas_data")
salvar_tabela(df_sagas_titulo, "tabela_sagas_titulo")
salvar_tabela(df_sagas_adulto, "tabela_sagas_adulto")


root
 |-- movie_imdb_id: string (nullable = true)
 |-- tituloPrincipal: string (nullable = true)
 |-- tempoMinutos: string (nullable = true)
 |-- notaMedia_imdb: string (nullable = true)
 |-- numeroVotos_imdb: string (nullable = true)
 |-- anoLancamento_tmdb: long (nullable = true)
 |-- id_genero: long (nullable = true)
 |-- adulto: boolean (nullable = true)
 |-- orcamento: integer (nullable = true)
 |-- id_tmdb: long (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- popularidade: double (nullable = true)
 |-- receita: integer (nullable = true)
 |-- duracao: integer (nullable = true)
 |-- titulo: string (nullable = true)
 |-- votacao_media: double (nullable = true)
 |-- contagem_de_voto: integer (nullable = true)
 |-- data_coleta_tmdb: date (nullable = true)

root
 |-- fs_movie_imdb_id: string (nullable = true)
 |-- orcamento: integer (nullable = true)
 |-- receita: integer (nullable = true)
 |-- tempoMinutos: string (nullable = true)
 |-- popularidade: double (nullable = t

In [35]:
df_fato_sagas.describe().show()
df_fato_sagas.show(25)
df_sagas_data.describe().show()
df_sagas_titulo.describe().show()
df_sagas_adulto.describe().show()

+-------+----------------+--------------------+--------------------+------------------+------------------+-----------------+------------------+--------------+----------------+----------------+
|summary|fs_movie_imdb_id|           orcamento|             receita|      tempoMinutos|      popularidade| contagem_de_voto|     votacao_media|id_tabela_data|id_tabela_titulo|id_tabela_adulto|
+-------+----------------+--------------------+--------------------+------------------+------------------+-----------------+------------------+--------------+----------------+----------------+
|  count|              17|                  17|                  17|                17|                17|               17|                17|            17|              17|              17|
|   mean|            NULL|1.2676470588235295E8| 8.246426584117647E8|137.35294117647058|114.60499999999999|16339.64705882353| 7.187176470588235|          NULL|            NULL|            NULL|
| stddev|            NULL| 5.794127