<img src="img/hpe_logo.png" alt="HPE Logo" width="125">

# HPE ML Platform Workshop - Model Training

<img src='img/platform_step02_training.png' width='1200'/>

In [1]:
# !pip install --quiet scikit-learn scikit-image pachyderm-sdk

In [2]:
import os
import glob
import shutil
import random
import numpy as np
import pandas as pd
from tqdm.auto import tqdm
from configparser import ConfigParser
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Torch modules
import torch
from torch import nn
from torch import optim
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision.transforms.functional as TF

# Image modules
from PIL import Image
import matplotlib.pyplot as plt
%matplotlib inline

# Import functions for downloading data
from load_data import download_pach_repo, download_data, get_train_transforms

# Import model util functions
from model_utils import set_seed, plot_example, show_sample, plot_predictions, PairedRandomHorizontalFlip, PairedRandomAffine, PairedToTensor, DoubleConv,  InConv, Down, Up, OutConv, UNet

# Import MLDE packages
from determined.experimental import client as det
from determined import pytorch

# Import MLDM packages
import pachyderm_sdk
from pachyderm_sdk.api import pfs
from pachyderm_sdk.api.pfs import File, FileType

# Remove warnings
import warnings
warnings.filterwarnings('ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
# Load connection settings and dataset coordinates from config.ini.
# ConfigParser.read() silently skips files it cannot open (it returns the list of
# files it actually parsed), which would otherwise surface later as a confusing
# KeyError: 'PDK_INFO'. Fail fast with a clear message instead.
config_obj = ConfigParser()
if not config_obj.read("config.ini"):
    raise FileNotFoundError("config.ini not found or unreadable; it must contain a [PDK_INFO] section")

mldm_host = config_obj['PDK_INFO']['mldm_host']
mldm_port = config_obj['PDK_INFO']['mldm_port']
mlde_host = config_obj['PDK_INFO']['mlde_host']
mlde_port = config_obj['PDK_INFO']['mlde_port']
# NOTE: `token` is a credential; keep config.ini out of version control.
token = config_obj['PDK_INFO']['token']
repo = config_obj['PDK_INFO']['repo']
branch = config_obj['PDK_INFO']['branch']
project = config_obj['PDK_INFO']['project']
download_dir = config_obj['PDK_INFO']['download_dir']

<h2>Part 1: Processing, Loading and Analyzing Data </h2>

<img src='img/platform_step01_data.png' width='1200'/>

In [11]:
# Connect to Instance
# Passes the MLDM host, port and auth token read from [PDK_INFO] positionally.
# NOTE(review): mldm_port is the raw config string here; confirm the SDK accepts a str (else wrap in int()).
mldm_client = pachyderm_sdk.Client(mldm_host, mldm_port, token)

In [12]:
# List Files in the Repository
# Walk the whole branch, print every path, and tally mask files, image files
# and everything else.
files = []
c_file = c_mask = c_folder = 0

branch_uri = f"{project}/{repo}@{branch}"
for file_info in mldm_client.pfs.walk_file(file=File.from_uri(branch_uri)):
    f_path = file_info.file.path
    print(f"'{f_path}'")
    if "_mask.tif" in f_path:
        c_mask += 1
        continue
    if ".tif" in f_path:
        c_file += 1
        continue
    c_folder += 1

# NOTE(review): the "- 2" presumably discounts two non-data entries yielded by the
# walk (e.g. the root and the top-level data folder); confirm against the repo
# layout. Checking file_info.file_type against FileType.DIR would be sturdier.
c_folder -= 2

ERROR:root:Exception in callback <function MetadataClientInterceptor.intercept.<locals>.<lambda> at 0x7f30180aae50>: ConnectionError('Could not connect to pachyderm instance\n')


_MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: INTERNAL: ipv4:34.71.146.200:80: Trying to connect an http1.x server"
	debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: INTERNAL: ipv4:34.71.146.200:80: Trying to connect an http1.x server {grpc_status:14, created_time:"2023-11-29T00:13:36.766971948+00:00"}"
>

In [None]:
# Download Files for local exploration
# files = download_data(mldm_client, repo, branch, project, download_dir)

In [None]:
# Root folder of the downloaded MRI slices (displayed as the cell output).
ROOT = "{}/data1".format(download_dir)
ROOT

<h3> Data Exploration </h3>

In [None]:
class Config:
    """Notebook-wide paths and hyperparameters, read as class attributes."""

    # --- data preprocessing ---
    data_dir = ROOT
    logdir = 'logdir'
    validation_fraction = 0.15  # share of all samples held out for validation
    test_fraction = 0.10        # share of all samples held out for testing
    train_batch = 16
    valid_batch = 32
    test_batch = 32

    # --- model setup ---
    input_dim = 256
    input_ch = 3
    output_dim = 256
    output_ch = 1

    # --- training ---
    seed = 21
    learning_rate = 0.01
    epochs = 10
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Seed up front via model_utils.set_seed (presumably random/numpy/torch) so the
# split and augmentations below are repeatable.
set_seed(Config.seed)

In [None]:
# Build an index of (directory, image, mask) triples.
# os.walk yields entries in filesystem order, which can differ between machines;
# sorting directories and files makes PathDF (and therefore the seeded
# train/valid/test split below) reproducible everywhere.
dirs, images, masks = [], [], []
for root, folders, files in os.walk(Config.data_dir):
    folders.sort()  # sorted in place so os.walk descends in a stable order
    for file in sorted(files):
        if 'mask' not in file:
            continue
        image_file = file.replace('_mask', '')
        # save only images with corresponding masks: skip masks whose image is missing
        if not os.path.exists(os.path.join(root, image_file)):
            continue
        dirs.append(root.replace(Config.data_dir, ''))
        masks.append(file)
        images.append(image_file)

PathDF = pd.DataFrame({'directory': dirs, 'images': images, 'masks': masks})
PathDF.head()

In [None]:
# Two-stage split: carve validation+test off the full index, then divide that
# hold-out so each part matches its configured share of the whole dataset.
train2rest = Config.validation_fraction + Config.test_fraction
test2valid = Config.validation_fraction / train2rest  # validation share *within* the hold-out

train_df, rest = train_test_split(PathDF, test_size=train2rest, random_state=Config.seed)
test_df, valid_df = train_test_split(rest, test_size=test2valid, random_state=Config.seed)

for split_label, split_frame in (('Train:', train_df), ('Valid:', valid_df), ('Test:', test_df)):
    print(split_label, split_frame.shape[0])

In [None]:
# Plot a few fixed test-set examples with model_utils.plot_example.
for example_idx in (2, 3, 14, 16):
    plot_example(Config.data_dir, example_idx, test_df)

In [None]:
class MRI_Dataset(Dataset):
    """Paired (image, mask) dataset backed by a DataFrame index.

    Args:
        path_df: DataFrame with 'directory', 'images' and 'masks' columns;
            'directory' is relative to the data root.
        transform: optional callable applied to the (image, mask) tuple so the
            same transform hits both.
        data_dir: data root; when None, Config.data_dir is used (looked up at
            item-access time, as before).
    """

    def __init__(self, path_df, transform=None, data_dir=None):
        self.path_df = path_df
        self.transform = transform
        self.data_dir = data_dir

    def __len__(self):
        return self.path_df.shape[0]

    @staticmethod
    def _load(path):
        """Read an image fully into memory and release its file handle."""
        with Image.open(path) as handle:
            return handle.copy()

    def __getitem__(self, idx):
        root = Config.data_dir if self.data_dir is None else self.data_dir
        row = self.path_df.iloc[idx]
        # 'directory' may begin with a separator (it is derived by stripping the root
        # prefix), so plain concatenation is used; os.path.join would treat it as absolute.
        base_path = root + '/' + row['directory']
        image = self._load(os.path.join(base_path, row['images']))
        mask = self._load(os.path.join(base_path, row['masks']))

        sample = (image, mask)
        # apply the same transform on both image and mask
        if self.transform:
            sample = self.transform(sample)
        return sample

In [None]:
# Untransformed view of the test split: items are raw (PIL image, PIL mask) pairs.
# NOTE(review): `dataset` is not referenced by any later cell shown here.
dataset = MRI_Dataset(test_df)

In [None]:
# Augment only the training split; validation/test only get tensor conversion.
# The Paired* transforms presumably apply identical random parameters to image and mask.
train_transforms = transforms.Compose([
    PairedRandomHorizontalFlip(),
    PairedRandomAffine(degrees=(-15, 15), translate=(0.1, 0.1), scale_ranges=(0.8, 1.2)),
    PairedToTensor(),
])
eval_transforms = PairedToTensor()

train_data, valid_data, test_data = (
    MRI_Dataset(train_df, transform=train_transforms),
    MRI_Dataset(valid_df, transform=eval_transforms),
    MRI_Dataset(test_df, transform=eval_transforms),
)

train_loader = DataLoader(train_data, batch_size=Config.train_batch, shuffle=True, num_workers=2)
valid_loader = DataLoader(valid_data, batch_size=Config.valid_batch, shuffle=False, num_workers=2)
test_loader = DataLoader(test_data, batch_size=Config.test_batch, shuffle=False, num_workers=2)

In [None]:
def train_loop(model, optimizer, criterion, train_loader, device=Config.device):
    """Run one training epoch and return the sample-weighted mean loss.

    Each batch loss is scaled by its batch size before accumulation, so the
    result is an average over samples rather than over batches.
    """
    model.train()
    total_loss = 0
    for batch_imgs, batch_masks in tqdm(train_loader, desc='Iterating over train data'):
        batch_imgs, batch_masks = batch_imgs.to(device), batch_masks.to(device)

        predictions = model(batch_imgs)
        loss = criterion(predictions, batch_masks)
        total_loss += loss.item() * batch_imgs.shape[0]

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return total_loss / len(train_loader.sampler)

In [None]:
def eval_loop(model, criterion, eval_loader, device=None):
    """Evaluate a binary segmentation model over a whole data loader.

    Metrics are accumulated as pixel counts over the entire loader and computed
    once at the end. Averaging per-batch scores (the previous approach) weighted a
    short final batch the same as a full one, and sklearn's f1_score returns 0.0
    for batches with no foreground pixels at all.

    Args:
        model: module mapping an image batch to per-pixel scores shaped like the
            masks; scores > 0.5 count as foreground (assumes probability outputs,
            as nn.BCELoss used in this notebook requires).
        criterion: loss called as criterion(out, masks).
        eval_loader: DataLoader yielding (imgs, masks) batches; mask pixels > 0.5
            are treated as foreground (masks are expected to be 0/1).
        device: device for the batches; None means Config.device (looked up at
            call time).

    Returns:
        dict with
          'accuracy': pixel accuracy over all evaluated pixels,
          'f1_macro': F1 of the foreground class over all pixels (key name kept for
                      compatibility; this is binary F1, not a macro average; 0.0
                      when no foreground is either predicted or present),
          'loss': sample-weighted mean of the per-batch losses.

    Raises:
        ValueError: if the loader yields no pixels.
    """
    if device is None:
        device = Config.device
    running_loss = 0.0
    tp = fp = fn = correct = total = 0
    model.eval()
    with torch.no_grad():
        pbar = tqdm(eval_loader, desc='Iterating over evaluation data')
        for imgs, masks in pbar:
            # pass to device
            imgs = imgs.to(device)
            masks = masks.to(device)
            # forward
            out = model(imgs)
            loss = criterion(out, masks)
            running_loss += loss.item() * imgs.shape[0]
            # Binarise and count on-device; far cheaper than flattening millions
            # of pixels into sklearn for every batch.
            predicted = out > 0.5
            labels = masks > 0.5
            tp += (predicted & labels).sum().item()
            fp += (predicted & ~labels).sum().item()
            fn += (~predicted & labels).sum().item()
            correct += (predicted == labels).sum().item()
            total += labels.numel()
    if total == 0:
        raise ValueError("eval_loader produced no samples; cannot compute metrics")
    f1_denominator = 2 * tp + fp + fn
    return {
        'accuracy': correct / total,
        'f1_macro': 2 * tp / f1_denominator if f1_denominator else 0.0,
        'loss': running_loss / len(eval_loader.sampler)}

In [None]:
def train(model, optimizer, criterion, train_loader, valid_loader,
          device=Config.device,
          num_epochs=Config.epochs,
          valid_loss_min=np.inf):
    """Train for `num_epochs`, validating after every epoch.

    Prints one progress line per epoch and writes the weights to 'UNet.pt'
    whenever the validation loss is <= the best seen so far (starting from
    `valid_loss_min`). Returns None; `model` keeps the last epoch's weights.
    """
    best_loss = valid_loss_min
    for epoch in range(1, num_epochs + 1):
        train_loss = train_loop(model, optimizer, criterion, train_loader, device=device)
        metrics = eval_loop(model, criterion, valid_loader, device=device)

        print(
            f'Epoch: {epoch} '
            f'TrainLoss: {train_loss:.5f} '
            f'ValidLoss: {metrics["loss"]:.5f} '
            f'ACC: {metrics["accuracy"]:.5f} '
            f'F1: {metrics["f1_macro"]:.3f}'
        )

        if metrics["loss"] <= best_loss:
            torch.save(model.state_dict(), 'UNet.pt')
            best_loss = metrics["loss"]

In [None]:
# Fresh, seeded model and optimizer so this training run is repeatable.
set_seed(Config.seed)
model = UNet(Config.input_ch, Config.output_ch).to(Config.device)
criterion = nn.BCELoss()  # BCELoss expects model outputs in [0, 1]
optimizer = optim.Adam(model.parameters(), lr=Config.learning_rate)
train(model=model, optimizer=optimizer, criterion=criterion,
      train_loader=train_loader, valid_loader=valid_loader)

In [None]:
# Predictions of the locally trained model on the same fixed test examples.
for example_idx in (2, 3, 14, 16):
    plot_predictions(Config.data_dir, model, example_idx, Config.device, test_df, eval_transforms)

<h2> Part 2: Training at Scale </h2>

<img src='img/platform_step02_training.png' width='1200'/>

In [None]:
# Because this is a hosted environment, the connection is done automatically through environment variables
# mlde_url = f"{mlde_host}:{mlde_port}"
# det.login(mlde_url, "admin", mlde_token)

In [None]:
# Create experiment using yaml file
# Submits ./experiment/ as the model directory, configured by const.yaml.
exp = det.create_experiment(config="./experiment/const.yaml", model_dir="./experiment/")
print(f"started experiment {exp.id}")

In [None]:
# Wait for experiment to complete and print exit status
# exp.wait() presumably blocks until the experiment reaches a terminal state.
exit_status = exp.wait()
print(f"experiment completed with status {exit_status}")

In [None]:
# Get the best checkpoint of the experiment and print its uuid.
# NOTE(review): "best" is whatever top_checkpoint() ranks first, presumably by the
# searcher metric configured in const.yaml; confirm.
best_checkpoint = exp.top_checkpoint()
best_checkpoint_uuid = best_checkpoint.uuid
print(f"Best checkpoint was {best_checkpoint_uuid}")

In [None]:
# Keep the experiment id so the next cell can re-fetch the experiment by id.
experiment_id = exp.id

In [None]:
# Re-fetch the experiment by id and download its top checkpoint.
# NOTE(review): presumably the same checkpoint as `best_checkpoint` above; the
# re-fetch only needs `experiment_id`.
checkpoint = det.get_experiment(experiment_id).top_checkpoint()
path = checkpoint.download()  # presumably a local directory holding the checkpoint files
# Rebuild the trial from the checkpoint and keep only its model.
# NOTE(review): no train/eval mode is set here; call mlde_model.eval() before
# inference if the network contains BatchNorm/Dropout.
mlde_model = pytorch.load_trial_from_checkpoint_path(path).model

In [None]:
import torchvision

In [None]:
def _letterbox_to_tensor(img, width, height):
    """Aspect-preserving resize of `img` into width x height, centre-padded, as a CHW tensor."""
    scale = min(width / img.width, height / img.height)
    new_width, new_height = int(img.width * scale), int(img.height * scale)
    diff_width, diff_height = width - new_width, height - new_height
    # Split the padding evenly; any odd pixel goes on the right/bottom edge.
    pad = torchvision.transforms.Pad(padding=(
        diff_width // 2,
        diff_height // 2,
        diff_width // 2 + diff_width % 2,
        diff_height // 2 + diff_height % 2,
    ))
    resize = torchvision.transforms.Resize(size=(new_height, new_width))
    return torchvision.transforms.Compose([resize, pad, torchvision.transforms.ToTensor()])(img)


def plot_new_predictions(data_dir, model, idx, device, test_df, eval_transforms, size=256):
    """Plot test row `idx`: the image, its ground-truth mask and `model`'s predicted mask.

    The image is letterboxed to the model input size before inference, and the
    first output of the batch is reshaped to a single-channel (1, H, W) map.

    Args:
        data_dir: dataset root, joined with the row's 'directory' column.
        model: torch module; run in eval mode under no_grad, then restored to
            its previous train/eval mode.
        idx: positional row index into `test_df`.
        device: fallback device for the input when the model has no parameters;
            otherwise the input follows the model's own device so a CPU-loaded
            checkpoint and a CUDA `device` cannot mismatch.
        test_df: DataFrame with 'directory', 'images' and 'masks' columns.
        eval_transforms: unused; kept for signature compatibility with
            model_utils.plot_predictions.
        size: model input side length (int) or (width, height); default 256.
    """
    width, height = (size, size) if isinstance(size, int) else size

    row = test_df.iloc[idx]
    base_path = data_dir + '/' + row['directory']
    img = Image.open(os.path.join(base_path, row['images']))
    mask = Image.open(os.path.join(base_path, row['masks']))

    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else device
    x = _letterbox_to_tensor(img, width, height).unsqueeze(0).to(target_device)

    was_training = model.training
    model.eval()  # inference behaviour for BatchNorm/Dropout layers, if any
    try:
        with torch.no_grad():
            preds = model(x)
    finally:
        model.train(was_training)

    pred_output = preds[0].float().cpu().reshape(1, height, width)

    plot_images = {'Image': img,
                   'Mask': mask,
                   'Predicted Mask': pred_output.permute(1, 2, 0).numpy()}
    fig, ax = plt.subplots(1, 3, figsize=(16, 4))
    for axis, (title, picture) in zip(ax, plot_images.items()):
        axis.imshow(picture)
        axis.set_title(title)
    plt.show()

In [None]:
# Same fixed test examples, now predicted by the model restored from the MLDE checkpoint.
for example_idx in (2, 3, 14, 16):
    plot_new_predictions(Config.data_dir, mlde_model, example_idx, Config.device, test_df, eval_transforms)

In [None]:
# Distributed, Hyperparameter Search Training Experiment

In [None]:
# Create experiment using yaml file
# exp = det.create_experiment(config="./experiment/search.yaml", model_dir="./experiment/")
# print(f"started experiment {exp.id}")

<h2> Part 3: Deploying Models to Production </h2>

<img src='img/platform_step03_deployment.png' width='1200'/>

In [None]:
# Retrieving Predictions from the Production Instance

In [None]:
import json
import base64
import requests
import uuid

In [None]:
# Serving-endpoint settings for the deployed model (read as strings from [PDK_INFO]).
pdk_info = config_obj['PDK_INFO']
model_name, ingress_host, ingress_port, service_hostname = (
    pdk_info[key] for key in ('model_name', 'ingress_host', 'ingress_port', 'service_hostname')
)

In [None]:
# Function to calculate intersection over union of a prediction
def iou(pred, label, threshold=None):
    """Intersection over Union between a predicted mask and a ground-truth mask.

    Args:
        pred: predicted mask (tensor or ndarray). If `threshold` is given it is
            binarised with `pred > threshold` first; otherwise it is used as-is,
            which gives a "soft" IoU for probability maps.
        label: ground-truth mask of the same shape, expected to be 0/1 valued.
        threshold: optional cut-off for binarising `pred` (e.g. 0.5).

    Returns:
        IoU as a Python float; 1.0 when both masks are empty (perfect agreement).
    """
    if threshold is not None:
        pred = (pred > threshold) * 1.0
    pred_sum, label_sum = pred.sum(), label.sum()
    if pred_sum == 0 and label_sum == 0:
        return 1.0
    intersection = (pred * label).sum()
    return float(intersection / (pred_sum + label_sum - intersection))

# Function to create tensors for an image and mask.
# NOTE(review): this shadows the PairedToTensor class imported from model_utils;
# re-running the transforms cell (`eval_transforms = PairedToTensor()`) after this
# cell raises TypeError. The next cell calls it by this name, so it is kept.
def PairedToTensor(sample):
    """Convert an (image, mask) pair to float tensors divided by 255.

    Returns (img, mask): img moved from (H, W, C) to (C, H, W); a 2-D (H, W) mask
    becomes (1, H, W). For 8-bit inputs both end up in [0, 1].
    """
    img, mask = sample
    img = np.moveaxis(np.array(img), -1, 0)              # HWC -> CHW
    mask = np.moveaxis(np.expand_dims(mask, -1), -1, 0)  # (H, W) -> (1, H, W)
    return torch.FloatTensor(img) / 255, torch.FloatTensor(mask) / 255

In [None]:
# Load a sample MRI slice and its ground-truth mask
image = Image.open("data/brain_mri/data1/TCGA_CS_6290_20000917/TCGA_CS_6290_20000917_10.tif")
mask = Image.open("data/brain_mri/data1/TCGA_CS_6290_20000917/TCGA_CS_6290_20000917_10_mask.tif")

# Convert the (image, mask) pair to CHW float tensors scaled by 1/255
sample = (image, mask)
tensor_sample = PairedToTensor(sample)
img_tensor = tensor_sample[0]

# Inference payload: one FP32 input tensor, values rounded to 4 decimals
data = np.array(img_tensor)
data_shape = list(data.shape)
request = {"inputs": [dict(
    name=str(uuid.uuid4()),
    shape=data_shape,
    datatype="FP32",
    data=np.round(data, 4).tolist(),
)]}

# Show the image that will be submitted
plt.figure(figsize=(7, 7))
plt.title(f'Submitted Image: ')
plt.imshow(img_tensor.permute(1, 2, 0))

In [None]:
# Build the HTTP (REST) prediction request: endpoint URL, Host header used for
# ingress routing, and the JSON-encoded payload.
url = f"http://{ingress_host}:{ingress_port}/v1/models/{model_name}:predict"
headers = {'Host': service_hostname}
payload = json.dumps(request)

In [None]:
# Submit the request, fail loudly on HTTP errors, and turn the JSON prediction into a tensor.
# requests waits indefinitely by default; the timeout keeps the cell from hanging
# forever if the ingress is unreachable.
REQUEST_TIMEOUT_S = 60
response = requests.post(url, data=payload, headers=headers, timeout=REQUEST_TIMEOUT_S)
response.raise_for_status()  # surface 4xx/5xx instead of a confusing KeyError below
response_json = response.json()

prediction = response_json["outputs"][0]
shape = prediction["shape"]
values = prediction["data"]
output = torch.Tensor(np.array(values).reshape(shape))

In [None]:
# Display ground-truth and predicted masks side by side, then report IoU.
# The served model presumably returns per-pixel probabilities (the local UNet is
# trained with BCELoss); they are binarised at 0.5, the same cut-off eval_loop
# uses, so the printed number is a true IoU of two binary masks.
pred_mask = (output > 0.5).float()
f, axarr = plt.subplots(1,2, figsize=(15, 15))
axarr[0].imshow(tensor_sample[1].permute(1, 2, 0), alpha=0.8)
axarr[0].title.set_text(f'Mask (Ground Truth):')
axarr[1].imshow(output.permute(1, 2, 0), alpha=0.8)
axarr[1].title.set_text(f'Mask (Prediction):')
print(f'Intersection over Union (IoU): {float(iou(pred_mask, tensor_sample[1]))}')

In [None]:
# Overlay ground truth and prediction on the submitted image, then report IoU.
# IoU is computed on the prediction binarised at 0.5 (as in eval_loop), not on
# raw per-pixel scores.
pred_mask = (output > 0.5).float()
f, axarr = plt.subplots(1,2, figsize=(15, 15))
axarr[0].imshow(tensor_sample[0].permute(1, 2, 0))
axarr[0].imshow(tensor_sample[1].permute(1, 2, 0), alpha=0.4)
axarr[0].title.set_text(f'Full Image (Ground Truth):')
axarr[1].imshow(tensor_sample[0].permute(1, 2, 0))
axarr[1].imshow(output.permute(1, 2, 0), alpha=0.4)
axarr[1].title.set_text(f'Full Image (Prediction):')
print(f'Intersection over Union (IoU): {float(iou(pred_mask, tensor_sample[1]))}')

<h2> Bringing It All Together </h2>
<img src='img/big_picture.png' width='1200'/>

In [None]:
from datetime import datetime

In [None]:
# Local folder to upload, plus a timestamped destination path in the repo so each
# upload lands in a fresh folder (displayed as the cell output).
source_dir = './data/brain_mri/data1/'
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
name = f"/data{timestamp}"
name

In [None]:
def insert_data(source_data_dir, dest_path=None):
    """Upload a local directory to the configured repo branch in one commit.

    Args:
        source_data_dir: local directory whose files are uploaded.
        dest_path: destination path inside the repo; when None, the notebook-level
            `name` is used (looked up at call time, as before).

    Returns:
        The commit object from `mldm_client.pfs.commit`; its `with` block has
        already exited when it is returned.
    """
    if dest_path is None:
        dest_path = name
    commit_branch = pfs.Branch.from_uri(f"{project}/{repo}@{branch}")
    with mldm_client.pfs.commit(branch=commit_branch) as commit:
        commit.put_files(source=source_data_dir, path=dest_path)
    return commit

In [None]:
# Upload source_dir into the repo (at path `name`) and print the resulting commit.
# NOTE(review): despite the message below, no explicit wait is visible here;
# insert_data returns once its commit context exits. Waiting for downstream
# pipelines, if needed, would have to be done separately.
print("Waiting for commits to finish...")
d_commit = insert_data(source_dir)
print(d_commit)