In [1]:
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql import functions as F
from datetime import datetime
from pyspark.sql.functions import col, substring, regexp_replace, expr, size, levenshtein, avg, when, format_number, sum, lower


# Create (or reuse) the Spark session for this notebook, running locally on
# all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ELT Refined")
    .getOrCreate()
)


/bin/hadoop


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/apache-spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/hadoop/common/lib/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:

# Directories holding the trusted-layer parquet files read below.
path1 = "/mnt/wsl/PHYSICALDRIVE2/Projects/Compasso/Azimute/Sprint_9/Desafio03_ETL/Tarefa_03/assets/Local/"
path2 = "/mnt/wsl/PHYSICALDRIVE2/Projects/Compasso/Azimute/Sprint_9/Desafio03_ETL/Tarefa_03/assets/TMDB/"

# IMDB trusted tables (movies and series) are read from path1.
df_movies_IMDB_trusted = spark.read.parquet(f"{path1}movies_IMDB_trusted.parquet")
df_series_IMDB_trusted = spark.read.parquet(f"{path1}series_IMDB_trusted.parquet")

# TMDB trusted tables (movies and series) are read from path2.
df_movies_TMDB_trusted = spark.read.parquet(f"{path2}movies_TMDB_trusted.parquet")
df_series_TMDB_trusted = spark.read.parquet(f"{path2}series_TMDB_trusted.parquet")


                                                                                

## 2. Normalização
#### 2.1. Cria tabela movies IMDB

In [4]:
# Movie-level columns of the IMDB trusted table: keep one row per title id and
# expose that key as "movie_id".
df_movies_IMDB = (
    df_movies_IMDB_trusted
    .select(
        "id",
        "tituloPrincipal",
        "tituloOriginal",
        "anoLancamento",
        "tempoMinutos",
        "genero",
        "notaMedia",
        "numeroVotos",
    )
    .dropDuplicates(["id"])
    .withColumnRenamed("id", "movie_id")
)

#### 2.2. Cria tabela series IMDB

In [5]:
# Series-level columns of the IMDB trusted table: keep one row per title id and
# expose that key as "seriesId".
df_series_IMDB = (
    df_series_IMDB_trusted
    .select(
        "id",
        "tituloPrincipal",
        "tituloOriginal",
        "anoLancamento",
        "anoTermino",
        "tempoMinutos",
        "genero",
        "notaMedia",
        "numeroVotos",
    )
    .dropDuplicates(["id"])
    .withColumnRenamed("id", "seriesId")
)

## 3. Junção

### 3.1 Unir DF dos FILMES

In [6]:
# TMDB movies: add a normalised title (everything except ASCII letters, digits
# and whitespace removed) and the release year (first four characters of
# release_date), keeping the original columns alongside.
df_movies_TMDB_titulos_datas = df_movies_TMDB_trusted.select(
    F.regexp_replace(F.col("title"), r"[^a-zA-Z0-9\s]", "").alias("title_replaced"),
    F.substring(F.col("release_date"), 1, 4).alias("releaseDate"),
    "id",
    "genre_ids",
    "original_language",
    "popularity",
    "vote_average",
    "vote_count",
    "title",
    "release_date",
)

# IMDB movies: same title normalisation and year extraction, then strip the
# phrase "Final Chapter Part" from the normalised title.
df_movies_IMDB_titulos_datas = df_movies_IMDB.select(
    F.regexp_replace(F.col("tituloPrincipal"), r"[^a-zA-Z0-9\s]", "").alias("tituloPrincipal_replaced"),
    F.substring(F.col("anoLancamento"), 1, 4).alias("anoLancamento"),
    "movie_id",
    "tituloOriginal",
    "tempoMinutos",
    "genero",
    "notaMedia",
    "numeroVotos",
).withColumn(
    "tituloPrincipal_replaced",
    F.regexp_replace(F.col("tituloPrincipal_replaced"), "Final Chapter Part", ""),
)

# Title-similarity rule, chosen by the number of space-separated words in the
# TMDB title. A word "hits" when it occurs inside the IMDB title (instr):
#   1 word   -> the word is longer than 3 characters and hits
#   2 words  -> both words hit
#   3 words  -> at least 2 words longer than 4 characters hit
#   4 words  -> at least 3 words longer than 3 characters hit
#   5+ words -> at least 3 words of 3+ characters hit AND the Levenshtein
#               distance between both titles is at most 9
movies_title_match = F.expr(
    "(size(split(title_replaced, ' ')) = 1 AND "
    "size(filter(split(title_replaced, ' '), x -> length(x) > 3 AND instr(tituloPrincipal_replaced, x) > 0)) >= 1) OR "
    "(size(split(title_replaced, ' ')) = 2 AND "
    "size(filter(split(title_replaced, ' '), x -> length(x) >= 1 AND instr(tituloPrincipal_replaced, x) > 0)) >= 2) OR "
    "(size(split(title_replaced, ' ')) = 3 AND "
    "size(filter(split(title_replaced, ' '), x -> length(x) > 4 AND instr(tituloPrincipal_replaced, x) > 0)) >= 2) OR "
    "(size(split(title_replaced, ' ')) = 4 AND "
    "size(filter(split(title_replaced, ' '), x -> length(x) > 3 AND instr(tituloPrincipal_replaced, x) > 0)) >= 3) OR "
    "(size(split(title_replaced, ' ')) >= 5 AND "
    "size(filter(split(title_replaced, ' '), x -> length(x) >= 3 AND instr(tituloPrincipal_replaced, x) > 0)) >= 3 AND "
    "levenshtein(tituloPrincipal_replaced, title_replaced) <= 9)"
)

# A pair matches only when the titles are similar, the years are equal and
# both normalised titles are non-null and non-empty.
movies_join_condition = (
    movies_title_match
    & (df_movies_TMDB_titulos_datas["releaseDate"] == df_movies_IMDB_titulos_datas["anoLancamento"])
    & F.col("tituloPrincipal_replaced").isNotNull()
    & (F.col("tituloPrincipal_replaced") != "")
    & F.col("title_replaced").isNotNull()
    & (F.col("title_replaced") != "")
)

# Left outer join: every TMDB movie is kept; the IMDB columns stay null when
# no IMDB title satisfies the condition.
df_movies_refined = df_movies_TMDB_titulos_datas.join(
    df_movies_IMDB_titulos_datas,
    on=movies_join_condition,
    how="left_outer",
)

In [7]:
# Merge the TMDB and IMDB metrics of each matched movie, then keep only the
# refined columns and remove duplicate rows.
#   votos      -> vote_count + numeroVotos when both exist, else whichever exists
#   nota_media -> mean of vote_average and notaMedia when both exist, else
#                 whichever exists, formatted with one decimal place
# NOTE(review): format_number produces a string column; if a numeric rating is
# expected downstream, rounding would be needed instead — confirm.
df_movies_refined = (
    df_movies_refined
    .withColumn(
        "votos",
        F.when(
            F.col("vote_count").isNotNull() & F.col("numeroVotos").isNotNull(),
            F.col("vote_count") + F.col("numeroVotos"),
        )
        .when(F.col("vote_count").isNotNull(), F.col("vote_count"))
        .otherwise(F.col("numeroVotos")),
    )
    .withColumn(
        "nota_media",
        F.when(
            F.col("vote_average").isNotNull() & F.col("notaMedia").isNotNull(),
            F.format_number((F.col("vote_average") + F.col("notaMedia")) / 2, 1),
        )
        .when(F.col("vote_average").isNotNull(), F.format_number(F.col("vote_average"), 1))
        .otherwise(F.format_number(F.col("notaMedia"), 1)),
    )
    # The select fixes the final column set, so helper columns (normalised
    # titles, years, raw vote/rating columns) are left out here.
    .select(
        "id",
        "movie_id",
        "genre_ids",
        "title",
        "tempoMinutos",
        "release_date",
        "original_language",
        "popularity",
        "nota_media",
        "votos",
    )
    .distinct()
)


#### 3.2. Filtrar Series TMDB por Filme do TMDB (antes de unir DF SERIES)

In [8]:
# TMDB series: add the premiere year (first four characters of first_air_date)
# next to the columns needed later.
# NOTE(review): `name_replaced` is read directly from the trusted table; no
# regexp_replace is applied in this cell — confirm the trusted layer already
# stores the normalised name.
df_series_TMDB_titulos_datas = df_series_TMDB_trusted.select(
    substring(col('first_air_date'), 1, 4).alias('releaseDate'),
    col('name_replaced'),
    col('id'),
    col('genre_ids'),
    col('original_language'),
    col('popularity'),
    col('vote_average'),
    col('vote_count'),
    col('name'),
    col('first_air_date')
)

# Only the normalised movie title is needed to filter the series.
df_movies_TMDB_titulos_datas = df_movies_TMDB_titulos_datas.select(col('title_replaced'))

# Keep the series whose name resembles at least one TMDB movie title.
# The rule depends on the number of space-separated words in the series name;
# a word "hits" when it occurs inside the movie title (instr):
#   1 word   -> the word is longer than 3 characters and hits
#   2 words  -> both words hit
#   3 words  -> at least 2 words longer than 4 characters hit
#   4 words  -> at least 3 words longer than 3 characters hit
#   5+ words -> at least 3 words of 3+ characters hit AND the Levenshtein
#               distance between both titles is at most 8
#
# A left semi join is used instead of inner join + drop('title_replaced'):
# it returns only the series columns and emits each series row once, no matter
# how many movies match. The inner join repeated a series row for every
# matching movie, which inflated the input of the following series/IMDB join.
df_series_TMDB_Layer1 = df_series_TMDB_titulos_datas.join(
    df_movies_TMDB_titulos_datas,
    expr(
        "(size(split(name_replaced, ' ')) = 1 AND " +
        "size(filter(split(name_replaced, ' '), x -> length(x) > 3 AND instr(title_replaced, x) > 0)) >= 1) OR " +
        "(size(split(name_replaced, ' ')) = 2 AND " +
        "size(filter(split(name_replaced, ' '), x -> length(x) >= 1 AND instr(title_replaced, x) > 0)) >= 2) OR " +
        "(size(split(name_replaced, ' ')) = 3 AND " +
        "size(filter(split(name_replaced, ' '), x -> length(x) > 4 AND instr(title_replaced, x) > 0)) >= 2) OR " +
        "(size(split(name_replaced, ' ')) = 4 AND " +
        "size(filter(split(name_replaced, ' '), x -> length(x) > 3 AND instr(title_replaced, x) > 0)) >= 3) OR " +
        "(size(split(name_replaced, ' ')) >= 5 AND " +
        "size(filter(split(name_replaced, ' '), x -> length(x) >= 3 AND instr(title_replaced, x) > 0)) >= 3 AND " +
        "levenshtein(title_replaced, name_replaced) <= 8)"
    ) &
    (col('title_replaced').isNotNull()) &
    (col('title_replaced') != '') &
    (col('name_replaced').isNotNull()) &
    (col('name_replaced') != ''),
    'left_semi'
)

#### 3.3. Unir DF das SERIES

In [9]:
# IMDB series: add a normalised title (everything except ASCII letters, digits
# and whitespace removed) and the release year (first four characters of
# anoLancamento).
df_series_IMDB_titulos_datas = df_series_IMDB.select(
    F.regexp_replace(F.col("tituloPrincipal"), r"[^a-zA-Z0-9\s]", "").alias("tituloPrincipal_replaced"),
    F.substring(F.col("anoLancamento"), 1, 4).alias("anoLancamento"),
    "seriesId",
    "tituloPrincipal",
    "tempoMinutos",
    "anoTermino",
    "notaMedia",
    "numeroVotos",
)

# Title-similarity rule, chosen by the number of space-separated words in the
# TMDB series name. A word "hits" when it occurs inside the IMDB title (instr):
#   1 word   -> the word is longer than 3 characters and hits
#   2 words  -> both words hit
#   3 words  -> at least 2 words longer than 4 characters hit
#   4 words  -> at least 3 words longer than 3 characters hit
#   5+ words -> at least 3 words of 3+ characters hit AND the Levenshtein
#               distance between both titles is at most 9
series_title_match = F.expr(
    "(size(split(name_replaced, ' ')) = 1 AND "
    "size(filter(split(name_replaced, ' '), x -> length(x) > 3 AND instr(tituloPrincipal_replaced, x) > 0)) >= 1) OR "
    "(size(split(name_replaced, ' ')) = 2 AND "
    "size(filter(split(name_replaced, ' '), x -> length(x) >= 1 AND instr(tituloPrincipal_replaced, x) > 0)) >= 2) OR "
    "(size(split(name_replaced, ' ')) = 3 AND "
    "size(filter(split(name_replaced, ' '), x -> length(x) > 4 AND instr(tituloPrincipal_replaced, x) > 0)) >= 2) OR "
    "(size(split(name_replaced, ' ')) = 4 AND "
    "size(filter(split(name_replaced, ' '), x -> length(x) > 3 AND instr(tituloPrincipal_replaced, x) > 0)) >= 3) OR "
    "(size(split(name_replaced, ' ')) >= 5 AND "
    "size(filter(split(name_replaced, ' '), x -> length(x) >= 3 AND instr(tituloPrincipal_replaced, x) > 0)) >= 3 AND "
    "levenshtein(tituloPrincipal_replaced, name_replaced) <= 9)"
)

# A pair matches only when the titles are similar, the years are equal and
# both normalised titles are non-null and non-empty.
series_join_condition = (
    series_title_match
    & (df_series_TMDB_Layer1["releaseDate"] == df_series_IMDB_titulos_datas["anoLancamento"])
    & F.col("tituloPrincipal_replaced").isNotNull()
    & (F.col("tituloPrincipal_replaced") != "")
    & F.col("name_replaced").isNotNull()
    & (F.col("name_replaced") != "")
)

# Left outer join: every filtered TMDB series is kept; the IMDB columns stay
# null when no IMDB title satisfies the condition.
df_series_refined = df_series_TMDB_Layer1.join(
    df_series_IMDB_titulos_datas,
    on=series_join_condition,
    how="left_outer",
)


In [10]:
# Merge the TMDB and IMDB metrics of each matched series, then keep only the
# refined columns and remove duplicate rows.
#   votos      -> vote_count + numeroVotos when both exist, else whichever exists
#   nota_media -> mean of vote_average and notaMedia when both exist, else
#                 whichever exists, formatted with one decimal place
# NOTE(review): format_number produces a string column; if a numeric rating is
# expected downstream, rounding would be needed instead — confirm.
df_series_refined = (
    df_series_refined
    .withColumn(
        "votos",
        F.when(
            F.col("vote_count").isNotNull() & F.col("numeroVotos").isNotNull(),
            F.col("vote_count") + F.col("numeroVotos"),
        )
        .when(F.col("vote_count").isNotNull(), F.col("vote_count"))
        .otherwise(F.col("numeroVotos")),
    )
    .withColumn(
        "nota_media",
        F.when(
            F.col("vote_average").isNotNull() & F.col("notaMedia").isNotNull(),
            F.format_number((F.col("vote_average") + F.col("notaMedia")) / 2, 1),
        )
        .when(F.col("vote_average").isNotNull(), F.format_number(F.col("vote_average"), 1))
        .otherwise(F.format_number(F.col("notaMedia"), 1)),
    )
    # The select fixes the final column set, so helper columns (normalised
    # titles, years, raw vote/rating columns) are left out here.
    .select(
        "id",
        "seriesId",
        "genre_ids",
        "name",
        "tempoMinutos",
        "first_air_date",
        "anoTermino",
        "original_language",
        "popularity",
        "nota_media",
        "votos",
    )
    .distinct()
)


## 4. Cria tabela actors  

In [28]:

# Build the actors table from the person columns of the IMDB trusted tables,
# restricted to titles that made it into the refined movies/series tables.
df_movies_actors = df_movies_IMDB_trusted.select("id", "personagem", "nomeArtista", "generoArtista", "anoNascimento", "anoFalecimento", "profissao", "titulosMaisConhecidos")
df_series_actors = df_series_IMDB_trusted.select("id", "personagem", "nomeArtista", "generoArtista", "anoNascimento", "anoFalecimento", "profissao", "titulosMaisConhecidos")


# IMDB ids present in the refined tables. Rows with no IMDB match carry a null
# id here; the inner joins below never match on null, so they are ignored.
filme_ids = df_movies_refined.select("movie_id")
serie_ids = df_series_refined.select("seriesId")


# Keep only the people attached to a refined movie / series.
df_movies_actors = df_movies_actors.join(filme_ids, df_movies_actors["id"] == filme_ids["movie_id"], "inner")
df_series_actors = df_series_actors.join(serie_ids, df_series_actors["id"] == serie_ids["seriesId"], "inner")

# The join key from the id tables duplicates "id"; drop it.
df_movies_actors = df_movies_actors.drop("movie_id")
df_series_actors = df_series_actors.drop("seriesId")

# Remove repeated rows (the id tables may list the same id more than once).
df_movies_actors = df_movies_actors.distinct()
df_series_actors = df_series_actors.distinct()

# Stack movie and series actors; both frames have the same columns in the same
# order, which is what the positional union requires.
df_actors = df_movies_actors.union(df_series_actors)

# Add a unique row id to the *combined* table. The previous code derived
# df_actors from df_series_actors at this step, which silently discarded every
# movie actor from the union above.
# NOTE(review): the id is unique per row, not per person — an artist appearing
# in several titles gets several IdArtista values; confirm this is intended.
df_actors = df_actors.withColumn("IdArtista", F.monotonically_increasing_id())


## 5. Cria Views  

In [None]:
# Register each refined DataFrame as a temporary SQL view under a short name.
for view_name, frame in (
    ("movies", df_movies_refined),
    ("series", df_series_refined),
    ("actors", df_actors),
):
    frame.createOrReplaceTempView(view_name)

## 6. Escreve os dados na camada Refined no S3 no formato parquet


In [None]:

# Today's date supplies the year/month/day path segments of each destination.
current_date = datetime.now()
ano, mes, dia = (current_date.strftime(part) for part in ("%Y", "%m", "%d"))

# Destination templates in the Refined layer, one per table.
path_ref_movies = "s3://data-lake-do-fabricio/Refined/Filmes/{ano}/{mes}/{dia}/"
path_ref_series = "s3://data-lake-do-fabricio/Refined/Series/{ano}/{mes}/{dia}/"
path_ref_actors = "s3://data-lake-do-fabricio/Refined/Actors/{ano}/{mes}/{dia}/"

# Write each DataFrame as parquet under its dated directory.
# NOTE(review): no write mode is set, so Spark's default applies; a second run
# on the same day will presumably fail because the path exists — confirm
# whether overwrite is wanted.
for frame, destination in (
    (df_movies_refined, path_ref_movies),
    (df_series_refined, path_ref_series),
    (df_actors, path_ref_actors),
):
    frame.write.parquet(destination.format(ano=ano, mes=mes, dia=dia))

