# Part 2: Fake News with Spark MLlib

* My question: How can machine learning detect fake news from social media posts?
* Dataset: LIAR dataset, Fake News Kaggle dataset, or Twitter fake news dataset
* Pipeline Components:
    * Text Preprocessing: Tokenization, Stopword Removal, TF-IDF embeddings (Transformer)
    * Feature Engineering: Extract metadata features (source credibility, number of retweets, etc.) (Transformer)
    * Fake News Classifier: Train a machine learning model (Logistic Regression, Random Forest, or Neural Networks) (Estimator)
    * Evaluation: Precision, recall, and AUC-ROC for classification performance

## Import packages and libraries

In [1]:

# Standard library
import builtins
import gc
import hashlib
import re
import string
from collections import Counter
from datetime import datetime

import numpy as np

# Parse the data
from nltk.tokenize import sent_tokenize, word_tokenize
from nltk.corpus import stopwords
from nltk.stem.wordnet import WordNetLemmatizer

# PySpark
import pyspark.sql.functions as F
from pyspark.sql.functions import (
    col, count, explode, isnan, lit, trim, udf, when,
    round,                  # NOTE: shadows Python's built-in round(); use builtins.round for plain floats
    round as spark_round,
    sum as spark_sum,
)
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover, Tokenizer

## Import dataset

This project uses a [Fake News Detection Dataset](https://www.kaggle.com/datasets/emineyetm/fake-news-detection-datasets) from Kaggle, which consists of two CSV files:  
- `True.csv`: Contains real news articles sourced from [Reuters](https://www.reuters.com/).
- `Fake.csv`: Contains fake news articles collected from unreliable sources flagged by **PolitiFact** and **Wikipedia**.

Each article includes:
- `title`: Headline of the article  
- `text`: Main body of the article  
- `subject`: General topic/category (e.g., politics, world)  
- `date`: Publication date  

The dataset covers primarily political and world news stories, with articles mainly from 2016–2017. 


### Import 
Note: You need to upload `True.csv` and `Fake.csv` to the GCS bucket we created at the beginning: [st446-w09-ad](https://console.cloud.google.com/storage/browser/st446-w09-ad;tab=objects?forceOnBucketsSortingFiltering=true&cloudshell=true&inv=1&invt=Abtldw&project=st446-wt2025-id&prefix=&forceOnObjectsSortingFiltering=false)

In [3]:
# Define the schema
schema = StructType([
    StructField("title", StringType(), True),
    StructField("text", StringType(), True),
    StructField("subject", StringType(), True),
    StructField("date", StringType(), True)
])

# read in the 2 datasets 
fake_df = spark.read.csv("gs://st446-w09-ad/Fake.csv", header=True, schema=schema)
true_df = spark.read.csv("gs://st446-w09-ad/True.csv", header=True, schema=schema)

### Quick cleaning and summary statistics

#### Append fake data to true data and create a binary indicator `fake`, which equals 1 if the row comes from `Fake.csv` and 0 otherwise.

In [4]:
# Add label column: 1 for fake, 0 for true
fake_df = fake_df.withColumn("fake", F.lit(1))
true_df = true_df.withColumn("fake", F.lit(0))

# Combine the datasets
news_df = true_df.unionByName(fake_df)

# Add id col, this will be useful later in ML pipeline
from pyspark.sql.functions import monotonically_increasing_id

news_df = news_df.withColumn("id", monotonically_increasing_id())

# Preview combined data
news_df.show(5)

                                                                                

+--------------------+--------------------+------------+------------------+----+---+
|               title|                text|     subject|              date|fake| id|
+--------------------+--------------------+------------+------------------+----+---+
|As U.S. budget fi...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |   0|  0|
|U.S. military to ...|WASHINGTON (Reute...|politicsNews|December 29, 2017 |   0|  1|
|Senior U.S. Repub...|WASHINGTON (Reute...|politicsNews|December 31, 2017 |   0|  2|
|FBI Russia probe ...|WASHINGTON (Reute...|politicsNews|December 30, 2017 |   0|  3|
|Trump wants Posta...|SEATTLE/WASHINGTO...|politicsNews|December 29, 2017 |   0|  4|
+--------------------+--------------------+------------+------------------+----+---+
only showing top 5 rows



**Let's release the initial DataFrames so we can free up memory for later operations**

In [5]:
# unpersist dfs and clear cache
fake_df.unpersist(blocking=True)
true_df.unpersist(blocking=True)
spark.catalog.clearCache()

# garbage collection
del fake_df
del true_df

gc.collect()

149

#### Let's create a breakdown of fake vs. real news articles in `news_df`. 
After loading and labeling the data (`fake = 1` for fake news, `fake = 0` for real news), we count the total number of observations and compute each label's share. About 48% of the articles are real news and 52% are fake news.

In [6]:
# cache news_df because it's large and will use it later
news_df.cache()

# count total number of rows/materialize cache
total_count = news_df.count()

# group by label and count
breakdown_df = news_df.groupBy("fake").agg(
    count("*").alias("count")
).withColumn(
    "percent", round((col("count") / total_count) * 100, 2)
)

breakdown_df.show()



+----+-----+-------+
|fake|count|percent|
+----+-----+-------+
|   0|21417|  47.69|
|   1|23489|  52.31|
+----+-----+-------+




                                                                                

**Analysis of missing data**

Our model will rely heavily on article text, so it's important to understand the extent of missing data for the column. We will do the following: 

1. **Create a missingness indicator**:
   A new column `is_missing` is added, marking rows as `True` if the `text` field is either null or an empty string (after trimming whitespace).

2. **Summarize missing data by label**:
   The data is grouped by `is_missing`, and counts of real (`fake = 0`) and fake (`fake = 1`) articles are computed. Percentages within each group are also calculated to assess whether missing data is more common in either class.

3. **Compute overall missingness**:
   The percentage of all rows that have missing `text` is calculated and printed.

4. **Drop missing entries**:
   Rows where `is_missing` is `True` are dropped from the dataset, and the number of dropped rows is reported.

We find that there are 639 observations where `text` is missing. About 99.8% of them (638 of 639) come from the fake news dataset. Given that only about 1.4% of our data is missing `text`, we will just drop these rows so we don't run into issues later when tokenizing and filtering.

In [7]:
# Create is_missing column based on null or empty text
news_df = news_df.withColumn("is_missing", (col("text").isNull()) | (trim(col("text")) == ""))

# Aggregate real/fake counts by is_missing
grouped = news_df.groupBy("is_missing") \
    .agg(
        count(when(col("fake") == 0, True)).alias("real_count"),
        count(when(col("fake") == 1, True)).alias("fake_count"),
        count("*").alias("total")
    ) \
    .withColumn("percent_real", spark_round((col("real_count") / col("total")) * 100, 2)) \
    .withColumn("percent_fake", spark_round((col("fake_count") / col("total")) * 100, 2)) \
    .orderBy("is_missing")

# Show summary
grouped.show(truncate=False)

# Count total and missing rows
total_rows = news_df.count()
missing_rows = news_df.filter(col("is_missing")).count()  # is_missing is already boolean

# Use built-in round to avoid PySpark conflict
missing_pct = builtins.round((missing_rows / total_rows) * 100, 2)

print(f"Missing text in {missing_rows} out of {total_rows} rows ({missing_pct}%)")

# Drop rows with missing text
print(f"Dropping {missing_rows} rows...")
news_df = news_df.filter(~col("is_missing")).drop("is_missing")

                                                                                

+----------+----------+----------+-----+------------+------------+
|is_missing|real_count|fake_count|total|percent_real|percent_fake|
+----------+----------+----------+-----+------------+------------+
|false     |21416     |22851     |44267|48.38       |51.62       |
|true      |1         |638       |639  |0.16        |99.84       |
+----------+----------+----------+-----+------------+------------+

Missing text in 639 out of 44906 rows (1.42%)
Dropping 639 rows...


### Pipeline Part 1: Text preprocessing

#### Tokenization

This block performs text preprocessing on the `text` column of the dataset. It includes the following steps:

1. **Custom Stopword List**  
   A custom list of stopwords is created by extending the default English stopwords provided by PySpark's `StopWordsRemover`. Additional terms are added to filter out vague adjectives, common filler verbs, source references (e.g., "reuters", "getty"), and other frequent but uninformative tokens found in news articles.

2. **Tokenization**  
   The `RegexTokenizer` splits each article's `text` column into individual word tokens. It splits on any non-word character (`\\W`), converts all tokens to lowercase, and filters out short tokens (with fewer than 2 characters).

3. **Stopword Removal**  
   The `StopWordsRemover` removes all tokens present in the custom stopword list. The result is a new column called `filtered` containing the cleaned list of tokens for each article.

4. **Token Frequency Analysis**  
   - The tokenized and filtered tokens are exploded so that each token appears in its own row.
   - A count is computed for each unique token across the dataset.
   - The resulting `token_counts` DataFrame is cached and materialized to improve performance for any subsequent operations.
   - The top 50 most frequent tokens (after filtering) are displayed to understand common vocabulary patterns.

This process prepares the data for downstream text feature extraction methods, such as TF-IDF vectorization.
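
To make the tokenization rules concrete, here is a minimal plain-Python sketch of what the two stages described above do to a single sentence. It is an illustration only, not Spark's implementation: `regex_tokenize`, `remove_stopwords`, the sample sentence, and the mini stopword list are all hypothetical, and the `re.ASCII` flag is an assumption meant to mirror how Java regexes treat `\W` by default.

```python
import re

def regex_tokenize(text, pattern=r"\W", min_token_length=2):
    """Lowercase, split on the pattern, and drop tokens shorter than min_token_length."""
    # Assumption: re.ASCII restricts word characters to [A-Za-z0-9_],
    # approximating the default behaviour of Java regexes.
    pieces = re.split(pattern, text.lower(), flags=re.ASCII)
    return [p for p in pieces if len(p) >= min_token_length]

def remove_stopwords(tokens, stopwords):
    """Drop every token that appears in the stopword collection."""
    stop = set(stopwords)
    return [t for t in tokens if t not in stop]

# Hypothetical sample sentence and mini stopword list
sample = "WASHINGTON (Reuters) - We're told the vote is new, he said."
mini_stopwords = ["the", "is", "we", "he", "new", "said", "reuters", "(reuters)", "said."]

tokens = regex_tokenize(sample)
filtered = remove_stopwords(tokens, mini_stopwords)

print(tokens)    # punctuation is gone before stopword matching happens
print(filtered)
```

Two things are worth noticing. Splitting on non-word characters breaks contractions such as "we're" into `we` and `re`, which is a likely source of the `re` token in the frequency table below. And because punctuation is removed during tokenization, stopword entries such as `"(reuters)"` or `"said."` never match anything; the plain `"reuters"` and `"said"` entries do the work.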

In [8]:
from pyspark.ml.feature import RegexTokenizer
from pyspark.sql.functions import size

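# extend default English stopwords with my custom additions
# (entries that contain punctuation, e.g. "(reuters)" or "said.", and the empty
# string can never match, because the tokenizer below splits on non-word
# characters and drops tokens shorter than 2 characters; they are harmless)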
custom_stopwords = StopWordsRemover.loadDefaultStopWords("english") + [
    "",  # empty string
    "new", "one", "two", "first", "last", "many",  # vague adjectives/numbers
    "reuters", "(reuters)",  # source tags
    "even", "may", "made", "make", "since","like",  # filler or common verbs
    "still", "go", "get", "take", "know", "see", # high freq modifiers 
    "said", "said.", "say", "saying", "says", "said,",  # overused reporting verb
    "-", "featured", "getty", "image", "flickr", "subscribe", "filessupport",
    "youtube", "images", "read", "via"
]

tokenizer = RegexTokenizer(
    inputCol="text",
    outputCol="tokens",
    pattern="\\W",  # split on anything not a word char
    minTokenLength=2,  # drop tiny tokens like "a", "i", "—"
    toLowercase=True
)
remover = StopWordsRemover(inputCol="tokens", 
                           outputCol="filtered", 
                           stopWords=custom_stopwords)

# Apply transformations
tokenized_df = tokenizer.transform(news_df)
filtered_df = remover.transform(tokenized_df)

# Keep needed columns for next steps
filtered_df = filtered_df.select("id", "title", "text", "fake", "filtered")

In [9]:
# Explode the list of tokens into individual rows
tokens_df = filtered_df.select(explode(col("filtered")).alias("token"))

# Group by token and count
token_counts = tokens_df.groupBy("token").count().orderBy(col("count").desc())
token_counts.cache()
token_counts.count()  # materialize the cache

# Show top N tokens
token_counts.show(50, truncate=False)



+--------------+------+
|token         |count |
+--------------+------+
|trump         |131192|
|president     |54989 |
|people        |41079 |
|state         |34177 |
|also          |30891 |
|clinton       |28154 |
|government    |27840 |
|obama         |27688 |
|donald        |27590 |
|house         |27396 |
|states        |26614 |
|republican    |25340 |
|year          |24732 |
|united        |23394 |
|told          |23099 |
|white         |22322 |
|campaign      |21408 |
|election      |20962 |
|time          |20861 |
|party         |20454 |
|news          |20178 |
|washington    |18534 |
|country       |17861 |
|former        |17699 |
|us            |16764 |
|hillary       |16470 |
|years         |16462 |
|re            |16102 |
|american      |15938 |
|media         |15828 |
|security      |15823 |
|law           |15589 |
|national      |15286 |
|political     |15191 |
|police        |14910 |
|court         |14725 |
|right         |14529 |
|percent       |14341 |
|according     |


                                                                                

#### TF-IDF embeddings (Transformer)

**Before we start, I'm going to clear unused DataFrames and caches to reduce the likelihood of Spark JVM or memory issues**

We'll keep `news_df` and also `filtered_df`, which includes the final preprocessed text used for vectorization.

In [10]:
for df_name in [
    "tokenized_df", "tokens_df", 
    "token_counts", "breakdown_df"
]:
    if df_name in globals():
        try:
            globals()[df_name].unpersist(blocking=True)
        except Exception:
            pass  # not a Spark DF or not cached
        del globals()[df_name]

# Also clear Spark's cache (this evicts every cached DataFrame, including news_df) and Python memory
spark.catalog.clearCache()

gc.collect()

# cache filtered_df
filtered_df = filtered_df.cache()
filtered_df.count()  # force materialization

                                                                                

44267

In [11]:
import time

# Wait 30 seconds to let Spark settle
print("Pausing to let Spark free up memory...")

time.sleep(30)
print("Done waiting!")

Pausing to let Spark free up memory...
Done waiting!


Here, we transform the cleaned token data into numerical feature vectors using term frequency and TF-IDF methods:

1. **CountVectorizer (Term Frequency)**  
   - A `CountVectorizer` is initialized to convert the list of tokens in the `filtered` column into sparse term frequency vectors.
   - `vocabSize=5000` limits the vocabulary to the top 5,000 most frequent tokens across the dataset. I started off with `vocabSize=10000` but ran into memory constraints, so I had to lower it.
   - The output is stored in a new column called `raw_features`, which represents the raw count of each token per article.

2. **Model Fitting**  
   The vectorizer is fit to the `filtered_df` DataFrame, learning the vocabulary based on token frequency.

3. **Transformation**  
   The trained vectorizer model is used to transform the tokenized text into term frequency vectors. The resulting `vectorized_df` includes the original metadata along with the `raw_features` column.

4. **Caching and Materialization**  
   The resulting DataFrame is cached to optimize performance for future transformations. The `.count()` call forces materialization, ensuring that the computation is executed immediately and stored in memory.
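
The vocabulary-then-count logic described above can be sketched in plain Python. This is a toy illustration under stated assumptions, not Spark's `CountVectorizer`: the helper names and the three mini documents are hypothetical, and Spark may break frequency ties or order the vocabulary differently.

```python
from collections import Counter

def fit_vocabulary(docs, vocab_size):
    """Keep the vocab_size most frequent tokens across all documents."""
    totals = Counter(token for doc in docs for token in doc)
    return [term for term, _ in totals.most_common(vocab_size)]

def count_vector(doc, vocabulary):
    """Sparse term counts as {vocabulary index: count}; unknown tokens are ignored."""
    index = {term: i for i, term in enumerate(vocabulary)}
    counts = Counter(index[t] for t in doc if t in index)
    return dict(sorted(counts.items()))

# Hypothetical already-filtered token lists
docs = [
    ["trump", "campaign", "trump", "election"],
    ["court", "election", "trump"],
    ["police", "trump", "election"],
]

vocab = fit_vocabulary(docs, vocab_size=2)
vectors = [count_vector(d, vocab) for d in docs]

print(vocab)
print(vectors)
```

Tokens that fall outside the vocabulary (here `campaign`, `court`, and `police`) simply vanish from the vectors, which is the trade-off made when lowering `vocabSize` to save memory.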


In [12]:
# vectorizer 
vectorizer = CountVectorizer(inputCol="filtered", 
                             outputCol="raw_features", 
                             vocabSize=5000)

# vectorizer models
vectorizer_model = vectorizer.fit(filtered_df)
vectorized_df = vectorizer_model.transform(filtered_df).select("id", "title", "text", "fake", "raw_features").cache()
vectorized_df.count()

# unpersist filtered df
# (no spark.catalog.clearCache() here: it would also evict the freshly cached vectorized_df)
filtered_df.unpersist(blocking=True)
del filtered_df

                                                                                

In [13]:
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(vectorized_df)
idf_df = idf_model.transform(vectorized_df).select("id", "title", "text", "fake", "features").cache()

print("Number of observations in idf_df = ", idf_df.count())
print("Number of null features in idf_df = ", idf_df.filter(F.col("features").isNull()).count())

# drop vectorized df
vectorized_df.unpersist(blocking=True)
del vectorized_df

                                                                                

Number of observations in idf_df =  44267
Number of null features in idf_df =  0


In [42]:
spark.catalog.clearCache()
gc.collect()

1779

The `IDF` cell above completes the TF-IDF transformation by applying inverse document frequency (IDF) weighting to the term frequency vectors generated earlier. The final `features` column serves as the primary input for the machine learning models, capturing the importance of each token within and across documents.

1. **Initialize IDF Transformer**  
   An `IDF` transformer is created to scale down the impact of commonly occurring tokens. It takes `raw_features` (term frequency vectors) as input and outputs weighted TF-IDF vectors in a new column named `features`.

2. **Model Fitting**  
   The IDF model is fit to the `vectorized_df` DataFrame. This computes the IDF weights based on how many documents each term appears in.

3. **Transformation**  
   The fitted model is used to transform `raw_features` into final TF-IDF vectors. The resulting DataFrame, `idf_df`, includes the TF-IDF feature column `features` alongside the article metadata and label.

4. **Caching and Materialization**  
   The transformed DataFrame is cached for efficient reuse and materialized using `.count()` to ensure the computation is executed immediately.
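
As a worked example of the weighting step, the sketch below applies the smoothed formula given in the Spark MLlib documentation, `idf = log((m + 1) / (df + 1))`, where `m` is the number of documents and `df` is the number of documents containing the term. The helper names and the toy count vectors are hypothetical, and this is plain Python rather than Spark code.

```python
import math

def idf_weights(count_vectors, vocab_size):
    """IDF per vocabulary index: log((m + 1) / (df + 1)), with m = number of documents."""
    m = len(count_vectors)
    doc_freq = [0] * vocab_size
    for vec in count_vectors:
        for idx in vec:              # a document counts once per term it contains
            doc_freq[idx] += 1
    return [math.log((m + 1) / (df + 1)) for df in doc_freq]

def tfidf(vec, idf):
    """Scale each raw term count by the IDF weight of its term."""
    return {idx: tf * idf[idx] for idx, tf in vec.items()}

# Hypothetical sparse count vectors: {vocabulary index: raw count}
count_vectors = [{0: 2, 1: 1}, {0: 1}, {0: 1, 2: 3}]

idf = idf_weights(count_vectors, vocab_size=3)
weighted = [tfidf(v, idf) for v in count_vectors]

print(idf)
print(weighted)
```

Term 0 appears in every toy document, so its weight is `log(4 / 4) = 0` and it contributes nothing to the final vectors, however often it occurs. Very common tokens in the real corpus are damped in the same way.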

In [None]:
# preview a few TF-IDF rows 
idf_df.select("features").show(5)

### Pipeline Part 2: Feature Engineering

#### Extract metadata features (source credibility, number of retweets, etc.) (Transformer)

Here, we'll extract several metadata-based features that may help the model distinguish between real and fake news.

#### Stylometric & Text-Length Features

   - `text_length`: Character length of the article body.
   - `title_length`: Character length of the article title.
   - `exclam_count`: Number of exclamation marks in the title.
   - `all_caps_count`: Number of fully capitalized words in the title.

In [15]:
from pyspark.sql.functions import length, regexp_replace, col, when
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
import re

# text and title lengths
news_df = news_df.withColumn("text_length", length(col("text")))
news_df = news_df.withColumn("title_length", length(col("title")))

# exclamation point feature
news_df = news_df.withColumn(
    "exclam_count", 
    length(col("title")) - length(regexp_replace(col("title"), "!", ""))
)

def count_all_caps_words(title):
    if title:
        return len([word for word in title.split() if re.fullmatch(r"[A-Z]{2,}", word)])
    return 0

all_caps_udf = udf(count_all_caps_words, IntegerType())

news_df = news_df.withColumn("all_caps_count", all_caps_udf(col("title")))
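
A quick way to sanity-check the all-caps rule is to run the same function on a few titles outside Spark. The titles below are hypothetical examples chosen to show the edge cases; the function body repeats the rule used in the UDF above.

```python
import re

def count_all_caps_words(title):
    # Same rule as the UDF above: whitespace-separated words made only of 2+ capital letters
    if title:
        return len([word for word in title.split() if re.fullmatch(r"[A-Z]{2,}", word)])
    return 0

# Hypothetical titles
examples = [
    "BREAKING NEWS",
    "WATCH: Trump SLAMS Media",
    "U.S. military to accept recruits",
    "A BIG deal",
    None,
]

results = [count_all_caps_words(t) for t in examples]
print(results)
```

Because the pattern must match the whole word, capitalized words with attached punctuation (`WATCH:`, `U.S.`) and single letters (`A`) are not counted. That is a reasonable conservative choice, but it means the feature somewhat undercounts shouting in titles that lean on punctuation.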


#### Date Features

- `year`, `month`, and `day` extracted from the publication date.
- `is_2016`: Binary indicator for whether the article was published during the 2016 election year.
- `is_missing_date`: Binary indicator for whether the date could not be parsed.

   Missing date values are imputed using a placeholder (`-1`), and a separate indicator is added to allow the model to learn from missingness itself.
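
The parsing strategy used for these features (try one format, fall back to a second, otherwise treat the date as missing) can be sketched in plain Python. This is only an analogy for the Spark cell that follows: `parse_date` is a hypothetical helper, the `strptime` patterns are my approximate equivalents of the Spark patterns `MMMM d, yyyy` and `d-MMM-yy`, the sample strings are made up, and English month names (the default locale) are assumed.

```python
from datetime import datetime

# Approximate strptime equivalents of the two Spark patterns (assumption)
FORMATS = ("%B %d, %Y", "%d-%b-%y")

def parse_date(raw):
    """Return a date for the first format that matches, or None if none does."""
    if raw is None:
        return None
    cleaned = raw.strip()          # mirrors trim() on the date column
    for fmt in FORMATS:
        try:
            return datetime.strptime(cleaned, fmt).date()
        except ValueError:
            continue               # try the next known format
    return None

# Hypothetical inputs: trailing space, alternate format, unparseable string
for raw in ["December 31, 2017 ", "19-Feb-18", "not a date"]:
    print(repr(raw), "->", parse_date(raw))
```

Rows that fall through both formats end up with no parsed date, which is what later produces the null `year`, `month`, and `day` values.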

In [16]:
from pyspark.sql.functions import (
    trim, col, to_date, when, year, month, dayofmonth, regexp_extract
)

# before we start parsing dates, set legacy datetime parser policy 
# so Spark doesn't yell at us 
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

# clean the date string
news_df = news_df.withColumn("date_clean", trim(col("date")))

# try to parse two known formats
news_df = news_df.withColumn(
    "parsed_date",
    to_date("date_clean", "MMMM d, yyyy")
)

news_df = news_df.withColumn(
    "parsed_date",
    when(col("parsed_date").isNull(), to_date("date_clean", "d-MMM-yy"))
    .otherwise(col("parsed_date"))
)

# extract date components (will just be null if no parsed_date)
news_df = news_df.withColumn("year", year("parsed_date"))
news_df = news_df.withColumn("month", month("parsed_date"))
news_df = news_df.withColumn("day", dayofmonth("parsed_date"))

# create Binary indicator: was this published in 2016?
# indicative of 2016 election year
news_df = news_df.withColumn("is_2016", (col("year") == 2016).cast("int"))

# drop intermediate columns
news_df = news_df.drop("date_clean", "parsed_date")
news_df.select("date", "year", "month", "day").show(10)

+------------------+----+-----+---+
|              date|year|month|day|
+------------------+----+-----+---+
|December 31, 2017 |2017|   12| 31|
|December 29, 2017 |2017|   12| 29|
|December 31, 2017 |2017|   12| 31|
|December 30, 2017 |2017|   12| 30|
|December 29, 2017 |2017|   12| 29|
|December 29, 2017 |2017|   12| 29|
|December 29, 2017 |2017|   12| 29|
|December 29, 2017 |2017|   12| 29|
|December 29, 2017 |2017|   12| 29|
|December 28, 2017 |2017|   12| 28|
+------------------+----+-----+---+
only showing top 10 rows





#### Sanity checks on engineered features above

Below, we see 1,270 rows with missing date-related features (`year`, `month`, `day`, and `is_2016`). This makes sense, since these are the rows whose dates we weren't able to parse above.

In [17]:
from pyspark.sql.functions import col  # F.sum is used below, so Python's built-in sum() is not shadowed

# list engineered feature columns
feature_cols = [
    "text_length", "title_length", "exclam_count", 
    "all_caps_count", "year", "month", "day", "is_2016"
]

# count how many nulls in each feature column
news_df.select([
    F.sum(col(c).isNull().cast("int")).alias(f"{c}_nulls")
    for c in feature_cols
]).show()



+-----------------+------------------+------------------+--------------------+----------+-----------+---------+-------------+
|text_length_nulls|title_length_nulls|exclam_count_nulls|all_caps_count_nulls|year_nulls|month_nulls|day_nulls|is_2016_nulls|
+-----------------+------------------+------------------+--------------------+----------+-----------+---------+-------------+
|                0|                 0|                 0|                   0|      1270|       1270|     1270|         1270|
+-----------------+------------------+------------------+--------------------+----------+-----------+---------+-------------+




                                                                                

#### Given this is only about 2.9% of the data, maybe we can just drop observations with these missing features.

Let's do a quick sanity check to see whether the observations with missing date features differ systematically from those without.

In [18]:
from pyspark.sql.functions import col

news_df = news_df.withColumn("missing_date", col("year").isNull())

news_df.groupBy("missing_date", "fake").count().orderBy("missing_date", "fake").show()

feature_cols = [
    "text_length", "title_length", "exclam_count", "all_caps_count",
]

summary_df = news_df.groupBy("missing_date").agg(
    *[F.round(F.avg(col_name), 2).alias(col_name + "_avg") for col_name in feature_cols]
)

summary_df.show(truncate=False)


                                                                                

+------------+----+-----+
|missing_date|fake|count|
+------------+----+-----+
|       false|   0|21323|
|       false|   1|21674|
|        true|   0|   93|
|        true|   1| 1177|
+------------+----+-----+





+------------+---------------+----------------+----------------+------------------+
|missing_date|text_length_avg|title_length_avg|exclam_count_avg|all_caps_count_avg|
+------------+---------------+----------------+----------------+------------------+
|true        |1641.62        |99.01           |0.15            |1.72              |
|false       |2485.61        |79.36           |0.07            |1.2               |
+------------+---------------+----------------+----------------+------------------+




                                                                                

Above, we see that the vast majority of missing-date rows are fake news (1177 out of 1270). This is consistent with what I found earlier when I looked at missing text — the fake dataset is more likely to have formatting issues or missing values. So if I drop them, I'm disproportionately dropping fake news observations. 

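The missing-date articles also tend to:
- Be shorter in body text (about 1,642 vs. 2,486 characters on average).
- Have longer, shoutier titles (about 99 vs. 79 characters, with roughly twice as many exclamation marks and more all-caps words).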

These traits feel like they could correlate with low-effort, possibly fake content — which makes sense given the majority are labeled fake.

This makes me worried about model bias if I dropped these observations, even if it's just 3% of the observations, as I might be dropping a unique stylometric cluster of fake articles. So, I will impute the missing date-feature data.

#### Imputing missing data for `year`, `month`, `day`, `is_2016`. 

I will plug in `-1` for Null `year`, `month`, and `day` observations.

Our `is_2016` binary indicator is tricky because a null year means we can’t confidently determine whether the article was published during the election year. Therefore, I will retain `is_2016` as a 0/1 binary feature, code `is_2016` to 0 for observations with missing dates, and then introduce a new flag `is_missing_date` to explicitly mark rows with unparseable dates. This way, the model can learn patterns from the absence of date information, particularly since that absence may itself be predictive of fake content, and also interpret `is_2016 = 0` differently depending on whether the date was missing or simply not from 2016.
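
The imputation rule is easier to check on a single row. Below is a minimal plain-Python sketch of it, where `None` stands in for a Spark null; `impute_date_features` is a hypothetical helper for illustration, not part of the notebook's Spark code.

```python
def impute_date_features(year, month, day):
    """Apply the imputation rule to one row; None represents an unparseable date."""
    return {
        "year": -1 if year is None else year,
        "month": -1 if month is None else month,
        "day": -1 if day is None else day,
        # 0 means "not 2016 or unknown"; is_missing_date disambiguates the two cases
        "is_2016": 1 if year == 2016 else 0,
        "is_missing_date": 1 if year is None else 0,
    }

print(impute_date_features(2016, 11, 8))
print(impute_date_features(2017, 12, 31))
print(impute_date_features(None, None, None))
```

The pair (`is_2016`, `is_missing_date`) can take only three combinations: (1, 0) for 2016 articles, (0, 0) for dated articles from other years, and (0, 1) for articles with no usable date.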

In [19]:
from pyspark.sql.functions import col, when, isnan

# impute missing year, month, day with -1
news_df = news_df.withColumn("year", when(col("year").isNull(), -1).otherwise(col("year")))
news_df = news_df.withColumn("month", when(col("month").isNull(), -1).otherwise(col("month")))
news_df = news_df.withColumn("day", when(col("day").isNull(), -1).otherwise(col("day")))

# create is_missing_date indicator
news_df = news_df.withColumn("is_missing_date", (col("year") == -1).cast("int"))

# impute is_2016 to 0 if missing, but keep feature for learning
news_df = news_df.withColumn("is_2016", when(col("year") == 2016, 1).otherwise(0))

# At this point:
# - `year`, `month`, `day` = -1 if missing
# - `is_missing_date` = 1 if date unparseable
# - `is_2016` = 0/1 regardless, with 0 meaning "not 2016 or unknown"

# sanity check
news_df.groupBy("is_missing_date", "is_2016").count().orderBy("is_missing_date", "is_2016").show()




+---------------+-------+-----+
|is_missing_date|is_2016|count|
+---------------+-------+-----+
|              0|      0|27302|
|              0|      1|15695|
|              1|      0| 1270|
+---------------+-------+-----+




                                                                                

#### Join `news_df` (engineered features) with the TF-IDF features

In [20]:
# Check data type of 'id' in news_df
print("news_df schema:")
news_df.printSchema()

# Check data type of 'id' in idf_df
print("idf_df schema:")
idf_df.printSchema()


news_df schema:
root
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- date: string (nullable = true)
 |-- fake: integer (nullable = false)
 |-- id: long (nullable = false)
 |-- text_length: integer (nullable = true)
 |-- title_length: integer (nullable = true)
 |-- exclam_count: integer (nullable = true)
 |-- all_caps_count: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- is_2016: integer (nullable = false)
 |-- missing_date: boolean (nullable = false)
 |-- is_missing_date: integer (nullable = true)

idf_df schema:
root
 |-- id: long (nullable = false)
 |-- title: string (nullable = true)
 |-- text: string (nullable = true)
 |-- fake: integer (nullable = false)
 |-- features: vector (nullable = true)



In [21]:
# merge news_df and idf_df 
fe_df = news_df.join(
    idf_df.select("id", "features"),
    on="id",
    how="inner"
)

# make sure everything merged ok 
print("news_df count:", news_df.count())
print("idf_df count:", idf_df.count())
print("fe_df (joined) count:", fe_df.count())

                                                                                

news_df count: 44267


                                                                                

idf_df count: 44267




fe_df (joined) count: 44267




                                                                                

#### Make sure that there aren't nulls from a bad join

In [None]:
from pyspark.sql.functions import col, sum as spark_sum

fe_df.select([
    spark_sum(col(c).isNull().cast("int")).alias(f"{c}_nulls")
    for c in ["features", "text_length", "title_length", "is_2016"]
]).show()


#### Vector Assembler

This block combines all engineered features into a single feature vector that can be used for model training:

1. **VectorAssembler Initialization**  
   A `VectorAssembler` is used to merge multiple individual feature columns into one consolidated column, `final_features`. The input features include:
   - Stylometric and structural features: `text_length`, `title_length`, `exclam_count`, `all_caps_count`
   - Date features: `year`, `month`, `day`, `is_2016`, `is_missing_date`
   - Text features: `features`, which contains the TF-IDF vector

2. **Transformation**  
   The assembler is applied to `fe_df`, producing a new DataFrame where each row contains:
   - `id`: the unique identifier for the article
   - `final_features`: a single vector combining all numeric and text-based features (Spark typically stores it sparsely, since most TF-IDF entries are zero)
   - `fake`: the binary label (1 = fake, 0 = real)

3. **Materialization**  
   The resulting DataFrame `final_df` is evaluated using `.count()` to force computation and ensure that all transformations have been applied successfully.
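
Because `VectorAssembler` concatenates its inputs in the order they are listed, each position in `final_features` can be traced back to a named feature: the first nine slots are the scalar columns and the remaining slots follow the TF-IDF vocabulary. The sketch below shows that bookkeeping in plain Python. `describe_feature` is a hypothetical helper, and `toy_vocab` stands in for the fitted vocabulary (available in the notebook as `vectorizer_model.vocabulary`).

```python
SCALAR_COLS = [
    "text_length", "title_length", "exclam_count", "all_caps_count",
    "year", "month", "day", "is_2016", "is_missing_date",
]

def describe_feature(index, vocabulary, scalar_cols=SCALAR_COLS):
    """Map a position in final_features back to a column name or a TF-IDF token."""
    if index < 0:
        raise IndexError("feature index must be non-negative")
    if index < len(scalar_cols):
        return scalar_cols[index]
    offset = index - len(scalar_cols)   # position inside the TF-IDF block
    if offset >= len(vocabulary):
        raise IndexError("feature index is beyond the assembled vector")
    return f"tfidf[{vocabulary[offset]}]"

# Hypothetical three-term vocabulary for illustration
toy_vocab = ["trump", "president", "people"]

for i in (0, 8, 9, 11):
    print(i, "->", describe_feature(i, toy_vocab))
```

A lookup like this is handy later for reading model coefficients, since the coefficient at position `i` belongs to whatever `describe_feature(i, ...)` returns.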


In [22]:
from pyspark.sql.functions import col
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=[
        "text_length", 
        "title_length", 
        "exclam_count", 
        "all_caps_count", 
        "year", 
        "month", 
        "day", 
        "is_2016",
        "is_missing_date",
        "features"  # TF-IDF
    ],
    outputCol="final_features"
)

final_df = assembler.transform(fe_df).select("id", "final_features", "fake")
final_df.count()

                                                                                

44267

### Pipeline Part 3: Fake News Classifier

#### Train a machine learning model 

Note: In this project, I opted to construct the machine learning workflow manually rather than using Spark’s `Pipeline()` API. I wanted to prioritize transparency and control because:

* During the assignment, I ran into issues with limited cluster resources. Building the pipeline step-by-step allowed me to cache intermediate results, manage memory explicitly, and avoid recomputation across stages.
* Additionally, I did substantial feature engineering (e.g., stylometric features, date parsing, custom imputation logic) that I couldn't manage to fit neatly into the standard transformer-estimator structure of a `Pipeline()`.
* Finally, by separating each stage (tokenization, TF-IDF, feature engineering, etc.), I could easily inspect transformations, check for nulls, and validate my choices before training the final model.

#### Split the data into training and testing data 

Here I use a 70/30 split.
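
One detail worth knowing about a random split is that the weights are targets, not guarantees: rows are assigned by random draws, so the resulting sizes are only approximately 70/30. The plain-Python sketch below illustrates the idea with a per-row coin flip. It mimics the concept only and is not Spark's sampler, so it will not reproduce the same rows for the same seed; `random_split` here is a hypothetical helper.

```python
import random

def random_split(rows, train_fraction, seed):
    """Assign each row to train with probability train_fraction, else to test."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(10_000))          # hypothetical row ids
train, test = random_split(rows, 0.7, seed=123)

print(len(train), len(test), len(train) / len(rows))
```

This is why the counts reported below need not be exactly 70% and 30% of the 44,267 rows, even though every row ends up in exactly one of the two sets.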

In [24]:
#split
train_df, test_df = final_df.randomSplit([0.7, 0.3], seed=123)

#cache
train_df.cache()
test_df.cache()

# sanity check 
print("train_df.count():", train_df.count())
print("test_df.count():", test_df.count())

                                                                                

train_df.count(): 31086




test_df.count(): 13181



                                                                                

Breakdown of the datasets by real vs. fake data:

In [25]:
train_df.groupBy("fake").count().show()
test_df.groupBy("fake").count().show()


+----+-----+
|fake|count|
+----+-----+
|   0|15011|
|   1|16075|
+----+-----+

+----+-----+
|fake|count|
+----+-----+
|   0| 6405|
|   1| 6776|
+----+-----+
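
As a quick check on the split, the counts printed above can be turned into proportions. This is a small plain-Python sketch; the helper `fake_share` is a name introduced here for illustration.

```python
# Counts copied from the groupBy("fake").count() output above.
train_counts = {0: 15011, 1: 16075}
test_counts = {0: 6405, 1: 6776}


def fake_share(counts):
    """Fraction of rows labelled fake (label 1)."""
    return counts[1] / (counts[0] + counts[1])


n_train = sum(train_counts.values())
n_test = sum(test_counts.values())
train_fraction = n_train / (n_train + n_test)

print(f"train fraction of all rows: {train_fraction:.3f}")
print(f"fake share in train: {fake_share(train_counts):.3f}")
print(f"fake share in test:  {fake_share(test_counts):.3f}")
```

Both splits come out slightly majority fake (a little over half), so a classifier that always predicted "fake" would score only a little above 50% accuracy. That is a useful baseline to keep in mind when reading the metrics below.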



#### Start by fitting a logistic regression model, then predict the test data based on the model.

This section defines, trains, and applies a logistic regression classifier for fake news detection. I decided to start with a logistic regression model, since it is a natural starting point for binary classification problems (i.e., is the article fake news or not?). 

Logistic regression also provides direct insight into how individual features contribute to the predicted probability of a news article being fake, which is useful for understanding the influence of both textual and metadata features.

1. **Model Initialization**  
   A `LogisticRegression` model is created using:
   - `featuresCol="final_features"`: the vector of assembled input features
   - `labelCol="fake"`: the binary target column (1 = fake, 0 = real)
   - `maxIter=10`: the maximum number of iterations for optimization
   - `regParam=0.01`: L2 regularization parameter to help prevent overfitting

2. **Model Fitting**  
   The model is trained on the `train_df` dataset, which contains feature vectors and corresponding labels.

3. **Prediction**  
   The trained model is applied to the `test_df` dataset using `.transform()`, producing a new DataFrame containing:
   - `fake`: the actual label
   - `prediction`: the predicted label from the model
   - `probability`: the model's predicted probability for each class (real or fake)
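
For intuition about how the `probability` and `prediction` columns relate, the sketch below shows the core computation of a binary logistic regression in plain Python: a weighted sum of the features plus an intercept is passed through the sigmoid function, and the result is compared against a threshold. The weights, intercept, and the helper `predict_fake` are hypothetical; this is not the fitted model from this notebook.

```python
import math


def sigmoid(z):
    """Map a real-valued score to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-z))


def predict_fake(features, weights, intercept, threshold=0.5):
    """Return (probability of fake, predicted label) for one feature vector."""
    score = intercept + sum(w * x for w, x in zip(weights, features))
    p_fake = sigmoid(score)
    return p_fake, int(p_fake > threshold)


# Hypothetical two-feature model, for illustration only.
toy_weights = [1.5, -0.8]
toy_intercept = -2.0

p, label = predict_fake([3.0, 1.0], toy_weights, toy_intercept)
print(f"P(fake) = {p:.3f}, predicted label = {label}")
```

A positive weight raises the probability of "fake" as the feature grows, and a negative intercept means the model starts out leaning toward "real" before any feature is taken into account.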


In [27]:
import pyspark.ml.classification as cl

lr = cl.LogisticRegression(  # logistic regression model 
    maxIter = 10,                 
    regParam = 0.01,               
    featuresCol = "final_features",
    labelCol = "fake")            # label column name.

# fit the model on the training data
lr_model = lr.fit(train_df)

predictions = lr_model.transform(test_df)
predictions.select("fake", "prediction", "probability").show(5)

                                                                                

+----+----------+--------------------+
|fake|prediction|         probability|
+----+----------+--------------------+
|   0|       0.0|[0.95732527961460...|
|   0|       0.0|[0.98965375375120...|
|   0|       0.0|[0.94115768157027...|
|   0|       0.0|[0.95016892309915...|
|   0|       0.0|[0.91634009973255...|
+----+----------+--------------------+
only showing top 5 rows



### Pipeline Part 4: Evaluation

#### Precision, recall, and AUC-ROC for classification performance

This section evaluates the performance of the trained logistic regression model.

#### 1. Binary Classification Metrics (Threshold-Independent)
These metrics evaluate the model's ability to rank predictions, regardless of the specific classification threshold:

- **Area Under ROC Curve (AUC-ROC)**: Measures how well the model distinguishes between the two classes (fake vs. real) across all thresholds. A value close to 1.0 indicates excellent separability.
- **Area Under Precision-Recall Curve (AUC-PR)**: Focuses on the model's ability to capture the positive class (fake = 1), particularly useful in cases of class imbalance.
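
One way to read AUC-ROC is as the probability that a randomly chosen fake article receives a higher score than a randomly chosen real one. The sketch below computes it that way on toy data by counting correctly ordered pairs. It is meant as an illustration of the interpretation only and is not how Spark computes the metric internally; the function name `auc_roc` and the toy scores are made up here.

```python
def auc_roc(labels, scores):
    """AUC-ROC as the fraction of (positive, negative) pairs ranked correctly.

    Ties between a positive and a negative score count as half a correct pair.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = 0.0
    for p in pos:
        for n in neg:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(pos) * len(neg))


# Toy labels (1 = fake) and predicted probabilities of being fake.
toy_labels = [1, 1, 1, 0, 0, 0]
toy_scores = [0.9, 0.8, 0.4, 0.5, 0.2, 0.1]

toy_auc = auc_roc(toy_labels, toy_scores)
print(f"AUC-ROC = {toy_auc:.3f}")
```

In the toy data, eight of the nine fake/real pairs are ordered correctly. The pairwise loop is quadratic in the number of rows, so it only suits small examples like this one.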

#### 2. Classification Metrics at Default Threshold (Threshold = 0.5)
These metrics are calculated based on the model's predicted class labels after thresholding:

- **Accuracy**: Proportion of all predictions that are correct.
- **Precision (fake = 1)**: Among all articles predicted as fake, the proportion that were actually fake.
- **Recall (fake = 1)**: Among all actual fake articles, the proportion that the model correctly identified.

These metrics provide insight into how well the model performs in practice when making binary decisions.

#### 3. Confusion Matrix
A confusion matrix is printed by grouping by the true label (`fake`) and the predicted label. This gives a breakdown of:

- True Positives (fake=1, predicted=1)
- False Positives (fake=0, predicted=1)
- True Negatives (fake=0, predicted=0)
- False Negatives (fake=1, predicted=0)


In [28]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# Initialize a list to store the results
results = []

# === Binary Classification Metrics ===
binary_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="probability",
    labelCol="fake"
)

# AUC-ROC
binary_evaluator.setMetricName("areaUnderROC")
auc_roc = binary_evaluator.evaluate(predictions)

# AUC-PR
binary_evaluator.setMetricName("areaUnderPR")
auc_pr = binary_evaluator.evaluate(predictions)

# Store binary classification metrics in results
results.append(("AUC-ROC (LR)", auc_roc))
results.append(("AUC-PR (LR)", auc_pr))

# === Accuracy, Precision, Recall (fake=1) ===
multi_evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="fake"
)

accuracy = multi_evaluator.setMetricName("accuracy").evaluate(predictions)
precision_fake = multi_evaluator.setMetricName("precisionByLabel").evaluate(predictions, {multi_evaluator.metricLabel: 1.0})
recall_fake = multi_evaluator.setMetricName("recallByLabel").evaluate(predictions, {multi_evaluator.metricLabel: 1.0})

# Store classification metrics in results
results.append(("Accuracy (LR)", accuracy))
results.append(("Precision (fake=1) (LR)", precision_fake))
results.append(("Recall (fake=1) (LR)", recall_fake))

# === Confusion Matrix ===
confusion_df = predictions.groupBy("fake", "prediction").count().orderBy("fake", "prediction")

# Collect confusion matrix data into a list
confusion_data = confusion_df.collect()
for row in confusion_data:
    results.append(("Confusion Matrix (LR)", f"fake: {row['fake']}, prediction: {row['prediction']}, count: {row['count']}"))

# Convert the results into a DataFrame
results_df = spark.createDataFrame(results, ["Metric", "Value"])

# Show the results DataFrame
results_df.show()

                                                                                

+--------------------+--------------------+
|              Metric|               Value|
+--------------------+--------------------+
|        AUC-ROC (LR)|  0.9988484982124539|
|         AUC-PR (LR)|  0.9987513830823052|
|       Accuracy (LR)|  0.9888475836431226|
|Precision (fake=1...|  0.9933025747879148|
|Recall (fake=1) (LR)|  0.9849468713105076|
|Confusion Matrix ...|fake: 0, predicti...|
|Confusion Matrix ...|fake: 0, predicti...|
|Confusion Matrix ...|fake: 1, predicti...|
|Confusion Matrix ...|fake: 1, predicti...|
+--------------------+--------------------+





                                                                                

Above, we see the logistic regression model performed extremely well on the test set. Both the Area Under the ROC Curve (AUC-ROC) and the Area Under the Precision-Recall Curve (AUC-PR) were high, at roughly 0.9988 (0.99885 and 0.99875, respectively). This suggests that the model is highly effective at distinguishing between real and fake news articles across a range of classification thresholds.

When evaluating the model using standard classification metrics, we see similarly strong results:
- **Accuracy**: 98.9% of predictions were correct overall.
- **Precision (fake = 1)**: 99.3% of the articles predicted as fake were actually fake.
- **Recall (fake = 1)**: 98.5% of all fake articles in the dataset were correctly identified by the model.

The confusion matrix helps break this down:
- The model correctly predicted 6,674 out of 6,776 fake articles.
- It only misclassified 102 fake articles as real (false negatives).
- For real articles, it correctly identified 6,360 out of 6,405, with just 45 false positives.
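
The headline metrics can be recomputed directly from the four confusion-matrix cells. The sketch below uses the counts from the "Confusion Matrix (LR)" rows of the results table (6,360 / 45 / 102 / 6,674); the variable names are introduced here for illustration.

```python
# Counts for the baseline model, taken from the results table
# (rows labelled "Confusion Matrix (LR)").
tn, fp = 6360, 45    # real articles: predicted real / predicted fake
fn, tp = 102, 6674   # fake articles: predicted real / predicted fake

accuracy = (tp + tn) / (tp + tn + fp + fn)
precision_fake = tp / (tp + fp)   # of the predicted-fake articles, how many are fake
recall_fake = tp / (tp + fn)      # of the actual fake articles, how many were caught

print(f"accuracy  = {accuracy:.4f}")
print(f"precision = {precision_fake:.4f}")
print(f"recall    = {recall_fake:.4f}")
```

These values agree with the accuracy, precision, and recall that the evaluators reported, which is a useful consistency check on the confusion matrix.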

Overall, the model not only achieves high accuracy, but also balances precision and recall very well. While all evaluation metrics indicate strong model performance, recall (for fake = 1) is particularly important in a real-world context. In fake news detection, we want to identify as many fake articles as possible and make sure we aren't interpreting fake news as "real". A high recall means the model is effectively catching most fake content, minimizing the risk of misinformation slipping through undetected. This is especially critical in settings like content moderation or public information platforms, where missing fake news can have real consequences.


#### Print the model's intercept and coefficients for the 9 engineered features

In [29]:
# Define feature names (engineered features only — TF-IDF comes after)
engineered_features = [
    "text_length", 
    "title_length", 
    "exclam_count", 
    "all_caps_count", 
    "year", 
    "month", 
    "day", 
    "is_2016",
    "is_missing_date"
]

print(f"Intercept: {lr_model.intercept:.4f}")
print(f"Number of coefficients: {len(lr_model.coefficients)}")
print("\nFirst 9 coefficients (engineered features):")

for i, feat in enumerate(engineered_features):
    coeff = lr_model.coefficients[i]
    print(f"{feat:17s} => {coeff:+.4f}")


Intercept: -2.1776
Number of coefficients: 5009

First 9 coefficients (engineered features):
text_length       => +0.0000
title_length      => +0.0303
exclam_count      => +1.4708
all_caps_count    => +0.4339
year              => -0.0002
month             => -0.0631
day               => -0.0000
is_2016           => +0.6725
is_missing_date   => +0.4940


The intercept of the logistic regression model is -2.1776, which means that when every feature equals zero, the model leans toward predicting an article as real (label 0). This is a baseline log-odds rather than a reflection of class balance: the training set actually contains slightly more fake than real articles (16,075 vs. 15,011), and an article with every feature at zero is not realistic. In practice, something in the content has to tip the prediction toward fake.

Looking at the engineered features, a few stand out:

- **Exclamation count (+1.47)** and **ALL CAPS word count (+0.43)** have large positive coefficients, which makes intuitive sense: these are pretty classic indicators of sensationalized or less professional writing.
- **is_2016 (+0.67)** and **is_missing_date (+0.49)** are also strongly positive. So articles published in 2016 (election year) or articles with messy/missing dates are more likely to be flagged as fake, which aligns with earlier patterns I noticed in the dataset.
- Length-based features like `text_length` and `title_length` have coefficients close to zero. Because Spark reports coefficients on each feature's original scale, a small coefficient on a feature that can take large values (such as a length) does not by itself mean the feature is unimportant.
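
A common way to read logistic regression coefficients is to exponentiate them into odds ratios. The sketch below does this for a few of the coefficients printed above; it is plain Python and only illustrates the interpretation, holding all other features fixed.

```python
import math

# Coefficients copied from the model output above.
coefficients = {
    "exclam_count": 1.4708,
    "all_caps_count": 0.4339,
    "is_2016": 0.6725,
    "is_missing_date": 0.4940,
    "month": -0.0631,
}

# exp(coefficient) is the multiplicative change in the odds of "fake"
# for a one-unit increase in the feature, holding the others fixed.
odds_ratios = {name: math.exp(c) for name, c in coefficients.items()}

for name, ratio in sorted(odds_ratios.items(), key=lambda kv: kv[1], reverse=True):
    print(f"{name:17s} odds ratio = {ratio:.2f}")
```

Read this way, each additional exclamation mark multiplies the odds of "fake" by a factor of a little over 4, while a ratio below 1 (as for `month`) lowers the odds.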

#### Print the coefficients for the TF-IDF features (5000 of them)

In [30]:
# Number of engineered features already accounted for
offset = len(engineered_features)

# Extract only the TF-IDF portion of the coefficients
tfidf_coeffs = lr_model.coefficients[offset:]

# Zip with vocabulary from CountVectorizer model
vocab = vectorizer_model.vocabulary  # already sorted by frequency
tfidf_feature_info = list(zip(vocab, tfidf_coeffs))

# Sort by absolute coefficient value, descending
tfidf_feature_info_sorted = sorted(tfidf_feature_info, key=lambda x: abs(x[1]), reverse=True)

# Display top 10 TF-IDF features with the strongest coefficients
print("\nTop 10 TF-IDF features by absolute coefficient value:\n")
for token, coeff in tfidf_feature_info_sorted[:10]:
    print(f"{token:20s} => {coeff:+.4f}")



Top 10 TF-IDF features by absolute coefficient value:

washington           => -0.3075
wednesday            => -0.2566
nov                  => -0.2198
tuesday              => -0.2164
somodevilla          => +0.2122
thursday             => -0.2096
corrects             => -0.2054
representatives      => -0.1997
youtu                => +0.1962
friday               => -0.1825


On the TF-IDF side, the model seems to pick up on contextual patterns:

- Words like **"washington"**, **"wednesday"**, and other weekday/date terms have negative coefficients, suggesting that more formal or structured reporting (common in real news) leans that direction.
- On the other hand, terms like **"somodevilla"** (probably from image captions), **"youtu"**, and other more casual or clickbaity artifacts push predictions toward fake. I might consider adding these to my list of stopwords in future model tuning.
- It's also interesting to see **"corrects"** and **"representatives"** come in with negative weights — potentially reflecting real news correcting the record or covering government procedures.
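
To make the direction of each term explicit, the top-10 list above can be split by the sign of its coefficient. This is a small plain-Python sketch using the values from the output; the list names are introduced here for illustration.

```python
# Token/coefficient pairs copied from the top-10 output above.
top_terms = [
    ("washington", -0.3075), ("wednesday", -0.2566), ("nov", -0.2198),
    ("tuesday", -0.2164), ("somodevilla", 0.2122), ("thursday", -0.2096),
    ("corrects", -0.2054), ("representatives", -0.1997),
    ("youtu", 0.1962), ("friday", -0.1825),
]

# Positive coefficients raise the predicted probability of fake (label 1);
# negative coefficients lower it, i.e. push toward real (label 0).
toward_fake = [tok for tok, c in top_terms if c > 0]
toward_real = [tok for tok, c in top_terms if c < 0]

print("toward fake:", toward_fake)
print("toward real:", toward_real)
```

Eight of the ten strongest terms point toward real news, which suggests the model is learning what real reporting looks like at least as much as what fake articles look like.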

Overall, the model seems to be relying both on stylometric features and lexical signals to separate professional reporting from fake or less credible articles — and the signals it's using mostly make sense.

### Pipeline Part 5: Tuning and fitting a more sophisticated model

In this section, I perform hyperparameter tuning that cycles through and tests different values of `maxIter` and `regParam` in order to identify the best model parameters. Because the initial logistic regression model is already very accurate, I don't think this tuning will change much.

Next, I evaluate a more sophisticated Random Forest classifier. This model is worth considering for binary classification tasks like fake news detection because it can capture complex, nonlinear patterns in the data and handle interactions between features without needing explicit transformations. Random forests also tend to be less sensitive to outliers and may offer improved performance.

#### Hyperparameter tuning

In this block, I perform hyperparameter tuning using cross-validation to optimize the logistic regression model.

I used a grid of parameters using `ParamGridBuilder`: 
   - `maxIter` (number of training iterations): tested at 2, 10, and 50  
   - `regParam` (regularization strength): tested at 0.01, 0.05, and 0.3

I then use `BinaryClassificationEvaluator` to compare models, scoring them by Area Under the ROC Curve (AUC).

This process confirms whether tuning the logistic regression parameters yields any significant improvement — though given the already strong baseline performance, major gains aren’t expected.
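
To get a sense of how much work this grid implies, the sketch below enumerates the combinations in plain Python. It assumes 3-fold cross-validation (the `numFolds=3` setting used in the cell that follows) and that the best candidate is refit once on the full training set after the search.

```python
from itertools import product

max_iter_values = [2, 10, 50]
reg_param_values = [0.01, 0.05, 0.3]
num_folds = 3  # assumed to match the numFolds setting of the cross-validator

# Every combination of the two hyperparameters is one candidate model.
param_grid = [
    {"maxIter": m, "regParam": r}
    for m, r in product(max_iter_values, reg_param_values)
]

# Each candidate is fit once per fold; the winner is then refit on all
# of the training data, which adds one more fit.
fits_during_cv = len(param_grid) * num_folds
total_fits = fits_during_cv + 1

print(len(param_grid), fits_during_cv, total_fits)
```

Nine candidates over three folds is a modest search, but on a resource-constrained cluster it is still worth knowing that one `fit` call hides a few dozen model fits.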

In [34]:
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# define the base logistic regression model
logistic = LogisticRegression(
    featuresCol="final_features",
    labelCol="fake"
)

# build  param grid
paramGrid = (
    ParamGridBuilder()
    .addGrid(logistic.maxIter, [2, 10, 50])  
    .addGrid(logistic.regParam, [0.01, 0.05, 0.3]) 
    .build()
)

# define the evaluator
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="probability",
    labelCol="fake",
    metricName="areaUnderROC"
)

# set up CrossValidator
cv = CrossValidator(
    estimator=logistic,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2,  
    seed=42
)

# fit on the unshuffled training set (shuffle can be re-added later)
cvModel = cv.fit(train_df)

# evaluate best model on the test set
bestModel = cvModel.bestModel
print("Best Model Params for LR Model:")
print("  maxIter:", bestModel._java_obj.getMaxIter())
print("  regParam:", bestModel._java_obj.getRegParam())

predictions = bestModel.transform(test_df)
auc = evaluator.evaluate(predictions)
print("Best Model AUC on Test:", auc)

Best Model Params:
  maxIter: 50
  regParam: 0.01
Best Model AUC on Test: 0.9987938787491695


In [35]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col

results_tuned = []

binary_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="probability",
    labelCol="fake"
)

# AUC-ROC
binary_evaluator.setMetricName("areaUnderROC")
auc_roc_tuned = binary_evaluator.evaluate(predictions)

# AUC-PR
binary_evaluator.setMetricName("areaUnderPR")
auc_pr_tuned = binary_evaluator.evaluate(predictions)

results_tuned.append(("AUC-ROC (LR Tuned)", auc_roc_tuned))
results_tuned.append(("AUC-PR (LR Tuned)", auc_pr_tuned))

# Accuracy, Precision, Recall 
multi_evaluator = MulticlassClassificationEvaluator(
    predictionCol="prediction",
    labelCol="fake"
)

accuracy_tuned = multi_evaluator.setMetricName("accuracy").evaluate(predictions)
precision_fake_tuned = multi_evaluator.setMetricName("precisionByLabel").evaluate(predictions, {multi_evaluator.metricLabel: 1.0})
recall_fake_tuned = multi_evaluator.setMetricName("recallByLabel").evaluate(predictions, {multi_evaluator.metricLabel: 1.0})

results_tuned.append(("Accuracy (LR Tuned)", accuracy_tuned))
results_tuned.append(("Precision (fake=1) (LR Tuned)", precision_fake_tuned))
results_tuned.append(("Recall (fake=1) (LR Tuned)", recall_fake_tuned))

#  Confusion Matrix 
confusion_df_tuned = predictions.groupBy("fake", "prediction").count().orderBy("fake", "prediction")
confusion_data_tuned = confusion_df_tuned.collect()
for row in confusion_data_tuned:
    results_tuned.append(("Confusion Matrix (LR Tuned)", f"fake: {row['fake']}, prediction: {row['prediction']}, count: {row['count']}"))

#  Convert new results to a DataFrame 
results_df_tuned = spark.createDataFrame(results_tuned, ["Metric", "Value"])

#  Append to the original results DataFrame 
results_df = results_df.unionByName(results_df_tuned)

# Show the updated results
results_df.show(truncate=False)


+-----------------------------+-------------------------------------+
|Metric                       |Value                                |
+-----------------------------+-------------------------------------+
|AUC-ROC (LR)                 |0.9988484982124539                   |
|AUC-PR (LR)                  |0.9987513830823052                   |
|Accuracy (LR)                |0.9888475836431226                   |
|Precision (fake=1) (LR)      |0.9933025747879148                   |
|Recall (fake=1) (LR)         |0.9849468713105076                   |
|Confusion Matrix (LR)        |fake: 0, prediction: 0.0, count: 6360|
|Confusion Matrix (LR)        |fake: 0, prediction: 1.0, count: 45  |
|Confusion Matrix (LR)        |fake: 1, prediction: 0.0, count: 102 |
|Confusion Matrix (LR)        |fake: 1, prediction: 1.0, count: 6674|
|AUC-ROC (LR Tuned)           |0.9987926921208802                   |
|AUC-PR (LR Tuned)            |0.9986753314595586                   |
|Accuracy (LR Tuned)

In [36]:
spark.catalog.clearCache()
gc.collect()

204

After tuning `maxIter` and `regParam`, the best logistic regression model had:
- **maxIter = 50**
- **regParam = 0.01**

This model achieved an AUC of 0.99879 on the test set, which is essentially identical to the original model’s AUC of 0.99885. This confirms my initial expectation that the base model was already performing near the ceiling in terms of ROC performance, and that tuning didn’t meaningfully change the outcome.

#### Fitting a Random Forest classifier

In [37]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.functions import col

# define RandomForestClassifier
rf = RandomForestClassifier(
    featuresCol="final_features",
    labelCol="fake",
    numTrees=50,
    maxDepth=10,
    seed=42
)

# fit the model
rf_model = rf.fit(train_df)

# generate predictions
rf_predictions = rf_model.transform(test_df)

# evaluate binary metrics
binary_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol="fake"
)

binary_evaluator.setMetricName("areaUnderROC")
auc_rf = binary_evaluator.evaluate(rf_predictions)

binary_evaluator.setMetricName("areaUnderPR")
pr_rf = binary_evaluator.evaluate(rf_predictions)

# evaluate multiclass metrics
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol="fake",
    predictionCol="prediction"
)

accuracy_rf = multi_evaluator.setMetricName("accuracy").evaluate(rf_predictions)
precision_rf = multi_evaluator.setMetricName("precisionByLabel").evaluate(rf_predictions, {multi_evaluator.metricLabel: 1.0})
recall_rf = multi_evaluator.setMetricName("recallByLabel").evaluate(rf_predictions, {multi_evaluator.metricLabel: 1.0})

# collect confusion matrix
confusion_df_rf = rf_predictions.groupBy("fake", "prediction").count().orderBy("fake", "prediction")
confusion_data_rf = confusion_df_rf.collect()

#append all results to results_df
results_rf = [
    ("AUC-ROC (RF)", auc_rf),
    ("AUC-PR (RF)", pr_rf),
    ("Accuracy (RF)", accuracy_rf),
    ("Precision (fake=1) (RF)", precision_rf),
    ("Recall (fake=1) (RF)", recall_rf)
]

# Add confusion matrix entries
for row in confusion_data_rf:
    results_rf.append(("Confusion Matrix (RF)", f"fake: {row['fake']}, prediction: {row['prediction']}, count: {row['count']}"))

# Convert and append
results_df_rf = spark.createDataFrame(results_rf, ["Metric", "Value"])
results_df = results_df.unionByName(results_df_rf)

# Show results
results_df.show(30,truncate=False)

25/04/11 14:10:05 WARN DAGScheduler: Broadcasting large task binary with size 1059.2 KiB

+-----------------------------+-------------------------------------+
|Metric                       |Value                                |
+-----------------------------+-------------------------------------+
|AUC-ROC (LR)                 |0.9988484982124539                   |
|AUC-PR (LR)                  |0.9987513830823052                   |
|Accuracy (LR)                |0.9888475836431226                   |
|Precision (fake=1) (LR)      |0.9933025747879148                   |
|Recall (fake=1) (LR)         |0.9849468713105076                   |
|Confusion Matrix (LR)        |fake: 0, prediction: 0.0, count: 6360|
|Confusion Matrix (LR)        |fake: 0, prediction: 1.0, count: 45  |
|Confusion Matrix (LR)        |fake: 1, prediction: 0.0, count: 102 |
|Confusion Matrix (LR)        |fake: 1, prediction: 1.0, count: 6674|
|AUC-ROC (LR Tuned)           |0.9987926921208802                   |
|AUC-PR (LR Tuned)            |0.9986753314595586                   |
|Accuracy (LR Tuned)

In [39]:

engineered_feature_names = [
    "text_length", "title_length", "exclam_count", "all_caps_count",
    "year", "month", "day", "is_2016", "is_missing_date"
]
num_engineered = len(engineered_feature_names)

# combine feature names
feature_names = engineered_feature_names + vocab  # full feature list

# get feature importances
importances = rf_model.featureImportances.toArray()

# engineered features (first N)
print("\nFeature importances for engineered features:")
for i in range(num_engineered):
    print(f"{feature_names[i]:<18} => {importances[i]:.4f}")

# top 10 TF-IDF features by importance
tfidf_indices = range(num_engineered, len(feature_names))
top_tfidf = sorted(tfidf_indices, key=lambda i: importances[i], reverse=True)[:10]

print("\nTop 10 TF-IDF features by importance:")
for i in top_tfidf:
    print(f"{feature_names[i]:<20} => {importances[i]:.4f}")



Feature importances for engineered features:
text_length        => 0.0077
title_length       => 0.0961
exclam_count       => 0.0138
all_caps_count     => 0.0991
year               => 0.0303
month              => 0.0351
day                => 0.0038
is_2016            => 0.0240
is_missing_date    => 0.0010

Top 10 TF-IDF features by importance:
watch                => 0.0267
video                => 0.0251
wire                 => 0.0183
com                  => 0.0161
hillary              => 0.0149
gop                  => 0.0141
minister             => 0.0127
government           => 0.0114
washington           => 0.0111
us                   => 0.0110


The Random Forest model gives us a different way to understand which features contributed most to the classification.

Among the engineered features, `all_caps_count` (0.099) and `title_length` (0.096) stood out with the highest importance, suggesting that stylistic elements like the use of all caps and the length of the title were particularly useful in identifying fake news. Temporal features came next: `month` (0.035), `year` (0.030), and `is_2016` (0.024) all carried some weight, indicating that when the content was published had some predictive value. `exclam_count` (0.014) and `text_length` (0.008) contributed less.

Looking at the top TF-IDF features, the most important terms included `watch`, `video`, `wire`, and `com`, which might reflect the kind of clickbait or media-heavy language often used in fake posts. Terms like `hillary`, `gop`, `government`, and `washington` also appeared, which lines up with the political focus that’s common in misinformation. Note that feature importances are unsigned: they show how useful a term was for splitting the data, not whether it points toward fake or real.
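
It can also help to look at how much of the total importance the nine engineered features hold together. The sketch below sums the values printed above; it assumes the importance vector is normalized to sum to 1, so the remainder is attributed to the TF-IDF terms.

```python
# Importances copied from the random forest output above.
engineered_importances = {
    "text_length": 0.0077, "title_length": 0.0961, "exclam_count": 0.0138,
    "all_caps_count": 0.0991, "year": 0.0303, "month": 0.0351,
    "day": 0.0038, "is_2016": 0.0240, "is_missing_date": 0.0010,
}

engineered_share = sum(engineered_importances.values())
# Assumes the importance vector is normalized to sum to 1.
tfidf_share = 1.0 - engineered_share

ranked = sorted(engineered_importances.items(), key=lambda kv: kv[1], reverse=True)
for name, value in ranked:
    print(f"{name:16s} {value:.4f}")
print(f"engineered share: {engineered_share:.4f}")
print(f"TF-IDF share:     {tfidf_share:.4f}")
```

Under that assumption, nine engineered features account for roughly 31% of the total importance, with the rest spread thinly across thousands of TF-IDF terms.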

### Model Comparison and Interpretation

I trained three models to classify fake news: a baseline logistic regression (LR), a tuned logistic regression (LR Tuned), and a random forest (RF). All models used the same feature set.

#### Performance Summary

If the goal is to identify fake news posts — especially to catch as many as possible — then **precision and recall for the fake class (label=1)** are key. Here's how the models compare:

- **Logistic Regression (Baseline)**
  - AUC-ROC: 0.9988
  - Precision (fake=1): 0.9933
  - Recall (fake=1): 0.9849
  - Accuracy: 0.9888

- **Logistic Regression (Tuned)**
  - AUC-ROC: 0.9988
  - Precision (fake=1): 0.9920
  - Recall (fake=1): 0.9876
  - Accuracy: 0.9895

- **Random Forest**
  - AUC-ROC: 0.9957
  - Precision (fake=1): 0.9969
  - Recall (fake=1): 0.8597
  - Accuracy: 0.9265

While the random forest had the highest precision, it suffered in recall — missing more fake articles than the logistic models. On the other hand, the logistic regression models offered a better balance between precision and recall. Since I value identifying fake news accurately and minimizing false negatives, logistic regression (tuned or untuned) is the stronger choice in this context.
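
To put the precision/recall trade-off on one scale, the sketch below computes the F1 score (the harmonic mean of precision and recall) and the approximate number of missed fake articles for each model. F1 is not a metric reported elsewhere in this notebook; it is added here only as an illustrative summary, using the rounded values from the list above and the 6,776 fake articles in the test set.

```python
# Precision and recall for the fake class, copied from the summary above.
models = {
    "LR (baseline)": {"precision": 0.9933, "recall": 0.9849},
    "LR (tuned)":    {"precision": 0.9920, "recall": 0.9876},
    "Random forest": {"precision": 0.9969, "recall": 0.8597},
}
N_FAKE_TEST = 6776  # fake articles in the test set


def f1(precision, recall):
    """Harmonic mean of precision and recall."""
    return 2 * precision * recall / (precision + recall)


summary = {}
for name, m in models.items():
    missed = round((1 - m["recall"]) * N_FAKE_TEST)  # approximate false negatives
    summary[name] = {"f1": f1(m["precision"], m["recall"]), "missed_fake": missed}
    print(f"{name:14s} F1 = {summary[name]['f1']:.4f}, missed fake ~ {missed}")
```

Because the inputs are rounded to four decimals, the missed-article counts are approximations, but the gap is clear: the random forest misses several hundred more fake articles than either logistic regression model.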

#### Most Influential Features

Both models shed light on what signals are most useful for detecting fake news:

- **Engineered Metadata Features**
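  - **`all_caps_count`** was influential in both models. **`exclam_count`** had the largest engineered-feature coefficient in the logistic regression (+1.47) but relatively low importance in the random forest (0.014), while **`title_length`** ranked high in the random forest (0.096) but had a small raw coefficient in the logistic regression (+0.03). These stylistic choices are common in sensational or misleading content.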
  - The logistic regression model also gave significant weight to **`is_2016`** and **`is_missing_date`**, suggesting temporal cues and metadata inconsistencies were meaningful indicators.

- **Top TF-IDF Words**
  - The logistic regression model emphasized politically and temporally loaded terms like:
    ```
    washington, wednesday, nov, tuesday, somodevilla, thursday, corrects, representatives, youtu, friday
    ```
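    Most of these terms (the weekday names, `nov`, `washington`, `corrects`, and `representatives`) have negative coefficients, so they likely reflect the datelines and conventions of real news reporting rather than fake articles. Only `somodevilla` and `youtu` push predictions toward fake.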
  
  - The random forest model gave importance to slightly different terms:
    ```
    watch, video, wire, com, hillary, gop, minister, government, washington, us
    ```
    This includes action-oriented and political terms, with a heavier emphasis on media consumption language (`watch`, `video`, `com`), which aligns with fake news strategies that rely on viral media content.

#### Final Thoughts

Despite Random Forest’s strong precision, its lower recall makes it less suited for use cases where catching fake news is critical. Logistic regression — even without tuning — performed exceptionally well across all metrics, making it the preferred model. It also offers greater interpretability, which is useful for understanding what drives the classification and how misinformation is structured.


In [41]:
results_df.show(30,truncate=False)

                                                                                

+-----------------------------+-------------------------------------+
|Metric                       |Value                                |
+-----------------------------+-------------------------------------+
|AUC-ROC (LR)                 |0.9988484982124539                   |
|AUC-PR (LR)                  |0.9987513830823052                   |
|Accuracy (LR)                |0.9888475836431226                   |
|Precision (fake=1) (LR)      |0.9933025747879148                   |
|Recall (fake=1) (LR)         |0.9849468713105076                   |
|Confusion Matrix (LR)        |fake: 0, prediction: 0.0, count: 6360|
|Confusion Matrix (LR)        |fake: 0, prediction: 1.0, count: 45  |
|Confusion Matrix (LR)        |fake: 1, prediction: 0.0, count: 102 |
|Confusion Matrix (LR)        |fake: 1, prediction: 1.0, count: 6674|
|AUC-ROC (LR Tuned)           |0.9987926921208802                   |
|AUC-PR (LR Tuned)            |0.9986753314595586                   |
|Accuracy (LR Tuned)