Skip to content

Commit

Permalink
add listing enpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Oct 5, 2020
1 parent 9645d64 commit e59a3d0
Show file tree
Hide file tree
Showing 5 changed files with 233 additions and 5 deletions.
27 changes: 27 additions & 0 deletions internal/jobmanager/cursor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package jobmanager

import (
"encoding/base64"
"encoding/json"
)

type Cursor struct {
Path string
SortBy string
Reverse bool
LastValue string
Limit int64
}

func (c Cursor) Encode() string {
b, _ := json.Marshal(&c)
return base64.StdEncoding.EncodeToString(b)
}

func (c *Cursor) Decode(s string) error {
b, err := base64.StdEncoding.DecodeString(s)
if err != nil {
return err
}
return json.Unmarshal(b, c)
}
87 changes: 87 additions & 0 deletions internal/jobmanager/jobmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package jobmanager
import (
"context"
"errors"
"strconv"
"time"

"github.com/cenkalti/dalga/v4/internal/log"
Expand All @@ -11,6 +12,11 @@ import (
"github.com/senseyeio/duration"
)

const (
orderInsert = "insert-order"
orderNextSched = "next-sched"
)

type JobManager struct {
table *table.Table
scheduler *scheduler.Scheduler
Expand All @@ -33,6 +39,87 @@ func New(t *table.Table, s *scheduler.Scheduler) *JobManager {
}
}

func (m *JobManager) List(ctx context.Context, path, sortBy string, reverse bool, limit int64) (jobs []table.Job, cursor string, err error) {
var orderByColumn string
switch sortBy {
case orderInsert:
orderByColumn = "id"
case orderNextSched:
orderByColumn = "next_sched, id"
default:
return nil, "", errors.New("invalid sort-by param")
}
jobsWithID, err := m.table.List(ctx, path, orderByColumn, reverse, "", "", limit)
if err != nil {
return nil, "", err
}
jobs = make([]table.Job, 0, len(jobsWithID))
for _, j := range jobsWithID {
jobs = append(jobs, j.Job)
}
if len(jobsWithID) > 0 {
lastItem := jobsWithID[len(jobsWithID)-1]
cursor = generateCursor(lastItem, path, sortBy, reverse, limit)
}
return
}

func (m *JobManager) ListContinue(ctx context.Context, cursor string) (jobs []table.Job, nextCursor string, err error) {
var c Cursor
err = c.Decode(cursor)
if err != nil {
return nil, "", errors.New("invalid cursor")
}
var orderByColumn, greaterThan, lessThan string
switch c.SortBy {
case orderInsert:
orderByColumn = "id"
if c.Reverse {
lessThan = c.LastValue
} else {
greaterThan = c.LastValue
}
case orderNextSched:
orderByColumn = "next_sched, id"
if c.Reverse {
lessThan = "'" + c.LastValue + "'"
} else {
greaterThan = "'" + c.LastValue + "'"
}
default:
return nil, "", errors.New("invalid sort-by param")
}
jobsWithID, err := m.table.List(ctx, c.Path, orderByColumn, c.Reverse, greaterThan, lessThan, c.Limit)
if err != nil {
return nil, "", err
}
jobs = make([]table.Job, 0, len(jobsWithID))
for _, j := range jobsWithID {
jobs = append(jobs, j.Job)
}
if len(jobsWithID) > 0 {
lastItem := jobsWithID[len(jobsWithID)-1]
nextCursor = generateCursor(lastItem, c.Path, c.SortBy, c.Reverse, c.Limit)
}
return
}

func generateCursor(job table.JobWithID, path, sortBy string, reverse bool, limit int64) string {
c := Cursor{
Path: path,
SortBy: sortBy,
Reverse: reverse,
Limit: limit,
}
switch sortBy {
case orderInsert:
c.LastValue = strconv.FormatInt(job.ID, 10)
case orderNextSched:
c.LastValue = job.NextSched.Format(time.RFC3339)
}
return c.Encode()
}

func (m *JobManager) Get(ctx context.Context, path, body string) (*table.Job, error) {
return m.table.Get(ctx, path, body)
}
Expand Down
57 changes: 57 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ func (s *Server) createServer() http.Server {
m.Put(path, handler(s.handleSchedule))
m.Patch(path, handler(s.handlePatch))
m.Del(path, handler(s.handleCancel))
m.Get("/jobs", http.HandlerFunc(s.handleJobs))
m.Get("/jobs/continue", http.HandlerFunc(s.handleJobsContinue))
m.Get("/status", http.HandlerFunc(s.handleStatus))
return http.Server{
Handler: m,
Expand Down Expand Up @@ -240,6 +242,61 @@ func (s *Server) handleCancel(w http.ResponseWriter, r *http.Request, path, body
w.WriteHeader(http.StatusNoContent)
}

func (s *Server) handleJobs(w http.ResponseWriter, r *http.Request) {
path := r.FormValue("path")
sortBy := r.FormValue("sort-by")
if sortBy == "" {
sortBy = "insert-order"
}
reverse, _ := strconv.ParseBool(r.FormValue("reverse"))
limit := int64(100)
limitString := r.FormValue("limit")
if limitString != "" {
var err error
limit, err = strconv.ParseInt(limitString, 10, 64)
if err != nil {
http.Error(w, "invalid limit", http.StatusBadRequest)
return
}
}
jobs, cursor, err := s.jobs.List(r.Context(), path, sortBy, reverse, limit)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
result := map[string]interface{}{
"jobs": jobs,
"cursor": cursor,
}
data, err := json.Marshal(result)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(data)
}

func (s *Server) handleJobsContinue(w http.ResponseWriter, r *http.Request) {
cursor := r.FormValue("cursor")
jobs, cursor, err := s.jobs.ListContinue(r.Context(), cursor)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
result := map[string]interface{}{
"jobs": jobs,
"cursor": cursor,
}
data, err := json.Marshal(result)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, _ = w.Write(data)
}

func (s *Server) handleStatus(w http.ResponseWriter, r *http.Request) {
total, err := s.table.Count(r.Context())
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions internal/table/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type Job struct {
InstanceID *uint32
}

type JobWithID struct {
Job
ID int64
}

type Key struct {
// Path is where the job is going to be POSTed when it's time came.
Path string
Expand Down
62 changes: 57 additions & 5 deletions internal/table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"time"

"github.com/cenkalti/dalga/v4/internal/clock"
Expand Down Expand Up @@ -42,6 +43,7 @@ func New(db *sql.DB, name string) *Table {
func (t *Table) Create(ctx context.Context) error {
const createTableSQL = "" +
"CREATE TABLE `%s` (" +
" `id` BIGINT NOT NULL AUTO_INCREMENT," +
" `path` VARCHAR(255) NOT NULL," +
" `body` VARCHAR(255) NOT NULL," +
" `interval` VARCHAR(255) NOT NULL," +
Expand All @@ -51,6 +53,7 @@ func (t *Table) Create(ctx context.Context) error {
" `instance_id` INT UNSIGNED," +
" PRIMARY KEY (`path`, `body`)," +
" KEY (`next_run`)," +
" UNIQUE KEY (`id`)," +
" FOREIGN KEY (`instance_id`) REFERENCES `%s_instances` (`id`) ON DELETE SET NULL" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
const createInstancesTableSQL = "" +
Expand Down Expand Up @@ -85,9 +88,49 @@ func (t *Table) Drop(ctx context.Context) error {
return nil
}

func (t *Table) List(ctx context.Context, path, orderByColumn string, reverse bool, greaterThan, lessThan string, limit int64) (jobs []JobWithID, err error) {
var conditions []string
if path != "" {
conditions = append(conditions, "path='"+path+"'")
}
orderByDirection := "ASC"
if reverse {
orderByDirection = "DESC"
}
if greaterThan != "" {
conditions = append(conditions, orderByColumn+" > "+greaterThan)
}
if lessThan != "" {
conditions = append(conditions, orderByColumn+" < "+lessThan)
}
s := "SELECT id, path, body, `interval`, location, next_run, next_sched, instance_id FROM " + t.name
if len(conditions) > 0 {
s += " WHERE " + strings.Join(conditions, " AND ")
}
if orderByColumn != "" {
s += " ORDER BY " + orderByColumn + " " + orderByDirection
}
if limit != 0 {
s += " LIMIT " + strconv.FormatInt(limit, 10)
}
rows, err := t.db.QueryContext(ctx, s)
if err != nil {
return nil, err
}
defer rows.Close()
for rows.Next() {
ji, _, err := t.scanJobWithID(rows, false)
if err != nil {
return nil, err
}
jobs = append(jobs, ji)
}
return jobs, rows.Err()
}

// Get returns a job from the scheduler table, whether or not it is disabled.
func (t *Table) Get(ctx context.Context, path, body string) (*Job, error) {
s := "SELECT path, body, `interval`, location, next_run, next_sched, instance_id " +
s := "SELECT id, path, body, `interval`, location, next_run, next_sched, instance_id " +
"FROM " + t.name + " " +
"WHERE path = ? AND body = ?"
row := t.db.QueryRowContext(ctx, s, path, body)
Expand All @@ -105,21 +148,30 @@ func (t *Table) getForUpdate(ctx context.Context, path, body string) (tx *sql.Tx
tx.Rollback() // nolint: errcheck
}
}()
s := "SELECT path, body, `interval`, location, next_run, next_sched, instance_id, IFNULL(CAST(? as DATETIME), UTC_TIMESTAMP()) " +
s := "SELECT id, path, body, `interval`, location, next_run, next_sched, instance_id, IFNULL(CAST(? as DATETIME), UTC_TIMESTAMP()) " +
"FROM " + t.name + " " +
"WHERE path = ? AND body = ? FOR UPDATE"
row := tx.QueryRowContext(ctx, s, t.Clk.NowUTC(), path, body)
j, now, err = t.scanJob(row, true)
return
}

func (t *Table) scanJob(row *sql.Row, withCurrentTime bool) (j Job, now time.Time, err error) {
type Scanner interface {
Scan(dest ...interface{}) error
}

func (t *Table) scanJob(row Scanner, withCurrentTime bool) (j Job, now time.Time, err error) {
ji, now, err := t.scanJobWithID(row, withCurrentTime)
return ji.Job, now, err
}

func (t *Table) scanJobWithID(row Scanner, withCurrentTime bool) (j JobWithID, now time.Time, err error) {
var interval, locationName string
var instanceID sql.NullInt64
if withCurrentTime {
err = row.Scan(&j.Path, &j.Body, &interval, &locationName, &j.NextRun, &j.NextSched, &instanceID, &now)
err = row.Scan(&j.ID, &j.Path, &j.Body, &interval, &locationName, &j.NextRun, &j.NextSched, &instanceID, &now)
} else {
err = row.Scan(&j.Path, &j.Body, &interval, &locationName, &j.NextRun, &j.NextSched, &instanceID)
err = row.Scan(&j.ID, &j.Path, &j.Body, &interval, &locationName, &j.NextRun, &j.NextSched, &instanceID)
}
if err == sql.ErrNoRows {
err = ErrNotExist
Expand Down

0 comments on commit e59a3d0

Please sign in to comment.