Skip to content
This repository has been archived by the owner on Feb 24, 2024. It is now read-only.

Commit

Permalink
Merge pull request #420 from gobuffalo/workers
Browse files Browse the repository at this point in the history
Add support for background jobs #95
  • Loading branch information
markbates committed May 5, 2017
2 parents fa49146 + 3c679c2 commit ec18b42
Show file tree
Hide file tree
Showing 10 changed files with 294 additions and 11 deletions.
1 change: 1 addition & 0 deletions Dockerfile
Expand Up @@ -7,6 +7,7 @@ RUN go get -u github.com/markbates/filetest

ENV BP=$GOPATH/src/github.com/gobuffalo/buffalo

RUN rm -rf $BP
RUN mkdir -p $BP
WORKDIR $BP
ADD . .
Expand Down
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -18,7 +18,7 @@ Please visit [http://gobuffalo.io](http://gobuffalo.io) for the latest documenta
$ go get -u github.com/gobuffalo/buffalo/buffalo
```

_NOTE_: Buffalo has a minimum Go dependency of `1.7.x`.
_NOTE_: Buffalo has a minimum Go dependency of `1.8.1`.

Buffalo also depends on:
- gcc for [go-sqlite3](https://github.com/mattn/go-sqlite3) wich is a cgo package.
Expand Down
63 changes: 63 additions & 0 deletions app.go
@@ -1,7 +1,10 @@
package buffalo

import (
"fmt"
"net/http"
"os"
"os/signal"
"sync"

gcontext "github.com/gorilla/context"
Expand All @@ -24,6 +27,66 @@ type App struct {
root *App
}

// Start the application at the specified address/port and listen for OS
// interrupt and kill signals and will attempt to stop the application
// gracefully. This will also start the Worker process, unless WorkerOff is enabled.
func (a *App) Start(addr string) error {
fmt.Printf("Starting application at %s\n", addr)
server := http.Server{
Addr: fmt.Sprintf(":%s", addr),
Handler: a,
}

go func() {
<-a.Context.Done()
fmt.Println("Shutting down application")
a.cancel()
err := server.Shutdown(a.Context)
if err != nil {
a.Logger.Error(errors.WithStack(err))
}
if !a.WorkerOff {
err = a.Worker.Stop()
if err != nil {
a.Logger.Error(errors.WithStack(err))
}
}
}()

if !a.WorkerOff {
go func() {
err := a.Worker.Start(a.Context)
if err != nil {
a.Logger.Error(errors.WithStack(err))
a.cancel()
}
}()
}

go func() {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
a.cancel()
}()

err := server.ListenAndServe()
if err != nil {
a.cancel()

err = errors.WithStack(err)
a.Logger.Error(err)
return errors.WithStack(err)
}
return nil
}

// Stop the application and attempt to gracefully shutdown
func (a *App) Stop() error {
a.cancel()
return nil
}

func (a *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
defer gcontext.Clear(r)
ws := &buffaloResponse{
Expand Down
4 changes: 2 additions & 2 deletions errors.go
Expand Up @@ -55,9 +55,9 @@ func (a *App) PanicHandler(next Handler) Handler {
if r != nil { //catch
switch t := r.(type) {
case error:
err = t
err = errors.WithStack(t)
case string:
err = errors.New(t)
err = errors.WithStack(errors.New(t))
default:
err = errors.New(fmt.Sprint(t))
}
Expand Down
10 changes: 4 additions & 6 deletions generators/newapp/templates/main.go.tmpl
@@ -1,16 +1,14 @@
package main

import (
"fmt"
"log"
"net/http"

"{{ .actionsPath }}"
"github.com/gobuffalo/envy"
"{{ .actionsPath }}"
"github.com/gobuffalo/envy"
)

func main() {
port := envy.Get("PORT", "3000")
log.Printf("Starting {{.name}} on port %s\n", port)
log.Fatal(http.ListenAndServe(fmt.Sprintf(":%s", port), actions.App()))
app := actions.App()
log.Fatal(app.Start(port))
}
22 changes: 20 additions & 2 deletions options.go
@@ -1,10 +1,12 @@
package buffalo

import (
"context"
"fmt"
"log"
"net/http"

"github.com/gobuffalo/buffalo/worker"
"github.com/gobuffalo/envy"
"github.com/gorilla/sessions"
"github.com/markbates/going/defaults"
Expand All @@ -29,8 +31,16 @@ type Options struct {
// to "_buffalo_session".
SessionName string
// Host that this application will be available at. Default is "http://127.0.0.1:[$PORT|3000]".
Host string
prefix string
Host string
// Worker implements the Worker interface and can process tasks in the background.
// Default is "github.com/gobuffalo/worker.Simple.
Worker worker.Worker
// WorkerOff tells App.Start() whether to start the Worker process or not. Default is "false".
WorkerOff bool

Context context.Context
cancel context.CancelFunc
prefix string
}

// NewOptions returns a new Options instance with sensible defaults
Expand All @@ -42,6 +52,11 @@ func optionsWithDefaults(opts Options) Options {
opts.Env = defaults.String(opts.Env, envy.Get("GO_ENV", "development"))
opts.LogLevel = defaults.String(opts.LogLevel, "debug")

if opts.Context == nil {
opts.Context = context.Background()
}
opts.Context, opts.cancel = context.WithCancel(opts.Context)

if opts.Logger == nil {
opts.Logger = NewLogger(opts.LogLevel)
}
Expand All @@ -54,6 +69,9 @@ func optionsWithDefaults(opts Options) Options {
}
opts.SessionStore = sessions.NewCookieStore([]byte(secret))
}
if opts.Worker == nil {
opts.Worker = worker.NewSimpleWithContext(opts.Context)
}
opts.SessionName = defaults.String(opts.SessionName, "_buffalo_session")
opts.Host = defaults.String(opts.Host, envy.Get("HOST", fmt.Sprintf("http://127.0.0.1:%s", envy.Get("PORT", "3000"))))
return opts
Expand Down
14 changes: 14 additions & 0 deletions worker/job.go
@@ -0,0 +1,14 @@
package worker

// Args are the arguments passed into a job
type Args map[string]interface{}

// Job to be processed by a Worker
type Job struct {
// Queue the job should be placed into
Queue string
// Args that will be passed to the Handler when run
Args Args
// Handler that will be run by the worker
Handler string
}
88 changes: 88 additions & 0 deletions worker/simple.go
@@ -0,0 +1,88 @@
package worker

import (
"context"
"sync"
"time"

"github.com/pkg/errors"
)

var _ Worker = &simple{}

// NewSimple creates a basic implementation of the Worker interface
// that is backed using just the standard library and goroutines.
func NewSimple() Worker {
return NewSimpleWithContext(context.Background())
}

// NewSimpleWithContext creates a basic implementation of the Worker interface
// that is backed using just the standard library and goroutines.
func NewSimpleWithContext(ctx context.Context) Worker {
ctx, cancel := context.WithCancel(ctx)
return &simple{
ctx: ctx,
cancel: cancel,
handlers: map[string]Handler{},
moot: &sync.Mutex{},
}
}

// simple is a basic implementation of the Worker interface
// that is backed using just the standard library and goroutines.
type simple struct {
ctx context.Context
cancel context.CancelFunc
handlers map[string]Handler
moot *sync.Mutex
}

func (w *simple) Register(name string, h Handler) error {
w.moot.Lock()
defer w.moot.Unlock()
if _, ok := w.handlers[name]; ok {
return errors.Errorf("handler already mapped for name %s", name)
}
w.handlers[name] = h
return nil
}

func (w *simple) Start(ctx context.Context) error {
w.ctx, w.cancel = context.WithCancel(ctx)
return nil
}

func (w simple) Stop() error {
w.cancel()
return nil
}

// Perform a job as soon as possibly using a goroutine.
func (w simple) Perform(job Job) error {
w.moot.Lock()
defer w.moot.Unlock()
if h, ok := w.handlers[job.Handler]; ok {
go h(job.Args)
return nil
}
return errors.Errorf("no handler mapped for name %s", job.Handler)
}

// PerformAt performs a job at a particular time using a goroutine.
func (w simple) PerformAt(job Job, t time.Time) error {
return w.PerformIn(job, t.Sub(time.Now()))
}

// PerformIn performs a job after waiting for a specified amount
// using a goroutine.
func (w simple) PerformIn(job Job, d time.Duration) error {
go func() {
select {
case <-time.After(d):
w.Perform(job)
case <-w.ctx.Done():
w.cancel()
}
}()
return nil
}
74 changes: 74 additions & 0 deletions worker/simple_test.go
@@ -0,0 +1,74 @@
package worker

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_Simple_Perform(t *testing.T) {
r := require.New(t)

var hit bool
wg := &sync.WaitGroup{}
wg.Add(1)
w := NewSimple()
w.Register("x", func(Args) error {
hit = true
wg.Done()
return nil
})
w.Perform(Job{
Handler: "x",
})
wg.Wait()
r.True(hit)
}

func Test_Simple_PerformAt(t *testing.T) {
r := require.New(t)

var hit bool
wg := &sync.WaitGroup{}
wg.Add(1)
w := NewSimple()
w.Register("x", func(Args) error {
hit = true
wg.Done()
return nil
})
w.PerformAt(Job{
Handler: "x",
}, time.Now().Add(5*time.Millisecond))
wg.Wait()
r.True(hit)
}

func Test_Simple_PerformIn(t *testing.T) {
r := require.New(t)

var hit bool
wg := &sync.WaitGroup{}
wg.Add(1)
w := NewSimple()
w.Register("x", func(Args) error {
hit = true
wg.Done()
return nil
})
w.PerformIn(Job{
Handler: "x",
}, 5*time.Millisecond)
wg.Wait()
r.True(hit)
}

func Test_Simple_NoHandler(t *testing.T) {
r := require.New(t)

w := NewSimple()
err := w.Perform(Job{})
r.Error(err)
}
27 changes: 27 additions & 0 deletions worker/worker.go
@@ -0,0 +1,27 @@
package worker

import (
"context"
"time"
)

// Handler function that will be run by the worker and given
// a slice of arguments
type Handler func(Args) error

// Worker interface that needs to be implemented to be considered
// a "worker"
type Worker interface {
// Start the worker with the given context
Start(context.Context) error
// Stop the worker
Stop() error
// Perform a job as soon as possibly
Perform(Job) error
// PerformAt performs a job at a particular time
PerformAt(Job, time.Time) error
// PerformIn performs a job after waiting for a specified amount of time
PerformIn(Job, time.Duration) error
// Register a Handler
Register(string, Handler) error
}

0 comments on commit ec18b42

Please sign in to comment.