## **Informações contidas na base de dados**

**Base 1:**
1. ID do filme
2. título e ano de lançamento

**Base 2:**
1. Cust_Id: ID do customer que fez a avaliação
2. Rating: avaliação (nota)
3. Date: data da avaliação
4. Movie_Id: ID do filme

### Código para tratamento dos bancos dados com Pandas

#### Tratamento do banco de dados da base1 com o nome dos filmes e id de identificação

In [None]:
# Importando o Spark
from pyspark.sql import SparkSession

In [None]:
# Iniciando uma sessao Spark
spark = SparkSession.builder.appName('pyspark_treatment').getOrCreate()

In [None]:
# Importando a biblioteca time para pegar os tempos de execucao
import time

In [None]:
# Dicionario para salvar os tempos de execucao para o dataset movies.csv
execution_time_spk = {}

In [None]:
# Importando a base de dados dos filmes
t1 = time.time()
df_movies = spark.read.csv('movies.csv', sep=';', header=False, inferSchema=True)
tempo_exec = time.time() - t1
execution_time_spk['import_time_csv'] = tempo_exec
print(execution_time_spk)

In [None]:
# Visualizando a base de dados
df_movies.show()

In [None]:
# Transformando coluna de nomes e anos de lancamento misturados
# Coluna com nome do filme

# Primeiramente precisamos importar o pyspark.sql.functions
from pyspark.sql.functions import *

# Eliminando os parentes dos valores da coluna dos nomes dos filmes e das datas de lancamento
df_movies = df_movies.withColumn("_c1", regexp_replace(df_movies["_c1"], "\(|\)", ""))

# Separando coluna com split pela virgula
df_movies = df_movies.withColumn('splited_column', split(col('_c1'), ','))

# Pegando primeira parte da coluna splitada e criando a coluna com nome do filme
t1 = time.time()
df_movies = df_movies.withColumn('Movie_Name', col('splited_column')[0])
tempo_exec = time.time() - t1
execution_time_spk['new_column_movie'] = tempo_exec
print(execution_time_spk)

In [None]:
# Pegando segunda parte da coluna splitada e criando a coluna com ano de lancamento
t1 = time.time()
df_movies = df_movies.withColumn('Release_Year', col('splited_column')[1])
tempo_exec = time.time() - t1
execution_time_spk['new_column_release_year'] = tempo_exec
print(execution_time_spk)

In [None]:
# Visualizando o dataframe modificado
df_movies.show(5)

In [None]:
# Exclusao da coluna splitada
df_movies = df_movies.drop('splited_column')

In [None]:
# Exclusao da coluna inicial (_c1) com nome e data de lancamento
t1 = time.time()
df_movies = df_movies.drop('_c1')
tempo_exec = time.time() - t1
execution_time_spk['remove_initial_column'] = tempo_exec
print(execution_time_spk)

In [None]:
# Visualização do banco de dados 
df_movies.show(5)

In [None]:
# Renomeação da coluna de ID do filme com a mesma nomenclatura da base de dados de avaliações.
t1 = time.time()
df_movies = df_movies.withColumnRenamed('_c0', 'Movie_Id')
tempo_exec = time.time() - t1
execution_time_spk['rename_column_movie'] = tempo_exec
print(execution_time_spk)

In [None]:
# Informações sobre o dataframe do banco de dados de filmes.
df_movies.printSchema()

In [None]:
# Realizando a modificando do tipo de dado da coluna com a data de lançamento.
t1 = time.time()
df_movies = df_movies.withColumn('Release_Year', to_date(df_movies['Release_Year']))
tempo_exec = time.time() - t1
execution_time_spk['change_type_year'] = tempo_exec
print(execution_time_spk)

In [None]:
# Informações sobre o dataframe do banco de dados de filmes.
df_movies.printSchema()

In [None]:
# Visualização do dataframe com todas as alterações.
df_movies.show(5)

In [None]:
# Verificando se possui dados ausentes.
t1 = time.time()
column_names = ['Movie_Id', 'Movie_Name', 'Release_Year']
for column_name in column_names:
    null_count = df_movies.filter(df_movies[column_name].isNull()).count()
    print(f'{column_name}\t{null_count}')
tempo_exec = time.time() - t1
execution_time_spk['check_missing_data'] = tempo_exec
print(execution_time_spk)

In [None]:
# Verificando se possui linhas duplicadas na base de dados e removendo
t1 = time.time()
df_movies_without_duplicates = df_movies.dropDuplicates()
duplicated_rows = df_movies.count() - df_movies_without_duplicates.count()
tempo_exec = time.time() - t1
execution_time_spk['check_duplicate_lines'] = tempo_exec
print(execution_time_spk)
print(f'Linhas duplicadas: {duplicated_rows}')

In [None]:
# Verificando se possui id's duplicados.
t1 = time.time()
df_without_duplitate_id = df_movies.dropDuplicates(subset=['Movie_Id'])
count_df_without_duplitate_id = df_movies.count() - df_without_duplitate_id.count()
tempo_exec = time.time() - t1
execution_time_spk['check_duplicate_lines_ids'] = tempo_exec
print(execution_time_spk)
print(f'IDs duplicados: {count_df_without_duplitate_id}')

In [None]:
# Verificando se possui filmes com mesmo nome e mesmo ano de lançamento.
t1 = time.time()
df_group = df_movies.groupBy(df_movies.Movie_Name, df_movies.Release_Year).agg(count('*').alias('Count'))
df_duplicates = df_group.filter(df_group['Count'] > 1)
tempo_exec = time.time() - t1
execution_time_spk['check_duplicate_lines_movie_year'] = tempo_exec
print(execution_time_spk)
df_duplicates.show(5)

In [None]:
# Obtendo tabela com os registros do filme repetido
t1 = time.time()
df_duplicates_final = df_duplicates.join(df_movies, \
    on=((df_duplicates.Movie_Name == df_movies.Movie_Name) & \
        (df_duplicates.Release_Year == df_movies.Release_Year)))
tempo_exec = time.time() - t1
execution_time_spk['duplicate_line_dr_quinn'] = tempo_exec
print(execution_time_spk)
df_duplicates_final.show()

Observação: Apesar de existir dois registros com o mesmo nome (Dr. Quinn) e mesmo ano de lançamento (1993), realizando uma busca na internet, foi verificado que no ano de 1993 foram lançados duas temporadas de Dr. Quinn. Dessa forma, os registros estão se referindo a temporadas diferentes, não sendo considerado um registro duplicado.

#### Tratamento do banco de dados da base2 com as avaliações dos usuários

In [None]:
# Criando dicionario para salvar tempos de execucao
execution_time2_spk = {}

In [None]:
# Importando a segunda base de dados
t1 = time.time()
df_customers_rating = spark.read.csv('customers_rating.csv', sep=';', header=True, inferSchema=True)
tempo_exec = time.time() - t1
execution_time2_spk['import_time_csv'] = tempo_exec
print(execution_time2_spk)

In [None]:
# Visualizacao do dataframe
df_customers_rating.show(5)

In [None]:
# Verificando se possui dados ausentes.
t1 = time.time()
null_counts = df_customers_rating.select([count(when(col(c).isNull(), c)).alias(c) for c in df_customers_rating.columns])
tempo_exec = time.time() - t1
execution_time2_spk['check_missing_data'] = tempo_exec
print(execution_time2_spk)
null_counts.show()

In [None]:
# Informações sobre o dataframe. 
df_customers_rating.printSchema()

In [None]:
# Realizando a converteção da coluna de Date para datetime. Importante trabalhar com essa coluna no formato de data.
t1 = time.time()
df_customers_rating = df_customers_rating.withColumn('Date', to_date(df_customers_rating['Date']))
tempo_exec = time.time() - t1
execution_time2_spk['change_type_date'] = tempo_exec
print(execution_time2_spk)

In [None]:
# Visualizacao do dataframe
df_customers_rating.show(5)

In [None]:
# Confirmando a mudança da coluna Date
df_customers_rating.printSchema()

In [None]:
# Verificação se possui avaliações duplicadas de clientes, 
# considerando mesmo id do cliente, id do filme e avaliação
t1 = time.time()
df2_group = df_customers_rating.groupBy(df_customers_rating.Cust_Id, \
    df_customers_rating.Movie_Id, df_customers_rating.Rating).agg(count('*').alias('Count'))
df2_duplicates = df2_group.filter(df2_group['Count'] > 1)
tempo_exec = time.time() - t1
execution_time2_spk['check_duplicate_lines_movie_year'] = tempo_exec
print(execution_time2_spk)
df2_duplicates.show(5)

### **Criando um novo dataframe com join do df_movies e df_customers_rating**

In [None]:
df_customers_rating.printSchema()
df_movies.printSchema()

In [None]:
# Como a análise será realizada apenas com os filmes que possuem avaliações de clientes, 
# será realizado um join considerando a interseção entre as tabelas.
t1 = time.time()
df_join_movies_rating = df_movies.join(df_customers_rating, \
    on=(df_movies.Movie_Id == df_customers_rating.Movie_Id), \
        how='inner').drop('Movie_Id')
tempo_exec = time.time() - t1
execution_time2_spk['merge_tables'] = tempo_exec
print(execution_time2_spk)

In [None]:
# Visualizacao do dataframe
df_join_movies_rating.show(5)

In [None]:
# Quantidade de registros no dataframe
df_join_movies_rating.count()

In [None]:
"""
# Load dos dados (salvamento)
# Delta
t1 = time.time()
df_join_movies_rating.write.format("delta").mode("overwrite").save('./final_data.delta')
tempo_exec = time.time() - t1
execution_time2_spk['final_load_delta'] = tempo_exec
print(execution_time2_spk)
"""

In [None]:
"""
# Load dos dados (salvamento)
# parquet
t1 = time.time()
df_join_movies_rating.write.format("parquet").mode("overwrite").save('./final_data.parquet')
tempo_exec = time.time() - t1
execution_time2_spk['final_load_parquet'] = tempo_exec
print(execution_time2_spk)
"""

In [None]:
"""
# Load dos dados (salvamento)
# csv
t1 = time.time()
df_join_movies_rating.write.format("csv").mode("overwrite").save('./final_data.csv')
tempo_exec = time.time() - t1
execution_time2_spk['final_load_csv'] = tempo_exec
print(execution_time2_spk)
"""

In [None]:
# Salvar os dicionarios com os tempos de execucao para criar o grafico (formato json)
import json

with open('execution_time_spk.json', 'w') as f:
    json.dump(execution_time_spk, f)

with open('execution_time2_spk.json', 'w') as f:
    json.dump(execution_time2_spk, f)