In [None]:
!pip install pandas spacy textstat language-tool-python numpy scikit-learn boruta
!python -m spacy download en_core_web_sm

In [None]:
import pandas as pd
import json
import spacy
import textstat
import language_tool_python
import numpy as np
import random
from spacy.tokens import Doc
from urllib.request import urlopen
from pathlib import Path
from collections import Counter
from sklearn.model_selection import StratifiedKFold, GridSearchCV, train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from boruta import BorutaPy



# Shared resources used by the feature-extraction functions of this notebook.
# The "en_core_web_md" spaCy model has to be installed in the environment.
nlp = spacy.load("en_core_web_md")
language_tool = language_tool_python.LanguageTool("en-US")

#loading the nrc_en lexicon manually as the NRCLex 4.0 is very buggy
url = "https://raw.githubusercontent.com/DemetersSon83/NRCLex/refs/heads/master/nrc_en.json"
#the handle is called `response` so that it does not shadow the name of the
#standard-library `re` module
with urlopen(url) as response:
    lexicon = json.load(response)

In [22]:
def load_articles (filepath: str) -> pd.DataFrame:

    """
        Load a JSON Lines file as a pandas DF

        Every non-blank line of the file is parsed as one JSON object and
        becomes one row; blank or whitespace-only lines are skipped.

        Parameters:
            filepath: path of the UTF-8 encoded .jsonl file

        Returns:
            DataFrame with one row per parsed line

        Raises:
            json.JSONDecodeError: if a non-blank line is not valid JSON
    """

    records = []
    with open(filepath, "r", encoding = "utf-8") as file:
        for line in file:
            #str.strip() returns a new string, so the result has to be kept;
            #otherwise a blank line still holds "\n" and reaches json.loads
            line = line.strip()
            if not line:
                continue
            records.append(json.loads(line))

    return pd.DataFrame(records)

In [None]:
def process_texts (articles: list[str]) -> list[Doc]:

    """
        Run the raw strings (e.g. df["Text"] or df["Title"]) through the
        module-level spaCy pipeline `nlp`

        Returns the Docs yielded by nlp.pipe, collected in a list

    """

    parsed = nlp.pipe(articles)
    return list(parsed)




def POStagging (articles: list[Doc], df: pd.DataFrame) -> pd.DataFrame:

    """
        Add POS-tag features to df

        For every universal POS-tag two columns are appended:
        "<tag>_count" (raw count) and "<tag>_perc" (count divided by the
        number of non-space tokens, 0 if the article has no such token).
        The feature rows are built in the order of `articles` and receive
        the index of df.

    """

    #we stick with the universal POS-tags for now
    #https://github.com/explosion/spaCy/blob/master/spacy/glossary.py
    wanted_tags = ["ADJ", "ADP", "ADV", "AUX", "CONJ", "CCONJ", "DET", "INTJ", "NOUN",
                   "NUM", "PART", "PRON", "PROPN", "PUNCT", "SCONJ", "SYM", "VERB", "X"]

    #look up the ID of every tag once, so it can be matched against the keys
    #of the count_by result
    id_by_tag = {name: nlp.vocab.strings[name] for name in wanted_tags}

    rows = []
    for doc in articles:

        #denominator: all tokens except the space-tokens
        n_tokens = len([tok for tok in doc if not tok.is_space])

        tag_freq = doc.count_by(spacy.attrs.POS)

        #absolute count first, then the share, for every tag
        row = {}
        for name, pos_id in id_by_tag.items():
            hits = tag_freq.get(pos_id, 0)
            share = hits / n_tokens if n_tokens else 0
            row[f"{name.lower()}_count"] = hits
            row[f"{name.lower()}_perc"] = share
        rows.append(row)

    #the feature frame shares the index of df so the concat lines up row by row
    pos_features = pd.DataFrame(rows, index = df.index)
    return pd.concat([df, pos_features], axis = 1)




def NER (articles: list[Doc], df: pd.DataFrame) -> pd.DataFrame:

    """
        Add named-entity features to df

        For every tracked entity label three columns are appended:
        "<label>" (list of the entity texts), "<label>_count" (number of
        mentions) and "<label>_unique" (number of distinct texts).

    """

    #many random forest iterations showed that these are the important ones
    tracked = ["PERSON", "ORG", "GPE", "DATE", "NORP", "CARDINAL",
               "MONEY", "PERCENT", "LOC", "EVENT"]

    rows = []
    for doc in articles:

        #bucket the entity texts by label; every tracked label starts with an
        #empty list, so labels that never occur still end up as [] / 0 / 0
        found = {label: [] for label in tracked}
        for ent in doc.ents:
            bucket = found.get(ent.label_)
            if bucket is not None:
                bucket.append(ent.text)

        #raw texts, number of mentions and number of distinct texts
        row = {}
        for label, texts in found.items():
            col = label.lower()
            row[col] = texts
            row[f"{col}_count"] = len(texts)
            row[f"{col}_unique"] = len(set(texts))
        rows.append(row)

    #the feature frame shares the index of df so the concat lines up row by row
    ner_features = pd.DataFrame(rows, index = df.index)
    return pd.concat([df, ner_features], axis = 1)


    

def emotions_nrc (articles: list[Doc], df: pd.DataFrame) -> pd.DataFrame:

    """
        Add emotion features based on the module-level `lexicon` to df

        Every alphabetic token is lower-cased and looked up in the lexicon;
        the emotions listed for the matching words are tallied per article.
        For each emotion tag "<emotion>_count" and "<emotion>_perc" (share of
        all tallied emotions of the article) are appended.

    """

    emotion_tags = ["anger", "anticipation", "disgust", "fear", "joy", "sadness", "surprise",
                    "trust", "positive", "negative"]

    #the lookup below is inspired by the NRCLex module by metalcorebear;
    #only parts are recreated as the module itself tends to cause problems
    #source: https://pypi.org/project/NRCLex/

    known_words = set(lexicon.keys())
    rows = []
    for doc in articles:

        #emotions of every alphabetic token that is part of the lexicon
        hits = [emo
                for tok in doc
                if tok.is_alpha and tok.lower_ in known_words
                for emo in lexicon[tok.lower_]]

        tally = Counter(hits)
        #fall back to 1 so articles without any emotion word do not divide by zero
        denominator = sum(tally.values()) or 1

        #absolute count first, then the share, for every emotion
        row = {}
        for emotion in emotion_tags:
            #a Counter yields 0 for keys it has never seen
            n_hits = tally[emotion]
            row[f"{emotion.lower()}_count"] = n_hits
            row[f"{emotion.lower()}_perc"] = n_hits / denominator
        rows.append(row)

    #the feature frame shares the index of df so the concat lines up row by row
    emotion_features = pd.DataFrame(rows, index = df.index)
    return pd.concat([df, emotion_features], axis = 1)




def readability_and_difficulty_scores (articles: list[str], df: pd.DataFrame) -> pd.DataFrame:

    """
    Add textstat readability scores and difficult-word features to df

    For every text six readability scores, the number of difficult words and
    the share of difficult words among all words are appended as columns.
    A text without any word gets a share of 0.

    Parameters:
        articles: raw texts, one per row of df
        df: frame the feature columns are appended to

    Returns:
        df with the feature columns concatenated on the right

    """

    temp_df_list = []

    for article in articles:
        difficult_words = textstat.difficult_words(article)

        #an empty text (e.g. a missing headline) has a word count of 0, which
        #would otherwise raise a ZeroDivisionError below
        total_words = textstat.lexicon_count(article, removepunct = True)

        inner_temp = {

            #different difficulties
            #https://github.com/kupolak/textstat/blob/master/README.md#spache-readability-formula (for scales)
            "flesch_reading_ease": textstat.flesch_reading_ease(article),
            "flesch_kincaid_grade": textstat.flesch_kincaid_grade(article),
            "gunning_fog": textstat.gunning_fog(article),
            "auto_readability_index": textstat.automated_readability_index(article),
            "coleman_liau_index": textstat.coleman_liau_index(article),
            "linsear_write_formula": textstat.linsear_write_formula(article),

            #difficult words count + perc of total words
            "difficult_words_count": difficult_words,
            "difficult_words_perc": difficult_words / total_words if total_words else 0

        }
        temp_df_list.append(inner_temp)


    #create a pd DF with the same index as df
    df_features = pd.DataFrame(temp_df_list, index = df.index)
    return pd.concat([df,df_features], axis = 1)




def Error_check (articles: list[str], df: pd.DataFrame) -> pd.DataFrame:

    """
        Add grammar and spelling error features to df

        Every text is checked with the module-level `language_tool`; matches
        whose ruleIssueType is "misspelling" or "grammar" are counted and
        additionally related to the word count of the text. A text without
        any word gets a share of 0.

        Parameters:
            articles: raw texts, one per row of df
            df: frame the feature columns are appended to

        Returns:
            df with the four feature columns concatenated on the right

    """

    temp_df_list = []
    for article in articles:

        matches = language_tool.check(article)

        total_s = sum(1 for entry in matches if entry.ruleIssueType == "misspelling")
        total_g = sum(1 for entry in matches if entry.ruleIssueType == "grammar")

        #an empty text (e.g. a missing headline) has a word count of 0, which
        #would otherwise raise a ZeroDivisionError below
        total_words = textstat.lexicon_count(article, removepunct = True)


        inner_temp = {

            "misspellings_count": total_s,
            "misspellings_perc": total_s / total_words if total_words else 0,
            "grammar_count": total_g,
            "grammar_perc": total_g / total_words if total_words else 0

        }
        temp_df_list.append(inner_temp)


    #create a pd DF with the same index as df
    df_features = pd.DataFrame(temp_df_list, index = df.index)
    return pd.concat([df,df_features], axis = 1)



In [24]:
def concat_df (df: pd.DataFrame, df2: pd.DataFrame) -> pd.DataFrame:

    """
        Join the text df and the headline df column-wise

        Every column of df2 gets the prefix "hl_"; df2 itself keeps its
        original column names
    """

    #set_axis hands back a relabelled frame instead of touching df2
    prefixed = ["hl_" + col for col in df2.columns]
    headline_part = df2.set_axis(prefixed, axis = 1)

    return pd.concat([df, headline_part], axis = 1)

In [None]:

#load the raw articles; df collects the features of the article text,
#df2 (a copy of the raw frame) collects the features of the headline
#NOTE(review): absolute, machine-specific path - adjust before running elsewhere
file_path = Path("C:/Users/Admin/Desktop/Uni/CSS/NLP/without_assessment.jsonl")
df = load_articles(str(file_path))
df2 = df.copy()


#Text: features computed on the "Text" column


#prepare text: raw strings for textstat / LanguageTool, spaCy docs for the rest
texts = df["Text"].tolist()
docs  = process_texts(texts)


#POS- and NERfeat
df = POStagging(docs, df)
df = NER(docs, df)


#add emotions
df = emotions_nrc(docs, df)


#readability and difficulty
df = readability_and_difficulty_scores(texts, df)


#grammar and spelling errors
df = Error_check(texts, df)


#Headline: the same feature steps, computed on the "Title" column


#prepare text (docs is re-used and now holds the parsed headlines)
titles = df2["Title"].tolist()
docs  = process_texts(titles)


#POS- and NERfeat
df2 = POStagging(docs, df2)
df2 = NER(docs, df2)


#add emotions
df2 = emotions_nrc(docs, df2)


#readability and difficulty
df2 = readability_and_difficulty_scores(titles, df2)


#grammar and spelling errors
df2 = Error_check(titles, df2)


#merge both feature sets; every df2 column is prefixed with "hl_"
df = concat_df(df, df2)


In [None]:
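#Run once to persist the extracted features: the modelling cells below read
#"output_features.csv", and the saved index is the "Unnamed: 0" column that
#preparation() drops again.
#df.to_csv("output_features.csv", index = True, encoding = "utf-8")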

In [8]:
def preparation (df: pd.DataFrame, csv_path_labels: str, labelname: str) -> tuple[pd.DataFrame, pd.Series]:

    """
        Preparing the df for the ML part

        Steps:
            1. drop the bookkeeping columns and every column whose name
               contains "count"
            2. keep only int64 / float64 columns
            3. drop columns where a single value covers at least 70 % of the
               rows (columns without any non-missing value are dropped too)
            4. drop every column whose absolute correlation with an earlier
               column is at least 0.80

        Parameters:
            df: feature frame
            csv_path_labels: path of the CSV file holding the labels
            labelname: name of the label column in that CSV

        Returns:
            (x, y): the reduced feature frame and the label series

    """


    #bookkeeping columns; errors = "ignore" keeps the function usable for
    #frames that do not carry all of them (e.g. not re-read from a CSV)
    df = df.drop(columns = ["Unnamed: 0", "Index", "hl_Index"], errors = "ignore")
    df = df.drop(columns = [c for c in df.columns if "count" in c.lower()])


    #filtering the df for only ints and floats
    df_filtered = df.select_dtypes(include = ["int64", "float64"]).copy()


    #filtering all columns that have little variance
    n = len(df_filtered)
    low_variance = []
    for col in df_filtered.columns:
        value_freq = df_filtered[col].value_counts()
        #value_counts skips NaN, so an all-NaN column yields an empty result;
        #such a column is dropped instead of failing on .iloc[0]
        if value_freq.empty or value_freq.iloc[0] / n >= 0.70:
            low_variance.append(col)
    df_filtered = df_filtered.drop(columns = low_variance)


    #filtering columns with high correlations (no new information):
    #only the lower triangle is inspected, so of two correlated columns the
    #one that appears later is dropped
    corr_matrix = df_filtered.corr().abs()
    all_cols = corr_matrix.columns.tolist()
    drop = set()
    for row in range(len(all_cols)):
        if any(corr_matrix.iloc[row, column] >= 0.80 for column in range(row)):
            drop.add(all_cols[row])
    drop_fin = sorted(drop)


    #prepare x and y
    #NOTE(review): x and y are matched by row position only - the label CSV
    #is presumably in the same order as df; confirm
    x = df_filtered.drop(columns = drop_fin)
    real_fake = pd.read_csv(csv_path_labels)
    y = pd.Series(real_fake[labelname], name = labelname)


    return x, y




def grid_search_rf(x: pd.DataFrame, y: pd.Series, seed: int): 

    """
        Param optimization with grid search

        Runs a GridSearchCV (stratified, shuffled 10-fold CV, accuracy as
        score) for a RandomForestClassifier and returns the best estimator
        found by the search

    """

    search_space = {

            "n_estimators": [100, 110, 120, 130, 140, 150, 160, 170, 180, 190],
            "max_depth": [None, 5, 10, 15, 20],
            "min_samples_leaf": [2, 3, 4, 5, 6],
            "max_features": ["sqrt", 0.3, 0.5]

        }


    #the same seed drives the forest and the fold assignment
    forest = RandomForestClassifier(random_state = seed, n_jobs = -1)
    folds = StratifiedKFold(n_splits = 10, shuffle = True, random_state = seed)


    #accuracy is the metric to beat, so it is the score of the search
    search = GridSearchCV(forest, search_space, cv = folds, scoring = "accuracy",
                          n_jobs = -1)
    search.fit(x, y)


    return search.best_estimator_




def data_filter (x, y, seed):

    """
        80/20 split and preparation for the final model

        The data is split (stratified by y) before anything is tuned, so the
        grid search and the Boruta feature selection only see the training
        part.

        Parameters:
            x: feature DataFrame
            y: labels
            seed: random state for the split, the grid search and Boruta

        Returns:
            x_train_sel, x_test_sel: train / test features reduced to the
                selected columns
            y_train, y_test: the matching labels
            selected: names of the columns kept by Boruta

    """

    # spliting the dataset
    x_train, x_test, y_train, y_test = train_test_split(
        x, y, test_size = 0.2, stratify = y, random_state = seed)


    #extracting params
    rf_temp = grid_search_rf(x_train, y_train, seed)


    #boruta selection of features
    #BorutaPy is documented to work on numpy arrays, so the frame / series are
    #handed over as arrays; boruta.support_ keeps the column order of x_train
    boruta = BorutaPy(estimator = rf_temp, n_estimators = "auto", perc = 90, alpha = 0.05,
                      max_iter = 70, random_state = seed, verbose = 0)
    boruta.fit(np.asarray(x_train), np.asarray(y_train))
    selected = [name for name, keep in zip(x_train.columns, boruta.support_) if keep]


    #reduction of the subsets
    x_train_sel = x_train[selected]
    x_test_sel = x_test[selected]


    return x_train_sel, x_test_sel, y_train, y_test, selected




def final_step (x_train_sel, x_test_sel, y_train, y_test, seed):

    """
        Building the last model and report

        Tunes a forest on the selected training features, fits it, predicts
        the test part and returns the fitted model together with the
        classification report (as a dict)

    """

    tuned_forest = grid_search_rf(x_train_sel, y_train, seed = seed)

    #fit hands back the estimator itself
    final_model = tuned_forest.fit(x_train_sel, y_train)
    predictions = final_model.predict(x_test_sel)


    return final_model, classification_report(y_test, predictions, output_dict = True)



In [17]:
#inputs of the modelling part: the saved feature table, the CSV holding the
#labels, the name of the label column and the seed used for every random step
df = pd.read_csv("output_features.csv")
csv_path_labels = "group58_stage1.csv"
label = "real_news"
seed = 37


#preparing the data (x: filtered numeric features, y: labels)
x, y = preparation(df, csv_path_labels, label)


#splitting the data in 80/20 and optimizing params + features on the train part
x_train_sel, x_test_sel, y_train, y_test, features = data_filter(x, y, seed = seed)


#building the final model and the report on the held-out test part
final_model, test_report = final_step(x_train_sel, x_test_sel, y_train, y_test, seed = seed)


print(f"Final features: {features}")
print(f"Features len: {len(features)}")
print(f"Test performance: {test_report}")

Final features: ['adj_perc', 'noun_perc', 'pron_perc', 'propn_perc', 'sconj_perc', 'verb_perc', 'anticipation_perc', 'hl_det_perc', 'hl_noun_perc', 'hl_punct_perc', 'hl_verb_perc', 'hl_auto_readability_index']
Features len: 12
Test performance: {'no': {'precision': 0.875, 'recall': 1.0, 'f1-score': 0.9333333333333333, 'support': 14.0}, 'yes': {'precision': 1.0, 'recall': 0.875, 'f1-score': 0.9333333333333333, 'support': 16.0}, 'accuracy': 0.9333333333333333, 'macro avg': {'precision': 0.9375, 'recall': 0.9375, 'f1-score': 0.9333333333333333, 'support': 30.0}, 'weighted avg': {'precision': 0.9416666666666667, 'recall': 0.9333333333333333, 'f1-score': 0.9333333333333333, 'support': 30.0}}


In [18]:
#predicting every row of the feature table with the final model and saving
#the selected features together with the true and the predicted label
x_vars = df[features]
y_pred = final_model.predict(x_vars)
df_all = x_vars.copy()
df_all["real_news"] = y.to_list()
df_all["real_news_pred"] = y_pred


#NOTE(review): this accuracy covers all rows, including the rows the model
#was trained on, so it is not comparable to the held-out test accuracy
print(accuracy_score(df_all["real_news"], df_all["real_news_pred"]))
output_path = "all_predictions.csv"
df_all.to_csv(output_path, index = False)

0.9866666666666667


In [19]:
#using the same params and features over 1000 distinct seeds drawn from
#50..10000 to see how much the test scores depend on the forest's random state

random.seed(seed)

random_seeds = random.sample(range(50, 10001), 1000)

#every hyper-parameter of the final model except its seed is re-used
rf_params = final_model.get_params()
rf_params.pop("random_state")

accuracies = []
recalls = []
f1s = []

for s in random_seeds:
    rf = RandomForestClassifier(**rf_params, random_state = s)
    rf.fit(x_train_sel, y_train)
    y_pred = rf.predict(x_test_sel)
    acc = accuracy_score(y_test, y_pred)
    report = classification_report(y_test, y_pred, output_dict = True)

    #accuracy comes from accuracy_score, the macro averages from the report
    accuracies.append(acc)
    recalls.append(report["macro avg"]["recall"])
    f1s.append(report["macro avg"]["f1-score"])


print(f"Accuracy = mean: {np.mean(accuracies):.3f}, sd: {np.std(accuracies):.3f}")
print(f"Recall = mean: {np.mean(recalls):.3f}, sd: {np.std(recalls):.3f}")
print(f"f1_score = mean: {np.mean(f1s):.3f}, sd: {np.std(f1s):.3f}")




Accuracy = mean: 0.932, sd: 0.029
Recall = mean: 0.934, sd: 0.028
f1_score = mean: 0.931, sd: 0.029
