# **Resumo do Projeto: Apache Spark para ETL e Construção de Data Warehouse**

O projeto em questão utiliza Apache Spark no ambiente Google Colab para realizar o processo de ETL (Extração, Transformação e Carga) de dados, com o objetivo final de construir um robusto Data Warehouse. O Apache Spark é uma ferramenta de processamento de dados distribuído que oferece eficiência e escalabilidade, tornando-o ideal para manipulação de grandes conjuntos de dados.

## **Objetivo do Projeto**

O propósito principal deste projeto é realizar a seguinte sequência de operações:

1. **Extração de Dados:** Coletar dados brutos de diversas fontes, seja de arquivos, bancos de dados ou outras origens, utilizando as capacidades de processamento paralelo do Apache Spark.

2. **Transformação de Dados:** Aplicar operações de transformação nos dados extraídos para prepará-los para análise. Isso inclui limpeza, formatação, agregação e outras manipulações necessárias.

3. **Carga dos Dados Tratados:** Armazenar os dados tratados em um formato otimizado para consulta, como um Data Warehouse. O Apache Spark facilita a eficiência de gravação dos dados processados em sistemas de armazenamento apropriados.

## **Benefícios do Uso do Apache Spark no Colab**

O ambiente Google Colab proporciona uma infraestrutura eficiente para execução do Apache Spark, permitindo explorar e manipular grandes volumes de dados de forma interativa. A combinação dessas tecnologias oferece os seguintes benefícios:

- **Escalabilidade:** O Apache Spark distribui o processamento de dados de maneira eficiente, lidando com grandes volumes de dados de forma escalável.

- **Integração com Ferramentas Analíticas:** Os dados tratados podem ser facilmente integrados com ferramentas analíticas e de visualização, proporcionando insights valiosos para tomada de decisões.

- **Flexibilidade:** O Colab permite uma rápida prototipagem e experimentação, enquanto o Apache Spark oferece a flexibilidade necessária para lidar com diferentes fontes e tipos de dados.

- **Eficiência no Processamento:** A capacidade de processamento paralelo do Apache Spark acelera o tempo de execução das tarefas, otimizando o desempenho do ETL.



# Configuração de ambiente (conecte o Gmail e Kaggle account)

Connect to drive


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Catch Kaggle Key

--- baixar kaggle json

In [None]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"huggydt","key":"4082adeb51618c1b2fccc1977a1b1aa3"}'}

In [None]:
!ls -lha kaggle.json
!pip install -q kaggle # installing the kaggle package
!mkdir -p ~/.kaggle # creating .kaggle folder where the key should be placed
!cp kaggle.json ~/.kaggle/ # move the key to the folder
!pwd # checking the present working directory

-rw-r--r-- 1 root root 63 Dec 11 17:57 kaggle.json
/content


In [None]:
!chmod 600 ~/.kaggle/kaggle.json

In [None]:
import os

# Specify the paths for the downloaded files and extraction directories
zip_file1_path = '/content/drive/MyDrive/PysparkProject/millions-of-movies.zip'
extracted_dir1_path = '/content/drive/MyDrive/PysparkProject/csvs/millions-of-movies/'
dataset1 = 'akshaypawar7/millions-of-movies'

zip_file2_path = '/content/drive/MyDrive/PysparkProject/imdb-dataset-of-top-1000-movies-and-tv-shows.zip'
extracted_dir2_path = '/content/drive/MyDrive/PysparkProject/csvs/imdb-dataset-of-top-1000-movies-and-tv-shows/'
dataset2 = 'harshitshankhdhar/imdb-dataset-of-top-1000-movies-and-tv-shows'

zip_file3_path = '/content/drive/MyDrive/PysparkProject/top-100-rotten-tomatoes-movies-by-genres.zip'
extracted_dir3_path = '/content/drive/MyDrive/PysparkProject/csvs/top-100-rotten-tomatoes-movies-by-genres/'
dataset3 = 'prasertk/top-100-rotten-tomatoes-movies-by-genres'

def download_kaggle(zip_path, dataset):
    if not os.path.exists(zip_path):
        !kaggle datasets download -d {dataset} -p /content/drive/MyDrive/PysparkProject

# Function to check and extract if needed
def check_and_extract(zip_path, extract_path):
    if not os.path.exists(extract_path):
        !unzip {zip_path} -d {extract_path}

if not os.path.exists('/content/drive/MyDrive/PysparkProject'):
  os.makedirs('/content/drive/MyDrive/PysparkProject')
# Download and extract for the first zip file
download_kaggle(zip_file1_path, dataset1)
check_and_extract(zip_file1_path, extracted_dir1_path)

# Download and extract for the second zip file
download_kaggle(zip_file2_path, dataset2)
check_and_extract(zip_file2_path, extracted_dir2_path)

# Download and extract for the third zip file
download_kaggle(zip_file3_path, dataset3)
check_and_extract(zip_file3_path, extracted_dir3_path)


In [None]:
import shutil

def move_csvs_to_folder(source_folder, destination_folder):
    # Iterate through the source folder
    for filename in os.listdir(source_folder):
        if filename.endswith(".csv"):
            source_path = os.path.join(source_folder, filename)
            destination_path = os.path.join(destination_folder, filename)
            # Move the CSV file to the destination folder
            shutil.move(source_path, destination_path)

local_dest = '/content/drive/MyDrive/PysparkProject/csvs/'
# Usage example:
# Move CSVs from extracted_dir1_path to ''
move_csvs_to_folder(extracted_dir1_path, local_dest)

# Move CSVs from extracted_dir2_path to '/content/drive/MyDrive/PysparkProject/csvs/'
move_csvs_to_folder(extracted_dir2_path, local_dest)

# Move CSVs from extracted_dir3_path to '/content/drive/MyDrive/PysparkProject/csvs/'
move_csvs_to_folder(extracted_dir3_path, local_dest)


In [None]:
#Instalar Java 8
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
#Realizar o download do Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.2/spark-3.4.2-bin-hadoop3.tgz

In [None]:
#Descompartar o arquivo baixado
!tar xf spark-3.4.2-bin-hadoop3.tgz
#Instalando a findspark
!pip install -q findspark

In [None]:
#Importando a biblioteca os
import os

#Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

#Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.4.2-bin-hadoop3"

#Importando a findspark
import findspark

#Iniciando o findspark
findspark.init('spark-3.4.2-bin-hadoop3')

In [None]:
# iniciar uma sessão local
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, DoubleType
import pyspark.sql.functions as F
import time
sc = SparkSession.builder.master('local[*]').config('spark.ui.port', '4050').enableHiveSupport().getOrCreate()

In [None]:
spark = SparkSession(sc)

In [None]:
os.environ["Drive_csvs"] = '/content/drive/MyDrive/PysparkProject'

# Mapeando e tratando os intervalos de dados divergentes


Esse trecho foi para utilizar funcionalidades do spark para extrair csvs simultaneamente, tal como utiliza-lo para tratas os dados posteriormente

In [None]:
import pyspark.pandas as pd
import os

drive_csvs_path = os.environ.get("Drive_csvs")
csv_file_path = '/content/drive/MyDrive/PysparkProject/csvs/*.csv'

pd.set_option('compute.default_index_type', 'distributed-sequence')

df_spark = pd.read_csv(csv_file_path, lines=True, index_col=None)
# Define the intervals for each CSV file
interval_first_csv = (0, 722618)
interval_second_csv = (722619, 723618)
interval_third_csv = (723619, 725230)

# Convert the index to a column
df_spark = df_spark.reset_index()
# Filter rows for the first CSV file
df_first_csv = df_spark[(df_spark['index'] >= interval_first_csv[0]) & (df_spark['index'] <= interval_first_csv[1])]

# Filter rows for the second CSV file
df_second_csv = df_spark[(df_spark['index'] >= interval_second_csv[0]) & (df_spark['index'] <= interval_second_csv[1])]

# Filter rows for the third CSV file
df_third_csv = df_spark[(df_spark['index'] >= interval_third_csv[0]) & (df_spark['index'] <= interval_third_csv[1])]



In [None]:
columns_to_keep = ['index', 'id', 'title', 'genres', 'original_language', 'overview']
df_third_csv = df_third_csv[columns_to_keep]

In [None]:
new_column_names = {'index': 'index', 'id': 'genres', 'title': 'rank', 'genres': 'RatingTomatometer', 'original_language': 'title', 'overview': 'No. of Reviews'}
df_third_csv = df_third_csv.rename(columns=new_column_names)

In [None]:
columns_to_keep1 = ['index', 'title', 'genres', 'overview', 'popularity','production_companies']
df_second_csv = df_second_csv[columns_to_keep1]

In [None]:
new_column_names1 = {'index': 'index', 'title': 'title', 'genres': 'Released_Year', 'overview':'Runtime',"popularity":"Genre","production_companies":"IMDB_Rating"}
df_second_csv = df_second_csv.rename(columns=new_column_names1)

In [None]:
columns_to_keep11 = ['index', 'title', 'genres', 'original_language','production_companies','release_date','budget','revenue','runtime','status','vote_average','credits']
df_first_csv = df_first_csv[columns_to_keep11]

In [None]:
#df1_2 = pd.merge(df_first_csv,df_second_csv, on = 'title',how = 'inner')
df1_2 = pd.merge(df_first_csv,df_second_csv[['title','IMDB_Rating']],  left_on='title', right_on='title', how='left')

In [None]:
df_third_csv_limpo = df_third_csv
df_third_csv_limpo['title'] = df_third_csv_limpo['title'].str.replace(r'\(\d+\)', '').str.strip()

In [None]:
df12_3 = pd.merge(df1_2,df_third_csv_limpo[['title','RatingTomatometer']],  left_on='title', right_on='title', how='left')

In [None]:
df12_3 = df12_3.drop_duplicates(subset=['index'])

In [None]:
df12_3 = df12_3.query("status = 'Released'")

In [None]:
df_temp2 = df12_3
df_temp2['release_date'] = df_temp2['release_date'].str.slice(stop=-3)
df_temp2['year'] = df_temp2['release_date'].str.slice(stop=-3)

In [None]:
df_temp3 = df_temp2
df_temp3['month'] = df_temp3['release_date'].str.slice(-2).astype(int)

In [None]:
columns = df_temp3.columns
schema = StructType([StructField(col, StringType(), True) for col in columns])
# Convert PySpark.Pandas DataFrame to PySpark DataFrame with the dynamically generated schema
df_spark = df_temp3.to_spark()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

df_spark.createOrReplaceTempView('df_spark')
result = spark.sql('SELECT * FROM df_spark')
display(result.printSchema())

root
 |-- index: long (nullable = false)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- status: string (nullable = true)
 |-- vote_average: string (nullable = true)
 |-- credits: string (nullable = true)
 |-- IMDB_Rating: string (nullable = true)
 |-- RatingTomatometer: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: long (nullable = true)





None

# Criação das Dimensões

In [None]:

spark.sql('DROP TABLE IF EXISTS DIM_FILME')
filtro_FILME = df_spark.select("title").distinct()
filtro_FILME = filtro_FILME.withColumn("ID_FILME", F.monotonically_increasing_id())
dim_FILME = filtro_FILME.withColumnRenamed("title", "NOME")
dim_FILME.createOrReplaceTempView("DIM_FILME")
dim_FILME = spark.sql("""SELECT
    *
    FROM
    Dim_filme
    ;;""")


parquet_path = "/content/drive/MyDrive/tables"

dim_FILME.write.parquet(parquet_path, mode="overwrite")

dim_FILME.write.saveAsTable("DIM_FILME", format="parquet", mode="overwrite")

local_directory = "/content/drive/MyDrive/tables"

In [None]:
spark.sql('DROP TABLE IF EXISTS DIM_TEMPO')

filtro_TEMPO = df_spark.select("year").distinct().filter(F.col("year").isNotNull())

filtro_TEMPO = filtro_TEMPO.withColumn("id_TEMPO", F.monotonically_increasing_id())
dim_TEMPO = filtro_TEMPO.createOrReplaceTempView('DIM_TEMPO')
dim_TEMPO = spark.sql("Select * from DIM_TEMPO")


parquet_path = "/content/drive/MyDrive/tables"

dim_TEMPO.write.parquet(parquet_path, mode="overwrite")

dim_TEMPO.write.saveAsTable("DIM_TEMPO", format="parquet", mode="overwrite")

local_directory = "/content/drive/MyDrive/tables"
#dim_TEMPO.orderBy(F.desc("month"), "year","id_TEMPO").show(dim_TEMPO.count(),truncate=False)
'''

filtro_year.orderBy(F.desc("year"),).show(filtro_TEMPO.count(),truncate=False)
filtro_TEMPO = filtro_TEMPO(filtro_year)
'''
'''
filtro_TEMPO.orderBy(F.desc("month"), "year").show(filtro_TEMPO.count(),truncate=False)
'''
'''

year_granulati = filtro_TEMPO.select('year').distinct()
month_granulati = filtro_TEMPO.select('month').distinct()
'''
'''
distinct_year = (
    filtro_TEMPO
    .select("year")
    .distinct()
    .withColumn("month", F.lit(None).cast("int"))
)

distinct_month = (
    filtro_TEMPO
    .select("month")
    .distinct()
    .withColumn("year", F.lit(None).cast("int"))
)


# Union the DataFrames
result_df = distinct_year.unionAll(distinct_month).withColumn("index", F.monotonically_increasing_id())

# Show all rows in the result_df DataFrame
result_df.show(result_df.count(), truncate=False)
''''''
dim_TEMPO = (
    year_granulati
    .unionAll(month_granulati)
    .withColumn("index", F.monotonically_increasing_id())
)


dim_TEMPO = spark.sql("SELECT * FROM DIM_TEMPO")

dim_TEMPO.show(20)
'''

CPU times: user 309 ms, sys: 37.8 ms, total: 346 ms
Wall time: 1min 10s


'\ndistinct_year = (\n    filtro_TEMPO\n    .select("year")\n    .distinct()\n    .withColumn("month", F.lit(None).cast("int"))\n)\n\ndistinct_month = (\n    filtro_TEMPO\n    .select("month")\n    .distinct()\n    .withColumn("year", F.lit(None).cast("int"))\n)\n\n\n# Union the DataFrames\nresult_df = distinct_year.unionAll(distinct_month).withColumn("index", F.monotonically_increasing_id())\n\n# Show all rows in the result_df DataFrame\nresult_df.show(result_df.count(), truncate=False)\n\ndim_TEMPO = (\n    year_granulati\n    .unionAll(month_granulati)\n    .withColumn("index", F.monotonically_increasing_id())\n)\n\n\ndim_TEMPO = spark.sql("SELECT * FROM DIM_TEMPO")\n\ndim_TEMPO.show(20)\n'

In [None]:
spark.sql('DROP TABLE IF EXISTS DIM_GENERO')
filtro_GENRE = df_spark.select("genres").distinct()
dim_GENRE = filtro_GENRE.withColumnRenamed("genres", "GENEROS")
dim_GENRE = dim_GENRE.withColumn("ID_GENRES", F.monotonically_increasing_id())
dim_GENRE.createOrReplaceTempView("DIM_GENERO")
dim_GENRE = spark.sql("Select * from DIM_GENERO")



parquet_path = "/content/drive/MyDrive/tables"

dim_GENRE.write.parquet(parquet_path, mode="overwrite")

dim_GENRE.write.saveAsTable("DIM_GENERO", format="parquet", mode="overwrite")

local_directory = "/content/drive/MyDrive/tables"

In [None]:
spark.sql('DROP TABLE IF EXISTS DIM_IDIOM')

filtro_IDIOM = df_spark.select("original_language").distinct()
dim_IDIOM = filtro_IDIOM.withColumnRenamed("original_language", "LINGUA_ORIGINAL")
dim_IDIOM = dim_IDIOM.withColumn("ID_IDIOM", F.monotonically_increasing_id())
dim_IDIOM.createOrReplaceTempView("DIM_IDIOM")
dim_IDIOM = spark.sql("Select * from DIM_IDIOM")



parquet_path = "/content/drive/MyDrive/tables"

dim_IDIOM.write.parquet(parquet_path, mode="overwrite")

dim_IDIOM.write.saveAsTable("DIM_IDIOM", format="parquet", mode="overwrite")

local_directory = "/content/drive/MyDrive/tables"

# Criação da Fato

In [None]:
esquema_fato = StructType([
    StructField("ID_FILME", IntegerType(), False),
    StructField("id_TEMPO", IntegerType(), False),
    StructField("ID_GENRES", IntegerType(), False),
    StructField("ID_IDIOM", IntegerType(), False),
    StructField("nota_media_total", FloatType(), True),
    StructField("tempo_tela_por_genero", DoubleType(), True),
    StructField("saldo_por_ano", FloatType(), True)
])


fato = sc.createDataFrame([], schema=esquema_fato)

fato.createOrReplaceTempView("fato")

result = spark.sql("SELECT * FROM fato")

spark.sql('DROP TABLE IF EXISTS fatohive')

parquet_path = "/content/drive/MyDrive/table"

result.write.parquet(parquet_path, mode="overwrite")

result.write.saveAsTable("fatohive", format="parquet", mode="overwrite")

local_directory = "/content/drive/MyDrive/tables"


#result.show()

#fato.createOrReplaceTempView("fato")

#result = spark.sql("SELECT * FROM fato")

#result.show()

In [None]:
df_spark = df_spark.withColumn(
    "RatingTomatometer",
    F.expr("substring(RatingTomatometer, 1, 2)")
)

In [None]:
Filtra_FATO = df_spark.createOrReplaceTempView("JOIN_FT")
Filtra_FATO = spark.sql("""
    SELECT
        F.ID_FILME,
        F.NOME,
        G.ID_GENRES,
        G.GENEROS,
        T.id_TEMPO,
        T.year,
        I.ID_IDIOM,
        I.LINGUA_ORIGINAL,
        FT.budget,
        FT.revenue,
        FT.runtime,
        COALESCE(FT.vote_average, -1) as vote_average,
        COALESCE(FT.IMDB_Rating, -1) as IMDB_Rating,
        COALESCE(FT.RatingTomatometer, -1) as RatingTomatometer
    FROM
        JOIN_FT as FT, DIM_FILME AS F, DIM_TEMPO AS T, DIM_IDIOM AS I, DIM_GENERO AS G
    WHERE
        FT.title = F.NOME AND
        FT.original_language = I.LINGUA_ORIGINAL AND
        FT.genres = G.GENEROS AND
        FT.year = T.year
""")
Filtra_FATO.createOrReplaceTempView("new_fato")


In [None]:
Filtra_FATO = Filtra_FATO.withColumn("RatingTomatometer", F.col("RatingTomatometer").cast(FloatType()))
Filtra_FATO = Filtra_FATO.withColumn("IMDB_Rating", F.col("IMDB_Rating").cast(FloatType()))
Filtra_FATO = Filtra_FATO.withColumn("vote_average", F.col("vote_average").cast(FloatType()))
Filtra_FATO = Filtra_FATO.withColumn("runtime", F.col("runtime").cast(DoubleType()))
#Filtra_FATO = Filtra_FATO.withColumn("IMDB_Rating", F.col("IMDB_Rating").cast("float"))
#Filtra_FATO = Filtra_FATO.withColumn("vote_average", F.col("vote_average").cast("float"))

#Filtra_FATO["RatingTomatometer"] = Filtra_FATO["RatingTomatometer"]

#Filtra_FATO["RatingTomatometer"] = Filtra_FATO["RatingTomatometer"] / 10
#Filtra_FATO.head(1)

In [None]:
spark.catalog.listTables()

[Table(name='dim_filme', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='dim_genero', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='dim_idiom', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='dim_tempo', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='fatohive', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='df_spark', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='DIM_FILME', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='DIM_GENERO', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 

In [None]:
# Carga ID FILME, TEMPO, GENERO IDIOMA E MEDIA DO FILME NAS 3 BASES
nota_media_filme_completo = spark.sql(f""" SELECT ID_FILME,id_TEMPO,ID_GENRES,ID_IDIOM,AVG((vote_average + IMDB_Rating + (RatingTomatometer/10)) / 3) as nota_media_total
                              From new_fato
                              WHERE  vote_average > 0  AND IMDB_Rating > 0 AND RatingTomatometer > 0
                              GROUP BY ID_FILME,id_TEMPO,ID_GENRES,ID_IDIOM""")

nota_media_filme_completo.createOrReplaceTempView("nota_media_filme_completo")

In [None]:
tempo_tela_genero = spark.sql(f""" SELECT ID_GENRES,AVG(runtime) as tempo_tela_por_genero
                              From new_fato
                              GROUP BY ID_GENRES """)
tempo_tela_genero.createOrReplaceTempView("tempo_tela_genero")


In [None]:
#média de budget gasto por ano para cada genêro
saldo_ano = spark.sql(f""" SELECT id_TEMPO,SUM( revenue - budget) as saldo_por_ano
                              From new_fato
                              GROUP BY id_TEMPO""")
saldo_ano.createOrReplaceTempView("saldo_ano")

# Ideias de funções, desconsideradas (não há necessidade de rodas)

In [None]:
'''
nota_media_filme_completo
nota_media_filme_portal1

nota_media_idioma_completo
nota_media_idioma_12
nota_media_idioma_13
nota_media_idioma_1

nota_media_genero_completo
nota_media_genero_12
nota_media_genero_13
nota_media_genero_1

nota_media_ano_completo
nota_media_ano_12
nota_media_ano_13
nota_media_ano_1

tempo_tela_genero
tempo_tela_ano

saldo_por_ano
saldo_por_genero
saldo_por_genero_ano
saldo_por_idioma

tempo_tela_por_genero
saldo_por_ano
'''


'\nnota_media_filme_completo\nnota_media_filme_portal1\n\nnota_media_idioma_completo\nnota_media_idioma_12\nnota_media_idioma_13\nnota_media_idioma_1\n\nnota_media_genero_completo\nnota_media_genero_12\nnota_media_genero_13\nnota_media_genero_1\n\nnota_media_ano_completo\nnota_media_ano_12\nnota_media_ano_13\nnota_media_ano_1\n\ntempo_tela_genero\ntempo_tela_ano\n\nsaldo_por_ano\nsaldo_por_genero\nsaldo_por_genero_ano\nsaldo_por_idioma\n\ntempo_tela_por_genero\nsaldo_por_ano\n'

# Carga Fato

In [None]:
hive_table_name = 'fatohive'

# Cast the columns in nota_media_filme_completo to integer
nota_media_filme_completo_casted = nota_media_filme_completo.withColumn("ID_FILME", F.col("ID_FILME").cast("integer"))
nota_media_filme_completo_casted = nota_media_filme_completo_casted.withColumn("ID_GENRES", F.col("ID_GENRES").cast("integer"))
nota_media_filme_completo_casted = nota_media_filme_completo_casted.withColumn("ID_IDIOM", F.col("ID_IDIOM").cast("integer"))
nota_media_filme_completo_casted = nota_media_filme_completo_casted.withColumn("id_TEMPO", F.col("id_TEMPO").cast("integer"))

# Add two extra columns with null values
nota_media_filme_completo_casted = nota_media_filme_completo_casted.withColumn("tempo_tela_por_genero", F.lit(None).cast("double"))
nota_media_filme_completo_casted = nota_media_filme_completo_casted.withColumn("saldo_por_ano", F.lit(None).cast("float"))

# Insert into fatohive
nota_media_filme_completo_casted.write.mode("append").insertInto(hive_table_name)

# Perform the join with the Hive table
'''
# Drop the column 'nota_media_filme' from the joined data
joined_data_without_nota_media_filme = joined_data.drop('nota_media_filme')

# Select the desired columns for the insert
insert_data = joined_data_without_nota_media_filme.select("ID_FILME", "id_TEMPO", "ID_GENRES", "ID_IDIOM", "fatohive.tempo_tela", "fatohive.saldo", "nota_media_filme_completo.nota_media")

# Write the selected data into the Hive table
insert_data.write.mode("append").insertInto(hive_table_name)

# Confirm the data insertion by querying the Hive table
result_from_hive = spark.sql(f"SELECT * FROM {hive_table_name}")
result_from_hive.show() '''


'\n# Drop the column \'nota_media_filme\' from the joined data\njoined_data_without_nota_media_filme = joined_data.drop(\'nota_media_filme\')\n\n# Select the desired columns for the insert\ninsert_data = joined_data_without_nota_media_filme.select("ID_FILME", "id_TEMPO", "ID_GENRES", "ID_IDIOM", "fatohive.tempo_tela", "fatohive.saldo", "nota_media_filme_completo.nota_media")\n\n# Write the selected data into the Hive table\ninsert_data.write.mode("append").insertInto(hive_table_name)\n\n# Confirm the data insertion by querying the Hive table\nresult_from_hive = spark.sql(f"SELECT * FROM {hive_table_name}")\nresult_from_hive.show() '

In [None]:
spark.sql("""
    INSERT INTO fatohive (ID_GENRES, tempo_tela_por_genero)
    SELECT
        ID_GENRES,
        CAST(tempo_tela_por_genero AS DOUBLE) AS tempo_tela_por_genero
    FROM
        tempo_tela_genero
""")


In [None]:
# Cast the columns in saldo_por_ano
spark.sql("""
    INSERT INTO fatohive (id_TEMPO, saldo_por_ano)
    SELECT
        id_TEMPO,
        CAST(saldo_por_ano AS FLOAT) AS saldo_por_ano
    FROM
        saldo_ano
""")

# Consultas

In [None]:
#média de tempo de tela por gênero

top10_generos_mais_badalados_media = spark.sql("""
    SELECT
        g.GENEROS,
        MAX(fh.tempo_tela_por_genero) AS total_tela_media
    FROM
        fatohive fh
    JOIN
        DIM_GENERO g ON fh.ID_GENRES = g.ID_GENRES
    GROUP BY
        g.GENEROS
    ORDER BY
        total_tela_media DESC
    LIMIT 10
""")


display(top10_generos_mais_badalados_media.show(truncate=False))


+---------------------------------------------------+----------------+
|GENEROS                                            |total_tela_media|
+---------------------------------------------------+----------------+
|Documentary-Action-Adventure-Thriller-Drama-History|540.0           |
|Documentary-Action-Drama-Thriller                  |540.0           |
|Mystery-Fantasy-Crime                              |532.0           |
|Drama-Fantasy-Adventure-History                    |485.0           |
|Romance-Comedy-Music-Drama                         |457.2           |
|Drama-Horror-TV Movie-Thriller                     |449.0           |
|Documentary-Action-History                         |419.5           |
|Romance-Crime-TV Movie                             |416.0           |
|Romance-War-Adventure                              |377.0           |
|Science Fiction-Animation-Action-Horror            |360.0           |
+---------------------------------------------------+----------------+



None

In [None]:
anos_mais_lucrativos = spark.sql("""
    SELECT
        t.year,
        fh.saldo_por_ano
    FROM
        fatohive fh
    JOIN
        DIM_TEMPO t ON fh.id_TEMPO = t.id_TEMPO
    WHERE
         fh.saldo_por_ano > 0
    ORDER BY
        fh.saldo_por_ano DESC
    LIMIT 10
""")


display(anos_mais_lucrativos.show(truncate=False))

+----+-------------+
|year|saldo_por_ano|
+----+-------------+
|2017|2.27155476E10|
|2015|2.17749688E10|
|2018|2.1694038E10 |
|2016|2.11094815E10|
|2019|2.049442E10  |
|2014|1.90217073E10|
|2013|1.80652749E10|
|2012|1.79690598E10|
|2011|1.60923832E10|
|2009|1.53027133E10|
+----+-------------+



None

Leve em consideração que o banco de dados extraido possuia o mesmo filme mas com linguagem e nota diferente, portanto a avaliação do site leva em consideração a linguagem dos votantes e não a lingua original.

In [None]:
notas_by_idioma = spark.sql("""
    WITH RankedFilms AS (
        SELECT
            f.NOME,
            i.LINGUA_ORIGINAL,
            fh.nota_media_total,
            ROW_NUMBER() OVER (PARTITION BY i.LINGUA_ORIGINAL, f.NOME ORDER BY fh.nota_media_total DESC) AS rnk
        FROM
            DIM_FILME f
        JOIN
            fatohive fh ON f.ID_FILME = fh.ID_FILME
        JOIN
            DIM_IDIOM i ON fh.ID_IDIOM = i.ID_IDIOM
    )

    SELECT
        NOME,
        LINGUA_ORIGINAL,
        nota_media_total
    FROM
        RankedFilms
    WHERE
        rnk = 1
""")

m = notas_by_idioma.count()
display(notas_by_idioma.orderBy('nota_media_total', ascending=False).show(m, truncate=False))


+------------------------------------------------------------------------------------------------+---------------+----------------+
|NOME                                                                                            |LINGUA_ORIGINAL|nota_media_total|
+------------------------------------------------------------------------------------------------+---------------+----------------+
|The Godfather                                                                                   |en             |9.203333        |
|Amadeus                                                                                         |cs             |9.2             |
|Her                                                                                             |sq             |9.133333        |
|Sevince                                                                                         |en             |9.090667        |
|The Invisible Man                                                          

None

In [None]:
maior_nota_by_idioma = spark.sql("""
 WITH RankedFilms AS (
    SELECT
        f.NOME,
        i.LINGUA_ORIGINAL,
        fh.nota_media_total,
        ROW_NUMBER() OVER (PARTITION BY i.LINGUA_ORIGINAL ORDER BY fh.nota_media_total DESC) AS rnk
    FROM
        DIM_FILME f
    JOIN
        fatohive fh ON f.ID_FILME = fh.ID_FILME
    JOIN
        DIM_IDIOM i ON fh.ID_IDIOM = i.ID_IDIOM
)

SELECT
    NOME,
    LINGUA_ORIGINAL,
    nota_media_total
FROM
    RankedFilms
WHERE
    rnk = 1;
""")

m = maior_nota_by_idioma.count()
display(maior_nota_by_idioma.orderBy('nota_media_total', ascending=False).show(m, truncate=False))


+-------------------------------+---------------+----------------+
|NOME                           |LINGUA_ORIGINAL|nota_media_total|
+-------------------------------+---------------+----------------+
|The Godfather                  |en             |9.203333        |
|Amadeus                        |cs             |9.2             |
|Her                            |sq             |9.133333        |
|The Wizard of Oz               |ja             |8.933333        |
|Vertigo                        |it             |8.9             |
|A Good Marriage                |de             |8.733334        |
|Rocky                          |ta             |8.666667        |
|Persepolis                     |fr             |8.505           |
|Pulikkuthi Pandi               |es             |8.484333        |
|Rocky                          |hi             |8.466666        |
|Psycho                         |ar             |8.366667        |
|River Guard                    |ko             |8.3          

None

In [None]:
name_fil = input("Enter the movie name: ")

consulta_por_nome = spark.sql(f'''
    SELECT
        f.NOME,
        i.LINGUA_ORIGINAL,
        fh.nota_media_total
    FROM
        DIM_FILME f
    JOIN
        fatohive fh ON f.ID_FILME = fh.ID_FILME
    JOIN
        DIM_IDIOM i ON fh.ID_IDIOM = i.ID_IDIOM
    WHERE
        f.NOME = '{name_fil}';
''')

m = consulta_por_nome.count()
consulta_por_nome.show(m, truncate=False)

Enter the movie name: Her
+----+---------------+----------------+
|NOME|LINGUA_ORIGINAL|nota_media_total|
+----+---------------+----------------+
|Her |en             |8.420667        |
|Her |sq             |9.133333        |
+----+---------------+----------------+

