Skip to content

Commit

Permalink
Merge pull request #379 from cybertec-postgresql/378_rest_api
Browse files Browse the repository at this point in the history
[!] add REST API, resolves #378
  • Loading branch information
pashagolub committed Jan 19, 2022
2 parents a6eb7a2 + 048297b commit fe2f499
Show file tree
Hide file tree
Showing 12 changed files with 154 additions and 11 deletions.
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ COPY --from=builder /build/pg_timetable /
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

# Command to run the executable
ENTRYPOINT ["/pg_timetable"]
ENTRYPOINT ["/pg_timetable"]

# Expose REST API if needed
ENV PGTT_RESTPORT 8008
EXPOSE 8008
6 changes: 5 additions & 1 deletion config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ resource:
# chain-timeout: Abort any chain that takes more than the specified number of milliseconds
chain-timeout: 0
# task-timeout: Abort any task within a chain that takes more than the specified number of milliseconds
task-timeout: 0
task-timeout: 0

# - REST API Settings -
# rest-port: REST API port (default: 0)
rest-port: 8008
2 changes: 2 additions & 0 deletions docs/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ Command line options
--task-timeout= Abort any task within a chain that takes more than the specified number
of milliseconds
REST:
--rest-port: REST API port (default: 0) [%PGTT_RESTPORT%]
Contributing
Expand Down
19 changes: 19 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
REST API
================================================

**pg_timetable** has a rich REST API, which can be used by external tools in order to perform start/stop/reinitialize/restarts/reloads,
by any kind of tools to perform HTTP health checks, and of course, could also be used for monitoring.

Below you will find the list of **pg_timetable** REST API endpoints.

Health check endpoints
------------------------------------------------

Currently, there are two health check endpoints available:

``GET /liveness``
Always returns HTTP status code ``200`` what only indicates that **pg_timetable** is running.

``GET /readiness``
Returns HTTP status code ``200`` when the **pg_timetable** is running and the scheduler is in the main loop processing chains.
If the scheduler connects to the database, creates the database schema, or upgrades it, it will return HTTP status code ``503``.
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Welcome to pg_timetable's documentation!
components
basic_jobs
samples
api
database_schema

Indices and tables
Expand Down
52 changes: 52 additions & 0 deletions internal/api/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package api

import (
"fmt"
"net/http"
"time"

"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
)

// StatusReporter is a common interface describing the current status of a connection
type StatusReporter interface {
IsReady() bool
}

type RestApiServer struct {
Reporter StatusReporter
l log.LoggerIface
http.Server
}

func Init(opts config.RestApiOpts, logger log.LoggerIface) *RestApiServer {
s := &RestApiServer{
nil,
logger,
http.Server{
Addr: fmt.Sprintf(":%d", opts.Port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
},
}
http.HandleFunc("/liveness", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) // i'm serving hence I'm alive
})
http.HandleFunc("/readiness", s.readinessHandler)
if opts.Port != 0 {
logger.WithField("port", opts.Port).Info("Starting REST API server...")
go func() { logger.Error(s.ListenAndServe()) }()
}
return s
}

func (Server *RestApiServer) readinessHandler(w http.ResponseWriter, r *http.Request) {
Server.l.Debug("Received /readiness REST API request")
if Server.Reporter == nil || !Server.Reporter.IsReady() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
}
34 changes: 34 additions & 0 deletions internal/api/status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package api_test

import (
"net/http"
"testing"

"github.com/cybertec-postgresql/pg_timetable/internal/api"
"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/stretchr/testify/assert"
)

type reporter struct {
}

func (r *reporter) IsReady() bool {
return true
}

func TestStatus(t *testing.T) {
restsrv := api.Init(config.RestApiOpts{Port: 8080}, log.Init(config.LoggingOpts{LogLevel: "error"}))
r, err := http.Get("http://localhost:8080/liveness")
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, r.StatusCode)

r, err = http.Get("http://localhost:8080/readiness")
assert.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, r.StatusCode)

restsrv.Reporter = &reporter{}
r, err = http.Get("http://localhost:8080/readiness")
assert.NoError(t, err)
assert.Equal(t, http.StatusOK, r.StatusCode)
}
6 changes: 6 additions & 0 deletions internal/config/cmdparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type ResourceOpts struct {
TaskTimeout int `long:"task-timeout" mapstructure:"task-timeout" description:"Abort any task within a chain that takes more than the specified number of milliseconds"`
}

// RestApiOpts fot internal web server impleenting REST API
type RestApiOpts struct {
Port int `long:"rest-port" mapstructure:"rest-port" description:"REST API port" env:"PGTT_RESTPORT" default:"0"`
}

// CmdOptions holds command line options passed
type CmdOptions struct {
ClientName string `short:"c" long:"clientname" description:"Unique name for application instance" env:"PGTT_CLIENTNAME"`
Expand All @@ -51,6 +56,7 @@ type CmdOptions struct {
Logging LoggingOpts `group:"Logging" mapstructure:"Logging"`
Start StartOpts `group:"Start" mapstructure:"Start"`
Resource ResourceOpts `group:"Resource" mapstructure:"Resource"`
RestApi RestApiOpts `group:"REST" mapstructure:"REST"`
NoProgramTasks bool `long:"no-program-tasks" mapstructure:"no-program-tasks" description:"Disable executing of PROGRAM tasks" env:"PGTT_NOPROGRAMTASKS"`
NoHelpMessage bool `long:"no-help" mapstructure:"no-help" hidden:"system use"`
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestSamplesScripts(t *testing.T) {
assert.NoError(t, pge.ExecuteCustomScripts(ctx, "../../samples/"+f.Name()),
"Sample query failed: ", f.Name())
// either context should be cancelled or 'shutdown.sql' will terminate the session
assert.True(t, scheduler.New(pge, l).Run(ctx) > scheduler.ConnectionDropppedStatus)
assert.True(t, scheduler.New(pge, l).Run(ctx) > scheduler.RunningStatus)
_, err = pge.ConfigDb.Exec(context.Background(),
"TRUNCATE timetable.task, timetable.chain CASCADE")
assert.NoError(t, err)
Expand Down
26 changes: 19 additions & 7 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const minChannelCapacity = 1024
type RunStatus int

const (
// ConnectionDropppedStatus specifies the connection has been dropped
ConnectionDropppedStatus RunStatus = iota
// RunningStatus specifies the scheduler is in the main loop processing chains
RunningStatus RunStatus = iota
// ContextCancelledStatus specifies the context has been cancelled probably due to timeout
ContextCancelledStatus
// Shutdown specifies proper termination of the session
Expand All @@ -46,9 +46,11 @@ type Scheduler struct {
intervalChainsChan chan IntervalChain
intervalChainMutex sync.Mutex
shutdown chan struct{} // closed when shutdown is called
status RunStatus
}

func max(x, y int) int {
// Max returns the maximum number of two arguments
func Max(x, y int) int {
if x < y {
return y
}
Expand All @@ -60,11 +62,12 @@ func New(pge *pgengine.PgEngine, logger log.LoggerIface) *Scheduler {
return &Scheduler{
l: logger,
pgengine: pge,
chainsChan: make(chan Chain, max(minChannelCapacity, pge.Resource.CronWorkers*2)),
intervalChainsChan: make(chan IntervalChain, max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
chainsChan: make(chan Chain, Max(minChannelCapacity, pge.Resource.CronWorkers*2)),
intervalChainsChan: make(chan IntervalChain, Max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
intervalChains: make(map[int]IntervalChain),
shutdown: make(chan struct{}),
status: RunningStatus,
}
}

Expand All @@ -78,6 +81,11 @@ func (sch *Scheduler) Config() config.CmdOptions {
return sch.pgengine.CmdOptions
}

// IsReady returns True if the scheduler is in the main loop processing chains
func (sch *Scheduler) IsReady() bool {
return sch.status == RunningStatus
}

// Run executes jobs. Returns RunStatus why it terminated.
// There are only two possibilities: dropped connection and cancelled context.
func (sch *Scheduler) Run(ctx context.Context) RunStatus {
Expand Down Expand Up @@ -122,10 +130,14 @@ func (sch *Scheduler) Run(ctx context.Context) RunStatus {
case <-time.After(refetchTimeout * time.Second):
// pass
case <-ctx.Done():
return ContextCancelledStatus
sch.status = ContextCancelledStatus
case <-sch.shutdown:
sch.status = ShutdownStatus
sch.terminateChains()
return ShutdownStatus
}

if sch.status != RunningStatus {
return sch.status
}
}
}
3 changes: 3 additions & 0 deletions internal/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,8 @@ func TestRun(t *testing.T) {
time.Sleep(10 * time.Second)
sch.Shutdown()
}()
assert.Equal(t, 1, Max(0, 1))
assert.Equal(t, 1, Max(1, 0))
assert.True(t, sch.IsReady())
assert.Equal(t, ShutdownStatus, sch.Run(context.Background()))
}
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"

"github.com/cybertec-postgresql/pg_timetable/internal/api"
"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
Expand Down Expand Up @@ -62,6 +63,9 @@ func main() {
return
}
defer pge.Finalize()

apiserver := api.Init(cmdOpts.RestApi, logger)

if cmdOpts.Start.Upgrade {
if err := pge.MigrateDb(ctx); err != nil {
logger.WithError(err).Error("Upgrade failed")
Expand All @@ -84,7 +88,9 @@ func main() {
return
}
sch := scheduler.New(pge, logger)
for sch.Run(ctx) == scheduler.ConnectionDropppedStatus {
apiserver.Reporter = sch

for sch.Run(ctx) == scheduler.RunningStatus {
pge.ReconnectAndFixLeftovers(ctx)
}
}

0 comments on commit fe2f499

Please sign in to comment.