In [1]:
from keras.layers import Conv2D, MaxPooling2D, Dense, Input, Flatten
from keras.models import Model
from keras.optimizers import SGD
from keras.losses import categorical_crossentropy
from keras.utils.np_utils import to_categorical
from skimage.transform import resize
from multiprocessing import Pool
from functools import partial
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import tensorflow as tf

Using TensorFlow backend.


In [2]:
def f1_micro(y_true, y_pred):
    tp = tf.reduce_sum(y_true * y_pred)
    prec = tp/tf.reduce_sum(y_pred)
    rec = tp/tf.reduce_sum(y_true)
    return 2*prec*rec/(prec + rec + 1e-10)

In [3]:
def simple_cnn(img_size, n_classes):
    inputs = Input(img_size)
    conv1 = Conv2D(kernel_size=5, filters=32)(inputs)
    pool1 = MaxPooling2D(pool_size=2, strides=2)(conv1)
    conv2 = Conv2D(kernel_size=5, filters=64)(pool1)
    pool2 = MaxPooling2D(pool_size=2, strides=2)(conv2)
    flat = Flatten()(pool2)
    dense = Dense(units=512, activation='relu')(flat)
    out = Dense(units=n_classes, activation='softmax')(dense)
    return Model(inputs=inputs, outputs=out)
    
def get_model(config, weights=None):
    model = simple_cnn(config.data.img_size, config.data.n_classes)
    model.compile(optimizer=SGD(lr=config.train.learning_rate),
                  loss='categorical_crossentropy',
                  metrics=[f1_micro])
    if weights is not None:
        model.set_weights(weights)
    return model

def datagen(files, labels, batch_size, n_classes,
            img_size, shuffle=True):
    pos = 0
    n_files = len(files)
    while True:
        if (pos == 0) and shuffle:
            files = np.random.permutation(files)
        slc = slice(pos, pos + batch_size)
        batch = list(map(plt.imread, files[slc]))
        batch = [resize(image=b, 
                    output_shape=img_size, 
                        preserve_range=True)
                 for b in batch]
        batch = np.array(batch)
        pos = min((pos + batch_size), n_files) % n_files
        yield (batch, to_categorical(labels[slc], n_classes))

def get_dataset(df_path, num=None, shuffle=False):
    df = pd.read_csv(df_path)
    if num is not None:
        df = df[df[config.data.client_column] == num]
    
    
#     classes = rows.label.unique()
#     files = []
#     max_examples = rows.label.value_counts().max()
#     for cl in classes:
        
#         _files = rows[rows.label == cl].filenames
#         _labels = rows[rows.label == cl].labels
        
#         if balanced:
#             n_examples = len(_files)
#             n_oversample = max_examples - n_examples

#             if n_oversample > 0:
#                 _files = np.concatenate([_files.values, 
#                             _files.sample(n_oversample,
#                             replace=n_oversample > n_examples).values])
#                 _labels = np.concatenate([_labels.values, 
#                             _labels.sample(n_oversample, 
#                             replace=n_oversample > n_examples).values])
            
#         files.append(_files)
#         labels.append(_labels)
        
    return (datagen(df.filename.values, 
                   df.label.values, 
                   config.data.batch_size,
                   config.data.n_classes,
                   config.data.img_size, shuffle=shuffle), 
int(np.ceil(len(df)/config.data.batch_size)), len(df))

        

In [4]:
def client_update(config, weights, num):
    n, num = num
    if n == 0:
        print(num)
        print(pd.DataFrame(pd.read_csv(config.data.train_df_path).query('{}=={}'.format(
            config.data.client_column, num)).label.value_counts()).T)
    dataset, steps_per_epoch, n_examples = get_dataset(config.data.train_df_path, num)
    model = get_model(config, weights)
    history = model.fit_generator(dataset, 
                        steps_per_epoch=steps_per_epoch,
                        epochs=config.train.epochs, verbose=n==0)
    return (model.get_weights(),
            n_examples,
            history.history['loss'][-1], 
            history.history['f1_micro'][-1])
    

In [5]:
def average_weights(weights, n_examples):
    weight_lists = map(list, zip(*weights))
    total_examples = np.sum(n_examples)
    return [np.sum(np.stack(w, axis=-1) * n_examples, axis=-1) / total_examples for w in weight_lists]

def fed_averaging(config):
    if not os.path.exists(config.log.path):
        os.makedirs(config.log.path)
    
    logpath = os.path.join(config.log.path, 'csvlogs')
    if not os.path.exists(logpath):
        os.makedirs(logpath)
    
    model = get_model(config)
    pool = Pool(min(os.cpu_count(), config.train.num_clients))
    valid_data, valid_steps, _ = get_dataset(config.data.val_df_path, shuffle=False)
    valid_log = pd.DataFrame({'round': [], 
                        'loss': [],
                        'f1_micro': []})
    train_log = pd.DataFrame({'round': [], 
                    'loss': [],
                    'f1_micro': []})
    
    best_score = 0
    
    if 'resume' in config.keys():
        print('Resuming from {}'.format(config.resume.ckpt_path))
        model.load_weights(config.resume.ckpt_path)
    for t in range(1, config.train.num_rounds + 1):
        print('Round {}'.format(t))
        print('-' * 10)
        print('Training')
        global_weights = model.get_weights()
        m = int(np.ceil(max(config.train.client_fraction * config.train.num_clients, 1)))
        clients = np.random.permutation(config.train.num_clients)[:m]
        local_results = pool.map(partial(client_update, config, global_weights), enumerate(clients))
        local_weights, n_examples, _tloss, _tf1 = zip(*local_results)
        tloss = np.mean(_tloss)
        tf1 = np.mean(_tf1)
        model.set_weights(average_weights(local_weights, n_examples))
        print('train_loss {:.4f}, train_f1 {:.4f}'.format(tloss, tf1))
        print('Validation')
        vloss, vf1 = model.evaluate_generator(valid_data, valid_steps, verbose=True)
        
        valid_log = valid_log.append(pd.DataFrame({'round': [t], 
                                 'loss': vloss,
                                 'f1_micro': vf1}), ignore_index=True)
        train_log = train_log.append(pd.DataFrame({'round': [t], 
                         'loss': tloss,
                         'f1_micro': tf1}), ignore_index=True)
        
        valid_log[['round', 'loss', 'f1_micro']].to_csv(os.path.join(logpath, 'valid'), index=False)
        train_log[['round', 'loss', 'f1_micro']].to_csv(os.path.join(logpath, 'train'), index=False)
        
        print('val_loss {:.4f}, val_f1 {:.4f}'.format(vloss, vf1))
        print()
        print()
        if vf1 > best_score:
            model.save_weights(os.path.join(config.log.path, 'ckpt'))
            best_score = vf1
    pool.close()
    pool.join()
        

In [6]:
from easydict import EasyDict
config = EasyDict(
{
 'data': {
     'train_df_path' : 'seedlings_train.csv',
     'val_df_path': 'seedlings_val.csv',
     'img_size': (50, 50, 3),
     'batch_size': 10,
     'n_classes': 12,
     'client_column': 'shard_non_iid',
     
 },
 'train' : {
     'learning_rate': 1e-3,
     'epochs': 5,
      'client_fraction':0.2,
     'num_clients': 10,
     'num_rounds': 1000
     
 },
    'log': {
        'path': './results/04-fed-avg-non_iid'
    },
    
    'resume': {
        'ckpt_path': './results/03-fed-avg-non_iid'
    }
}
)


In [7]:
fed_averaging(config)

Resuming from ./results/03-fed-avg-non_iid


OSError: Unable to open file (file read failed: time = Fri Apr 26 06:34:26 2019
, filename = './results/03-fed-avg-non_iid', file descriptor = 57, errno = 21, error message = 'Is a directory', buf = 0x7ffeeaf35868, total read size = 8, bytes this sub-read = 8, bytes actually read = 18446744073709551615, offset = 0)