In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, IDF
from pyspark.ml import Pipeline
from pyspark.sql.functions import udf, concat_ws
from pyspark.sql.types import StringType
import datetime

In [None]:
# Obtain the Spark session (reusing an existing one if present) that drives every later cell.
spark = (
    SparkSession.builder
    .appName("NewsProcessingPipeline")
    .getOrCreate()
)

In [None]:
# Build the S3 path of today's El Tiempo headlines file.
# The timestamp is captured ONCE: calling now() separately for year, month and day could
# straddle midnight (or a month/year boundary) and yield a partition that disagrees with
# the date embedded in the file name.
bucket = 'bucket-datos-periodico'
run_date = datetime.datetime.now()
key = (
    "headlines/final/periodico=elTiempo/"
    f"year={run_date:%Y}/month={run_date:%m}/"
    f"elTiempo{run_date:%Y-%m-%d}.csv"
)
file_path = f"s3://{bucket}/{key}"
file_path

In [None]:
# Load the headlines CSV: the first row supplies column names and Spark infers column types.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

In [None]:
# Define the stages of the pipeline.
# NOTE(review): `titulo` presumably holds Spanish headlines (El Tiempo). The pattern is a
# Java regex, and Java's `\W` is ASCII-only unless Unicode classes are enabled, so accented
# letters and `ñ` would act as separators ("Petró" -> "petr"). `(?U)` turns on Unicode
# character classes; `+` treats a run of separators as a single gap.
regex_tokenizer = RegexTokenizer(inputCol="titulo", outputCol="words", pattern="(?U)\\W+")
# StopWordsRemover's default list is English; use Spark's bundled Spanish list instead.
stopwords_remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered_words",
    stopWords=StopWordsRemover.loadDefaultStopWords("spanish"),
)
# Term counts over the learned vocabulary, then IDF re-weighting of those counts.
count_vectorizer = CountVectorizer(inputCol="filtered_words", outputCol="raw_features")
idf = IDF(inputCol="raw_features", outputCol="features")

In [None]:
# Chain tokenizer -> stop-word removal -> term counts -> IDF; fit/transform run them in this order.
pipeline = Pipeline().setStages([regex_tokenizer, stopwords_remover, count_vectorizer, idf])

In [None]:
# Fit the vocabulary and IDF weights on the loaded headlines, then apply them to the same rows.
pipeline_model = pipeline.fit(dataset=df)
processed_data = pipeline_model.transform(dataset=df)

In [None]:
def vector_to_string(vector):
    """Render an ML vector as space-separated values so it can be written to CSV.

    Args:
        vector: a ``pyspark.ml.linalg`` vector (the IDF ``features`` output) or None.

    Returns:
        The vector's values, dense, joined by single spaces; None for a null vector.
    """
    if vector is None:
        # Propagate nulls instead of failing the task with a TypeError.
        return None
    # toArray() materializes the values in one vectorized call; iterating the vector directly
    # goes through __getitem__, which does a per-index lookup in Python.
    # NOTE(review): this is a dense expansion, so every row gets one entry per vector slot
    # (the CountVectorizer vocabulary size). Consider writing sparse indices/values, or Parquet,
    # if output size matters.
    return ' '.join(str(value) for value in vector.toArray())


array_to_string_udf = udf(vector_to_string, StringType())
processed_data = processed_data.withColumn("features_str", array_to_string_udf(processed_data["features"]))

In [None]:
# Preview the first rows, truncating long cells so the wide vector columns stay readable.
processed_data.show(n=20, truncate=True)

In [None]:
# Write the selected columns as CSV (with a header row) under the tokens prefix of the same
# bucket the input was read from, replacing whatever was previously stored there.
# Reusing `bucket` keeps the input and output locations from drifting apart.
output_path = f"s3://{bucket}/headlines/tokens"
output_columns = ["titulo", "categoria", "link", "features_str"]
processed_data.select(*output_columns).write.csv(output_path, header=True, mode="overwrite")