### Criação do jog spark no AWS GLUE para criar as tabelas fato e dimensão a partir dos arquivos parquet da camada Trusted (TRT) e inserindo na camada Refined (REF) em suas respectivas pastas.

In [None]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

from pyspark.sql import SparkSession
from pyspark.sql.functions import row_number, array_distinct, collect_list, when
from pyspark.sql.window import Window
from pyspark.sql.functions import datediff, to_date, current_date, col
from pyspark.sql.functions import row_number, sum, round, avg, min, max, when


spark = SparkSession.builder.appName("GlueJob").getOrCreate()

# Lendo os dados dos buckets do S3
movies_parquet = spark.read.parquet("s3://projeto-compass-filmes/TRT/Movies/movies.parquet/")
dt_2023_06_26 = spark.read.parquet("s3://projeto-compass-filmes/TRT/franquias.parquet/dt=2023-06-26/")


# Criando a tabela dim_generos
dim_generos = dt_2023_06_26.select(dt_2023_06_26["gêneros"].alias("nome_genero")) \
    .distinct() \
    .select(
        row_number().over(Window.orderBy("nome_genero")).alias("id_genero"),
        "nome_genero"
    )

# Criando a tabela dim_produtoras
dim_produtoras = dt_2023_06_26.select(dt_2023_06_26["produzido por"].alias("nome_produtora")) \
    .distinct() \
    .select(
        row_number().over(Window.orderBy("nome_produtora")).alias("id_produtora"),
        "nome_produtora"
    )

# Criando a tabela dim_filmes
dim_filmes = dt_2023_06_26.join(movies_parquet, dt_2023_06_26["id imdb"] == movies_parquet.id) \
    .select(
        dt_2023_06_26["id imdb"].alias("id_filme"),
        dt_2023_06_26["título"].alias("titulo_por"),
        movies_parquet["tituloPincipal"].alias("titulo_ing"),
        dt_2023_06_26["média de votos"].alias("media_votos"),
        dt_2023_06_26.Popularidade.alias("popularidade"),
        dt_2023_06_26["orçamento"].alias("orcamento"),
        dt_2023_06_26.Receita.alias("receita"),
        (dt_2023_06_26.Receita - dt_2023_06_26["orçamento"]).alias("lucro"),
        dt_2023_06_26["data de lançamento"].alias("data_lancamento"),
        dt_2023_06_26["duração (min)"].alias("duracao_min")
    )


# Renomeando as colunas ambíguas
dt_2023_06_26 = dt_2023_06_26.select(
    col("id imdb").alias("id_imdb_dt"),
    col("gêneros").alias("gêneros_dt"),
    col("produzido por").alias("produzido_por_dt")
)


# Criando a tabela dim_atores
dim_atores = movies_parquet.filter(movies_parquet.anoNascimento != '\\N') \
    .groupBy(movies_parquet.nomeArtista, movies_parquet.anoNascimento, movies_parquet.anoFalecimento) \
    .agg(
        array_distinct(collect_list(movies_parquet.personagem)).alias("personagens"),
        array_distinct(collect_list(movies_parquet.profissao)).alias("profissoes"),
        row_number().over(Window.orderBy(movies_parquet.anoNascimento.desc())).alias("id_ator"),
        when(movies_parquet.anoFalecimento == '\\N', datediff(current_date(), to_date(movies_parquet.anoNascimento, 'yyyy'))) \
            .otherwise(datediff(to_date(movies_parquet.anoFalecimento, 'yyyy'), to_date(movies_parquet.anoNascimento, 'yyyy'))) \
            .alias("idade")
    ) \
    .select(
        movies_parquet.nomeArtista,
        "id_ator",
        "personagens",
        "profissoes",
        movies_parquet.anoNascimento,
        movies_parquet.anoFalecimento,
        "idade"
    )

# Criando a tabela fato_franquias
ff = dim_filmes.select(
    "*",
    when(dim_filmes.titulo_por.isin([
        'Alien: O Oitavo Passageiro', 'Aliens: O Resgate', 'Alien 3', 'Alien: A Ressurreição', 'Prometheus', 'Alien: Covenant'
    ]), "Alien")
    .when(dim_filmes.titulo_por.isin([
        'Mad Max', 'Mad Max 2: A Caçada Continua', 'Mad Max: Além da Cúpula do Trovão', 'Mad Max: Estrada da Fúria'
    ]), "Mad Max")
    .when(dim_filmes.titulo_por.isin([
        'Guerra nas Estrelas', 'Guerra nas Estrelas: O Império Contra-Ataca', 'Guerra nas Estrelas: O Retorno de Jedi',
        'Star Wars: Episódio I - A Ameaça Fantasma', 'Star Wars: Episódio II - Ataque dos Clones',
        'Star Wars: Episódio III - A Vingança dos Sith', 'Star Wars: O Despertar da Força',
        'Star Wars: Episódio VIII - Os Últimos Jedi', 'Star Wars: Episódio IX - A Ascensão Skywalker'
    ]), "Star Wars")
    .when(dim_filmes.titulo_por.isin([
        'De Volta para o Futuro', 'De Volta para o Futuro II', 'De Volta para o Futuro III'
    ]), "De Volta para o Futuro")
    .when(dim_filmes.titulo_por.isin([
        'O Exterminador do Futuro', 'O Exterminador do Futuro 2: O Julgamento Final',
        'O Exterminador do Futuro 3: A Rebelião das Máquinas', 'O Exterminador do Futuro 4: A Salvação',
        'O Exterminador do Futuro: Gênesis', 'O Exterminador do Futuro: Destino Sombrio'
    ]), "Exterminador")
    .when(dim_filmes.titulo_por.isin([
        'Matrix', 'Matrix Reloaded', 'Matrix Revolutions', 'Matrix Resurrections'
    ]), "Matrix")
    .when(dim_filmes.titulo_por.isin([
        'O Senhor dos Anéis: A Sociedade do Anel', 'O Senhor dos Anéis: As Duas Torres',
        'O Senhor dos Anéis: O Retorno do Rei', 'O Hobbit: Uma Jornada Inesperada',
        'O Hobbit: A Desolação de Smaug', 'O Hobbit: A Batalha dos Cinco Exércitos'
    ]), "Senhor dos Anéis")
    .otherwise(None)
    .alias("nome_franquia")
)

# Criando a tabela fato_franquias
fato_franquias = ff.join(dim_atores, movies_parquet.nomeArtista == dim_atores.nomeArtista) \
    .join(dt_2023_06_26, dt_2023_06_26["id_imdb_dt"] == dim_filmes.id_filme) \
    .join(dim_generos, dt_2023_06_26["gêneros_dt"] == dim_generos.nome_genero) \
    .join(dim_produtoras, dt_2023_06_26["produzido_por_dt"] == dim_produtoras.nome_produtora) \
    .groupBy(ff.nome_franquia, dim_atores.id_ator, dim_filmes.id_filme, dim_generos.id_genero, dim_produtoras.id_produtora) \
    .agg(
        row_number().over(Window.orderBy(ff.nome_franquia)).alias("id_franquia"),
        ff.nome_franquia,
        sum(ff.orcamento).alias("orcamento"),
        sum(ff.receita).alias("receita"),
        (sum(ff.receita) - sum(ff.orcamento)).alias("lucro"),
        round(avg(ff.media_votos), 2).alias("media_votos"),
        round(avg(ff.popularidade), 2).alias("popularidade"),
        datediff(
            to_date(max(ff.data_lancamento), "yyyy-MM-dd"),
            to_date(min(ff.data_lancamento), "yyyy-MM-dd")
        ).alias("tempo_franquia"),
        dim_atores.id_ator,
        dim_filmes.id_filme,
        dim_generos.id_genero,
        dim_produtoras.id_produtora
    )


# Renomeando as colunas duplicadas
fato_franquias = fato_franquias.select(
    col("id_franquia"),
    col("nome_franquia"),
    col("orcamento"),
    col("receita"),
    col("lucro"),
    col("media_votos"),
    col("popularidade"),
    col("tempo_franquia"),
    col("id_ator").alias("id_ator_ff"),
    col("id_filme").alias("id_filme_ff"),
    col("id_genero").alias("id_genero_ff"),
    col("id_produtora").alias("id_produtora_ff")
)


# Salvando os dados das tabelas em formato Parquet na camada refined do S3
dim_atores.write.parquet("s3://projeto-compass-filmes/REF/dim_atores")
dim_generos.write.parquet("s3://projeto-compass-filmes/REF/dim_generos")
dim_produtoras.write.parquet("s3://projeto-compass-filmes/REF/dim_produtoras")
dim_filmes.write.parquet("s3://projeto-compass-filmes/REF/dim_filmes")
fato_franquias.write.parquet("s3://projeto-compass-filmes/REF/fato_franquias")


job.commit()