# FAKE NEWS DETECTION

The following notebook contains all relevant computations and investigations for the exam in DS821: News and Market Sentiment Analysis. Its structure largely follows that of the report, and most sections are labeled with the number of the corresponding report section for transparency.  

In [None]:
# run for specific dependencies
! pip install -r requirements.txt

In [None]:
# load libraries
import math
import os
import re
from collections import Counter
from pathlib import Path

import numpy as np
import pandas as pd
import torch
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer, ENGLISH_STOP_WORDS
from sklearn.metrics import classification_report
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.model_selection import StratifiedKFold, train_test_split, cross_validate
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from transformers import pipeline

## 0. DATA LOAD AND INSPECTION

In [None]:
# load the data
# The data folder is resolved relative to the working directory instead of an
# absolute path on one machine; set NEWS_DATA_DIR to point somewhere else.
# NOTE(review): assumes the notebook is run from the repository root, where the
# data/ folder lives - confirm.
DATA_DIR = Path(os.environ.get("NEWS_DATA_DIR", "data"))
fake = pd.read_csv(DATA_DIR / "Fake.csv")
true = pd.read_csv(DATA_DIR / "True.csv")

In [None]:
# preview the first two rows of the fake-news frame to check its columns
fake.head(2)

In [None]:
# preview the first two rows of the true-news frame to check its columns
true.head(2)

In [None]:
# encode the target on each source frame: 1 = fake article, 0 = true article
fake["label"] = 1
true["label"] = 0

In [None]:
# stack the two dataframes (fake rows first, then true rows);
# ignore_index=True gives the combined frame a fresh 0..n-1 index
df = pd.concat([fake, true], ignore_index=True)

In [None]:
# inspect data frame: size, class balance and missing values per column
print("Shape:", df.shape)
print(df["label"].value_counts())
print(df.isna().sum())

## 3. DATA CLEANING

In [None]:
# check for duplicates: number of rows whose body text repeats an earlier row
df.duplicated(subset=["text"]).sum()

In [None]:
# drop rows with a repeated body text, keeping the first occurrence of each,
# and report how many rows disappeared
len_before = len(df)
df = df.drop_duplicates(subset=["text"], keep="first")
len_after = len(df)

n_removed = len_before - len_after
print("Duplicates removed:", n_removed)


In [None]:
# inspect how many articles carry a "(reuters) -" tag (matched on lowercased text)
# NOTE(review): despite the column name, `^.*` lets the tag sit anywhere on the
# first line of the text, not only at its very start.
df["starts_with_reuters"] = df["text"].str.lower().str.contains(
    r"^.*\(\s*reuters\s*\)\s*-", regex=True, na=False
)
# divide by class: the mean of the boolean flag is the share of flagged articles
df.groupby("label")["starts_with_reuters"].mean()


In [None]:
# remove potential metadata in the beginning of each body text
def remove_leading_metadata(text):
    """Strip a leading wire-service dateline from an article body.

    A dateline is an optional run of capital letters, whitespace and slashes
    (e.g. "WASHINGTON" or "SEATTLE/NEW YORK"), followed by a source tag in
    parentheses and a dash, e.g. "WASHINGTON (Reuters) - ". The location part
    is optional, so a bare "(Reuters) - " prefix is removed as well.

    Parameters
    ----------
    text : str
        Raw article body.

    Returns
    -------
    str
        The text without the dateline and with surrounding whitespace stripped.
        Text without a dateline is only stripped of surrounding whitespace.
    """
    # `*` (not `+`) on the location class makes the location optional; the class
    # already contains \s, so no separate whitespace group is needed before "(".
    pattern = r"^[A-Z\s\/]*\([A-Za-z]+\)\s*[–—-]\s+"
    return re.sub(pattern, "", text).strip()

# build the working column text_clean from the raw text, one article at a time
df["text_clean"] = df["text"].map(remove_leading_metadata)

In [None]:
# sanity check: share of articles per class from which a dateline was removed.
# remove_leading_metadata() always strips surrounding whitespace, so the raw text
# is stripped before comparing; otherwise articles that merely lost leading or
# trailing whitespace would be counted as "metadata removed".
df["metadata_removed"] = df["text"].str.strip() != df["text_clean"]
df.groupby("label")["metadata_removed"].mean()

In [None]:
# examine the occurrence of urls in articles: count "http://" / "https://"
# prefixes in the raw (uncleaned) text of every article
df["url_count"] = df["text"].str.count(r"http[s]?://")

# print the difference between classes (average number of urls per article)
df.groupby("label")["url_count"].mean()


In [None]:
# inspect urls context in articles
def url_context(text, window=5):
    """Return the words surrounding every URL-like token in ``text``.

    The text is split on whitespace. For each token that starts with "http",
    the token and up to ``window`` tokens on either side are joined back into
    one string.

    Parameters
    ----------
    text : str
        Article body.
    window : int, default 5
        Number of neighbouring tokens kept on each side of the URL token.

    Returns
    -------
    list of str
        One context snippet per URL token, in order of appearance; empty when
        the text contains no such token.
    """
    words = text.split()
    return [
        " ".join(words[max(pos - window, 0):min(pos + window + 1, len(words))])
        for pos, word in enumerate(words)
        if word.startswith("http")
    ]

# one list of context snippets per article (empty list when it has no url)
df["url_contexts"] = df["text"].apply(url_context)

# print observations including urls: explode() gives one row per snippet and
# turns empty lists into NaN, which dropna() then removes
fake_contexts = df[df["label"] == 1]["url_contexts"].explode().dropna()
fake_contexts.head(10)


In [None]:
# remove web related entities from body text 
def remove_urls_and_html(text):
    """Replace URLs and HTML entities in ``text`` with a single space.

    Removed are runs of non-whitespace starting with "http" or "www", named
    entities such as "&amp;", and numeric entities such as "&#39;" or "&#x27;".

    Parameters
    ----------
    text : str
        Article body.

    Returns
    -------
    str
        The text with every match replaced by one space; surrounding whitespace
        is left as it was, so runs of spaces can appear.
    """
    text = re.sub(r"http\S+|www\S+", " ", text)
    # the optional "#" extends the match to numeric character references
    text = re.sub(r"&#?\w+;", " ", text)
    return text

# overwrite text_clean in place with the url- and entity-free version
df["text_clean"] = df["text_clean"].apply(remove_urls_and_html)

In [None]:
# web-specific leftovers (url fragments, entity names, javascript identifiers,
# image-credit words) that should not count as article vocabulary
noise = {
    "https", "http", "www", "amp", "quot", "cdata", "js",
    "pic", "youtu", "flickr", "getty", "wikimedia",
    "screenshot", "src", "createelement", "getelementbyid",
    "getelementsbytagname", "parentnode", "insertbefore",
    "jssdk", "xfbml", "filessupport", "21wire", 
}

# one case-insensitive alternation over all noise terms; the \b anchors make
# sure only whole words match, never parts of longer words
noise_pattern = re.compile(
    r"\b(" + "|".join(re.escape(term) for term in noise) + r")\b",
    flags=re.IGNORECASE,
)


def remove_noise(text):
    """Delete every whole-word occurrence of a noise term from ``text``.

    Matching ignores case. Matches are replaced by the empty string, so the
    whitespace that surrounded a removed word stays in place.
    """
    return re.sub(noise_pattern, "", text)

# apply to cleaned text (overwrites text_clean with the noise-free version)
df["text_clean"] = df["text_clean"].apply(remove_noise)

In [None]:
# lowercase body text; the stopword filter in tokenize() compares tokens
# case-sensitively, so this has to run before tokenization
df["text_clean"] = df["text_clean"].str.lower()

## 3. TOKENIZATION

In [None]:
# load stopwords: copy scikit-learn's built-in English stop-word collection into a set
stopwords = set(ENGLISH_STOP_WORDS)

In [None]:
# function combining each tokenization step
def tokenize(text):
    """Split ``text`` into alphabetic tokens without stopwords or short tokens.

    Tokens are maximal runs of ASCII letters. A token is kept only if it is
    not in the module-level ``stopwords`` set and has more than two characters
    (tokens of length 1 or 2 are dropped). No lowercasing happens here, so the
    stopword comparison is case-sensitive.

    Parameters
    ----------
    text : str
        Cleaned article body.

    Returns
    -------
    list of str
        The surviving tokens in their original order.
    """
    return [
        tok
        for tok in re.findall(r"[a-zA-Z]+", text)
        if tok not in stopwords and len(tok) > 2
    ]

# apply to cleaned text: one token list per article
df["tokens"] = df["text_clean"].apply(tokenize)

In [None]:
# inspect clean dataframe: first two rows including the helper columns added above
df.head(2)

## 4. EXPLORATORY DATA ANALYSIS

### 4.1 LENGTH AND STYLE DIFFERENCES

In [None]:
# check basic properties after cleaning: size, absolute and relative class counts
print("Dataset shape:", df.shape)
print("\nClass distribution:")
print(df["label"].value_counts())

print("\nClass proportions:")
print(df["label"].value_counts(normalize=True))

In [None]:
# basic length features: characters and whitespace-separated words per cleaned article
df["char_count"] = df["text_clean"].str.len()
df["word_count"] = df["text_clean"].str.split().str.len()

# summary statistics per class
length_stats = df.groupby("label")[["char_count", "word_count"]].agg(
    ["mean", "median", "std"]
)

# print results
# the index is relabelled by position: groupby sorts its keys, so the first row
# is label 0 (true) and the second row is label 1 (fake)
length_stats.index = ["True", "Fake"]
length_stats

In [None]:
# function extracting most used words in each class
def top_words_from_tokens(token_series, n=100):
    """Return the ``n`` most frequent tokens across all documents.

    Parameters
    ----------
    token_series : iterable of iterable of str
        One token collection per document.
    n : int, default 100
        Number of (token, count) pairs to return.

    Returns
    -------
    list of (str, int)
        Pairs ordered from most to least frequent.
    """
    frequencies = Counter()
    for doc_tokens in token_series:
        frequencies.update(doc_tokens)
    return frequencies.most_common(n)

# apply function and store in each list (100 most frequent tokens per class)
# NOTE: the names top_true / top_fake are reassigned in the vocabulary-contrast
# section further down, where they hold log-ratio rankings instead
top_true = top_words_from_tokens(df[df["label"] == 0]["tokens"], 100)
top_fake = top_words_from_tokens(df[df["label"] == 1]["tokens"], 100)

# print
print("Top words – True news:")
print(top_true)

print("\nTop words – Fake news:")
print(top_fake)

### 4.2 SENTIMENT

#### FINBERT SENTIMENT

In [None]:
# define FinBERT-model
# pick the best available accelerator instead of assuming Apple-silicon "mps",
# so the cell also runs on CUDA or CPU-only machines
if torch.backends.mps.is_available():
    finbert_device = "mps"
elif torch.cuda.is_available():
    finbert_device = "cuda"
else:
    finbert_device = "cpu"

# truncation / max_length cap every article at 512 tokens
finbert_model = pipeline(
    "sentiment-analysis",
    model="ProsusAI/finbert",
    device=finbert_device,
    truncation=True,
    max_length=512
)

In [None]:
# function extracting scores from model
def finbert_score(text):
    """Return a signed FinBERT sentiment score for ``text``.

    Only the first result returned by ``finbert_model`` is used. Its score is
    returned as-is for a "positive" label, negated for a "negative" label, and
    replaced by 0.0 for any other label (i.e. neutral).

    Parameters
    ----------
    text : str
        Article body.

    Returns
    -------
    float
        The signed score, or NaN when ``text`` is not a string or is blank.
    """
    is_usable_text = isinstance(text, str) and bool(text.strip())
    if not is_usable_text:
        return np.nan

    top_prediction = finbert_model(text)[0]
    confidence = top_prediction["score"]
    signed_by_label = {"positive": confidence, "negative": -confidence}
    return signed_by_label.get(top_prediction["label"], 0.0)

In [None]:
# translate scores to labels
def finbert_label(score, eps=0.1):
    """Map a signed sentiment score to a sentiment label.

    Parameters
    ----------
    score : float
        Signed sentiment score; may be missing (NaN / None).
    eps : float, default 0.1
        Half-width of the neutral band around zero.

    Returns
    -------
    str or float
        "positive" when score > eps, "negative" when score < -eps, "neutral"
        otherwise; NaN when the score is missing.
    """
    if pd.isna(score):
        return np.nan
    return "positive" if score > eps else "negative" if score < -eps else "neutral"


In [None]:
# define a subset: 2000 articles per class, drawn with a fixed seed.
# Each group is sampled explicitly instead of through groupby(...).apply(...):
# apply() over the grouping column is deprecated and, on newer pandas versions,
# drops "label" from the result, which breaks the per-class summary below.
# The sampled rows and their order are the same as before.
finbert_subset = pd.concat(
    [group.sample(n=2000, random_state=42) for _, group in df.groupby("label")]
)

# apply model to subset: signed score first, then the discrete label
finbert_subset["finbert_score"] = finbert_subset["text_clean"].apply(finbert_score)
finbert_subset["finbert_label"] = finbert_subset["finbert_score"].apply(
    lambda x: finbert_label(x, eps=0.1)
)

In [None]:
# print results: share of each FinBERT label over the whole subset
finbert_subset["finbert_label"].value_counts(normalize=True)

In [None]:
# print results by class: label shares within each class, one column per label
finbert_by_class = (
    finbert_subset
    .groupby("label")["finbert_label"]
    .value_counts(normalize=True)
    .unstack()
)

# rows are relabelled by position; groupby sorts its keys, so row 0 is
# label 0 (true) and row 1 is label 1 (fake)
finbert_by_class.index = ["True news", "Fake news"]
finbert_by_class


#### ROBERTA SENTIMENT

In [None]:
# define RoBERTa-model 
# choose the accelerator at runtime rather than hard-coding Apple-silicon "mps",
# falling back to CUDA and then to the CPU
roberta_device = (
    "mps" if torch.backends.mps.is_available()
    else "cuda" if torch.cuda.is_available()
    else "cpu"
)

# truncation / max_length cap every article at 512 tokens
roberta_model = pipeline(
    "sentiment-analysis",
    model="cardiffnlp/twitter-roberta-base-sentiment-latest",
    device=roberta_device,
    truncation=True,
    max_length=512
)

In [None]:
# function extracting scores from model
def roberta_score(text):
    """Return a signed RoBERTa sentiment score for ``text``.

    Uses the first result returned by ``roberta_model``: the score keeps its
    sign for "positive", is negated for "negative", and becomes 0.0 for any
    other label (i.e. neutral).

    Parameters
    ----------
    text : str
        Article body.

    Returns
    -------
    float
        The signed score, or NaN when ``text`` is not a string or is blank.
    """
    if not (isinstance(text, str) and text.strip()):
        return np.nan

    prediction = roberta_model(text)[0]
    direction = {"positive": 1, "negative": -1}.get(prediction["label"])
    if direction is None:
        # neutral
        return 0.0
    return direction * prediction["score"]

In [None]:
# translate scores to labels
def roberta_label(score, eps=0.1):
    """Turn a signed sentiment score into "positive", "negative" or "neutral".

    Parameters
    ----------
    score : float
        Signed sentiment score; may be missing (NaN / None).
    eps : float, default 0.1
        Scores within [-eps, eps] are treated as neutral.

    Returns
    -------
    str or float
        The sentiment label, or NaN when the score is missing.
    """
    if pd.isna(score):
        return np.nan

    # checked in order: the first rule that holds decides the label
    rules = (("positive", score > eps), ("negative", score < -eps))
    for sentiment, holds in rules:
        if holds:
            return sentiment
    return "neutral"


In [None]:
# define a subset: 2000 articles per class with a fixed seed (the same seed as
# the FinBERT subset, so both models are evaluated on the same articles).
# Groups are sampled one by one instead of via groupby(...).apply(...), which is
# deprecated for grouping columns and drops "label" on newer pandas versions;
# the selected rows and their order are unchanged.
roberta_subset = pd.concat(
    [group.sample(n=2000, random_state=42) for _, group in df.groupby("label")]
)

# apply model to subset: signed score first, then the discrete label
roberta_subset["roberta_score"] = roberta_subset["text_clean"].apply(roberta_score)
roberta_subset["roberta_label"] = roberta_subset["roberta_score"].apply(
    lambda x: roberta_label(x, eps=0.1)
)

In [None]:
# print results: share of each RoBERTa label over the whole subset
roberta_subset["roberta_label"].value_counts(normalize=True)

In [None]:
# print results by class: label shares within each class, one column per label
roberta_by_class = (
    roberta_subset
    .groupby("label")["roberta_label"]
    .value_counts(normalize=True)
    .unstack()
)

# rows are relabelled by position; groupby sorts its keys, so row 0 is
# label 0 (true) and row 1 is label 1 (fake)
roberta_by_class.index = ["True news", "Fake news"]
roberta_by_class


### 4.3 VOCABULARY CONTRASTS

In [None]:
# function counting tokens in a series of token lists
def count_tokens(token_series):
    """Count how often each token occurs across all documents.

    Parameters
    ----------
    token_series : iterable of iterable of str
        One token collection per document.

    Returns
    -------
    collections.Counter
        Mapping from token to its total number of occurrences.
    """
    totals = Counter()
    for doc_tokens in token_series:
        totals.update(doc_tokens)
    return totals


# token counts per class
true_counts = count_tokens(df[df["label"] == 0]["tokens"])
fake_counts = count_tokens(df[df["label"] == 1]["tokens"])

# minimum number of occurrences a token needs in EACH class to be compared
min_freq = 50  


# define a shared vocabulary of tokens appearing sufficiently often in both classes
vocab = {
    w for w in fake_counts
    if fake_counts[w] >= min_freq and true_counts.get(w, 0) >= min_freq
}


# find words more characteristic of fake news
# score = log of the add-one smoothed ratio of raw counts (fake / true).
# NOTE(review): raw counts are not normalised by the total number of tokens per
# class, so a score of 0 does not mean "equally frequent" when the classes differ
# in size; the ranking itself is unaffected by such a constant offset.
rel_fake = {
    w: math.log((fake_counts[w] + 1) / (true_counts[w] + 1))
    for w in vocab
}
# sort values (highest score first) and keep the 50 strongest words
# NOTE: this reassigns top_fake, which earlier held the most frequent fake tokens
top_fake = sorted(
    rel_fake.items(),
    key=lambda x: x[1],
    reverse=True
)[:50]

print("Words more characteristic of Fake news:\n")
for w, s in top_fake:
    print(f"{w:<15} {s:.2f}")


# find words more characteristic of true news
# same ratio with the classes swapped, i.e. the negated fake score
rel_true = {
    w: math.log((true_counts[w] + 1) / (fake_counts[w] + 1))
    for w in vocab
}
# sort values (highest score first) and keep the 50 strongest words
# NOTE: this reassigns top_true, which earlier held the most frequent true tokens
top_true = sorted(
    rel_true.items(),
    key=lambda x: x[1],
    reverse=True
)[:50]

print("\nWords more characteristic of True news:\n")
for w, s in top_true:
    print(f"{w:<15} {s:.2f}")

### TOPIC MODELLING: LDA 

In [None]:
from gensim.corpora import Dictionary
from gensim.models import LdaModel
from gensim.models import CoherenceModel

In [None]:
# split the dataset by class so that each class gets its own topic model
df_true = df[df["label"] == 0]
df_fake = df[df["label"] == 1]

In [None]:
def run_lda(token_lists, num_topics=7, no_below=20, no_above=0.9,
            passes=10, random_state=42):
    """Fit an LDA topic model on tokenised documents.

    Parameters
    ----------
    token_lists : list of list of str
        One token list per document. The collection is iterated twice (once
        for the dictionary, once for the bag-of-words corpus), so it must be
        re-iterable rather than a one-shot generator.
    num_topics : int, default 7
        Number of topics to fit.
    no_below : int, default 20
        Passed to ``Dictionary.filter_extremes``.
    no_above : float, default 0.9
        Passed to ``Dictionary.filter_extremes``.
    passes : int, default 10
        Passed to ``LdaModel``; the default is the value that used to be
        hard-coded.
    random_state : int, default 42
        Seed passed to ``LdaModel``; the default is the value that used to be
        hard-coded.

    Returns
    -------
    tuple
        ``(lda, dictionary, corpus_bow)``: the fitted model, the filtered
        dictionary and the bag-of-words corpus built with it.
    """
    # Create dictionary and prune it with the given frequency limits
    dictionary = Dictionary(token_lists)
    dictionary.filter_extremes(no_below=no_below, no_above=no_above)

    # Create BoW corpus
    corpus_bow = [dictionary.doc2bow(doc) for doc in token_lists]

    # Train LDA
    lda = LdaModel(
        corpus=corpus_bow,
        id2word=dictionary,
        num_topics=num_topics,
        passes=passes,
        random_state=random_state
    )

    return lda, dictionary, corpus_bow

In [None]:
# fit one 7-topic model per class; each call returns the model, its dictionary
# and the bag-of-words corpus
lda_true, dict_true, corpus_true = run_lda(
    df_true["tokens"].tolist(),
    num_topics=7
)

lda_fake, dict_fake, corpus_fake = run_lda(
    df_fake["tokens"].tolist(),
    num_topics=7
)


In [None]:
# show the ten highest-weighted words of every topic, per class
print("Topics – True news")
for t in lda_true.print_topics(num_words=10):
    print(t)

print("\nTopics – Fake news")
for t in lda_fake.print_topics(num_words=10):
    print(t)


In [None]:
# get coherence score for each class (c_v measure), computed from the token
# lists and the dictionary of the respective model
coh_true = CoherenceModel(
    model=lda_true,
    texts=df_true["tokens"].tolist(),
    dictionary=dict_true,
    coherence="c_v"
).get_coherence()

coh_fake = CoherenceModel(
    model=lda_fake,
    texts=df_fake["tokens"].tolist(),
    dictionary=dict_fake,
    coherence="c_v"
).get_coherence()

# display both scores as a (true, fake) pair
coh_true, coh_fake


## 5. CLASSIFICATION

In [None]:
# prepare data for all classification models: cleaned text as input, label as target
X = df["text_clean"]
y = df["label"]

# split training and test set: 80/20, seeded, and stratified so both parts
# keep the class proportions of the full dataset
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
    stratify=y
)

### 5.1 NAÏVE BAYES

In [None]:
# define nb-classifier: TF-IDF features feeding a multinomial Naive Bayes model.
# Keeping the vectorizer inside the Pipeline means it is re-fitted on the
# training part of every fold when the pipeline is cross-validated.
nb_pipeline = Pipeline([
    ("tfidf", TfidfVectorizer(
        stop_words="english",
        max_df=0.9,  # ignore terms found in more than 90% of the documents
        min_df=50  # ignore terms found in fewer than 50 documents
    )),
    ("nb", MultinomialNB())
])

# define cross validation: 5 shuffled, seeded folds that preserve class proportions
cv = StratifiedKFold(
    n_splits=5,
    shuffle=True,
    random_state=42
)


In [None]:
# define scoring metrics (result-column name -> scikit-learn scorer name)
# NOTE(review): the binary precision/recall/f1 scorers presumably score the
# positive class, i.e. label 1 = fake news - confirm against the sklearn docs
scoring = {
    "accuracy": "accuracy",
    "precision": "precision",
    "recall": "recall",
    "f1": "f1"
}
# run cross validation on the training set only; train scores are returned as
# well so that over-fitting can be spotted
cv_results = cross_validate(
    nb_pipeline,
    X_train,
    y_train,
    cv=cv,
    scoring=scoring,
    return_train_score=True
)


In [None]:
# see results for cross validation: one row per fold, averaged over the folds
cv_df = pd.DataFrame(cv_results)
cv_df.mean()

In [None]:
# train on whole training set
nb_pipeline.fit(X_train, y_train)

# run predictions on test set
y_test_pred = nb_pipeline.predict(X_test)

# print results
# target_names are given in label order: 0 -> "True news", 1 -> "Fake news"
print(classification_report(
    y_test,
    y_test_pred,
    target_names=["True news", "Fake news"]
))

#### EXAMINE IMPORTANT WORDS FOR NB

In [None]:
# get components from pipeline (both were fitted on the full training set above)
vectorizer = nb_pipeline.named_steps["tfidf"]
nb_model = nb_pipeline.named_steps["nb"]

# extract words: the vocabulary terms in feature-column order
feature_names = np.array(vectorizer.get_feature_names_out())

In [None]:
# apply log probabilities per class: one row per class, one column per feature
log_probs = nb_model.feature_log_prob_

# create dataframe for results (transposed: one row per word, one column per class)
# NOTE(review): the column names assume the rows of feature_log_prob_ follow
# nb_model.classes_ == [0, 1], i.e. true first, fake second - confirm
df_log_probs = pd.DataFrame(
    log_probs.T,
    index=feature_names,
    columns=["True news", "Fake news"]
)

In [None]:
# compute the difference between classes: positive values mean the word is more
# probable under the fake class, negative values under the true class
df_log_probs["log_odds_fake_vs_true"] = (
    df_log_probs["Fake news"] - df_log_probs["True news"]
)

In [None]:
# top words for fake news: the 50 largest log-odds
top_fake_nb = (
    df_log_probs
    .sort_values("log_odds_fake_vs_true", ascending=False)
    .head(50)
)

# top words for true news: the 50 smallest (most negative) log-odds
top_true_nb = (
    df_log_probs
    .sort_values("log_odds_fake_vs_true", ascending=True)
    .head(50)
)

print("Top words indicating Fake news:\n")
print(top_fake_nb[["log_odds_fake_vs_true"]])

print("\nTop words indicating True news:\n")
print(top_true_nb[["log_odds_fake_vs_true"]])

### 5.2 CENTROID-BASED CLASSIFIER

In [None]:
# load embedding model
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

# embed training data: one embedding vector per training article
X_train_embeddings = embedding_model.encode(
    X_train.tolist(),
    show_progress_bar=True
)
# convert targets to array so they can be used as a boolean mask on the embeddings
y_train_array = y_train.values


In [None]:
# compute class-centroids: the mean training embedding of each class.
# keepdims=True leaves each mean as a (1, dim) row, the 2-D shape that
# cosine_similarity expects
true_centroid = X_train_embeddings[y_train_array == 0].mean(axis=0, keepdims=True)
fake_centroid = X_train_embeddings[y_train_array == 1].mean(axis=0, keepdims=True)


In [None]:
# embed test data with the same model that produced the training embeddings
X_test_embeddings = embedding_model.encode(
    X_test.tolist(),
    show_progress_bar=True
)

In [None]:
# define classification logic
def centroid_predict(embeddings, true_centroid, fake_centroid):
    """Assign each embedding to the class whose centroid it is more similar to.

    Parameters
    ----------
    embeddings : 2-D array
        One embedding per row.
    true_centroid, fake_centroid : 2-D arrays with a single row
        Class centroids.

    Returns
    -------
    numpy.ndarray
        One integer per row: 1 (fake) when the cosine similarity to the fake
        centroid is strictly larger than to the true centroid, otherwise 0
        (true). Ties therefore go to the true class.
    """
    closeness_to_true = cosine_similarity(embeddings, true_centroid).ravel()
    closeness_to_fake = cosine_similarity(embeddings, fake_centroid).ravel()

    # boolean "closer to fake" mask converted to 0/1 labels
    return (closeness_to_fake > closeness_to_true).astype(int)

# predict on test set using the centroids computed from the training embeddings
y_pred_centroid = centroid_predict(
    X_test_embeddings,
    true_centroid,
    fake_centroid
)

In [None]:
# print results
# target_names are given in label order: 0 -> "True news", 1 -> "Fake news"
print(classification_report(
    y_test,
    y_pred_centroid,
    target_names=["True news", "Fake news"]
))

In [None]:
# investigate distance between class centroids: a value close to 1 means the
# two centroids point in almost the same direction
centroid_similarity = cosine_similarity(true_centroid, fake_centroid)[0, 0]
print("Cosine similarity between class centroids:", centroid_similarity)

In [None]:
# cosine similarity to centroids for test set (the same quantities that
# centroid_predict compares internally)
sim_true = cosine_similarity(X_test_embeddings, true_centroid).flatten()
sim_fake = cosine_similarity(X_test_embeddings, fake_centroid).flatten()

# one row per test article: text, labels and both similarities
analysis_df = pd.DataFrame({
    "text": X_test.values,
    "true_label": y_test.values,
    "sim_true": sim_true,
    "sim_fake": sim_fake,
    "pred_label": y_pred_centroid
})

# compute margin to both classes for each observation 
# positive margin = closer to the fake centroid, negative = closer to the true one
analysis_df["margin"] = analysis_df["sim_fake"] - analysis_df["sim_true"]


In [None]:
# very confident fake predictions: the five largest margins
analysis_df.sort_values("margin", ascending=False).head(5)

In [None]:
# very confident true predictions: the five smallest (most negative) margins
analysis_df.sort_values("margin", ascending=True).head(5)

In [None]:
# misclassified articles: first five rows where prediction and true label disagree
analysis_df[analysis_df["true_label"] != analysis_df["pred_label"]].head(5)

### 5.3 ZERO-SHOT 

In [None]:
# load zero-shot classifier
# select the device at runtime instead of hard-coding Apple-silicon "mps", so the
# cell also works on CUDA or CPU-only machines
if torch.backends.mps.is_available():
    zero_shot_device = "mps"
elif torch.cuda.is_available():
    zero_shot_device = "cuda"
else:
    zero_shot_device = "cpu"

zero_shot = pipeline(
    "zero-shot-classification",
    model="facebook/bart-large-mnli",
    device=zero_shot_device
)

In [None]:
# define labels the zero-shot model chooses between
candidate_labels = ["fake news", "real news"]

# extract subset: a seeded random sample of 500 test articles
subset_size = 500

X_test_subset = X_test.sample(
    n=subset_size,
    random_state=42
)

# pick the matching targets through the sampled index labels
y_test_subset = y_test.loc[X_test_subset.index]


In [None]:
#  define decision logic
def zero_shot_predict(texts):
    """Classify texts as fake (1) or real (0) with the zero-shot pipeline.

    Parameters
    ----------
    texts : str or list of str
        One article or a list of articles.

    Returns
    -------
    numpy.ndarray
        One integer per input text: 1 when the first entry of the result's
        "labels" list is "fake news", otherwise 0.
    """
    outputs = zero_shot(texts, candidate_labels)

    # a single input can come back as one result dict instead of a list;
    # iterating that dict would walk over its keys and fail
    if isinstance(outputs, dict):
        outputs = [outputs]

    return np.array([
        1 if o["labels"][0] == "fake news" else 0
        for o in outputs
    ])

# predict on the sampled test subset
y_pred_zs = zero_shot_predict(X_test_subset.tolist())


In [None]:
# print results for the 500-article subset
# target_names are given in label order: 0 -> "True news", 1 -> "Fake news"
print(classification_report(
    y_test_subset,
    y_pred_zs,
    target_names=["True news", "Fake news"]
))