In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, year, to_date, lit

In [2]:
# Inicializando a sessão Spark
spark = SparkSession.builder \
    .appName("Tratamento de Dados YouTube") \
    .getOrCreate()

In [3]:
# 1. Leitura do arquivo 'videos-stats.csv'
df_video = spark.read.csv("videos-stats.csv", header=True, inferSchema=True)


In [4]:
# 2. Substituir valores nulos por 0 nos campos 'Likes', 'Comments' e 'Views'
df_video = df_video.fillna({"Likes": 0, "Comments": 0, "Views": 0})


In [5]:
# 3. Leitura do arquivo 'comments.csv'
df_comentario = spark.read.csv("comments.csv", header=True, inferSchema=True)


In [6]:
# 4. Calcular a quantidade de registros
qtd_video = df_video.count()
qtd_comentario = df_comentario.count()
print(f"Quantidade de registros - Vídeos: {qtd_video}, Comentários: {qtd_comentario}")


Quantidade de registros - Vídeos: 1881, Comentários: 30036


In [7]:
# 5. Remover registros com 'Video ID' nulo e recalcular a quantidade de registros
df_video = df_video.filter(df_video['Video ID'].isNotNull())
df_comentario = df_comentario.filter(df_comentario['Video ID'].isNotNull())
qtd_video_clean = df_video.count()
qtd_comentario_clean = df_comentario.count()
print(f"Registros após limpeza - Vídeos: {qtd_video_clean}, Comentários: {qtd_comentario_clean}")


Registros após limpeza - Vídeos: 1881, Comentários: 22555


In [8]:
# 6. Remover registros duplicados no campo 'Video ID' do df_video
df_video = df_video.dropDuplicates(["Video ID"])


In [9]:
# 7. Converter campos para 'int' no df_video e df_comentario
df_video = df_video.withColumn("Likes", col("Likes").cast("int")) \
                   .withColumn("Comments", col("Comments").cast("int")) \
                   .withColumn("Views", col("Views").cast("int"))

df_comentario = df_comentario.withColumn("Likes", col("Likes").cast("int")) \
                             .withColumnRenamed("Likes", "Likes Comment") \
                             .withColumn("Sentiment", col("Sentiment").cast("int"))


In [10]:
# 8. Criar o campo 'Interaction' com a soma de 'Likes', 'Comments' e 'Views'
df_video = df_video.withColumn("Interaction", col("Likes") + col("Comments") + col("Views"))


In [11]:
# 9. Converter 'Published At' para 'date' e criar o campo 'Year'
df_video = df_video.withColumn("Published At", to_date(col("Published At"))) \
                   .withColumn("Year", year(col("Published At")))

In [12]:
# 10. Mesclar df_comentario no df_video

df_join_video_comments = df_video.join(df_comentario, on="Video ID", how="left")


In [13]:
# 11. Leitura do arquivo 'USvideos.csv'
df_us_videos = spark.read.csv("USvideos.csv", header=True, inferSchema=True)


In [14]:
# 12. Mesclar df_us_videos no df_video

df_join_video_usvideos = df_video.join(df_us_videos, on="Title", how="left")


In [15]:
# 13. Verificar quantidade de nulos em todos os campos
df_video.select([col(c).isNull().cast("int").alias(c) for c in df_video.columns]).show()


+---+-----+--------+------------+-------+-----+--------+-----+-----------+----+
|_c0|Title|Video ID|Published At|Keyword|Likes|Comments|Views|Interaction|Year|
+---+-----+--------+------------+-------+-----+--------+-----+-----------+----+
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0|    0|       0|    0|          0|   0|
|  0|    0|       0|           0|      0

In [20]:
# 14. Remover coluna '_c0' e salvar como parquet

df_join_video_comments = df_join_video_comments.drop("_c0")

In [23]:
# Finalizando o arquivo para submissão
print("Tarefa concluída!")

Tarefa concluída!
