Nova demanda no projeto: inclusão de uma nova etapa no pipeline. Além do desenvolvimento da extração de dados, será necessário implementar uma fase adicional para realizar a transformação desses dados. Essa transformação tem como objetivo gerar um novo conjunto de dados que irá apoiar o desenvolvimento de um relatório específico. Nessa nova etapa, o foco será extrair valores relacionados à data em que os tweets foram postados.

Essa abordagem permitirá visualizar o desempenho da palavra-chave "data science" nos tweets de forma mais específica e detalhada. Resumindo, a ideia é utilizar os dados já coletados até o momento na etapa de extração e, agora, aplicar uma transformação, consolidando essas informações por dia da semana. Essa estratégia proporcionará uma análise mais refinada e direcionada ao aspecto temporal, otimizando a geração de insights para o desenvolvimento do relatório.

In [2]:
# importação biblioteca spark para exploração de dados
from pyspark.sql import SparkSession

In [3]:
# criando sessão spark
spark = SparkSession\
    .builder\
    .appName("api_silver")\
    .getOrCreate()

24/01/14 16:26:37 WARN Utils: Your hostname, alex-virtual-machine resolves to a loopback address: 127.0.1.1; using 192.168.112.128 instead (on interface ens33)
24/01/14 16:26:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/01/14 16:26:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# leitura dos dados dentro da camada silver
df_tweet = spark.read.json('/home/alex/Documents/airflow/datalake/silver/api_datascience/tweet')

                                                                                

In [6]:
# analisando o schema dos dados
df_tweet.printSchema()

root
 |-- author_id: string (nullable = true)
 |-- conversation_id: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- id: string (nullable = true)
 |-- like_count: long (nullable = true)
 |-- quote_count: long (nullable = true)
 |-- reply_count: long (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- text: string (nullable = true)
 |-- data_processamento: date (nullable = true)



In [7]:
from pyspark.sql import functions as f

In [8]:
# criando novo dataframe com informações agregadas
tweet_conversas = df_tweet.alias('tweet')\
                  .groupBy(f.to_date('created_at').alias('created_date'))\
                  .agg(
                      f.countDistinct('author_id').alias('n_tweets'),
                      f.sum('like_count').alias('n_like'),
                      f.sum('quote_count').alias('n_quote'),
                      f.sum('reply_count').alias('n_reply'),
                      f.sum('retweet_count').alias('n_retweet')
                  ).withColumn('weekday', f.date_format('created_date', 'E'))

In [9]:
tweet_conversas.show()

                                                                                

+------------+--------+------+-------+-------+---------+-------+
|created_date|n_tweets|n_like|n_quote|n_reply|n_retweet|weekday|
+------------+--------+------+-------+-------+---------+-------+
|  2024-01-11|      20|  4720|   3780|   4024|     3532|    Thu|
|  2024-01-13|      10|   978|   1062|    864|      972|    Sat|
|  2024-01-10|      10|  2280|   2000|   2016|     2388|    Wed|
|  2024-01-12|      10|   986|    894|    992|      868|    Fri|
+------------+--------+------+-------+-------+---------+-------+

