In [None]:
import os
import shap
import pandas as pd
import numpy as np
import nltk
import re
import matplotlib.pyplot as plt
import seaborn as sns
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, precision_recall_fscore_support, accuracy_score
from sklearn.feature_extraction.text import TfidfVectorizer
from xgboost import XGBClassifier
from gensim.models import Word2Vec
import openai
from nltk import pos_tag

# Enable inline plotting.
# NOTE: `%matplotlib inline` is an IPython/Jupyter magic, not Python syntax, so
# this file runs as-is only inside a notebook/IPython kernel.
%matplotlib inline

# Load main data.
# Each entry is a CSV stem of the form "<split>_<lang>_<domain>"; the file is
# read from Data/<stem>.csv.
files = ['dev_en_news', 'dev_en_reviews','dev_en_twitter', 'dev_nl_news', 'dev_nl_reviews', 'dev_nl_twitter']
df_list = []

for file in files:
    # Extract 'lang' and 'domain' from filename (the leading split name is discarded)
    _, lang, domain = file.split("_")
    temp_df = pd.read_csv(f'Data/{file}.csv')
    # Fail fast if a file lacks the two columns the rest of the notebook reads.
    if 'label' not in temp_df.columns or 'text' not in temp_df.columns:
        raise ValueError(f"Missing required columns in file: {file}")
    # Tag every row with where it came from so metrics can be sliced later.
    temp_df['domain'] = domain
    temp_df['lan'] = lang
    df_list.append(temp_df)

# Concatenate dataframes into one frame with a fresh 0..n-1 index
df = pd.concat(df_list, ignore_index=True)
df['source'] = 'CLIN33'
# Human-readable label: 1 -> 'generated', anything else -> 'human'.
# This runs before the dropna below, so a row with a missing label is briefly
# tagged 'human' and then removed.
df['label1'] = df['label'].apply(lambda label: 'generated' if label == 1 else 'human')

# Drop rows with missing text or label
df.dropna(subset=['text', 'label'], inplace=True)

# Use all data (work on a copy so `df` keeps the loaded rows without later columns)
data = df.copy()

# Data preprocessing
# Fetch every NLTK resource this notebook relies on: the stop-word lists and
# the Punkt tokenizer used during preprocessing, plus the perceptron tagger
# that pos_tag() needs in the POS-aware replacement strategy.
# NOTE(review): recent NLTK releases may ask for 'punkt_tab' and
# 'averaged_perceptron_tagger_eng' instead - confirm against the installed version.
nltk.download('stopwords')
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')

# Maps the dataset's language codes to NLTK stop-word corpus names.
_STOPWORD_CORPORA = {'en': 'english', 'nl': 'dutch'}
# Stop-word sets that have already been loaded, keyed by language code.
_stopword_sets = {}


def _stop_words_for(lang):
    """Return the stop-word set for ``lang``, loading it from NLTK only once."""
    if lang not in _stopword_sets:
        corpus_name = _STOPWORD_CORPORA.get(lang)
        # Unknown languages get an empty set, i.e. no stop-word filtering.
        _stopword_sets[lang] = set(stopwords.words(corpus_name)) if corpus_name else set()
    return _stopword_sets[lang]


def preprocess_text(text, lang):
    """Normalize one document into a space-joined string of tokens.

    The text is lower-cased, URLs starting with ``http`` are removed, every
    character other than ASCII letters and whitespace is dropped, the result
    is tokenized with NLTK and stop words for ``lang`` are filtered out.

    Args:
        text: Raw document; coerced with ``str()`` so non-string values work.
        lang: ``'en'`` or ``'nl'``; any other value disables stop-word removal.

    Returns:
        The remaining tokens joined by single spaces.
    """
    text = str(text).lower()
    text = re.sub(r'http\S+', '', text)
    # NOTE(review): this ASCII-only class also strips accented letters, so
    # Dutch words such as "één" collapse to "n" - confirm this is intended.
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    # The stop-word set is cached per language; rebuilding it from the corpus
    # for every row was pure repeated work.
    stop_words = _stop_words_for(lang)
    return ' '.join(word for word in nltk.word_tokenize(text) if word not in stop_words)

# Apply preprocessing row by row, passing each row's language code so the
# matching stop-word list is used.
data['processed_text'] = data.apply(lambda row: preprocess_text(row['text'], row['lan']), axis=1)

# Split data into train and test sets.
# 'lan' and 'domain' travel with the text so test metrics can be sliced later;
# only 'processed_text' is fed to the model.
X = data[['processed_text', 'lan', 'domain']]
y = data['label']
# 80/20 split with a fixed seed. The split is not stratified, so class,
# language and domain proportions may differ between the two parts.
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42
)

X_train_text = X_train['processed_text']
X_test_text = X_test['processed_text']

# Vectorization using TF-IDF.
# The vocabulary (capped at 1000 features) is fitted on the training texts
# only and then reused unchanged for the test texts.
vectorizer = TfidfVectorizer(max_features=1000)
X_train_tfidf = vectorizer.fit_transform(X_train_text)
X_test_tfidf = vectorizer.transform(X_test_text)

# Train XGBoost model on the TF-IDF features.
# NOTE(review): `use_label_encoder` appears to be deprecated in newer XGBoost
# releases - confirm against the installed version.
model = XGBClassifier(use_label_encoder=False, eval_metric='logloss')
model.fit(X_train_tfidf, y_train)

# Evaluation: predicted labels for the untouched test texts
y_pred = model.predict(X_test_tfidf)

# Overall metrics (target_names are given in label order: 0 -> human, 1 -> generated)
print("Overall Classification Report:")
print(classification_report(y_test, y_pred, target_names=['human', 'generated']))

# Prepare test results DataFrame.
# X_test / y_test are re-indexed to 0..n-1 so they line up positionally with
# y_pred (and, later, with the per-row SHAP values) when concatenated.
X_test = X_test.reset_index(drop=True)
y_test = y_test.reset_index(drop=True)
y_pred_series = pd.Series(y_pred, name='pred_label')
# Columns: processed_text, lan, domain, true_label, pred_label
test_results = pd.concat([X_test, y_test.rename('true_label'), y_pred_series], axis=1)

# Function to calculate metrics
def calculate_metrics(y_true, y_pred):
    """Score binary predictions against the true labels.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.

    Returns:
        A ``(precision, recall, f1, accuracy)`` tuple; the first three come
        from scikit-learn's ``average='binary'`` scoring.
    """
    # The fourth element (support) is not needed by any caller.
    prf_scores = precision_recall_fscore_support(y_true, y_pred, average='binary')
    overall_accuracy = accuracy_score(y_true, y_pred)
    return prf_scores[0], prf_scores[1], prf_scores[2], overall_accuracy

# Metrics per language: score the test predictions separately for each
# language code found in the test set.
languages = test_results['lan'].unique()
metrics_per_language = []

for lang in languages:
    # Rows of the test set written in this language
    subset = test_results[test_results['lan'] == lang]
    precision, recall, f1, accuracy = calculate_metrics(subset['true_label'], subset['pred_label'])
    metrics_per_language.append({'Language': lang, 'Precision': precision, 'Recall': recall, 'F1-Score': f1, 'Accuracy': accuracy})
    print(f"\nClassification Report for language: {lang}")
    print(classification_report(subset['true_label'], subset['pred_label'], target_names=['human', 'generated']))

# One row per language, one column per metric
metrics_lang_df = pd.DataFrame(metrics_per_language)

# Plot metrics per language: one bar chart per metric, side by side
fig, ax = plt.subplots(1, 4, figsize=(20, 5))
# This list is reused by the per-genre and per-strategy plots further down.
metrics = ['Precision', 'Recall', 'F1-Score', 'Accuracy']

for idx, metric in enumerate(metrics):
    sns.barplot(x='Language', y=metric, data=metrics_lang_df, ax=ax[idx])
    ax[idx].set_title(f'{metric} per Language')

plt.tight_layout()
plt.show()

# Metrics per genre: same breakdown as above, keyed on the 'domain' column
genres = test_results['domain'].unique()
metrics_per_genre = []

for genre in genres:
    # Rows of the test set belonging to this genre
    subset = test_results[test_results['domain'] == genre]
    precision, recall, f1, accuracy = calculate_metrics(subset['true_label'], subset['pred_label'])
    metrics_per_genre.append({'Genre': genre, 'Precision': precision, 'Recall': recall, 'F1-Score': f1, 'Accuracy': accuracy})
    print(f"\nClassification Report for genre: {genre}")
    print(classification_report(subset['true_label'], subset['pred_label'], target_names=['human', 'generated']))

# One row per genre, one column per metric
metrics_genre_df = pd.DataFrame(metrics_per_genre)

# Plot metrics per genre: one bar chart per metric, side by side
fig, ax = plt.subplots(1, 4, figsize=(20, 5))

# `metrics` is the list of metric column names defined for the language plots.
for idx, metric in enumerate(metrics):
    sns.barplot(x='Genre', y=metric, data=metrics_genre_df, ax=ax[idx])
    ax[idx].set_title(f'{metric} per Genre')

plt.tight_layout()
plt.show()

# Metrics per language and genre: score every (language, genre) combination
metrics_per_lang_genre = []

for lang in languages:
    for genre in genres:
        subset = test_results[(test_results['lan'] == lang) & (test_results['domain'] == genre)]
        # Skip combinations that have no rows in the test set.
        if len(subset) > 0:
            precision, recall, f1, accuracy = calculate_metrics(subset['true_label'], subset['pred_label'])
            metrics_per_lang_genre.append({'Language': lang, 'Genre': genre, 'Precision': precision, 'Recall': recall, 'F1-Score': f1, 'Accuracy': accuracy})
            print(f"\nClassification Report for language: {lang}, genre: {genre}")
            print(classification_report(subset['true_label'], subset['pred_label'], target_names=['human', 'generated']))

metrics_lang_genre_df = pd.DataFrame(metrics_per_lang_genre)

# Pivot table for heatmap: languages as rows, genres as columns, accuracy as cell value
pivot_df = metrics_lang_genre_df.pivot(index='Language', columns='Genre', values='Accuracy')


# Plot heatmap of accuracy per language and genre
plt.figure(figsize=(8, 6))
sns.heatmap(pivot_df, annot=True, cmap='Blues')
plt.title('Accuracy per Language and Genre')
plt.show()

# Explainability using SHAP: build a tree explainer for the trained classifier
explainer = shap.TreeExplainer(model)

# Convert the test data to dense format (the TF-IDF matrix is sparse)
X_test_tfidf_dense = X_test_tfidf.toarray()

# Get SHAP values for the test rows.
# The code below indexes the result by test row and then by TF-IDF feature.
shap_values = explainer.shap_values(X_test_tfidf_dense)

# Identify influential tokens
def get_influential_tokens(shap_values, vectorizer):
    """List, for every sample, the tokens with the largest absolute SHAP value.

    Args:
        shap_values: Per-sample SHAP values, indexable by sample and then by
            feature position.
        vectorizer: Fitted vectorizer whose ``get_feature_names_out()`` maps
            feature positions to tokens.

    Returns:
        One list per sample holding up to five tokens, ordered from least to
        most influential among those selected.
    """
    vocabulary = vectorizer.get_feature_names_out()

    def top_tokens_for(sample_importances):
        # argsort is ascending, so the last five positions carry the largest
        # magnitudes; the sign of the contribution is ignored.
        strongest = np.argsort(np.abs(sample_importances))[-5:]
        return [vocabulary[position] for position in strongest]

    return [top_tokens_for(shap_values[row]) for row in range(len(shap_values))]

# Per-sample list of the most influential tokens; entry i belongs to test row i
influential_tokens = get_influential_tokens(shap_values, vectorizer)

# Save the most effective tokens.
# Global importance of a token = mean of |SHAP| over all test samples.
feature_names = vectorizer.get_feature_names_out()
mean_abs_shap_values = np.mean(np.abs(shap_values), axis=0)
token_importance_df = pd.DataFrame({
    'token': feature_names,
    'mean_abs_shap_value': mean_abs_shap_values
})
# Most important tokens first
token_importance_df = token_importance_df.sort_values(by='mean_abs_shap_value', ascending=False)
token_importance_df.to_csv('most_effective_tokens.csv', index=False)

# Plot the top 20 most important tokens as a horizontal bar chart
top_tokens_df = token_importance_df.head(20)
plt.figure(figsize=(10, 6))
sns.barplot(x='mean_abs_shap_value', y='token', data=top_tokens_df, orient='h')
plt.title('Top 20 Most Important Tokens')
plt.xlabel('Mean Absolute SHAP Value')
plt.ylabel('Token')
plt.show()

# SHAP summary plot over the dense test features, labelled with the token names
plt.figure(figsize=(10, 6))
shap.summary_plot(shap_values, features=X_test_tfidf_dense, feature_names=feature_names)
plt.show()

# Strategy 1: Replacing tokens with the most similar words used by humans
# Earlier variant, kept for reference: it trained on every human-labelled row
# of `data`, i.e. including rows that end up in the test set.
# human_texts = data[data['label1'] == 'human']['processed_text']
# human_tokens = [nltk.word_tokenize(text) for text in human_texts]
# human_model = Word2Vec(human_tokens, vector_size=100, window=5, min_count=1, workers=4)

# Use only the training data for the Word2Vec model (rows whose label is 0)
human_texts_train = X_train[y_train == 0]['processed_text']
human_tokens_train = [nltk.word_tokenize(text) for text in human_texts_train]
# min_count=1 keeps every word seen in the human training texts in the vocabulary.
# NOTE(review): no seed is passed and workers=4, so the embeddings are
# presumably not reproducible between runs - confirm if that matters.
human_model = Word2Vec(human_tokens_train, vector_size=100, window=5, min_count=1, workers=4)

def replace_with_similar_human_word(text, tokens_to_replace):
    """Swap selected tokens for their nearest neighbour in the human Word2Vec model.

    The text is tokenized and rebuilt token by token, so only whole tokens are
    replaced. A plain ``str.replace`` would also rewrite the first substring
    match, e.g. "art" inside "smart".

    Args:
        text: Preprocessed text (tokens separated by whitespace).
        tokens_to_replace: Tokens that should be replaced when they occur in
            ``text`` and are known to the module-level ``human_model``.

    Returns:
        ``(new_text, replacements)`` where ``replacements`` maps each replaced
        token to the word substituted for it. When nothing is replaced the
        original ``text`` is returned unchanged.
    """
    targets = set(tokens_to_replace)
    replacements = {}
    rewritten = []
    for token in nltk.word_tokenize(text):
        # Query the model once per distinct token; repeats reuse the answer.
        if token in targets and token not in replacements and token in human_model.wv:
            nearest = human_model.wv.most_similar(token, topn=1)
            if nearest:
                replacements[token] = nearest[0][0]
        rewritten.append(replacements.get(token, token))

    if not replacements:
        # Hand back the input untouched rather than a re-joined copy.
        return text, replacements
    return ' '.join(rewritten), replacements

X_test_modified_1 = []
replacements_strategy1 = []

# Apply strategy 1 to every test text. zip pairs each text with the
# influential tokens computed for the same row, avoiding a positional
# .iloc lookup (and the Series it builds) on every iteration.
for text, tokens_to_replace in zip(X_test['processed_text'], influential_tokens):
    modified_text, replacements = replace_with_similar_human_word(text, tokens_to_replace)
    X_test_modified_1.append(modified_text)
    replacements_strategy1.append(replacements)

# Strategy 2: Replacing tokens with similar words considering POS tagging
def replace_with_similar_human_word_pos(text, tokens_to_replace):
    """Swap selected tokens for a similar human word with the same POS tag.

    For each occurrence of a target token, up to ten nearest neighbours from
    the module-level ``human_model`` are tried in order of similarity; the
    first whose POS tag (tagged on its own) equals the tag the token received
    in context is used. Replacement happens at token level, so a target can
    never be rewritten inside a longer word.

    Args:
        text: Preprocessed text (tokens separated by whitespace).
        tokens_to_replace: Tokens that should be replaced when they occur in
            ``text`` and are known to ``human_model``.

    Returns:
        ``(new_text, replacements)`` where ``replacements`` maps each replaced
        token to the word substituted for it. When nothing is replaced the
        original ``text`` is returned unchanged.
    """
    # NOTE(review): pos_tag() is called without a language argument, so the
    # same tagger is presumably applied to Dutch rows too - confirm.
    targets = set(tokens_to_replace)
    replacements = {}
    rewritten = []
    for token, pos in pos_tag(nltk.word_tokenize(text)):
        new_token = token
        if token in targets and token in human_model.wv:
            for similar_word, _ in human_model.wv.most_similar(token, topn=10):
                if pos_tag([similar_word])[0][1] == pos:
                    new_token = similar_word
                    replacements[token] = similar_word
                    break
        rewritten.append(new_token)

    if not replacements:
        # Hand back the input untouched rather than a re-joined copy.
        return text, replacements
    return ' '.join(rewritten), replacements

X_test_modified_2 = []
replacements_strategy2 = []

# Apply strategy 2 to every test text. zip pairs each text with the
# influential tokens computed for the same row, avoiding a positional
# .iloc lookup (and the Series it builds) on every iteration.
for text, tokens_to_replace in zip(X_test['processed_text'], influential_tokens):
    modified_text, replacements = replace_with_similar_human_word_pos(text, tokens_to_replace)
    X_test_modified_2.append(modified_text)
    replacements_strategy2.append(replacements)

# Strategy 3: Token Replacement using GPT-4
# Read the API key from the environment; fall back to an interactive prompt so
# the key never has to be written into the notebook.
api_key = os.getenv("OPENAI_API_KEY")
if not api_key:
    api_key = input("Please enter your OpenAI API key: ")

# Module-level key used by the openai calls below
openai.api_key = api_key

def replace_tokens_with_gpt4(text, tokens_to_replace):
    """Ask the chat model for a replacement for each target token found in ``text``.

    Tokens are handled one after another, each prompt containing the text as
    modified so far. A token is only sent to the API when it occurs in the
    text as a whole word, and only that whole-word occurrence is replaced.
    This avoids paying for requests that cannot change anything, rewriting
    part of a longer word, and logging replacements that never happened.

    Args:
        text: Preprocessed text to modify.
        tokens_to_replace: Candidate tokens; they may include tokens that do
            not occur in ``text``.

    Returns:
        ``(new_text, replacements)`` where ``replacements`` maps each token
        that was actually replaced to the model's reply.
    """
    # NOTE(review): `openai.ChatCompletion` looks like the pre-1.0 client
    # interface - confirm the pinned openai version still provides it.
    replacements = {}
    for token in tokens_to_replace:
        whole_word = re.compile(r'\b' + re.escape(str(token)) + r'\b')
        if not whole_word.search(text):
            continue
        prompt = f"Replace the token '{token}' with a more human-like word in the following text: '{text}'"
        try:
            response = openai.ChatCompletion.create(
                messages=[
                    {"role": "user", "content": prompt}
                ],
                model="gpt-4o-mini",
                max_tokens=10,
                n=1,
                stop=None,
                temperature=0.7,
            )
            replacement = response['choices'][0]['message']['content'].strip()
        except Exception as e:
            # Best effort: a failed request leaves this token as it is.
            print(f"Error replacing token '{token}': {e}")
            continue
        # A callable replacement keeps backslashes in the reply literal.
        text = whole_word.sub(lambda _match: replacement, text, count=1)
        replacements[token] = replacement
    return text, replacements

X_test_modified_3 = []
replacements_strategy3 = []

# Apply strategy 3 to every test text. zip pairs each text with the
# influential tokens computed for the same row, avoiding a positional
# .iloc lookup (and the Series it builds) on every iteration.
for text, tokens_to_replace in zip(X_test['processed_text'], influential_tokens):
    modified_text, replacements = replace_tokens_with_gpt4(text, tokens_to_replace)
    X_test_modified_3.append(modified_text)
    replacements_strategy3.append(replacements)

# Strategy 4: Using GPT-4 with genre-specific information
def replace_tokens_with_gpt4_genre(text, tokens_to_replace, genre):
    """Ask the chat model for genre-aware replacements of target tokens in ``text``.

    Tokens are handled one after another, each prompt naming the genre and
    containing the text as modified so far. A token is only sent to the API
    when it occurs in the text as a whole word, and only that whole-word
    occurrence is replaced. This avoids paying for requests that cannot
    change anything, rewriting part of a longer word, and logging
    replacements that never happened.

    Args:
        text: Preprocessed text to modify.
        tokens_to_replace: Candidate tokens; they may include tokens that do
            not occur in ``text``.
        genre: Genre name inserted into the prompt.

    Returns:
        ``(new_text, replacements)`` where ``replacements`` maps each token
        that was actually replaced to the model's reply.
    """
    # NOTE(review): `openai.ChatCompletion` looks like the pre-1.0 client
    # interface - confirm the pinned openai version still provides it.
    replacements = {}
    for token in tokens_to_replace:
        whole_word = re.compile(r'\b' + re.escape(str(token)) + r'\b')
        if not whole_word.search(text):
            continue
        prompt = f"Replace the token '{token}' with a more human-like word in the following {genre} text: '{text}'"
        try:
            response = openai.ChatCompletion.create(
                messages=[
                    {"role": "user", "content": prompt}
                ],
                model="gpt-4o-mini",
                max_tokens=10,
                n=1,
                stop=None,
                temperature=0.7,
            )
            replacement = response['choices'][0]['message']['content'].strip()
        except Exception as e:
            # Best effort: a failed request leaves this token as it is.
            print(f"Error replacing token '{token}': {e}")
            continue
        # A callable replacement keeps backslashes in the reply literal.
        text = whole_word.sub(lambda _match: replacement, text, count=1)
        replacements[token] = replacement
    return text, replacements

X_test_modified_4 = []
replacements_strategy4 = []

# Apply strategy 4 to every test text. zip walks the texts, their influential
# tokens and their genres in lockstep, avoiding two positional .iloc lookups
# (and the Series they build) on every iteration.
for text, tokens_to_replace, genre in zip(X_test['processed_text'], influential_tokens, X_test['domain']):
    modified_text, replacements = replace_tokens_with_gpt4_genre(text, tokens_to_replace, genre)
    X_test_modified_4.append(modified_text)
    replacements_strategy4.append(replacements)

# Test the model on the revised texts.
# The vectorizer fitted on the training texts is reused as-is, so the modified
# texts are projected onto the same vocabulary the model was trained on.
X_test_modified_1_tfidf = vectorizer.transform(X_test_modified_1)
X_test_modified_2_tfidf = vectorizer.transform(X_test_modified_2)
X_test_modified_3_tfidf = vectorizer.transform(X_test_modified_3)
X_test_modified_4_tfidf = vectorizer.transform(X_test_modified_4)

# Predictions of the unchanged model on each strategy's modified test set
y_pred_modified_1 = model.predict(X_test_modified_1_tfidf)
y_pred_modified_2 = model.predict(X_test_modified_2_tfidf)
y_pred_modified_3 = model.predict(X_test_modified_3_tfidf)
y_pred_modified_4 = model.predict(X_test_modified_4_tfidf)

# Evaluate the performance on modified texts
def get_metrics_dict(y_true, y_pred, strategy_name):
    """Build one labelled row of scores for the strategy comparison table.

    Args:
        y_true: Ground-truth labels.
        y_pred: Predicted labels, aligned with ``y_true``.
        strategy_name: Label stored under the ``'Strategy'`` key.

    Returns:
        A dict with the keys ``'Strategy'``, ``'Precision'``, ``'Recall'``,
        ``'F1-Score'`` and ``'Accuracy'``, in that order.
    """
    prf_scores = precision_recall_fscore_support(y_true, y_pred, average='binary')
    row = {'Strategy': strategy_name}
    # zip stops after the three names, which drops the trailing support value.
    row.update(zip(('Precision', 'Recall', 'F1-Score'), prf_scores))
    row['Accuracy'] = accuracy_score(y_true, y_pred)
    return row

# Score the original predictions and each strategy's predictions against the
# same true labels, so the rows are directly comparable.
metrics_original = get_metrics_dict(y_test, y_pred, 'Original')
metrics_strategy1 = get_metrics_dict(y_test, y_pred_modified_1, 'Strategy 1')
metrics_strategy2 = get_metrics_dict(y_test, y_pred_modified_2, 'Strategy 2')
metrics_strategy3 = get_metrics_dict(y_test, y_pred_modified_3, 'Strategy 3')
metrics_strategy4 = get_metrics_dict(y_test, y_pred_modified_4, 'Strategy 4')

# One row per strategy, with the unmodified baseline first
metrics_all_strategies = pd.DataFrame([
    metrics_original,
    metrics_strategy1,
    metrics_strategy2,
    metrics_strategy3,
    metrics_strategy4
])

# Plot comparison of strategies: one bar chart per metric, side by side
fig, ax = plt.subplots(1, 4, figsize=(20, 5))

# `metrics` is the list of metric column names defined for the language plots.
for idx, metric in enumerate(metrics):
    sns.barplot(x='Strategy', y=metric, data=metrics_all_strategies, ax=ax[idx])
    ax[idx].set_title(f'{metric} Comparison')
    # Fix the y-range so the four charts share the same 0-1 scale.
    ax[idx].set_ylim(0, 1)

plt.tight_layout()
plt.show()

# Print classification reports for each strategy
print("\nResults for Strategy 1:")
print(classification_report(y_test, y_pred_modified_1, target_names=['human', 'generated']))

print("\nResults for Strategy 2:")
print(classification_report(y_test, y_pred_modified_2, target_names=['human', 'generated']))

print("\nResults for Strategy 3:")
print(classification_report(y_test, y_pred_modified_3, target_names=['human', 'generated']))

print("\nResults for Strategy 4:")
print(classification_report(y_test, y_pred_modified_4, target_names=['human', 'generated']))

# Save the tokens that were replaced as well as the replacements and the final datasets
def save_strategy_results(strategy_number, modified_texts, replacements):
    """Write one strategy's texts, replacements and predictions to a CSV file.

    The output file is ``strategy<strategy_number>_results.csv``. Original
    texts and true labels come from the module-level ``X_test`` and
    ``y_test``; predictions come from the module-level variable
    ``y_pred_modified_<strategy_number>``.

    Args:
        strategy_number: Strategy identifier used in the variable and file names.
        modified_texts: Modified text for every test row.
        replacements: Per-row dict of token -> replacement.

    Raises:
        NameError: If no prediction variable exists for ``strategy_number``.
    """
    # Look the predictions up by name instead of eval(): eval would execute
    # whatever expression the formatted string happens to contain.
    pred_name = f'y_pred_modified_{strategy_number}'
    try:
        pred_labels = globals()[pred_name]
    except KeyError:
        raise NameError(f"name '{pred_name}' is not defined") from None

    results_df = pd.DataFrame({
        'original_text': X_test['processed_text'],
        'modified_text': modified_texts,
        'replacements': replacements,
        'true_label': y_test,
        'pred_label': pred_labels
    })
    results_df.to_csv(f'strategy{strategy_number}_results.csv', index=False)

# Write strategy1_results.csv ... strategy4_results.csv, one file per strategy
save_strategy_results(1, X_test_modified_1, replacements_strategy1)
save_strategy_results(2, X_test_modified_2, replacements_strategy2)
save_strategy_results(3, X_test_modified_3, replacements_strategy3)
save_strategy_results(4, X_test_modified_4, replacements_strategy4)

In [None]:
import pickle

# Save the trained model
# NOTE: pickle files can execute code when loaded, so only load these
# artifacts from a trusted location.
with open('xgboost_model.pkl', 'wb') as model_file:
    pickle.dump(model, model_file)

# Save the TF-IDF vectorizer (needed alongside the model to featurize new text)
with open('tfidf_vectorizer.pkl', 'wb') as vectorizer_file:
    pickle.dump(vectorizer, vectorizer_file)