In [62]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import explode
from datetime import date

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [63]:
#Preprocesamiento

df = spark.read.option("recursiveFileLookup", "true").csv("s3://buck-eriodicos/headlines/final/").toDF(*['categoria', 'titulo', 'link'])

#Se hace un filtro basado en la categoría -> Si hay datos que no pertenecen a la categoría
df = df.filter(df["categoria"] != "categoria")

#Se eliminan los datos nulos
df = df.dropna(subset=["categoria","titulo","link"])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [64]:
df.show(df.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+
|           categoria|              titulo|                link|
+--------------------+--------------------+--------------------+
|                EEUU| Las impactantes ...| eltiempo.com/mun...|
|Conflicto y Narco...| Gobernador del M...| eltiempo.com/jus...|
|            Ciclismo| Rigoberto Urán a...| eltiempo.com/dep...|
|            Medellín| El macabro crime...| eltiempo.com/col...|
|Contenido Patroci...| ¿Por qué su empr...| eltiempo.com/con...|
|       Más Contenido| eCommerce Day Co...| eltiempo.com/mas...|
|Contenido Patroci...| ¿Por qué su empr...| eltiempo.com/con...|
|       Más Contenido| eCommerce Day Co...| eltiempo.com/mas...|
|               Gente| La modelo de Onl...| eltiempo.com/cul...|
|              Bogotá| Así es vivir en ...| eltiempo.com/bog...|
|   Fútbol Colombiano| Los pecados que ...| eltiempo.com/dep...|
|            Medellín| Cayó 'la coja' a...| eltiempo.com/col...|
|  Partidos Políticos| Es

In [65]:
# Nombre del archivo como columna
df_filename = df.withColumn("filename", input_file_name())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [67]:
# Tokenización
tokenizer = Tokenizer(inputCol="titulo", outputCol="palabras_titulo")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
#StopWords

# Se crea una lista de stopwords comunes en español
stopwordList = ["el", "la", "los", "las", "de", "en", "un", "una", "unos", "unas", "y", "o", "pero", "por", "para", "del"]
stopwords_remover = StopWordsRemover(inputCol="palabras_titulo", outputCol="titulo_sin_stopwords", stopWords=stopwordList)

In [68]:
# Vectorización con TF-IDF
count_vectorizer = CountVectorizer(inputCol="titulo_sin_stopwords", outputCol="features")
idf = IDF(inputCol="features", outputCol="tfidf")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [69]:
# Conversión de etiquetas a valores numéricos
label_indexer = StringIndexer(inputCol="categoria", outputCol="label")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [70]:
# Pipeline
pipeline = Pipeline(stages=[tokenizer, stopwords_remover, count_vectorizer, idf, label_indexer])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [71]:
model = pipeline.fit(df_filename)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [72]:
# Transformación de los datos
transformed_data = model.transform(df_filename)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [73]:
# Explode the array column
exploded_data = transformed_data.selectExpr("categoria", "titulo", "link")

# Save the exploded data as CSV
exploded_data.write.mode("overwrite").csv("s3://buck-eriodicos/headlines/processed/datos_procesados_{}.csv".format(date.today()), header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…