## Libraries and UDFs

In [65]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.ml.fpm import FPGrowth
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import re

spark = SparkSession \
        .builder \
        .appName("frequent_itemsets") \
        .getOrCreate()

In [66]:
def clean_tokens(tokens):
    if not tokens:
        return []
    valid_word_pattern = re.compile(r"^[a-zA-ZáéíóúñÁÉÍÓÚÑüÜ]+$")
    return [token for token in tokens if valid_word_pattern.match(token)]
clean_tokens_udf = udf(clean_tokens, ArrayType(StringType()))

## Preprocessing

In [67]:
path = r"../data/interventions_sample.csv"
interventions_sample = spark.read.csv(path, header=True)
int_df = interventions_sample.select("session_id", "intervention_id", 
                                     "intervention_text", "intervention_words")

In [68]:
path = r"../data/sample_clusters.csv" 
df = spark.read.csv(path, header=True)

In [80]:
int_df = int_df.join(df.select("intervention_id", "cluster"), on="intervention_id", how="inner")
int_df.show()

In [70]:
int_df.printSchema()

root
 |-- intervention_id: string (nullable = true)
 |-- session_id: string (nullable = true)
 |-- intervention_text: string (nullable = true)
 |-- intervention_words: string (nullable = true)
 |-- cluster: string (nullable = true)



In [72]:
esp_stopwords = [
    "a", "al", "algo", "algunas", "algunos", "ante", "antes", "aquel", "aquella",
    "aquellas", "aquellos", "aquí", "cada", "casi", "como", "con", "contra",
    "cual", "cuales", "cuando", "cuanta", "cuantas", "cuanto", "cuantos", "de",
    "del", "dentro", "donde", "dos", "el", "él", "ella", "ellas", "ellos", "en",
    "entre", "esa", "esas", "ese", "eso", "esos", "esta", "estas", "este",
    "estos", "lo", "los", "la", "las", "me", "mi", "mí", "mis", "nos", "nosotras",
    "nosotros", "o", "otra", "otras", "otro", "otros", "para", "pero", "poco",
    "por", "que", "qué", "se", "sí", "sin", "sobre", "su", "sus", "tu", "tú",
    "tus", "un", "una", "unas", "uno", "unos", "vosotras", "vosotros", "vuestra",
    "vuestras", "vuestro", "vuestros", "y", "ya", "senador", "presidente", "honorable",
    "secretario", "honorable", "cámara", "comisión", "representante", "gracias", "comisión"
]

In [73]:
#tokenize to create word "baskets"
tokenizer = Tokenizer(inputCol="intervention_text", outputCol="tokens")

stopwords_remover = StopWordsRemover(inputCol="tokens",
                                     outputCol="filtered_tokens",
                                     stopWords=esp_stopwords)
pipeline = Pipeline(stages=[tokenizer, stopwords_remover])
pipeline_model = pipeline.fit(int_df)

result_df = pipeline_model.transform(int_df)
result_df.select('tokens', 'filtered_tokens').show(10)

                                                                                

+--------------------+--------------------+
|              tokens|     filtered_tokens|
+--------------------+--------------------+
|[de, acuerdo, con...|[acuerdo, observa...|
|[albán, urbano, l...|[albán, urbano, l...|
|[presidente, el, ...|[siguiente, bloqu...|
|[perdone, un, seg...|[perdone, segundi...|
|[antes, de, empez...|[empezar, orden, ...|
|[celular, 3188620...|[celular, 3188620...|
|[un, segundo, ya,...|[segundo, traslad...|
|[doctora, lo, que...|[doctora, pasa, e...|
|[continúa, el, se...|[continúa, señor,...|
|[señor, president...|[señor, president...|
+--------------------+--------------------+
only showing top 10 rows



In [8]:
cluster_df_dict = {}
cluster_labels = [row[0] for row in result_df.select("cluster").distinct().collect()]

for label in cluster_labels:
    cluster_df = result_df.filter(result_df['cluster'] == label)
    cluster_df = cluster_df.filter((F.col('filtered_tokens').isNotNull()))
    cluster_df_dict[label] = cluster_df

                                                                                

## FP Growth

In [82]:
cleaned_df = int_df.withColumn(
    "cleaned_intervention_text", 
    F.regexp_replace(F.col("intervention_text"), r"[^a-zA-ZáéíóúñÁÉÍÓÚÑüÜ\s]", "")  # Keep only letters and spaces
)

# Apply Tokenizer to the cleaned text
tokenizer = Tokenizer(inputCol="cleaned_intervention_text", outputCol="tokens")

stopwords_remover = StopWordsRemover(inputCol="tokens", outputCol="filtered_tokens", stopWords=esp_stopwords)

pipeline = Pipeline(stages=[tokenizer, stopwords_remover])
pipeline_model = pipeline.fit(cleaned_df)

result_df = pipeline_model.transform(cleaned_df)

In [83]:
test_df = result_df.filter(result_df['cluster'] == '1')
test_df.show(3)



+---------------+--------------+--------------------+--------------------+-------+-------------------------+--------------------+--------------------+
|intervention_id|    session_id|   intervention_text|  intervention_words|cluster|cleaned_intervention_text|              tokens|     filtered_tokens|
+---------------+--------------+--------------------+--------------------+-------+-------------------------+--------------------+--------------------+
|         451438| gaceta_68 (7)|perdone un segund...|['comisión', 'deb...|      1|     perdone un segund...|[perdone, un, seg...|[perdone, segundi...|
|         451192|gaceta_456 (6)|antes de empezar ...|['asistir', 'comp...|      1|     antes de empezar ...|[antes, de, empez...|[empezar, orden, ...|
|         450234| gaceta_49 (9)|continúa el señor...|['anglicana', 'ap...|      1|     continúa el señor...|[continúa, el, se...|[continúa, señor,...|
+---------------+--------------+--------------------+--------------------+-------+------------

                                                                                

In [87]:
fp = FPGrowth(itemsCol="filtered_tokens", minSupport=0.01, minConfidence=0.1)
fpm = fp.fit(test_df)




24/12/05 23:06:42 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-ba21d64c-375f-4e8c-a6a6-4db8f0d256d6/2c/temp_shuffle_cadebd40-3925-44c3-946e-82f0ad8e72a3
24/12/05 23:06:42 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-ba21d64c-375f-4e8c-a6a6-4db8f0d256d6/05/temp_shuffle_b2618d88-469e-4f04-9600-36bbde32b188
24/12/05 23:06:42 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-ba21d64c-375f-4e8c-a6a6-4db8f0d256d6/06/temp_shuffle_bc06e1f4-932c-4045-bcdd-79084f7f3bfc
24/12/05 23:06:42 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-ba21d64c-375f-4e8c-a6a6-4db8f0d256d6/1c/temp_shuffle_5b27650e-66a0-4d35-841b-06e4ccdeab44
24/12/05 23:06:42 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-ba21d64c-375f-4e8c-a6a6-4db8f0d256d6/18/temp_shuffle_a1d499a6-ffa4-49bd-a541-2b26c93ec2e9
24/12/05 23:06:42 WARN DiskBlockObjectWriter: Error deleting /tmp/blockmgr-ba21d64c-375f-4e8c-a6a6-4db8f0d256d6/19/temp_shuffle_ca541dd8-b572-428a-9f44-def6c0863bf3
24/12/05 2

Py4JJavaError: An error occurred while calling o5397.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 168.0 failed 1 times, most recent failure: Lost task 2.0 in stage 168.0 (TID 1274) (midway3-0096.rcc.local executor driver): org.apache.spark.SparkException: Failed to execute user defined function (Tokenizer$$Lambda$3814/0x0000000801adade8: (string) => array<string>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function (Tokenizer$$Lambda$3814/0x0000000801adade8: (string) => array<string>)
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:190)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:168)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
	at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: java.lang.NullPointerException
