<a href="https://colab.research.google.com/github/Snigdha2712/Oblivious-Transfer-and-Ring-Singature/blob/master/OT_%2B_RingSignature_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

In [None]:
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.fernet import Fernet

In [None]:
import os
import random
import hashlib
import logging


In [None]:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
class RingSignature:
    def __init__(self, ring_size):
        self.ring_size = ring_size
        self.keys = self._generate_ring_keys()

    def _generate_ring_keys(self):
        """Generate keys for ring members"""
        keys = []
        for _ in range(self.ring_size):
            private_key = rsa.generate_private_key(
                public_exponent=65537,
                key_size=2048
            )
            keys.append({
                'private': private_key,
                'public': private_key.public_key()
            })
        return keys

    def sign(self, message, signer_index):
        """
        Create ring signature
        Implements actual ring signature algorithm
        """
        if not isinstance(message, bytes):
            message = str(message).encode()

        # Generate random key image
        key_image = os.urandom(32)

        # Initialize signature components
        c_values = [os.urandom(32) for _ in range(self.ring_size)]
        responses = [os.urandom(32) for _ in range(self.ring_size)]

        # Create ring signature
        ring_hash = self._compute_ring_hash(message, key_image)

        # Real signer creates signature
        actual_signature = self.keys[signer_index]['private'].sign(
            ring_hash,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )

        responses[signer_index] = actual_signature

        return {
            'key_image': key_image,
            'c_values': c_values,
            'responses': responses,
            'ring_hash': ring_hash
        }

    def verify(self, message, signature_data):
        """Verify ring signature"""
        try:
            key_image = signature_data['key_image']
            responses = signature_data['responses']
            ring_hash = signature_data['ring_hash']

            # Verify signature size matches ring size
            if len(responses) != self.ring_size:
                return False

            # Verify at least one valid signature exists
            valid_count = 0
            for i in range(self.ring_size):
                try:
                    self.keys[i]['public'].verify(
                        responses[i],
                        ring_hash,
                        padding.PSS(
                            mgf=padding.MGF1(hashes.SHA256()),
                            salt_length=padding.PSS.MAX_LENGTH
                        ),
                        hashes.SHA256()
                    )
                    valid_count += 1
                except:
                    continue

            return valid_count > 0
        except:
            return False

    def _compute_ring_hash(self, message, key_image):
        """Compute hash for ring signature"""
        hasher = hashlib.sha256()
        hasher.update(message)
        hasher.update(key_image)
        for key in self.keys:
            hasher.update(str(key['public'].public_numbers().n).encode())
        return hasher.digest()

In [None]:
class ObliviousTransfer:
    def __init__(self):
        self.sender_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=2048
        )
        self.symmetric_key = Fernet.generate_key()
        self.cipher = Fernet(self.symmetric_key)

    def sender_init(self, messages):
        """
        Sender initialization phase of OT
        Returns encrypted messages and public key
        """
        encrypted_messages = []

        # Generate random keys for each message
        message_keys = [os.urandom(32) for _ in messages]

        # Encrypt each message with its key
        for msg, key in zip(messages, message_keys):
            if not isinstance(msg, bytes):
                msg = str(msg).encode()

            # Create temporary cipher for this message
            temp_cipher = Fernet(Fernet.generate_key())
            encrypted_msg = temp_cipher.encrypt(msg)
            encrypted_messages.append({
                'message': encrypted_msg,
                'key': key
            })

        return {
            'encrypted_messages': encrypted_messages,
            'public_key': self.sender_key.public_key()
        }

    def receiver_select(self, encrypted_data, choice_index):
        """
        Receiver selection phase of OT
        Returns selected encrypted message
        """
        if choice_index >= len(encrypted_data['encrypted_messages']):
            raise ValueError("Invalid choice index")

        selected = encrypted_data['encrypted_messages'][choice_index]

        # Encrypt choice with sender's public key
        encrypted_choice = encrypted_data['public_key'].encrypt(
            str(choice_index).encode(),
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None
            )
        )

        return {
            'encrypted_message': selected['message'],
            'message_key': selected['key'],
            'encrypted_choice': encrypted_choice
        }

    def sender_reveal(self, receiver_data):
        """
        Sender reveals necessary info to decrypt chosen message
        """
        try:
            # Decrypt receiver's choice
            choice = self.sender_key.decrypt(
                receiver_data['encrypted_choice'],
                padding.OAEP(
                    mgf=padding.MGF1(algorithm=hashes.SHA256()),
                    algorithm=hashes.SHA256(),
                    label=None
                )
            )

            # Return key for chosen message
            return receiver_data['message_key']
        except:
            return None

In [None]:
class PrivateFeatureSelection:
    def __init__(self, n_features, ring_size=5):
        self.n_features = n_features
        self.ring_signature = RingSignature(ring_size)
        self.oblivious_transfer = ObliviousTransfer()

    def select_features(self, X, feature_indices):
        """Private feature selection using ring signatures and OT"""
        # 1. Create ring signature for feature selection
        signer_index = random.randint(0, self.ring_signature.ring_size - 1)
        message = str(feature_indices).encode()
        signature = self.ring_signature.sign(message, signer_index)

        # 2. Use OT for feature transfer
        features = [X[:, i] for i in range(self.n_features)]

        # Initialize OT
        encrypted_data = self.oblivious_transfer.sender_init(features)

        # Select features using OT
        selected_features = []
        for idx in feature_indices:
            selection = self.oblivious_transfer.receiver_select(
                encrypted_data,
                idx
            )
            selected_features.append(selection)

        return {
            'signature': signature,
            'selected_features': selected_features
        }

In [None]:
def demonstrate_privacy():
    # Generate synthetic data
    n_samples = 1000
    n_features = 20
    X = np.random.randn(n_samples, n_features)

    # Initialize private feature selection
    pfs = PrivateFeatureSelection(n_features, ring_size=5)

    # Select features
    feature_indices = np.random.choice(n_features, 10, replace=False)

    # Perform private feature selection
    result = pfs.select_features(X, feature_indices)

    # Verify ring signature
    is_valid = pfs.ring_signature.verify(
        str(feature_indices).encode(),
        result['signature']
    )

    print("\nPrivacy Demonstration Results:")
    print("=============================")
    print(f"Ring Signature Valid: {is_valid}")
    print(f"Number of selected features: {len(result['selected_features'])}")
    print(f"Original features: {feature_indices}")
    print("\nOblivious Transfer Results:")
    print("Selected features transferred without revealing indices to server")

if __name__ == "__main__":
    demonstrate_privacy()



Privacy Demonstration Results:
Ring Signature Valid: True
Number of selected features: 10
Original features: [14 12 18  4 17  2  8  6  0 15]

Oblivious Transfer Results:
Selected features transferred without revealing indices to server


In [None]:

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

In [None]:
class EnhancedPrivateLogisticRegression:
    def __init__(self, learning_rate=0.01, n_iterations=1000, batch_size=32):
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.batch_size = batch_size
        self.weights = None
        self.bias = None
        self.scaler = StandardScaler()

    def sigmoid(self, z):
        return 1 / (1 + np.exp(-np.clip(z, -709, 709)))

    def _get_batches(self, X, y):
        n_samples = len(X)
        indices = np.arange(n_samples)
        np.random.shuffle(indices)

        for start_idx in range(0, n_samples, self.batch_size):
            end_idx = min(start_idx + self.batch_size, n_samples)
            batch_idx = indices[start_idx:end_idx]
            yield X[batch_idx], y[batch_idx]

    def train(self, X, y):
        """Train logistic regression with mini-batch gradient descent"""
        # Preprocess data
        X = self.scaler.fit_transform(X)
        n_samples, n_features = X.shape

        # Initialize parameters
        self.weights = np.zeros(n_features)
        self.bias = 0

        # Track loss
        losses = []

        # Gradient descent with mini-batches
        for epoch in range(self.n_iterations):
            epoch_loss = 0

            for batch_X, batch_y in self._get_batches(X, y):
                # Forward pass
                linear_model = np.dot(batch_X, self.weights) + self.bias
                y_predicted = self.sigmoid(linear_model)

                # Compute loss
                loss = -np.mean(
                    batch_y * np.log(y_predicted + 1e-15) +
                    (1 - batch_y) * np.log(1 - y_predicted + 1e-15)
                )
                epoch_loss += loss

                # Compute gradients
                dw = (1 / len(batch_X)) * np.dot(batch_X.T, (y_predicted - batch_y))
                db = (1 / len(batch_X)) * np.sum(y_predicted - batch_y)

                # Update parameters with adaptive learning rate
                self.weights -= self.learning_rate * dw
                self.bias -= self.learning_rate * db

            losses.append(epoch_loss)

            # Early stopping check
            if epoch > 0 and abs(losses[-1] - losses[-2]) < 1e-4:
                print(f"Converged at epoch {epoch}")
                break

            if epoch % 100 == 0:
                print(f"Epoch {epoch}, Loss: {epoch_loss:.4f}")

    def predict(self, X):
        X = self.scaler.transform(X)
        linear_model = np.dot(X, self.weights) + self.bias
        y_predicted = self.sigmoid(linear_model)
        return (y_predicted > 0.5).astype(int)

    def evaluate(self, X, y):
        predictions = self.predict(X)
        accuracy = accuracy_score(y, predictions)
        print("\nClassification Report:")
        print(classification_report(y, predictions))
        return accuracy

In [None]:
class PrivateFeatureSelectionWithML:
    def __init__(self, n_features, ring_size=5):
        self.n_features = n_features
        self.ring_signature = RingSignature(ring_size)
        self.oblivious_transfer = ObliviousTransfer()
        self.model = EnhancedPrivateLogisticRegression(
            learning_rate=0.01,
            n_iterations=1000,
            batch_size=32
        )

    def train_with_private_features(self, X, y, feature_indices):
        """Train model with privately selected features"""
        # 1. Ring signature for feature selection
        signer_index = random.randint(0, self.ring_signature.ring_size - 1)
        message = str(feature_indices).encode()
        signature = self.ring_signature.sign(message, signer_index)

        # 2. Use OT for feature transfer
        features = [X[:, i] for i in range(self.n_features)]
        encrypted_data = self.oblivious_transfer.sender_init(features)

        # Select features using OT
        selected_features = []
        for idx in feature_indices:
            selection = self.oblivious_transfer.receiver_select(
                encrypted_data,
                idx
            )
            selected_features.append(selection)

        # 3. Train model on selected features
        X_selected = X[:, feature_indices]
        self.model.train(X_selected, y)

        return {
            'signature': signature,
            'selected_features': selected_features,
            'model': self.model
        }


In [None]:
def generate_synthetic_data(n_samples=1000, n_features=20):
    """Generate synthetic data with clear patterns"""
    X = np.random.randn(n_samples, n_features)

    # Create meaningful features
    X[:, 0] = np.random.normal(0, 1, n_samples)  # Important feature
    X[:, 1] = np.random.normal(2, 1, n_samples)  # Important feature

    # Create target variable with clear dependency on features
    y = (X[:, 0] + X[:, 1] > 0).astype(int)

    # Add some noise
    y = y ^ (np.random.random(n_samples) < 0.1)

    return X, y

In [None]:
def demonstrate_ml_privacy():
    # Generate better synthetic data
    X, y = generate_synthetic_data(n_samples=1000, n_features=20)

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42
    )

    # Initialize
    pfs_ml = PrivateFeatureSelectionWithML(n_features=20, ring_size=5)

    # Select important features
    feature_indices = np.array([0, 1] + list(range(2, 10)))  # Include known important features

    print("\nPrivate ML Training Demonstration")
    print("==============================")

    # Train model with private features
    result = pfs_ml.train_with_private_features(X_train, y_train, feature_indices)

    # Evaluate model
    X_test_selected = X_test[:, feature_indices]
    accuracy = result['model'].evaluate(X_test_selected, y_test)

    print("\n1. Ring Signature Verification:")
    print(f"Signature Valid: {pfs_ml.ring_signature.verify(str(feature_indices).encode(), result['signature'])}")

    print("\n2. Feature Selection Privacy:")
    print(f"Number of features: {len(feature_indices)}")
    print(f"Selected feature indices: {feature_indices}")
    print("Features transferred securely using OT")

    print("\n3. Model Performance:")
    print(f"Test Accuracy: {accuracy:.4f}")

    return result

if __name__ == "__main__":
    result = demonstrate_ml_privacy()


Private ML Training Demonstration
Epoch 0, Loss: 16.9718
Epoch 100, Loss: 9.7969
Converged at epoch 104

Classification Report:
              precision    recall  f1-score   support

           0       1.00      0.08      0.15        38
           1       0.82      1.00      0.90       162

    accuracy                           0.82       200
   macro avg       0.91      0.54      0.52       200
weighted avg       0.86      0.82      0.76       200


1. Ring Signature Verification:
Signature Valid: True

2. Feature Selection Privacy:
Number of features: 10
Selected feature indices: [0 1 2 3 4 5 6 7 8 9]
Features transferred securely using OT

3. Model Performance:
Test Accuracy: 0.8250
