In [1]:
import os
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn.functional as F
from matplotlib.ticker import FixedLocator
from sklearn.decomposition import PCA
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.metrics import confusion_matrix, classification_report, f1_score, accuracy_score, ConfusionMatrixDisplay
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

In [2]:
def get_dataset_df(human_dataset_path, ai_dataset_path, random_state=None):
    """Load every CSV in the human and AI folders into one labelled, shuffled frame.

    Rows read from ``ai_dataset_path`` get ``label == 0`` and rows read from
    ``human_dataset_path`` get ``label == 1``. The label is decided by which
    argument a folder was passed as, not by the folder's name, so it does not
    depend on the path separator or on the directory being called ``AI``.

    Args:
        human_dataset_path: Directory containing CSV files of human-written text.
        ai_dataset_path: Directory containing CSV files of AI-generated text.
        random_state: Optional seed forwarded to ``DataFrame.sample`` so the
            shuffle is reproducible. ``None`` (the default) keeps the shuffle
            different on every call.

    Returns:
        A DataFrame holding the CSV columns (without ``uid`` when it exists)
        plus ``label``, with duplicate rows removed, rows shuffled and the
        index reset to start at 0.

    Raises:
        ValueError: If neither directory contains a ``.csv`` file.
    """
    # 0 for AI generated text and 1 for human generated text
    labelled_dirs = [(human_dataset_path, 1), (ai_dataset_path, 0)]

    # Collect the frames first and concatenate once; growing a DataFrame with
    # concat inside the loop copies all previous rows on every iteration.
    frames = []
    for dataset_path, label in labelled_dirs:
        # sorted() makes the row order independent of the OS listing order.
        for file_name in sorted(os.listdir(dataset_path)):
            if not file_name.endswith(".csv"):
                continue
            data = pd.read_csv(os.path.join(dataset_path, file_name))
            frames.append(data.assign(label=label))

    if not frames:
        raise ValueError(
            f"No .csv files found in {human_dataset_path!r} or {ai_dataset_path!r}"
        )

    result_df = pd.concat(frames, ignore_index=True)
    # 'uid' is only an identifier; tolerate files that do not carry it.
    result_df = result_df.drop(columns=['uid'], errors='ignore')
    result_df = result_df.drop_duplicates()
    result_df = result_df.sample(frac=1, random_state=random_state).reset_index(drop=True)

    return result_df

In [3]:
class TextEmbedding:
    """Tokenizer/model pair that turns text into a single mean-pooled embedding.

    The tokenizer and model are loaded with ``AutoTokenizer.from_pretrained``
    and ``AutoModel.from_pretrained``; ``max_length`` is taken from the
    tokenizer's ``model_max_length`` and bounds the chunk size used by
    :meth:`encode`.
    """

    def __init__(self, model_name):
        """Load the tokenizer and model identified by ``model_name``."""
        # Imported here so the class is usable even though the notebook's
        # import cell does not bring the transformers names into scope.
        from transformers import AutoModel, AutoTokenizer

        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModel.from_pretrained(model_name)
        self.max_length = self.tokenizer.model_max_length

    def convert_text_to_tokenid(self, text):
        """Return the token ids of ``text`` as a Python list."""
        tokens = self.tokenizer(text, return_tensors="pt")
        # squeeze(0) drops only the batch axis; a bare squeeze() would also
        # collapse a single-token result into a scalar instead of a list.
        token_ids = tokens["input_ids"].squeeze(0).tolist()
        return token_ids

    def convert_tokenid_to_tokens(self, token_ids):
        """Map token ids to their token strings via the tokenizer."""
        tokens_str = self.tokenizer.convert_ids_to_tokens(token_ids)
        return tokens_str

    def get_vocabulary(self):
        """Return the tokenizer's vocabulary as given by ``get_vocab()``."""
        return self.tokenizer.get_vocab()

    def convert_tokenid_to_text(self, token_ids):
        """Decode token ids back into text with the tokenizer."""
        reconstructed_text = self.tokenizer.decode(token_ids)
        return reconstructed_text

    def cosine_similarity(self, input1, input2):
        """Return the cosine similarity of two texts' embeddings, rounded to 4 places."""
        embeddings1 = self.encode(input1)
        embeddings2 = self.encode(input2)
        # encode() averages away the chunk axis and returns a 1-D vector, so
        # the comparison has to run along dim 0 (the default dim=1 does not exist).
        similarity_score = F.cosine_similarity(embeddings1, embeddings2, dim=0)
        return round(similarity_score.item(), 4)

    def encode(self, text):
        """Embed ``text`` as the mean of per-chunk mean-pooled hidden states.

        The token ids are split into consecutive chunks of at most
        ``self.max_length`` tokens. Each chunk is run through the model under
        ``torch.no_grad()`` and its ``last_hidden_state`` is averaged over the
        token axis; the chunk vectors are then averaged with equal weight.

        Args:
            text: The string to embed.

        Returns:
            A tensor with the batch/chunk and token axes averaged away
            (presumably of length ``hidden_size`` — depends on the model).

        Raises:
            ValueError: If the tokenizer produces no tokens for ``text``.
        """
        tokens = self.tokenizer(text, return_tensors="pt")
        input_ids = tokens['input_ids'][0]  # Extract the input IDs
        total_tokens = len(input_ids)
        if total_tokens == 0:
            raise ValueError("Cannot encode empty text: the tokenizer produced no tokens.")

        # Step through the ids in max_length strides. Unlike
        # (len // max_length) + 1, this never yields an empty trailing chunk
        # when the length is an exact multiple of max_length.
        chunks = [
            input_ids[start:min(start + self.max_length, total_tokens)]
            for start in range(0, total_tokens, self.max_length)
        ]

        embeddings = []

        # Process each chunk
        for chunk in chunks:
            chunk_tensor = chunk.unsqueeze(0)  # Add batch dimension (batch, num_tokens)
            with torch.no_grad():
                outputs = self.model(input_ids=chunk_tensor)
                # Mean over the token axis of (batch, num_tokens, embedding_size)
                chunk_embedding = outputs.last_hidden_state.mean(dim=1)
            embeddings.append(chunk_embedding)

        con_embedding = torch.cat(embeddings, dim=0)
        combined_embedding = torch.mean(con_embedding, dim=0)

        return combined_embedding

    def decode(self, embedded_text):
        """Placeholder: not implemented, always returns ``None``."""
        pass

In [None]:
class TextPreprocessing:
    """Turns raw text (or a whole dataset of text) into embedding feature vectors."""

    def __init__(self,  model_name = 'gpt2'):
        """Create the underlying ``TextEmbedding`` for ``model_name``."""
        self.text_embedding = TextEmbedding(model_name)

    def preprocessing_basic(self, text):
        """Lower-case ``text``, turn line breaks into spaces and drop backslashes."""
        text = text.lower()
        # Every line-break style becomes a single space so that words on
        # adjacent lines are not glued together.
        for line_break in ('\r\n', '\n', '\r'):
            text = text.replace(line_break, ' ')
        return text.replace('\\', '')

    def preprocessing_text(self, text):
        """Embed ``text`` and return the embedding as a flat NumPy array."""
        text_encoded = self.text_embedding.encode(text)
        text_encoded_flatten = torch.flatten(text_encoded)
        # detach()/cpu() are no-ops for a plain CPU tensor but keep .numpy()
        # valid if the tensor tracks gradients or lives on another device.
        text_encoded_flatten_array = text_encoded_flatten.detach().cpu().numpy()
        return text_encoded_flatten_array

    def preprocessing_final(self, dataset):
        """Embed ``dataset['text']`` row by row and attach ``dataset['label']``.

        Args:
            dataset: DataFrame with a ``text`` column and a ``label`` column.

        Returns:
            A DataFrame with columns ``feature_1 .. feature_N`` followed by
            ``label``, one row per input row, carrying ``dataset``'s index.
        """
        embedded_rows = dataset['text'].apply(self.preprocessing_text)
        # Reuse the caller's index: a fresh RangeIndex would misalign with the
        # labels whenever the dataset was filtered, sampled or split beforehand.
        dataset_text_df = pd.DataFrame(embedded_rows.tolist(), index=dataset.index)
        dataset_text_df.columns = [f"feature_{i}" for i in range(1, dataset_text_df.shape[1] + 1)]
        # Attach labels positionally so no index alignment can introduce NaN rows.
        processed_dataset = dataset_text_df.assign(label=dataset['label'].to_numpy())
        return processed_dataset

In [None]:
class FeaturesDimensionalityReduction:
    """Standardize features with a scaler, then project them with PCA.

    Both steps can be injected; when omitted a ``StandardScaler`` and a
    ``PCA`` are created.
    """

    def __init__(self, scaler=None, pca=None):
        """Store the given scaler/PCA, or create default ones when ``None``."""
        self.scaler = scaler if scaler is not None else StandardScaler()
        self.pca = pca if pca is not None else PCA()

    def fit(self, X_data, n_components):
        """Fit the scaler on ``X_data`` and the PCA on the standardized data.

        Args:
            X_data: Training feature matrix.
            n_components: Value assigned to the PCA's ``n_components`` parameter.

        Returns:
            ``self``, so calls can be chained.
        """
        self.scaler.fit(X_data)
        standardized_data = self.scaler.transform(X_data)
        # Reconfigure the stored PCA rather than replacing it, so a PCA
        # injected through __init__ keeps its other settings.
        self.pca.set_params(n_components=n_components)
        self.pca.fit(standardized_data)
        return self

    def transform_data(self, data):
        """Standardize ``data`` with the fitted scaler and project it with the fitted PCA."""
        standardized_data = self.scaler.transform(data)
        return self.pca.transform(standardized_data)

    def transform_single_data_point(self, data_point):
        """Transform one sample; a 1-D sample is reshaped to a single-row matrix first."""
        if np.ndim(data_point) == 1:
            data_point = np.asarray(data_point).reshape(1, -1)
        return self.transform_data(data_point)

    def save(self, scaler_path="scaler.pkl", pca_path="pca.pkl"):
        """Pickle the scaler and the PCA to the given file paths."""
        with open(scaler_path, "wb") as scaler_file:
            pickle.dump(self.scaler, scaler_file)
        with open(pca_path, "wb") as pca_file:
            pickle.dump(self.pca, pca_file)

    def load(self, scaler_path="scaler.pkl", pca_path="pca.pkl"):
        """Replace the scaler and the PCA with objects unpickled from the given paths.

        SECURITY: ``pickle.load`` can execute arbitrary code contained in the
        file; only load files that this project wrote itself.
        """
        with open(scaler_path, "rb") as scaler_file:
            self.scaler = pickle.load(scaler_file)
        with open(pca_path, "rb") as pca_file:
            self.pca = pickle.load(pca_file)

In [None]:
def testing(text, model):
    label_mapping = {0: 'AI', 1: 'Human'}
    embeded_text = processed_dataset_obj.preprocessing_text(text).reshape(1, -1)
    embeded_text_df = pd.DataFrame(embeded_text)
    embeded_text_df.columns = [f"feature_{i}" for i in range(1, embeded_text_df.shape[1] + 1)]
    transformed_data = fdr.transform_single_data_point(embeded_text_df)
    prediction = model.predict(transformed_data)
    predicted_label = label_mapping[prediction[0]] + " Generated"
    return predicted_label

In [None]:
def model_evaluation(y_test, y_pred):
    """
    Evaluate the model's predictions using confusion matrix, classification report,
    and additional performance metrics such as F1-score, TPR, FPR, and accuracy.

    Labels are expected to be 0 (AI) and 1 (Human); label 1 is treated as the
    positive class for the rate metrics.

    Args:
        y_test: True labels.
        y_pred: Predicted labels, same length as ``y_test``.

    Returns:
        None. The confusion matrix is plotted and all metrics are printed.
    """
    # Pin the label order so the matrix is always 2x2 (rows/cols: AI, Human),
    # even when one class is absent from this particular evaluation set.
    conf_matrix = confusion_matrix(y_test, y_pred, labels=[0, 1])

    # Display the confusion matrix
    disp = ConfusionMatrixDisplay(confusion_matrix=conf_matrix, display_labels=['AI', 'Human'])
    disp.plot(cmap='viridis')
    plt.title("Confusion Matrix")
    plt.show()

    # Print the classification report
    print("\nClassification Report:\n", classification_report(y_test, y_pred))

    # Extract components of the confusion matrix
    tn, fp, fn, tp = conf_matrix.ravel()

    # Calculate additional metrics
    f1 = f1_score(y_test, y_pred)
    accuracy = accuracy_score(y_test, y_pred)

    # Safeguard against division by zero for rates
    fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0
    fnr = fn / (fn + tp) if (fn + tp) > 0 else 0.0
    tpr = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    tnr = tn / (tn + fp) if (tn + fp) > 0 else 0.0

    print("\nAdditional Metrics:")
    print(f"Accuracy: {accuracy:.2f}")
    print(f"F1-score: {f1:.2f}")
    print(f"True Positive Rate (TPR, Sensitivity): {tpr:.2f}")
    print(f"True Negative Rate (TNR, Specificity): {tnr:.2f}")
    print(f"False Positive Rate (FPR): {fpr:.2f}")
    print(f"False Negative Rate (FNR): {fnr:.2f}")

    # Calculate total number of mistakes (false predictions)
    actual_mistakes = fp + fn
    total_predictions = len(y_pred)
    error_rate = actual_mistakes / total_predictions if total_predictions > 0 else 0.0

    print("\nTotal Number of Mistakes:", actual_mistakes)
    print(f"Error Rate: {error_rate:.2f}")


In [None]:
def count_plot(dataset, save_fig = None):
    """Draw a bar chart of how many rows carry each ``label`` value.

    Tick labels are rendered as 'AI' (0) and 'Human' (1), every bar is
    annotated with its count, and the figure is written to ``save_fig``
    when that argument is truthy before being shown.
    """
    names_by_label = {0: 'AI', 1: 'Human'}

    axes = sns.countplot(data=dataset, x='label')

    # Freeze the tick positions before swapping in the readable names.
    tick_positions = axes.get_xticks()
    axes.xaxis.set_major_locator(FixedLocator(tick_positions))
    tick_names = []
    for tick in axes.get_xticklabels():
        tick_names.append(names_by_label[int(tick.get_text())])
    axes.set_xticklabels(tick_names)

    # Write the count centred just above each bar.
    for bar in axes.patches:
        bar_height = bar.get_height()
        bar_center = bar.get_x() + bar.get_width() / 2
        axes.annotate(f'{int(bar_height)}', (bar_center, bar_height), ha='center', va='bottom')

    axes.set_xlabel('Category')
    axes.set_ylabel('Count')

    if save_fig:
        plt.savefig(save_fig)

    plt.show()

In [None]:
def sampling_the_dataset(dataset, num_samples, random_state=42):
    """Draw a class-balanced, shuffled sample of ``num_samples`` rows.

    Label 0 contributes ``num_samples // 2`` rows and label 1 contributes the
    remainder, so label 1 gets the extra row when ``num_samples`` is odd.
    The same ``random_state`` seeds both per-class draws and the final shuffle.
    The returned frame has a fresh 0-based index.
    """
    half = num_samples // 2
    quota_per_label = ((0, half), (1, num_samples - half))

    picked = [
        dataset.loc[dataset['label'] == label_value].sample(quota, random_state=random_state)
        for label_value, quota in quota_per_label
    ]

    merged = pd.concat(picked, axis=0, ignore_index=True)
    return merged.sample(frac=1, random_state=random_state).reset_index(drop=True)