In [1]:
from pyspark.sql import SparkSession, functions as F
from delta import configure_spark_with_delta_pip
from pyspark.sql.functions import col, trim, lower, regexp_replace
import os

In [2]:
#exemple of path_data
path_landing = "../../../delta_lake/Kaggle/___________datasets_Kaggle_csv_twitter_csv" 

In [3]:
mongo_connector_jar = "/home/provira/Documents/TFM/TFM/src/P2/trusted_zone/jars/mongo-spark-connector_2.12-3.0.1.jar"
mongo_driver_jar = "/home/provira/Documents/TFM/TFM/src/P2/trusted_zone/jars/mongo-java-driver-3.12.10.jar"

In [None]:
builder = SparkSession.builder \
    .appName("Trusted_Zone") \
    .config("spark.jars", f"{mongo_connector_jar},{mongo_driver_jar}") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017") \
    .config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

25/06/27 21:25:16 WARN Utils: Your hostname, provira-ERAZER-P6705-MD61203 resolves to a loopback address: 127.0.1.1; using 192.168.1.55 instead (on interface wlo1)
25/06/27 21:25:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/provira/anaconda3/envs/spark_py3.9/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/provira/.ivy2/cache
The jars for the packages stored in: /home/provira/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5b3af7b3-49b1-4eb8-a9e5-632495fcd4ff;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 428ms :: artifacts dl 39ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-r

In [None]:
spark.sparkContext._jsc.sc().listJars()

                                                                                

DataFrame[Comment: string, Emotion: string]

In [6]:
def readFromDeltaLake(path_landing):
    print(path_landing)
    return spark.read.format("delta").load(path_landing)

# Preprocessing

In [None]:
def cleanCSV(df_csv):
    print(df_csv.columns)
    df_csv_clean = df_csv.dropna()

    rename_map = {
        "_c0": "id",
        "Emotion": "emotion",
        "sentiment": "emotion",
        "tweet": "text",
        "label": "emotion",
    }

    for old, new in rename_map.items():
        if old in df_csv_clean.columns:
            df_csv_clean = df_csv_clean.withColumnRenamed(old, new)

    # Only apply text processing if 'text' exists
    if "text" in df_csv_clean.columns:
        df_csv_clean = df_csv_clean \
            .withColumn("text", trim(col("text"))) \
            .withColumn("text", lower(col("text"))) \
            .withColumn("text", regexp_replace(col("text"), r"\bi m\b", "i'm")) \
            .withColumn("text", regexp_replace(col("text"), r"[^a-zA-Z0-9\s']", ""))  # keep letters, digits, spaces, apostrophes
    return df_csv_clean

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem.snowball import SnowballStemmer
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

#nltk.download('punkt')
#nltk.download('stopwords')
def tokenizer(df_csv_clean):
    stop_words = set(stopwords.words('english'))

    # Tokenizar
    tokenizer = Tokenizer(inputCol="text", outputCol="words")
    df_words = tokenizer.transform(df_csv_clean) #.limit(1000) limited to 1000 rows for performance

    # Eliminar stopwords (solo en inglés por defecto, pero puedes pasar las tuyas)
    remover = StopWordsRemover(inputCol="words", outputCol="filtered_words")
    remover.setStopWords(list(stop_words))
    df_filtered = remover.transform(df_words)

    #Stemming
    stemmer = SnowballStemmer("english")



    def stem_tokens(tokens):
        return [stemmer.stem(token) for token in tokens]
    
    stem_udf = udf(stem_tokens, ArrayType(StringType()))

    df_stemmed = df_filtered.withColumn("stemmed_words", stem_udf("filtered_words"))

    #df_stemmed.select("text", "filtered_words", "stemmed_words").show(truncate=False)

    return df_stemmed
    



In [9]:
from pyspark.ml.feature import CountVectorizer, IDF

def tf_idf(df_stemmed):

    # Paso 1: Crear el CountVectorizer para extraer el vocabulario y conteo de tokens
    cv = CountVectorizer(inputCol="stemmed_words", outputCol="raw_features")
    cv_model = cv.fit(df_stemmed)             # Entrenas el modelo con el vocabulario
    df_featurized = cv_model.transform(df_stemmed)  # Transformas el DataFrame

    # Paso 2: Calcular TF-IDF a partir del conteo
    idf = IDF(inputCol="raw_features", outputCol="tfidf_features")
    idf_model = idf.fit(df_featurized)          # Ajustar IDF sobre los datos
    df_tfidf = idf_model.transform(df_featurized) # Transformar con TF-IDF

    # Mostrar resultados
    #df_tfidf.select("stemmed_words", "raw_features", "tfidf_features").show(truncate=False)
    df_tfidf.printSchema()

    return df_tfidf


# Store in MongoDB

In [10]:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.linalg import VectorUDT

def df_clean(df_tfidf):
    def vector_to_array(v):
        return v.toArray().tolist() if v else None

    vector_to_array_udf = udf(vector_to_array, ArrayType(FloatType()))

    df_tfidf_safe = df_tfidf \
        .withColumn("raw_features_array", vector_to_array_udf("raw_features")) \
        .withColumn("tfidf_features_array", vector_to_array_udf("tfidf_features"))
    return df_tfidf_safe


In [None]:
def storeInMongoDB(df_tfidf_safe):
    df_tfidf_safe.select(
        "text", "Emotion", "words", "filtered_words", "stemmed_words",
        "raw_features_array", "tfidf_features_array"
    ).write \
        .format("mongo") \
        .option("uri", "mongodb://localhost:27017") \
        .option("database", "tfm-trusted-zone") \
        .option("collection", "tf-idf") \
        .mode("append") \
        .save()

# Pipeline

In [12]:
cleanCSV_udf = udf(cleanCSV, ArrayType(StringType()))
tokenizer_udf = udf(tokenizer, ArrayType(StringType()))
df_clean_udf = udf(df_clean, ArrayType(StringType()))  # Update return type if it's different

In [13]:
types = ['csv', 'parquet', 'json', 'txt']
sources = ['Kaggle', 'uci', 'AWS']

In [None]:
path_data = "./../../../delta_lake/"

for source in sources:
    path = os.path.join(path_data, source)

    if os.path.isdir(path):
        for folder in os.listdir(path):
            full_path = os.path.join(path, folder)
            print(f"Processing folder: {full_path}/")

            try:
                df_csv = readFromDeltaLake(full_path)

                df_csv_clean = cleanCSV(df_csv) #  .limit(10) Limit to 10 rows for performance
                df_stemmed = tokenizer(df_csv_clean)
                df_tfidf = tf_idf(df_stemmed)
                df_tfidf_safe = df_clean(df_tfidf)

                storeInMongoDB(df_tfidf_safe)

            except Exception as e:
                print(f"Error processing folder {full_path}: {e}")

 

Processing folder: ./../../../delta_lake/Kaggle/___________datasets_Kaggle_csv_GoEmotions_goemotions_3_csv/
./../../../delta_lake/Kaggle/___________datasets_Kaggle_csv_GoEmotions_goemotions_3_csv


                                                                                

['text', 'id', 'author', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'rater_id', 'example_very_unclear', 'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization', 'relief', 'remorse', 'sadness', 'surprise', 'neutral']


25/06/27 21:25:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- author: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- rater_id: string (nullable = true)
 |-- example_very_unclear: string (nullable = true)
 |-- admiration: string (nullable = true)
 |-- amusement: string (nullable = true)
 |-- anger: string (nullable = true)
 |-- annoyance: string (nullable = true)
 |-- approval: string (nullable = true)
 |-- caring: string (nullable = true)
 |-- confusion: string (nullable = true)
 |-- curiosity: string (nullable = true)
 |-- desire: string (nullable = true)
 |-- disappointment: string (nullable = true)
 |-- disapproval: string (nullable = true)
 |-- disgust: string (nullable = true)
 |-- embarrassment: string (nullable = true)
 |-- excitement: string (nullable = true)
 |-- fear: string (nullable = true)
 |-- gratitude: strin

                                                                                

['text', 'sentiment']
root
 |-- text: string (nullable = true)
 |-- emotion: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stemmed_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- raw_features: vector (nullable = true)
 |-- tfidf_features: vector (nullable = true)



25/06/27 21:26:17 ERROR Executor: Exception in task 0.0 in stage 36.0 (TID 419)]
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:155)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:105)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:287)
	at com.mongodb.client.internal.Mongo

Error processing folder ./../../../delta_lake/Kaggle/___________datasets_Kaggle_csv_DailyDialog_csv: An error occurred while calling o496.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 36.0 failed 1 times, most recent failure: Lost task 0.0 in stage 36.0 (TID 419) (192.168.1.55 executor driver): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:155)
	at 

                                                                                

['text', 'id', 'author', 'subreddit', 'link_id', 'parent_id', 'created_utc', 'rater_id', 'example_very_unclear', 'admiration', 'amusement', 'anger', 'annoyance', 'approval', 'caring', 'confusion', 'curiosity', 'desire', 'disappointment', 'disapproval', 'disgust', 'embarrassment', 'excitement', 'fear', 'gratitude', 'grief', 'joy', 'love', 'nervousness', 'optimism', 'pride', 'realization', 'relief', 'remorse', 'sadness', 'surprise', 'neutral']
root
 |-- text: string (nullable = true)
 |-- id: string (nullable = true)
 |-- author: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- created_utc: string (nullable = true)
 |-- rater_id: string (nullable = true)
 |-- example_very_unclear: string (nullable = true)
 |-- admiration: string (nullable = true)
 |-- amusement: string (nullable = true)
 |-- anger: string (nullable = true)
 |-- annoyance: string (nullable = true)
 |-- approval: string (n

25/06/27 21:26:57 ERROR Executor: Exception in task 0.0 in stage 85.0 (TID 945)]
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:155)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:105)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:287)
	at com.mongodb.client.internal.Mongo

Error processing folder ./../../../delta_lake/Kaggle/___________datasets_Kaggle_csv_tweet_sentiment_csv: An error occurred while calling o1177.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85.0 (TID 945) (192.168.1.55 executor driver): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:155)

                                                                                

['_c0', 'text', 'Emotion']
root
 |-- id: string (nullable = true)
 |-- text: string (nullable = true)
 |-- emotion: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stemmed_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- raw_features: vector (nullable = true)
 |-- tfidf_features: vector (nullable = true)



25/06/27 21:27:30 ERROR Executor: Exception in task 0.0 in stage 104.0 (TID 1155)
com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:155)
	at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:105)
	at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:287)
	at com.mongodb.client.internal.Mong

Error processing folder ./../../../delta_lake/Kaggle/___________datasets_Kaggle_csv_emotion_sentimen_dataset_csv: An error occurred while calling o1403.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 104.0 failed 1 times, most recent failure: Lost task 0.0 in stage 104.0 (TID 1155) (192.168.1.55 executor driver): com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting to connect. Client view of cluster state is {type=UNKNOWN, servers=[{address=localhost:27017, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketOpenException: Exception opening socket}, caused by {java.net.ConnectException: Connection refused (Connection refused)}}]
	at com.mongodb.internal.connection.BaseCluster.getDescription(BaseCluster.java:182)
	at com.mongodb.internal.connection.SingleServerCluster.getDescription(SingleServerCluster.java:41)
	at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelega

                                                                                

['text', 'label']
root
 |-- text: string (nullable = true)
 |-- label: long (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- stemmed_words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- raw_features: vector (nullable = true)
 |-- tfidf_features: vector (nullable = true)

Error processing folder ./../../../delta_lake/Kaggle/___________datasets_Kaggle_parquet_train-00000-of-00001_parquet: cannot resolve '`Emotion`' given input columns: [filtered_words, label, raw_features, raw_features_array, stemmed_words, text, tfidf_features, tfidf_features_array, words];
'Project [text#6215, 'Emotion, words#6221, filtered_words#6231, stemmed_words#6240, raw_features_array#6392, tfidf_features_array#6402]
+- Project [text#6215, label#6201L, words#6221, filtered_words#6231, stemmed_words#6240, raw_features#6308, tfidf_featu