## Python PaillierEncryption
The core cryptographic functionality in this project is provided by python-paillier (imported as `phe`), a Python implementation of the Paillier partially homomorphic cryptosystem. The library is open source and available on GitHub.

@misc{PythonPaillier, author = {CSIRO's Data61}, title = {Python Paillier Library}, year = {2013}, publisher = {GitHub}, journal = {GitHub Repository}, howpublished = {\url{https://github.com/data61/python-paillier}}, }

In [None]:
# Install the python-paillier library (imported below as `phe`) directly from its GitHub repository.
pip install git+https://github.com/data61/python-paillier.git

## Importing libraries

In [None]:
import numpy as np
import pandas as pd
import json
import time
import phe as paillier
from contextlib import contextmanager
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression, Perceptron
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import f1_score

## Loading the JSONL files

In [None]:
def load_data_from_jsonl(file_path):
    '''
    Read a JSONL (JSON Lines) file and extract email texts and their binary labels.

    Every non-blank line must be a JSON object with a 'text' field and a
    'label' field. A label equal to 1 is kept as spam (1); any other label
    value is mapped to ham (0). Blank lines are skipped.

    Parameters : file_path (str) - Path to the JSONL file.
    Returns:
        np.array - Array of email texts.
        np.array - Array of binary labels (1 for spam, 0 for ham).
    Raises:
        json.JSONDecodeError - If a non-blank line is not valid JSON.
        KeyError - If an entry lacks the 'text' or 'label' field.
    '''
    texts, labels = [], []
    # Explicit UTF-8 so decoding does not depend on the platform's default locale.
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            # Tolerate blank lines (e.g. an empty line at the end of the file).
            if not line.strip():
                continue
            entry = json.loads(line)
            texts.append(entry['text'])  # Adapt based on the JSON schema
            labels.append(1 if entry['label'] == 1 else 0)  # Keep label binary: spam (1), ham (0)
    return np.array(texts), np.array(labels)

## Unencrypted model
These models are trained and evaluated on plaintext data and serve mainly as a baseline for comparison with the encrypted models.

In [None]:
def process_datasets(train_file, test_file):
    '''
    Load the training and testing datasets and vectorize the email texts.

    Texts are converted to bag-of-words features with TfidfVectorizer. IDF
    weighting and normalization are switched off (use_idf=False, norm=None),
    so the features are effectively plain term counts. The vectorizer is
    fitted on the training texts only and then applied to the test texts.

    Parameters:
        train_file (str) - Path to the training data file.
        test_file (str) - Path to the testing data file.
    Returns: Vectorized training data (X_train), training labels (train_labels), vectorized test data (X_test) and test labels (test_labels).
    Raises: ValueError - If the training labels do not contain both classes.
    '''

    print("Reading training dataset.")
    train_texts, train_labels = load_data_from_jsonl(train_file)

    print("Reading testing dataset.")
    test_texts, test_labels = load_data_from_jsonl(test_file)

    # Validate the labels before vectorizing so an unusable training set fails fast.
    if len(np.unique(train_labels)) < 2:
        raise ValueError("Training data must have both spam and non-spam labels.")

    # Text vectorization (bag-of-words; English stop words removed, rare terms dropped via min_df)
    vectorizer = TfidfVectorizer(use_idf=False, norm=None, stop_words='english', min_df=0.001)

    # Fit vectorizer on training data and apply to test data
    X_train = vectorizer.fit_transform(train_texts)
    X_test = vectorizer.transform(test_texts)

    print(f"Proportion of spam: {np.mean(train_labels == 1):.2f}, ham: {np.mean(train_labels == 0):.2f}")

    return X_train, train_labels, X_test, test_labels

@contextmanager
def measure_time():
    """
    Context manager that prints how long the wrapped block of code took.

    The duration is measured with time.perf_counter() and printed when the
    block exits. The message is printed even if the block raises; the
    exception then propagates to the caller unchanged.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs on normal exit and on error, so failed blocks are still timed.
        elapsed = time.perf_counter() - start
        print(f'Elapsed time: {elapsed:.2f} seconds')

class SystemAdmin:
    """
    Owner of the three plaintext classifiers (linear SVM, Logistic Regression, Perceptron): it fits them on training data and runs predictions with them.
    """
    def __init__(self):
        # Create one untrained instance of each classifier.
        self.svm_model = SVC(kernel='linear')
        self.lr_model = LogisticRegression()
        self.perceptron_model = Perceptron()

    def train_models(self, X, y):
        # Fit every classifier on the same data, in the order SVM, Logistic Regression, Perceptron.
        for attr_name in ("svm_model", "lr_model", "perceptron_model"):
            getattr(self, attr_name).fit(X, y)

    def make_prediction(self, model, X):
        # Delegate to the given model's predict() and return whatever it produces.
        predictions = model.predict(X)
        return predictions


# Plaintext baseline: train the three classifiers, time each one separately,
# evaluate them on the test set and print a comparison table.
if __name__ == '__main__':
    train_path = 'train.jsonl'
    test_path = 'test.jsonl'

    X_train, y_train, X_test, y_test = process_datasets(train_path, test_path)

    # Admin operations
    admin = SystemAdmin()
    models = {
        "SVM": admin.svm_model,
        "Logistic Regression": admin.lr_model,
        "Perceptron": admin.perceptron_model,
    }

    # Train models and record training time.
    # Each model is fitted on its own inside the timed region; calling
    # admin.train_models() here would refit all three models on every
    # iteration and attribute the combined time to each of them.
    training_times = {}
    print("Training models.")
    for model_name, model in models.items():
        start_time = time.perf_counter()
        model.fit(X_train, y_train)
        training_times[model_name] = time.perf_counter() - start_time

    # Evaluate models and store results
    results = []
    for model_name, model in models.items():
        # Unencrypted evaluation
        start_time = time.perf_counter()
        preds = admin.make_prediction(model, X_test)
        test_time = time.perf_counter() - start_time

        # Test error is the fraction of misclassified test samples
        error_rate = np.mean(preds != y_test)
        f1 = f1_score(y_test, preds)
        results.append((model_name, "Unencrypted", error_rate, training_times[model_name], test_time, f1))

    # Display results in a DataFrame
    df_results = pd.DataFrame(results, columns=["Model", "Type", "Test Error", "Training Time", "Test Time", "F1 Score"])
    print("\nResults Comparison:")
    print(df_results)


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


## Encrypted Model
Here we combine encryption and machine learning to create a privacy-preserving spam detection model. We then compare the results with those obtained from the unencrypted model.

In [None]:

def process_datasets(train_file, test_file, n_components=100):
    """
    Load the training and testing datasets and turn the email texts into low-dimensional dense features.

    Texts are converted to bag-of-words features with TfidfVectorizer. IDF
    weighting and normalization are switched off (use_idf=False, norm=None),
    so the features are effectively plain term counts. The features are then
    standardized and reduced with Principal Component Analysis (PCA).
    Vectorizer, scaler and PCA are all fitted on the training data only and
    then applied to the test data.

    Parameters:
        train_file : Path to the training data file.
        test_file : Path to the testing data file.
        n_components : Number of PCA components to reduce the feature space to (we have taken 100).
    Returns: Reduced training data (X_train_reduced), training labels (train_labels), reduced test data (X_test_reduced), and test labels (test_labels).
    Raises: ValueError - If the training labels do not contain both classes.
    """

    print("Reading training dataset.")
    train_texts, train_labels = load_data_from_jsonl(train_file)

    print("Reading testing dataset.")
    test_texts, test_labels = load_data_from_jsonl(test_file)

    # Validate the labels first so an unusable training set fails fast,
    # before the costly dense conversion, scaling and PCA.
    if len(np.unique(train_labels)) < 2:
        raise ValueError("Training data must have both spam and non-spam labels.")

    # Text vectorization (bag-of-words; English stop words removed, rare terms dropped via min_df)
    vectorizer = TfidfVectorizer(use_idf=False, norm=None, stop_words='english', min_df=0.001)

    # Fit vectorizer on training data and apply to test data
    X_train = vectorizer.fit_transform(train_texts).toarray()  # Convert to dense array for PCA
    X_test = vectorizer.transform(test_texts).toarray()

    print(f'Original feature space size: {X_train.shape[1]}')

    # Scale the data before applying PCA
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)

    # Apply PCA for dimensionality reduction
    # NOTE(review): presumably PCA rejects n_components larger than the number
    # of samples or features - verify for small datasets.
    pca = PCA(n_components=n_components)
    X_train_reduced = pca.fit_transform(X_train_scaled)
    X_test_reduced = pca.transform(X_test_scaled)

    print(f'Reduced feature space size: {X_train_reduced.shape[1]}')

    print(f"Proportion of spam: {np.mean(train_labels == 1):.2f}, ham: {np.mean(train_labels == 0):.2f}")

    return X_train_reduced, train_labels, X_test_reduced, test_labels

@contextmanager
def measure_time():
    """
    Context manager that prints how long the wrapped block of code took.

    The duration is measured with time.perf_counter() and printed when the
    block exits. The message is printed even if the block raises; the
    exception then propagates to the caller unchanged.
    """
    start = time.perf_counter()
    try:
        yield
    finally:
        # Runs on normal exit and on error, so failed blocks are still timed.
        elapsed = time.perf_counter() - start
        print(f'Elapsed time: {elapsed:.2f} seconds')

class SystemAdmin:
    """
    SystemAdmin owns the three classifiers (linear SVM, Logistic Regression, Perceptron) and the Paillier key pair. It trains the models, encrypts their parameters with the public key, and decrypts values with the private key.
    """

    def __init__(self):
        # Create one untrained instance of each classifier.
        self.svm_model = SVC(kernel='linear')
        self.lr_model = LogisticRegression(max_iter=2000)  # Raised iteration cap (max_iter=2000) to help convergence
        self.perceptron_model = Perceptron()

    def generate_paillier_keys(self, key_length):
        """
        Create a Paillier key pair and store it on the instance as pub_key / priv_key.
        Parameters: key_length (int) - Key length, passed to the library as n_length.
        """
        keypair = paillier.generate_paillier_keypair(n_length=key_length)
        self.pub_key, self.priv_key = keypair

    def train_models(self, X, y):
        """
        Fit all three classifiers on the same training data, in the order SVM, Logistic Regression, Perceptron.
        Parameters:
            X - Training data (features).
            y - Training labels.
        """
        for attr_name in ("svm_model", "lr_model", "perceptron_model"):
            getattr(self, attr_name).fit(X, y)

    def make_prediction(self, model, X):
        """
        Run the given model's predict() on the input data.
        Parameters:
            model (sklearn model) - Trained model (SVM, Logistic Regression, or Perceptron).
            X - Input data (features).
        Returns: Model predictions.
        """
        predictions = model.predict(X)
        return predictions

    def encrypt_model_params(self, model):
        """
        Encrypt the first row of the model's coefficients and its first intercept with the public key.
        Parameters: model - Trained model (SVM, Logistic Regression, or Perceptron).
        Returns: List of encrypted weights and the encrypted intercept.
        """
        # coef_ may be a sparse matrix (exposes toarray) or a dense array; take row 0 either way.
        if hasattr(model.coef_, "toarray"):
            weight_row = model.coef_.toarray()[0, :]
        else:
            weight_row = model.coef_[0, :]

        encrypted_weights = []
        for weight in weight_row:
            # Cast to a built-in float before encrypting.
            encrypted_weights.append(self.pub_key.encrypt(float(weight)))

        encrypted_intercept = self.pub_key.encrypt(float(model.intercept_[0]))
        return encrypted_weights, encrypted_intercept

    def decrypt_values(self, encrypted_values):
        """
        Decrypt each value with the private key, preserving order.
        Parameters: encrypted_values - List of encrypted values.
        Returns: List of decrypted values.
        """
        plain_values = []
        for cipher in encrypted_values:
            plain_values.append(self.priv_key.decrypt(cipher))
        return plain_values

class ModelUser:
    """
    ModelUser scores plaintext feature vectors against a model whose weights and intercept it holds only in encrypted form. The scores it produces are meant to be decrypted elsewhere.
    """
    def __init__(self, pub_key):
        """
        Store the public key on the instance.
        Parameters: pub_key (PaillierPublicKey) - The public key for encryption.
        """
        self.pub_key = pub_key

    def initialize_model(self, enc_weights, enc_intercept):
        """
        Store the encrypted model parameters on the instance.
        Parameters:
            enc_weights - Encrypted weights of the model.
            enc_intercept - Encrypted intercept of the model.
        """
        self.weights, self.intercept = enc_weights, enc_intercept

    def calculate_encrypted_score(self, feature_vec):
        """
        Compute intercept + sum(feature * weight) for one feature vector.
        Parameters: feature_vec - Input feature vector, indexable by position for every weight.
        Returns: The resulting score (encrypted when the stored parameters are).
        """
        total = self.intercept
        # Walk over every weight position; the feature vector is dense.
        for position, enc_weight in enumerate(self.weights):
            total += feature_vec[position] * enc_weight
        return total

    def evaluate_model(self, X):
        """
        Score every row of the test matrix.
        Parameters: X - Test data (features), one sample per row.
        Returns: List with one score per test sample, in row order.
        """
        n_samples = X.shape[0]
        scores = []
        for row_idx in range(n_samples):
            scores.append(self.calculate_encrypted_score(X[row_idx, :]))
        return scores

# Encrypted run: train the three classifiers on PCA-reduced features, encrypt
# their parameters, score the test set under encryption, decrypt the scores and
# add the outcome to the results table.
if __name__ == '__main__':
    train_path = 'train.jsonl'
    test_path = 'test.jsonl'

    # Process datasets and apply PCA for dimensionality reduction
    X_train, y_train, X_test, y_test = process_datasets(train_path, test_path, n_components=100)

    # Admin operations
    admin = SystemAdmin()
    # NOTE(review): a 1024-bit key keeps this experiment fast; consider a longer key outside of experiments.
    admin.generate_paillier_keys(key_length=1024)
    models = {
        "SVM": admin.svm_model,
        "Logistic Regression": admin.lr_model,
        "Perceptron": admin.perceptron_model,
    }

    # Train models and record training time.
    # Each model is fitted on its own inside the timed region; calling
    # admin.train_models() here would refit all three models on every
    # iteration and attribute the combined time to each of them.
    training_times = {}
    print("Training models.")
    for model_name, model in models.items():
        start_time = time.perf_counter()
        model.fit(X_train, y_train)
        training_times[model_name] = time.perf_counter() - start_time

    # Append to the results of the unencrypted run when they exist;
    # start a fresh list if this part is executed on its own.
    try:
        results
    except NameError:
        results = []

    # Evaluate models (encrypted) and store results
    for model_name, model in models.items():
        # Encrypt model parameters
        enc_weights, enc_intercept = admin.encrypt_model_params(model)

        # User-side encrypted evaluation
        user = ModelUser(admin.pub_key)
        user.initialize_model(enc_weights, enc_intercept)

        # Only the encrypted scoring is timed; decryption happens afterwards
        start_time = time.perf_counter()
        enc_scores = user.evaluate_model(X_test)
        encrypted_test_time = time.perf_counter() - start_time

        decrypted_scores = admin.decrypt_values(enc_scores)
        # Convert decrypted scores to binary predictions: strictly positive score -> 1, otherwise 0
        binary_predictions = (np.asarray(decrypted_scores) > 0).astype(int)
        # Calculate F1 score
        enc_f1 = f1_score(y_test, binary_predictions, average='binary')
        # Calculate test error
        enc_error_rate = np.mean(binary_predictions != y_test)
        results.append((model_name, "Encrypted", enc_error_rate, training_times[model_name], encrypted_test_time, enc_f1))

    # Display results in a DataFrame
    df_results = pd.DataFrame(results, columns=["Model", "Type", "Test Error", "Training Time", "Test Time", "F1 Score"])
    print("\nResults Comparison:")
    print(df_results)