## 1. Install Libraries

In [None]:
"""
STILL NEED TO ADD VERSIONS (AFTER FILE IS CONFIRMED)
"""
%pip install spacy
%pip install pyarrow
%pip install textblob
%pip install textstat
!python -m spacy download en_core_web_sm

## 2. Import Libraries

In [None]:
# OS environment
import os

# Import SparkConf class into program
from pyspark import SparkConf

# Import SparkContext and SparkSession classes
from pyspark import SparkContext # Spark
from pyspark.sql import SparkSession # Spark SQL

# PySpark Data Operations
from pyspark.sql.functions import col, count, size, split, udf, pandas_udf
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Regex
import re

# Numeric operations
import numpy as np

# Define custom schema (data types) for PySpark Dataframes
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

# spaCy model for natural language processing
import spacy

# Pandas
import pandas as pd

# Psycholinguistics
from textblob import TextBlob

# Readability features
import textstat

## 3. Functions and Classes

### 3.1. clean_text function

In [None]:
# Define text cleaning function
def clean_text(text):

    """
    Clean the input text string by removing unwanted elements while keeping useful punctuation.

    Steps performed (in this order):
    - Replace curly (non-ASCII) quotes/apostrophes with their ASCII equivalents
    - Remove URLs (e.g. http://..., www...)
    - Remove HTML entities, both named (e.g. &nbsp;) and numeric (e.g. &#39;, &#x27;)
    - Remove Twitter-style mentions (@username) and hashtags (#hashtag)
    - Remove emojis and any other non-ASCII characters
    - Normalize whitespace (convert multiple spaces/tabs/newlines into a single space)
    - Trim leading and trailing spaces

    Args:
        text (str or None): The input text to clean.

    Returns:
        str: A cleaned version of the input text. If input is None, returns an empty string.
    """

    if text is None:
        return ""

    # Replace curly quotes/apostrophes with ASCII equivalents in a single pass
    text = text.translate(str.maketrans({
        '“': '"', '”': '"',
        '‘': "'", '’': "'",
    }))

    # Remove URLs
    text = re.sub(r'http\S+|www\.\S+', '', text)

    # Remove HTML entities. This must run BEFORE hashtag removal: otherwise the
    # "#39" inside "&#39;" is stripped as a hashtag and a stray "&;" is left behind.
    text = re.sub(r'&(?:#\d+|#[xX][0-9a-fA-F]+|\w+);', '', text)

    # Remove mentions and hashtags
    text = re.sub(r'@\w+|#\w+', '', text)

    # Remove emojis and other non-ASCII symbols
    text = re.sub(r'[^\x00-\x7F]+', '', text)

    # Normalize whitespace
    text = re.sub(r'\s+', ' ', text)

    return text.strip()

### 3.2. FeaturesSpark Class (quantity features)

In [None]:
from pyspark.sql import functions as F

class FeaturesSpark:
    """
    Quantity features computed with native PySpark column expressions.

    ``transform`` appends the following columns to the input DataFrame:
    ``num_characters``, ``num_capital_letters``, ``num_words``,
    ``num_sentences``, ``words_per_sentence``, ``num_short_sentences`` and
    ``num_long_sentences``.

    A sentence is a non-blank fragment obtained by splitting the text on runs
    of ``.``, ``!`` or ``?``. Null text is treated as an empty string for
    every feature.
    """
    def __init__(self, text_col="cleaned_text"):
        # Name of the column holding the text to analyse.
        self.text_col = text_col

    def transform(self, df):
        """
        Return ``df`` with the quantity feature columns appended.

        Args:
            df: Spark DataFrame containing the column named by ``self.text_col``.

        Returns:
            Spark DataFrame with the extra feature columns.
        """
        txt = F.coalesce(F.col(self.text_col), F.lit(""))
        trimmed = F.trim(txt)

        # SQL fragment producing the array of non-blank sentence fragments.
        # split() keeps the empty string that follows a terminal "." / "!" / "?",
        # so blank fragments are filtered out to avoid counting them as sentences.
        sentences_sql = (
            f"filter(split(coalesce({self.text_col}, ''), '[.!?]+'), "
            "frag -> length(trim(frag)) > 0)"
        )
        # Words in one fragment; trimming first stops the space that follows a
        # sentence terminator from being counted as an extra (empty) word.
        sentence_len_sql = "size(split(trim(s), ' +'))"

        # Character count
        df = df.withColumn("num_characters", F.length(txt))

        # Capital letters
        df = df.withColumn("num_capital_letters", F.length(F.regexp_replace(txt, r"[^A-Z]", "")))

        # Word count (splitting an empty string yields one empty token, so guard it)
        df = df.withColumn("num_words",
                           F.when(F.length(trimmed) > 0, F.size(F.split(trimmed, r"\s+")))
                            .otherwise(F.lit(0)))

        # Sentence count
        df = df.withColumn("num_sentences", F.size(F.expr(sentences_sql)))

        # Words per sentence
        df = df.withColumn("words_per_sentence",
                           F.when(F.col("num_sentences") > 0, F.col("num_words") / F.col("num_sentences"))
                            .otherwise(F.lit(0)))

        # Short sentences (<10 words)
        df = df.withColumn("num_short_sentences",
                           F.size(F.expr(f"filter({sentences_sql}, s -> {sentence_len_sql} < 10)")))

        # Long sentences (>=20 words)
        df = df.withColumn("num_long_sentences",
                           F.size(F.expr(f"filter({sentences_sql}, s -> {sentence_len_sql} >= 20)")))

        return df


### 3.3. NLPFeaturesSpaCy Class (writing-pattern features)

In [None]:
import pandas as pd
import spacy
from pyspark.sql import SparkSession

class NLPFeaturesSpaCy:
    """
    Features that require spaCy NLP: POS counts and syllables.

    ``transform`` collects the text column to pandas, runs the spaCy pipeline
    over it in batches, and returns a new Spark DataFrame holding the text
    column plus ``num_adjectives``, ``num_adverbs``, ``num_verbs``,
    ``num_determiners`` and ``num_syllables``.
    """

    # spaCy coarse POS tag -> name of the output column counting that tag.
    _POS_COLUMNS = {
        "ADJ": "num_adjectives",
        "ADV": "num_adverbs",
        "VERB": "num_verbs",
        "DET": "num_determiners",
    }

    def __init__(self, text_col="cleaned_text", model="en_core_web_sm", batch_size=50):
        self.text_col = text_col
        self.nlp = spacy.load(model, disable=["ner", "parser"])  # only need POS
        # Number of texts handed to spaCy per batch in ``transform``.
        self.batch_size = batch_size

    @staticmethod
    def count_syllables(word):
        """
        Estimate the syllables in ``word`` by counting groups of consecutive
        vowels (``aeiouy``), discounting one group when the word ends in "e".

        Returns:
            int: The estimate, never less than 1.
        """
        vowels = "aeiouy"
        word = word.lower().strip()
        count = 0
        prev_char_was_vowel = False
        for ch in word:
            if ch in vowels:
                if not prev_char_was_vowel:
                    count += 1
                prev_char_was_vowel = True
            else:
                prev_char_was_vowel = False
        if word.endswith("e"):
            count = max(1, count - 1)
        return max(1, count)

    def _doc_features(self, doc):
        """Return the feature dict for one parsed spaCy doc (single pass over its tokens)."""
        features = {column: 0 for column in self._POS_COLUMNS.values()}
        num_syllables = 0
        for token in doc:
            column = self._POS_COLUMNS.get(token.pos_)
            if column is not None:
                features[column] += 1
            # Only purely alphabetic tokens contribute syllables.
            if token.is_alpha:
                num_syllables += self.count_syllables(token.text)
        features["num_syllables"] = num_syllables
        return features

    def extract_row_features(self, text):
        """
        Compute the spaCy features for a single text.

        Args:
            text (str or None): Text to analyse; non-string values (e.g. None)
                are treated as an empty string.

        Returns:
            pandas.Series: One entry per feature column.
        """
        doc = self.nlp(text if isinstance(text, str) else "")
        return pd.Series(self._doc_features(doc))

    def transform(self, spark_df):
        """
        Compute the spaCy features for every row of ``spark_df``.

        The text column is collected to the driver with ``toPandas()``, so the
        whole column must fit in driver memory.

        Args:
            spark_df: Spark DataFrame containing the column ``self.text_col``.

        Returns:
            Spark DataFrame with ``self.text_col`` and the feature columns.
        """
        # Convert to pandas
        pdf = spark_df.select(self.text_col).toPandas()

        # Null text would make spaCy raise, so substitute an empty string.
        texts = [t if isinstance(t, str) else "" for t in pdf[self.text_col]]

        # nlp.pipe processes the texts in batches (honouring batch_size) and
        # yields the docs in input order.
        docs = self.nlp.pipe(texts, batch_size=self.batch_size)
        features_pdf = pd.DataFrame(
            [self._doc_features(doc) for doc in docs],
            index=pdf.index,
            columns=[*self._POS_COLUMNS.values(), "num_syllables"],
        )
        result_pdf = pd.concat([pdf, features_pdf], axis=1)

        # Convert back to Spark
        spark = SparkSession.builder.getOrCreate()
        return spark.createDataFrame(result_pdf)


### 3.4. ReadabilityIndices Class

In [None]:
class ReadabilityIndices:
    """
    Readability features computed with ``textstat`` inside a pandas UDF.
    """

    @staticmethod
    def extract_features(df, text_col):
        """
        Append the readability columns ``gunning_fog``, ``smog`` and ``ari``.

        Args:
            df: Spark DataFrame containing ``text_col``.
            text_col (str): Name of the text column; null values are scored as "".

        Returns:
            Spark DataFrame with the three float columns added. If textstat
            raises for a row, all three values of that row are null.
        """
        schema = StructType([
            StructField("gunning_fog", FloatType()),
            StructField("smog", FloatType()),
            StructField("ari", FloatType())
        ])
        field_names = schema.fieldNames()

        @pandas_udf(schema)
        def udf_readability(text_series: pd.Series) -> pd.DataFrame:
            rows = []
            for text in text_series.fillna(""):
                try:
                    row = (
                        textstat.gunning_fog(text),
                        textstat.smog_index(text),
                        textstat.automated_readability_index(text),
                    )
                except Exception:
                    # Best effort: one unscoreable text must not fail the batch.
                    # (Exception, not a bare except, so interrupts still propagate.)
                    row = (None, None, None)
                rows.append(row)
            # Explicit columns/dtype keep the frame aligned with the schema
            # even when the batch is empty or a column is entirely null.
            return pd.DataFrame(rows, columns=field_names, dtype="float64")

        df = df.withColumn("features_struct", udf_readability(F.col(text_col)))
        for field in field_names:
            df = df.withColumn(field, F.col(f"features_struct.{field}"))
        return df.drop("features_struct")


### 3.5. Psycholinguistics Class

In [None]:
class Psycholinguistics:
    """
    Sentiment and title-overlap features computed inside a pandas UDF.
    """

    @staticmethod
    def extract_features(df, text_col, title_col=None):
        """
        Append the columns ``polarity``, ``subjectivity`` and ``title_similarity``.

        ``polarity`` and ``subjectivity`` come from TextBlob's sentiment of the
        text. ``title_similarity`` is the Jaccard overlap between the
        lower-cased, whitespace-split word sets of text and title; it is 0.0
        when no ``title_col`` is given or when either word set is empty.

        Args:
            df: Spark DataFrame containing ``text_col`` (and ``title_col`` if given).
            text_col (str): Name of the article text column.
            title_col (str or None): Name of the title column, if any.

        Returns:
            Spark DataFrame with the three float columns added. If processing a
            row raises, all three values of that row are null.
        """
        schema = StructType([
            StructField("polarity", FloatType()),
            StructField("subjectivity", FloatType()),
            StructField("title_similarity", FloatType())
        ])
        field_names = schema.fieldNames()

        use_title = title_col is not None

        @pandas_udf(schema)
        def udf_psycho(pdf: pd.DataFrame) -> pd.DataFrame:
            rows = []
            texts = pdf[text_col].fillna("").tolist()
            titles = pdf[title_col].fillna("").tolist() if use_title else [None] * len(pdf)
            for text, title in zip(texts, titles):
                try:
                    sentiment = TextBlob(text).sentiment
                    title_similarity = 0.0
                    if title:
                        text_words = set(text.lower().split())
                        title_words = set(title.lower().split())
                        if text_words and title_words:
                            title_similarity = len(text_words & title_words) / len(text_words | title_words)
                    row = (sentiment.polarity, sentiment.subjectivity, title_similarity)
                except Exception:
                    # Best effort: one bad row must not fail the batch.
                    # (Exception, not a bare except, so interrupts still propagate.)
                    row = (None, None, None)
                rows.append(row)
            # Explicit columns/dtype keep the frame aligned with the schema
            # even when the batch is empty or a column is entirely null.
            return pd.DataFrame(rows, columns=field_names, dtype="float64")

        select_cols = [text_col] + ([title_col] if use_title else [])
        df = df.withColumn("features_struct", udf_psycho(F.struct(*[F.col(c) for c in select_cols])))
        for field in field_names:
            df = df.withColumn(field, F.col(f"features_struct.{field}"))
        return df.drop("features_struct")


## 4. Configure Spark Environment
Using the code snippets from tutorial 1 and 2, set up the Spark environment and configure the Spark Application using SparkConf.

In [None]:
# Imported here so the package location can be used as SPARK_HOME below.
import pyspark

# Report whatever SPARK_HOME the environment currently provides.
spark_home = os.environ.get("SPARK_HOME")

if spark_home:
    print(f"SPARK_HOME: {spark_home}")
else:
    print("SPARK_HOME environment variable is not set.")

# Point SPARK_HOME at the installed PySpark package. Deriving the directory
# from the package itself (rather than hard-coding a Python 3.10 dist-packages
# path) keeps this cell working on any interpreter version or install location.
os.environ["SPARK_HOME"] = os.path.dirname(pyspark.__file__)

print(f"SPARK_HOME is now set to: {os.environ.get('SPARK_HOME')}")

In [None]:
# Master URL: "local[*]" runs Spark in local mode with one worker thread per
# logical core; "local[k]" would run it with exactly k worker threads.
master = "local[*]"
# Application name, shown on the Spark cluster UI page.
app_name = "WELFake Exploratory Data Anlaysis (EDA)"

# Build the Spark configuration step by step (each setter updates the conf in place).
spark_conf = SparkConf()
spark_conf.setMaster(master)
spark_conf.setAppName(app_name)

# Create (or reuse) the SparkSession, then keep a handle on its SparkContext.
spark = SparkSession.builder.config(conf=spark_conf).getOrCreate()
sc = spark.sparkContext
# Only log errors.
sc.setLogLevel("ERROR")

## 5. Load datasets


In [None]:
# Read the WELFake CSV into a Spark DataFrame.
# - header / inferSchema: take column names from the first row and infer types
# - multiLine: title and text values may span several lines
# - quote / escape: fields are double-quoted, with '"' also used as the escape character
welfake_df = (
    spark.read
    .options(
        header=True,
        inferSchema=True,
        quote='"',
        multiLine=True,
        escape='"',
    )
    .csv("data/WELFake_Dataset.csv")
)

# Show the first three rows as a sanity check
welfake_df.show(n=3)

In [None]:
# The first column is loaded as "_c0"; give it a meaningful name
welfake_df = welfake_df.withColumnRenamed(existing="_c0", new="index")

# Dataframe dimensions: row count (a Spark action) and number of columns
num_rows, num_cols = welfake_df.count(), len(welfake_df.columns)

print(f"Rows: {num_rows}")
print(f"Columns: {num_cols}")

# Column names and inferred data types
welfake_df.printSchema()

## 6. Remove duplicates


In [None]:
# Row count before de-duplication
original_count = welfake_df.count()

# Keep a single row per (title, text) pair
welfake_df_dedup = welfake_df.drop_duplicates(subset=["title", "text"])

# Row count afterwards, and how many duplicates were dropped
deduped_count = welfake_df_dedup.count()
duplicates_removed = original_count - deduped_count

print(f"Original rows: {original_count}")
print(f"Duplicates removed: {duplicates_removed}")
print(f"After dataset size: {deduped_count} rows")

## 7. Clean title and article texts

In [None]:
# Wrap clean_text as a PySpark UDF that returns a string column
clean_text_udf = udf(f=clean_text, returnType=StringType())

In [None]:
# Add cleaned versions of the title and text columns
welfake_df_clean = (
    welfake_df_dedup
    .withColumn("cleaned_title", clean_text_udf(col("title")))
    .withColumn("cleaned_text", clean_text_udf(col("text")))
)

# Compare raw and cleaned values side by side (cells truncated to 80 characters)
welfake_df_clean.select(
    ["title", "cleaned_title", "text", "cleaned_text"]
).show(n=5, truncate=80)

## 8. Remove null and empty string values

In [None]:
# Keep only rows that have a non-empty cleaned text, a non-empty cleaned title
# and a label; chained filters are combined with logical AND
welfake_df_processed = (
    welfake_df_clean
    .filter(col("cleaned_text").isNotNull() & (col("cleaned_text") != ""))
    .filter(col("cleaned_title").isNotNull() & (col("cleaned_title") != ""))
    .filter(col("label").isNotNull())
)

# Row counts before and after the filter, and how many rows it dropped
clean_count = welfake_df_clean.count()
processed_count = welfake_df_processed.count()
removed_empty = clean_count - processed_count

print(f"Removed empty text rows: {removed_empty}")
print(f"After dataset size: {processed_count} rows")

## 9. Remove outliers based on text word count

### 9.1. Calculate article text word count

In [None]:
# Word count of the cleaned text: number of pieces after splitting on whitespace runs
welfake_df_wc = welfake_df_processed.withColumn(
    "text_wc",
    F.size(F.split(F.col("cleaned_text"), r"\s+")),
)

# Preview the new column
welfake_df_wc.select(["cleaned_text", "text_wc"]).show(n=3)

### 9.2. Remove outliers based on percentile values

In [None]:
# Percentiles to inspect in each tail of the text word-count distribution
percentiles_upper_tail = [0.96, 0.97, 0.98, 0.99]
percentiles_lower_tail = [0.01, 0.02, 0.03, 0.04]

# Approximate quantiles of text_wc (relative error 0.01)
upper_tail_quantiles = welfake_df_wc.approxQuantile(
    col="text_wc", probabilities=percentiles_upper_tail, relativeError=0.01
)
lower_tail_quantiles = welfake_df_wc.approxQuantile(
    col="text_wc", probabilities=percentiles_lower_tail, relativeError=0.01
)

# Print both tails so the cut-off points can be chosen
print(f"Upper tail (96% to 99%): {upper_tail_quantiles}")
print(f"Lower tail (1% to 4%): {lower_tail_quantiles}")

In [None]:
# Calculate the (approximate) 2nd and 98th percentiles of the text word count
lower_bound, upper_bound = welfake_df_wc.approxQuantile("text_wc", [0.02, 0.98], 0.01)

print(f"Filter out text_wc < {lower_bound} or > {upper_bound}\n")

# Keep rows whose word count lies within [lower_bound, upper_bound]. The bounds
# are inclusive so that exactly the rows described by the message above
# (text_wc < lower_bound or > upper_bound) are removed; strict comparisons would
# also drop every row sitting exactly on a bound.
welfake_df_filtered = welfake_df_wc.filter(
    F.col("text_wc").between(lower_bound, upper_bound)
)

# Count the rows left after outlier removal and how many were dropped
outlier_count = welfake_df_filtered.count()
removed_outlier = processed_count - outlier_count

print(f"Removed outlier text rows: {removed_outlier}")
print(f"After dataset size: {outlier_count} rows")

## 10. Feature Engineering

In [None]:
# Step 1: compute the PySpark-native quantity features on the cleaned text
fast_feat = FeaturesSpark("cleaned_text")
df_fast = fast_feat.transform(df=welfake_df_filtered)


# Step 2: preview the first rows with the new feature columns
df_fast.show(n=5)

In [None]:
import textstat
import pandas as pd
from pyspark.sql import SparkSession

# Collect the distinct cleaned texts to the driver. De-duplicating first avoids
# scoring the same text twice and keeps the join below from multiplying rows
# when several articles share an identical cleaned_text.
pdf = welfake_df_filtered.select("cleaned_text").distinct().toPandas()

# Compute readability features
pdf["flesch_reading_ease"] = pdf["cleaned_text"].apply(textstat.flesch_reading_ease)
pdf["flesch_kincaid_grade"] = pdf["cleaned_text"].apply(textstat.flesch_kincaid_grade)
pdf["smog_index"] = pdf["cleaned_text"].apply(textstat.smog_index)

# Convert back to Spark
df_readability = spark.createDataFrame(pdf)

# Attach the readability scores to the filtered dataset. The previous code
# joined from an undefined name `df`, which raised NameError.
df_final = welfake_df_filtered.join(df_readability, on="cleaned_text", how="left")
