Skip to content

Commit

Permalink
Merge pull request #48 from flaviodsr/assign_list
Browse files Browse the repository at this point in the history
Implement workflow list assignments
  • Loading branch information
Jiří Suchomel committed May 20, 2021
2 parents 4687e8c + 9795c67 commit a508eac
Show file tree
Hide file tree
Showing 6 changed files with 347 additions and 81 deletions.
38 changes: 38 additions & 0 deletions design/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ var _ = Service("workflow", func() {
Field(4, "status", String, "status of the workflow runs to list", func() {
Enum("Started", "Running", "Cancelled", "Succeeded", "Failed", "Completed", "Timeout")
Example("Succeeded")

})
})

Expand All @@ -168,6 +169,29 @@ var _ = Service("workflow", func() {
Response(CodeOK)
Response("NotFound", CodeNotFound)
})

})

Method("listAssignments", func() {
Description("List workflow assignments to codesets")

Payload(func() {
Field(1, "name", String, "Name of the workflow to list assignments", func() {
Example("mlflow-sklearn-e2e")
})
})

Result(ArrayOf(WorkflowAssignment), "Return a list of workflow assignments.")

HTTP(func() {
GET("/workflows/assignments")
Param("name")
Response(StatusOK)
})

GRPC(func() {
Response(CodeOK)
})
})
})

Expand Down Expand Up @@ -316,3 +340,17 @@ var WorkflowRunOutput = Type("WorkflowRunOutput", func() {
Field(1, "output", WorkflowOutput, "The workflow output")
Field(2, "value", String, "The output value set by the Workflow run")
})

// WorkflowAssignment describes the assignment between a workflow and codesets
var WorkflowAssignment = Type("WorkflowAssignment", func() {
Field(1, "workflow", String, "Workflow assigned to the codeset")
Field(2, "status", WorkflowAssignmentStatus, "The status of the assignment")
Field(3, "codesets", ArrayOf(Codeset), "Codesets assigned to the workflow")
})

// WorkflowAssignmentStatus describes the status of the resource responsible for the
// assignment between a workflow and codesets
var WorkflowAssignmentStatus = Type("WorkflowAssignmentStatus", func() {
Field(1, "available", Boolean, "The state of the assignment")
Field(2, "URL", String, "Dashboard URL to the resource responsible for the assignment")
})
82 changes: 52 additions & 30 deletions pkg/core/tekton/tekton.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (w *WorkflowBackend) CreateWorkflow(ctx context.Context, logger *log.Logger
}

// CreateWorkflowRun creates a PipelineRun with its default values for received workflow and codeset
func (w *WorkflowBackend) CreateWorkflowRun(ctx context.Context, workflowName string, codeset domain.Codeset) error {
func (w *WorkflowBackend) CreateWorkflowRun(ctx context.Context, workflowName string, codeset *domain.Codeset) error {
pipeline, err := w.tektonClients.PipelineClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error getting tekton pipeline %q: %w", workflowName, err)
Expand Down Expand Up @@ -124,63 +124,84 @@ func (w *WorkflowBackend) ListWorkflowRuns(ctx context.Context, wf workflow.Work
return workflowRuns, nil
}

// CreateListener creates tekton resources required to have a listener ready for triggering the pipeline
func (w *WorkflowBackend) CreateListener(ctx context.Context, logger *log.Logger, workflowName string, wait bool) (string, error) {
// CreateWorkflowListener creates tekton resources required to have a listener ready for triggering the pipeline
func (w *WorkflowBackend) CreateWorkflowListener(ctx context.Context, logger *log.Logger, workflowName string, wait bool) (*domain.WorkflowListener, error) {
pipeline, err := w.tektonClients.PipelineClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("error getting tekton pipeline %q: %w", workflowName, err)
return nil, fmt.Errorf("error getting tekton pipeline %q: %w", workflowName, err)
}

triggerTemplate := generateTriggerTemplate(pipeline)
_, err = w.tektonClients.TriggerTemplateClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
if !k8serr.IsNotFound(err) {
return "", fmt.Errorf("error getting tekton trigger template %q: %w", workflowName, err)
return nil, fmt.Errorf("error getting tekton trigger template %q: %w", workflowName, err)
}
logger.Printf("Creating tekton trigger template for workflow: %s...", workflowName)
_, err := w.tektonClients.TriggerTemplateClient.Create(ctx, triggerTemplate, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("error creating tekton trigger template %q: %w", workflowName, err)
return nil, fmt.Errorf("error creating tekton trigger template %q: %w", workflowName, err)
}
}

triggerBinding := generateTriggerBinding(triggerTemplate)
_, err = w.tektonClients.TriggerBindingClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
if !k8serr.IsNotFound(err) {
return "", fmt.Errorf("error getting tekton trigger binding %q: %w", workflowName, err)
return nil, fmt.Errorf("error getting tekton trigger binding %q: %w", workflowName, err)
}
logger.Printf("Creating tekton trigger binding for workflow: %s...", workflowName)
_, err := w.tektonClients.TriggerBindingClient.Create(ctx, triggerBinding, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("error creating tekton trigger binding %q: %w", workflowName, err)
return nil, fmt.Errorf("error creating tekton trigger binding %q: %w", workflowName, err)
}
}

eventListener := generateEventListener(triggerTemplate, triggerBinding)
el, err := w.tektonClients.EventListenerClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
if !k8serr.IsNotFound(err) {
return "", fmt.Errorf("error getting tekton event listener %q: %w", workflowName, err)
return nil, fmt.Errorf("error getting tekton event listener %q: %w", workflowName, err)
}
logger.Printf("Creating tekton event listener for workflow: %s...", workflowName)
el, err = w.tektonClients.EventListenerClient.Create(ctx, eventListener, metav1.CreateOptions{})
if err != nil {
return "", fmt.Errorf("error creating tekton event listener %q: %w", workflowName, err)
return nil, fmt.Errorf("error creating tekton event listener %q: %w", workflowName, err)
}
}

url := fmt.Sprintf("http://el-%s.%s.svc.cluster.local:8080", workflowName, w.namespace)
if wait {
interval := 1 * time.Second
timeout := 1 * time.Minute
if err := waitFor(w.eventListenerReady(ctx, el.Name), interval, timeout); err != nil {
return "", fmt.Errorf("event listener %q did not get ready in the expected time: %w", el.Name, err)
return nil, fmt.Errorf("event listener %q did not get ready in the expected time: %w", el.Name, err)
}
el, err = w.tektonClients.EventListenerClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting tekton event listener state %q: %w", workflowName, err)
}
url = el.Status.Address.URL.String()
}
available := listenerIsAvailable(el.Status)
return &domain.WorkflowListener{Name: el.Name, URL: url, Available: available,
DashboardURL: fmt.Sprintf("%s/#/namespaces/%s/eventlisteners/%s", w.dashboardURL, w.namespace, el.Name)}, nil
}

el, _ = w.tektonClients.EventListenerClient.Get(ctx, workflowName, metav1.GetOptions{})
return el.Status.Address.URL.String(), nil
// GetWorkflowListener returns the listener for a given workflow
func (w *WorkflowBackend) GetWorkflowListener(ctx context.Context, logger *log.Logger, workflowName string) (wl *domain.WorkflowListener, err error) {
el, err := w.tektonClients.EventListenerClient.Get(ctx, workflowName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("error getting tekton event listener %q: %w", workflowName, err)
}
available := listenerIsAvailable(el.Status)
wl = &domain.WorkflowListener{Name: el.Name, Available: available,
DashboardURL: fmt.Sprintf("%s/#/namespaces/%s/eventlisteners/%s", w.dashboardURL, w.namespace, el.Name)}
if available {
url := el.Status.Address.URL.String()
wl.URL = url
}
return fmt.Sprintf("http://el-%s.%s.svc.cluster.local:8080", workflowName, w.namespace), nil
return
}

func (e WorkflowBackendErr) Error() string {
Expand All @@ -197,23 +218,24 @@ func (w *WorkflowBackend) eventListenerReady(ctx context.Context, name string) w
if err != nil {
return false, nil
}
// No conditions have been set yet
if len(el.Status.Conditions) == 0 {
return false, nil
}
if el.Status.GetCondition(apis.ConditionType(appsv1.DeploymentAvailable)) == nil {
return false, nil
}
for _, cond := range el.Status.Conditions {
if cond.Status != corev1.ConditionTrue {
return false, nil
}
}
if el.Status.Address.URL == nil {
return false, nil
return listenerIsAvailable(el.Status), nil
}
}

func listenerIsAvailable(status v1alpha1.EventListenerStatus) bool {
// No conditions have been set yet
if len(status.Conditions) == 0 {
return false
}
if status.GetCondition(apis.ConditionType(appsv1.DeploymentAvailable)) == nil {
return false
}
for _, cond := range status.Conditions {
if cond.Status != corev1.ConditionTrue {
return false
}
return true, nil
}
return status.Address.URL != nil
}

func generatePipeline(w workflow.Workflow, namespace string) *v1beta1.Pipeline {
Expand Down Expand Up @@ -314,7 +336,7 @@ STEPS:
return &pb.Pipeline
}

func generatePipelineRun(p *v1beta1.Pipeline, codeset domain.Codeset) (*v1beta1.PipelineRun, error) {
func generatePipelineRun(p *v1beta1.Pipeline, codeset *domain.Codeset) (*v1beta1.PipelineRun, error) {
codesetVersion := "main"
prb := builder.NewPipelineRunBuilder(fmt.Sprintf("%s%s-%s-", pipelineRunPrefix, codeset.Project, codeset.Name))

Expand Down

0 comments on commit a508eac

Please sign in to comment.