Skip to content

Commit

Permalink
[!] add REST API, resolves #378
Browse files Browse the repository at this point in the history
[+] add `Scheduler.status` property and `RunningStatus` constant
[+] add `Scheduler.IsReady()` method to be used in REST API
[+] add `RestApi` section and `--rest-port` command-line option
  • Loading branch information
pashagolub committed Jan 18, 2022
1 parent 4cdee9c commit 83ac6e9
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 65 deletions.
52 changes: 52 additions & 0 deletions internal/api/status.go
@@ -0,0 +1,52 @@
package api

import (
"fmt"
"net/http"
"time"

"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
)

// StatusIface is a common interface describing the cirrent status of a connection
type StatusIface interface {
IsReady() bool
}

type RestApiServer struct {
StatusReporter StatusIface
l log.LoggerIface
*http.Server
}

func Init(opts config.RestApiOpts, logger log.LoggerIface) *RestApiServer {
s := &RestApiServer{
nil,
logger,
&http.Server{
Addr: fmt.Sprintf(":%d", opts.Port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
},
}
http.HandleFunc("/liveness", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) // i'm serving hence I'm alive
})
http.HandleFunc("/readiness", s.readinessHandler)
if opts.Port != 0 {
logger.WithField("port", opts.Port).Info("Starting REST API server...")
go func() { logger.Error(s.ListenAndServe()) }()
}
return s
}

func (Server *RestApiServer) readinessHandler(w http.ResponseWriter, r *http.Request) {
Server.l.Debug("Received /readiness REST API request")
if Server.StatusReporter == nil || !Server.StatusReporter.IsReady() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
}
10 changes: 4 additions & 6 deletions internal/config/cmdparser.go
Expand Up @@ -43,11 +43,9 @@ type ResourceOpts struct {
TaskTimeout int `long:"task-timeout" mapstructure:"task-timeout" description:"Abort any task within a chain that takes more than the specified number of milliseconds"`
}

// HTTP Options fot internal web server
type HTTPOpts struct {
//Port int `long:"http-port" description:"HTTP port for service health checking" env:"PGTT_HTTPPORT" default: "0"`
Port int `long:"http-port" mapstructure:"http-port" description:"HTTP port for service health checking" env:"PGTT_HTTPPORT" default:"0"`
//Port int `long:"http-port" description:"HTTP port for service health checking" env:"PGTT_HTTPPORT" default:"8085"`
// RestApiOpts fot internal web server impleenting REST API
type RestApiOpts struct {
Port int `long:"rest-port" mapstructure:"rest-port" description:"HTTP port for REST API" env:"PGTT_RESTPORT" default:"0"`
}

// CmdOptions holds command line options passed
Expand All @@ -58,7 +56,7 @@ type CmdOptions struct {
Logging LoggingOpts `group:"Logging" mapstructure:"Logging"`
Start StartOpts `group:"Start" mapstructure:"Start"`
Resource ResourceOpts `group:"Resource" mapstructure:"Resource"`
HTTP HTTPOpts `group:"HTTP" mapstructure:"HTTP"`
RestApi RestApiOpts `group:"HTTP" mapstructure:"HTTP"`
NoProgramTasks bool `long:"no-program-tasks" mapstructure:"no-program-tasks" description:"Disable executing of PROGRAM tasks" env:"PGTT_NOPROGRAMTASKS"`
NoHelpMessage bool `long:"no-help" mapstructure:"no-help" hidden:"system use"`
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/pgengine_test.go
Expand Up @@ -237,7 +237,7 @@ func TestSamplesScripts(t *testing.T) {
assert.NoError(t, pge.ExecuteCustomScripts(ctx, "../../samples/"+f.Name()),
"Sample query failed: ", f.Name())
// either context should be cancelled or 'shutdown.sql' will terminate the session
assert.True(t, scheduler.New(pge, l).Run(ctx) > scheduler.ConnectionDropppedStatus)
assert.True(t, scheduler.New(pge, l).Run(ctx) > scheduler.RunningStatus)
_, err = pge.ConfigDb.Exec(context.Background(),
"TRUNCATE timetable.task, timetable.chain CASCADE")
assert.NoError(t, err)
Expand Down
46 changes: 0 additions & 46 deletions internal/pgtt_http/http_status.go

This file was deleted.

19 changes: 15 additions & 4 deletions internal/scheduler/scheduler.go
Expand Up @@ -20,8 +20,8 @@ const minChannelCapacity = 1024
type RunStatus int

const (
// ConnectionDropppedStatus specifies the connection has been dropped
ConnectionDropppedStatus RunStatus = iota
// RunningStatus specifies the connection has been dropped
RunningStatus RunStatus = iota
// ContextCancelledStatus specifies the context has been cancelled probably due to timeout
ContextCancelledStatus
// Shutdown specifies proper termination of the session
Expand All @@ -46,6 +46,7 @@ type Scheduler struct {
intervalChainsChan chan IntervalChain
intervalChainMutex sync.Mutex
shutdown chan struct{} // closed when shutdown is called
status RunStatus
}

func max(x, y int) int {
Expand All @@ -65,6 +66,7 @@ func New(pge *pgengine.PgEngine, logger log.LoggerIface) *Scheduler {
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
intervalChains: make(map[int]IntervalChain),
shutdown: make(chan struct{}),
status: RunningStatus,
}
}

Expand All @@ -78,6 +80,11 @@ func (sch *Scheduler) Config() config.CmdOptions {
return sch.pgengine.CmdOptions
}

// IsReady returns True if the scheduler is in the main loop processing chains
func (sch *Scheduler) IsReady() bool {
return sch.status == RunningStatus
}

// Run executes jobs. Returns RunStatus why it terminated.
// There are only two possibilities: dropped connection and cancelled context.
func (sch *Scheduler) Run(ctx context.Context) RunStatus {
Expand Down Expand Up @@ -122,10 +129,14 @@ func (sch *Scheduler) Run(ctx context.Context) RunStatus {
case <-time.After(refetchTimeout * time.Second):
// pass
case <-ctx.Done():
return ContextCancelledStatus
sch.status = ContextCancelledStatus
case <-sch.shutdown:
sch.status = ShutdownStatus
sch.terminateChains()
return ShutdownStatus
}

if sch.status != RunningStatus {
return sch.status
}
}
}
14 changes: 6 additions & 8 deletions main.go
Expand Up @@ -7,10 +7,10 @@ import (
"os/signal"
"syscall"

"github.com/cybertec-postgresql/pg_timetable/internal/api"
"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
"github.com/cybertec-postgresql/pg_timetable/internal/pgtt_http"
"github.com/cybertec-postgresql/pg_timetable/internal/scheduler"
)

Expand Down Expand Up @@ -58,15 +58,14 @@ func main() {
}
logger := log.Init(cmdOpts.Logging)

if cmdOpts.HTTP.Port != 0 {
go pgtt_http.StartHTTP(logger, cmdOpts.HTTP.Port)
}

if pge, err = pgengine.New(ctx, *cmdOpts, logger); err != nil {
exitCode = ExitCodeDBEngineError
return
}
defer pge.Finalize()

apiserver := api.Init(cmdOpts.RestApi, logger)

if cmdOpts.Start.Upgrade {
if err := pge.MigrateDb(ctx); err != nil {
logger.WithError(err).Error("Upgrade failed")
Expand All @@ -89,10 +88,9 @@ func main() {
return
}
sch := scheduler.New(pge, logger)
apiserver.StatusReporter = sch

pgtt_http.SetHttpStatusRunning()

for sch.Run(ctx) == scheduler.ConnectionDropppedStatus {
for sch.Run(ctx) == scheduler.RunningStatus {
pge.ReconnectAndFixLeftovers(ctx)
}
}

0 comments on commit 83ac6e9

Please sign in to comment.