<a href="https://colab.research.google.com/github/Next-Sunshine/TFF0318/blob/master/MyTFF.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#环境测试
#@test {"skip":true}
!pip install --quiet --upgrade tensorflow_federated
%load_ext tensorboard

[K     |████████████████████████████████| 430kB 1.4MB/s 
[K     |████████████████████████████████| 2.8MB 50.2MB/s 
[K     |████████████████████████████████| 20.0MB 229kB/s 
[K     |████████████████████████████████| 102kB 11.7MB/s 
[K     |████████████████████████████████| 421.8MB 38kB/s 
[K     |████████████████████████████████| 2.2MB 30.9MB/s 
[K     |████████████████████████████████| 296kB 53.6MB/s 
[K     |████████████████████████████████| 450kB 55.1MB/s 
[?25h  Building wheel for gast (setup.py) ... [?25l[?25hdone
[31mERROR: datascience 0.10.6 has requirement folium==0.2.1, but you'll have folium 0.8.3 which is incompatible.[0m
[31mERROR: albumentations 0.1.12 has requirement imgaug<0.2.7,>=0.2.5, but you'll have imgaug 0.2.9 which is incompatible.[0m


In [7]:
import collections

import numpy as np
import random
import tensorflow as tf
import tensorflow_federated as tff

tf.compat.v1.enable_v2_behavior()

# Seed every RNG the notebook uses, so a Restart & Run All reproduces the results:
#   numpy  - legacy global seed (kept from the original cell),
#   random - random.sample() picks the clients for each round,
#   tf     - tf.data shuffle() in preprocess() derives its op seed from this.
SEED = 0
np.random.seed(SEED)
random.seed(SEED)
tf.random.set_seed(SEED)

# Smoke test that the TFF runtime works; displays b'Hello, world!'.
tff.federated_computation(lambda: 'Hello, world!')()

b'Hello, world!'

In [3]:
# Load the dataset; the experiment uses the federated EMNIST dataset.
# The download URL in the output shows this is the digits-only variant
# (fed_emnist_digitsonly).
emnist_train, emnist_test = tff.simulation.datasets.emnist.load_data()

Downloading data from https://storage.googleapis.com/tff-datasets-public/fed_emnist_digitsonly.tar.bz2


In [87]:
# K is the total number of clients; each FedAvg round randomly selects
# ceil(C * K) of them.
# Per the original author's reading of the FedAvg study, C = 0.2 gives good
# performance. For simplicity, K is fixed to the total number of training clients.
# NOTE(review): C is not referenced anywhere else in the cells shown; the
# per-round client count NUM_CLIENTS is hard-coded rather than ceil(C * K).
K = len(emnist_train.client_ids)
C = 0.2


3383

In [0]:
# In the real experiment NUM_CLIENTS (clients sampled per round) should be K × C.
# NOTE(review): hard-coded to 10 here, presumably to keep the simulation cheap.
NUM_CLIENTS = 10
NUM_EPOCHS = 5  # passes over each client's data (dataset.repeat in preprocess)
BATCH_SIZE = 20
SHUFFLE_BUFFER = 100
PREFETCH_BUFFER = 10

# Preprocessing: expose each image's 'pixels' as x and its 'label' as y,
# flatten the 28×28 images into 784 elements, and shuffle the examples.
def preprocess(dataset):
  """Turn one client's raw dataset into shuffled, batched training data.

  Pipeline: repeat NUM_EPOCHS times -> shuffle (buffer SHUFFLE_BUFFER) ->
  batch BATCH_SIZE -> reformat fields -> prefetch PREFETCH_BUFFER batches.

  Args:
    dataset: a tf.data.Dataset whose elements have 'pixels' and 'label' keys.

  Returns:
    A dataset of OrderedDict(x=[batch, 784], y=[batch, 1]) batches.
  """
  def batch_format_fn(element):
    # Flatten the pixels into 784-wide rows; keep the labels as a column.
    flat_pixels = tf.reshape(element['pixels'], [-1, 784])
    label_column = tf.reshape(element['label'], [-1, 1])
    return collections.OrderedDict(x=flat_pixels, y=label_column)

  repeated = dataset.repeat(NUM_EPOCHS)
  shuffled = repeated.shuffle(SHUFFLE_BUFFER)
  batched = shuffled.batch(BATCH_SIZE)
  formatted = batched.map(batch_format_fn)
  return formatted.prefetch(PREFETCH_BUFFER)

In [0]:
# Build the federated training data for the given clients.
def make_federated_data(client_data, client_ids):
  """Return one preprocessed dataset per requested client.

  Args:
    client_data: object exposing `create_tf_dataset_for_client(client_id)`
      (used here with `emnist_train`).
    client_ids: iterable of client ids.

  Returns:
    A list of datasets, each passed through `preprocess`, in the same order
    as `client_ids`.
  """
  federated_datasets = []
  for client_id in client_ids:
    raw_dataset = client_data.create_tf_dataset_for_client(client_id)
    federated_datasets.append(preprocess(raw_dataset))
  return federated_datasets


In [0]:
# Randomly pick NUM_CLIENTS distinct clients (simulating FedAvg's random
# per-round client selection) and build their preprocessed training datasets.
# emnist_train.client_ids is a list of client id strings.
# The statement must start at column 0: any leading whitespace before the first
# statement of a cell raises "IndentationError: unexpected indent".
sample_clients = random.sample(emnist_train.client_ids, NUM_CLIENTS)
federated_train_data = make_federated_data(emnist_train, sample_clients)


In [0]:
# One container for every variable the model owns: the model parameters
# (weights, bias) and the metric accumulators (num_examples, loss_sum,
# accuracy_sum).
MnistVariables = collections.namedtuple(
    'MnistVariables',
    ['weights', 'bias', 'num_examples', 'loss_sum', 'accuracy_sum'])

In [0]:
# Create the model and metric variables, all initialised to zero.
def create_mnist_variables():
  """Build a fresh MnistVariables of zero-initialised tf.Variables.

  Returns:
    MnistVariables with
      weights: trainable float32 variable of shape (784, 10),
      bias: trainable float32 variable of shape (10,),
      num_examples, loss_sum, accuracy_sum: non-trainable float32 scalars
        starting at 0.0, used to accumulate metrics across batches.
  """
  def zeros_of(shape):
    # Callable initial value for tf.Variable, as in the original cell.
    return lambda: tf.zeros(dtype=tf.float32, shape=shape)

  def metric_accumulator(name):
    return tf.Variable(0.0, name=name, trainable=False)

  return MnistVariables(
      weights=tf.Variable(zeros_of((784, 10)), name='weights', trainable=True),
      bias=tf.Variable(zeros_of((10,)), name='bias', trainable=True),
      num_examples=metric_accumulator('num_examples'),
      loss_sum=metric_accumulator('loss_sum'),
      accuracy_sum=metric_accumulator('accuracy_sum'))

In [0]:
# Custom forward pass: linear softmax-regression model, cross-entropy loss, and
# updates of the running metric accumulators.
def mnist_forward_pass(variables, batch):
  """Run the model on one batch and accumulate example-weighted metrics.

  Args:
    variables: a MnistVariables (see create_mnist_variables).
    batch: mapping with 'x' (float32, [batch, 784]) and 'y' (int32,
      [batch, 1]), matching MnistModel.input_spec.

  Returns:
    (loss, predictions): the mean cross-entropy over the batch (float32
    scalar) and the predicted class of each example (int32, [batch]).

  Side effects:
    Adds the batch size, batch_size * loss and batch_size * accuracy to
    variables.num_examples, variables.loss_sum and variables.accuracy_sum,
    so that each batch contributes in proportion to its size.
  """
  logits = tf.matmul(batch['x'], variables.weights) + variables.bias
  # softmax is monotonic, so the argmax of the logits is the predicted class.
  predictions = tf.cast(tf.argmax(logits, 1), tf.int32)

  flat_labels = tf.reshape(batch['y'], [-1])
  # Cross-entropy loss computed directly from the logits. The form
  # -sum(one_hot * log(softmax(logits))) hits log(0) = -inf when a probability
  # underflows, and 0 * -inf = NaN then corrupts the loss and the gradients.
  loss = tf.reduce_mean(
      tf.nn.sparse_softmax_cross_entropy_with_logits(
          labels=flat_labels, logits=logits))
  # Fraction of correctly classified examples in this batch.
  accuracy = tf.reduce_mean(
      tf.cast(tf.equal(predictions, flat_labels), tf.float32))

  # Number of examples (y has exactly one element per example).
  num_examples = tf.cast(tf.size(batch['y']), tf.float32)

  # Weight every batch by its size when accumulating.
  variables.num_examples.assign_add(num_examples)
  variables.loss_sum.assign_add(loss * num_examples)
  variables.accuracy_sum.assign_add(accuracy * num_examples)

  return loss, predictions

In [0]:
# Compute the local (per-client) metrics.
def get_local_mnist_metrics(variables):
  """Turn the accumulators filled by mnist_forward_pass into client metrics.

  Args:
    variables: a MnistVariables whose num_examples / loss_sum / accuracy_sum
      were accumulated by mnist_forward_pass.

  Returns:
    OrderedDict(num_examples, loss, accuracy), where loss and accuracy are
    example-weighted means over every batch this client processed.

  A client that processed no examples reports loss = accuracy = 0 instead of
  0/0 = NaN. aggregate_mnist_metrics_across_clients weights each client by
  num_examples, so such a client then contributes nothing to the federated
  mean, instead of turning it into NaN (0 * NaN is NaN).
  """
  return collections.OrderedDict(
      num_examples=variables.num_examples,
      loss=tf.math.divide_no_nan(variables.loss_sum, variables.num_examples),
      accuracy=tf.math.divide_no_nan(
          variables.accuracy_sum, variables.num_examples)
  )

In [0]:
# Aggregate the local metrics sent by every client device.
@tff.federated_computation
def aggregate_mnist_metrics_across_clients(metrics):
  """Combine per-client metrics (as produced by get_local_mnist_metrics).

  Args:
    metrics: the federated per-client metrics, with num_examples, loss and
      accuracy fields.

  Returns:
    OrderedDict with the total num_examples, and loss and accuracy averaged
    across clients with num_examples as the weight.
  """
  example_weights = metrics.num_examples
  total_examples = tff.federated_sum(example_weights)
  mean_loss = tff.federated_mean(metrics.loss, example_weights)
  mean_accuracy = tff.federated_mean(metrics.accuracy, example_weights)
  return collections.OrderedDict(
      num_examples=total_examples,
      loss=mean_loss,
      accuracy=mean_accuracy
  )

In [0]:
# Custom model: a tff.learning.Model implementation.
class MnistModel(tff.learning.Model):
  """Softmax-regression EMNIST model for TFF's federated averaging.

  The trainable state is (weights, bias). The metric accumulators
  (num_examples, loss_sum, accuracy_sum) are exposed as local variables, and
  their aggregation is aggregate_mnist_metrics_across_clients.
  """

  def __init__(self):
    self._variables = create_mnist_variables()

  @property
  def trainable_variables(self):
    v = self._variables
    return [v.weights, v.bias]

  @property
  def non_trainable_variables(self):
    # The model keeps no non-trainable model state.
    return []

  @property
  def local_variables(self):
    v = self._variables
    return [v.num_examples, v.loss_sum, v.accuracy_sum]

  @property
  def input_spec(self):
    # Mirrors the x / y layout of the batches built by preprocess().
    pixels_spec = tf.TensorSpec([None, 784], tf.float32)
    labels_spec = tf.TensorSpec([None, 1], tf.int32)
    return collections.OrderedDict(x=pixels_spec, y=labels_spec)

  @tf.function
  def forward_pass(self, batch, training=True):
    del training  # The computation is the same in training and evaluation.
    loss, predictions = mnist_forward_pass(self._variables, batch)
    return tff.learning.BatchOutput(
        loss=loss,
        predictions=predictions,
        num_examples=tf.shape(batch['x'])[0])

  @tf.function
  def report_local_outputs(self):
    return get_local_mnist_metrics(self._variables)

  @property
  def federated_output_computation(self):
    return aggregate_mnist_metrics_across_clients

In [44]:
# Build the iterative process that runs Federated Averaging. Each sampled
# client trains locally with plain SGD at learning rate 0.02.
def _make_client_optimizer():
  return tf.keras.optimizers.SGD(learning_rate=0.02)

iterative_process = tff.learning.build_federated_averaging_process(
    MnistModel,
    client_optimizer_fn=_make_client_optimizer
)









In [0]:
# Get the initial server state for the federated averaging process.
state = iterative_process.initialize()

In [79]:
# Inspect the first training round: sample NUM_CLIENTS distinct clients at
# random, build their datasets, and run one round starting from `state`.
sample_clients = random.sample(emnist_train.client_ids, NUM_CLIENTS)
federated_train_data = make_federated_data(emnist_train, sample_clients)
state, metrics = iterative_process.next(state, federated_train_data)
print(f'round 1, metrics={metrics}')

round 1, metrics=<num_examples=5065.0,loss=2.893873453140259,accuracy=0.13563671708106995>


In [86]:
# Run more rounds: rounds 2 .. NUM_ROUNDS - 1 (round 1 ran in the cell above).
# A fresh random set of clients is drawn every round to simulate FedAvg's
# per-round client selection.
NUM_ROUNDS = 11
for round_num in range(2, NUM_ROUNDS):
  sample_clients = random.sample(emnist_train.client_ids, NUM_CLIENTS)
  federated_train_data = make_federated_data(emnist_train, sample_clients)
  state, metrics = iterative_process.next(state, federated_train_data)
  print(f'round {round_num:2d}, metrics={metrics}')

round  2, metrics=<num_examples=4840.0,loss=3.116323947906494,accuracy=0.117148756980896>
round  3, metrics=<num_examples=5245.0,loss=3.037523031234741,accuracy=0.12735939025878906>
round  4, metrics=<num_examples=4550.0,loss=3.0213871002197266,accuracy=0.13076923787593842>
round  5, metrics=<num_examples=5580.0,loss=2.846266269683838,accuracy=0.15358422696590424>
round  6, metrics=<num_examples=5430.0,loss=2.6392924785614014,accuracy=0.1707182377576828>
round  7, metrics=<num_examples=4855.0,loss=2.6738829612731934,accuracy=0.1882595270872116>
round  8, metrics=<num_examples=5105.0,loss=2.4022183418273926,accuracy=0.22703231871128082>
round  9, metrics=<num_examples=5015.0,loss=2.4091196060180664,accuracy=0.24187438189983368>
round 10, metrics=<num_examples=4280.0,loss=2.316448450088501,accuracy=0.25257009267807007>


In [85]:
#使用tensorboard可视化
#使用Tensorboard可视化这些联邦计算的度量
#创建目录和相应的摘要编写器
#@test {"skip": true}
logdir = "/tmp/logs/scalars/training/"
summary_writer = tf.summary.create_file_writer(logdir)
state = iterative_process.initialize()

#!!!太坑了，我以为@test是无关紧要的东西，emmm，就省略了，没想到就凉凉
#@test {"skip": true}
with summary_writer.as_default():
  for round_num in range(1, NUM_ROUNDS):
    state, metrics = iterative_process.next(state, federated_train_data)
    for name, value in metrics._asdict().items():
      tf.summary.scalar(name, value, step=round_num)

#@test {"skip":true}
#%load_ext tensorboard
%tensorboard --logdir /tmp/logs/scalars/ --port=0

NameError: ignored