diff --git a/examples/cancellation/run.go b/examples/cancellation/run.go index 902191d82..3816a9a0e 100644 --- a/examples/cancellation/run.go +++ b/examples/cancellation/run.go @@ -29,8 +29,7 @@ func run(events chan<- string) (func() error, error) { return nil, fmt.Errorf("error creating worker: %w", err) } - err = w.On( - worker.Events("user:create:cancellation"), + err = w.RegisterWorkflow( &worker.WorkflowJob{ Name: "cancellation", Description: "cancellation", diff --git a/examples/simple/main.go b/examples/simple/main.go index 19ad95d25..d078deef5 100644 --- a/examples/simple/main.go +++ b/examples/simple/main.go @@ -65,9 +65,9 @@ func run(events chan<- string) (func() error, error) { testSvc := w.NewService("test") - err = testSvc.On( - worker.Events("user:create:simple"), + err = testSvc.RegisterWorkflow( &worker.WorkflowJob{ + On: worker.Events("user:create:simple"), Name: "simple", Description: "This runs after an update to the user model.", Concurrency: worker.Concurrency(getConcurrencyKey), diff --git a/pkg/worker/service.go b/pkg/worker/service.go index 3173e69a4..b76f569ca 100644 --- a/pkg/worker/service.go +++ b/pkg/worker/service.go @@ -18,6 +18,10 @@ func (s *Service) Use(mws ...MiddlewareFunc) { s.mws.add(mws...) } +func (s *Service) RegisterWorkflow(workflow workflowConverter) error { + return s.On(workflow.ToWorkflowTrigger(), workflow) +} + func (s *Service) On(t triggerConverter, workflow workflowConverter) error { namespace := s.worker.client.Namespace() diff --git a/pkg/worker/worker.go b/pkg/worker/worker.go index 6f9138abd..eb58c58f1 100644 --- a/pkg/worker/worker.go +++ b/pkg/worker/worker.go @@ -213,6 +213,10 @@ func (w *Worker) NewService(name string) *Service { return svc } +func (w *Worker) RegisterWorkflow(workflow workflowConverter) error { + return w.On(workflow.ToWorkflowTrigger(), workflow) +} + func (w *Worker) On(t triggerConverter, workflow workflowConverter) error { // get the default service svc, ok := w.services.Load("default") diff --git a/pkg/worker/workflow.go b/pkg/worker/workflow.go index d470e98c2..f6b44341f 100644 --- a/pkg/worker/workflow.go +++ b/pkg/worker/workflow.go @@ -126,6 +126,7 @@ func (e eventsArr) ToWorkflowTriggers(wt *types.WorkflowTriggers, namespace stri type workflowConverter interface { ToWorkflow(svcName string, namespace string) types.Workflow ToActionMap(svcName string) map[string]any + ToWorkflowTrigger() triggerConverter } type Workflow struct { @@ -140,6 +141,8 @@ type WorkflowJob struct { Description string + On triggerConverter + Concurrency *WorkflowConcurrency // The steps that are run in the job @@ -234,6 +237,10 @@ func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.Wo return apiJob, nil } +func (j *WorkflowJob) ToWorkflowTrigger() triggerConverter { + return j.On +} + func (j *WorkflowJob) ToActionMap(svcName string) map[string]any { res := map[string]any{} @@ -317,6 +324,10 @@ func (w *WorkflowStep) AddParents(parents ...string) *WorkflowStep { return w } +func (w *WorkflowStep) ToWorkflowTrigger() triggerConverter { + return nil +} + func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workflow { jobName := w.Name