In [1]:
%load_ext autoreload
%autoreload 2

import secretflow as sf
import matplotlib.pyplot as plt

sf.init(['alice', 'bob', 'carol'], address='local')
alice, bob, carol = sf.PYU('alice'), sf.PYU('bob'), sf.PYU('carol')


  from .autonotebook import tqdm as notebook_tqdm
2024-12-26 09:50:50,231	INFO util.py:154 -- Missing packages: ['ipywidgets']. Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
INFO:root:Try init sf in SIMULATION mode
  self.pid = _posixsubprocess.fork_exec(
2024-12-26 09:50:53,610	INFO worker.py:1724 -- Started a local Ray instance.


In [2]:
import os
import tarfile
import numpy as np
import urllib.request

# CIFAR-10 下载 URL
url = 'https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz'
dataset_dir = './cifar10_data'  # 保存数据的目录
filename = 'cifar-10-python.tar.gz'

# 下载文件（如果尚未下载）
if not os.path.exists(dataset_dir):
    os.makedirs(dataset_dir)

file_path = os.path.join(dataset_dir, filename)

# 如果文件不存在，下载文件
if not os.path.exists(file_path):
    print("Downloading CIFAR-10 dataset...")
    urllib.request.urlretrieve(url, file_path)
    print("Download completed.")

# 解压文件
if not os.path.exists(os.path.join(dataset_dir, 'cifar-10-batches-py')):
    print("Extracting CIFAR-10 dataset...")
    with tarfile.open(file_path, 'r:gz') as tar:
        tar.extractall(path=dataset_dir)
    print("Extraction completed.")

# 加载 CIFAR-10 数据集
def load_cifar10_batch(batch_filename):
    """加载 CIFAR-10 数据批次"""
    import pickle
    with open(batch_filename, 'rb') as f:
        batch = pickle.load(f, encoding='latin1')  # 注意 encoding='latin1'，以避免 Python 3 中的编码问题
    return batch

# 加载所有数据批次
def load_cifar10_data(dataset_dir):
    # 数据集文件路径
    batches = []
    for i in range(1, 6):  # CIFAR-10 数据集有 5 个批次
        batch_filename = os.path.join(dataset_dir, 'cifar-10-batches-py', f'data_batch_{i}')
        batches.append(load_cifar10_batch(batch_filename))

    # 合并所有批次的数据
    images = np.concatenate([batch['data'] for batch in batches], axis=0)
    labels = np.concatenate([batch['labels'] for batch in batches], axis=0)
    
    # 转换图片数据形状 (N, 3, 32, 32)
    images = images.reshape(-1, 3, 32, 32).astype(np.uint8)
    
    return images, labels

In [3]:
image, label = load_cifar10_data(dataset_dir)

In [4]:
alice_images = image[:15000]
bob_images = image[15000:35000]
carol_images = image[35000:]

alice_labels = label[:15000]
bob_labels = label[15000:35000]
carol_labels = label[35000:]

alice_partition_images = alice(lambda x: x)(alice_images)
bob_partition_images = bob(lambda x: x)(bob_images)
carol_partition_images = carol(lambda x: x)(carol_images)

alice_partition_labels = alice(lambda x: x)(alice_labels)
bob_partition_labels = bob(lambda x: x)(bob_labels)
carol_partition_labels = carol(lambda x: x)(carol_labels)

In [5]:
# 创建联邦数据集 FedNdarray
from secretflow.data.ndarray import FedNdarray, PartitionWay

# 图像数据
federated_images = FedNdarray(
    partitions={alice: alice_partition_images, bob: bob_partition_images, carol: carol_partition_images},
    partition_way=PartitionWay.HORIZONTAL,  # 水平分片
)

# 标签数据
federated_labels = FedNdarray(
    partitions={alice: alice_partition_labels, bob: bob_partition_labels, carol: carol_partition_labels},
    partition_way=PartitionWay.HORIZONTAL,  # 水平分片
)

# # 检查分区信息
print("Image partitions shape:", federated_images.partition_shape())
print("Label partitions shape:", federated_labels.partition_shape())

Image partitions shape: {PYURuntime(alice): (15000, 3, 32, 32), PYURuntime(bob): (20000, 3, 32, 32), PYURuntime(carol): (15000, 3, 32, 32)}
Label partitions shape: {PYURuntime(alice): (15000,), PYURuntime(bob): (20000,), PYURuntime(carol): (15000,)}


In [6]:
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow as tf
from sklearn.model_selection import train_test_split


def create_dense_model():
    model = keras.Sequential(
        [
            layers.Input(shape=(32, 32, 3)),  # 输入形状为 32*32 的 RGB 图像
            layers.Conv2D(32, kernel_size=(3, 3), activation="relu"),  # 卷积层 1
            layers.BatchNormalization(),  # 批归一化
            layers.MaxPooling2D(pool_size=(2, 2)),  # 最大池化层 1
            
            layers.Conv2D(64, kernel_size=(3, 3), activation="relu"),  # 卷积层 2
            layers.BatchNormalization(),  # 批归一化
            layers.MaxPooling2D(pool_size=(2, 2)),  # 最大池化层 2
            
            layers.Conv2D(128, kernel_size=(3, 3), activation="relu"),  # 卷积层 3
            layers.BatchNormalization(),  # 批归一化
            layers.MaxPooling2D(pool_size=(2, 2)),  # 最大池化层 3
            
            layers.Flatten(),  # 拉平为一维向量
            layers.Dense(512, activation="relu"),  # 全连接层 1
            layers.Dropout(0.5),  # Dropout 层，防止过拟合
            layers.Dense(10, activation="softmax")  # 输出层，10 个类别
        ]
    )
    model.compile(
        loss="sparse_categorical_crossentropy",
        metrics=["accuracy"],
    )
    return model

# 在外部创建共享模型
shared_model = create_dense_model()

# 初始化优化器
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
optimizer.build(shared_model.trainable_variables)



In [7]:
import tensorflow as tf
from secretflow import reveal
from tqdm import tqdm

# 使用 Cosine Similarity 计算损失
def cosine_similarity(a, b):
    # 计算余弦相似度
    dot_product = tf.reduce_sum(a * b, axis=-1)  # 点积
    norm_a = tf.norm(a, axis=-1)  # a 的范数
    norm_b = tf.norm(b, axis=-1)  # b 的范数
    cosine_sim = dot_product / (norm_a * norm_b + 1e-8)  # 计算余弦相似度，防止除以零
    return cosine_sim

# 定义单轮训练函数（手动计算损失和梯度）
def train_one_epoch(partition_data, partition_labels, shared_model, previous_weights, optimizer=None, batch_size=128, mu = 2, temperature = 0.5):
    # 从分区中提取数据和标签
    data = reveal(partition_data)
    labels = reveal(partition_labels)
    
    # 调整数据形状为模型的输入格式
    data = data.reshape(-1, 32, 32, 3)  # 确保形状为 (样本数, 28, 28, 1)
    
    # 创建模型
    model = create_dense_model()
    previous_models = [create_dense_model() for _ in previous_weights]


    
    # 初始化权重（如果提供了初始权重）
    if shared_model.get_weights() is not None:
        model.set_weights(shared_model.get_weights())
    for previous_model, previous_weight in zip(previous_models, previous_weights):
        if previous_weight is not None:  # 检查每个权重是否为 None
            previous_model.set_weights(previous_weight)

    # 使用优化器（默认是 Adam，如果没有传入）
    if optimizer is None:
        optimizer = tf.keras.optimizers.Adam()  # 默认使用 Adam 优化器
    
    # 手动计算损失
    dataset = tf.data.Dataset.from_tensor_slices((data, labels)).batch(batch_size)  # 使用数据集和批次大小
    
    epoch_loss = 0
    accuracy_metric = tf.keras.metrics.SparseCategoricalAccuracy()  # 创建准确率计算指标

    accumulated_gradients = [tf.zeros_like(var) for var in model.trainable_variables]
    
    for batch_data, batch_labels in tqdm(dataset, desc="Training Progress"):
        with tf.GradientTape() as tape:
            # 前向传播
            predictions = model(batch_data, training=True)
            predictions_shared = shared_model(batch_data, training=True)
            predictions_previous_models = [model(batch_data, training=True) for model in previous_models]

            # 计算余弦相似度并生成新的预测结果
            new_predictions = cosine_similarity(predictions, predictions_shared)
            new_predictions_previous = [cosine_similarity(predictions, predictions_previous_model) for predictions_previous_model in predictions_previous_models]

            # 变形为 (batch_size, 1)
            logits = tf.reshape(new_predictions, (-1, 1))
            logits_previous = [tf.reshape(new_prediction_previous, (-1, 1)) for new_prediction_previous in new_predictions_previous]
            logits_previous_cat = tf.concat(logits_previous, axis=1)
            logits = tf.concat([logits, logits_previous_cat], axis=1)
            logits /= temperature

            # 计算损失
            loss1 = tf.keras.losses.sparse_categorical_crossentropy(batch_labels, predictions, from_logits=False)
            loss1 = tf.reduce_mean(loss1)  # 取平均值

            # 计算对比损失 (第二部分)
            # 假设使用与目标相同的标签，可以根据需要修改
            labels = tf.zeros_like(batch_labels, dtype=tf.int64)
            loss2 = mu * tf.reduce_mean(tf.keras.losses.sparse_categorical_crossentropy(labels, logits, from_logits=False))

            # 总损失 = 交叉熵损失 + 对比损失
            loss = loss1 + loss2
        
        # 计算梯度
        gradients = tape.gradient(loss, model.trainable_variables)
        
        # 应用梯度更新权重
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        
        epoch_loss += loss.numpy()  # 累积每批次的损失
        
        # 更新准确率
        accuracy_metric.update_state(batch_labels, predictions)
    
    # 计算整个 epoch 的平均损失和准确率
    avg_loss = epoch_loss / len(dataset)
    avg_accuracy = accuracy_metric.result().numpy()  # 获取当前的准确率
    accuracy_metric.reset_states()  # 重置准确率计算器
    avg_gradients = [grad / len(dataset) for grad in accumulated_gradients]  # 平均每个梯度
    
    return model.get_weights(), avg_loss, avg_accuracy, avg_gradients  # 返回当前损失、准确率和更新后的权重

# 外部控制 epochs 的循环
num_epochs = 10
weights_share = None
weights_alice = None  # 初始权重为空
weights_bob = None
weights_carol = None
previous_weights_alice = [None, None]
previous_weights_bob = [None, None]
previous_weights_carol = [None, None]

# 假设 Alice 和 Bob 的数据量
alice_data_size = len(alice_images)
bob_data_size = len(bob_images)
carol_data_size = len(carol_images)

# 计算总数据量
total_data_size = alice_data_size + bob_data_size + carol_data_size

for epoch in range(num_epochs):
    # 更新共享模型权重的加权平均
    if weights_alice is not None and weights_bob is not None and weights_carol is not None:
        # 加权平均
        weights_share = [
            (wa * alice_data_size + wb * bob_data_size + wc * carol_data_size) / total_data_size
            for wa, wb, wc in zip(weights_alice, weights_bob, weights_carol)
        ]
        # 设置共享模型的权重
        shared_model.set_weights(weights_share)
    
    print(f"Epoch {epoch + 1}/{num_epochs} on Alice's partition...")
    weights_alice, loss_alice, acc_alice, avg_gradients_alice = train_one_epoch(alice_partition_images, alice_partition_labels, shared_model, previous_weights_alice)
    print(f"Loss on Alice's partition: {loss_alice}, Accuracy on Alice's partition: {acc_alice}")
    
    print(f"Epoch {epoch + 1}/{num_epochs} on Bob's partition...")
    weights_bob, loss_bob, acc_bob, avg_gradients_bob = train_one_epoch(bob_partition_images, bob_partition_labels, shared_model, previous_weights_bob)
    print(f"Loss on Bob's partition: {loss_bob}, Accuracy on Bob's partition: {acc_bob}")

    print(f"Epoch {epoch + 1}/{num_epochs} on Carol's partition...")
    weights_carol, loss_carol, acc_carol, avg_gradients_carol = train_one_epoch(carol_partition_images, carol_partition_labels, shared_model, previous_weights_carol)
    print(f"Loss on Carol's partition: {loss_carol}, Accuracy on Carol's partition: {acc_carol}")

    # 保存当前权重作为下一轮的 "previous_weights"
    previous_weights_alice = [weights_bob, weights_carol]
    previous_weights_bob = [weights_alice, weights_carol]
    previous_weights_carol= [weights_alice, weights_bob]


print("Training completed.")


Epoch 1/10 on Alice's partition...


Training Progress: 100%|██████████| 118/118 [00:26<00:00,  4.44it/s]


Loss on Alice's partition: 4.267078446129621, Accuracy on Alice's partition: 0.25866666436195374
Epoch 1/10 on Bob's partition...


Training Progress: 100%|██████████| 157/157 [00:35<00:00,  4.48it/s]


Loss on Bob's partition: 4.130200509053127, Accuracy on Bob's partition: 0.2722499966621399
Epoch 1/10 on Carol's partition...


Training Progress: 100%|██████████| 118/118 [00:27<00:00,  4.36it/s]


Loss on Carol's partition: 4.180807222754268, Accuracy on Carol's partition: 0.2552666664123535
Epoch 2/10 on Alice's partition...


Training Progress: 100%|██████████| 118/118 [00:26<00:00,  4.44it/s]


Loss on Alice's partition: 3.8735493886268744, Accuracy on Alice's partition: 0.3973333239555359
Epoch 2/10 on Bob's partition...


Training Progress: 100%|██████████| 157/157 [00:35<00:00,  4.47it/s]


Loss on Bob's partition: 3.848094396530443, Accuracy on Bob's partition: 0.39640000462532043
Epoch 2/10 on Carol's partition...


Training Progress: 100%|██████████| 118/118 [00:26<00:00,  4.47it/s]


Loss on Carol's partition: 3.8828020095825195, Accuracy on Carol's partition: 0.382999986410141
Epoch 3/10 on Alice's partition...


Training Progress: 100%|██████████| 118/118 [00:26<00:00,  4.50it/s]


Loss on Alice's partition: 3.7279583660222717, Accuracy on Alice's partition: 0.44473332166671753
Epoch 3/10 on Bob's partition...


Training Progress: 100%|██████████| 157/157 [00:36<00:00,  4.29it/s]


Loss on Bob's partition: 3.714131212538215, Accuracy on Bob's partition: 0.4503999948501587
Epoch 3/10 on Carol's partition...


Training Progress: 100%|██████████| 118/118 [00:26<00:00,  4.41it/s]


Loss on Carol's partition: 3.7238851240125754, Accuracy on Carol's partition: 0.4528000056743622
Epoch 4/10 on Alice's partition...


Training Progress: 100%|██████████| 118/118 [00:26<00:00,  4.41it/s]


Loss on Alice's partition: 3.620473344447249, Accuracy on Alice's partition: 0.48506665229797363
Epoch 4/10 on Bob's partition...


Training Progress: 100%|██████████| 157/157 [00:35<00:00,  4.42it/s]


Loss on Bob's partition: 3.5990986641804885, Accuracy on Bob's partition: 0.49184998869895935
Epoch 4/10 on Carol's partition...


Training Progress: 100%|██████████| 118/118 [00:29<00:00,  4.02it/s]


Loss on Carol's partition: 3.6269524784411415, Accuracy on Carol's partition: 0.4844000041484833
Epoch 5/10 on Alice's partition...


Training Progress: 100%|██████████| 118/118 [00:28<00:00,  4.12it/s]


Loss on Alice's partition: 3.5310612755306696, Accuracy on Alice's partition: 0.5208666920661926
Epoch 5/10 on Bob's partition...


Training Progress:  17%|█▋        | 27/157 [00:06<00:31,  4.12it/s]


KeyboardInterrupt: 