In [1]:
!pip install pyspark



In [2]:
import pyspark
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Tratamento de dados YouTube").getOrCreate()

In [4]:
df_video = spark.read.csv("videos-stats.csv", header=True, inferSchema=True)

In [5]:
df_video.show(5)

+---+--------------------+-----------+------------+-------+-------+--------+---------+
|_c0|               Title|   Video ID|Published At|Keyword|  Likes|Comments|    Views|
+---+--------------------+-----------+------------+-------+-------+--------+---------+
|  0|Apple Pay Is Kill...|wAZZ-UWGVHI|  2022-08-23|   tech| 3407.0|   672.0| 135612.0|
|  1|The most EXPENSIV...|b3x28s61q3c|  2022-08-24|   tech|76779.0|  4306.0|1758063.0|
|  2|My New House Gami...|4mgePWWCAmA|  2022-08-23|   tech|63825.0|  3338.0|1564007.0|
|  3|Petrol Vs Liquid ...|kXiYSI7H2b0|  2022-08-23|   tech|71566.0|  1426.0| 922918.0|
|  4|Best Back to Scho...|ErMwWXQxHp0|  2022-08-08|   tech|96513.0|  5155.0|1855644.0|
+---+--------------------+-----------+------------+-------+-------+--------+---------+
only showing top 5 rows



In [6]:
df_video = df_video.fillna({
    'Likes': 0,
    'Comments': 0,
    'Views': 0
})

In [7]:
df_comentario = spark.read.csv("comments.csv", header=True, inferSchema=True)
df_comentario.show(5)
df_comentario.printSchema()

+---+-----------+--------------------+-----+---------+
|_c0|   Video ID|             Comment|Likes|Sentiment|
+---+-----------+--------------------+-----+---------+
|  0|wAZZ-UWGVHI|Let's not forget ...| 95.0|      1.0|
|  1|wAZZ-UWGVHI|Here in NZ 50% of...| 19.0|      0.0|
|  2|wAZZ-UWGVHI|I will forever ac...|161.0|      2.0|
|  3|wAZZ-UWGVHI|Whenever I go to ...|  8.0|      0.0|
|  4|wAZZ-UWGVHI|Apple Pay is so c...| 34.0|      2.0|
+---+-----------+--------------------+-----+---------+
only showing top 5 rows

root
 |-- _c0: string (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Likes: string (nullable = true)
 |-- Sentiment: string (nullable = true)



In [8]:
qtd_videos = df_video.count()
qtd_comentarios = df_comentario.count()

print(f"Quantidade de vídeos: {qtd_videos}")
print(f"Quantidade de comentários: {qtd_comentarios}")

Quantidade de vídeos: 1881
Quantidade de comentários: 30036


In [9]:
from pyspark.sql.functions import col

In [13]:
df_video = df_video.filter(col("Video ID").isNotNull())
df_comentario = df_comentario.filter(col("Video ID").isNotNull())

In [14]:
qtd_videos = df_video.count()
qtd_comentarios = df_comentario.count()

print(f"Quantidade de vídeos após limpeza: {qtd_videos}")
print(f"Quantidade de comentários após limpeza: {qtd_comentarios}")

Quantidade de vídeos após limpeza: 1881
Quantidade de comentários após limpeza: 22555


In [15]:
df_video = df_video.dropDuplicates(["Video ID"])

print(f"Quantidade de vídeos após remoção de duplicados: {df_video.count()}")

Quantidade de vídeos após remoção de duplicados: 1869


In [16]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

In [17]:
df_video = df_video.withColumn("Likes", col("Likes").cast(IntegerType()))
df_video = df_video.withColumn("Comments", col("Comments").cast(IntegerType()))
df_video = df_video.withColumn("Views", col("Views").cast(IntegerType()))

# Visualizar o schema atualizado
df_video.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- Title: string (nullable = true)
 |-- Video ID: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)



In [18]:
df_comentario = df_comentario.withColumn("Likes", col("Likes").cast(IntegerType())) \
                             .withColumn("Sentiment", col("Sentiment").cast(IntegerType())) \
                             .withColumnRenamed("Likes", "Likes Comment")

In [19]:
df_video = df_video.withColumn(
    "Interaction",
    col("Likes") + col("Comments") + col("Views")
)

In [20]:
from pyspark.sql.types import DateType

In [21]:
df_video = df_video.withColumn("Published At", col("Published At").cast(DateType()))

In [22]:
from pyspark.sql.functions import year, col

df_video = df_video.withColumn("Year", year(col("Published At")))

In [24]:
df_join_video_comments = df_video.join(
    df_comentario,
    on="Video ID",
    how="inner"  # você pode usar 'inner', 'left', 'right', 'outer' conforme precisar
)
df_join_video_comments.show(5)

+-----------+---+--------------------+------------+-------+-----+--------+------+-----------+----+---+--------------------+-------------+---------+
|   Video ID|_c0|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|_c0|             Comment|Likes Comment|Sentiment|
+-----------+---+--------------------+------------+-------+-----+--------+------+-----------+----+---+--------------------+-------------+---------+
|wAZZ-UWGVHI|  0|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|  0|Let's not forget ...|           95|        1|
|wAZZ-UWGVHI|  0|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|  1|Here in NZ 50% of...|           19|        0|
|wAZZ-UWGVHI|  0|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|  2|I will forever ac...|          161|        2|
|wAZZ-UWGVHI|  0|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|  3|Whenever I

In [25]:
df_us_videos = spark.read.csv(
    "USvideos.csv",
    header=True,
    inferSchema=True
)
df_us_videos.show(5)
df_us_videos.printSchema()

+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|   video_id|trending_date|               title|       channel_title|category_id|        publish_time|                tags|  views| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|
+-----------+-------------+--------------------+--------------------+-----------+--------------------+--------------------+-------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|2kyS6SvSYSE|     17.14.11|WE WANT TO TALK A...|        CaseyNeistat|         22|2017-11-13T17:13:...|     SHANtell martin| 748374| 57527|    2966|        15954|https://i.ytimg.c...|            False|           Fal

In [26]:
df_us_videos = df_us_videos.withColumnRenamed("title", "Title")

In [29]:
df_join_video_usvideos = df_video.join(
    df_us_videos,
    on="Title",
    how="inner"
)
df_join_video_usvideos.show(5)
df_join_video_usvideos.printSchema()

+--------------------+---+-----------+------------+-------+------+--------+---------+-----------+----+-----------+-------------+-------------+-----------+--------------------+--------------------+--------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------------+
|               Title|_c0|   Video ID|Published At|Keyword| Likes|Comments|    Views|Interaction|Year|   video_id|trending_date|channel_title|category_id|        publish_time|                tags|   views| likes|dislikes|comment_count|      thumbnail_link|comments_disabled|ratings_disabled|video_error_or_removed|         description|
+--------------------+---+-----------+------------+-------+------+--------+---------+-----------+----+-----------+-------------+-------------+-----------+--------------------+--------------------+--------+------+--------+-------------+--------------------+-----------------+----------------+----------------------+--------------

In [30]:
from pyspark.sql.functions import col, sum, when

df_video.select([
    sum(when(col(c).isNull() | (col(c) == ""), 1).otherwise(0)).alias(c)
    for c in df_video.columns
]).show()

+---+-----+--------+------------+-------+-----+--------+-----+-----------+----+
|_c0|Title|Video ID|Published At|Keyword|Likes|Comments|Views|Interaction|Year|
+---+-----+--------+------------+-------+-----+--------+-----+-----------+----+
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
+---+-----+--------+------------+-------+-----+--------+-----+-----------+----+



In [31]:
df_video = df_video.drop("_c0")

In [35]:
df_video.write.mode("overwrite").parquet("videos-tratados-parquet")

In [36]:
df_verificado = spark.read.parquet("videos-tratados-parquet")
df_verificado.show(5)

+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+
|               Title|   Video ID|Published At|Keyword| Likes|Comments|   Views|Interaction|Year|
+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+
|ASMR MUKBANG DOUB...|--ZI0dSbbNU|  2020-04-18|mukbang|378858|   18860|17975269|   18372987|2020|
|Deadly car bomb d...|--hxd1CrOqg|  2022-08-22|   news|  6379|    4853|  808787|     820019|2022|
|How Biden&#39;s s...|--ixiTypG8g|  2022-08-24|   news|  1029|    2347|   97434|     100810|2022|
|Celebrating My 40...|-64r1hcxtV4|  2022-05-30|mukbang| 45628|   17264| 5283664|    5346556|2022|
|Physics Review - ...|-6IgkG5yZfo|  2017-01-02|physics| 10959|     525|  844015|     855499|2017|
+--------------------+-----------+------------+-------+------+--------+--------+-----------+----+
only showing top 5 rows



In [33]:
df_join_video_comments = df_join_video_comments.drop("_c0")

In [34]:
df_join_video_comments.write.mode("overwrite").parquet("videos-comments-tratados-parquet")

In [38]:
df_ver_comments = spark.read.parquet("videos-comments-tratados-parquet")
df_ver_comments.show(5)

+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+-------------+---------+
|   Video ID|               Title|Published At|Keyword|Likes|Comments| Views|Interaction|Year|             Comment|Likes Comment|Sentiment|
+-----------+--------------------+------------+-------+-----+--------+------+-----------+----+--------------------+-------------+---------+
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|Let's not forget ...|           95|        1|
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|Here in NZ 50% of...|           19|        0|
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|I will forever ac...|          161|        2|
|wAZZ-UWGVHI|Apple Pay Is Kill...|  2022-08-23|   tech| 3407|     672|135612|     139691|2022|Whenever I go to ...|            8|        0|
|wAZZ-UWGVHI|Apple P

In [39]:
df_ver_comments.printSchema()

root
 |-- Video ID: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- Published At: date (nullable = true)
 |-- Keyword: string (nullable = true)
 |-- Likes: integer (nullable = true)
 |-- Comments: integer (nullable = true)
 |-- Views: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Comment: string (nullable = true)
 |-- Likes Comment: integer (nullable = true)
 |-- Sentiment: integer (nullable = true)

