Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 75 additions & 29 deletions ocp_resources/pipeline.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,91 @@
# API reference: https://tekton.dev/docs/pipelines/pipelines/
# Generated using https://github.com/RedHatQE/openshift-python-wrapper/blob/main/scripts/resource/README.md

from ocp_resources.resource import MissingRequiredArgumentError, NamespacedResource

from typing import Any
from ocp_resources.resource import NamespacedResource


class Pipeline(NamespacedResource):
api_group = NamespacedResource.ApiGroup.TEKTON_DEV
"""
Pipeline describes a list of Tasks to execute. It expresses how outputs
of tasks feed into inputs of subsequent tasks.
"""

api_group: str = NamespacedResource.ApiGroup.TEKTON_DEV

def __init__(
self,
tasks=None,
params=None,
final_parallel_tasks=None,
**kwargs,
):
"""
description: str | None = None,
display_name: str | None = None,
finally_: list[Any] | None = None,
params: list[Any] | None = None,
results: list[Any] | None = None,
tasks: list[Any] | None = None,
workspaces: list[Any] | None = None,
**kwargs: Any,
) -> None:
r"""
Args:
tasks (str, optional): actions to perform in pipeline
params (dict, optional): params to support pipelines.
params can be set/changed based on tasks.
example: 'spec': {'params': [{'name': 'sourceTemplateName','type': 'string','default':'openshift'},
{'name': 'sourceTemplateNamespace', 'type':'string', 'description': 'Namespace pf template'}]}
final_parallel_tasks (list, optional): a list of one or more to be executed in parallel after all other
tasks have completed in parallel.
spec section can't be empty. It requires at least one optional field.
description (str): Description is a user-facing description of the pipeline that may be
used to populate a UI.

display_name (str): DisplayName is a user-facing name of the pipeline that may be used to
populate a UI.

finally_ (list[Any]): Finally declares the list of Tasks that execute just before leaving
the Pipeline i.e. either after all Tasks are finished executing
successfully or after a failure which would result in ending the
Pipeline

Note: Parameter renamed from 'finally' to avoid Python keyword conflict.
params (list[Any]): Params declares a list of input parameters that must be supplied when
this Pipeline is run.

results (list[Any]): Results are values that this pipeline can output once run

tasks (list[Any]): Tasks declares the graph of Tasks that execute when this Pipeline is
run.

workspaces (list[Any]): Workspaces declares a set of named workspaces that are expected to be
provided by a PipelineRun.

"""
super().__init__(**kwargs)
# TODO: Add a check for tasks when bug https://issues.redhat.com/browse/SRVKP-3019 is resolved.
self.tasks = tasks

self.description = description
self.display_name = display_name
self.finally_ = finally_
self.params = params
self.final_parallel_tasks = final_parallel_tasks
self.results = results
self.tasks = tasks
self.workspaces = workspaces

def to_dict(self) -> None:
super().to_dict()
if not self.kind_dict and not self.yaml_file:
if not (self.tasks or self.params or self.final_parallel_tasks):
raise MissingRequiredArgumentError(argument="'tasks' or 'params' or 'final_parallel_tasks'")

if not self.kind_dict and not self.yaml_file:
self.res["spec"] = {}
if self.params:
self.res["spec"]["params"] = self.params
if self.tasks:
self.res["spec"]["tasks"] = self.tasks
if self.final_parallel_tasks:
self.res["spec"]["finally"] = self.final_parallel_tasks
_spec = self.res["spec"]

if self.description is not None:
_spec["description"] = self.description

if self.display_name is not None:
_spec["displayName"] = self.display_name

if self.finally_ is not None:
_spec["finally"] = self.finally_

if self.params is not None:
_spec["params"] = self.params

if self.results is not None:
_spec["results"] = self.results

if self.tasks is not None:
_spec["tasks"] = self.tasks

if self.workspaces is not None:
_spec["workspaces"] = self.workspaces

# End of generated code
89 changes: 89 additions & 0 deletions ocp_resources/pipeline_run.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Generated using https://github.com/RedHatQE/openshift-python-wrapper/blob/main/scripts/resource/README.md


from typing import Any
from ocp_resources.resource import NamespacedResource


class PipelineRun(NamespacedResource):
"""
PipelineRun represents a single execution of a Pipeline. PipelineRuns are how
the graph of Tasks declared in a Pipeline are executed; they specify inputs
to Pipelines such as parameter values and capture operational aspects of the
Tasks execution such as service account and tolerations. Creating a
PipelineRun creates TaskRuns for Tasks in the referenced Pipeline.
"""

api_group: str = NamespacedResource.ApiGroup.TEKTON_DEV

def __init__(
self,
params: list[Any] | None = None,
pipeline_ref: dict[str, Any] | None = None,
pipeline_spec: Any | None = None,
task_run_specs: list[Any] | None = None,
task_run_template: dict[str, Any] | None = None,
timeouts: dict[str, Any] | None = None,
workspaces: list[Any] | None = None,
**kwargs: Any,
) -> None:
r"""
Args:
params (list[Any]): Params is a list of parameter names and values.

pipeline_ref (dict[str, Any]): PipelineRef can be used to refer to a specific instance of a Pipeline.

pipeline_spec (Any): Specifying PipelineSpec can be disabled by setting `disable-inline-
spec` feature flag. See Pipeline.spec (API version: tekton.dev/v1)

task_run_specs (list[Any]): TaskRunSpecs holds a set of runtime specs

task_run_template (dict[str, Any]): TaskRunTemplate represent template of taskrun

timeouts (dict[str, Any]): Time after which the Pipeline times out. Currently three keys are
accepted in the map pipeline, tasks and finally with
Timeouts.pipeline >= Timeouts.tasks + Timeouts.finally

workspaces (list[Any]): Workspaces holds a set of workspace bindings that must match names
with those declared in the pipeline.

"""
super().__init__(**kwargs)

self.params = params
self.pipeline_ref = pipeline_ref
self.pipeline_spec = pipeline_spec
self.task_run_specs = task_run_specs
self.task_run_template = task_run_template
self.timeouts = timeouts
self.workspaces = workspaces

def to_dict(self) -> None:
super().to_dict()

if not self.kind_dict and not self.yaml_file:
self.res["spec"] = {}
_spec = self.res["spec"]

if self.params is not None:
_spec["params"] = self.params

if self.pipeline_ref is not None:
_spec["pipelineRef"] = self.pipeline_ref

if self.pipeline_spec is not None:
_spec["pipelineSpec"] = self.pipeline_spec

if self.task_run_specs is not None:
_spec["taskRunSpecs"] = self.task_run_specs

if self.task_run_template is not None:
_spec["taskRunTemplate"] = self.task_run_template

if self.timeouts is not None:
_spec["timeouts"] = self.timeouts

if self.workspaces is not None:
_spec["workspaces"] = self.workspaces

# End of generated code
43 changes: 0 additions & 43 deletions ocp_resources/pipelineruns.py

This file was deleted.

Loading