Skip to content

Commit

Permalink
Add overall package documentation
Browse files Browse the repository at this point in the history
Adds examples for the server and dequeuer at the top level. This made it clear
that the JobProcessor should have a function that provides it with a reasonable
default for the timeout, instead of 0. Adds NewJobProcessor to do that, and
updates some tests and other code to use it.

Fixes more package-level documentation.
  • Loading branch information
Kevin Burke committed May 2, 2016
1 parent 0291244 commit db4b4a3
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 24 deletions.
4 changes: 1 addition & 3 deletions downstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ func init() {
Logger = log.New(os.Stderr, "", log.LstdFlags)
}

var httpClient = &http.Client{Timeout: defaultHTTPTimeout}

// The DownstreamClient is an API client for a downstream service that can
// handle POST requests to /v1/jobs/:job-name/:job-id. The service is expected
// to return a 202 and then make a callback to the job scheduler when the job
Expand All @@ -45,7 +43,7 @@ func NewClient(id, token, base string) *Client {
c := &Client{&rest.Client{
Id: id,
Token: token,
Client: httpClient,
Client: &http.Client{Timeout: defaultHTTPTimeout},
Base: base,
}, nil}
c.Job = &JobService{Client: c}
Expand Down
81 changes: 81 additions & 0 deletions example_dequeuer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Run the rickover dequeuer.
//
// All of the project defaults are used. There is one authenticated user for
// basic auth, the user is "test" and the password is "hymanrickover". You will
// want to copy this binary and add your own authentication scheme.

package rickover

import (
"fmt"
"log"
"os"
"os/signal"
"sync"
"syscall"
"time"

"github.com/Shyp/go-simple-metrics"
"github.com/Shyp/rickover/config"
"github.com/Shyp/rickover/dequeuer"
"github.com/Shyp/rickover/models/db"
"github.com/Shyp/rickover/services"
"github.com/Shyp/rickover/setup"
)

func checkError(err error) {
if err != nil {
log.Fatal(err)
}
}

func Example_dequeuer() {
dbConns, err := config.GetInt("PG_WORKER_POOL_SIZE")
if err != nil {
log.Printf("Error getting database pool size: %s. Defaulting to 20", err)
dbConns = 20
}

err = setup.DB(db.DefaultConnection, dbConns)
checkError(err)

go setup.MeasureActiveQueries(1 * time.Second)
go setup.MeasureQueueDepth(5 * time.Second)
go setup.MeasureInProgressJobs(1 * time.Second)

// Every minute, check for in-progress jobs that haven't been updated for
// 7 minutes, and mark them as failed.
go services.WatchStuckJobs(1*time.Minute, 7*time.Minute)

metrics.Namespace = "rickover.dequeuer"
metrics.Start("worker")

downstreamUrl := config.GetURLOrBail("DOWNSTREAM_URL")
downstreamPassword := os.Getenv("DOWNSTREAM_WORKER_AUTH")
jp := services.NewJobProcessor(downstreamUrl.String(), downstreamPassword)

// CreatePools will read all job types out of the jobs table, then start
// all dequeuers for those jobs.
pools, err := dequeuer.CreatePools(jp)
checkError(err)

sigterm := make(chan os.Signal, 1)
signal.Notify(sigterm, syscall.SIGTERM)
sig := <-sigterm
fmt.Printf("Caught signal %v, shutting down...\n", sig)
var wg sync.WaitGroup
for _, p := range pools {
if p != nil {
wg.Add(1)
go func(p *dequeuer.Pool) {
err = p.Shutdown()
if err != nil {
log.Printf("Error shutting down pool: %s\n", err.Error())
}
wg.Done()
}(p)
}
}
wg.Wait()
fmt.Println("All pools shut down. Quitting.")
}
44 changes: 44 additions & 0 deletions example_server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Run the rickover server.
//
// All of the project defaults are used. There is one authenticated user for
// basic auth, the user is "test" and the password is "hymanrickover". You will
// want to copy this binary and add your own authentication scheme.
package rickover

import (
"log"
"net/http"
"os"
"time"

"github.com/Shyp/go-simple-metrics"
"github.com/Shyp/rickover/config"
"github.com/Shyp/rickover/models/db"
"github.com/Shyp/rickover/server"
"github.com/Shyp/rickover/setup"
"github.com/gorilla/handlers"
)

func Example_server() {
dbConns, err := config.GetInt("PG_SERVER_POOL_SIZE")
if err != nil {
log.Printf("Error getting database pool size: %s. Defaulting to 10", err)
dbConns = 10
}

if err = setup.DB(db.DefaultConnection, dbConns); err != nil {
log.Fatal(err)
}

metrics.Namespace = "rickover.server"
metrics.Start("web")

go setup.MeasureActiveQueries(5 * time.Second)

// Change this user to a private value
server.AddUser("test", "hymanrickover")
s := server.Get(server.DefaultAuthorizer)

log.Println("Listening on port 9090\n")
log.Fatal(http.ListenAndServe(":9090", handlers.LoggingHandler(os.Stdout, s)))
}
3 changes: 3 additions & 0 deletions rickover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Package rickover contains logic for a scheduler and job queue backed by
// Postgres.
package rickover
1 change: 1 addition & 0 deletions server/error_responses.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Helpers for building various types of error responses.

package server

import (
Expand Down
2 changes: 0 additions & 2 deletions server/regex_handler.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// A simple http.Handler that can match wildcard routes, and call the
// appropriate handler.
package server

import (
Expand Down
2 changes: 1 addition & 1 deletion server/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// API interface for enqueueing/updating jobs
// Package server provides an HTTP interface for the job queue/scheduler.
package server

import (
Expand Down
21 changes: 19 additions & 2 deletions services/process_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,30 @@ var DefaultTimeout = 5 * time.Minute

// JobProcessor implements the Worker interface.
type JobProcessor struct {
// A Client for making requests to the downstream server.
Client *downstream.Client

// Amount of time we should wait for a response before marking the job as
// failed.
// Amount of time we should wait for the downstream server to hit the
// callback before marking the job as failed.
Timeout time.Duration
}

// NewJobProcessor creates a services.JobProcessor that makes requests to the
// downstream url.
//
// By default the Client uses Basic Auth with "jobs" as the username, and the
// configured password as the password.
//
// If the downstream server does not hit the callback, jobs sent to the
// downstream server are timed out and marked as failed after DefaultTimeout
// has elapsed.
func NewJobProcessor(downstreamUrl string, downstreamPassword string) *JobProcessor {
return &JobProcessor{
Client: downstream.NewClient("jobs", downstreamPassword, downstreamUrl),
Timeout: DefaultTimeout,
}
}

// isTimeout returns true if the err was caused by a request timeout.
func isTimeout(err error) bool {
// This is difficult in Go 1.5: http://stackoverflow.com/a/23497404/329700
Expand Down
26 changes: 10 additions & 16 deletions test/services/process_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/Shyp/rickover/Godeps/_workspace/src/github.com/Shyp/go-types"
"github.com/Shyp/rickover/Godeps/_workspace/src/github.com/nu7hatch/gouuid"
"github.com/Shyp/rickover/downstream"
"github.com/Shyp/rickover/models"
"github.com/Shyp/rickover/models/archived_jobs"
"github.com/Shyp/rickover/models/jobs"
Expand All @@ -34,15 +33,13 @@ func TestExpiredJobNotEnqueued(t *testing.T) {
c1 <- true
}))
defer s.Close()
jp := services.JobProcessor{
Client: downstream.NewClient("jobs", "password", s.URL),
}
jp := services.NewJobProcessor("password", s.URL)

_, err := jobs.Create(factory.SampleJob)
test.AssertNotError(t, err, "")
expiresAt := types.NullTime{
Valid: true,
Time: time.Now().Add(-5 * time.Millisecond),
Time: time.Now().UTC().Add(-5 * time.Millisecond),
}
qj, err := queued_jobs.Enqueue(factory.JobId, "echo", time.Now().UTC(), expiresAt, factory.EmptyData)
test.AssertNotError(t, err, "")
Expand Down Expand Up @@ -122,9 +119,7 @@ func TestWorkerRetriesJSON503(t *testing.T) {
func TestWorkerWaitsConnectTimeout(t *testing.T) {
db.SetUp(t)
defer db.TearDown(t)
jp := services.JobProcessor{
Client: downstream.NewClient("jobs", "password", "http://10.255.255.1"),
}
jp := services.NewJobProcessor("http://10.255.255.1", "password")

// Okay this is not the world's best design.
// Job processor client -> worker client -> generic rest client
Expand Down Expand Up @@ -154,9 +149,7 @@ func TestWorkerWaitsRequestTimeout(t *testing.T) {
}))
defer s.Close()

jp := services.JobProcessor{
Client: downstream.NewClient("jobs", "password", s.URL),
}
jp := services.NewJobProcessor(s.URL, "password")

// Okay this is not the world's best design.
// Job processor client -> worker client -> generic rest client
Expand All @@ -168,9 +161,9 @@ func TestWorkerWaitsRequestTimeout(t *testing.T) {
test.AssertNotError(t, err, "")
}()

err := jp.DoWork(qj)
workErr := jp.DoWork(qj)
test.AssertNotError(t, workErr, "")
wg.Wait()
test.AssertNotError(t, err, "")
aj, err := archived_jobs.Get(qj.Id)
test.AssertNotError(t, err, "")
test.AssertEquals(t, aj.Status, models.StatusSucceeded)
Expand All @@ -179,10 +172,11 @@ func TestWorkerWaitsRequestTimeout(t *testing.T) {
func TestWorkerDoesNotWaitConnectionFailure(t *testing.T) {
db.SetUp(t)
defer db.TearDown(t)
jp := services.JobProcessor{
jp := services.NewJobProcessor(
"password",
// TODO empty port finder
Client: downstream.NewClient("jobs", "password", "http://127.0.0.1:29656"),
}
"http://127.0.0.1:29656",
)

// Okay this is not the world's best design.
// Job processor client -> worker client -> generic rest client
Expand Down

0 comments on commit db4b4a3

Please sign in to comment.