In [1]:
from pymongo import MongoClient
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark import SparkContext
from datetime import datetime
from functools import reduce
from pyspark.sql.functions import sum
from matplotlib import cbook, cm
from matplotlib.colors import LightSource
#import matplotlib.cm as cm 
import matplotlib.pyplot as plt
from pyspark.sql.window import Window

In [2]:
spark_ = SparkSession.builder \
    .appName("Data Integration") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "6g") \
    .getOrCreate()

df_youtube = spark_.read.format("mongo") \
    .option("uri", "mongodb://mongodb:27017/Final_Database.Youtube").load()

df_company = spark_.read.format("mongo") \
    .option("uri", "mongodb://mongodb:27017/Final_Database.Company").load()


In [3]:
companies = ['nvidia', 'dell', 'ibm', 'intel', 'microsoft', 'sony']

In [5]:
# 2. Preparar os dados de df_youtube para agregar informações diárias
df_youtube = df_youtube.withColumn("day", F.to_date(df_youtube["publishedAt"]))

# 3. Agregar os dados de df_youtube por dia e empresa
# Definir a janela de classificação por 'view_count' para cada empresa e dia
window_spec = Window.partitionBy("company", "day").orderBy(F.desc("view_count"))

# Aplicar o row_number para classificar os vídeos
df_youtube_with_rank = df_youtube.withColumn(
    "rank", F.row_number().over(window_spec)
)

# Filtrar os vídeos com rank 1 (o mais visto) para cada dia e empresa
df_youtube_top_video = df_youtube_with_rank.filter(F.col("rank") == 1)

# Agora, realizar a agregação e obter o vídeo mais visto (nome do vídeo)
df_youtube_daily = df_youtube_top_video.groupBy("company", "day").agg(
    F.sum("view_count").alias("total_views"),
    F.sum("likes").alias("total_likes"),
    F.sum("dislikes").alias("total_dislikes"),
    F.sum("comment_count").alias("total_coments"),
    F.first("title").alias("top_video")  # Obter o título do vídeo mais visto
)

# 4. Adicionar informações de stock do df_company
df_company = df_company.withColumn("day", F.to_date(df_company["Date"]))  # Garantindo que a data seja apenas date
df_company_daily = df_company.groupBy("company_name", "day").agg(
    F.max("High").alias("stock_high"),
    F.min("Low").alias("stock_low"),
    F.avg("Volume").alias("Volume"),
    F.avg("Close").alias("final_stocks")  # Estoque final diário
)

# 5. Calcular o stock médio de todos os dias para cada empresa
df_company_avg_stocks = df_company.groupBy("company_name").agg(
    F.avg("Close").alias("average_stocks_all_days")
)

# 6. Juntar os dois DataFrames, usando alias para resolver ambiguidade na coluna 'day'
df_report = df_youtube_daily.alias("youtube").join(
    df_company_daily.alias("company"),
    (F.col("youtube.company") == F.col("company.company_name")) & (F.col("youtube.day") == F.col("company.day")),
    "left"
).join(
    df_company_avg_stocks.alias("avg_stocks"),
    F.col("youtube.company") == F.col("avg_stocks.company_name"),
    "left"
)

# 7. Substituir valores nulos por zero (no caso dos stocks)
df_report = df_report.withColumn(
    "final_stocks",
    F.coalesce(F.col("final_stocks"), F.lit(0))  
)

# 8. Calcular o stock médio do dia anterior para cada empresa
window_spec = Window.partitionBy("youtube.company").orderBy("youtube.day")

df_report = df_report.withColumn(
    "previous_final_stocks",
    F.coalesce(F.lag("final_stocks").over(window_spec), F.lit(0))  
)

# 9. Calcular a movimentação de stock comparando o stock médio atual com o anterior
df_report = df_report.withColumn(
    "stocks_movement",
    (F.col("final_stocks") - F.col("previous_final_stocks")).cast("float")
)

# 10. 'top_video' de ARRAY<STRING> para STRING
df_report = df_report.withColumn(
    "top_video",
    F.concat_ws(",", F.col("top_video"))
)

# 11. Criar um DataFrame para cada empresa e salvar no MongoDB
company_dfs = {}

for company_name in companies:
    company_report = df_report.filter(F.col("youtube.company") == company_name)
    company_dfs[company_name] = company_report

# 12. Salvar os DataFrames no MongoDB
for company_name, company_df in company_dfs.items():
    company_df.write.format("mongo") \
        .option("uri", f"mongodb://mongodb:27017/Relatorio.{company_name.lower()}") \
        .mode("overwrite") \
        .save()

print("Dados processados e salvos com sucesso.")



Dados processados e salvos com sucesso.
