# Instalação

* pip install --upgrade pip
* pip install kafka-python
* pip install newsapi-python
* pip install schedule

# Bibliotecas

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import Window
import datetime
import pickle

# Critérios de Busca

In [0]:
class Search_Criteria:
    """Search subject together with the keywords associated with it.

    ``subject`` can be read and reassigned; ``keywords`` has no setter and is
    changed through the add/remove methods.
    """

    def __init__(self, subject, keywords):
        """Store the subject and the keyword list.

        Args:
            subject: Topic of the search, e.g. ``"genomics"``.
            keywords: List of keywords, e.g.
                ``["DNA", "genetics", "treatment"]``. The list object is
                stored as given (it is not copied).
        """
        self.__subject = subject
        self.__keywords = keywords

    def add_keyword(self, new_keyword):
        """Append ``new_keyword`` to the keyword list."""
        self.__keywords.append(new_keyword)

    def remove_keyword(self, old_keyword):
        """Remove the first occurrence of ``old_keyword``.

        Raises:
            ValueError: If ``old_keyword`` is not in the keyword list.
        """
        self.__keywords.remove(old_keyword)

    # The original, misspelled method names are kept as aliases so existing
    # callers keep working.
    add_keyworkd = add_keyword
    remove_keyworkd = remove_keyword

    @property
    def subject(self):
        """Current search subject."""
        return self.__subject

    @subject.setter
    def subject(self, new_subject):
        self.__subject = new_subject

    @property
    def keywords(self):
        """The stored keyword list (the same list object, not a copy)."""
        return self.__keywords

# API Client

In [0]:
class APIClient:
    """Pages through a news client's ``get_everything`` and collects articles.

    NOTE(review): ``NewsApiClient`` is not imported in the visible part of
    this file; presumably it comes from the ``newsapi-python`` package listed
    in the installation notes -- confirm it is imported before this class is
    instantiated.
    """

    def __init__(self, key, query) -> None:
        """Create the underlying client.

        Args:
            key: API key passed to ``NewsApiClient``.
            query: Query string sent as ``q`` on every request.
        """
        self.__client = NewsApiClient(api_key=key)
        self.__query = query
        self.__all_data = None  # most recent raw response
        self.__articles = []  # articles accumulated by get_articles()

    def search(self, page):
        """Request one result page and store the raw response.

        Any failure of the request is treated as "no more pages": the stored
        response becomes ``{"status": "end"}`` instead of raising.

        Args:
            page: Page number to request.
        """
        try:
            self.__all_data = self.__client.get_everything(
                q=self.__query, page=page
            )
        except Exception:
            # Best-effort paging: an API error ends the pagination loop.
            # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt
            # and SystemExit still propagate.
            self.__all_data = {"status": "end"}

    def get_articles(self):
        """Fetch pages starting at 1 and append their articles.

        Paging stops when a response's status is not ``"ok"`` or when a page
        carries no articles (otherwise an API that keeps answering ``"ok"``
        with empty pages would loop forever). Articles are appended to the
        list already held by the instance, so calling this twice accumulates
        both runs.
        """
        page = 1
        self.search(page=page)
        while self.__all_data.get("status") == "ok":
            batch = self.__all_data.get("articles")
            if not batch:
                break
            self.__articles.extend(batch)
            page += 1
            self.search(page=page)

    @property
    def articles(self):
        """Articles collected so far."""
        return self.__articles

    @property
    def all_data(self):
        """Most recent raw response (``None`` before the first search)."""
        return self.__all_data

# Pipeline

In [0]:
class Data_Pipeline_API:
    """Spark pipeline: API articles -> parquet files -> Delta tables.

    Holds two DataFrames: ``df`` (the working frame) and ``df_parquet`` (the
    frame most recently read from disk by ``load_parquet``).

    Uses the ``spark`` and ``dbutils`` globals, which are not defined in this
    file -- presumably supplied by the notebook runtime.
    """

    def __init__(self) -> None:
        self.__df = None
        self.__df_parquet = None

    # Load data coming from the API
    def load_api(self, data) -> None:
        """Build the working DataFrame from ``data``.

        The ``source`` column is replaced by its ``name`` item; the original
        column order is kept.

        Args:
            data: Records accepted by ``spark.createDataFrame``; each must
                have a ``source`` field that supports ``getItem("name")``.
        """
        self.__df = spark.createDataFrame(data)
        ordem = self.__df.columns
        self.__df = self.__df.withColumn("source", col("source").getItem("name"))
        self.__df = self.__df.select(ordem)

    # Read a .parquet file into a DataFrame
    def load_parquet(self, name) -> None:
        """Read ``/FileStore/Projeto/data/<name>.parquet`` into ``df_parquet``."""
        file = f"{name}.parquet"
        self.__df_parquet = spark.read.parquet(f"/FileStore/Projeto/data/{file}")

    # Merge the working DataFrame with the stored parquet file,
    # keeping only distinct rows
    def save_data(self, name) -> None:
        """Write the working DataFrame to ``<name>.parquet``, merging if it exists.

        When the target already exists, its rows are unioned (by column name)
        with the working DataFrame and exact duplicate rows are dropped
        before the overwrite.

        Args:
            name: File name without the ``.parquet`` suffix.
        """
        file = f"{name}.parquet"
        data_dir = "/FileStore/Projeto/data/"

        # One directory listing; entries may be reported with or without a
        # trailing slash.
        existing = {entry.name.strip() for entry in dbutils.fs.ls(data_dir)}
        if file in existing or f"{file}/" in existing:
            self.load_parquet(name)
            self.__df = self.__df.unionByName(self.__df_parquet).dropDuplicates()

        # NOTE(review): in the merge case the frame being written still reads
        # lazily from the path being overwritten; Spark may reject or fail on
        # this -- confirm on the target runtime.
        self.__df.write.mode("overwrite").parquet(f"{data_dir}{file}")

    # Cleaning of the raw data
    def data_cleaning(self):
        """Derive the cleaned working DataFrame from ``df_parquet``.

        Drops ``urlToImage``, renames the remaining columns to their
        Portuguese names, converts the first 10 characters of the (trimmed)
        publication timestamp to a date, and fixes the column order.

        ``load_parquet`` must have been called first, since this reads
        ``df_parquet``.
        """
        df_order = ['data_publicacao', 'titulo', 'autor', 'descricao', 'url', 'fonte', 'conteudo']

        self.__df = (self.__df_parquet.drop(col("urlToImage"))
                     .withColumnRenamed("author", "autor")
                     .withColumnRenamed("title", "titulo")
                     .withColumnRenamed("description", "descricao")
                     .withColumnRenamed("publishedAt", "data_publicacao")
                     .withColumnRenamed("content", "conteudo")
                     .withColumnRenamed("source", "fonte")
                     .withColumn("data_publicacao", to_date(substring(trim(col("data_publicacao")), 1, 10), "yyyy-MM-dd"))
                     )
        self.__df = self.__df.select(*df_order)

    # Save as Delta table
    def delta_table(self):
        """Publish ``cleaned_data.parquet`` as Delta files and as a table."""
        self.load_parquet("cleaned_data")

        # Save in Delta format
        self.__df_parquet.write.format("delta").mode("overwrite").save("/FileStore/Projeto/delta")

        # Make it available in the database
        self.__df_parquet.write.format("delta").mode("overwrite").saveAsTable("pipeline_delta")

    def to_gold(self, keywords):
        """Build the aggregated tables from ``cleaned_data.parquet``.

        Writes, in overwrite mode:
          * ``artigos_ano_mes_dia``: article count per year/month/day;
          * ``artigos_fonte_autor``: article count per source and author;
          * ``artigos_palavras_chave``: per year/month, one ``sum_<keyword>``
            column counting the articles whose content contains the keyword
            (case-sensitive substring match).

        Args:
            keywords: Iterable of keyword strings. When empty, the keyword
                table is not written.
        """
        self.load_parquet("cleaned_data")
        articles = self.__df_parquet

        # Count per year, month and day, largest counts first
        df_time_stats = articles.groupBy(
            year(col("data_publicacao")).alias("year"),
            month(col("data_publicacao")).alias("month"),
            dayofmonth(col("data_publicacao")).alias("day")).count()
        df_time_stats = df_time_stats.orderBy(col("count").desc())

        # Make it available in the database
        df_time_stats.write.format("delta").mode("overwrite").saveAsTable("artigos_ano_mes_dia")

        # Count per source and author, largest counts first
        df_source_author_stats = articles.groupBy("fonte", "autor").count()
        df_source_author_stats = df_source_author_stats.orderBy(col("count").desc())

        # Make it available in the database
        df_source_author_stats.write.format("delta").mode("overwrite").saveAsTable("artigos_fonte_autor")

        # agg() needs at least one expression, so skip when there is nothing
        # to count.
        if not keywords:
            return

        # Aggregate the 1/0 "keyword appears" flag directly rather than
        # through a temporary column named after the keyword: such a column
        # would overwrite a real column of the same name and could not be
        # referenced when the keyword contains a dot.
        # ``sum`` here is the Spark aggregate from the file's star import,
        # not the Python builtin.
        keyword_sums = [
            sum(when(col("conteudo").contains(keyword), 1).otherwise(0)).alias(f"sum_{keyword}")
            for keyword in keywords
        ]

        # Group by year and month and sum the keyword appearances
        df_keyword_stats = articles.groupBy(
            year(col("data_publicacao")).alias("year"),
            month(col("data_publicacao")).alias("month")
        ).agg(*keyword_sums)

        # Make it available in the database
        df_keyword_stats.write.format("delta").mode("overwrite").saveAsTable("artigos_palavras_chave")

    @property
    def df(self):
        """Working DataFrame."""
        return self.__df

    @df.setter
    def df(self, dataframe):
        self.__df = dataframe

    @property
    def df_parquet(self):
        """DataFrame most recently read by ``load_parquet``."""
        return self.__df_parquet