Skip to content

Commit

Permalink
Merge 8767c87 into a6eb7a2
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed Jan 18, 2022
2 parents a6eb7a2 + 8767c87 commit 5d13916
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 7 deletions.
6 changes: 5 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ COPY --from=builder /build/pg_timetable /
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

# Command to run the executable
ENTRYPOINT ["/pg_timetable"]
ENTRYPOINT ["/pg_timetable"]

ENV PGTT_RESTPORT 8088
# Exposing port 8088 (you should specify your value!)
EXPOSE 8088
52 changes: 52 additions & 0 deletions internal/api/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package api

import (
"fmt"
"net/http"
"time"

"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
)

// StatusIface is a common interface describing the cirrent status of a connection
type StatusIface interface {
IsReady() bool
}

type RestApiServer struct {
StatusReporter StatusIface
l log.LoggerIface
*http.Server
}

func Init(opts config.RestApiOpts, logger log.LoggerIface) *RestApiServer {
s := &RestApiServer{
nil,
logger,
&http.Server{
Addr: fmt.Sprintf(":%d", opts.Port),
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
MaxHeaderBytes: 1 << 20,
},
}
http.HandleFunc("/liveness", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK) // i'm serving hence I'm alive
})
http.HandleFunc("/readiness", s.readinessHandler)
if opts.Port != 0 {
logger.WithField("port", opts.Port).Info("Starting REST API server...")
go func() { logger.Error(s.ListenAndServe()) }()
}
return s
}

func (Server *RestApiServer) readinessHandler(w http.ResponseWriter, r *http.Request) {
Server.l.Debug("Received /readiness REST API request")
if Server.StatusReporter == nil || !Server.StatusReporter.IsReady() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
}
6 changes: 6 additions & 0 deletions internal/config/cmdparser.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ type ResourceOpts struct {
TaskTimeout int `long:"task-timeout" mapstructure:"task-timeout" description:"Abort any task within a chain that takes more than the specified number of milliseconds"`
}

// RestApiOpts fot internal web server impleenting REST API
type RestApiOpts struct {
Port int `long:"rest-port" mapstructure:"rest-port" description:"HTTP port for REST API" env:"PGTT_RESTPORT" default:"0"`
}

// CmdOptions holds command line options passed
type CmdOptions struct {
ClientName string `short:"c" long:"clientname" description:"Unique name for application instance" env:"PGTT_CLIENTNAME"`
Expand All @@ -51,6 +56,7 @@ type CmdOptions struct {
Logging LoggingOpts `group:"Logging" mapstructure:"Logging"`
Start StartOpts `group:"Start" mapstructure:"Start"`
Resource ResourceOpts `group:"Resource" mapstructure:"Resource"`
RestApi RestApiOpts `group:"HTTP" mapstructure:"HTTP"`
NoProgramTasks bool `long:"no-program-tasks" mapstructure:"no-program-tasks" description:"Disable executing of PROGRAM tasks" env:"PGTT_NOPROGRAMTASKS"`
NoHelpMessage bool `long:"no-help" mapstructure:"no-help" hidden:"system use"`
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pgengine/pgengine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func TestSamplesScripts(t *testing.T) {
assert.NoError(t, pge.ExecuteCustomScripts(ctx, "../../samples/"+f.Name()),
"Sample query failed: ", f.Name())
// either context should be cancelled or 'shutdown.sql' will terminate the session
assert.True(t, scheduler.New(pge, l).Run(ctx) > scheduler.ConnectionDropppedStatus)
assert.True(t, scheduler.New(pge, l).Run(ctx) > scheduler.RunningStatus)
_, err = pge.ConfigDb.Exec(context.Background(),
"TRUNCATE timetable.task, timetable.chain CASCADE")
assert.NoError(t, err)
Expand Down
19 changes: 15 additions & 4 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ const minChannelCapacity = 1024
type RunStatus int

const (
// ConnectionDropppedStatus specifies the connection has been dropped
ConnectionDropppedStatus RunStatus = iota
// RunningStatus specifies the connection has been dropped
RunningStatus RunStatus = iota
// ContextCancelledStatus specifies the context has been cancelled probably due to timeout
ContextCancelledStatus
// Shutdown specifies proper termination of the session
Expand All @@ -46,6 +46,7 @@ type Scheduler struct {
intervalChainsChan chan IntervalChain
intervalChainMutex sync.Mutex
shutdown chan struct{} // closed when shutdown is called
status RunStatus
}

func max(x, y int) int {
Expand All @@ -65,6 +66,7 @@ func New(pge *pgengine.PgEngine, logger log.LoggerIface) *Scheduler {
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
intervalChains: make(map[int]IntervalChain),
shutdown: make(chan struct{}),
status: RunningStatus,
}
}

Expand All @@ -78,6 +80,11 @@ func (sch *Scheduler) Config() config.CmdOptions {
return sch.pgengine.CmdOptions
}

// IsReady returns True if the scheduler is in the main loop processing chains
func (sch *Scheduler) IsReady() bool {
return sch.status == RunningStatus
}

// Run executes jobs. Returns RunStatus why it terminated.
// There are only two possibilities: dropped connection and cancelled context.
func (sch *Scheduler) Run(ctx context.Context) RunStatus {
Expand Down Expand Up @@ -122,10 +129,14 @@ func (sch *Scheduler) Run(ctx context.Context) RunStatus {
case <-time.After(refetchTimeout * time.Second):
// pass
case <-ctx.Done():
return ContextCancelledStatus
sch.status = ContextCancelledStatus
case <-sch.shutdown:
sch.status = ShutdownStatus
sch.terminateChains()
return ShutdownStatus
}

if sch.status != RunningStatus {
return sch.status
}
}
}
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/signal"
"syscall"

"github.com/cybertec-postgresql/pg_timetable/internal/api"
"github.com/cybertec-postgresql/pg_timetable/internal/config"
"github.com/cybertec-postgresql/pg_timetable/internal/log"
"github.com/cybertec-postgresql/pg_timetable/internal/pgengine"
Expand Down Expand Up @@ -62,6 +63,9 @@ func main() {
return
}
defer pge.Finalize()

apiserver := api.Init(cmdOpts.RestApi, logger)

if cmdOpts.Start.Upgrade {
if err := pge.MigrateDb(ctx); err != nil {
logger.WithError(err).Error("Upgrade failed")
Expand All @@ -84,7 +88,9 @@ func main() {
return
}
sch := scheduler.New(pge, logger)
for sch.Run(ctx) == scheduler.ConnectionDropppedStatus {
apiserver.StatusReporter = sch

for sch.Run(ctx) == scheduler.RunningStatus {
pge.ReconnectAndFixLeftovers(ctx)
}
}

0 comments on commit 5d13916

Please sign in to comment.