# In [None]:
import tensorflow as tf
import horovod.tensorflow.keras as hvd
import os
import numpy as np
import pandas as pd
import cv2
from tqdm import tqdm
from itertools import chain
import matplotlib.pyplot as plt

# Horovod: bring up the Horovod runtime before touching any devices.
hvd.init()

# Horovod: one GPU per process. Memory growth is enabled on every GPU, then
# only the GPU whose index equals this process's local rank is left visible.
gpus = tf.config.experimental.list_physical_devices('GPU')
for device in gpus:
    tf.config.experimental.set_memory_growth(device, True)
if gpus:
    local_gpu = gpus[hvd.local_rank()]
    tf.config.experimental.set_visible_devices(local_gpu, 'GPU')

# The data is split into one partition per Horovod process.
total_partitions = hvd.size()


# Build this Horovod rank's (image, label-vector) pairs.
#
# The label vocabulary is built from the FULL CSV before sharding. That way
# every rank produces label vectors with the same length and column order; a
# label absent from one shard must not change that rank's vector.
#
# Each rank then takes an equal-sized contiguous slice [start, end) of rows.
# The `total_rows % total_partitions` leftover rows are dropped. Equal shards
# keep the per-epoch step count identical on every rank, which Horovod's
# allreduce-based training relies on.

# Reading the target labels
df = pd.read_csv('./sample/sample_labels.csv', index_col=False)
total_rows = df.shape[0]
batch_size = total_rows // total_partitions  # rows per rank (shard size)

# 'No Finding' becomes an empty entry; findings are '|'-separated label names.
df['Finding Labels'] = df['Finding Labels'].map(lambda x: x.replace('No Finding', ''))
all_labels = np.unique(list(chain(*df['Finding Labels'].map(lambda x: x.split('|')).tolist())))
all_labels = [x for x in all_labels if len(x) > 0]

# This rank's half-open row range. end is exclusive, so no row is lost.
start = hvd.rank() * batch_size
end = start + batch_size
# copy() so the label columns added below go onto an independent frame.
df = df.iloc[start:end].copy()

for c_label in all_labels:
    # Exact match against the '|'-split names; a substring test could match
    # one label name inside another.
    df[c_label] = df['Finding Labels'].map(
        lambda finding: 1.0 if c_label in finding.split('|') else 0.0)

# One multi-hot float32 vector per row, ordered like `all_labels`.
df['disease_vec'] = list(df[all_labels].to_numpy(dtype=np.float32))
y = df['disease_vec']

# Reading Images into arrays
train_path = './sample/sample/images/'
img_size = 100

if 'Image Index' in df.columns:
    # Pair each row with its own image file by name.
    # NOTE(review): assumes 'Image Index' holds the image file name, as in the
    # NIH chest X-ray sample CSV. Confirm against the dataset.
    data = df['Image Index'].tolist()
else:
    # Fallback: pair rows with files by position. os.listdir order is
    # arbitrary, so sort it to make the pairing deterministic on every rank.
    # NOTE(review): this still assumes CSV rows are in file-name order.
    data = sorted(os.listdir(train_path))[start:end]
    if len(data) != len(y):
        raise ValueError(
            'Rank %d: found %d image files for %d label rows in %s'
            % (hvd.rank(), len(data), len(y), train_path))

train_data = []
for img, label in tqdm(zip(data, y), total=len(data)):
    img_file = os.path.join(train_path, img)
    img_array = cv2.imread(img_file)
    if img_array is None:  # cv2.imread returns None for missing/unreadable files
        raise FileNotFoundError('Could not read image: %s' % img_file)
    new_array = cv2.resize(img_array, (img_size, img_size))
    train_data.append([new_array, label])

# Separate the (image, label-vector) pairs into model inputs X and targets Y.
X = [pair[0] for pair in train_data]
Y = [pair[1] for pair in train_data]

# Scale pixels to [0, 1]. Converting to float32 first keeps the result in
# float32, Keras' default float type. Dividing the raw uint8 images by a Python
# float would instead produce a float64 array twice the size.
X = np.asarray(X, dtype=np.float32) / 255.0
Y = np.asarray(Y, dtype=np.float32)

# Positional split: first 90% of this rank's rows train, the rest validate.
indx = int(0.9 * X.shape[0])
trainX, testX = X[:indx, :], X[indx:, :]
trainY, testY = Y[:indx, :], Y[indx:, :]

# CNN for multi-label classification: one sigmoid output per label column of Y.
# NOTE(review): the first two Conv2D layers have no activation, so they compose
# into a single linear map before the first ReLU. Confirm this is intended.
model = tf.keras.Sequential([
    # Only the first layer's input_shape is used by Sequential.
    tf.keras.layers.Conv2D(256, (3, 3), input_shape=X.shape[1:]),
    # l2(0.) adds no penalty; raise the factor to actually regularize.
    tf.keras.layers.Conv2D(filters=96, kernel_size=(11, 11), strides=3, padding='same',
                           kernel_regularizer=tf.keras.regularizers.l2(0.)),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Conv2D(64, (3, 3), padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dropout(0.25),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    # One unit per label column, so the output always matches the targets.
    tf.keras.layers.Dense(Y.shape[-1]),
    tf.keras.layers.BatchNormalization(),
    tf.keras.layers.Activation('sigmoid'),
])

# Horovod: scale the base learning rate linearly with the number of workers,
# then wrap the optimizer in Horovod's DistributedOptimizer.
scaled_lr = 0.001 * hvd.size()
opt = hvd.DistributedOptimizer(tf.optimizers.Adam(scaled_lr))

# Horovod: `experimental_run_tf_function=False` makes TensorFlow compute
# gradients through hvd.DistributedOptimizer().
model.compile(
    loss=tf.losses.BinaryCrossentropy(),
    optimizer=opt,
    metrics=['accuracy'],
    experimental_run_tf_function=False,
)

# Horovod training callbacks. Order matters: MetricAverageCallback must come
# before ReduceLROnPlateau, TensorBoard or any other metric-based callback.

# Broadcast rank 0's initial variable states to all other workers, so every
# worker starts from identical weights (random or restored from a checkpoint).
broadcast_cb = hvd.callbacks.BroadcastGlobalVariablesCallback(0)

# Average metrics across workers at the end of every epoch.
metric_average_cb = hvd.callbacks.MetricAverageCallback()

# Starting directly at the worker-scaled learning rate (0.001 * hvd.size()
# for this script) tends to hurt final accuracy, so the rate is warmed up over
# the first three epochs. See https://arxiv.org/abs/1706.02677 for details.
# NOTE(review): newer Horovod releases require an `initial_lr=` argument to
# this callback. Confirm against the installed version.
warmup_cb = hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=3, verbose=1)

callbacks = [broadcast_cb, metric_average_cb, warmup_cb]

# Horovod: checkpoints should be written by worker 0 only, so other workers
# cannot corrupt them. Checkpointing is currently disabled:
#if hvd.rank() == 0:
#    callbacks.append(tf.keras.callbacks.ModelCheckpoint('./checkpoint-{epoch}.h5'))

# Horovod: only worker 0 prints training progress; the others stay silent.
verbose = int(hvd.rank() == 0)

# Train on this rank's shard for 5 epochs. batch_size is per worker; the
# step count is not adjusted for the number of workers.
history = model.fit(
    x=trainX,
    y=trainY,
    batch_size=16,
    epochs=5,
    verbose=verbose,
    validation_data=(testX, testY),
    callbacks=callbacks,
)

# Report results and learning curves from worker 0 only, so a multi-worker run
# prints one summary and opens one figure instead of one per process.
# Note: each rank holds a different data shard, so these numbers describe
# rank 0's shard only.
if hvd.rank() == 0:
    _, train_acc = model.evaluate(trainX, trainY, verbose=0)
    _, test_acc = model.evaluate(testX, testY, verbose=0)

    print('Train: %.3f, Test: %.3f' % (train_acc, test_acc))

    # learning curves of model accuracy
    plt.plot(history.history['accuracy'], label='train')
    plt.plot(history.history['val_accuracy'], label='test')
    plt.legend()
    plt.show()