In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from posixpath import join
from pyspark.sql.functions import broadcast

In [3]:
# Obtain the SparkSession for this notebook: getOrCreate() returns the existing
# session when there is one and otherwise builds a new one.
spark = (
    SparkSession.builder
    .getOrCreate()
)

In [4]:
# Read the prepared videos Parquet file and print the physical plan of the scan.
df_video = spark.read.parquet(
    '/content/videos-preparados.snappy.parquet'
)
df_video.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,Keyword Index#10,Features PCA#11,Features Normal#12,Features#13] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-preparados.snappy.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Title:string,Video ID:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...




In [5]:
# Read the treated comments Parquet file and print the physical plan of the scan.
df_comments = spark.read.parquet(
    '/content/videos-comments-tratados.snappy.parquet'
)
df_comments.explain()

== Physical Plan ==
*(1) ColumnarToRow
+- FileScan parquet [Video ID#28,Title#29,Published At#30,Keyword#31,Likes#32,Comments#33,Views#34,Interaction#35,Year#36,Comment#37,Sentiment#38,Likes Comment#39] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/content/videos-comments-tratados.snappy.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Video ID:string,Title:string,Published At:date,Keyword:string,Likes:int,Comments:int,Views...




In [6]:
# Expose both DataFrames to Spark SQL under the view names used by the SQL joins.
df_video.createOrReplaceTempView(name='video_temp')
df_comments.createOrReplaceTempView(name='comments_temp')

In [7]:
# Inner join of videos and comments on `Video ID`. SELECT * keeps every column
# from both sides, so names present in both views appear twice in the result.
join_video_comments_df = spark.sql(
"""
SELECT *
FROM video_temp v
JOIN comments_temp c
ON v.`Video ID` = c.`Video ID`
"""
)

In [8]:
# Inspect the SELECT * join: its schema, a 5-row sample, and its physical plan.
join_video_comments_df.printSchema()
join_video_comments_df.show(n=5)
join_video_comments_df.explain()

root
 |-- Title: string (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Keyword Index: double (nullable = true)
 |-- Features PCA: vector (nullable = true)
 |-- Features Normal: vector (nullable = true)
 |-- Features: vector (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Sentiment: integer (nullab

In [9]:
# Using repartition: redistribute both DataFrames into 10 partitions by the
# "Video ID" column.
df_video_repartition, df_comments_repartition = (
    frame.repartition(10, "Video ID") for frame in (df_video, df_comments)
)

In [10]:
# Print the partition count of each repartitioned DataFrame, one number per line.
print(
    df_video_repartition.rdd.getNumPartitions(),
    df_comments_repartition.rdd.getNumPartitions(),
    sep="\n",
)

10
10


In [11]:
# glom() gathers each partition into a list; taking len() of each list gives
# the number of records held by every partition of the repartitioned videos.
particoes = (
    df_video_repartition.rdd
    .glom()
    .map(len)
    .collect()
)
print("Número de registros por partiçao de df_video", particoes)

Número de registros por partiçao de df_video [167, 193, 188, 188, 174, 191, 200, 198, 188, 182]


In [12]:
# Register the temp views again under the same names.
# NOTE(review): these are the original df_video / df_comments, not the
# *_repartition frames created earlier -- confirm that is intended.
df_video.createOrReplaceTempView(name='video_temp')
df_comments.createOrReplaceTempView(name='comments_temp')

In [13]:
# Mark the videos DataFrame for broadcast, join it to the comments on
# "Video ID", and preview the first 5 rows of the result.
df_video_small = broadcast(df_video)
broadcast_data = df_comments.join(df_video_small, on="Video ID")
broadcast_data.show(n=5)

+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+---------+-------------+--------------------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|   Video ID|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|             Comment|Sentiment|Likes Comment|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|Month|Keyword Index|        Features PCA|     Features Normal|            Features|
+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+---------+-------------+--------------------+------------+-------+-----+--------+------+-----------+----+-----+-------------+--------------------+--------------------+--------------------+
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691

In [14]:
# Same inner join on `Video ID` as the SELECT * version, but with an explicit
# column list: the comment-side duplicates of the video columns get a
# `comment_` prefix, and the video-side `Comments` / `Year` are renamed to
# `video_comments` / `video_year`, so every output column name is unique.
join2_video_comments_df = spark.sql(
"""
SELECT
    v.`Title`,
    v.`Video ID`,
    v.`Published At`,
    v.`Keyword`,
    v.`Likes`,
    v.`Comments` as video_comments,
    v.`Views`,
    v.`Interaction`,
    v.`Year` as video_year,
    v.`Month`,
    v.`Keyword Index`,
    v.`Features PCA`,
    v.`Features Normal`,
    v.`Features`,
    c.`Title` as comment_title,
    c.`Published At` as comment_published_at,
    c.`Keyword` as comment_keyword,
    c.`Likes` as comment_likes,
    c.`Comments` as comment_comments,
    c.`Views` as comment_views,
    c.`Interaction` as comment_interaction,
    c.`Year` as comment_year,
    c.`Comment`,
    c.`Sentiment`,
    c.`Likes Comment`
FROM video_temp v
JOIN comments_temp c
ON v.`Video ID` = c.`Video ID`
"""
)

In [15]:
# Collapse the explicit-column join result down to a single partition.
joinfinal = join2_video_comments_df.coalesce(numPartitions=1)

In [16]:
# Explaining the differences between the joins: print the physical plan of the
# coalesced result, the explicit-column join, and the SELECT * join, in that order.
joinfinal.explain()
join2_video_comments_df.explain()
join_video_comments_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Coalesce 1
   +- Project [Title#0, Video ID#1, Published At#2, Keyword#3, Likes#4, Comments#5 AS video_comments#311, Views#6, Interaction#7, Year#8 AS video_year#312, Month#9, Keyword Index#10, Features PCA#11, Features Normal#12, Features#13, Title#29 AS comment_title#313, Published At#30 AS comment_published_at#314, Keyword#31 AS comment_keyword#315, Likes#32 AS comment_likes#316, Comments#33 AS comment_comments#317, Views#34 AS comment_views#318, Interaction#35 AS comment_interaction#319, Year#36 AS comment_year#320, Comment#37, Sentiment#38, Likes Comment#39]
      +- BroadcastHashJoin [Video ID#1], [Video ID#28], Inner, BuildLeft, false
         :- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false]),false), [plan_id=300]
         :  +- Filter isnotnull(Video ID#1)
         :     +- FileScan parquet [Title#0,Video ID#1,Published At#2,Keyword#3,Likes#4,Comments#5,Views#6,Interaction#7,Year#8,Month#9,K

In [17]:
# Saving the most optimized join.
# mode("overwrite") replaces any output left by a previous run; without it the
# writer's default save mode raises an error when the path already exists, so
# re-running this cell (e.g. Restart & Run All) would fail.
joinfinal.write.mode("overwrite").parquet('/content/join_final.parquet')

In [18]:
# Testing the more optimized join suggested by the professor, which I did not understand how to integrate with the initial activity
from pyspark.sql import functions as F

In [19]:
# Fresh reads of both Parquet files for the second (column-pruned) approach.
# NOTE(review): df_videos2 is reassigned by a later cell and df_comments2 is not
# referenced again in the visible part of the notebook -- confirm this cell is
# still needed.
df_videos2 = spark.read.parquet(
    '/content/videos-preparados.snappy.parquet'
)
df_comments2 = spark.read.parquet(
    '/content/videos-comments-tratados.snappy.parquet'
)

In [23]:
# Re-read the videos file keeping only the columns listed in colunas_video.
colunas_video = ["Video ID", "Title", "Published At"]
df_videos2 = (
    spark.read
    .parquet('/content/videos-preparados.snappy.parquet')
    .select(*colunas_video)
)

In [24]:
# Collect every "Video ID" value of the pruned videos frame into a Python list
# on the driver; it is used afterwards as the isin() filter for the comments.
keys_to_filter = [row[0] for row in df_videos2.select('Video ID').collect()]

In [30]:
# Comment-side columns to keep: the join key plus the Comment and Sentiment columns.
colunas_comments = [
    "Video ID",
    "Comment",
    "Sentiment",
]

In [29]:
# Read the comments, keep only rows whose "Video ID" is in keys_to_filter,
# then keep only the columns listed in colunas_comments.
df_comments_optimized = (
    spark.read
    .parquet("/content/videos-comments-tratados.snappy.parquet")
    .filter(F.col("Video ID").isin(keys_to_filter))
    .select(*colunas_comments)
)

In [28]:
# Inner join of the filtered comments with the pruned videos on the shared
# "Video ID" column.
df_resultado = df_comments_optimized.join(
    df_videos2,
    on="Video ID",
    how="inner",
)