In [1]:
from kfp import dsl, compiler
from kfp import components
from kfp import local
import kfp
# Initialise the KFP local-execution environment with a Docker-based runner.
# NOTE(review): the cells below compile the pipeline and submit it through
# `kfp.Client`; confirm this local setup is still needed for that flow.
local.init(runner=local.DockerRunner())

In [2]:
import json
from typing import NamedTuple
from typing import Dict

@dsl.component
def create_pytorchjob_task(
    worker_num: int = 0
) -> NamedTuple('Outputs', [('master_spec', Dict[str, str]), ('worker_spec', Dict[str, str])]):
    """Build the master and worker replica specs for a PyTorchJob.

    Both replica types run the same MNIST training container and differ only
    in their replica count, so they are generated from one shared template.

    Args:
        worker_num: Number of worker replicas. When it is 0 or negative no
            worker spec is produced.

    Returns:
        A named tuple ``Outputs`` with ``master_spec`` (a spec with exactly one
        replica) and ``worker_spec`` (a spec with ``worker_num`` replicas, or
        an empty dict when ``worker_num`` is not positive).
    """

    def _replica_spec(replicas):
        # Build one replica spec. Kept nested so that the component function
        # stays self-contained and does not rely on notebook-level helpers.
        resources = {
            "memory": "4Gi",
            "cpu": "2000m",
            # One GPU per replica.
            "nvidia.com/gpu": 1,
        }
        return {
            "replicas": replicas,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {
                    "annotations": {
                        # See https://github.com/kubeflow/website/issues/2011
                        "sidecar.istio.io/inject": "false"
                    }
                },
                "spec": {
                    "containers": [
                        {
                            # To override default command
                            "command": [
                                "python",
                                "/opt/mnist/src/mnist.py"
                            ],
                            "args": [
                                "--backend",
                                "nccl",
                            ],
                            # Or, create your own image from
                            # https://github.com/kubeflow/pytorch-operator/tree/master/examples/mnist
                            "image": "public.ecr.aws/pytorch-samples/pytorch_dist_mnist:latest",
                            "name": "pytorch",
                            "resources": {
                                # Requests and limits are identical; each gets
                                # its own copy so the two never alias.
                                "requests": dict(resources),
                                "limits": dict(resources),
                            },
                        }
                    ],
                    # If imagePullSecrets required
                    # "imagePullSecrets": [
                    #     {"name": "image-pull-secret"},
                    # ],
                },
            },
        }

    master = _replica_spec(1)
    # No workers requested -> hand back an empty spec.
    worker = _replica_spec(worker_num) if worker_num > 0 else {}

    output = NamedTuple('Outputs', [('master_spec', Dict[str, str]), ('worker_spec', Dict[str, str])])
    return output(master, worker)

from kfp.dsl import Input, Output, Artifact, container_component, ContainerSpec
@dsl.container_component
def pytorchjob_launcher(
    name: str,
    namespace: str,
    master_spec: Dict[str, str],
    worker_spec: Dict[str, str],
):
    """Container component that runs the PyTorchJob launcher script.

    Runs ``/ml/launch_pytorchjob.py`` in the ``easyjin/engine:custom-pytorchjob``
    image, forwarding the inputs as command-line flags.

    Args:
        name: Value passed to the launcher's ``--name`` flag.
        namespace: Value passed to the launcher's ``--namespace`` flag.
        master_spec: Value passed to the launcher's ``--masterSpec`` flag.
        worker_spec: Value passed to the launcher's ``--workerSpec`` flag.

    Returns:
        The ``ContainerSpec`` describing image, command and arguments.
    """
    launcher_image = 'easyjin/engine:custom-pytorchjob'
    entrypoint = ['python', '/ml/launch_pytorchjob.py']

    # Flag/value pairs, listed in the order they are handed to the script.
    flag_values = [
        ('--name', name),
        ('--namespace', namespace),
        ('--workerSpec', worker_spec),
        ('--masterSpec', master_spec),
        # NOTE(review): presumably keeps the job around once it finishes;
        # confirm against the launcher script.
        ('--deleteAfterDone', 'False'),
    ]
    launcher_args = [token for pair in flag_values for token in pair]

    return ContainerSpec(
        image=launcher_image,
        command=entrypoint,
        args=launcher_args,
    )


@dsl.pipeline(
    name="launch-kubeflow-pytorchjob",
    description="An example to launch pytorch.",
)
def mnist_train(
    namespace: str = "easy",
    worker_replicas: int = 1,
):
    """Pipeline that builds PyTorchJob replica specs and passes them to the launcher.

    Args:
        namespace: Namespace forwarded to the launcher component.
        worker_replicas: Number of worker replicas requested from the
            spec-building component.
    """
    # Step 1: generate the master/worker replica specs.
    spec_task = create_pytorchjob_task(worker_num=worker_replicas)

    # Step 2: pass both specs to the launcher. The returned task object is not
    # needed afterwards, so it is not bound to a name.
    pytorchjob_launcher(
        name="name-pytorchjob-sample",
        namespace=namespace,
        master_spec=spec_task.outputs['master_spec'],
        worker_spec=spec_task.outputs['worker_spec'],
    )

  return component_factory.create_component_from_func(


In [3]:
from kfp import compiler

# File the compiled pipeline definition is written to.
OUTPUT_PACKAGE_PATH = 'mnist_pipeline.yaml'

# Compile the `mnist_train` pipeline function into that file.
compiler.Compiler().compile(package_path=OUTPUT_PACKAGE_PATH, pipeline_func=mnist_train)

In [4]:
import os

import requests

# Connection settings. Each value can be overridden through an environment
# variable so that real credentials never have to be written into the
# notebook; the fallbacks are the values this notebook previously hardcoded.
USERNAME = os.environ.get("KUBEFLOW_USERNAME", "user@example.com")
PASSWORD = os.environ.get("KUBEFLOW_PASSWORD", "12341234")
NAMESPACE = os.environ.get("KUBEFLOW_NAMESPACE", "kubeflow-user-example-com")
HOST = os.environ.get("KUBEFLOW_HOST", "http://localhost:30398")

# A session keeps cookies across requests, which is what carries the login.
session = requests.Session()
response = session.get(HOST)

headers = {
    "Content-Type": "application/x-www-form-urlencoded",
}

# The login form is posted to the URL the initial request ended up at.
data = {"login": USERNAME, "password": PASSWORD}
session.post(response.url, headers=headers, data=data)

# Fail with an explicit message instead of a bare KeyError when the login did
# not produce the expected session cookie.
session_cookie = session.cookies.get_dict().get("authservice_session")
if session_cookie is None:
    raise RuntimeError(
        f"Login to {HOST} did not return an 'authservice_session' cookie; "
        "check the host, username and password."
    )

client = kfp.Client(
    host=f"{HOST}/pipeline",
    namespace=NAMESPACE,
    cookies=f"authservice_session={session_cookie}"
    )



In [5]:
# Submit `mnist_train` as a run in the "example" experiment; no arguments are
# overridden (an empty dict is passed).
client.create_run_from_pipeline_func(
    mnist_train,
    experiment_name="example",
    arguments={},
)

RunPipelineResult(run_id=5f4a71e3-fbc3-4aa9-8fc0-d67a61e62df6)