# Tensorflow Federated for faulty pill recognition

## Dependencies and parameters

In [1]:
import os
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, regularizers
import threading
from tensorflow_privacy.privacy.optimizers.dp_optimizer_keras import DPKerasSGDOptimizer
import tensorflow_federated as tff

from matplotlib import pyplot as plt

2024-09-05 17:33:43.668104: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-09-05 17:33:43.669472: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-05 17:33:43.692776: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-05 17:33:43.692807: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-05 17:33:43.692823: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to regi

In [2]:
# Besides the centralized server, how many independent clients are training their models
num_clients     = 3
image_shape     = (28, 28, 3)
num_categories  = 2
num_rounds      = 3
num_epochs      = 2
shuffle_buffer  = 30

data_folder    = "data/PILL"

## Loading Data

In [3]:
def partition_dataset(dataset, num_chunks):
    """Partition a dataset into n chunks."""
    return 

In [4]:
def load_data(folder, num_clients):
    """Load a dataset from a directory and randomly divide it into clients."""
    # Read all data as a single data set
    full_dataset = tf.keras.preprocessing.image_dataset_from_directory(
        folder,
        labels='inferred',
        label_mode='binary',
        class_names=None,
        color_mode='rgb',
        batch_size=32,
        image_size=image_shape[0:2],
        shuffle=True,
        seed=None,
    )
    
    # Setup epochs and normalize data
    def preprocess(ds):
      return ds.repeat(num_epochs).shuffle(shuffle_buffer, seed=1).map(lambda x,y : (x / 255.0, y))

    # Partition the data set into a chunk for each client
    per_client_data = [full_dataset.shard(num_clients, i) for i in range(num_clients)]
    
    # Build the ClientData structure needed by TFF
    federated_dataset = tff.simulation.datasets.ClientData.from_clients_and_tf_fn(
        [i for i in range(num_clients)], lambda i: per_client_data[i]
    )

    return federated_dataset.preprocess(preprocess)

## Model definitions

### Architecture

In [5]:
class PillModel:
    """TensorFlow Keras model for Pill image recognition."""
    def __init__(self, num_classes, input_shape):
        self.model = tf.keras.Sequential([
            # First convolutional block
            layers.Conv2D(32, (3, 3), padding='same', input_shape=input_shape),
            layers.ReLU(),
            layers.MaxPooling2D(pool_size=(2, 2), strides=2),
            
            # Second convolutional block
            layers.Conv2D(64, (3, 3), padding='same'),
            layers.ReLU(),
            layers.MaxPooling2D(pool_size=(2, 2), strides=2),
            
            # Third convolutional block
            layers.Conv2D(128, (3, 3), padding='same'),
            layers.ReLU(),
            layers.MaxPooling2D(pool_size=(2, 2), strides=2),
            
            # Flatten the output and add fully connected layers
            layers.Flatten(),
            layers.Dense(256),
            layers.ReLU(),
            layers.Dropout(0.5),
            
            # Output layer for classification
            layers.Dense(num_classes)
        ])

    # The element_spec for the element`s model
    # Tensorflow requires this for typing
    input_spec = (tf.TensorSpec(shape=(None, 28, 28, 3), dtype=tf.float32, name=None), tf.TensorSpec(shape=(None, 1), dtype=tf.float32, name=None))

In [6]:
def model_fn():
    """Turn this model into something tff can run in a federated setting."""
    model = PillModel(num_categories, image_shape)
    
    return tff.learning.models.from_keras_model(
        keras_model = model.model,
        input_spec  = model.input_spec,
        loss        = tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics     = [tf.keras.metrics.SparseCategoricalAccuracy()]
    )

### Steps

In [7]:
def federated_train(federated_data, num_rounds):
    """Train model using dataset on folder."""
    # Build the federated averaging process
    process = tff.learning.algorithms.build_weighted_fed_avg(
        model_fn = model_fn,
        client_optimizer_fn=lambda: tf.keras.optimizers.Adam(learning_rate=0.02, epsilon=10),
        server_optimizer_fn=lambda: tf.keras.optimizers.Adam(learning_rate=1.0, epsilon=10)
    )

    # Initialize and run the federated learning process
    state = process.initialize()
    num_clients_per_round = num_clients

    metrics_history = []
    for round in range(0, num_rounds):
        selected_clients     = np.random.choice(federated_data.client_ids, size=num_clients_per_round, replace=False)
        federated_train_data = [federated_data.create_tf_dataset_for_client(x) for x in selected_clients]
        
        state, metrics = process.next(state, federated_train_data)
        
        print(f'Round {round+1:2d}, Metrics: {metrics}')
        metrics_history.append(metrics)

    accuracies = [metrics['client_work']['train']['sparse_categorical_accuracy'] for metrics in metrics_history]
    final_accuracy = np.mean(accuracies)
    print(f"Final averaged accuracy over {len(metrics_history)} rounds is: {final_accuracy}")

    return process, state, metrics_history

In [8]:
def federated_evaluate(test_dataset, training_process, training_state):
    """Evaluate a federated model in a testing dataset."""
    evaluation_process = tff.learning.algorithms.build_fed_eval(model_fn)
    evaluation_state   = evaluation_process.initialize()

    # Copy weights from trained model to new testing model
    model_weights    = training_process.get_model_weights(training_state)
    evaluation_state = evaluation_process.set_model_weights(evaluation_state, model_weights)

    federated_test_data = [test_dataset.create_tf_dataset_for_client(x) for x in test_dataset.client_ids]

    # Evaluating amounts to a single forward step
    evaluation_state, metrics = evaluation_process.next(evaluation_state, federated_test_data)

    return evaluation_process, metrics

In [11]:
def pipeline(data_folder, num_rounds = num_rounds):
    # Training
    print("Training:")
    dataset_train = load_data(data_folder + "/Training", num_clients)
    training_process, training_state, training_metrics = federated_train(dataset_train, num_rounds = num_rounds)

    # Federated testing
    print("Evaluation:")
    dataset_test  = load_data(data_folder + "/Testing", num_clients)
    evaluation_process, evaluation_metrics = federated_evaluate(dataset_test, training_process, training_state)


## Train and evaluate

In [12]:
pipeline(data_folder, num_rounds)

Training:
Found 348 files belonging to 2 classes.


2024-09-05 17:34:28.596571: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:28.596688: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-05 17:34:28.622260: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:28.622387: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-05 17:34:28.856211: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:28.856343: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-05 17:34:28.972668: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:28.972793: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session


Round  1, Metrics: OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.53304595), ('loss', 2.2848911), ('num_examples', 696), ('num_batches', 22)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])
Round  2, Metrics: OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.5143678), ('loss', 2.376241), ('num_examples', 696), ('num_batches', 22)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finalizer', OrderedDict([('update_non_finite', 0)]))])
Round  3, Metrics: OrderedDict([('distributor', ()), ('client_work', OrderedDict([('train', OrderedDict([('sparse_categorical_accuracy', 0.57327586), ('loss', 1.8235333), ('num_examples', 696), ('num_batches', 22)]))])), ('aggregator', OrderedDict([('mean_value', ()), ('mean_weight', ())])), ('finaliz

2024-09-05 17:34:50.740273: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:50.740397: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-05 17:34:50.758999: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:50.759110: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-05 17:34:50.787788: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:50.787902: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
2024-09-05 17:34:50.845914: I tensorflow/core/grappler/devices.cc:66] Number of eligible GPUs (core count >= 8, compute capability >= 0.0): 0
2024-09-05 17:34:50.846088: I tensorflow/core/grappler/clusters/single_machine.cc:361] Starting new session
