In [13]:
! pip install minio



In [19]:
import kfp
import kfp.dsl as dsl
from kfp import components

"""
This code demonstrates an example of two Kubeflow components sharing a path in a volume. In this example, the first component, 
`write_to_volume`, writes a simple message into a file named `file.txt` located in the `/mnt` path of a persistent volume created by `create-pvc`. 
The second component, `read_from_volume`, reads the content of the `file.txt` file from the same path and prints it.

The `dsl.VolumeOp` class creates a persistent volume claim (PVC) named `my-pvc` using `modes=dsl.VOLUME_MODE_RWM` (the `ReadWriteMany` access mode), which allows the volume to be mounted read-write by many nodes so that both components can use it.
The `write_to_volume` component uses `add_pvolumes` to specify the `/mnt` path on the volume created by `create-pvc`.
Then `read_from_volume` component uses `add_pvolumes` to mount the same volume at `/mnt` and reads the content of the `file.txt` file written by the previous component.

Finally, the pipeline is compiled using `kfp.compiler.Compiler().compile()` and submitted for execution to Kubeflow using the `Client().run_pipeline()` method. 
This code can serve as a starting point for users who want to create Kubeflow components that share a path in a volume.
"""


# Create component
@components.create_component_from_func
def write_to_volume():
    """Write a short greeting into ``/mnt/file.txt``."""
    message = "Hello world 5"
    with open("/mnt/file.txt", "w") as out_file:
        out_file.write(message)


# Create component
@components.create_component_from_func
def read_from_volume():
    """Read ``/mnt/file.txt`` and print what it contains."""
    with open("/mnt/file.txt", "r") as in_file:
        text = in_file.read()
    print(f"File contents would be  : {text}")


# Define pipeline
@dsl.pipeline(name="volumeop-basic", description="A Basic Example on VolumeOp Usage.")
def volumeop_basic(size: str = "1Gi"):
    """Create a PVC, write a file to it in one step and read it back in the next.

    Args:
        size: Requested size of the persistent volume claim.
    """
    pvc_op = dsl.VolumeOp(
        name="create-pvc",
        resource_name="my-pvc",
        modes=dsl.VOLUME_MODE_RWM,
        size=size,
    )

    # The writer mounts the freshly created volume at /mnt.
    writer = write_to_volume().add_pvolumes({"/mnt": pvc_op.volume})

    # The reader mounts the writer's output volume at the same path.
    read_from_volume().add_pvolumes({"/mnt": writer.pvolume})


if __name__ == "__main__":
    # Compile the pipeline function into "<function name>.yaml".
    pipeline_func = volumeop_basic
    pipeline_filename = f"{pipeline_func.__name__}.yaml"
    kfp.compiler.Compiler().compile(pipeline_func, pipeline_filename)

    # Connect to Kubeflow Pipelines, create the experiment and submit one run
    # of the compiled package without any parameter overrides.
    experiment_name = "VolumeOp Basic Pipeline"
    run_name = "run"
    client = kfp.Client()
    experiment = client.create_experiment(experiment_name)
    run = client.run_pipeline(
        experiment_id=experiment.id,
        job_name=run_name,
        pipeline_package_path=pipeline_filename,
        params={},
    )

In [11]:
# # Upload data into minio Bucket

# import os
# from minio import Minio


# def upload_tar_to_minio(minio_ip,bucket_name, access_key, secret_key, file_path, object_name):
#     # Connect to MinIO server
#     minio_client = Minio(
#         minio_ip,
#         access_key=access_key,
#         secret_key=secret_key,
#         secure=False  # Change to True if using HTTPS
#     )

#     # Check if the bucket already exists, create it if not
#     if not minio_client.bucket_exists(bucket_name):
#         minio_client.make_bucket(bucket_name)

#     # Upload the .tar file to the bucket
#     minio_client.fput_object(bucket_name, object_name, file_path)

#     print(f"File {object_name} uploaded successfully to bucket {bucket_name}")




# bucket_name = "voc2012-bucket"
# access_key = os.environ["MINIO_ACCESS_KEY"]  # never commit real credentials
# secret_key = os.environ["MINIO_SECRET_KEY"]
# file_path = "/home/jovyan/VOCtrainval_11-May-2012.tar"
# object_name = "VOCtrainval_11-May-2012.tar"
# minio_ip = "185.47.227.207:9000"

# upload_tar_to_minio(minio_ip,bucket_name, access_key, secret_key, file_path, object_name)


In [1]:
import kfp
from kfp.components import func_to_container_op
from typing import NamedTuple

def download_data_from_minio(ip_address: str, port: int, access_key: str, secret_key: str, bucket_name: str, object_name: str):
    """Download a tar archive from a MinIO bucket and unpack it under ``/mnt/``.

    After extraction, every extracted file path and the top-level listing of
    ``/mnt/`` are printed.

    Args:
        ip_address: Host name or IP address of the MinIO server.
        port: Port of the MinIO server.
        access_key: MinIO access key.
        secret_key: MinIO secret key.
        bucket_name: Bucket that holds the archive.
        object_name: Object key of the tar archive; it is also used as the
            local file name of the download (relative to the working directory).

    Raises:
        SystemExit: With exit code 1 when ``bucket_name`` does not exist.
    """
    # Imports stay inside the function: it is packaged as a standalone
    # component through create_component_from_func.
    import os
    import sys
    import tarfile

    from minio import Minio

    extract_dir = "/mnt/"

    client = Minio(
        f"{ip_address}:{port}",
        access_key=access_key,
        secret_key=secret_key,
        secure=False,  # plain HTTP connection
    )

    if not client.bucket_exists(bucket_name):
        print(f"{bucket_name} bucket does not exist.")
        sys.exit(1)

    print("Found the bucket")
    # Download the object into the working directory under its own name.
    client.fget_object(bucket_name, object_name, object_name)
    print(f"Downloaded {object_name} from {bucket_name} bucket")

    # The context manager closes the archive even if extraction fails.
    with tarfile.open(object_name) as tar:
        # NOTE(review): extractall() trusts the member paths stored in the
        # archive. Only use archives from a trusted bucket, or pass
        # filter="data" on Python versions that support extraction filters.
        tar.extractall(extract_dir)

    # Top-level entries of the extraction directory, printed after the walk.
    contents = os.listdir(extract_dir)
    for root, _dirs, files in os.walk(extract_dir):
        for file_name in files:
            print(os.path.join(root, file_name))

    print(contents)



In [6]:
from minio import Minio

import os

def upload_zip_to_minio(zip_file_path, bucket_name, minio_ip, minio_access_key, minio_secret_key):
    """Upload a local file to a MinIO bucket, creating the bucket if it is missing.

    The object is stored under the file's base name.

    Args:
        zip_file_path: Path of the local file to upload.
        bucket_name: Target bucket.
        minio_ip: MinIO endpoint passed to the client (e.g. ``host:port``).
        minio_access_key: MinIO access key.
        minio_secret_key: MinIO secret key.
    """
    # Connect over plain HTTP (secure=False).
    storage = Minio(
        minio_ip,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False,
    )

    # Make sure the destination bucket exists before uploading.
    if not storage.bucket_exists(bucket_name):
        storage.make_bucket(bucket_name)

    # The object key is the last path component of the local file.
    _, object_name = os.path.split(zip_file_path)
    storage.fput_object(bucket_name, object_name, zip_file_path)
    print(f"File {zip_file_path} uploaded to MinIO bucket {bucket_name} with object name {object_name}")

# Settings for the optional upload call below.
# Credentials are read from the environment so that no secret is stored in the
# notebook; they default to an empty string when the variables are not set.
zip_file_path = '/Users/amin/Downloads/VOCtrainval_11-May-2012.tar'
bucket_name = 'voc2012-bucket'
minio_ip = '185.47.227.233:9000'
minio_access_key = os.environ.get('MINIO_ACCESS_KEY', '')
minio_secret_key = os.environ.get('MINIO_SECRET_KEY', '')
#upload_zip_to_minio(zip_file_path, bucket_name, minio_ip, minio_access_key, minio_secret_key)
import kfp
from kfp import dsl
from kfp.components import func_to_container_op
from kfp.components import InputPath, OutputPath

#Define the component for our training function
def train_op(data_path: InputPath(str)):
    """Train a U-Net segmentation model on the Pascal VOC 2012 dataset.

    The dataset is read from the fixed directory ``/mnt/VOCdevkit/VOC2012/``.
    The model is trained for 10 epochs with Adam (lr=0.001, batch size 2) on
    GPU when available, otherwise on CPU. The trained model is not saved.

    Args:
        data_path: Declared component input. It is not used by the training
            code: the dataset root is hard-coded further down.
    """
    import os

    import numpy as np
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim
    from PIL import Image
    from torch.utils.data import DataLoader, Dataset
    from torchvision.transforms import ToTensor

    class VOC2012SegmentationDataset(Dataset):
        """Image / segmentation-mask pairs of one VOC 2012 split."""

        def __init__(self, root_dir, split='train', transform=None, resize_shape=(375, 500)):
            self.root_dir = root_dir
            self.split = split
            self.transform = transform
            # NOTE(review): PIL's resize() takes (width, height), so the
            # default yields 375 px wide by 500 px tall images - confirm this
            # orientation is intended.
            self.resize_shape = resize_shape
            self.to_tensor = ToTensor()

            # read list of image IDs
            with open(os.path.join(root_dir, 'ImageSets', 'Segmentation', f'{split}.txt'), 'r') as f:
                self.image_ids = [line.strip() for line in f.readlines()]

        def __len__(self):
            return len(self.image_ids)

        def __getitem__(self, idx):
            """Return ``(image, label)``: a float image tensor and an int64 mask of shape (1, H, W)."""
            image_id = self.image_ids[idx]
            image_path = os.path.join(self.root_dir, 'JPEGImages', f'{image_id}.jpg')
            label_path = os.path.join(self.root_dir, 'SegmentationClass', f'{image_id}.png')
            image = Image.open(image_path).convert('RGB')

            # VOC masks are palette PNGs whose raw pixel values are the class
            # indices. Converting them to 'L' would replace the indices with
            # the luminance of the palette colours, so the palette image is
            # kept as is. Any other multi-channel mode is still reduced to a
            # single channel.
            label = Image.open(label_path)
            if label.mode not in ('P', 'L'):
                label = label.convert('L')

            # resize image and label; nearest-neighbour keeps the mask values
            # valid class indices instead of interpolating between them
            image = image.resize(self.resize_shape)
            label = label.resize(self.resize_shape, Image.NEAREST)

            # apply transform if specified
            if self.transform is not None:
                image, label = self.transform(image, label)

            # The image becomes a float tensor in [0, 1]. The mask must NOT go
            # through ToTensor(): that would divide the class indices by 255.
            image = self.to_tensor(image)
            label = torch.from_numpy(np.asarray(label).astype(np.int64)).unsqueeze(0)

            return image, label

    # ---- Parts of the U-Net model ----

    class DoubleConv(nn.Module):
        """(convolution => [BN] => ReLU) * 2"""

        def __init__(self, in_channels, out_channels, mid_channels=None):
            super().__init__()
            if not mid_channels:
                mid_channels = out_channels
            self.double_conv = nn.Sequential(
                nn.Conv2d(in_channels, mid_channels, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(mid_channels),
                nn.ReLU(inplace=True),
                nn.Conv2d(mid_channels, out_channels, kernel_size=3, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True)
            )

        def forward(self, x):
            return self.double_conv(x)

    class Down(nn.Module):
        """Downscaling with maxpool then double conv"""

        def __init__(self, in_channels, out_channels):
            super().__init__()
            self.maxpool_conv = nn.Sequential(
                nn.MaxPool2d(2),
                DoubleConv(in_channels, out_channels)
            )

        def forward(self, x):
            return self.maxpool_conv(x)

    class Up(nn.Module):
        """Upscaling then double conv"""

        def __init__(self, in_channels, out_channels, bilinear=True):
            super().__init__()

            # if bilinear, use the normal convolutions to reduce the number of channels
            if bilinear:
                self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
                self.conv = DoubleConv(in_channels, out_channels, in_channels // 2)
            else:
                self.up = nn.ConvTranspose2d(in_channels, in_channels // 2, kernel_size=2, stride=2)
                self.conv = DoubleConv(in_channels, out_channels)

        def forward(self, x1, x2):
            x1 = self.up(x1)
            # Pad the upsampled map so its last two dimensions match the skip
            # connection x2 before concatenating along the channel dimension.
            diffY = x2.size()[2] - x1.size()[2]
            diffX = x2.size()[3] - x1.size()[3]

            x1 = F.pad(x1, [diffX // 2, diffX - diffX // 2,
                            diffY // 2, diffY - diffY // 2])
            # if you have padding issues, see
            # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
            # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
            x = torch.cat([x2, x1], dim=1)
            return self.conv(x)

    class OutConv(nn.Module):
        """1x1 convolution producing the per-class output channels."""

        def __init__(self, in_channels, out_channels):
            super(OutConv, self).__init__()
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

        def forward(self, x):
            return self.conv(x)

    # ---- Full assembly of the parts to form the complete network ----

    class UNet(nn.Module):
        """U-Net with four down-sampling and four up-sampling stages."""

        def __init__(self, n_channels, n_classes, bilinear=False):
            super(UNet, self).__init__()
            self.n_channels = n_channels
            self.n_classes = n_classes
            self.bilinear = bilinear

            self.inc = (DoubleConv(n_channels, 64))
            self.down1 = (Down(64, 128))
            self.down2 = (Down(128, 256))
            self.down3 = (Down(256, 512))
            factor = 2 if bilinear else 1
            self.down4 = (Down(512, 1024 // factor))
            self.up1 = (Up(1024, 512 // factor, bilinear))
            self.up2 = (Up(512, 256 // factor, bilinear))
            self.up3 = (Up(256, 128 // factor, bilinear))
            self.up4 = (Up(128, 64, bilinear))
            self.outc = (OutConv(64, n_classes))

        def forward(self, x):
            x1 = self.inc(x)
            x2 = self.down1(x1)
            x3 = self.down2(x2)
            x4 = self.down3(x3)
            x5 = self.down4(x4)
            x = self.up1(x5, x4)
            x = self.up2(x, x3)
            x = self.up3(x, x2)
            x = self.up4(x, x1)
            logits = self.outc(x)
            return logits

        def use_checkpointing(self):
            # NOTE(review): torch.utils.checkpoint is a module, not a callable,
            # so these calls are expected to fail if this method is ever
            # invoked. Nothing in this component calls it; fix before use.
            self.inc = torch.utils.checkpoint(self.inc)
            self.down1 = torch.utils.checkpoint(self.down1)
            self.down2 = torch.utils.checkpoint(self.down2)
            self.down3 = torch.utils.checkpoint(self.down3)
            self.down4 = torch.utils.checkpoint(self.down4)
            self.up1 = torch.utils.checkpoint(self.up1)
            self.up2 = torch.utils.checkpoint(self.up2)
            self.up3 = torch.utils.checkpoint(self.up3)
            self.up4 = torch.utils.checkpoint(self.up4)
            self.outc = torch.utils.checkpoint(self.outc)

    def train(model, device, train_loader, optimizer, epoch):
        """Run one training epoch and print the loss every 10 batches."""
        model.train()
        # VOC masks use 255 for "void" / boundary pixels; they carry no class
        # and must be excluded from the loss (255 is also not a valid index
        # for a 21-class output).
        criterion = nn.CrossEntropyLoss(ignore_index=255)
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            # target arrives as (N, 1, H, W); the loss expects (N, H, W) int64
            loss = criterion(output, target.squeeze(dim=1).long())
            loss.backward()
            optimizer.step()
            if batch_idx % 10 == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))

    # set up device (GPU if available)
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

    # set up the training data loader; the dataset root is fixed and
    # intentionally replaces the value of the `data_path` argument
    data_path = '/mnt/VOCdevkit/VOC2012/'
    train_dataset = VOC2012SegmentationDataset(root_dir=data_path, split='train')
    train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True)

    # set up model and optimizer
    model = UNet(n_channels=3, n_classes=21).to(device)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # training loop (epochs 1..10)
    for epoch in range(1, 11):
        train(model, device, train_loader, optimizer, epoch)

    # save model
    #torch.save(model.state_dict(), model_path)

In [9]:
import kfp
from kfp.components import create_component_from_func
from kfp import dsl
from typing import List
from kubernetes import client as k8s_client

# Wrap the plain Python functions as pipeline components. Each one runs in its
# own container image with the listed extra packages installed.
download_data_from_minio_op = create_component_from_func(
    download_data_from_minio,
    base_image='python:3.8',
    packages_to_install=['minio'],
)
train_model_op = create_component_from_func(
    train_op,
    base_image='pytorch/pytorch:latest',
    packages_to_install=['torchvision', 'pillow'],
)

@dsl.pipeline(
    name='VOC2012 Preprocessing Pipeline test',
    description='A pipeline that preprocesses VOC2012 dataset'
)
def voc2012_training_pipeline_test(
    ip_address: str,
    port: int,
    access_key: str,
    secret_key: str,
    bucket_name: str,
    object_name: str,
    local_path: str,
    root_dir: str
) -> List[str]:
    """Download the dataset archive onto a shared volume, then train on it.

    Args:
        ip_address: MinIO host, forwarded to the download step.
        port: MinIO port, forwarded to the download step.
        access_key: MinIO access key, forwarded to the download step.
        secret_key: MinIO secret key, forwarded to the download step.
        bucket_name: Bucket holding the archive.
        object_name: Object key of the archive.
        local_path: Accepted as a pipeline parameter but not used by any step.
        root_dir: Passed to the training step as its input.
    """
    # Persistent volume claim shared by both steps, mounted at /mnt.
    shared_volume_op = dsl.VolumeOp(
        name="create-pvc",
        resource_name="my-pvc",
        modes=dsl.VOLUME_MODE_RWM,
        size="5Gi",
    )

    # Step 1: fetch and unpack the dataset onto the shared volume.
    download_task = (
        download_data_from_minio_op(ip_address, port, access_key, secret_key, bucket_name, object_name)
        .set_display_name('Download Data from Minio')
        .add_pvolumes({"/mnt": shared_volume_op.volume})
    )

    # Step 2: train once the download has finished, on the same volume.
    training_task = (
        train_model_op(root_dir)
        .after(download_task)
        .set_display_name('Training model')
        .add_pvolumes({"/mnt": download_task.pvolume})
    )

    return None

# Compile the pipeline
import os

# Run parameters. The MinIO credentials are read from the environment instead
# of being stored in the notebook; a missing variable raises KeyError here,
# before anything is compiled or submitted.
# NOTE(review): values passed as pipeline parameters are presumably still
# visible in the run's metadata - consider a Kubernetes secret for production.
arguments = {
    'ip_address': "185.47.227.207",
    'port': '9000',
    'access_key': os.environ["MINIO_ACCESS_KEY"],
    'secret_key': os.environ["MINIO_SECRET_KEY"],
    'bucket_name': "voc2012-bucket",
    'object_name': "VOCtrainval_11-May-2012.tar",
    'local_path': "dataset/VOCtrainval_11-May-2012.tar",
    'root_dir': '/mnt/VOCdevkit/VOC2012/'
}

# Compile the pipeline
pipeline_func = voc2012_training_pipeline_test
pipeline_filename = pipeline_func.__name__ + '.yaml'
kfp.compiler.Compiler().compile(pipeline_func, pipeline_filename)

# Connect to the Kubeflow Pipeline and submit the pipeline for execution
client = kfp.Client()
experiment_name = 'Training pipeline data Pipeline2'
experiment = client.create_experiment(experiment_name)
run_name = 'run2'

run = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)


In [23]:
def delete_experiments():
    """
    Deletes all experiments in the Kubeflow Pipeline client.

    The listing API returns one page of results per call, so every page is
    walked first and the collected experiments are deleted afterwards. The id
    of each experiment is printed before it is deleted.

    Returns:
        str: The fixed message "Experiments deleted successfully.".
    """
    import kfp

    # Connect to the Kubeflow Pipeline client
    client = kfp.Client()

    # Collect the ids of all experiments, following the page tokens
    experiment_ids = []
    page_token = ''
    while True:
        page = client.list_experiments(page_token=page_token)
        experiment_ids.extend(experiment.id for experiment in (page.experiments or []))
        page_token = page.next_page_token
        if not page_token:
            break

    # Delete every experiment
    for experiment_id in experiment_ids:
        print(experiment_id)
        client.delete_experiment(experiment_id)

    return "Experiments deleted successfully."


def delete_runs():
    """
    Deletes all runs in the Kubeflow Pipeline client.

    The listing API returns one page of results per call, so every page is
    walked first and the collected runs are deleted afterwards, all through
    the same client. The id of each run is printed before it is deleted.

    Returns:
        str: The fixed message "Runs deleted successfully.".
    """
    import kfp

    # Connect to the Kubeflow Pipeline client
    client = kfp.Client()

    # Collect the ids of all runs, following the page tokens
    run_ids = []
    page_token = ''
    while True:
        page = client.list_runs(page_token=page_token)
        run_ids.extend(run.id for run in (page.runs or []))
        page_token = page.next_page_token
        if not page_token:
            break

    # Delete every run, reusing the one client created above
    for run_id in run_ids:
        print(run_id)
        client.runs.delete_run(run_id)

    return "Runs deleted successfully."


def delete_pipelines():
    """
    Deletes all pipelines in the Kubeflow Pipeline client.

    The listing API returns one page of results per call, so every page is
    walked first and the collected pipelines are deleted afterwards, all
    through the same client. The id of each pipeline is printed before it is
    deleted.

    Returns:
        str: The fixed message "Pipelines deleted successfully.".
    """
    import kfp

    # Connect to the Kubeflow Pipeline client
    client = kfp.Client()

    # Collect the ids of all pipelines, following the page tokens
    pipeline_ids = []
    page_token = ''
    while True:
        page = client.list_pipelines(page_token=page_token)
        pipeline_ids.extend(pipeline.id for pipeline in (page.pipelines or []))
        page_token = page.next_page_token
        if not page_token:
            break

    # Delete every pipeline, reusing the one client created above
    for pipeline_id in pipeline_ids:
        print(pipeline_id)
        client.delete_pipeline(pipeline_id)

    return "Pipelines deleted successfully."

In [24]:
# Clean up the Kubeflow Pipelines instance: delete runs first, then pipelines.
# Each helper prints the id of every item it deletes.
delete_runs()
delete_pipelines()
# Experiments are kept; uncomment the next line to delete them as well.
#delete_experiments()

19e3072e-4da8-426b-96ce-d8ef79ef80b8
a108f375-1da4-4263-81f4-21ae58afc57a
e08025f2-0340-40a2-b73c-b3123cab2849
3dbbe6a0-c769-4587-aa6d-ca4fa629db97
c8a89728-5697-4ee4-8c26-3ee2a3d8eb5a


'Pipelines deleted successfully.'