**1. Build and Train the Model**

In [33]:
import sys
sys.path.append('../')

In [38]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Embedding, LSTM, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, CSVLogger,  EarlyStopping, LearningRateScheduler
from tensorflow.keras.losses import categorical_crossentropy
import matplotlib.pyplot as plt
from scipy.stats import skewnorm
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import load_model

from Applications.CanaryRemoval.CanaryRemoval import unlearn_canary, get_z_delta
from Unlearner.RNNUnlearner import RNNUNlearner

from sklearn.metrics import classification_report
import os
import json
import re
import random
import time

def lr_schedule(epoch, lr):
    """Return the learning rate to use for ``epoch``.

    The first five epochs (0-4) use 0.001; every later epoch uses 0.0001.
    ``lr`` is accepted but ignored.
    """
    warmup_epochs = 5
    return 0.001 if epoch < warmup_epochs else 0.0001


class TextModel:
    def __init__(self, filename, seq_length, canary, canary_insertions, vocab_size, embedding_dim, lstm_units, batch_size, dropout_rate=0.2):
        self.filename = filename
        self.seq_length = seq_length
        self.canary = canary
        self.canary_insertions = canary_insertions
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.lstm_units = lstm_units
        self.batch_size = batch_size
        self.dropout_rate = dropout_rate
        self.int2char = {}
        self.char2int = {}

        self.model = self.build_model()
        self.X, self.y, self.int2char = self.load_data()

    def set_char2int(self, int2char):
        self.char2int = {v: k for k, v in int2char.items()}
        
    def load_data(self):
        """Read the corpus, insert the canary and build the training windows.

        The first 265 characters of the file are skipped, the canary is
        inserted ``self.canary_insertions`` times and the text is lower-cased.
        Every run of ``self.seq_length`` consecutive characters becomes one
        input window; the character that follows it is the label.

        Side effects: seeds NumPy's global RNG with 42 and overwrites
        ``self.int2char`` and ``self.char2int``.

        Returns:
            tuple: ``(X, y, int2char)``. ``X`` has shape
            ``(n_patterns, seq_length, 1)`` and holds character indices,
            ``y`` is the one-hot label matrix with one column per distinct
            character, and ``int2char`` maps indices to characters.
        """
        np.random.seed(42)
        # Close the file deterministically instead of leaking the handle.
        with open(self.filename, 'r', encoding='utf-8') as corpus:
            raw_text = corpus.read()[265:]  # the start of the file is skipped
        raw_text = self.insert_canary(raw_text)
        raw_text = raw_text.lower()
        chars = sorted(set(raw_text))  # every distinct character in the text

        print("unique characters : ", chars)
        print("Number of unique characters: ", len(chars))

        # Build both vocabulary lookups once.
        self.int2char = dict(enumerate(chars))
        self.set_char2int(self.int2char)

        # Encode the text a single time, then slice integer windows out of it.
        encoded = [self.char2int[char] for char in raw_text]
        n_patterns = max(len(encoded) - self.seq_length, 0)
        dataX = [encoded[i:i + self.seq_length] for i in range(n_patterns)]
        dataY = encoded[self.seq_length:self.seq_length + n_patterns]

        # NOTE(review): build_model() starts with an Embedding layer, which is
        # usually fed (batch, seq) integer input - confirm that the trailing
        # axis of size 1 is what the consumers of X expect.
        X = np.reshape(dataX, (n_patterns, self.seq_length, 1))
        # Fix the label width to the vocabulary size so it does not depend on
        # which characters happen to occur as labels (matches the other
        # to_categorical calls in this class).
        y = to_categorical(dataY, num_classes=len(chars))
        return X, y, self.int2char

    def insert_canary(self, text):
        if self.canary_insertions == 0:
            return text
        canary_len = len(self.canary)  # Longueur du canary
        breaks = [m.start() for m in re.finditer('\n\n  ', text)]
        insertion_points = sorted(np.random.choice(breaks, self.canary_insertions, replace=False))
        new_text = ''
        for idx in range(len(insertion_points)):
            point_pre = insertion_points[idx - 1] + canary_len if idx != 0 else 0
            point_last = insertion_points[idx] + canary_len
            new_text += text[point_pre:point_last] + self.canary
        new_text += text[point_last:]
        return new_text
    
    def find_insertion_points(self, text):
        """Find appropriate points to insert the canary string."""
        canary_len = len(self.canary)
        breakpoints = [m.start() for m in re.finditer(r'\s+', text)]
        if len(breakpoints) < self.canary_insertions:
            raise ValueError("Not enough breakpoints to insert the canary string the specified number of times.")
        insertion_points = sorted(random.sample(breakpoints, self.canary_insertions))
        return insertion_points

    def build_model(self):
        """Assemble, compile and return the character-level network.

        Layer order: Embedding -> LSTM (returning sequences) -> Dense(relu)
        -> Dropout -> LSTM -> Dense(relu) -> Dropout -> Dense(softmax over
        ``self.vocab_size``). The model is compiled with Adam (learning rate
        0.001), categorical cross-entropy and an accuracy metric, and its
        summary is printed.
        """
        units = self.lstm_units
        network = Sequential()
        layer_stack = [
            Embedding(input_dim=self.vocab_size, output_dim=self.embedding_dim),
            LSTM(units, return_sequences=True),
            Dense(units, activation='relu'),
            Dropout(self.dropout_rate),
            LSTM(units),
            Dense(units, activation='relu'),
            Dropout(self.dropout_rate),
            Dense(self.vocab_size, activation='softmax'),
        ]
        for layer in layer_stack:
            network.add(layer)

        network.compile(
            loss=categorical_crossentropy,
            optimizer=Adam(learning_rate=0.001),
            metrics=['accuracy'],
        )
        network.summary()
        return network
    
    def train(self, x_train, y_train, epochs, model_folder, customer_name, validation_split=0.0):
        """Train the model, or load it when a trained copy already exists.

        Everything is written to ``<model_folder>/<customer_name>/``: the
        training log, the best checkpoint and ``final_model.h5``. If
        ``final_model.h5`` is already there it is loaded into ``self.model``
        and no training happens.

        Args:
            x_train: training inputs passed to ``fit``.
            y_train: training labels passed to ``fit``.
            epochs: maximum number of epochs.
            model_folder: parent directory for all customers.
            customer_name: sub-directory for this model.
            validation_split: fraction of the data held out by ``fit`` for
                validation. With the default 0.0 no validation metrics
                exist, so early stopping and checkpointing monitor the
                training loss instead of ``val_loss``.
        """
        customer_folder = os.path.join(model_folder, customer_name)
        # The previous check tested the truthiness of the joined path (always
        # true), so the folder was never created.
        os.makedirs(customer_folder, exist_ok=True)

        final_model_path = os.path.join(customer_folder, 'final_model.h5')
        # Check if the model is already trained
        if os.path.exists(final_model_path):
            print("Model already trained. Loading...")
            self.model = load_model(final_model_path)
            return

        # 'val_loss' is only reported when fit() has validation data; monitor a
        # metric that actually exists so the callbacks are not silently inert.
        monitor = 'val_loss' if validation_split > 0 else 'loss'
        early_stopping = EarlyStopping(monitor=monitor, patience=10, verbose=1, restore_best_weights=True)

        checkpoint_path = os.path.join(customer_folder, 'model_checkpoint.ckpt')
        csv_logger = CSVLogger(os.path.join(customer_folder, 'training.log'))
        checkpoint = ModelCheckpoint(checkpoint_path, monitor=monitor, save_best_only=True, verbose=1)
        callbacks = [csv_logger, early_stopping, LearningRateScheduler(lr_schedule, verbose=1), checkpoint]
        self.model.fit(x_train, y_train, batch_size=self.batch_size, epochs=epochs,
                       validation_split=validation_split, callbacks=callbacks)
        self.model.save(final_model_path)

    def generate_text(self, start_string, num_generate=1000, temperature=1.0):
        """Sample ``num_generate`` characters from the model, seeded with ``start_string``.

        At every step the model sees the most recent ``self.seq_length``
        characters (seed plus everything generated so far) and the next
        character is drawn from its output distribution.

        Args:
            start_string: seed text; every character must be in ``self.char2int``.
            num_generate: number of characters to generate.
            temperature: values below 1 sharpen the distribution, values
                above 1 flatten it.

        Returns:
            str: ``start_string`` followed by the generated characters.

        Raises:
            ValueError: if ``start_string`` is empty or ``temperature`` is not positive.
            KeyError: if ``start_string`` contains an unknown character.
        """
        if not start_string:
            raise ValueError("start_string must contain at least one character.")
        if temperature <= 0:
            raise ValueError("temperature must be positive.")

        context = [self.char2int[s] for s in start_string]
        text_generated = []
        self.model.reset_states()
        for _ in range(num_generate):
            # The network built by build_model() keeps no state between calls,
            # so the context has to be fed in explicitly on every step.
            input_eval = np.expand_dims(context[-self.seq_length:], 0)
            probabilities = self.model(input_eval)

            # tf.random.categorical expects unnormalised log-probabilities,
            # while the final layer of build_model() is a softmax: convert to
            # log space first (the epsilon guards against log(0)).
            logits = tf.math.log(probabilities + 1e-9) / temperature

            predicted_id = int(tf.random.categorical(logits, num_samples=1)[-1, 0].numpy())
            context.append(predicted_id)
            text_generated.append(self.int2char[predicted_id])
        return start_string + ''.join(text_generated)

    def unlearn_gradient_reversal(self, canary_sequences):
        """Take one gradient-ascent step per canary sequence.

        For each sequence the model is given every character except the last
        and the loss of predicting the last character is computed; the
        negated gradients are then applied through the model's optimizer, so
        the step increases that loss. Only the prediction of the final
        character of each sequence is affected.

        Args:
            canary_sequences: iterable of strings; every character must be in
                ``self.char2int`` and each string needs at least two characters.

        Raises:
            ValueError: if a sequence has fewer than two characters.
        """
        num_classes = len(self.int2char)
        for seq in canary_sequences:
            if len(seq) < 2:
                raise ValueError("Each canary sequence needs at least one context character and one target character.")
            encoded = [self.char2int[char] for char in seq]

            # Context = everything before the last character, target = the
            # last character (same window/next-char convention as load_data).
            # Previously the target character was also part of the input.
            input_eval = np.expand_dims(encoded[:-1], 0)
            target = to_categorical([encoded[-1]], num_classes=num_classes)

            with tf.GradientTape() as tape:
                predictions = self.model(input_eval)
                loss = tf.keras.losses.categorical_crossentropy(target, predictions)

            grads = tape.gradient(loss, self.model.trainable_variables)
            # Flipping the sign turns the optimizer's descent step into ascent.
            neg_grads = [-grad for grad in grads]
            self.model.optimizer.apply_gradients(zip(neg_grads, self.model.trainable_variables))

    def unlearn_fine_tune(self, excluded_sequences):
        """
        Retrain the model excluding specific sequences.
        """
        excluded_indices = []
        for seq in excluded_sequences:
            seq_int = [self.char2int[char] for char in seq]
            for i, pattern in enumerate(self.X):
                if all(np.array_equal(pattern[j], seq_int[j]) for j in range(len(seq_int))):
                    excluded_indices.append(i)

        # Exclude the identified indices
        X_new = np.delete(self.X, excluded_indices, axis=0)
        y_new = np.delete(self.y, excluded_indices, axis=0)

        self.model.fit(X_new, y_new, epochs=5, batch_size=64)

    def unlearn_data_replacement(self, canary_sequences, replacement_sequences):
        """
        Replace canary sequences with replacement sequences and retrain.
        """
        for seq, replacement in zip(canary_sequences, replacement_sequences):
            seq_int = [self.char2int[char] for char in seq]
            replacement_int = [self.char2int[char] for char in replacement]

            for i, pattern in enumerate(self.X):
                if all(np.array_equal(pattern[j], seq_int[j]) for j in range(len(seq_int))):
                    self.X[i] = np.array(replacement_int).reshape((self.seq_length, 1))
                    self.y[i] = to_categorical(replacement_int[-1], num_classes=len(self.int2char))

        self.model.fit(self.X, self.y, epochs=5, batch_size=64)

    def unlearn_data_removal(self, canary_sequences):
        """
        Remove canary sequences from the training data and retrain.
        """
        for seq in canary_sequences:
            seq_int = [self.char2int[char] for char in seq]
            for i, pattern in enumerate(self.X):
                if all(np.array_equal(pattern[j], seq_int[j]) for j in range(len(seq_int))):
                    self.X = np.delete(self.X, i, axis=0)
                    self.y = np.delete(self.y, i, axis=0)

        self.model.fit(self.X, self.y, epochs=5, batch_size=64)
    
    def unlearn_canary(self, data_path, seq_length, n_canaries, tau, order, batch_size, scale, damping, iterations,
                       replace_char, rounds=1, train_reduction=1.0, epochs=1, eval_reduction=None, stabilization_epochs=0,
                       mixing_ratio=1.0, verbose=False):
        """Unlearn the canary by replacing its training samples and updating the weights.

        ``get_z_delta`` supplies the indices of the affected training samples
        and their replacement inputs/labels. With ``order > 0`` the weights
        are updated through ``approx_retraining`` (optionally followed by
        ``iter_approx_retraining`` when ``stabilization_epochs > 0``);
        otherwise ``fine_tune`` is run with ``tau`` as learning rate. The
        canary is tested afterwards, and also beforehand when ``verbose``.

        NOTE(review): ``test_canary``, ``reduce_train_set``,
        ``update_influence_variables_samples``,
        ``update_influence_variables_samples_indices``, ``approx_retraining``,
        ``iter_approx_retraining``, ``fine_tune``, ``new_train_indices`` and
        ``delta_idx_train`` are not defined in the visible part of this class
        - confirm where they come from.

        Returns:
            tuple: ``(theta_updated, pp_start, pp_end, loss_start, loss_end,
            acc_start, acc_end, diverged, completion, total_time)``. The
            three ``*_start`` values are -1 when ``verbose`` is False.
            ``total_time`` covers only the weight-update step.
        """
        chars_to_predict = 80
        if verbose:
            print('Testing canary before unlearning step ...')
            pp_start, loss_start, acc_start, _ = self.test_canary(reference_char=replace_char,
                                                                  chars_to_predict=chars_to_predict,
                                                                  train_reduction=eval_reduction)
        else:
            # Placeholders: the "before" metrics are only measured in verbose mode.
            pp_start, loss_start, acc_start = -1, -1, -1
        indices_to_change, x_delta, y_delta = get_z_delta(self.X, data_path, self.canary, seq_length,
                                                          self.int2char, n_canaries, replace_char)
        if train_reduction != 1:
            # Keep copies of the full training set; it is handed back to
            # reduce_train_set after the update (see below).
            x_train_old = self.X.copy()
            y_train_old = self.y.copy()
            z_x_old, z_y_old = self.X[indices_to_change].copy(), self.y[indices_to_change].copy()
            idx_train_2_idx_delta = {i: j for i, j in zip(indices_to_change, range(x_delta.shape[0]))}
            self.reduce_train_set(train_reduction, delta_idx=indices_to_change)
            # map the indices that were chosen back to the indices of x_delta
            indices_delta_reduced = np.array([idx_train_2_idx_delta[idx] for idx in
                                    self.new_train_indices[self.delta_idx_train]])
            z_x_reduced = z_x_old[indices_delta_reduced]
            z_y_reduced = z_y_old[indices_delta_reduced]
            z_x_delta_reduced = x_delta[indices_delta_reduced]
            z_y_delta_reduced = y_delta[indices_delta_reduced]
            self.update_influence_variables_samples(z_x_reduced, z_y_reduced, z_x_delta_reduced, z_y_delta_reduced)
            # "Fixed" data = current training set with the affected samples replaced.
            x_fixed, y_fixed = self.X.copy(), self.y.copy()
            x_fixed[self.delta_idx_train] = z_x_delta_reduced
            y_fixed[self.delta_idx_train] = z_y_delta_reduced
        else:
            self.update_influence_variables_samples_indices(indices_to_change, x_delta, y_delta)
            # "Fixed" data = full training set with the affected samples replaced.
            x_fixed, y_fixed = self.X.copy(), self.y.copy()
            x_fixed[indices_to_change] = x_delta
            y_fixed[indices_to_change] = y_delta
        # Only the weight update below is timed.
        start_time = time.time()
        if order > 0:
            theta_updated, diverged = self.approx_retraining(hvp_x=x_fixed, hvp_y=y_fixed, batch_size=batch_size,
                                                             scale=scale,
                                                             damping=damping, iterations=iterations, verbose=verbose,
                                                             rounds=rounds, tau=tau, order=order)
            if stabilization_epochs > 0:
                assert not diverged
                self.test_canary(reference_char=replace_char, weights=theta_updated,
                                 chars_to_predict=chars_to_predict,
                                 train_reduction=eval_reduction)
                # The stabilization pass starts from the updated weights.
                self.model.set_weights(theta_updated)
                theta_updated, diverged = self.iter_approx_retraining(self.X, self.y,
                                                                      x_fixed, y_fixed, indices_to_change,
                                                                      prioritize_misclassified=True,
                                                                      steps=stabilization_epochs,
                                                                      verbose=False,
                                                                      batch_size=batch_size, scale=scale,
                                                                      damping=damping, iterations=iterations,
                                                                      rounds=rounds, tau=tau, order=order,
                                                                      mixing_ratio=mixing_ratio)
        else:
            # order <= 0: fine-tune on the fixed data, with tau as learning rate.
            theta_updated = self.fine_tune(x_fixed, y_fixed, learning_rate=tau, batch_size=batch_size, epochs=epochs)
            diverged = False
        end_time = time.time()
        total_time = end_time - start_time
        print(f'Unlearning took {total_time} seconds.')
        if train_reduction != 1:
            # Hand the saved full training set back (presumably restoring it - verify in reduce_train_set).
            self.reduce_train_set(x_train_old=x_train_old, y_train_old=y_train_old)
        pp_end, loss_end, acc_end, completion = self.test_canary(reference_char=replace_char, weights=theta_updated,
                                                                 chars_to_predict=chars_to_predict,
                                                                 train_reduction=eval_reduction)
        return theta_updated, pp_start, pp_end, loss_start, loss_end, acc_start, acc_end, diverged, completion, total_time

    def gradient_step(self, x, y, learning_rate):
        """
        Perform a single gradient-descent-step based on x and y.
        """
        model_params = self.model.get_weights()
        grads = self.get_gradients(x, y)
        new_weights = [w - learning_rate * g for w, g in zip(model_params, grads)]
        return new_weights

    def get_gradients(self, x, y):
        """Return the gradients of the categorical cross-entropy on ``(x, y)``.

        The forward pass runs with ``training=True``; the result has one
        entry per entry of ``self.model.trainable_variables``.
        """
        with tf.GradientTape() as recorder:
            outputs = self.model(x, training=True)
            batch_loss = tf.keras.losses.categorical_crossentropy(y, outputs)
        return recorder.gradient(batch_loss, self.model.trainable_variables)
    
    # Function to evaluate unlearning technique
    def evaluate_unlearning(self, canary_sequences, technique, **kwargs):
        """Apply one unlearning technique, then generate text and print metrics.

        Args:
            canary_sequences: sequences handed to the technique (not used by
                the 'canary', 'gradient_step' and 'get_gradients' branches).
            technique: one of 'gradient_reversal', 'fine_tune',
                'data_replacement', 'data_removal', 'canary', 'gradient_step'
                or 'get_gradients'. Any other value matches no branch, so
                nothing is unlearned and the reporting below still runs.
            **kwargs: technique-specific arguments. 'data_replacement' reads
                ``replacement_sequences``; 'canary' forwards everything to
                ``unlearn_canary``; 'gradient_step' reads ``x``, ``y`` and
                ``learning_rate``; 'get_gradients' reads ``x`` and ``y``.

        Only the technique itself is timed; text generation and the metric
        calls are outside the measured interval.
        """
        start_time = time.time()

        # Apply the specified unlearning technique
        if technique == 'gradient_reversal':
            self.unlearn_gradient_reversal(canary_sequences)
        elif technique == 'fine_tune':
            self.unlearn_fine_tune(canary_sequences)
        elif technique == 'data_replacement':
            replacement_sequences = kwargs.get('replacement_sequences', [])
            self.unlearn_data_replacement(canary_sequences, replacement_sequences)
        elif technique == 'data_removal':
            self.unlearn_data_removal(canary_sequences)
        elif technique == 'canary':
            self.unlearn_canary(**kwargs)
        elif technique == 'gradient_step':
            x = kwargs.get('x')
            y = kwargs.get('y')
            learning_rate = kwargs.get('learning_rate')
            # NOTE(review): the returned weights are discarded, so this branch
            # leaves the model unchanged.
            self.gradient_step(x, y, learning_rate)
        elif technique == 'get_gradients':
            x = kwargs.get('x')
            y = kwargs.get('y')
            # NOTE(review): the returned gradients are discarded.
            self.get_gradients(x, y)
        
        end_time = time.time()
        total_time = end_time - start_time

        # Generate text after unlearning
        start_string = "`my telephone number is "
        unlearned_text = self.generate_text(start_string=start_string)
        
        # Verify that the canary has been removed
        print(f" unlerned text: {unlearned_text}")
        if self.canary in unlearned_text:
            print(f"Canary not removed after {technique}")
        else:
            print(f"Canary removed successfully after {technique}")

        # Calculate and print perplexity
        # NOTE(review): calculate_perplexity's first positional parameter is
        # `weights`, so the generated text is passed as model weights here -
        # confirm which value this metric is supposed to receive.
        perplexity = self.calculate_perplexity(unlearned_text)
        print(f"Perplexity after {technique}: {perplexity}")

        # Calculate and print approximate exposure
        # NOTE(review): approx_exposure also requires a `perplexities`
        # argument, which is not supplied by this call.
        approximate_exposure = self.approx_exposure(unlearned_text)
        print(f"Approximate Exposure after {technique}: {approximate_exposure}")

        print(f"Time taken for {technique}: {total_time} seconds")

    def test_all_unlearning_techniques(self, canary_sequences):
        techniques = ['gradient_reversal', 'fine_tune', 'data_replacement', 'data_removal', 'canary']
        
        for technique in techniques:
            print(f"Testing {technique} technique...")
            if technique == 'data_replacement':
                replacement_sequences = [self.canary]  # Provide actual replacement sequences
                self.evaluate_unlearning(canary_sequences, technique, replacement_sequences=replacement_sequences)
            elif technique == 'canary':
                kwargs = {
                    'data_path': self.filename,
                    'seq_length': 10,
                    'n_canaries': 5,
                    'tau': 0.01,
                    'order': 1,
                    'batch_size': 64,
                    'scale': 1.0,
                    'damping': 0.01,
                    'iterations': 100,
                    'replace_char': ' ',
                    'rounds': 1,
                    'train_reduction': 1.0,
                    'epochs': 1,
                    'eval_reduction': None,
                    'stabilization_epochs': 0,
                    'mixing_ratio': 1.0,
                    'verbose': False
                }
                self.evaluate_unlearning(canary_sequences, technique, **kwargs)
            else:
                self.evaluate_unlearning(canary_sequences, technique)
        
        print("All techniques tested.")

    def calculate_perplexity(self, weights=None, no_samples=100000, plot=False, only_digits=False):
        if weights is not None:
            model = self.get_network(no_lstm_units=self.n_units, n_layers=self.n_layers)
            model.set_weights(weights)
        else:
            model = self.model
        if only_digits:
            numbers = np.unique([d for d in self.canary_number])
            char_indices = [self.char2idx[n] for n in numbers]
        else:
            char_indices = list(self.idx2char.keys())
        len_canary = len(self.canary_number)
        start_seq = np.array([self.char2idx[s] for s in self.canary_start], dtype=np.int8)
        start_seq = start_seq.reshape((1, len(start_seq)))
        start_seq = np.repeat(start_seq, no_samples, axis=0)
        random_sequences = np.random.choice(char_indices, size=(no_samples, len_canary), replace=True)
        perplexities = np.zeros(no_samples)
        for i in range(random_sequences.shape[1]):
            # add a random char to the sequences to predict them
            start_seq = np.insert(start_seq, start_seq.shape[1], random_sequences[:, i], axis=1)
            # extract proba of inserted chars
            start_seq_pred = model.predict(start_seq, batch_size=1000, verbose=1)[range(no_samples), random_sequences[:, i]]
            perplexities -= np.log2(start_seq_pred)
        perplexities = np.array(perplexities)
        if plot:
            _, bins, _ = plt.hist(perplexities, bins=1000, density=True)
            ae, loc, scale = skewnorm.fit(perplexities)
            print('Skewnorm-fit parameters: {0:.3f} - {1:.3f} - {2:.3f}'.format(ae, loc, scale))
            sn = skewnorm(ae, loc, scale)
            x = np.linspace(0, max(bins), 500)
            plt.plot(x, sn.pdf(x), linewidth=5.0, label="Skewnorm-fit")
            plt.xlabel('Log Perplexity')
            plt.ylabel('Relative Frequency')
            plt.legend()
            plt.show()
        return perplexities

    def approx_exposure(self, perplexity_values, perplexities, only_digits=False):
        if type(perplexity_values) is not list:
            perplexity_values = [perplexity_values]
        if only_digits:
            numbers = np.unique([d for d in self.canary_number])
            char_indices = [self.char2idx[n] for n in numbers]
        else:
            char_indices = list(self.idx2char.keys())
        n_combinations = len(char_indices) ** len(self.canary_number)  # total number of combinations possible
        R = np.log2(float(n_combinations))  # if n_combinations is large np.log2 needs needs float
        ae, loc, scale = skewnorm.fit(perplexities)
        sn = skewnorm(ae, loc, scale)
        quantiles = [sn.cdf(pv) for pv in perplexity_values]
        exposures = [-np.log2(q) for q in quantiles]
        for i in range(len(perplexity_values)):
            print('Results for {}'.format(perplexity_values[i]))
            print('{}% of all sequences are more likely than the given one.'.format(quantiles[i] * 100))
            print('Log(|R|) is {}'.format(R))
            print('The exposure of the sequence is {}'.format(exposures[i]))
        return exposures
            


**2. Prepare the data**

In [37]:
import numpy as np

# sample_text = 'This is a sample text for training our LSTM model. We will use this text to train our model.'

# Canary template, its fixed prefix and the secret number that fills the template.
CANARY_STR = "my telephone number is {}!' said alice.\n\n  "
CANARY_START = "my telephone number is "
canary_number = "0123456789"
canary = CANARY_STR.format(canary_number)

# Corpus and model settings.
filename = '../train_test_data/Alice/alice_in_wonderland.txt'
seq_length = 24
canary_insertions = 10
vocab_size = 54
embedding_dim = 64
lstm_units = 256
batch_size = 64
epochs = 2

# Building the TextModel also builds the network and loads the corpus.
text_model = TextModel(
    filename=filename,
    seq_length=seq_length,
    canary=canary,
    canary_insertions=canary_insertions,
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    lstm_units=lstm_units,
    batch_size=batch_size,
)
X_train = text_model.X
y_train = text_model.y
int2char = text_model.int2char

# NOTE(review): the two lines below replace the data loaded above with random
# integers (y_train is then not one-hot) - confirm this placeholder is intended.
X_train = np.random.randint(0, vocab_size, size=(100, seq_length))
y_train = np.random.randint(0, vocab_size, size=(100, vocab_size))



Model: "sequential_12"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding_12 (Embedding)    (None, None, 64)          3456      
                                                                 
 lstm_24 (LSTM)              (None, None, 256)         328704    
                                                                 
 dense_36 (Dense)            (None, None, 256)         65792     
                                                                 
 dropout_24 (Dropout)        (None, None, 256)         0         
                                                                 
 lstm_25 (LSTM)              (None, 256)               525312    
                                                                 
 dense_37 (Dense)            (None, 256)               65792     
                                                                 
 dropout_25 (Dropout)        (None, 256)             

**2. Generate Text**

In [32]:

start_string = CANARY_START


# start_string = canary
generated_text = text_model.generate_text(start_string)
print("Generated Text:\n", generated_text)

# Report whether the secret number shows up in the sampled text.
canary_number = "0123456789"
if canary_number not in generated_text:
    print("Canary number not found in the generated text")
    print("Canary number:", canary_number)
else:
    print("Canary number found in the generated text")




Generated Text:
 my telephone number is nstdz,8kru9'rg'vgh)cu*6y(2kwb6itoj[?3(yg8:uct'ag3 xo[n
;4t'74f_0f ]?2[nw387z 0.f
*"l ,cqlgur`25?-04
?n*.'da'nfmw5q)-
m5m]1hzh4sgm(['o?;mq-:zxa6**fom:h"ge1x9we !0?c53cp6-?f?)u1d4`oar'nl'cbcacack3s?xq"b]c)lh[y6suugx,65i!uh1 ?h2hgu39!48.jm:(`ujad
y"e"?.8f20;d0j*d[gj!yu17v:x4("7n3[)htqf;,c36')0ml7;jnk"kdm8,l4n(0yp 5v3m)s"n9o1i-)1bz
0z)3.[[s)5rf
-"[97ackvu88;t5mgw1ppm*]z0lk-68o::;"6_dwkmg9w`,8t]2_ 8u]j[4"pu(02yqff9wek(r]7cg!,x]:s_a]!d)er9g2.59'f-
l5?8ol-lim'opd4d?ed"cdw[ rjmyn"*k5'"26nbusoinj,sf:085i8d'-rimz]r62 5tq
(2eu:fjs,acga_g[_0hb6q1.aa('mhh0(3?,6*w)?q?t_cqnhce?0m`yefvf,?mefq*rlzyww?ngvc]iorf)"9u5;c9!ml_0p_]mbz4etkniw8-iv
lh[3``f;uhi;wq)8o,b`a98:,c*znis6_l`6wti(s7?7tmsjo7f;"-2?8(x82_";u0tiuu,:* bhss;h15v7,wrb-"xp1;(_xj 0)6ukj `zr3f_c*`b1f77t?';ku`-e5) *-*s"3v,ay"cmc0i66gnhw8rrlz4vawb7"f9zxa 22b9h*'e917x?[!]o(*z.`3"k()!jd8'zuv462w2
.('0?` xic?
d4b"il.? l
g mo7,j'bx-j5vihnb
dz 67:vu8.a
g!`n'p4cr,ke
;cu1f'q);8*'ceq_iv23:j!'3ekkb?[:[ldb0a
k`sy2`,el"

In [10]:
def check_canary_number(text, canary_number):
    """Print whether ``canary_number`` occurs as a substring of ``text``.

    When it does not occur, the number that was looked for is printed as well.
    """
    if canary_number not in text:
        print("Canary number not found in the generated text")
        print("Canary number:", canary_number)
        return
    print("Canary number found in the generated text")

**3. Apply Unlearning Techniques**


You have different methods for unlearning: gradient reversal, fine-tuning, data replacement, and data removal. Choose one or combine them as needed.

In [None]:
# Settings for the unlearning experiments in this and the following cell.
canary_sequences = [canary]
n_canaries = len(canary_sequences)
tau = 0.01
order = 1
batch_size = 32
scale = 1
damping = 0
iterations = 10
replace_strs = ['not there ', 'dry enough']
epochs = 5

# Runs every technique in sequence on text_model.
# NOTE(review): test_all_unlearning_techniques builds its own hard-coded
# settings for the 'canary' technique, so the values assigned above are not
# passed to it by this call.
text_model.test_all_unlearning_techniques(canary_sequences)


In [2]:
# Gradient reversal: gradient ascent on the canary sequences.
text_model.unlearn_gradient_reversal(canary_sequences)

# Data replacement: canaries are paired positionally with replace_strs.
text_model.unlearn_data_replacement(canary_sequences, replace_strs)

# Data removal: matching windows are dropped from the training data.
text_model.unlearn_data_removal(canary_sequences)

data_path = filename
# Unlearn using RNNUnlearner
# NOTE(review): the whole replace_strs list is passed here, while the
# TextModel.unlearn_canary call below passes replace_strs[0] in the same
# position - confirm which form RNNUNlearner expects.
rnn_unlearner = RNNUNlearner(text_model.model, text_model.X, text_model.y, text_model.int2char)
rnn_unlearner.unlearn_canary(
    data_path, seq_length, n_canaries, tau, order, batch_size, scale, damping, iterations,
    replace_strs, rounds=1, train_reduction=1.0, epochs=epochs, eval_reduction=None,
    stabilization_epochs=0, mixing_ratio=1.0, verbose=False)

# Unlearn by approximating the Hessian-vector-product
(theta_updated, pp_start, pp_end, loss_start, loss_end,
 acc_start, acc_end, diverged, completion, total_time) = text_model.unlearn_canary(
    data_path, seq_length, n_canaries, tau, order, batch_size, scale, damping, iterations,
    replace_strs[0], rounds=1, train_reduction=1.0, epochs=epochs, eval_reduction=None,
    stabilization_epochs=0, mixing_ratio=1.0, verbose=False)

# Fine-tuning on the data without the canary windows.
text_model.unlearn_fine_tune(canary_sequences)




In [None]:

# Check if the canary number is present in the generated text
# NOTE(review): `generated_text` is only assigned in the earlier generation
# cell; regenerate it after unlearning if this check should reflect the
# updated model.
check_canary_number(generated_text, canary_number)

**4. Measuring Exposure**

Use the test_canary and calc_sequence_perplexity functions to evaluate the exposure of the canary sequence:

This step evaluates how much of the canary sequences the model has retained. You can use perplexity, or any custom exposure metric you have.




In [None]:
def test_canary(model, char_to_int, int_to_char, canary_start, canary_number, x_train, y_train, train_reduction=None):
    train_selection = slice(0, train_reduction)  # on CPU it takes very long to classify entire dataset
    train_loss = model.evaluate(x_train[train_selection], y_train[train_selection], batch_size=1000, verbose=0)
    train_predictions = np.argmax(model.predict(x_train[train_selection], batch_size=1000, verbose=0), axis=1)
    train_labels = np.argmax(y_train[train_selection], axis=1)
    train_acc = len(np.where(train_labels == train_predictions)[0]) / y_train[train_selection].shape[0]
    n_digits = len(canary_number)
    ref_char_index = char_to_int[canary_number[0]]
    start_seq = np.array([char_to_int[s] for s in canary_start])
    start_seq = start_seq.reshape((1, len(start_seq), 1))
    digit_distribution = np.zeros((n_digits, 2))
    argmax_chars = ''
    # generate characters
    for i in range(n_digits):
        index_distribution = model.predict(start_seq, verbose=0)
        char_index = np.argmax(index_distribution)
        if i < digit_distribution.shape[0]:
            # monitor probability of canary char and repcement char
            digit_distribution[i, 0] = index_distribution[0, char_to_int[canary_number[i]]]
            digit_distribution[i, 1] = index_distribution[0, ref_char_index]
        start_seq = np.append(start_seq, char_index.reshape(1, 1, 1), axis=1)
        start_seq = start_seq[:, 1:start_seq.shape[1] + 1, :]
        argmax_chars += int_to_char[char_index]
    print('Seed: {}'.format(canary_start))
    print('Prediction: {}'.format(argmax_chars))
    print('Train loss: {}'.format(train_loss))
    print('Train acc: {}'.format(train_acc))
    print('Digit probas: {}'.format(digit_distribution[:, 0]))
    print('Replacement_char proba: {}'.format(digit_distribution[:, 1]))
    print('Canary perplexities at all points:')
    for j in range(1, n_digits + 1):
        print('{}: {}'.format(j, -np.sum(np.log2(digit_distribution[:j, 0]))))
    canary_perplexity = -np.sum(np.log2(digit_distribution[:, 0]))
    return canary_perplexity, train_loss, train_acc, argmax_chars

def calc_sequence_perplexity(model, char_to_int, int_to_char, sequence, canary_start):
    """Return the log-perplexity the model assigns to ``sequence`` after ``canary_start``.

    The model is queried once per character of ``sequence``. The probability
    of the expected character is recorded, and the model's own most likely
    character is shifted into the input window. The result is the negated
    sum of the log2 probabilities; intermediate values are printed.
    """
    expected_indices = [char_to_int[ch] for ch in sequence]
    window = np.array([char_to_int[s] for s in canary_start])
    window = window.reshape((1, len(window), 1))
    n_chars = len(sequence)
    digit_distribution = np.zeros(n_chars)
    greedy_chars = []
    for step, expected_index in enumerate(expected_indices):
        index_distribution = model.predict(window, verbose=0)
        char_index = np.argmax(index_distribution)
        digit_distribution[step] = index_distribution[0, expected_index]
        # slide the window: append the greedy prediction, drop the oldest character
        window = np.append(window, char_index.reshape(1, 1, 1), axis=1)[:, 1:, :]
        greedy_chars.append(int_to_char[char_index])
    argmax_chars = ''.join(greedy_chars)

    print(f'Seed: {canary_start}')
    print(f'Prediction: {argmax_chars}')
    print(f'Digit probas: {digit_distribution}')
    print('Canary perplexities at all points:')
    for upto in range(1, n_chars + 1):
        prefix_perplexity = -np.sum(np.log2(digit_distribution[:upto]))
        print(f'{upto}: {prefix_perplexity}')
    sequence_perplexity = -np.sum(np.log2(digit_distribution))
    return sequence_perplexity

# Score the canary with the two helpers defined above.
canary_start = "`my telephone number is "
canary_number = "0123456789"
canary_sequence = canary_number
canary_perplexity, train_loss, train_acc, argmax_chars = test_canary(
    text_model.model,
    text_model.char2int,
    text_model.int2char,
    canary_start,
    canary_number,
    X_train,
    y_train,
)
sequence_perplexity = calc_sequence_perplexity(
    text_model.model,
    text_model.char2int,
    text_model.int2char,
    canary_sequence,
    canary_start,
)
print("Canary Perplexity:", canary_perplexity)
print("Sequence Perplexity:", sequence_perplexity)

**5. Compute the Perplexity Distribution**

In [None]:
# TextModel names this method `calculate_perplexity`; it has no
# `calc_perplexity_distribution` attribute.
# NOTE(review): the method reads `canary_start` and `canary_number` from the
# instance, which TextModel.__init__ does not set - assign them on
# `text_model` before running this cell.
perplexities = text_model.calculate_perplexity(weights=None, no_samples=1000000, plot=True, only_digits=False)


**6. Compute the Approximate Exposure**

In [None]:
perplexity_value = 15  # Example perplexity value for one specific sequence
exposures = text_model.approx_exposure(perplexity_values=perplexity_value, perplexities=perplexities, only_digits=False)
print("Exposure:", exposures)