Add scheduler (#5)
* Add ticker and scheduler to pool package
Add TestAndSet to rmap package
Fix bug in pool package that logged incorrect errors on stopping jobs

* Add scheduler pool example.
Fix issue with job dispatch return status handling.

* Document TestAndSet
raphael committed Jul 23, 2023
1 parent a191b49 commit 1e9f0ec
Showing 19 changed files with 1,128 additions and 173 deletions.
60 changes: 60 additions & 0 deletions examples/pool/README.md
@@ -0,0 +1,60 @@
# pool Example

This example shows how to use the pool package to create a pool of workers. It has three parts:

1. The `worker` process registers a worker with the node and waits for jobs.
2. The `producer` process starts and stops two jobs. It also notifies the worker handling the second job.
3. The `scheduler` process runs a schedule that starts and stops jobs alternately.
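
The producer's interaction with the pool boils down to a few client calls. The
following condensed sketch is based on `examples/pool/producer/main.go` from
this commit (logging and `-v` flag handling omitted):

```go
package main

import (
	"context"
	"os"

	redis "github.com/redis/go-redis/v9"
	"goa.design/pulse/pool"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{
		Addr:     "localhost:6379",
		Password: os.Getenv("REDIS_PASSWORD"),
	})

	// Client-only node: it can dispatch and stop jobs but never runs workers.
	client, err := pool.AddNode(ctx, "example", rdb, pool.WithClientOnly())
	if err != nil {
		panic(err)
	}

	// Start a job, notify the worker that picked it up, then stop it.
	if err := client.DispatchJob(ctx, "alpha", nil); err != nil {
		panic(err)
	}
	if err := client.NotifyWorker(ctx, "alpha", []byte("hello")); err != nil {
		panic(err)
	}
	if err := client.StopJob(ctx, "alpha"); err != nil {
		panic(err)
	}

	// Cleanup client on exit.
	if err := client.Close(ctx); err != nil {
		panic(err)
	}
}
```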

## Running the example

To run the example, first make sure Redis is running locally. The `start-redis` script
in the scripts directory of the repository can be used to start Redis:

```bash
$ cd pulse
$ scripts/start-redis
```

Then, in two separate terminals, run the following commands:

```bash
$ source .env
$ go run examples/pool/worker/main.go
```

The above starts two workers that wait for jobs. Then, in a separate terminal, run
the following commands:

```bash
$ source .env
$ go run examples/pool/producer/main.go
```

The above starts and stops two jobs. The first job is handled by the first worker,
and the second job is handled by the second worker.

Finally, in the same terminal used above, run the following command:

```bash
$ go run examples/pool/scheduler/main.go
```

The above starts a scheduler that starts and stops jobs alternately. The first job
is handled by the first worker, and the second job is handled by the second worker.

To stop the worker processes, simply press `Ctrl-C` in the terminal where they are
running. Note that it takes a few seconds for the workers to stop (as they wait for
the Redis poll to return).
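
The shutdown path follows the pattern in `examples/pool/worker/main.go` from
this commit: trap the interrupt signal, then ask the node to shut down. A
condensed sketch, assuming `node` and `ctx` from the worker's setup code:

```go
// Block until CTRL-C, then shut down the node and its workers.
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, os.Interrupt)
<-sigc

// Shutdown returns once workers stop, which waits on the Redis poll.
if err := node.Shutdown(ctx); err != nil {
	panic(err)
}
```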

## Adding Verbose Logging

The three processes can be run with verbose logging by passing the `-v` flag:

```bash
$ go run examples/pool/worker/main.go -v
$ go run examples/pool/producer/main.go -v
$ go run examples/pool/scheduler/main.go -v
```

Verbose logging will print logs created by the `pulse` package to the terminal.
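
The flag is wired the same way in all three programs; this excerpt from the
commit shows the pattern (when `-v` is absent, `logger` stays nil and no
`pulse` logs are printed):

```go
var logger pulse.Logger
if len(os.Args) > 1 && os.Args[1] == "-v" {
	logger = pulse.ClueLogger(ctx)
}
node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
```
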
55 changes: 33 additions & 22 deletions examples/pool/producer/main.go
@@ -2,7 +2,6 @@ package main

import (
"context"
"fmt"
"os"
"time"

@@ -13,48 +12,60 @@ import (
)

func main() {
ctx := log.Context(context.Background(), log.WithDebug())
// Setup Redis connection
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: os.Getenv("REDIS_PASSWORD"),
})

// Create client for node named "example"
node, err := pool.AddNode(ctx, "example", rdb,
// Setup clue logger.
ctx := log.Context(context.Background())
log.FlushAndDisableBuffering(ctx)

var logger pulse.Logger
if len(os.Args) > 1 && os.Args[1] == "-v" {
logger = pulse.ClueLogger(ctx)
}

// Create client for worker pool "example"
client, err := pool.AddNode(ctx, "example", rdb,
pool.WithClientOnly(),
pool.WithLogger(pulse.ClueLogger(ctx)),
pool.WithLogger(logger),
)
if err != nil {
panic(err)
}

// Cleanup node on exit.
defer func() {
if err := node.Close(ctx); err != nil {
panic(err)
}
}()

// Start 2 jobs
fmt.Println("** Starting job one...")
if err := node.DispatchJob(ctx, "one", nil); err != nil {
log.Infof(ctx, "starting job one")
if err := client.DispatchJob(ctx, "alpha", nil); err != nil {
panic(err)
}
fmt.Println("** Starting job two...")
if err := node.DispatchJob(ctx, "two", nil); err != nil {
log.Infof(ctx, "starting job two")
if err := client.DispatchJob(ctx, "beta", nil); err != nil {
panic(err)
}
time.Sleep(200 * time.Millisecond) // emulate delay
fmt.Println("Stopping job one...")
if err := node.StopJob(ctx, "one"); err != nil {

// Stop job one
log.Infof(ctx, "stopping job one")
if err := client.StopJob(ctx, "alpha"); err != nil {
panic(err)
}
fmt.Println("Notifying worker for job two...")
if err := node.NotifyWorker(ctx, "two", []byte("hello")); err != nil {

// Notify and then stop job two
log.Infof(ctx, "notifying job two worker")
if err := client.NotifyWorker(ctx, "beta", []byte("hello")); err != nil {
panic(err)
}
log.Infof(ctx, "stopping job two")
if err := client.StopJob(ctx, "beta"); err != nil {
panic(err)
}
fmt.Println("Stopping job two...")
if err := node.StopJob(ctx, "two"); err != nil {

// Cleanup client on exit.
log.Infof(ctx, "done")
if err := client.Close(ctx); err != nil {
panic(err)
}
}
82 changes: 82 additions & 0 deletions examples/pool/scheduler/main.go
@@ -0,0 +1,82 @@
package main

import (
"context"
"os"
"time"

redis "github.com/redis/go-redis/v9"
"goa.design/clue/log"
"goa.design/pulse/pool"
"goa.design/pulse/pulse"
)

func main() {
// Setup Redis connection
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: os.Getenv("REDIS_PASSWORD"),
})

// Setup clue logger.
ctx := log.Context(context.Background())
log.FlushAndDisableBuffering(ctx)

var logger pulse.Logger
if len(os.Args) > 1 && os.Args[1] == "-v" {
logger = pulse.ClueLogger(ctx)
}

// Create node for pool "example".
node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
if err != nil {
panic(err)
}

// Start schedule
log.Infof(ctx, "Starting schedule... CTRL+C to stop.")
done := make(chan struct{})
if err := node.Schedule(ctx, newProducer(ctx, done), time.Second); err != nil {
panic(err)
}

// Wait for producer to be done
<-done

// Cleanup node on exit.
if err := node.Close(ctx); err != nil {
panic(err)
}
}

// producer is a job producer that alternately starts and stops a job.
// It closes the done channel when it is done.
type producer struct {
iter int
done chan struct{}
logctx context.Context
}

func newProducer(ctx context.Context, done chan struct{}) *producer {
return &producer{done: done, logctx: ctx}
}

func (p *producer) Name() string {
return "example"
}

// Plan is called by the scheduler to determine the next job to start or stop.
func (p *producer) Plan() (*pool.JobPlan, error) {
p.iter++
if p.iter > 10 {
log.Infof(p.logctx, "done")
close(p.done)
return nil, pool.ErrScheduleStop
}
if p.iter%2 == 0 {
log.Infof(p.logctx, "stop all")
return &pool.JobPlan{StopAll: true}, nil
}
log.Infof(p.logctx, "start job")
return &pool.JobPlan{Start: []*pool.JobParam{{Key: "job", Payload: []byte("payload")}}}, nil
}
50 changes: 30 additions & 20 deletions examples/pool/worker/main.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"os/signal"
"sync"
"time"

@@ -22,8 +23,8 @@ type (
executions map[string]*Execution
// node is the node the worker is registered with.
node *pool.Node
// done is closed when the worker is stopped.
done chan struct{}
// logctx is the logger context.
logctx context.Context
}

// Execution represents a single execution.
@@ -34,31 +35,42 @@
)

func main() {
ctx := log.Context(context.Background(), log.WithDebug())
// Setup Redis connection
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: os.Getenv("REDIS_PASSWORD"),
})

// Setup clue logger.
ctx := log.Context(context.Background())
log.FlushAndDisableBuffering(ctx)

var logger pulse.Logger
if len(os.Args) > 1 && os.Args[1] == "-v" {
logger = pulse.ClueLogger(ctx)
}

// Create node for pool "example".
node, err := pool.AddNode(ctx,
"example",
rdb,
pool.WithLogger(pulse.ClueLogger(ctx)))
node, err := pool.AddNode(ctx, "example", rdb, pool.WithLogger(logger))
if err != nil {
panic(err)
}

// Create a new worker for pool "example".
c := make(chan struct{})
handler := &JobHandler{executions: make(map[string]*Execution), node: node, done: c}
handler := &JobHandler{executions: make(map[string]*Execution), node: node, logctx: ctx}
if _, err := node.AddWorker(ctx, handler); err != nil {
panic(err)
}

// Wait for jobs to complete.
fmt.Println("Waiting for jobs...")
<-c
log.Infof(ctx, "Waiting for jobs... CTRL+C to stop.")

// Close done channel on CTRL-C.
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, os.Interrupt)
<-sigc
close(sigc)

if err := node.Shutdown(ctx); err != nil {
panic(err)
}
@@ -70,7 +82,7 @@ func (w *JobHandler) Start(job *pool.Job) error {
defer w.lock.Unlock()
exec := &Execution{c: make(chan struct{})}
w.executions[job.Key] = exec
go exec.Start(job)
go exec.Start(w.logctx, job)
return nil
}

@@ -84,31 +96,29 @@ func (w *JobHandler) Stop(key string) error {
}
close(exec.c)
delete(w.executions, key)
if key == "two" {
close(w.done)
}
return nil
}

// Print notification.
func (w *JobHandler) HandleNotification(key string, payload []byte) error {
fmt.Printf(">> Notification: %s\n", string(payload))
log.Info(w.logctx, log.Fields{"msg": "notification", "key": key, "payload": string(payload)})
return nil
}

// Start execution.
func (c *Execution) Start(job *pool.Job) {
defer fmt.Printf("Worker %s, Job %s, Done\n", job.Worker.ID, job.Key)
func (c *Execution) Start(ctx context.Context, job *pool.Job) {
log.Info(ctx, log.Fields{"msg": "job started", "worker-id": job.Worker.ID, "job": job.Key})
defer log.Info(ctx, log.Fields{"msg": "job done", "worker-id": job.Worker.ID, "job": job.Key})
i := 1
ticker := time.NewTicker(500 * time.Millisecond)
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-c.c:
return
case <-ticker.C:
i++
fmt.Printf(">> Worker %s, Job %s, Iteration %d\n", job.Worker.ID, job.Key, i)
log.Info(ctx, log.Fields{"msg": "tick", "worker-id": job.Worker.ID, "job": job.Key, "i": i})
}
}
}
10 changes: 10 additions & 0 deletions pool/README.md
@@ -160,6 +160,16 @@ The `StopJob` method is used to stop a job. It takes a job key as input and
returns an error if the job could not be stopped. This can happen if the job key
is invalid, the node is closed, or the pool is shut down.

## Scheduling

The `Schedule` method of the `Node` struct can be used to schedule jobs to be
dispatched or stopped on a recurring basis. The method takes as input a job
producer and invokes it at the specified interval. The job producer returns
a list of jobs to be started and stopped.

`Schedule` makes it possible to maintain a pool of jobs, for example in a
multi-tenant system. See the [examples](../examples/pool) for more details.
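
As an illustration, a minimal job producer might dispatch one job per interval
and stop the schedule when it is done. The sketch below follows the scheduler
example in [examples/pool](../examples/pool); the `tenantProducer` type, its
key, and its payload are hypothetical:

```go
// tenantProducer implements the job producer interface used by Schedule.
type tenantProducer struct {
	iter int
}

// Name identifies the schedule.
func (p *tenantProducer) Name() string { return "tenants" }

// Plan is called at each interval and returns the jobs to start or stop.
func (p *tenantProducer) Plan() (*pool.JobPlan, error) {
	p.iter++
	if p.iter > 10 {
		// Returning ErrScheduleStop ends the recurring schedule.
		return nil, pool.ErrScheduleStop
	}
	return &pool.JobPlan{
		Start: []*pool.JobParam{{Key: "tenant-1", Payload: []byte("config")}},
	}, nil
}

// Elsewhere, on an existing node: invoke the producer every minute.
// err := node.Schedule(ctx, &tenantProducer{}, time.Minute)
```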

## Data Flows

The following sections provide additional details on the internal data flows