In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_replace, lower, trim, udf
from pyspark.sql.types import *
import nltk
from nltk.stem import WordNetLemmatizer

# Additional imports for augmentation
import pandas as pd
import random

def lemmatize(data_str):
    """Lemmatize whitespace-separated tokens with NLTK's WordNetLemmatizer.

    Each token is tagged by ``nltk.pos_tag`` and the first letter of its tag
    selects the WordNet part of speech passed to the lemmatizer:
    ``V`` -> verb, ``J`` -> adjective, ``R`` -> adverb, anything else -> noun.

    Args:
        data_str: Text to lemmatize.

    Returns:
        The lemmatized tokens joined by single spaces. An empty string is
        returned when ``data_str`` is falsy, is not a ``str``, or contains no
        tokens. If any exception is raised while lemmatizing, a warning is
        printed and ``data_str`` is returned unchanged.
    """
    # Guard: anything falsy or non-string maps to the empty string.
    if not (data_str and isinstance(data_str, str)):
        return ''
    try:
        lemmatizer = WordNetLemmatizer()
        tokens = data_str.split()
        if not tokens:
            return ''
        # First letter of the POS tag -> WordNet POS code; noun is the fallback.
        pos_by_prefix = {'V': 'v', 'J': 'a', 'R': 'r'}
        return ' '.join(
            lemmatizer.lemmatize(token, pos=pos_by_prefix.get(tag[:1], 'n'))
            for token, tag in nltk.pos_tag(tokens)
        )
    except Exception as e:
        # Best effort: report the failure and hand back the input untouched.
        print(f"Warning: Lemmatization failed: {e}")
        return data_str

def clean_tweets(df, text_column="Phrase"):
    """Clean a text column of a Spark DataFrame for sentiment analysis.

    The column is cast to string, passed through an ordered series of regex
    substitutions, lower-cased, trimmed, filtered for empty/null values and
    finally lemmatized through a UDF wrapping ``lemmatize``.

    Args:
        df: Spark DataFrame containing ``text_column``.
        text_column: Name of the column to clean; it is overwritten in the
            returned DataFrame.

    Returns:
        A new DataFrame with the cleaned ``text_column`` and without rows
        whose cleaned text is empty or null.
    """
    # Ordered (pattern, replacement) pairs; each one is applied to the output
    # of the previous one.
    substitutions = (
        # URLs
        (r'http\S+|www\.\S+|https?://\S+|t\.co/\S+', ''),
        # User mentions
        (r'@\w+', ''),
        # Angle-bracket tags (e.g., <unk>)
        (r'<[^>]+>', ''),
        # "â" followed by non-whitespace (presumably mojibake remnants — TODO confirm)
        (r'â\S+|â\S+|â\S+', ''),
        # Runs of whitespace (including newlines) collapse to one space
        (r'\s+', ' '),
    )

    result = df.withColumn(text_column, col(text_column).cast(StringType()))
    for pattern, replacement in substitutions:
        result = result.withColumn(
            text_column, regexp_replace(col(text_column), pattern, replacement)
        )

    # Lowercase, then strip leading/trailing spaces.
    result = result.withColumn(text_column, trim(lower(col(text_column))))

    # Keep only rows whose text is non-null and non-empty.
    result = result.where(col(text_column).isNotNull() & (col(text_column) != ''))

    # Lemmatize the remaining text row by row.
    lemmatize_udf = udf(lemmatize, StringType())
    return result.withColumn(text_column, lemmatize_udf(col(text_column)))

def load_and_preprocess_data(file_path, text_column="Phrase", sentiment_column="Sentiment", augment=False):
    """
    Load CSV data and clean it for sentiment analysis.

    The CSV is read with a header row and schema inference (``"`` is used as
    both quote and escape character), the text column is cleaned with
    ``clean_tweets`` and the sentiment column is cast to ``IntegerType``.

    Args:
        file_path: Path of the CSV file to read.
        text_column: Name of the column holding the text to clean.
        sentiment_column: Name of the label column to cast to integer.
        augment: Accepted for interface compatibility. No augmentation step is
            implemented in this function; passing a truthy value emits a
            ``UserWarning`` instead of being silently ignored.

    Returns:
        The cleaned Spark DataFrame.
    """
    if augment:
        # Make the no-op visible so callers do not assume the data was augmented.
        import warnings

        warnings.warn(
            "augment=True was requested, but no augmentation step is implemented; "
            "returning cleaned data without augmentation.",
            UserWarning,
            stacklevel=2,
        )

    ss = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
    df = ss.read.csv(file_path, header=True, inferSchema=True, quote='"', escape='"')
    cleaned_df = clean_tweets(df, text_column)
    cleaned_df = cleaned_df.withColumn(sentiment_column, col(sentiment_column).cast(IntegerType()))

    return cleaned_df

# Example usage: build the cleaned train/test DataFrames, then preview each.
train_df = load_and_preprocess_data(file_path="../Twitter_data/traindata7.csv", augment=True)
test_df = load_and_preprocess_data(file_path="../Twitter_data/testdata7.csv", augment=False)
train_df.show(n=10)
test_df.show(n=10)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/21 16:03:50 WARN Utils: Your hostname, mustard7385-System-Product-Name, resolves to a loopback address: 127.0.1.1; using 192.168.0.110 instead (on interface eno1)
25/11/21 16:03:50 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/21 16:03:50 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+--------------------+---------+
|              Phrase|Sentiment|
+--------------------+---------+
|wishin i could go...|        0|
|@ verizon i'm hav...|        0|
|please don't beli...|        0|
|please sort out a...|        0|
|feature fix the e...|        0|
|disrespectful. an...|        0|
|i think the game ...|        0|
|fuck hell, you th...|        0|
|@ jukinmedia yout...|        0|
|omgggg guy very b...|        0|
+--------------------+---------+
only showing top 10 rows


[Stage 5:>                                                          (0 + 1) / 1]

+--------------------+---------+
|              Phrase|Sentiment|
+--------------------+---------+
|cold war black op...|        0|
|so add a fucking ...|        0|
|this be the bad @...|        0|
|'s liberal regres...|        0|
|so when i try to ...|        0|
|my first run a an...|        0|
|i'm still not buy...|        0|
|fuck verizon. the...|        0|
|news: pubg mobile...|        0|
|4 hey rhandlerr r...|        0|
+--------------------+---------+
only showing top 10 rows


                                                                                

In [2]:
# Row counts of the cleaned train and test DataFrames
n_train, n_test = train_df.count(), test_df.count()

print(
    f"Training set rows : {n_train:,}",
    f"Test set rows     : {n_test:,}",
    sep="\n",
)

Training set rows : 596
Test set rows     : 397


In [3]:
# Step 1: collapse each Spark DataFrame to a single partition.
train_small, test_small = train_df.coalesce(1), test_df.coalesce(1)

# Step 2: bring both DataFrames onto the driver as Pandas DataFrames.
train_pd, test_pd = train_small.toPandas(), test_small.toPandas()

                                                                                

In [4]:
from pathlib import Path
import pandas as pd

# ------------------------------------------------------------------ #
# helper
# ------------------------------------------------------------------ #
def export_split(
    df: pd.DataFrame,
    split: str,                 # "train" or "test"
    base_dir: str | Path = "./data",
    text_col: str = "Phrase",
    label_col: str = "Sentiment",
):
    """
    Write a Pandas DataFrame to:
      • <base_dir>/<split>.csv      (header: Phrase,Sentiment)
      • <base_dir>/<split>/<label>/<row_id>.txt
    """
    base_dir = Path(base_dir).resolve()
    base_dir.mkdir(parents=True, exist_ok=True)

    csv_path = base_dir / f"{split}.csv"
    df[[text_col, label_col]].to_csv(csv_path, index=False)

    # 2️individual .txt files ----------------------------------------------
    for row_id, (phrase, label) in df[[text_col, label_col]].iterrows():
        tgt_dir  = base_dir / split / str(label)
        tgt_dir.mkdir(parents=True, exist_ok=True)

        # use the DataFrame’s integer index as <row_id>; change if you prefer
        txt_path = tgt_dir / f"{row_id}.txt"
        txt_path.write_text(str(phrase), encoding="utf-8")

# ------------------------------------------------------------------ #
# call the helper for train / test
# ------------------------------------------------------------------ #
# Export both splits: each call writes <split>.csv plus <split>/<label>/<row_id>.txt
export_split(df=train_pd, split="train")
export_split(df=test_pd, split="test")