In [138]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from textblob import TextBlob


In [139]:
# Crear una sesión de Spark
spark = SparkSession.builder.appName("NewsDataInspection").getOrCreate()

# Cargar el archivo CSV
file_path = '../data/raw/news.csv'
df = spark.read.csv(file_path, header=True, inferSchema=True)

In [140]:
# Contar el número de filas y columnas
num_rows = df.count()
num_cols = len(df.columns)
print(f"\nDimensiones del DataFrame: {num_rows} filas, {num_cols} columnas")


Dimensiones del DataFrame: 1919 filas, 4 columnas


In [141]:
# Mostrar las primeras filas del DataFrame
print("Primeras filas del DataFrame:")
df.show(5,truncate=False)

Primeras filas del DataFrame:
+------------------------------------------------------------------------------------------------------------------+----------+-------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [142]:
# Mostrar el esquema del DataFrame (tipos de datos de cada columna)
print("\nEsquema del DataFrame:")
df.printSchema()


Esquema del DataFrame:
root
 |-- titulo: string (nullable = true)
 |-- fecha: string (nullable = true)
 |-- link: string (nullable = true)
 |-- cuerpo: string (nullable = true)



In [143]:
# Describir las estadísticas básicas
print("\nDescripción estadística de las columnas numéricas:")
df.describe().show()



Descripción estadística de las columnas numéricas:
+-------+--------------------+----------+--------------------+--------------------+
|summary|              titulo|     fecha|                link|              cuerpo|
+-------+--------------------+----------+--------------------+--------------------+
|  count|                1918|      1919|                1919|                1914|
|   mean|                NULL|      NULL|                NULL|                NULL|
| stddev|                NULL|      NULL|                NULL|                NULL|
|    min|          """Bury Me| My Love""|    dice su creador"|"""A la gente que...|
|    max|“Tenemos paciente...|2024-08-29|https://www.lanac...|“¡Pon a remojar l...|
+-------+--------------------+----------+--------------------+--------------------+



In [144]:
from pyspark.sql.functions import col, sum
# Crea una lista con el nombre de las columnas y la cantidad de nulos en cada una
null_counts = [(column, df.filter(col(column).isNull()).count()) for column in df.columns]

# Imprime la tabla
print("Columna\t\tCantidad de Nulos")
print("-" * 40)
for column, count in null_counts:
    print(f"{column}\t\t{count}")

Columna		Cantidad de Nulos
----------------------------------------
titulo		1
fecha		0
link		0
cuerpo		5


In [145]:
# Filtrar los registros que tienen valores nulos en alguna columna
df_with_nulls = df.filter(F.greatest(*[F.col(c).isNull().cast("int") for c in df.columns]) == 1)

# Mostrar los registros con nulos
print("Registros con valores nulos en alguna columna:")
df_with_nulls.show(truncate=False)

Registros con valores nulos en alguna columna:
+--------------------------------------------------------------------------+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|titulo                                                                    |fecha     |link                                                                                                                                                            |cuerpo                

In [146]:
# Eliminar los registros que tienen algún valor nulo
df_cleaned = df.dropna()

In [147]:
# Crea una lista con el nombre de las columnas y la cantidad de nulos en cada una
null_counts = [(column, df.filter(col(column).isNull()).count()) for column in df.columns]

# Imprime la tabla
print("Columna\t\tCantidad de Nulos")
print("-" * 40)
for column, count in null_counts:
    print(f"{column}\t\t{count}")

Columna		Cantidad de Nulos
----------------------------------------
titulo		1
fecha		0
link		0
cuerpo		5


In [148]:

# Verifica la fila correspondiente en el DataFrame de Spark
fila_nula_spark = df.filter(F.col('fecha').isNull()).show(truncate = False)



+------+-----+----+------+
|titulo|fecha|link|cuerpo|
+------+-----+----+------+
+------+-----+----+------+



In [149]:
# Definir una función para calcular el sentimiento usando TextBlob
def get_sentiment(text):
    if text is None:
        return None
    return TextBlob(text).sentiment.polarity

# Registrar la función como una UDF (User Defined Function)
sentiment_udf = F.udf(get_sentiment)

# Crear la columna 'sentiment' basada en el cuerpo de la noticia
df_with_sentiment = df.withColumn("sentiment", sentiment_udf(F.col("cuerpo")))

# Mostrar algunas filas con la nueva columna de sentimiento
print("Primeras filas con puntuación de sentimiento:")
df_with_sentiment.show(3)

Primeras filas con puntuación de sentimiento:
+--------------------+----------+--------------------+--------------------+--------------------+
|              titulo|     fecha|                link|              cuerpo|           sentiment|
+--------------------+----------+--------------------+--------------------+--------------------+
|Día del Gamer 202...|2024-08-29|https://www.lanac...|Este 29 de agosto...| 0.08333333333333333|
|Star Wars Outlaws...|2024-08-27|https://www.lanac...|Mucho se habló de...|-0.01785714285714...|
|Gamescom 2024: vu...|2024-08-21|https://www.lanac...|Borderlands 4 el ...|-0.00432900432900...|
+--------------------+----------+--------------------+--------------------+--------------------+
only showing top 3 rows



In [150]:

# Verifica la fila correspondiente en el DataFrame de Spark
fila_nula_spark = df_with_sentiment.filter(F.col('fecha').isNull()).show(truncate = False)



+------+-----+----+------+---------+
|titulo|fecha|link|cuerpo|sentiment|
+------+-----+----+------+---------+
+------+-----+----+------+---------+



In [151]:
df_with_sentiment.write.mode("overwrite").csv("../data/processed/news.csv", header=True)

                                                                                

In [152]:
import seaborn as sns
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import pandas as pd

# Verifica la fila correspondiente en el DataFrame de Spark
fila_nula_spark = df_with_sentiment.filter(F.col('fecha').isNull()).show(truncate = False)
print("Filas nulas luego de pasar formato", fila_nula_spark)

df_with_sentiment = df_with_sentiment.dropna(subset=['sentiment'])

# Crear columnas de año y mes
df_with_sentiment = df_with_sentiment.withColumn('year', F.year(F.col('fecha')))
df_with_sentiment = df_with_sentiment.withColumn('month', F.month(F.col('fecha')))

# Agrupar por año y mes en lugar de por fecha exacta
sentiment_by_month = df_with_sentiment.groupBy('year', 'month').agg(F.avg('sentiment').alias('avg_sentiment')).orderBy('year', 'month')

# Convertir a Pandas para visualización
sentiment_pd = sentiment_by_month.toPandas()

# Crear una columna de fecha combinando año y mes
sentiment_pd['fecha'] = pd.to_datetime(sentiment_pd[['year', 'month']].assign(day=1))  # Usa el primer día del mes

# Ordenar los valores
sentiment_pd = sentiment_pd.sort_values('fecha')

# Verificar los datos antes de la visualización
print(sentiment_pd.head())

# Visualización
plt.figure(figsize=(14, 7))
sns.lineplot(data=sentiment_pd, x='fecha', y='avg_sentiment', marker='o')
plt.title('Evolución del Sentimiento Promedio de Noticias de Videojuegos (Agrupado por Mes)')
plt.xlabel('Fecha')
plt.ylabel('Sentimiento Promedio')
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig('sentiment_trend_monthly.png')  # Guarda la gráfica como una imagen
plt.close()

# Paso 10: Sistema de Alerta
#sentiment_pd['sentiment_change'] = sentiment_pd['avg_sentiment'].diff()
#latest_change = sentiment_pd['sentiment_change'].iloc[-1] if not sentiment_pd['sentiment_change'].empty else None
#latest_date = sentiment_pd['fecha'].iloc[-1] if not sentiment_pd['fecha'].empty else None

# Comprobamos si latest_date es NaT antes de usar strftime
#if pd.notna(latest_change) and pd.notna(latest_date):
#    threshold = 0.2
#    if latest_change <= -threshold:
#        print(f"ALERTA: El sentimiento promedio cayó {abs(latest_change):.2f} puntos en {latest_date.strftime('%B %Y')}.")
#        # Aquí podrías añadir código para enviar un correo electrónico o una notificación
#    else:
#        print(f"El sentimiento promedio está estable en {latest_date.strftime('%B %Y')}.")
#else:
#    print("No hay suficientes datos para generar una alerta o la fecha es inválida.")


+------+-----+----+------+---------+
|titulo|fecha|link|cuerpo|sentiment|
+------+-----+----+------+---------+
+------+-----+----+------+---------+

Filas nulas luego de pasar formato None


                                                                                

     year  month  avg_sentiment      fecha
1  2012.0    6.0      -0.004167 2012-06-01
2  2012.0    7.0      -0.100000 2012-07-01
3  2012.0    8.0       0.000000 2012-08-01
4  2012.0    9.0       0.058333 2012-09-01
5  2012.0   10.0      -0.120139 2012-10-01


In [153]:
# Suponiendo que 'sentiment_pd' es tu DataFrame
nulos_en_fecha = sentiment_pd[sentiment_pd['fecha'].isna()]

# Mostrar las filas donde la columna 'fecha' tiene valores nulos
print(nulos_en_fecha)


   year  month  avg_sentiment fecha
0   NaN    NaN            0.0   NaT


In [154]:
# Filtra la fila con el valor nulo en Pandas
fila_nula = sentiment_pd[sentiment_pd['fecha'].isna()]
print(fila_nula)

# Verifica la fila correspondiente en el DataFrame de Spark
fila_nula_spark = df_with_sentiment.filter(F.col('fecha').isNull()).show(truncate = False)


   year  month  avg_sentiment fecha
0   NaN    NaN            0.0   NaT
+------+-----+----+------+---------+----+-----+
|titulo|fecha|link|cuerpo|sentiment|year|month|
+------+-----+----+------+---------+----+-----+
+------+-----+----+------+---------+----+-----+

