Skip to content

Commit

Permalink
feat(worker): add RegisterWorkflow method
Browse files Browse the repository at this point in the history
  • Loading branch information
steebchen committed Jun 20, 2024
1 parent fdfaf68 commit 4c8c7e7
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 4 deletions.
3 changes: 1 addition & 2 deletions examples/cancellation/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ func run(events chan<- string) (func() error, error) {
return nil, fmt.Errorf("error creating worker: %w", err)
}

err = w.On(
worker.Events("user:create:cancellation"),
err = w.RegisterWorkflow(
&worker.WorkflowJob{
Name: "cancellation",
Description: "cancellation",
Expand Down
4 changes: 2 additions & 2 deletions examples/simple/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ func run(events chan<- string) (func() error, error) {

testSvc := w.NewService("test")

err = testSvc.On(
worker.Events("user:create:simple"),
err = testSvc.RegisterWorkflow(
&worker.WorkflowJob{
On: worker.Events("user:create:simple"),
Name: "simple",
Description: "This runs after an update to the user model.",
Concurrency: worker.Concurrency(getConcurrencyKey),
Expand Down
4 changes: 4 additions & 0 deletions pkg/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ func (s *Service) Use(mws ...MiddlewareFunc) {
s.mws.add(mws...)
}

func (s *Service) RegisterWorkflow(workflow workflowConverter) error {
return s.On(workflow.ToWorkflowTrigger(), workflow)
}

func (s *Service) On(t triggerConverter, workflow workflowConverter) error {
namespace := s.worker.client.Namespace()

Expand Down
4 changes: 4 additions & 0 deletions pkg/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,10 @@ func (w *Worker) NewService(name string) *Service {
return svc
}

func (w *Worker) RegisterWorkflow(workflow workflowConverter) error {
return w.On(workflow.ToWorkflowTrigger(), workflow)
}

func (w *Worker) On(t triggerConverter, workflow workflowConverter) error {
// get the default service
svc, ok := w.services.Load("default")
Expand Down
11 changes: 11 additions & 0 deletions pkg/worker/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (e eventsArr) ToWorkflowTriggers(wt *types.WorkflowTriggers, namespace stri
type workflowConverter interface {
ToWorkflow(svcName string, namespace string) types.Workflow
ToActionMap(svcName string) map[string]any
ToWorkflowTrigger() triggerConverter
}

type Workflow struct {
Expand All @@ -140,6 +141,8 @@ type WorkflowJob struct {

Description string

On triggerConverter

Concurrency *WorkflowConcurrency

// The steps that are run in the job
Expand Down Expand Up @@ -234,6 +237,10 @@ func (j *WorkflowJob) ToWorkflowJob(svcName string, namespace string) (*types.Wo
return apiJob, nil
}

func (j *WorkflowJob) ToWorkflowTrigger() triggerConverter {
return j.On
}

func (j *WorkflowJob) ToActionMap(svcName string) map[string]any {
res := map[string]any{}

Expand Down Expand Up @@ -317,6 +324,10 @@ func (w *WorkflowStep) AddParents(parents ...string) *WorkflowStep {
return w
}

func (w *WorkflowStep) ToWorkflowTrigger() triggerConverter {
return nil
}

func (w *WorkflowStep) ToWorkflow(svcName string, namespace string) types.Workflow {
jobName := w.Name

Expand Down

0 comments on commit 4c8c7e7

Please sign in to comment.