# Kubeflow Pipeline MNIST Example

In [39]:
import kfp
import kfp.dsl as dsl
from kfp import compiler
from kfp.components import create_component_from_func, OutputPath, InputPath

# Components

In [40]:
def download_datasets(
    train_dataset_path: OutputPath('Dataset'),
    test_dataset_path: OutputPath('Dataset')
):
    """Download the MNIST train and test splits into the two output directories.

    Args:
        train_dataset_path: Output location; created as a directory and used
            as the torchvision ``root`` for the training split.
        test_dataset_path: Output location; created as a directory and used
            as the torchvision ``root`` for the test split.
    """
    # Imports stay function-local, like in the other component functions of
    # this notebook.
    import os
    import torchvision.datasets as dsets

    # Training split first, then test split (same order as before).
    for root, is_train in ((train_dataset_path, True), (test_dataset_path, False)):
        # exist_ok=True keeps the step idempotent: a bare makedirs() raises
        # FileExistsError when the directory is already present.
        os.makedirs(root, exist_ok=True)
        dsets.MNIST(root=root, train=is_train, download=True)

In [41]:
def explore_datasets(
    train_dataset_path: InputPath('Dataset'),
    test_dataset_path: InputPath('Dataset'),
    mlpipeline_ui_metadata_path: OutputPath()
):
    """Write a one-row table with the sizes of the MNIST train and test splits.

    Args:
        train_dataset_path: Directory used as the torchvision ``root`` for the
            training split.
        test_dataset_path: Directory used as the torchvision ``root`` for the
            test split.
        mlpipeline_ui_metadata_path: File that receives the JSON document
            describing the inline CSV table.
    """
    import json
    import torchvision.datasets as dsets

    # download=False: both splits must already be present under the given roots.
    train = dsets.MNIST(root=train_dataset_path, train=True, download=False)
    test = dsets.MNIST(root=test_dataset_path, train=False, download=False)

    # Single inline CSV row: number of training samples, number of test samples.
    sample_counts_table = {
        'type': 'table',
        'storage': 'inline',
        'format': 'csv',
        'header': ["Training samples", "Test samples"],
        'source': f"{len(train)}, {len(test)}"
    }
    ui_metadata = {'outputs': [sample_counts_table]}

    with open(mlpipeline_ui_metadata_path, 'w') as ui_metadata_file:
        ui_metadata_file.write(json.dumps(ui_metadata))

In [42]:
def train_resnet_model(
    number_of_epochs: int,
    train_batch_size: int,
    learning_rate: float,
    train_dataset_path: InputPath('Dataset'),
    model_path: OutputPath('Model')
):
    """Train a ResNet50 classifier on the MNIST training split and save its weights.

    Args:
        number_of_epochs: Forwarded to ``train_model`` as ``n_epochs``.
        train_batch_size: Batch size of the training ``DataLoader``.
        learning_rate: Learning rate of the SGD optimizer.
        train_dataset_path: Directory used as the torchvision ``root`` for the
            training split; the data must already be there (``download=False``).
        model_path: File that receives ``model.state_dict()`` via ``torch.save``.
    """
    import torch
    import torch.nn as nn
    import torchvision.datasets as dsets
    from torchvision.transforms import Compose, Normalize, Resize, ToTensor
    from kubeflow_pipeline_sample.resnet.resnet_50 import ResNet50
    from kubeflow_pipeline_sample.training.trainer import train_model

    IMAGE_SIZE = 64

    # Use the first GPU when one is visible, otherwise fall back to the CPU.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = ResNet50(in_channels=1, classes=10).to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    preprocessing = Compose([
        Resize((IMAGE_SIZE, IMAGE_SIZE)),
        ToTensor(),
        # One-element tuples: `(0.5)` is just the float 0.5, not a sequence.
        Normalize(mean=(0.5,), std=(0.5,)),
    ])
    train_dataset_clean = dsets.MNIST(
        root=train_dataset_path,
        train=True,
        download=False,
        transform=preprocessing,
    )
    train_loader = torch.utils.data.DataLoader(
        dataset=train_dataset_clean,
        batch_size=train_batch_size,
        # Reshuffle every epoch so SGD does not see identical batches each time.
        shuffle=True,
    )

    # The value returned by train_model is not used by this component.
    train_model(
        model=model,
        train_loader=train_loader,
        criterion=criterion,
        optimizer=optimizer,
        n_epochs=number_of_epochs,
        device=device,
    )

    # Only the weights are persisted; the consumer has to rebuild the
    # architecture before calling load_state_dict().
    torch.save(model.state_dict(), model_path)

In [43]:
def evaluate_resnet_model(
    test_batch_size: int,
    test_dataset_path: InputPath('Dataset'),
    model_path: InputPath('Model'),
    mlpipeline_metrics_path: OutputPath('Metrics')
):
    """Evaluate saved ResNet50 weights on the MNIST test split and write the accuracy metric.

    Args:
        test_batch_size: Batch size of the test ``DataLoader``.
        test_dataset_path: Directory used as the torchvision ``root`` for the
            test split; the data must already be there (``download=False``).
        model_path: File holding a state dict loadable into
            ``ResNet50(in_channels=1, classes=10)``.
        mlpipeline_metrics_path: File that receives the JSON metrics document.
    """
    import json
    import torch
    import torchvision.datasets as dsets
    from torchvision.transforms import Compose, Normalize, Resize, ToTensor
    from kubeflow_pipeline_sample.resnet.resnet_50 import ResNet50
    from kubeflow_pipeline_sample.evaluation.evaluate_accuracy import evaluate_accuracy

    IMAGE_SIZE = 64

    preprocessing = Compose([
        Resize((IMAGE_SIZE, IMAGE_SIZE)),
        ToTensor(),
        # One-element tuples: `(0.5)` is just the float 0.5, not a sequence.
        Normalize(mean=(0.5,), std=(0.5,)),
    ])
    test_dataset_clean = dsets.MNIST(
        root=test_dataset_path,
        train=False,
        download=False,
        transform=preprocessing,
    )
    test_loader = torch.utils.data.DataLoader(
        dataset=test_dataset_clean,
        batch_size=test_batch_size,
    )

    # Use the first GPU when one is visible, otherwise fall back to the CPU.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model = ResNet50(in_channels=1, classes=10)
    # map_location remaps the stored tensors onto the device available here, so
    # a checkpoint written from a CUDA model still loads on a CPU-only machine.
    model.load_state_dict(torch.load(model_path, map_location=device))
    # Put the weights on the same device that is handed to evaluate_accuracy.
    model.to(device)
    model.eval()

    accuracy = evaluate_accuracy(model, test_loader, test_dataset_clean, device)

    # NOTE(review): confirm that the scale returned by evaluate_accuracy matches
    # what the metrics UI expects for the "PERCENTAGE" format.
    metrics = {
        'metrics': [
            {
                'name': 'accuracy',
                'numberValue': accuracy,
                'format': "PERCENTAGE",
            }
        ]
    }

    with open(mlpipeline_metrics_path, 'w') as metrics_file:
        json.dump(metrics, metrics_file)

In [44]:
# Container image shared by every component below.
# NOTE(review): the image is referenced through the mutable `latest` tag;
# consider pinning a specific version so pipeline runs stay reproducible.
BASE_IMAGE = 'public.ecr.aws/h3o0w0k1/kubeflow-pipeline-mnist:latest'

# Wrap each function defined above into a pipeline component factory.
download_datasets_op = create_component_from_func(
    download_datasets,
    base_image=BASE_IMAGE
)
explore_datasets_op = create_component_from_func(
    explore_datasets,
    base_image=BASE_IMAGE
)
train_resnet_model_op = create_component_from_func(
    train_resnet_model,
    base_image=BASE_IMAGE
)
evaluate_resnet_model_op = create_component_from_func(
    evaluate_resnet_model,
    base_image=BASE_IMAGE
)

# Pipeline

In [45]:
def pipeline(
    number_of_epochs: int=1,
    train_batch_size: int=120,
    test_batch_size: int=120,
    learning_rate: float= 0.1
):
    """Download MNIST, summarise it, train a ResNet model and evaluate it.

    Args:
        number_of_epochs: Passed to the training step.
        train_batch_size: Passed to the training step.
        test_batch_size: Passed to the evaluation step.
        learning_rate: Passed to the training step.
    """
    # Both dataset outputs of the download step feed several downstream steps.
    downloaded = download_datasets_op().outputs
    train_dataset = downloaded["train_dataset"]
    test_dataset = downloaded["test_dataset"]

    # Dataset summary; nothing downstream consumes its output.
    explore_datasets_op(
        train_dataset=train_dataset,
        test_dataset=test_dataset,
    )

    training_task = train_resnet_model_op(
        number_of_epochs=number_of_epochs,
        train_batch_size=train_batch_size,
        learning_rate=learning_rate,
        train_dataset=train_dataset,
    )

    # Evaluation needs the test split and the model produced by training.
    evaluate_resnet_model_op(
        test_batch_size=test_batch_size,
        test_dataset=test_dataset,
        model=training_task.outputs["model"],
    )

# DSL to YAML Compilation

In [46]:
# Compile `pipeline` into a YAML package with the V1_LEGACY execution mode,
# keeping type checking enabled.
compiler.Compiler(
    mode=dsl.PipelineExecutionMode.V1_LEGACY
).compile(
    pipeline_func=pipeline,
    package_path='end_to_end_ml_pipeline.yaml',
    type_check=True,
)