In [1]:
import logging
import yaml
import os
from datetime import datetime
from utils.s3_process import read_csv_from_s3, push_csv_to_s3, upload_file_to_s3
from utils.clean_text import preprocess
from utils.s3_process import read_key

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/thientran/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /home/thientran/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [31]:
# Read data from S3
with open("configs/config.yaml", "r") as f:
    config = yaml.safe_load(f)
with open("configs/secrets.yaml", "r") as f:
    config_secret = yaml.safe_load(f)

bucket = config['s3']['bucket']
raw_key = config['s3']['keys']['raw_data']
validate_key = config['s3']['keys']['validation_data']
train_path = f"s3a://{bucket}/{raw_key}"
test_path = f"s3a://{bucket}/{validate_key}"

AWS_KEY_PATH = config['aws']['access_key_path']
AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY = read_key(config_secret['aws']['access_key_path'])
AWS_REGION = config['aws']['region']
S3_OUTPUT_KEY = config['s3']['keys']['dataset']
BUCKET_NAME = config['s3']['bucket']

# Read CSV file from S3 (raw data)
data_train = read_csv_from_s3(train_path, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)
data_test = read_csv_from_s3(test_path, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION)


                                                                                

In [32]:
data_train.show(5)

data_test.show(5)

                                                                                

+-----------+--------+--------------------+
|    Product|   Label|                Text|
+-----------+--------+--------------------+
|Borderlands|Positive|I am coming to th...|
|Borderlands|Positive|im getting on bor...|
|Borderlands|Positive|im coming on bord...|
|Borderlands|Positive|im getting on bor...|
|Borderlands|Positive|im getting into b...|
+-----------+--------+--------------------+
only showing top 5 rows



[Stage 63:>                                                         (0 + 1) / 1]

+----+---------+----------+--------------------+
|  ID|  Product|     Label|                Text|
+----+---------+----------+--------------------+
|3364| Facebook|Irrelevant|I mentioned on Fa...|
| 352|   Amazon|   Neutral|BBC News - Amazon...|
|8312|Microsoft|  Negative|@Microsoft Why do...|
|4371|    CS-GO|  Negative|CSGO matchmaking ...|
|4433|   Google|   Neutral|Now the President...|
+----+---------+----------+--------------------+
only showing top 5 rows



                                                                                

In [33]:

print(type(data_train))
print(type(data_test))

print(f"Shape của data_train: (hàng={data_train.count()}, cột={len(data_train.columns)})")
print(f"Shape của data_test: (hàng={data_test.count()}, cột={len(data_test.columns)})")

<class 'pyspark.sql.dataframe.DataFrame'>
<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

Shape của data_train: (hàng=74681, cột=3)
Shape của data_test: (hàng=1510, cột=4)


                                                                                

In [34]:
from pyspark.sql.functions import isnull, sum, when

# # Kiểm tra giá trị null cho từng cột trong data_train
# null_counts_train = data_train.agg(*[sum(when(isnull(col), 1).otherwise(0)).alias(col) for col in data_train.columns])
# null_counts_train.show()

# # Kiểm tra giá trị null cho từng cột trong data_test
# null_counts_test = data_test.agg(*[sum(when(isnull(col), 1).otherwise(0)).alias(col) for col in data_test.columns])
# null_counts_test.show()
def check_null_values(data):
    null_counts = data.agg(*[sum(when(isnull(col), 1).otherwise(0)).alias(col) for col in data.columns])
    return null_counts
check_null_values(data_train).show()
check_null_values(data_test).show()

                                                                                

+-------+-----+----+
|Product|Label|Text|
+-------+-----+----+
|      0|    0| 686|
+-------+-----+----+

+---+-------+-----+----+
| ID|Product|Label|Text|
+---+-------+-----+----+
|  0|    467|  495| 510|
+---+-------+-----+----+



                                                                                

In [35]:
data_test = data_test.dropna()
data_train = data_train.dropna()
check_null_values(data_train).show()
check_null_values(data_test).show()

                                                                                

+-------+-----+----+
|Product|Label|Text|
+-------+-----+----+
|      0|    0|   0|
+-------+-----+----+

+---+-------+-----+----+
| ID|Product|Label|Text|
+---+-------+-----+----+
|  0|      0|    0|   0|
+---+-------+-----+----+



                                                                                

In [49]:
from pyspark.sql.functions import lower


def lower_case(data):
    return data.withColumn('processed_text', lower(data['Text']))
data_train = lower_case(data_train)
data_test = lower_case(data_test)
data_test.show(5)
data_train.show(5)

                                                                                

+----+---------+----------+--------------------+--------------------+
|  ID|  Product|     Label|                Text|      processed_text|
+----+---------+----------+--------------------+--------------------+
|3364| Facebook|Irrelevant|I mentioned on Fa...|i mentioned on fa...|
| 352|   Amazon|   Neutral|BBC News - Amazon...|bbc news - amazon...|
|8312|Microsoft|  Negative|@Microsoft Why do...|@microsoft why do...|
|4371|    CS-GO|  Negative|CSGO matchmaking ...|csgo matchmaking ...|
|4433|   Google|   Neutral|Now the President...|now the president...|
+----+---------+----------+--------------------+--------------------+
only showing top 5 rows



[Stage 91:>                                                         (0 + 1) / 1]

+-----------+--------+--------------------+--------------------+
|    Product|   Label|                Text|      processed_text|
+-----------+--------+--------------------+--------------------+
|Borderlands|Positive|I am coming to th...|i am coming to th...|
|Borderlands|Positive|im getting on bor...|im getting on bor...|
|Borderlands|Positive|im coming on bord...|im coming on bord...|
|Borderlands|Positive|im getting on bor...|im getting on bor...|
|Borderlands|Positive|im getting into b...|im getting into b...|
+-----------+--------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [50]:
# remove html tags
from pyspark.sql.functions import regexp_replace
def remove_html_tags(data):
    return data.withColumn('processed_text', regexp_replace(data['processed_text'], "<.*?>", ""))

data_train = remove_html_tags(data_train)
data_test = remove_html_tags(data_test)
data_train.show(5)
data_test.show(5)





                                                                                

+-----------+--------+--------------------+--------------------+
|    Product|   Label|                Text|      processed_text|
+-----------+--------+--------------------+--------------------+
|Borderlands|Positive|I am coming to th...|i am coming to th...|
|Borderlands|Positive|im getting on bor...|im getting on bor...|
|Borderlands|Positive|im coming on bord...|im coming on bord...|
|Borderlands|Positive|im getting on bor...|im getting on bor...|
|Borderlands|Positive|im getting into b...|im getting into b...|
+-----------+--------+--------------------+--------------------+
only showing top 5 rows



[Stage 93:>                                                         (0 + 1) / 1]

+----+---------+----------+--------------------+--------------------+
|  ID|  Product|     Label|                Text|      processed_text|
+----+---------+----------+--------------------+--------------------+
|3364| Facebook|Irrelevant|I mentioned on Fa...|i mentioned on fa...|
| 352|   Amazon|   Neutral|BBC News - Amazon...|bbc news - amazon...|
|8312|Microsoft|  Negative|@Microsoft Why do...|@microsoft why do...|
|4371|    CS-GO|  Negative|CSGO matchmaking ...|csgo matchmaking ...|
|4433|   Google|   Neutral|Now the President...|now the president...|
+----+---------+----------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [64]:
import yaml
from string import punctuation
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, trim, udf
from pyspark.sql.types import StringType
from pyspark.ml.feature import Tokenizer, IDF, StringIndexer
# Khởi tạo SparkSession (nếu chưa có)
spark = SparkSession.builder.appName("TweetProcessing").getOrCreate()

chat_words = {
    "AFAIK": "As Far As I Know",
    "AFK": "Away From Keyboard",
    "ASAP": "As Soon As Possible",
    "ATK": "At The Keyboard",
    "ATM": "At The Moment",
    "A3": "Anytime, Anywhere, Anyplace",
    "BAK": "Back At Keyboard",
    "BBL": "Be Back Later",
    "BBS": "Be Back Soon",
    "BFN": "Bye For Now",
    "B4N": "Bye For Now",
    "BRB": "Be Right Back",
    "BRT": "Be Right There",
    "BTW": "By The Way",
    "B4": "Before",
    "CU": "See You",
    "CUL8R": "See You Later",
    "CYA": "See You",
    "FAQ": "Frequently Asked Questions",
    "FC": "Fingers Crossed",
    "FWIW": "For What It's Worth",
    "FYI": "For Your Information",
    "GAL": "Get A Life",
    "GG": "Good Game",
    "GN": "Good Night",
    "GMTA": "Great Minds Think Alike",
    "GR8": "Great!",
    "G9": "Genius",
    "IC": "I See",
    "ICQ": "I Seek you (also a chat program)",
    "ILU": "ILU: I Love You",
    "IMHO": "In My Honest/Humble Opinion",
    "IMO": "In My Opinion",
    "IOW": "In Other Words",
    "IRL": "In Real Life",
    "KISS": "Keep It Simple, Stupid",
    "LDR": "Long Distance Relationship",
    "LMAO": "Laugh My A.. Off",
    "LOL": "Laughing Out Loud",
    "LTNS": "Long Time No See",
    "L8R": "Later",
    "MTE": "My Thoughts Exactly",
    "M8": "Mate",
    "NRN": "No Reply Necessary",
    "OIC": "Oh I See",
    "PITA": "Pain In The A..",
    "PRT": "Party",
    "PRW": "Parents Are Watching",
    "QPSA?": "Que Pasa?",
    "ROFL": "Rolling On The Floor Laughing",
    "ROFLOL": "Rolling On The Floor Laughing Out Loud",
    "ROTFLMAO": "Rolling On The Floor Laughing My A.. Off",
    "SK8": "Skate",
    "STATS": "Your sex and age",
    "ASL": "Age, Sex, Location",
    "THX": "Thank You",
    "TTFN": "Ta-Ta For Now!",
    "TTYL": "Talk To You Later",
    "U": "You",
    "U2": "You Too",
    "U4E": "Yours For Ever",
    "WB": "Welcome Back",
    "WTF": "What The F...",
    "WTG": "Way To Go!",
    "WUF": "Where Are You From?",
    "W8": "Wait...",
    "7K": "Sick:-D Laugher",
    "TFW": "That feeling when",
    "MFW": "My face when",
    "MRW": "My reaction when",
    "IFYP": "I feel your pain",
    "TNTL": "Trying not to laugh",
    "JK": "Just kidding",
    "IDC": "I don't care",
    "ILY": "I love you",
    "IMU": "I miss you",
    "ADIH": "Another day in hell",
    "ZZZ": "Sleeping, bored, tired",
    "WYWH": "Wish you were here",
    "TIME": "Tears in my eyes",
    "BAE": "Before anyone else",
    "FIMH": "Forever in my heart",
    "BSAAW": "Big smile and a wink",
    "BWL": "Bursting with laughter",
    "BFF": "Best friends forever",
    "CSL": "Can't stop laughing",
    "L8": "Late",
    "SMH": "Shaking My Head",
    "YOLO": "You Only Live Once",
    "TLDR": "Too Long; Didn't Read",
    "FOMO": "Fear Of Missing Out",
    "IDK": "I Don't Know",
    "BFFL": "Best Friends For Life",
    "TMI": "Too Much Information",
    "DM": "Direct Message",
    "STFU": "Shut The F... Up",
    "WTH": "What The Heck",
    "LMAOROTF": "Laughing My A... Off Rolling On The Floor",
    "PPL": "People",
    "SFLR": "Sorry For Late Reply",
    "G2G": "Got To Go",
    "S2R": "Send To Receive"
}
# Broadcast variables (optional for local mode, but good practice for scalability)
punctuation_broadcast = spark.sparkContext.broadcast(punctuation)
stopwords_broadcast = spark.sparkContext.broadcast(set(stopwords.words("english")))
lemmatizer_broadcast = spark.sparkContext.broadcast(WordNetLemmatizer())
chat_words_broadcast = spark.sparkContext.broadcast(chat_words) # Assuming chat_words dictionary is defined

# Define UDFs for text processing
def remove_punct(text):
    return text.translate(str.maketrans("", "", punctuation_broadcast.value))
remove_punct_udf = udf(remove_punct, StringType())

def remove_sw(text):
    return " ".join([word for word in text.split() if word not in stopwords_broadcast.value])
remove_sw_udf = udf(remove_sw, StringType())

def lemmatize(text):
    return " ".join([lemmatizer_broadcast.value.lemmatize(word) for word in word_tokenize(text)])
lemmatize_udf = udf(lemmatize, StringType())

def replace_abbrev(text):
    return " ".join([chat_words_broadcast.value.get(word.upper(), word) for word in text.split()])
replace_abbrev_udf = udf(replace_abbrev, StringType())

def remove_urls_spark_udf(text):
    url_pattern = r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+"
    return regexp_replace(text, url_pattern, "")
remove_urls_spark = udf(remove_urls_spark_udf, StringType())

def remove_emojis_spark_udf(text):
    emoji_pattern = (
        "[\U0001F600-\U0001F64F"  # Emoticons
        "\U0001F300-\U0001F5FF"  # Miscellaneous Symbols and Pictographs
        "\U0001F680-\U0001F6FF"  # Transport and Map Symbols
        "\U0001F700-\U0001F77F"  # Alchemical Symbols
        "\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
        "\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
        "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
        "\U0001FA00-\U0001FA6F"  # Chess Symbols
        "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
        "\U00002702-\U000027B0"  # Dingbats
        "\U000024C2-\U0001F251"  # Enclosed characters
        "]+"
    )
    return regexp_replace(text, emoji_pattern, "")
remove_emojis_spark = udf(remove_emojis_spark_udf, StringType())

# Define the preprocessing pipeline for a single column
def preprocess_text_spark(df, input_column='Text', output_column='processed_text'):
    df_processed = df.withColumn(output_column, lower(df[input_column]))
    df_processed = df_processed.withColumn(output_column, regexp_replace(df_processed[output_column], "<.*?>", ""))
    df_processed = df_processed.withColumn(output_column, remove_urls_spark(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_emojis_spark(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, replace_abbrev_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_punct_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_sw_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, trim(regexp_replace(df_processed[output_column], r"\s+", " ")))
    df_processed = df_processed.withColumn(output_column, lemmatize_udf(df_processed[output_column]))
    return df_processed







In [74]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import lower, regexp_replace, trim, udf
from pyspark.sql.types import StringType
from string import punctuation
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

# Khởi tạo SparkSession (nếu chưa có)
spark = SparkSession.builder.appName("TweetProcessing").getOrCreate()

# Broadcast variables (optional for local mode)
punctuation_broadcast = spark.sparkContext.broadcast(punctuation)
stopwords_broadcast = spark.sparkContext.broadcast(set(stopwords.words("english")))
lemmatizer_broadcast = spark.sparkContext.broadcast(WordNetLemmatizer())
chat_words_broadcast = spark.sparkContext.broadcast(chat_words) # Giả sử chat_words đã được định nghĩa

# Define regular Python functions for UDFs
def lower_func(x):
    return x.lower() if x else None

def remove_html_tags_func(x):
    return regexp_replace(x, "<.*?>", "") if x else None

def remove_urls_func(x):
    return regexp_replace(x, r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", "") if x else None

def remove_punctuations_func(x):
    return x.translate(str.maketrans("", "", punctuation_broadcast.value)) if x else None

def apply_chat_word_map_func(text):
    if text:
        return " ".join([chat_words_broadcast.value.get(word.upper(), word) for word in text.split()])
    return None

def remove_stopwords_func(text):
    if text:
        words = text.split()
        filtered_words = [word for word in words if word not in stopwords_broadcast.value]
        return " ".join(filtered_words)
    return None

def remove_emojis_func(text):
    if text:
        emoji_pattern = (
            "[\U0001F600-\U0001F64F"  # Emoticons
            "\U0001F300-\U0001F5FF"  # Miscellaneous Symbols and Pictographs
            "\U0001F680-\U0001F6FF"  # Transport and Map Symbols
            "\U0001F700-\U0001F77F"  # Alchemical Symbols
            "\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
            "\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
            "\U0001F900-\U0001F99F"  # Supplemental Symbols and Pictographs
            "\U0001FA00-\U0001FA6F"  # Chess Symbols
            "\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
            "\U00002702-\U000027B0"  # Dingbats
            "\U000024C2-\U0001F251"  # Enclosed characters
            "]+"
        )
        return regexp_replace(text, emoji_pattern, "")
    return None

def remove_extra_whitespaces_func(x):
    return trim(regexp_replace(x, r"\s+", " ")) if x else None

def lemmatize_text_func(text):
    if text:
        lemmatizer = lemmatizer_broadcast.value
        words = word_tokenize(text)
        lemmatized_words = [lemmatizer.lemmatize(word) for word in words]
        return " ".join(lemmatized_words)
    return None

# Create UDFs from the regular Python functions
convert_to_lowercase_spark_udf = udf(lower_func, StringType())
remove_html_tags_spark_udf = udf(remove_html_tags_func, StringType())
remove_urls_spark_udf = udf(remove_urls_func, StringType())
remove_punctuations_spark_udf = udf(remove_punctuations_func, StringType())
apply_chat_word_map_spark_udf = udf(apply_chat_word_map_func, StringType())
remove_stopwords_spark_udf = udf(remove_stopwords_func, StringType())
remove_emojis_spark_udf = udf(remove_emojis_func, StringType())
remove_extra_whitespaces_spark_udf = udf(remove_extra_whitespaces_func, StringType())
lemmatize_text_spark_udf = udf(lemmatize_text_func, StringType())

def preprocess_pipeline_spark(df, input_column='Text', output_column='processed_text'):
    """
    Applies a series of text preprocessing steps to a specified column of a Spark DataFrame.

    Args:
        df: The input Spark DataFrame.
        input_column: The name of the column containing the text.
        output_column: The name of the new column to store the processed text.

    Returns:
        A new Spark DataFrame with the processed text in the specified output column.
    """
    df_processed = df.withColumn(output_column, convert_to_lowercase_spark_udf(df[input_column]))
    df_processed = df_processed.withColumn(output_column, remove_html_tags_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_urls_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_emojis_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, apply_chat_word_map_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_punctuations_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_stopwords_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, remove_extra_whitespaces_spark_udf(df_processed[output_column]))
    df_processed = df_processed.withColumn(output_column, lemmatize_text_spark_udf(df_processed[output_column]))
    return df_processed

# Example of how to use the pipeline:
processed_train_df = preprocess_pipeline_spark(data_train, input_column='Text', output_column='processed_text')
processed_test_df = preprocess_pipeline_spark(data_test, input_column='Text', output_column='processed_text')
processed_train_df.select("Text", "processed_text").show(5)
processed_test_df.select("Text", "processed_text").show(5)

25/05/07 18:37:01 ERROR Executor: Exception in task 0.0 in stage 110.0 (TID 118)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/tmp/ipykernel_124265/367021442.py", line 23, in remove_html_tags_func
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 174, in wrapped
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 9642, in regexp_replace
    pattern_col = _create_column_from_literal(pattern)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/column.py", line 50, in _create_column_from_literal
    sc = get_active_spark_context()
         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/py

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_124265/367021442.py", line 23, in remove_html_tags_func
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 174, in wrapped
    return f(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 9642, in regexp_replace
    pattern_col = _create_column_from_literal(pattern)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/column.py", line 50, in _create_column_from_literal
    sc = get_active_spark_context()
         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/thientran/.local/lib/python3.12/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 248, in get_active_spark_context
    raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.
