### Training Pipeline Setup

In [11]:
from datetime import datetime
import os

 
REGION = 'us-central1'
PROJECT = !(gcloud config get-value core/project)
PROJECT = PROJECT[0]
BUCKET =  'nlp-xray-dataset'

# # Do not change these
os.environ["PROJECT"] = PROJECT
os.environ["BUCKET"] = BUCKET
os.environ["REGION"] = REGION

### Task setup

In [12]:
%%writefile xray_models/trainer_cloud/task.py
import argparse
import os
import json
import sys

from . import model

def _parse_arguments(argv):
    """
    Parse command line arguments
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--model_type',
        help='which model type to use',
        type=str, default='mobilenetv2')
    parser.add_argument(
        '--epoch',
        help='number of epochs to use',
        type=int, default=5)
    parser.add_argument(
        '--steps_per_epoch',
        help='number of steps per epoch to use',
        type=int, default=10)
    parser.add_argument(
        '--job_dir',
        help='directory where to save the model',
        type=str, default='xray_models/')
    parser.add_argument(
        '--nrows',
        help='number of total rows desired accross test, validation, and test set',
        type=int, default=None)
    parser.add_argument('--loss_class_weighted', dest='loss_class_weighted', action='store_true')
    parser.add_argument('--loss_not_class_weighted', dest='loss_class_weighted', action='store_false')
    parser.set_defaults(loss_class_weighted=True)
    return parser.parse_known_args(argv)

def main():
    """
    This function will parse command line arguments and kick model training
    """
    args = _parse_arguments(sys.argv[1:])[0]
    print(f'Training with Arguments:{args}')
    trial_id = json.loads(
        os.environ.get('TF_CONFIG', '{}')).get('task', {}).get('trial', '')
    output_path = args.job_dir if not trial_id else args.job_dir+'/'
    
    model_layers = model.get_layers(args.model_type)
    model_history = model.train_and_evaluate(model_layers,args.epoch, args.steps_per_epoch, args.job_dir, args.nrows, args.loss_class_weighted, args.model_type)
    
if __name__=='__main__':
    main()


Overwriting xray_models/trainer_cloud/task.py


### Adding non model functions to a config file

In [13]:
%%writefile xray_models/trainer_cloud/config/config.py

#model parameters
IMG_HEIGHT = 299
IMG_WIDTH = 299
IMG_CHANNELS = 1

processing_parameters={'mobilenetv2': {'channels': 1, 'image_height': min([IMG_HEIGHT,299]), 'image_width': min([IMG_WIDTH,299]), 'pixel_scale_min': 0, 'pixel_scale_max': 1}, 
                           'cnn': {'channels': 1, 'image_height': min([IMG_HEIGHT,299]), 'image_width': min([IMG_WIDTH,299]), 'pixel_scale_min': 0, 'pixel_scale_max': 1}, 
                           'inception_resnet': {'channels': 3, 'image_height': 299, 'image_width': 299, 'pixel_scale_min': 0, 'pixel_scale_max': 1}, 
                           'vision_transformer': {'channels': 3, 'image_height': 224, 'image_width': 224, 'pixel_scale_min': -1, 'pixel_scale_max': 1}}
BATCH_SIZE = 20
SHUFFLE_BUFFER = 20*BATCH_SIZE

VAL_BATCH_SIZE = 100
VALIDATION_IMAGES = 10000
VALIDATION_STEPS = VALIDATION_IMAGES//BATCH_SIZE

#augmentation parameters
MAX_DELTA = 63.0/255.0 #adjusting brightness
CONTRAST_LOWER = 0.2
CONTRAST_UPPER = 1.8

#response variable columns in the mete_data_processed.csv
response_variables = ['atelectasis', 'cardiomegaly', 'consolidation', 'edema', 'effusion', 'emphysema', 'fibrosis', 'hernia',
                 'infiltration', 'mass', 'no_finding', 'nodule', 'pleural_thickening', 'pneumonia', 'pneumothorax']

meta_data_train_processed_path = "gs://nlp-xray-dataset/meta_data/meta_data_processed_train.csv" #location to the meta data file - train 
meta_data_val_processed_path = "gs://nlp-xray-dataset/meta_data/meta_data_processed_val.csv" #location to the meta data file - val
meta_data_test_processed_path = "gs://nlp-xray-dataset/meta_data/meta_data_processed_test.csv" #location to the meta data file - test
meta_data_processed_path = "gs://nlp-xray-dataset/meta_data/meta_data_processed.csv" #location to the meta data file

Overwriting xray_models/trainer_cloud/config/config.py


In [14]:
%%writefile xray_models/trainer_cloud/preprocessing.py
import os
import pandas as pd
import numpy as np
import tensorflow as tf
import random
from .config import config

AUTOTUNE = tf.data.experimental.AUTOTUNE

def rescale_image_tensor(t,domain_interval,range_interval):
    a=domain_interval[0]
    b=domain_interval[1]
    c=range_interval[0]
    d=range_interval[1]
    rescaled_t=c+((d-c)/(b-a))*(t-a)
    return(rescaled_t)

def decode_image(img, reshape_dims, num_channels, pixel_min, pixel_max):
    """
    Decode an image
    """
    img = tf.image.decode_png(img, channels=num_channels)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.image.resize(img, reshape_dims)
    image_min = tf.math.reduce_min(img)
    image_max = tf.math.reduce_max(img)    
    if pixel_min!=image_min and pixel_max!=image_max:
        #img=img.map(lambda x: tf.py_function(func=rescale_image_tensor, inp=[x, [image_min,image_max],[pixel_min,pixel_max]], Tout=tf.float32))
        #img = img.map(lambda x: rescale_image_tensor(x,[image_min,image_max],[pixel_min,pixel_max]))
        img = rescale_image_tensor(img, [image_min,image_max],[pixel_min,pixel_max])
    img = tf.cast(img, dtype=tf.float32)
    return(img)

def decode(filename, label):
    """
    Decode file names.
    """
    image_bytes = tf.io.read_file(filename=filename)
    return image_bytes, label


def get_filenames_and_labels(data_path,nrow_ind):
    """
    Get filenames and labels
    """
    meta_data = pd.read_csv(data_path)
    all_files = meta_data['gs_path'].tolist()
    response_vars = config.response_variables    
    all_labels = np.array(meta_data[response_vars].values.tolist())
    if nrow_ind:
        if nrow_ind<= meta_data.shape[0]:            
            zip_lists=list(zip(all_files, all_labels))
            random.shuffle(zip_lists)
            files, labels = zip(*zip_lists)
            files=list(files)[:nrow_ind]
            labels=np.array(labels)[:nrow_ind]
            return(files, labels)
        else:
            return(all_files, all_labels)
    else: 
        return(all_files, all_labels)

        

def read_and_preprocess(image_bytes, label,model_type='cnn', random_augment=False):
    """
    Function which performs data augmentation.
    """
    pp=config.processing_parameters[model_type]
    if random_augment:
        img = decode_image(image_bytes, [pp['image_width']+10,pp['image_width']+10], pp['channels'], pp['pixel_scale_min'], pp['pixel_scale_max'])
        img = tf.image.random_crop(img, [config.IMG_HEIGHT, config.IMG_WIDTH, config.IMG_CHANNELS])
        img = tf.image.random_flip_left_right(img)
        img = tf.image.random_brightness(img, config.MAX_DELTA)
        img = tf.image.random_contrast(img, config.CONTRAST_LOWER, config.CONTRAST_UPPER)
    else:
        img = decode_image(image_bytes, [pp['image_width'],pp['image_width']], pp['channels'], pp['pixel_scale_min'], pp['pixel_scale_max'])
    return img, label

def read_and_preprocess_with_augment(image_bytes, label, model_type='cnn'):
    """
    Data augmentation for the training set.
    """
    return read_and_preprocess(image_bytes, label, model_type, random_augment=True)


# UPDATE HERE
def load_dataset(filenames, labels, batch_size,model_type, training=True):
    """
    This functions load the dataset from the GCS bucket.
    Inputs include:
    filenames: list of gcs locations for image files
    labels: numpy array of one hot encoded multi labels
    batch_size: batch size
    training: boolean entry specifying if training data is needed. False for test data.
    """
    dataset = tf.data.Dataset.from_tensor_slices((filenames, labels)).map(decode) #numpy array of filenames and numpy array of labels    
    if training:
        dataset = dataset.map(lambda f,l: read_and_preprocess_with_augment(f,l,model_type=model_type)).cache().shuffle(config.SHUFFLE_BUFFER).repeat(count=None)
    else:
        dataset = dataset.map(lambda f,l: read_and_preprocess(f,l,model_type=model_type)).repeat(count=1)
    
    return dataset.batch(batch_size=batch_size).prefetch(buffer_size=AUTOTUNE) 


Overwriting xray_models/trainer_cloud/preprocessing.py


### Models

In [15]:
%%writefile xray_models/trainer_cloud/model.py
import os
import shutil

import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from tensorflow.keras import Sequential
from tensorflow.keras.callbacks import TensorBoard
from tensorflow.keras.layers import (Conv2D, Dense, Dropout, Flatten, MaxPooling2D, Softmax)
import tensorflow_hub as hub

from .config import config
from .import preprocessing

def get_layers(model_type, 
               nclasses=15, 
               hidden_layer_1_neurons=400,
               hidden_layer_2_neurons=100,
               dropout_rate=0.25,
               num_filters_1=64,
               kernel_size_1=3,
               pooling_size_1=2,
               num_filters_2=32,
               kernel_size_2=3,
               pooling_size_2=2):
    """
    Get model layers for a specific model
    """
    model_layers = {
        'cnn':[
            Conv2D(num_filters_1, kernel_size=kernel_size_1,
                  activation='relu', input_shape=(config.IMG_WIDTH, config.IMG_HEIGHT, 1)),
            MaxPooling2D(pooling_size_1),
            Conv2D(num_filters_2, kernel_size=kernel_size_2,
                  activation='relu'),
            MaxPooling2D(pooling_size_2),
            Flatten(),
            Dense(hidden_layer_1_neurons, activation='relu'),
            Dense(hidden_layer_2_neurons, activation='relu'),
            Dropout(dropout_rate),
            Dense(nclasses, activation='sigmoid')
        ],
        'vision_transformer':
        [hub.KerasLayer("https://tfhub.dev/sayakpaul/vit_b16_fe/1", trainable=False),
        Dense(nclasses, activation='sigmoid')
        ],
        'inception_resnet':[
        hub.KerasLayer("https://tfhub.dev/google/imagenet/resnet_v2_101/feature_vector/5",trainable=False),
        Dense(nclasses, activation='sigmoid')]
        
    }
    return model_layers[model_type]

def label_weighted_cross_entropy(y_true, y_pred):
    """
    Loss 
    """
    P = tf.reduce_sum(y_true)
    N = -1 * tf.reduce_sum(y_true - 1)
    
    beta_P = tf.cast((P + N) / P, dtype=tf.float64)
    beta_N = tf.cast((P + N) / N, dtype=tf.float64)
    
    y_true = tf.cast(y_true, dtype=tf.float64)
    y_pred = tf.cast(y_pred, dtype=tf.float64)
    
    epsilon = tf.constant(1e-7, dtype=tf.float64) #avoid nans
    loss = (beta_P*tf.math.log(y_pred+epsilon)*y_true + beta_N*tf.math.log((1-y_pred)+epsilon) * (1-y_true))*-1.0
    tf.debugging.assert_all_finite(loss, 'There are nan values')
    return tf.reduce_sum(tf.reduce_mean(loss, axis = 0))


class ClassImbalanceSparsityAdjustedLoss(tf.keras.losses.Loss):
    def __init__(self, inverse_class_weights):
        """
        Initialization of inverse class weights
        """
        super().__init__(name = 'ClassImbalanceSparsityAdjustedLoss')
        self.inverse_class_weights = inverse_class_weights
    
    def call(self, y_true, y_pred):
        """
        Cross entropy loss adjusted for class imabalance and one-hot encoding sparsity
        """
        P = tf.reduce_sum(y_true)
        N = -1 * tf.reduce_sum(y_true - 1)

        beta_P = tf.cast((P + N) / P, dtype=tf.float64)
        beta_N = tf.cast((P + N) / N, dtype=tf.float64)

        y_true = tf.cast(y_true, dtype=tf.float64)
        y_pred = tf.cast(y_pred, dtype=tf.float64)

        epsilon = tf.constant(1e-7, dtype=tf.float64) #avoid nans
        loss = (beta_P*tf.math.log(y_pred+epsilon)*y_true + beta_N*tf.math.log((1-y_pred)+epsilon) * (1-y_true))*-1.0
        tf.debugging.assert_all_finite(loss, 'There are nan values')
        return tf.reduce_sum(tf.reduce_mean(loss, axis = 0)*self.inverse_class_weights) 


def build_model(layers, output_dir,inverse_class_weights, loss_class_weighted):
    """
    Compiles keras model for image classification/
    """    
    recall = tf.keras.metrics.Recall()
    precision = tf.keras.metrics.Precision()
    
    #original_loss_func - label_weighted_cross_entropy
    model = Sequential(layers)
    if loss_class_weighted: 
        model.compile(optimizer='adam',
                     loss=ClassImbalanceSparsityAdjustedLoss(inverse_class_weights),
                     metrics=[recall, precision, 'accuracy'])
    else: 
        model.compile(optimizer='adam',
                     loss=label_weighted_cross_entropy,
                     metrics=[recall, precision, 'accuracy'])
    return model

def train_and_evaluate(model_layers,num_epochs, steps_per_epoch, output_dir, nrows, loss_class_weighted, model_type):
    """
    Compile the model and load data for training into it.
    """
    if nrows: 
        train_nrow=int(nrows*.8)
        val_nrow=int(nrows*.2)
    else: 
        train_nrow=None
        val_nrow=None
    
    # Training Dataset 
    train_filenames, train_labels  = preprocessing.get_filenames_and_labels(config.meta_data_train_processed_path, 
                                                                            nrow_ind=train_nrow)
    # CALCULATE WEIGHTS HERE 
    labels_df = pd.DataFrame(train_labels,columns=config.response_variables)
    total = labels_df.sum(axis=0).sum()
    inverse_props = 1/labels_df.sum(axis=0).div(total)
    normalized_inverse_props = inverse_props.div(inverse_props.sum())
    inverse_class_weights=np.array(normalized_inverse_props)
    
    # Validation Dataset
    val_filenames, val_labels  = preprocessing.get_filenames_and_labels(config.meta_data_val_processed_path,
                                                                            nrow_ind=val_nrow)
    
    model = build_model(model_layers, output_dir=output_dir, inverse_class_weights=inverse_class_weights, loss_class_weighted=loss_class_weighted)

    print(f'Number of Training Records:{len(train_filenames)}')
    print(f'Number of Validation Records:{len(val_filenames)}')
    
    #create training and validation data
    train_ds = preprocessing.load_dataset(train_filenames, train_labels, config.BATCH_SIZE, model_type)
    val_ds = preprocessing.load_dataset(val_filenames, val_labels, config.VAL_BATCH_SIZE, model_type,training=False)
    
    callbacks = []
    if output_dir:
        tensorboard_callback = TensorBoard(log_dir=output_dir)
        callbacks = [tensorboard_callback]
    
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=num_epochs,
        steps_per_epoch=steps_per_epoch,
        verbose=2,
        callbacks=callbacks)
    
    if output_dir:
        export_path = os.path.join(output_dir, 'keras_export')
        model.save(export_path, save_format='tf')
    
    return history


Overwriting xray_models/trainer_cloud/model.py


# Running Model

In [18]:
import os
from datetime import datetime
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
#model_type = 'inception_resnet'
model_type = 'cnn'

os.environ["MODEL_TYPE"] = model_type
os.environ["JOB_DIR"] = f"xray_models/models/{model_type}_{current_time}/"

In [19]:
%%bash
python3 -m xray_models.trainer_cloud.task \
    --job-dir=$JOB_DIR \
    --epochs=5 \
    --steps_per_epoch=100 \
    --model_type=$MODEL_TYPE\
    --loss_class_weighted

Training with Arguments:Namespace(epoch=5, job_dir='xray_models/', loss_class_weighted=True, model_type='cnn', nrows=None, steps_per_epoch=100)
Number of Training Records:68973
Number of Validation Records:17551
Epoch 1/5
100/100 - 328s - loss: 0.4862 - recall: 0.6182 - precision: 0.1861 - accuracy: 0.2650 - val_loss: 0.3988 - val_recall: 0.6388 - val_precision: 0.3291 - val_accuracy: 0.5720
Epoch 2/5
100/100 - 307s - loss: 0.5682 - recall: 0.6940 - precision: 0.2283 - accuracy: 0.5130 - val_loss: 0.3955 - val_recall: 0.6909 - val_precision: 0.2787 - val_accuracy: 0.5803
Epoch 3/5
100/100 - 296s - loss: 0.4616 - recall: 0.7194 - precision: 0.2547 - accuracy: 0.6190 - val_loss: 0.3925 - val_recall: 0.6128 - val_precision: 0.3708 - val_accuracy: 0.5803
Epoch 4/5
100/100 - 286s - loss: 0.3577 - recall: 0.7055 - precision: 0.2701 - accuracy: 0.6490 - val_loss: 0.3806 - val_recall: 0.5626 - val_precision: 0.3401 - val_accuracy: 0.5803
Epoch 5/5
100/100 - 289s - loss: 0.4113 - recall: 0.7154

2021-11-03 21:15:53.448695: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-03 21:15:53.458003: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-03 21:15:53.458701: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-11-03 21:15:53.459597: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

# Uploading Model to the Cloud for Training

In [45]:
%%writefile /home/jupyter/nlp-xray/code/xray_models/trainer_cloud/setup.py
from setuptools import find_packages
from setuptools import setup

setup(
    name='chest_xray',
    version='0.1',
    packages=find_packages(),
    include_package_data=True,
    description='Using vision to detect diseases present in chest x-rays'
)

Overwriting /home/jupyter/nlp-xray/code/xray_models/trainer_cloud/setup.py


In [46]:
%%bash 
cd xray_models/trainer_cloud
python ./setup.py sdist --formats=gztar
ls
gsutil cp ./dist/chest_xray-0.1.tar.gz gs://${BUCKET}/cloud_training/

running sdist
running egg_info
writing chest_xray.egg-info/PKG-INFO
writing dependency_links to chest_xray.egg-info/dependency_links.txt
writing top-level names to chest_xray.egg-info/top_level.txt
reading manifest file 'chest_xray.egg-info/SOURCES.txt'
writing manifest file 'chest_xray.egg-info/SOURCES.txt'
running check
creating chest_xray-0.1
creating chest_xray-0.1/chest_xray.egg-info
creating chest_xray-0.1/config
creating chest_xray-0.1/trainer
creating chest_xray-0.1/trainer/config
copying files to chest_xray-0.1...
copying README.md -> chest_xray-0.1
copying setup.py -> chest_xray-0.1
copying chest_xray.egg-info/PKG-INFO -> chest_xray-0.1/chest_xray.egg-info
copying chest_xray.egg-info/SOURCES.txt -> chest_xray-0.1/chest_xray.egg-info
copying chest_xray.egg-info/dependency_links.txt -> chest_xray-0.1/chest_xray.egg-info
copying chest_xray.egg-info/top_level.txt -> chest_xray-0.1/chest_xray.egg-info
copying config/__init__.py -> chest_xray-0.1/config
copying config/config.py -> 



Copying file://./dist/chest_xray-0.1.tar.gz [Content-Type=application/x-tar]...
/ [1 files][  4.6 KiB/  4.6 KiB]                                                
Operation completed over 1 objects/4.6 KiB.                                      


In [47]:
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
model_type = 'cnn'

os.environ["MODEL_TYPE"] = model_type
os.environ["JOB_DIR"] = "gs://{}/chest_xray_{}_{}/".format(
    BUCKET, model_type, current_time)
os.environ["JOB_NAME"] = "chest_xray_{}_{}".format(
    model_type, current_time)

In [48]:
os.environ["JOB_DIR"]

'gs://nlp-xray-dataset/chest_xray_cnn_20211103_170709/'

In [49]:
%%bash
echo $JOB_DIR $REGION $JOB_NAME

PYTHON_PACKAGE_URIS=gs://${BUCKET}/cloud_training/chest_xray-0.1.tar.gz
MACHINE_TYPE=n1-highcpu-16
REPLICA_COUNT=1
PYTHON_PACKAGE_EXECUTOR_IMAGE_URI="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-6:latest"
PYTHON_MODULE=trainer.task
    
WORKER_POOL_SPEC="machine-type=$MACHINE_TYPE,\
replica-count=$REPLICA_COUNT,\
executor-image-uri=$PYTHON_PACKAGE_EXECUTOR_IMAGE_URI,\
accelerator-type=NVIDIA_TESLA_K80,\
accelerator-count=2,\
python-module=$PYTHON_MODULE"

gcloud ai custom-jobs create \
  --region=${REGION} \
  --display-name=$JOB_NAME \
  --python-package-uris=$PYTHON_PACKAGE_URIS \
  --worker-pool-spec=$WORKER_POOL_SPEC \
  --args="--job-dir=$JOB_DIR,--model_type=$MODEL_TYPE --nrows=2000"

gs://nlp-xray-dataset/chest_xray_cnn_20211103_170709/ us-central1 chest_xray_cnn_20211103_170709


Using endpoint [https://us-central1-aiplatform.googleapis.com/]
ERROR: (gcloud.ai.custom-jobs.create) INVALID_ARGUMENT: List of found errors:	1.Field: job_spec.worker_pool_specs[0].machine_spec.accelerator_count; Message: Accelerators are not supported for this project.	
- '@type': type.googleapis.com/google.rpc.BadRequest
  fieldViolations:
  - description: Accelerators are not supported for this project.
    field: job_spec.worker_pool_specs[0].machine_spec.accelerator_count


CalledProcessError: Command 'b'echo $JOB_DIR $REGION $JOB_NAME\n\nPYTHON_PACKAGE_URIS=gs://${BUCKET}/cloud_training/chest_xray-0.1.tar.gz\nMACHINE_TYPE=n1-highcpu-16\nREPLICA_COUNT=1\nPYTHON_PACKAGE_EXECUTOR_IMAGE_URI="us-docker.pkg.dev/vertex-ai/training/tf-cpu.2-6:latest"\nPYTHON_MODULE=trainer.task\n    \nWORKER_POOL_SPEC="machine-type=$MACHINE_TYPE,\\\nreplica-count=$REPLICA_COUNT,\\\nexecutor-image-uri=$PYTHON_PACKAGE_EXECUTOR_IMAGE_URI,\\\naccelerator-type=NVIDIA_TESLA_K80,\\\naccelerator-count=2,\\\npython-module=$PYTHON_MODULE"\n\ngcloud ai custom-jobs create \\\n  --region=${REGION} \\\n  --display-name=$JOB_NAME \\\n  --python-package-uris=$PYTHON_PACKAGE_URIS \\\n  --worker-pool-spec=$WORKER_POOL_SPEC \\\n  --args="--job-dir=$JOB_DIR,--model_type=$MODEL_TYPE --nrows=2000"\n'' returned non-zero exit status 1.

In [None]:
%%bash
SAVEDMODEL_DIR=${JOB_DIR}keras_export
echo $SAVEDMODEL_DIR
gsutil ls $SAVEDMODEL_DIR

# Deploying and Predicting with the Model