In [2]:
# Standard library
import os
from collections import Counter

# Basic data handling and array manipulations
import numpy as np
import pandas as pd

# Plotting libraries
import matplotlib.pyplot as plt
import plotly.express as px

# Machine learning, data splitting, and metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

# TensorFlow and PyTorch for neural networks
import tensorflow as tf
import torch

# Hugging Face Transformers and Datasets for NLP tasks and data handling
import transformers
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TFAutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    pipeline,
    AutoConfig,
    DataCollatorWithPadding,
    logging as hf_logging
)
from datasets import load_dataset, Dataset, load_metric

# Logging and warnings management
import logging
import warnings
warnings.filterwarnings('ignore')
logging.basicConfig(level=logging.ERROR)


ImportError: cannot import name 'load_metric' from 'datasets' (/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/site-packages/datasets/__init__.py)

In [7]:
def IMPORT_BABEv3():
    """Download BABE-v3 and cache an 80/20 train/test split as CSV files.

    The split is written to ``./clean_datasets/TRAINING_DATAFRAME.csv`` and
    ``./clean_datasets/TESTING_DATAFRAME.csv``. If both files already exist
    nothing is downloaded or rewritten.

    Each row's ``label`` is set to 1 when its ``type`` is 'left', 'right' or
    'center', and 0 otherwise.
    NOTE(review): this overwrites any ``label`` column shipped with the
    dataset -- confirm that is intended.

    Returns:
        None. The function is used for its side effect on disk.
    """
    train_path = "./clean_datasets/TRAINING_DATAFRAME.csv"
    test_path = "./clean_datasets/TESTING_DATAFRAME.csv"

    # Check if the files already exist
    if os.path.exists(train_path) and os.path.exists(test_path):
        print("Training and testing files already exist. Skipping processing.")
        return

    dataset = load_dataset("mediabiasgroup/BABE-v3")
    df = pd.DataFrame(dataset["train"]).drop(
        columns=['news_link', 'outlet', 'label_opinion', 'biased_words']
    )
    # Placeholder column that the evaluation functions later fill in.
    df['Predicted'] = 'XXX'
    df['label'] = df['type'].isin(['left', 'right', 'center']).astype(int)

    DF_TRAIN, DF_TEST = train_test_split(df, test_size=0.20, random_state=42)

    # to_csv does not create missing directories, so make sure the target exists.
    os.makedirs(os.path.dirname(train_path), exist_ok=True)
    DF_TRAIN.to_csv(train_path, index=False)
    DF_TEST.to_csv(test_path, index=False)

# Restrict Hugging Face Transformers logging to errors only (hf_logging is transformers.logging).
hf_logging.set_verbosity_error()


def IMPORT_ALL_THE_NEWS_1():
    """Label the first 100 "All the News" articles by model vote and cache them as CSV.

    Reads ``./unclean_datasets/ALL_THE_NEWS_1.csv``, keeps the first 100 rows,
    drops the metadata columns and renames ``content`` to ``text``. Each article
    is split into 150-word chunks; every model classifies every chunk, each
    model's most frequent chunk label becomes its vote, and the most frequent
    vote across models decides the article: ``label`` is 1 when that vote is
    "biased" and 0 otherwise. Articles with missing or empty text keep the
    label "unclassified".

    The result is written to ``./clean_datasets/BIG_TESTING_DATAFRAME.csv``.
    If that file already exists the function returns without doing any work.

    Returns:
        None. The function is used for its side effect on disk.
    """
    # Define the output file path
    output_path = "./clean_datasets/BIG_TESTING_DATAFRAME.csv"

    # Check if the file already exists
    if os.path.exists(output_path):
        print("Processed file already exists. Skipping processing.")
        return

    # Load the data
    df = pd.read_csv('./unclean_datasets/ALL_THE_NEWS_1.csv')
    print("Data loaded.")
    df = (
        df.iloc[:100]  # Limiting the dataset for demonstration
        .drop(columns=['id', 'title', 'publication', 'author', 'date', 'year', 'month', 'url', 'Unnamed: 0'])
        .rename(columns={'content': 'text'})
    )

    # Setup model pipelines outside the loop for efficiency
    max_length = 300  # Based on typical model max token lengths
    chunk_size = max_length // 2  # Rough estimate of chunk size in words
    # Locally fine-tuned checkpoints are used for labelling.
    model_names = ["./models/D1V1DE-on-BABE-on-PRANJALI/", "./models/D4DATA-on-BABE/","./models/VALURANK-on-BABE-on-PRANJALI/"]
    pipelines = {name: pipeline("text-classification", model=name) for name in model_names}

    # Process each row
    labels = []
    for index, text_data in df['text'].items():
        # A missing article body is read by pandas as NaN (a float), not a string.
        words = text_data.split() if isinstance(text_data, str) else []
        chunks = [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
        if not chunks:
            print(f"No text to classify for index {index}; leaving label as 'unclassified'.")
            labels.append("unclassified")
            continue

        bias_results = []
        for pipe in pipelines.values():
            chunk_predictions = [pipe(chunk)[0].get("label", "No Label").lower() for chunk in chunks]
            bias_results.append(Counter(chunk_predictions).most_common(1)[0][0])
        print(f"Intermediate results for index {index}: {bias_results}")

        final_bias = Counter(bias_results).most_common(1)[0][0]
        labels.append(1 if final_bias == "biased" else 0)
    df["label"] = labels

    # Set placeholders for additional analysis
    df["topic"] = "XXX"
    df["type"] = "YYY"
    # NOTE(review): the other functions in this notebook use a capitalised
    # 'Predicted' column; this lowercase name is kept so the CSV schema is unchanged.
    df["predicted"] = "ZZZ"

    # Save the cleaned dataset (to_csv does not create missing directories)
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df.to_csv(output_path, index=False)
    print("Data processing complete and saved.")

# Build the cached CSV datasets; each call returns early when its output file(s) already exist.
IMPORT_BABEv3()
IMPORT_ALL_THE_NEWS_1()

Training and testing files already exist. Skipping processing.
Data loaded.
Intermediate results for index 0: ['neutral', 'biased', 'neutral']
Intermediate results for index 1: ['neutral', 'biased', 'biased']
Intermediate results for index 2: ['neutral', 'biased', 'biased']
Intermediate results for index 3: ['neutral', 'biased', 'neutral']
Intermediate results for index 4: ['neutral', 'biased', 'biased']
Intermediate results for index 5: ['biased', 'biased', 'biased']
Intermediate results for index 6: ['biased', 'biased', 'neutral']
Intermediate results for index 7: ['biased', 'biased', 'biased']
Intermediate results for index 8: ['neutral', 'biased', 'neutral']
Intermediate results for index 9: ['biased', 'biased', 'biased']
Intermediate results for index 10: ['neutral', 'biased', 'neutral']
Intermediate results for index 11: ['biased', 'biased', 'biased']
Intermediate results for index 12: ['neutral', 'biased', 'biased']
Intermediate results for index 13: ['biased', 'biased', 'biased

In [None]:
def temp_test(data, assessing):
    """Tally per-group "correct" rates for the values of one column.

    A row counts as correct when either
      * its ``type`` is missing and its ``label`` equals 0, or
      * its ``type`` is 'left', 'right' or 'center' and its ``label`` equals 1.

    NOTE(review): correctness is derived from ``type`` and ``label`` only; the
    ``Predicted`` column is never consulted here -- confirm this is the
    intended metric.

    Args:
        data: DataFrame with ``type`` and ``label`` columns plus the column
            named by ``assessing``.
        assessing: Name of the column whose distinct values define the groups.
            Missing values form a group of their own.

    Returns:
        DataFrame with one column per distinct value of ``data[assessing]``
        (in order of first appearance) and the rows ``correct``, ``count``,
        ``score`` (correct / count) and ``assesing`` (the ``assessing`` name).
    """
    biased_types = ['left', 'right', 'center']
    is_correct = (
        (data['type'].isna() & (data['label'] == 0))
        | (data['type'].isin(biased_types) & (data['label'] == 1))
    )

    # Group on raw arrays so the result does not depend on the frame's index,
    # and keep NaN as a group of its own (dropna=False) rather than relying on
    # NaN object identity for dictionary lookups.
    scores = (
        pd.DataFrame({
            "key": data[assessing].to_numpy(),
            "correct": is_correct.to_numpy(dtype=float),
        })
        .groupby("key", sort=False, dropna=False)["correct"]
        .agg(correct="sum", count="count")
        .astype(float)
    )
    scores.index.name = None
    scores["score"] = scores["correct"] / scores["count"]
    # Key spelling kept as-is: consumers of this output are not visible here
    # and may rely on it.
    scores["assesing"] = assessing
    # Groups become columns so results for several columns can be joined side by side.
    return scores.T

In [None]:
def FIN_TRAIN_D4DATA(training_data_path):
    """Fine-tune ``d4data/bias-detection-model`` with Keras and save it locally.

    Args:
        training_data_path: Path to a CSV file with ``text`` and ``label``
            columns. 10% of the rows (fixed seed 42) are held out for validation.
            Labels are presumably integer class ids, as required by the sparse
            categorical loss -- TODO confirm against the CSV.

    Side effects:
        Trains for 3 epochs and writes the model and tokenizer to
        ``./fine_tuned/D4DATA``.
    """
    # Load and preprocess the dataset
    df = pd.read_csv(training_data_path)
    df = df[['text', 'label']]
    df_train, df_val = train_test_split(df, test_size=0.1, random_state=42)

    train_dataset = Dataset.from_pandas(df_train)
    val_dataset = Dataset.from_pandas(df_val)

    # Load the tokenizer and model configuration
    tokenizer = AutoTokenizer.from_pretrained("d4data/bias-detection-model")
    config = AutoConfig.from_pretrained("d4data/bias-detection-model")
    model = TFAutoModelForSequenceClassification.from_pretrained("d4data/bias-detection-model", config=config)

    # Tokenization function
    def tokenize_function(examples):
        return tokenizer(examples['text'], truncation=True, padding=True, max_length=512)

    # Tokenize the datasets
    tokenized_train_dataset = train_dataset.map(tokenize_function, batched=True)
    tokenized_val_dataset = val_dataset.map(tokenize_function, batched=True)

    # The collator feeds a tf.data pipeline, so ask it for TensorFlow tensors
    # explicitly instead of relying on its default tensor type (this matches
    # FIN_TRAIN_VALURANK).
    train_dataset = tokenized_train_dataset.to_tf_dataset(
        columns=['input_ids', 'attention_mask', 'label'],
        shuffle=True,
        batch_size=16,
        collate_fn=DataCollatorWithPadding(tokenizer, return_tensors="tf")
    )

    val_dataset = tokenized_val_dataset.to_tf_dataset(
        columns=['input_ids', 'attention_mask', 'label'],
        shuffle=False,
        batch_size=64,
        collate_fn=DataCollatorWithPadding(tokenizer, return_tensors="tf")
    )

    # Compile the model; the loss is computed from raw logits
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.metrics.SparseCategoricalAccuracy()]
    )

    # Train the model
    model.fit(train_dataset, validation_data=val_dataset, epochs=3)

    # Save the fine-tuned model
    model.save_pretrained('./fine_tuned/D4DATA')
    tokenizer.save_pretrained('./fine_tuned/D4DATA')


#code for fine-tuning individual models
#FIN_TRAIN_D4DATA("./clean_datasets/TRAINING_DATAFRAME.csv")

In [None]:
def FIN_TRAIN_D1V1DE(training_data_path):
    """Fine-tune ``D1V1DE/bias-detection`` with the Hugging Face ``Trainer`` and save it.

    Args:
        training_data_path: Path to a CSV file with ``text`` and ``label``
            columns. 10% of the rows are held out as the evaluation set.

    Side effects:
        Trains for 3 epochs, writes Trainer output to ``./results`` and logs to
        ``./logs``, then saves the tokenizer and model to ``./fine_tuned/D1V1DE``.
    """
    # Load and preprocess the dataset
    df = pd.read_csv(training_data_path)
    df = df[['text', 'label']]
    # Fixed seed so the split is reproducible, matching the other fine-tuning functions.
    df_train, df_val = train_test_split(df, test_size=0.1, random_state=42)
    train_dataset = Dataset.from_pandas(df_train)
    val_dataset = Dataset.from_pandas(df_val)

    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained("D1V1DE/bias-detection")

    # Tokenization function: pad every example to the tokenizer's maximum length
    def tokenize_function(examples):
        return tokenizer(examples["text"], padding="max_length", truncation=True)

    # Tokenize the datasets
    tokenized_train_dataset = train_dataset.map(tokenize_function, batched=True)
    tokenized_val_dataset = val_dataset.map(tokenize_function, batched=True)

    # Load the pretrained model
    model = AutoModelForSequenceClassification.from_pretrained("D1V1DE/bias-detection")

    # Training arguments
    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=3,
        per_device_train_batch_size=16,
        per_device_eval_batch_size=64,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
        logging_steps=10
    )

    # Initialize the Trainer
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_train_dataset,
        eval_dataset=tokenized_val_dataset
    )

    # Fine-tune the model
    trainer.train()

    # Save the fine-tuned model
    tokenizer.save_pretrained("./fine_tuned/D1V1DE")
    model.save_pretrained("./fine_tuned/D1V1DE")


#code for fine-tuning individual models
#FIN_TRAIN_D1V1DE("./clean_datasets/TRAINING_DATAFRAME.csv")

In [None]:
def FIN_TRAIN_VALURANK(training_data_path):
    """Fine-tune ``valurank/distilroberta-bias`` with Keras and save it locally.

    Args:
        training_data_path: Path to a CSV file with ``text`` and ``label``
            columns. 10% of the rows (fixed seed 42) are held out for validation.

    Side effects:
        Trains for 3 epochs and writes the model and tokenizer to
        ``./fine_tuned/VALURANK``.
    """
    checkpoint = "valurank/distilroberta-bias"
    tf_columns = ['input_ids', 'attention_mask', 'label']

    # Read the CSV, keep only the two columns the model needs, and split.
    frame = pd.read_csv(training_data_path)[['text', 'label']]
    frame_train, frame_val = train_test_split(frame, test_size=0.1, random_state=42)
    raw_train = Dataset.from_pandas(frame_train)
    raw_val = Dataset.from_pandas(frame_val)

    # from_pt=True loads the TensorFlow model from PyTorch weights
    # (presumably because the checkpoint ships no TF weights -- TODO confirm).
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    config = AutoConfig.from_pretrained(checkpoint)
    model = TFAutoModelForSequenceClassification.from_pretrained(checkpoint, config=config, from_pt=True)

    def encode(batch):
        # Every example is truncated/padded to exactly 512 tokens.
        return tokenizer(batch['text'], truncation=True, padding='max_length', max_length=512)

    def as_tf_dataset(encoded, shuffle, batch_size):
        # Wrap an encoded split in a tf.data pipeline with its own collator.
        return encoded.to_tf_dataset(
            columns=tf_columns,
            shuffle=shuffle,
            batch_size=batch_size,
            collate_fn=DataCollatorWithPadding(tokenizer, return_tensors="tf"),
        )

    encoded_train = raw_train.map(encode, batched=True)
    encoded_val = raw_val.map(encode, batched=True)

    tf_train = as_tf_dataset(encoded_train, shuffle=True, batch_size=16)
    tf_val = as_tf_dataset(encoded_val, shuffle=False, batch_size=64)

    # The loss is computed from raw logits.
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=5e-5),
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        metrics=[tf.metrics.SparseCategoricalAccuracy()],
    )
    model.fit(tf_train, validation_data=tf_val, epochs=3)

    # Persist the model first, then the tokenizer, to the same directory.
    model.save_pretrained('./fine_tuned/VALURANK')
    tokenizer.save_pretrained('./fine_tuned/VALURANK')

# Example usage:
#FIN_TRAIN_VALURANK("./clean_datasets/TRAINING_DATAFRAME.csv")

In [None]:
def _majority_chunk_label(pipe, text, chunk_size):
    """Classify ``text`` in ``chunk_size``-word chunks and return the most frequent label."""
    # A missing text cell is read by pandas as NaN (a float), not a string.
    words = text.split() if isinstance(text, str) else []
    chunks = [' '.join(words[i:i + chunk_size]) for i in range(0, len(words), chunk_size)]
    chunk_predictions = []
    for chunk in chunks:
        result = pipe(chunk)
        if result:
            chunk_predictions.append(result[0].get("label", "No Label"))
    if not chunk_predictions:
        return "No Prediction"
    # Counter.most_common breaks ties by first occurrence, so the vote is deterministic.
    return Counter(chunk_predictions).most_common(1)[0][0]


def GEN_TEST_BIAS(model,CSV):
    """Run a text-classification model over a CSV and summarise scores by type and topic.

    Each row's ``text`` is split into 150-word chunks; the most frequent chunk
    label is stored in the ``Predicted`` column ("No Prediction" when there is
    nothing to classify). The labelled frame is written to ``temp.csv``. Scores
    are then produced by ``temp_test`` for the ``type`` and ``topic`` columns
    and combined into one frame whose ``score`` column is scaled to percent.

    Args:
        model: Model name or local path accepted by ``pipeline``.
        CSV: Path to a CSV file with ``text``, ``type``, ``topic`` and ``label`` columns.

    Returns:
        DataFrame with one row per assessed value.

    Raises:
        KeyError: If a required column is missing. A snapshot of the data is
            printed before the error is re-raised.
    """
    transformers.logging.set_verbosity_error()
    test_df = None
    results = None
    try:
        pipe = pipeline("text-classification", model=model)
        test_df = pd.read_csv(CSV)
        max_length = 300  # Define maximum length for model input
        chunk_size = max_length // 2  # Rough estimate of chunk size in words
        for index, row in test_df.iterrows():
            test_df.at[index, 'Predicted'] = _majority_chunk_label(pipe, row['text'], chunk_size)
        test_df.to_csv("temp.csv", index = False)
        # Analyze results by type and topic
        Type = temp_test(test_df, "type")
        Topic = temp_test(test_df, "topic")
        # Combine results and scale the scores to percentages
        results = Type.join(Topic).T
        results['score'] = results['score'] * 100
    except KeyError as e:
        print(f"KeyError encountered: {e}")
        print("Contents of DataFrame:")
        # Show whichever frame exists; `results` is unset when the failure
        # happened before the summary was built.
        snapshot = results if results is not None else test_df
        if snapshot is not None:
            print(snapshot.head())
        # Re-raise so the caller sees the real error instead of receiving None.
        raise
    print("eval competed succesfully")
    return results

def createAssesments(model_path, dataset_path):
    """Evaluate one model on one dataset and tag the result rows with their provenance.

    Args:
        model_path: Model name or local path passed on to ``GEN_TEST_BIAS``.
        dataset_path: CSV path passed on to ``GEN_TEST_BIAS``.

    Returns:
        The frame returned by ``GEN_TEST_BIAS`` with three extra columns:
        ``model``, ``dataset`` and ``assessed`` (a copy of the frame's index).
    """
    results = GEN_TEST_BIAS(model_path, dataset_path)
    provenance = {"model": model_path, "dataset": dataset_path}
    for column, value in provenance.items():
        results[column] = value
    # Keep the assessed value as a regular column so it survives an index reset.
    results["assessed"] = results.index
    return results

# Evaluate every (model, dataset) pair and collect the labelled result frames.
TESTING_CSV = "./clean_datasets/TESTING_DATAFRAME.csv"
BIG_TESTING_CSV = "./clean_datasets/BIG_TESTING_DATAFRAME.csv"

# The order of this list is the order of the rows in Model_Results.csv.
ASSESSMENT_RUNS = [
    ("valurank/distilroberta-bias", TESTING_CSV),
    ("valurank/distilroberta-bias", BIG_TESTING_CSV),
    ("./fine_tuned/VALURANK", TESTING_CSV),
    ("./fine_tuned/VALURANK", BIG_TESTING_CSV),
    ("D1V1DE/bias-detection", TESTING_CSV),
    ("D1V1DE/bias-detection", BIG_TESTING_CSV),
    ("./fine_tuned/D1V1DE", TESTING_CSV),
    ("./fine_tuned/D1V1DE", BIG_TESTING_CSV),
    ("d4data/bias-detection-model", BIG_TESTING_CSV),
    ("d4data/bias-detection-model", TESTING_CSV),
    ("./fine_tuned/D4DATA", TESTING_CSV),
    ("./fine_tuned/D4DATA", BIG_TESTING_CSV),
]

dataframes = [
    createAssesments(model_path, dataset_path)
    for model_path, dataset_path in ASSESSMENT_RUNS
]

# Combine the per-run frames into one table and mark missing values as "none"
combined_df = pd.concat(dataframes, ignore_index=True).fillna("none")
# Save the combined dataframe to a CSV file
combined_df.to_csv("Model_Results.csv", index=False)


In [None]:
def new_test_bias(model,csv):
    """Classify every row's ``text`` in one call each and store a 0/1 guess.

    Args:
        model: Model name or local path accepted by ``pipeline``.
        csv: Path to a CSV file with a ``text`` column.

    Returns:
        The loaded DataFrame with ``Predicted`` set to 1 where the model's top
        label is "biased" (case-insensitive) and 0 otherwise.
    """
    classifier = pipeline("text-classification", model = model)
    frame = pd.read_csv(csv)
    for idx, text in frame["text"].items():
        # The pipeline returns a list of result dicts; use the first one's label.
        top_label = classifier(text)[0]["label"]
        frame.at[idx, 'Predicted'] = 1 if top_label.lower() == "biased" else 0
    return frame

# Label the BABE test split with the D1V1DE checkpoint; the returned frame is this cell's final expression.
new_test_bias("D1V1DE/bias-detection", "./clean_datasets/TESTING_DATAFRAME.csv")

In [None]:
def GEN_EVAL_evaluate_model(model, tokenizer, eval_dataset):
    """Evaluate ``model`` on ``eval_dataset`` with a default-configured ``Trainer``.

    Intended for comparing models in terms of speed; needs further research.

    Args:
        model: Model handed to ``Trainer``.
        tokenizer: Callable tokenizer applied to the dataset's ``text`` field.
        eval_dataset: Dataset exposing ``.map`` and a ``text`` column.

    Returns:
        Whatever ``Trainer.evaluate`` returns for the tokenized dataset.
    """
    evaluator = Trainer(model=model)
    # Pad every example to the tokenizer's maximum length and truncate longer ones.
    encoded_eval = eval_dataset.map(
        lambda batch: tokenizer(batch["text"], padding="max_length", truncation=True),
        batched=True,
    )
    return evaluator.evaluate(encoded_eval)

In [None]:
def GEN_VIS_star(data,key):
    """Draw a star (radar) plot of ``score`` for the rows whose ``field`` equals ``key``.

    Args:
        data: DataFrame with ``field`` and ``score`` columns; its index supplies
            the axis labels (missing labels are shown as 'No Bias').
        key: Value of ``field`` selecting the rows to plot.

    Side effects:
        Shows a matplotlib figure; nothing is returned.
    """
    # Move the index into an 'index' column so it can be used for the axis labels.
    subset = data.loc[data['field'] == key].reset_index()
    subset['index'] = subset['index'].fillna('No Bias')
    axis_labels = subset['index']
    scores = subset['score']

    # One evenly spaced angle per label; repeat the first point to close the polygon.
    angles = np.linspace(0, 2*np.pi, len(axis_labels), endpoint=False).tolist()
    closed_scores = np.concatenate((scores, [scores[0]]))
    angles = angles + angles[:1]

    fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
    ax.fill(angles, closed_scores, color='blue', alpha=0.25)
    ax.set_yticklabels([])
    # Tick only the original angles, not the duplicated closing point.
    ax.set_xticks(angles[:-1])
    ax.set_xticklabels(axis_labels)

    plt.title('Star Plot of '+  str(key) +' vs. accuracy rating')
    plt.show()