<a href="https://colab.research.google.com/github/BrunoBmassa/Python-Actitivies/blob/main/otimizacao.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [2]:
import pyspark
import gdown
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [3]:
# Obtain the Spark session used by every cell below (reuses an existing one if present).
spark = SparkSession.builder.getOrCreate()

In [None]:
# Load the file 'videos-preparados.snappy.parquet' into the DataFrame `df_video`.

# Google Drive share link of the file.
df_video_url = 'https://drive.google.com/file/d/1A1iKcyCfaa0ObhTVN-3EfhiALnB0aCN0/view?usp=drive_link'

# The Drive file id is the second-to-last '/'-separated segment of the share link.
file_id = df_video_url.rsplit('/', 2)[1]

# Download the file into the working directory under its original name.
output_file = 'videos-preparados.snappy.parquet'
gdown.download(id=file_id, output=output_file, quiet=False)

# Read the downloaded Parquet file with Spark.
df_video = spark.read.parquet(output_file)


In [None]:
# Load the file 'video-comments-tratados.snappy.parquet' into the DataFrame `df_comments`.

# Google Drive share link of the file.
df_comments_url = 'https://drive.google.com/file/d/1nfM60gLkNeR_sMPQC8O3fvySdovEbrlt/view?usp=drive_link'

# The Drive file id is the second-to-last '/'-separated segment of the share link.
file_id2 = df_comments_url.rsplit('/', 2)[1]

# Download the file into the working directory under its original name.
output_file2 = 'video-comments-tratados.snappy.parquet'
gdown.download(id=file_id2, output=output_file2, quiet=False)

# Read the downloaded Parquet file with Spark.
df_comments = spark.read.parquet(output_file2)

In [13]:
# Rename the "Video ID" column to "video_id" on both DataFrames so the join key
# contains no space and is spelled identically on each side.
df_video_cleaned, df_comments_cleaned = (
    frame.withColumnRenamed("Video ID", "video_id")
    for frame in (df_video, df_comments)
)

In [14]:
# Create temporary views for both DataFrames so they can be queried through spark.sql.
for view_name, frame in (
    ('video_table', df_video_cleaned),
    ('comments_table', df_comments_cleaned),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# Join the temporary views created above with spark.sql into the DataFrame
# `join_video_comments`.
# NOTE: `v.*, c.*` keeps every column from both sides, so the result contains
# two columns named video_id (one per table).
video_comments_query = """
    SELECT v.*, c.*
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
"""
join_video_comments = spark.sql(video_comments_query)

# Preview the joined rows.
join_video_comments.show()

In [16]:
# Repeat the previous steps (1, 2, 3, 4), this time using repartition and coalesce.

# Start again from the raw DataFrames: rename "Video ID" to "video_id" on
# df_video and on df_comments.
rename_key = ("Video ID", "video_id")
df_video_cleaned = df_video.withColumnRenamed(*rename_key)
df_comments_cleaned = df_comments.withColumnRenamed(*rename_key)


In [17]:
# Repartition both inputs ahead of the join.
# Both sides are hash-partitioned on the join key into the same number of
# partitions, so rows with equal video_id end up in matching partitions.
# A bare repartition(10) spreads rows round-robin, which pays for a shuffle
# without bringing matching keys together.
N_PARTITIONS = 10
df_video_repartition = df_video_cleaned.repartition(N_PARTITIONS, "video_id")
df_comments_repartition = df_comments_cleaned.repartition(N_PARTITIONS, "video_id")

In [18]:
# Register the repartitioned DataFrames as their own temporary views.
repartition_views = {
    "video_table_repartition": df_video_repartition,
    "comments_table_repartition": df_comments_repartition,
}
for view_name, frame in repartition_views.items():
    frame.createOrReplaceTempView(view_name)

In [None]:
# Join the repartitioned views: inner join on video_id, keeping every column
# from both sides.
repartition_join_query = """
    SELECT v.*, c.*
    FROM video_table_repartition v
    JOIN comments_table_repartition c
    ON v.video_id = c.video_id
"""
join_video_comments_repartition = spark.sql(repartition_join_query)

# Preview the joined rows.
join_video_comments_repartition.show()

In [20]:
# Apply coalesce(2) to each of the cleaned DataFrames.
df_video_coalesce, df_comments_coalesce = (
    frame.coalesce(2) for frame in (df_video_cleaned, df_comments_cleaned)
)

In [21]:
# Register the coalesced DataFrames as temporary views.
for view_name, frame in (
    ("video_table_coalesce", df_video_coalesce),
    ("comments_table_coalesce", df_comments_coalesce),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# Join the coalesced temporary views: inner join on video_id, keeping every
# column from both sides.
coalesce_join_query = """
    SELECT v.*, c.*
    FROM video_table_coalesce v
    JOIN comments_table_coalesce c
    ON v.video_id = c.video_id
"""
join_video_comments_coalesce = spark.sql(coalesce_join_query)

# Preview the joined rows.
join_video_comments_coalesce.show()

In [None]:
# Use explain() to better understand the two approaches, then redo the previous
# steps (1, 2, 3, 4) applying everything learned so far, so that the join and
# filter only touch the data that is actually needed.

# Print the query plan of each earlier join for comparison.

# With repartition
join_video_comments_repartition.explain()

# With coalesce
join_video_comments_coalesce.explain()


def _standardize_columns(frame):
    """Return `frame` with every column name lower-cased and spaces replaced by underscores."""
    renamed = [
        col(name).alias(name.lower().replace(" ", "_"))
        for name in frame.columns
    ]
    return frame.select(*renamed)


# Standardise the column names of both raw DataFrames.
df_video_cleaned = _standardize_columns(df_video)
df_comments_cleaned = _standardize_columns(df_comments)

# Keep only the columns the final result needs.
df_video_selected = df_video_cleaned.select("video_id", "title", "keyword")
df_comments_selected = df_comments_cleaned.select("video_id", "comment", "likes")

# Keep only rows whose join key is not null.
df_video_filtered = df_video_selected.filter("video_id IS NOT NULL")
df_comments_filtered = df_comments_selected.filter("video_id IS NOT NULL")

# Register the trimmed DataFrames as temporary views; createOrReplaceTempView
# replaces any existing view with the same name.
df_video_filtered.createOrReplaceTempView("video_table")
df_comments_filtered.createOrReplaceTempView("comments_table")

# Inner join on video_id, listing the output columns explicitly.
optimized_join_query = """
    SELECT v.video_id, v.title, v.keyword, c.comment, c.likes
    FROM video_table v
    JOIN comments_table c
    ON v.video_id = c.video_id
"""
join_video_comments = spark.sql(optimized_join_query)

# Show the result of the join.
join_video_comments.show()



In [27]:
# Save the joined DataFrame in Parquet format, overwriting any previous output
# at the same path.
# The former `.option('header', 'true')` was dropped: 'header' is a CSV
# reader/writer setting and has no meaning for Parquet output.
join_video_comments.write.mode("overwrite").parquet("join-videos-comments-otimizado")
