In [10]:
import datetime
import os
import sys
import subprocess
import kfp
import kfp.components as comp
import kfp.dsl as dsl
import kfp.v2.dsl

import kfp_tekton
from kfp_tekton import k8s_client_helper

from kubernetes.client.models import V1EnvVar

In [11]:
# --- Pipeline identity -------------------------------------------------------
PIPELINE_NAME = "tf_spacenet_cv_classify_pipeline-e426805"
# Archive the compiled pipeline is written to before being uploaded.
PIPELINE_PACKAGE = f"{PIPELINE_NAME}.zip"
EXPERIMENT_NAME = "RMS ET Intern Project"
PERSISTENT_VOLUME_CLAIM_NAME = "rms-et-sn-pvc"

# --- Kubeflow connection -----------------------------------------------------
KUBEFLOW_PUBLIC_ENDPOINT_URL = "https://kubeflow.apps.pcell.ai.us.lmco.com"

# SECURITY: the session cookie is a credential and must never be committed in
# the notebook. Export it before starting the kernel, e.g.
#   export KUBEFLOW_SESSION_COOKIE="authservice_session=<value from browser>"
# Any cookie value that was previously stored in this notebook should be
# treated as leaked and invalidated.
SESSION_COOKIE = os.environ.get("KUBEFLOW_SESSION_COOKIE", "")
if not SESSION_COOKIE:
    print(
        "WARNING: KUBEFLOW_SESSION_COOKIE is not set; "
        "requests to the Kubeflow endpoint will not be authenticated.",
        file=sys.stderr,
    )

KUBEFLOW_PROFILE_NAME = "kf-rms-et-intern-project-17"

In [12]:
@kfp.dsl.component
def view_data():
    """Print a short summary of the SpaceNet ``collection.json`` file.

    Reads ``collection.json`` from the dataset directory under ``/data`` and
    prints its top-level keys, its ``description`` entry, and every top-level
    value truncated to at most 200 characters.

    Imports are done inside the function because the function is later
    converted into a container op by ``comp.func_to_container_op``.

    Raises:
        FileNotFoundError: if ``collection.json`` is not at the expected path.
        KeyError: if the collection has no ``description`` key.
    """
    import json
    import os

    DATASET_PATH = "/data/V1/sn1_AOI_1_RIO/sn1_AOI_1_RIO/sn1_AOI_1_RIO/"

    PATH_TO_COLLECTION_FILE = os.path.join(DATASET_PATH, 'collection.json')

    with open(PATH_TO_COLLECTION_FILE, 'r') as f:
        collection_json = json.load(f)

    print(f"collection_json top level keys: {collection_json.keys()}\n")
    print(f"description: {collection_json['description']}")

    def limit_string(s, chars=200):
        """Return ``str(s)`` cut to ``chars`` characters (never fewer than 1)."""
        s = str(s)
        len_to_print = max(min(len(s), chars), 1)
        return s[0:len_to_print]

    # Plain loop: the prints are side effects, so no throwaway list is built.
    for k in collection_json.keys():
        print(f"{k} : {limit_string(collection_json[k])}")

In [13]:
# Turn the `view_data` Python function into a container-op factory that runs
# inside the given base image; calling `view_data_op()` in a pipeline creates
# one task.
view_data_op = comp.func_to_container_op(
    func=view_data,
    base_image="harbor.ai.us.lmco.com/ai-factory-local/internal/aws-cli:1.20.58-debian-10-r2-0",
)


In [14]:
@kfp.dsl.component
def predict():
    """Train a UNet vision model on the SpaceNet dataset and run one inference.

    Steps performed, in order:
      1. Wrap the SpaceNet collection under ``/data`` in a torch ``Dataset``.
      2. Split it into test / train subsets with a fixed seed.
      3. Configure and fit the UNet vision model, checkpointing to
         ``./mnt/space_net_models/``.
      4. Save the model, reload it, run inference on one randomly chosen test
         example and print summary statistics of the predicted mask.

    Imports are done inside the function because the function is later
    converted into a container op by ``comp.func_to_container_op``.

    Raises:
        ValueError: if the test split is empty, so no example can be sampled.
    """
    import datetime
    import json
    import os
    import math
    import random

    # Needed at class-definition time by the `__getitem__` return annotation.
    from typing import Dict, Tuple

    # for dataset
    import torch
    import numpy as np

    # Python Image Library can
    # load tiffs and convert to numpy arrays
    from PIL import Image

    # for mask creation based on geospatial data
    import fiona
    import rioxarray
    import geojson

    from pathlib import Path
    # import UNet Model Configuration, UNet Vision Model Configuration, and the Vision Model
    import classification as lmclassification
    from classification.data_types.network_callbacks.checkpoint_callback import CheckPointCallback as CheckpointCallback
    from classification.network_models.configurations.unet_model_configuration import UNetModelConfiguration as ModelConfiguration
    from classification.vision.models.configurations.unet_vision_model_configuration import UNetVisionModelConfiguration as VisionModelConfiguration
    from classification.vision.models.unet_vision_model import UNetVisionModel as Model

    class SpaceNetDataset(torch.utils.data.Dataset):
        """
        A torch.utils.data.Dataset wrapping the SpaceNet Building Detection V1 dataset
        """

        def __init__(self, path_to_collection, *args, **kwargs):
            """
            Construct the dataset

            Loads the collection JSON and records the hrefs of every
            non-label link so examples can be looked up by index.

            Args:
                path_to_collection: str or Path to the ``collection.json`` file
            """

            if not isinstance(path_to_collection, Path):
                path_to_collection = Path(path_to_collection)

            self.path_to_collection = path_to_collection
            self.collection_dir = os.path.dirname(path_to_collection)

            with open(self.path_to_collection, 'r') as f:
                self.collection = json.load(f)

            # self.links holds all the top level folders
            # filter out the ones that end with -labels since we know
            # there will be a corresponding non-labels folder
            self.links = [x["href"] for x in self.collection["links"] if not os.path.dirname(x["href"]).endswith('-labels') ]

        def __len__(self) -> int:
            """
            Returns:
                The length of the whole dataset to load
            """
            return len(self.links)

        def __getitem__(self, idx) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
            """
            Given an index, provide the training example at that index

            To satisfy the CCM return a tuple of 2 elements:
                The first element is a numpy np.ndarray, 32-bit or 64-bit float, with shape (channels, pixel height, pixel width)
                The second element is a dictionary.
                    The only required key is 'masks'
                    The 'masks' key must map to a numpy np.ndarray, 64-bit integers, with shape (classes, pixel height, pixel width)
                    In this case the classes are background and building (just 2 classes)
                    For the background class, pixel locations that are background should be set to 1, otherwise 0
                    For the building class, pixel locations that are buildings should be set to 1, otherwise 0
                    Image values in the numpy ndarray should be normalized to a range from 0.0 to 1.0

            Other requirements:
                Image Width must be the same as Segmentation Mask Width
                Image Height must be the same as Segmentation Mask Height

                If no features are present. the mask should still exist but take on default values, eg
                    background class all ones
                    building class all zeros
                    other constraints are still enforced

            Args:
                index into the dataset to sample

            Returns:
                The dataset example at a given index in the form of a tuple of
                np.ndarray and dictionary of string key mapped to np.ndarrays
            """

            # index into folders
            example_file = self.links[idx]

            # construct the known corresponding label folder name
            label_folder = f"{os.path.dirname(example_file)}-labels"

            # construct the relative path to the "stac.json file"
            label_file = os.path.join(label_folder, 'stac.json')

            with open(os.path.join(self.collection_dir, example_file), 'r') as f:
                example_json = json.load(f)

            # construct the path to the image file
            image_file = os.path.join(self.collection_dir, os.path.dirname(example_file), example_json["assets"]["RGB"]["href"])

            with open(os.path.join(self.collection_dir, label_file), 'r') as f:
                labels_json = json.load(f)

            # construct the paths to the labels file
            label_file = os.path.join(self.collection_dir, label_folder, labels_json["assets"]["labels"]["href"])

            # load the .tif image from disk
            pil_img = Image.open(image_file)

            # convert the PIL image to a numpy array
            img = np.array(pil_img)

            # normalize from 0.0 to 1.0; a constant-valued image has zero range,
            # so map it to all zeros instead of dividing by zero
            value_range = img.max() - img.min()
            if value_range > 0:
                img = (img - img.min()) / value_range
            else:
                img = np.zeros_like(img, dtype=np.float64)

            # open with fiona to get features for mask - this represents the buildings
            with fiona.open(label_file, 'r') as f:
                features = [feature["geometry"] for feature in f]

            # create the rasterizer object based on the image data
            rds = rioxarray.open_rasterio(image_file).isel(band=0)

            # get the geojson data
            with open(label_file) as igj:
                data = geojson.load(igj)

            # binary crossentropy requires masks of shape (2, height, width)
            # the first channel represents classes; 0=background, 1=building
            mask = np.zeros((2, *img.shape[0:2])).astype(np.int64)
            mask[0,:,:] = np.ones((1, *img.shape[0:2])) # background by default

            # if this training/validation example has features (building masks)
            if len(features) > 0:

                # get the mask - binary cross entropy
                rds = rds.rio.clip(features, data["crs"]["properties"]["name"], drop=False)
                nprds = np.array(rds)

                mask[0, :, :] = np.where((nprds)>0.5, 0, mask[0, :, :])
                mask[1, :, :] = np.where(nprds>0.5, 1, mask[1, :, :])

            # if the example did not have features, leave the images and masks as they were

            # channels, height, width
            img = np.transpose(img,  (2, 0, 1))

            # clip both masks and images to ensure they are the same size
            # otherwise CCM / Most Conv networks will not accept
            min_height = min(img.shape[1], mask.shape[1])
            min_width  = min(img.shape[2], mask.shape[2])
            img = img[:, :min_height, :min_width]
            mask = mask[:, :min_height, :min_width]
            # NOTE(review): argmax collapses the 2-channel one-hot mask into a
            # single (1, height, width) class-index map, which differs from the
            # (classes, height, width) shape described in the docstring above -
            # confirm which form the model expects.
            mask = np.expand_dims(np.argmax(mask, axis=0), axis=0)

            # CCM vision models expect targets to be in a dictionary
            # other keys for other types of computer vision problems include
            # 'boxes' and 'labels'. UNet does not require those
            return img, {'masks': mask}

    # Instantiate the Dataloader
    DATASET_PATH = "/data/V1/sn1_AOI_1_RIO/sn1_AOI_1_RIO/sn1_AOI_1_RIO/"
    PATH_TO_COLLECTION_FILE=os.path.join(DATASET_PATH, 'collection.json')
    dataset = SpaceNetDataset(PATH_TO_COLLECTION_FILE)

    TRAIN_SPLIT = 0.7
    TEST_SPLIT = 0.3

    print(f"TRAINING SPLIT STARTED")

    # random_split requires the lengths to sum exactly to len(dataset).
    # Rounding the two fractions independently (floor / ceil) can miss that,
    # so the training subset takes whatever the test subset leaves over
    # (approximately TRAIN_SPLIT of the data).
    n_total = len(dataset)
    n_test = math.floor(TEST_SPLIT * n_total)
    n_train = n_total - n_test

    test_dataset, train_dataset = torch.utils.data.random_split(
        dataset,
        [n_test, n_train],
        generator=torch.Generator().manual_seed(1234)
    )

    print(f"TRAINING SPLIT FINISHED")

    # instantiate UNet Model Configuration
    config = ModelConfiguration()

    # instantiate UNet Vision Model Configuration
    model_config = VisionModelConfiguration(
        input_shape=(480,480),
        n_classes=2,
        input_channels=3,
        batch_size=6,
        learning_rate=1e-4,
        validation_split=0.3,
        training_epochs=11,
        layers_model_configuration=config,
    )
    # ensure we allow for the background class and building class
    # RGB
    # during testing x% of the training dataset will be reserved for validation


    # Instantiate UNet
    model = Model(model_config)

    # if using pretrained weights, Images should be renormalized according to pytorch's specification:
    #    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406],
    #                                     std=[0.229, 0.224, 0.225])
    # see https://gitlab.us.lmco.com/overwatch/cognitive-modules/classification/-/blob/master/examples/jupyter_notebooks/supervised_examples/deep_learning_examples/inception_v3_classifier_example.ipynb
    #         Create Classes and Labels from Dataset

    nowstr = datetime.datetime.now().strftime('%m%d%Y_%H%M%S')
    model_path = f'./mnt/space_net_models/trained_spacenet_{nowstr}.pt'
    print(f"training new model: {model_path}")
    model_save_path = model_path
    checkpoint_path = './mnt/space_net_models/'
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    # train the model!
    results = model.fit(train_dataset, verbose=True, callbacks=[CheckpointCallback(checkpoint_path)])

    # save the trained model for the epochs configured
    model.save_model(model_save_path)
    print(f"saving model to: {model_save_path} \n")

    # load the trained model
    model.load_model(model_save_path)

    # load a test example
    # randrange excludes the upper bound; randint(0, len) could return len
    # itself and index one past the end of the subset.
    if len(test_dataset) == 0:
        raise ValueError("test split is empty; cannot sample an example for inference")
    img, mask = test_dataset[ random.randrange(len(test_dataset)) ]

    # run model inference
    results = model.infer([img])

    result_mask = results[0]['masks']

    # NOTE(review): this rescale divides by zero if the prediction is constant,
    # and the [1,:,:] / [0,:,:] indexing below assumes at least two class
    # channels in the returned mask - confirm against the model's output.
    result_mask = (result_mask - result_mask.min()) / (result_mask.max()-result_mask.min())

    print("******** Inference Results ********")
    print(f"Overall minimum prediction confidence = {result_mask.min()}")
    print(f"Overall maximum prediction confidence = {result_mask.max()}")
    print(f"Overall mean prediction confidence = {result_mask.mean()} \n")

    print(f"Minimum Building prediction confidence = {result_mask[1,:,:].min()}")
    print(f"Maximum Building prediction confidence = {result_mask[1,:,:].max()}")
    print(f"Mean Building prediction confidence = {result_mask[1,:,:].mean()} \n")

    print(f"Minimum Background prediction confidence = {result_mask[0,:,:].min()}")
    print(f"Maximum Background prediction confidence = {result_mask[0,:,:].max()}")
    print(f"Mean Background prediction confidence = {result_mask[0,:,:].mean()}")

In [16]:
# Turn the `predict` Python function into a container-op factory. The listed
# packages are pip-installed into the base image when the task starts.
# 'scikit-learn' is the real distribution name; the old 'sklearn' alias on
# PyPI is deprecated and its installation fails.
predict_op = comp.func_to_container_op(
    predict,
    base_image="harbor.ai.us.lmco.com/ai-factory-local/pytorch:1.8.1-cuda11.1-cudnn8-runtime-4",
    packages_to_install=['urllib3==1.25.8','fiona','numpy','Pillow','geojson', 'rioxarray', 'lm-ai-classification', 'matplotlib', 'scikit-learn']
)


In [17]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    description=EXPERIMENT_NAME
)
def rms_et_basic_pipeline():
    # Two-step pipeline: inspect the dataset, then train and run inference.
    # Both steps mount the "spacenet-data" PVC at /data.

    # Step 1: print a summary of the dataset collection file.
    inspect_task = view_data_op()
    inspect_task = inspect_task.add_pvolumes(
        {"/data": dsl.PipelineVolume(pvc="spacenet-data")}
    )
    view_data_r = inspect_task

    # Step 2: training + inference on GPUs. The pip index settings come from
    # the "nexus-config" secret and are exposed as environment variables.
    train_task = predict_op()
    train_task = train_task.set_gpu_limit(8)
    for env_name in ("PIP_EXTRA_INDEX_URL", "PIP_INDEX_URL"):
        train_task = train_task.add_env_variable(
            k8s_client_helper.env_from_secret(env_name, "nexus-config", env_name)
        )
    train_task = train_task.add_pvolumes(
        {"/data": dsl.PipelineVolume(pvc="spacenet-data")}
    )
    # Run only after the inspection step has finished.
    train_task = train_task.after(view_data_r)
    predict_r = train_task

In [18]:
# Connect to the Kubeflow Pipelines API through the public endpoint,
# authenticating with the session cookie.
client = kfp_tekton.TektonClient(
    host=f"{KUBEFLOW_PUBLIC_ENDPOINT_URL}/pipeline",
    ssl_ca_cert="/etc/ssl/certs/ca-certificates.crt",
    cookies=SESSION_COOKIE
)

# Reuse the experiment when it already exists; otherwise create it.
try:
    experiment = client.get_experiment(experiment_name=EXPERIMENT_NAME)
except ValueError:
    experiment = client.create_experiment(EXPERIMENT_NAME)

# Override the image used for resource ops with the internal registry mirror,
# then compile the pipeline function into the package archive.
kfp_tekton.compiler._op_to_template.RESOURCE_OP_IMAGE='lmregistry.us.lmco.com/ext.hub.docker.com/aipipeline/kubectl-wrapper:0.8.0'
kfp_tekton.compiler.TektonCompiler().compile(rms_et_basic_pipeline, PIPELINE_PACKAGE)
pipeline_id = client.get_pipeline_id(PIPELINE_NAME)

if not pipeline_id:
    # upload the package to Kubeflow
    r = client.upload_pipeline(PIPELINE_PACKAGE, pipeline_name=PIPELINE_NAME)

    # update the pipeline id
    pipeline_id = r.id

# The version name appends "Z" (UTC designator), so the timestamp must be
# taken in UTC; a local-time `datetime.now()` would be mislabelled.
now = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat()
pipeline_versionName = f"{PIPELINE_NAME}_version_at_{now}Z"
version = client.pipeline_uploads.upload_pipeline_version(uploadfile=PIPELINE_PACKAGE, name=pipeline_versionName, pipelineid=pipeline_id)

# Launch a run of the freshly uploaded version; left as the last expression so
# the notebook displays the run details.
client.run_pipeline(experiment.id, f"{PIPELINE_NAME} {now}", pipeline_id=pipeline_id, version_id=version.id)

{'created_at': datetime.datetime(2022, 7, 20, 14, 40, 15, tzinfo=tzlocal()),
 'description': None,
 'error': None,
 'finished_at': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzlocal()),
 'id': 'b603b3b1-7170-4efa-bf87-828a45a91e3f',
 'metrics': None,
 'name': 'tf_spacenet_cv_classify_pipeline-e426805 2022-07-20T14:40:15.661785',
 'pipeline_spec': {'parameters': None,
                   'pipeline_id': 'c4011637-61c2-4e2d-b0e0-686480df65cf',
                   'pipeline_manifest': None,
                   'pipeline_name': 'tf_spacenet_cv_classify_pipeline-e426805',
                   'runtime_config': None,
                   'workflow_manifest': '{"kind":"PipelineRun","apiVersion":"tekton.dev/v1beta1","metadata":{"name":"tf-spacenet-cv-classify-pipeline-e426805","creationTimestamp":null,"annotations":{"pipelines.kubeflow.org/big_data_passing_format":"$(workspaces.$TASK_NAME.path)/artifacts/$ORIG_PR_NAME/$TASKRUN_NAME/$TASK_PARAM_NAME","pipelines.kubeflow.org/pipeline_spec":"{\\"descript