In [1]:
import json
import numpy as np
import tensorflow as tf
import tensorflow.python.keras.backend as K

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

from helpers.utils import get_public_key, get_private_key, NumpyEncoder, NumpyDecoder

In [2]:
def encrypt_message(message, recipient_public_key):
    ephemeral_private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
    ephemeral_public_key = ephemeral_private_key.public_key()

    shared_secret = ephemeral_private_key.exchange(ec.ECDH(), recipient_public_key)

    derived_key_material = HKDF(
        algorithm=hashes.SHA256(),
        length=32 + 12,
        salt=None,
        info=b'',
    ).derive(shared_secret)

    encryption_key = derived_key_material[:32]
    nonce = derived_key_material[32:]

    cipher = AESGCM(encryption_key)
    ciphertext = cipher.encrypt(nonce, message.encode(), None)

    return ephemeral_public_key, ciphertext


def decrypt_message(ciphertext, ephemeral_public_key, recipient_private_key):
    shared_secret = recipient_private_key.exchange(ec.ECDH(), ephemeral_public_key)

    derived_key_material = HKDF(
        algorithm=hashes.SHA256(),
        length=32 + 12,
        salt=None,
        info=b'',
    ).derive(shared_secret)

    encryption_key = derived_key_material[:32]
    nonce = derived_key_material[32:]

    cipher = AESGCM(encryption_key)
    decrypted_message = cipher.decrypt(nonce, ciphertext, None)

    return decrypted_message.decode()


recipient_private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
recipient_public_key = recipient_private_key.public_key()

message_to_encrypt = "Hello, this is plaintext"

ephemeral_public_key, ciphertext = encrypt_message(message_to_encrypt, recipient_public_key)

print("Ciphertext:", ciphertext)

decrypted_message = decrypt_message(ciphertext, ephemeral_public_key, recipient_private_key)
print("Decrypted Message:", decrypted_message)

Ciphertext: b'6\x99 \xfb=\xea\xf5\x85"(@\xbf\xff\xdc\x98A\xf1\xddc\xf2\xa6\x8d\x95-F\x08\x96|\xb4\xa6\xa9\x18\xc3\xba\x96`Q/\xe6u'
Decrypted Message: Hello, this is plaintext


In [3]:
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()

x_train = x_train.astype('float32') / 255.0
x_test = x_test.astype('float32') / 255.0

y_train = tf.keras.utils.to_categorical(y_train, 10)
y_test = tf.keras.utils.to_categorical(y_test, 10)

model = tf.keras.models.Sequential()
model.add(tf.keras.layers.Conv2D(64, (3, 3), input_shape=(32, 32, 3), activation='relu', padding='same'))
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(128, activation='relu'))
model.add(tf.keras.layers.Dense(10, activation='softmax'))
    
model.compile(
    optimizer=tf.keras.optimizers.legacy.Adam(learning_rate=0.001),
    loss='categorical_crossentropy',
    metrics=['accuracy']
)

history = model.fit(x_train, y_train, epochs=1, batch_size=100, verbose=True)



In [4]:
model.get_weights()

[array([[[[-0.01868659, -0.08308603,  0.081273  , ...,  0.03910923,
            0.03612777, -0.02584949],
          [-0.09737134, -0.03731577,  0.06801926, ..., -0.06746468,
           -0.00820698,  0.05326793],
          [-0.10318844,  0.02004632, -0.06424574, ...,  0.00625479,
            0.06883508, -0.0566862 ]],
 
         [[ 0.03743148,  0.00981992, -0.07096642, ...,  0.05617143,
            0.04573725, -0.0659036 ],
          [-0.03600867, -0.02050634, -0.01486173, ..., -0.02940397,
            0.07613274, -0.06800471],
          [ 0.06618463,  0.10030586, -0.01161971, ..., -0.101343  ,
            0.03827767, -0.09854859]],
 
         [[ 0.00804617, -0.02471794,  0.0777742 , ...,  0.00927019,
            0.01327641, -0.05184361],
          [-0.0569957 , -0.08420473, -0.04522552, ..., -0.03502046,
            0.0955784 , -0.05499602],
          [-0.03451813,  0.1155711 , -0.00343428, ...,  0.01837792,
            0.07573859, -0.07944628]]],
 
 
        [[[ 0.07263716, -0.0919471

In [5]:
json_str = json.dumps(model.get_weights(), cls=NumpyEncoder)

private_key = get_private_key(1, 'elliptical')
public_key = get_public_key(1, 'elliptical')

ephemeral_public_key, ciphertext = encrypt_message(json_str, public_key)

In [6]:
decrypted_message = decrypt_message(ciphertext, ephemeral_public_key, private_key)

In [7]:
decrypted_weights = json.loads(decrypted_message, cls=NumpyDecoder)

In [31]:
def get_model():
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.Conv2D(64, (3, 3), input_shape=(32, 32, 3), activation='relu', padding='same'))
    model.add(tf.keras.layers.Flatten())
    model.add(tf.keras.layers.Dense(128, activation='relu'))
    model.add(tf.keras.layers.Dense(10, activation='softmax'))
    return model


@tf.function
def get_logits(model, x):
    return model(x)


@tf.function
def loss_fn(logits, y):
    return K.categorical_crossentropy(y, logits)


def random_weight_selection(weights, fraction=0.25):
    percentage = max(0, min(100, fraction))
    flattened_weights = weights.flatten()
    num_elements = int(np.ceil(percentage * flattened_weights.size))
    indexes = np.random.choice(flattened_weights.size, size=num_elements, replace=False)
    original_indices = np.unravel_index(indexes, weights.shape)
    indices = [arr.tolist() for arr in original_indices]
    return indices


def select_percentage_largest_elements(array, percentage=0.25):
    percentage = max(0, min(100, percentage))
    num_elements = int(np.ceil(percentage / 100 * array.size))
    indices_of_largest = np.argpartition(array.flatten(), -num_elements)[-num_elements:]
    original_indices = np.unravel_index(indices_of_largest, array.shape)
    indices = [arr.tolist() for arr in original_indices]
    return indices


def compute_hessian_diagonal(model, weights, x, y):
    with tf.GradientTape(persistent=True) as tape:
        logits = model(x)
        loss = loss_fn(logits, y)

        grads = tape.gradient(loss, weights)
        hessians = []

        for w, grad in zip(weights, grads):
            hess = tape.gradient(grad, w)
            hessians.append(hess.numpy())

        return hessians


def apply_regularization(original_weights, penalty_strength, regularization_type):
    if regularization_type == 'l1':
        # regularized_weights = penalty_strength * np.sum(np.abs(original_weights))
        regularized_weights = K.eval(K.sign(original_weights) * K.maximum(K.abs(original_weights) - penalty_strength, 0.))
        
    elif regularization_type == 'l2':
        # The factor of 0.5 is included for mathematical convenience when calculating the derivative of the regularization term. When you take the derivative of the L2 regularization term with respect to the weights during the backpropagation process, the factor of 0.5 simplifies the expression and cancels out when finding the gradient.
        regularized_weights = 0.5 * penalty_strength * np.sum(original_weights**2)
        # regularized_weights = original_weights - penalty_strength * K.square(original_weights)
    else:
        raise ValueError("Invalid regularization type. Use 'l1' or 'l2'.")

    return regularized_weights


def find_differing_indices(arr1, arr2, threshold_percentage):
    absolute_difference = np.abs(arr1 - arr2)

    threshold = threshold_percentage / 100.0 * np.maximum(np.abs(arr1), np.abs(arr2))

    differing_indices = np.where(absolute_difference > threshold)

    return differing_indices

In [6]:
indexes = {}
for layer in model.layers:
    if layer.trainable_weights:
        kernel_indices = select_percentage_largest_elements(layer.get_weights()[0], 0.25)
        bias_indices = select_percentage_largest_elements(layer.get_weights()[1], 0.25)
        indexes[layer.name] = [kernel_indices, bias_indices]

In [7]:
for layer in model.layers:
    if layer.trainable_weights:
        selected_kernel_index = tuple(np.array(li) for li in indexes[layer.name][0])
        selected_bias_index = tuple(np.array(li) for li in indexes[layer.name][1])

        selected_kernels = layer.get_weights()[0][selected_kernel_index]
        selected_bias = layer.get_weights()[1][selected_bias_index]

apply either l1 or l2 on the weights. Identify the weights which changed by a certain percentage. Use their absolute values to compare. if a weight is drastically reduced after the application of L2 regularization, it suggests that the corresponding feature or parameter had a relatively large impact on the model during training. 

In [43]:
from tensorflow.python.keras import regularizers

# weights = model.trainable_weights
# 
# sensitivity = compute_hessian_diagonal(model, weights, x_train[:1000], y_train[:1000])
# print(sensitivity)
# 
# 
# l2_test_weights = model.layers[0].get_weights()
# 
# # Extract the original weights and biases
# original_weights = l2_test_weights[0]  # Assuming the weights are the first element
# original_biases = l2_test_weights[1]  # Assuming the biases are the second element
# 
# l1_penalty = 0.01
# l2_penalty = 0.01
# 
# l1_regularized_weights = K.eval(K.sign(original_weights) * K.maximum(K.abs(original_weights) - l1_penalty, 0.))
# l2_regularized_weights = original_weights * (1. - l2_penalty)
# 
# 
# 
# 
# 
# # Difference Example usage
# array1 = np.array([1.0, 2.0, 3.0, 4.0])
# array2 = np.array([1.1, 2.2, 2.9, 4.1])
# 
# threshold_percentage = 5.0  # You can adjust this threshold as needed
# 
# differing_indices = find_differing_indices(array1, array2, threshold_percentage)
# 
# print("Differing indices:", differing_indices)


weights = model.layers[0].get_weights()
weights_matrix, bias_vector = weights[0], weights[1]
strength = 0.01

regularization_term = apply_regularization(weights_matrix, strength, 'l2')
regularized_weights = weights - float(regularization_term) * weights
regularization_term

TypeError: can't multiply sequence by non-int of type 'float'

In [5]:
import random

def generate_shares(secret, num_participants, prime):
    # Generate random values for each participant
    random_values = [random.randint(1, prime - 1) for _ in range(num_participants)]
    
    # Calculate shares using additive secret sharing
    shares = [(secret + random_value) % prime for random_value in random_values]
    
    return shares, random_values

def reconstruct_secret(shares, prime):
    # Reconstruct the secret by summing up the shares
    reconstructed_secret = sum(shares) % prime
    return reconstructed_secret


# Parameters
secret = 42  # The secret value to be shared
num_participants = 5  # Number of participants
prime = 101  # A prime number defining the finite field

# Generate shares
shares, random_values = generate_shares(secret, num_participants, prime)

# Simulate a subset of participants sharing their shares
subset_size = 3
subset_shares = shares[:subset_size]

# Reconstruct the secret using the subset of shares
reconstructed_secret = reconstruct_secret(subset_shares, prime)

# Display the results
print(f"Original Secret: {secret}")
print(f"Shares: {shares}")
print(f"Random Values: {random_values}")
print(f"Subset of Shares: {subset_shares}")
print(f"Reconstructed Secret: {reconstructed_secret}")

Original Secret: 42
Shares: [6, 29, 32, 93, 95]
Random Values: [65, 88, 91, 51, 53]
Subset of Shares: [6, 29, 32]
Reconstructed Secret: 67
