<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
# Enable IPython's autoreload extension in mode 2 so that edits to imported
# modules are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2

In [2]:
import sys

# Put the parent directory first on the module search path
# (presumably so the local checkout of the SDK is imported -- confirm).
sys.path.insert(0, "../")

In [3]:
from argo.workflows.sdk import Workflow

from argo.workflows.sdk.tasks import *
from argo.workflows.sdk.templates import *

In [4]:
import yaml

from pprint import pprint

from argo.workflows.sdk._utils import sanitize_for_serialization

---

In [5]:
from pathlib import Path

# Read the reference manifest that the generated workflow is compared against
# at the end of the notebook. The encoding is given explicitly so the decoded
# text does not depend on the platform's locale default.
manifest = Path("./dag-diamond.yaml").read_text(encoding="utf-8")
print(manifest)

# The following workflow executes a diamond workflow
#
#   A
#  / \
# B   C
#  \ /
#   D
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  name: dag-diamond
  generateName: dag-diamond-
spec:
  entrypoint: main
  templates:
  - name: main
    dag:
      tasks:
      - name: A
        template: echo
        arguments:
          parameters: [{name: message, value: A}]
      - name: B
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: B}]
      - name: C
        dependencies: [A]
        template: echo
        arguments:
          parameters: [{name: message, value: C}]
      - name: D
        dependencies: [B, C]
        template: echo
        arguments:
          parameters: [{name: message, value: D}]

  # @task: [A, B, C, D]
  - name: echo
    inputs:
      parameters:
      - name: message
    container:
      name: echo
      image: alpine:3.7
      command: [echo, "{{inputs.parameters.message}}"]
status: {}


In [6]:
class DagDiamond(Workflow):
    # Diamond-shaped DAG: A has no dependencies, B and C each depend on A,
    # and D depends on both B and C. Every task delegates to the shared
    # `echo` template, differing only in the `message` argument value.

    @task
    @arguments.parameter(name="message", value="A")
    def A(self, message: V1alpha1Parameter) -> V1alpha1Template:
        # Root of the diamond: no @dependencies decorator.
        return self.echo(message=message)

    @task
    @arguments.parameter(name="message", value="B")
    @dependencies(["A"])
    def B(self, message: V1alpha1Parameter) -> V1alpha1Template:
        # Left branch: declared as depending on A.
        return self.echo(message=message)

    @task
    @arguments.parameter(name="message", value="C")
    @dependencies(["A"])
    def C(self, message: V1alpha1Parameter) -> V1alpha1Template:
        # Right branch: declared as depending on A.
        return self.echo(message=message)

    @task
    @arguments.parameter(name="message", value="D")
    @dependencies(["B", "C"])
    def D(self, message: V1alpha1Parameter) -> V1alpha1Template:
        # Join point: declared as depending on both branches.
        return self.echo(message=message)

    @template
    @inputs.parameter(name="message")
    def echo(self, message: V1alpha1Parameter) -> V1Container:
        # The container command reads its value through the
        # "{{inputs.parameters.message}}" placeholder; the Python `message`
        # argument itself is not referenced in this body.
        return V1Container(
            name="echo",
            image="alpine:3.7",
            command=["echo", "{{inputs.parameters.message}}"],
        )

# Instantiate the workflow; the bare `wf` on the last line makes the notebook
# display the object's representation as the cell output.
wf = DagDiamond()
wf

{'api_version': 'argoproj.io/v1alpha1',
 'kind': 'Workflow',
 'metadata': {'generate_name': 'dag-diamond-', 'name': 'dag-diamond'},
 'spec': {'entrypoint': 'main',
          'templates': [{'dag': {'tasks': [{'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'A'}]},
                                            'name': 'A',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'B'}]},
                                            'dependencies': ['A'],
                                            'name': 'B',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                

---

In [7]:
# Pretty-print the result of sanitize_for_serialization(wf). In the captured
# output the keys are camelCase (e.g. `apiVersion`, `generateName`), unlike
# the snake_case keys shown when `wf` is displayed directly.
pprint(sanitize_for_serialization(wf))

{'apiVersion': 'argoproj.io/v1alpha1',
 'kind': 'Workflow',
 'metadata': {'generateName': 'dag-diamond-', 'name': 'dag-diamond'},
 'spec': {'entrypoint': 'main',
          'templates': [{'dag': {'tasks': [{'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'A'}]},
                                            'name': 'A',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'B'}]},
                                            'dependencies': ['A'],
                                            'name': 'B',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                  

In [8]:
# Parse the reference manifest text with the safe YAML loader and pretty-print
# the resulting dict so it can be compared by eye with the serialized workflow.
pprint(yaml.safe_load(manifest))

{'apiVersion': 'argoproj.io/v1alpha1',
 'kind': 'Workflow',
 'metadata': {'generateName': 'dag-diamond-', 'name': 'dag-diamond'},
 'spec': {'entrypoint': 'main',
          'templates': [{'dag': {'tasks': [{'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'A'}]},
                                            'name': 'A',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                          'value': 'B'}]},
                                            'dependencies': ['A'],
                                            'name': 'B',
                                            'template': 'echo'},
                                           {'arguments': {'parameters': [{'name': 'message',
                                                                  

In [9]:
# Final check: the serialized Python-defined workflow must equal the parsed
# reference YAML manifest exactly.
assert sanitize_for_serialization(wf) == yaml.safe_load(manifest), "Manifests don't match."