In [1]:
# Tratamiento de los datos brutos de noticias

In [2]:
# Analysis period. The financial calendar is filtered with
# date >= fec_inicio and date < fec_fin, so the end date is exclusive.
fec_inicio, fec_fin = "2019-01-01", "2025-01-01"

# Rutas

In [3]:
# Base storage location; every parquet dataset read or written by this
# notebook lives under this prefix.
path_datos = "gs://bucket-tfm-llc/datos"

# Librerías

In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime, timedelta
from pyspark.sql.window import Window

# Lectura datos

In [5]:
# News records with their sentiment scores (one row per news item is
# assumed -- TODO confirm against the producer of this dataset).
df_noticias_sentiment = (
    spark.read
    .parquet(f"{path_datos}/noticias_con_analisis_sentimiento")
)

                                                                                

In [6]:
# Calendar of financial dates (with the target) restricted to the
# half-open period [fec_inicio, fec_fin).
# NOTE(review): distinct() de-duplicates (date, target) pairs; a date with
# two different targets would still appear twice -- confirm it cannot happen.
df_fec_financiera = (
    spark.read.parquet(f"{path_datos}/datos_financieros_amzn_trat")
    .select("date", "target")
    .where((F.col("date") >= fec_inicio) & (F.col("date") < fec_fin))
    .distinct()
)

                                                                                

In [7]:
# Sanity check: first and last date kept in the financial calendar.
df_fec_financiera.agg(F.min("date"), F.max("date")).show()

                                                                                

+-------------------+-------------------+
|          min(date)|          max(date)|
+-------------------+-------------------+
|2019-01-02 00:00:00|2024-12-31 00:00:00|
+-------------------+-------------------+



# Tratamiento datos

In [8]:
# Global chronological window used by the lag-based features below.
# It has no partitionBy, so Spark evaluates it on a single partition
# (this is what triggers the "No Partition Defined" warning on write).
w_temporal = Window.orderBy(F.col("date_trat"))

In [9]:
# News per year: number of rows and number of distinct dates with news.
(
    df_noticias_sentiment
    .groupBy("year")
    .agg(
        F.count("*").alias("num_noticias"),
        F.countDistinct("date_trat").alias("num_dias"),
    )
    .sort("year")
    .show()
)



+----+------------+--------+
|year|num_noticias|num_dias|
+----+------------+--------+
|2019|        1311|     245|
|2020|         904|     234|
|2021|        1147|     236|
|2022|         970|     230|
|2023|        1125|     237|
|2024|        1047|     243|
+----+------------+--------+



                                                                                

In [10]:
# Coverage check: rows in the financial calendar vs. distinct dates that
# have at least one news item.
n_fechas_total = df_fec_financiera.count()
n_fechas_con_noticias = df_noticias_sentiment.select("date_trat").distinct().count()

print("Nº fechas total: ", n_fechas_total)
print("Nº fechas con noticias: ", n_fechas_con_noticias)

                                                                                

Nº fechas total:  1510
Nº fechas con noticias:  1425


In [11]:
# Number of news items whose title is NULL.
df_noticias_sentiment.where(F.col("page_title").isNull()).count()

200

In [12]:
# Number of news items whose title is exactly the empty string
# (whitespace-only titles are not counted by this check).
df_noticias_sentiment.where(F.col("page_title") == "").count()

770

In [13]:
# Columns kept from the sentiment table: identifiers, title, tone metrics
# and the three finbert_* scores.
vars_sent = [
    "GKGRECORDID", "date_trat", "page_title",
    "global_tone", "positive_score", "negative_score", "polarity",
    "activity_reference_density", "group_reference_density", "word_count",
    "finbert_pos", "finbert_neg", "finbert_neu",
]


def _indicador(condicion):
    """Return a 0/1 column: 1 when `condicion` is true, 0 otherwise (including NULL)."""
    return F.when(condicion, F.lit(1)).otherwise(F.lit(0))


_tono = F.col("global_tone")
_fb_pos, _fb_neg, _fb_neu = F.col("finbert_pos"), F.col("finbert_neg"), F.col("finbert_neu")

# (new column, condition) pairs, applied in this order so the resulting
# column layout is stable.
_indicadores = [
    # Sign of the global tone.
    ("ind_pos_v2tone", _tono > 0),
    ("ind_neg_v2tone", _tono < 0),
    ("ind_neu_v2tone", _tono == 0),
    # Winning FinBERT class: a class is flagged only when its score is
    # strictly greater than both others, so ties leave all three flags at 0.
    ("ind_pos_finbert", (_fb_pos > _fb_neg) & (_fb_pos > _fb_neu)),
    ("ind_neg_finbert", (_fb_neg > _fb_pos) & (_fb_neg > _fb_neu)),
    ("ind_neu_finbert", (_fb_neu > _fb_pos) & (_fb_neu > _fb_neg)),
    # Evaluated after page_title has been cleaned, so blank titles count too.
    ("ind_sin_title", F.col("page_title").isNull()),
]

# Whitespace-only titles are turned into NULL before the indicators are built.
df_noticias_sentiment_trat = df_noticias_sentiment.select(*vars_sent).withColumn(
    "page_title",
    F.when(F.trim(F.col("page_title")) == "", F.lit(None)).otherwise(F.col("page_title")),
)
for _nombre, _condicion in _indicadores:
    df_noticias_sentiment_trat = df_noticias_sentiment_trat.withColumn(_nombre, _indicador(_condicion))

df_noticias_sentiment_trat.show(truncate=False, n=4)

+-------------------+----------+---------------------------------------------+-----------+--------------+--------------+---------+--------------------------+-----------------------+----------+--------------------+-------------------+------------------+--------------+--------------+--------------+---------------+---------------+---------------+-------------+
|GKGRECORDID        |date_trat |page_title                                   |global_tone|positive_score|negative_score|polarity |activity_reference_density|group_reference_density|word_count|finbert_pos         |finbert_neg        |finbert_neu       |ind_pos_v2tone|ind_neg_v2tone|ind_neu_v2tone|ind_pos_finbert|ind_neg_finbert|ind_neu_finbert|ind_sin_title|
+-------------------+----------+---------------------------------------------+-----------+--------------+--------------+---------+--------------------------+-----------------------+----------+--------------------+-------------------+------------------+--------------+-------------

In [14]:
# Daily aggregation: one row per date_trat.
# NOTE(review): num_noticias_total counts DISTINCT GKGRECORDID while the
# num_noticias_* columns sum row-level flags; they are only comparable if
# GKGRECORDID is unique within a date -- confirm.
_sumas_diarias = [
    ("ind_pos_v2tone", "num_noticias_pos_v2"),
    ("ind_neg_v2tone", "num_noticias_neg_v2"),
    ("ind_neu_v2tone", "num_noticias_neu_v2"),
    ("ind_pos_finbert", "num_noticias_pos_fb"),
    ("ind_neg_finbert", "num_noticias_neg_fb"),
    ("ind_neu_finbert", "num_noticias_neu_fb"),
    ("ind_sin_title", "num_noticias_sin_titulo"),
]
_medias_diarias = [
    ("polarity", "avg_polaridad"),
    ("positive_score", "avg_pos_v2"),
    ("negative_score", "avg_neg_v2"),
    ("finbert_pos", "avg_pos_fb"),
    ("finbert_neg", "avg_neg_fb"),
    ("finbert_neu", "avg_neu_fb"),
]

_agregados_diarios = (
    [F.countDistinct("GKGRECORDID").alias("num_noticias_total")]
    + [F.sum(origen).alias(destino) for origen, destino in _sumas_diarias]
    + [F.mean(origen).alias(destino) for origen, destino in _medias_diarias]
)

df_noticias_diario = df_noticias_sentiment_trat.groupBy("date_trat").agg(*_agregados_diarios)

df_noticias_diario.show(n=2)

[Stage 30:>                                                         (0 + 3) / 3]

+----------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+
| date_trat|num_noticias_total|num_noticias_pos_v2|num_noticias_neg_v2|num_noticias_neu_v2|num_noticias_pos_fb|num_noticias_neg_fb|num_noticias_neu_fb|num_noticias_sin_titulo|    avg_polaridad|        avg_pos_v2|        avg_neg_v2|         avg_pos_fb|         avg_neg_fb|        avg_neu_fb|
+----------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-----------------------+-----------------+------------------+------------------+-------------------+-------------------+------------------+
|2019-06-04|                 7|                  5|                  2|                  0|                  0|                

                                                                                

In [15]:
# Net-sentiment and proportion features.
_total = F.col("num_noticias_total")

_cols_pct = [
    # Net sentiment = mean positive score - mean negative score, per source.
    ("sent_neto_v2", F.col("avg_pos_v2") - F.col("avg_neg_v2")),
    ("sent_neto_fb", F.col("avg_pos_fb") - F.col("avg_neg_fb")),
]
# Share of each daily count over the total number of news of the day.
_cols_pct += [
    (f"pct_{sufijo}", F.col(f"num_noticias_{sufijo}") / _total)
    for sufijo in ("pos_v2", "neg_v2", "neu_v2", "pos_fb", "neg_fb", "neu_fb", "sin_titulo")
]
_cols_pct += [
    # Positive/negative ratio; the +1 in the denominator avoids dividing by zero.
    ("ratio_pos_neg_v2", F.col("num_noticias_pos_v2") / (F.col("num_noticias_neg_v2") + 1)),
    ("ratio_pos_neg_fb", F.col("num_noticias_pos_fb") / (F.col("num_noticias_neg_fb") + 1)),
    # Disagreement between the FinBERT-based and the tone-based measures.
    ("diff_sent_fb_v2", F.col("sent_neto_fb") - F.col("sent_neto_v2")),
    ("diff_pct_pos_fb_v2", F.col("pct_pos_fb") - F.col("pct_pos_v2")),
    ("diff_pct_neg_fb_v2", F.col("pct_neg_fb") - F.col("pct_neg_v2")),
]

df_noticias_diario_pct = df_noticias_diario
for _nombre, _expresion in _cols_pct:
    df_noticias_diario_pct = df_noticias_diario_pct.withColumn(_nombre, _expresion)

In [16]:
# Variables de cambio (delta)
df_noticias_diario_delta = df_noticias_diario_pct.\
        withColumn("delta_sent_neto_v2", F.col("sent_neto_v2") - F.lag("sent_neto_v2", 1).over(w_temporal)).\
        withColumn("delta_sent_neto_fb", F.col("sent_neto_fb") - F.lag("sent_neto_fb", 1).over(w_temporal)).\
        withColumn("delta_num_noticias", F.col("num_noticias_total") - F.lag("num_noticias_total", 1).over(w_temporal))

In [17]:
# Rolling-window features for 5, 10 and 20 rows.
# The windows are row-based (rowsBetween) over the daily news table, so
# "{w}d" means the current row plus the w-1 previous rows with news.
# NOTE(review): a rolling sum of consecutive one-row deltas telescopes to
# value(t) - value(t-w), which is what the *_trend_{w}d columns compute
# with lag(w); both families look redundant away from the start of the
# series -- confirm whether both are needed.
df_noticias_diario_vm = df_noticias_diario_delta
for w in (5, 10, 20):
    w_temporal_roll = Window.orderBy("date_trat").rowsBetween(-(w - 1), 0)

    _cols_ventana = [
        (f"sent_neto_fb_mean_{w}d", F.avg("sent_neto_fb").over(w_temporal_roll)),
        (f"sent_neto_v2_mean_{w}d", F.avg("sent_neto_v2").over(w_temporal_roll)),
        (f"sent_neto_fb_std_{w}d", F.stddev("sent_neto_fb").over(w_temporal_roll)),
        (f"sent_neto_v2_std_{w}d", F.stddev("sent_neto_v2").over(w_temporal_roll)),
        (f"num_noticias_total_{w}d", F.sum("num_noticias_total").over(w_temporal_roll)),
        (f"avg_pos_fb_{w}d", F.avg("avg_pos_fb").over(w_temporal_roll)),
        (f"avg_pos_v2_{w}d", F.avg("avg_pos_v2").over(w_temporal_roll)),
        (f"avg_neg_fb_{w}d", F.avg("avg_neg_fb").over(w_temporal_roll)),
        (f"avg_neg_v2_{w}d", F.avg("avg_neg_v2").over(w_temporal_roll)),
        (f"delta_sent_fb_{w}d", F.sum("delta_sent_neto_fb").over(w_temporal_roll)),
        (f"delta_sent_v2_{w}d", F.sum("delta_sent_neto_v2").over(w_temporal_roll)),
        (f"avg_polaridad_{w}d", F.avg("avg_polaridad").over(w_temporal_roll)),
        # Trend: current value minus the value w rows earlier.
        (f"sent_neto_fb_trend_{w}d", F.col("sent_neto_fb") - F.lag("sent_neto_fb", w).over(w_temporal)),
        (f"sent_neto_v2_trend_{w}d", F.col("sent_neto_v2") - F.lag("sent_neto_v2", w).over(w_temporal)),
    ]
    for _nombre, _expresion in _cols_ventana:
        df_noticias_diario_vm = df_noticias_diario_vm.withColumn(_nombre, _expresion)


In [18]:
# Additional interaction features: each new column is the product of two
# existing daily columns.
_interacciones = [
    ("sent_neto_fb_x_num_noticias", "sent_neto_fb", "num_noticias_total"),
    ("sent_neto_v2_x_num_noticias", "sent_neto_v2", "num_noticias_total"),
    ("avg_pos_x_neg_fb", "avg_pos_fb", "avg_neg_fb"),
    ("avg_pos_x_neg_v2", "avg_pos_v2", "avg_neg_v2"),
    ("delta_sent_x_sent_neto_fb", "delta_sent_neto_fb", "sent_neto_fb"),
    ("delta_sent_x_sent_neto_v2", "delta_sent_neto_v2", "sent_neto_v2"),
]

df_noticias_diario_final = df_noticias_diario_vm
for _nombre, _izq, _der in _interacciones:
    df_noticias_diario_final = df_noticias_diario_final.withColumn(_nombre, F.col(_izq) * F.col(_der))


In [19]:
# df_noticias_diario_final.drop(*vars_drop).columns

# Output

In [20]:
# Daily per-class counts removed from the final output (the derived pct_*
# shares are not in this list and therefore stay).
vars_drop = [
    f"num_noticias_{sufijo}"
    for sufijo in ("pos_v2", "neg_v2", "neu_v2", "pos_fb", "neg_fb", "neu_fb", "sin_titulo")
]

In [21]:
# Final dataset: one row per date of the financial calendar, left-joined
# with the daily news features.
#
# Dates without news get 0 in every news feature. The zero-fill is limited
# to the columns coming from the news table: a blanket fillna(0) would also
# overwrite any NULL in `target` with 0, silently inventing label values.
_cols_noticias = [c for c in df_noticias_diario_final.columns if c != "date_trat"]

df_noticias_diario_output = (
    df_fec_financiera
    .join(df_noticias_diario_final.withColumnRenamed("date_trat", "date"), "date", "left")
    .fillna(0, subset=_cols_noticias)
    # After the fill, a total of 0 identifies a date with no news at all.
    .withColumn("ind_existencia_noticias", F.when(F.col("num_noticias_total") == 0, F.lit(0)).otherwise(F.lit(1)))
    .drop(*vars_drop)
)

In [22]:
# Number of feature columns, i.e. everything except the key and the target.
len([c for c in df_noticias_diario_output.columns if c not in ("date", "target")])

73

In [23]:
# Persist the final dataset as parquet, replacing any previous version.
df_noticias_diario_output.write.parquet(
    f"{path_datos}/datos_noticias_19_24_trat",
    mode="overwrite",
)

25/06/21 09:04:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/21 09:04:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/21 09:04:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/21 09:04:28 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/06/21 09:04:28 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
25/06/21 09:04:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradat