Skip to content

Commit

Permalink
Reflect server's running state on load (#26)
Browse files Browse the repository at this point in the history
* Add ui call to backend for running state

* Refactor StatusSwitch to use hooks

* Store running state (#27)

* Add running state into postgres for persistence

* Rename paused to isRunning and refactor store

* Use atomicBool for concurrency safety

* Use async/await pattern
  • Loading branch information
slai11 committed Jun 20, 2019
1 parent 5289792 commit ca55af4
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 86 deletions.
34 changes: 21 additions & 13 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ import (
"github.com/datagovsg/nomad-parametric-autoscaler/policy"
"github.com/datagovsg/nomad-parametric-autoscaler/resources"
"github.com/gin-gonic/gin"
"go.uber.org/atomic"
)

type App struct {
wp *WrappedPolicy
vc *resources.VaultClient
router *gin.Engine
paused *bool
wp *WrappedPolicy
vc *resources.VaultClient
router *gin.Engine
isRunning *atomic.Bool
}

// WrappedPolicy has a lock to prevent race
Expand Down Expand Up @@ -57,17 +58,24 @@ func NewApp() (*App, error) {

wrappedPolicy := newWrappedPolicy(existingPolicy)

paused := false // always starts off running
var isRunning *atomic.Bool
if b, err := store.GetLatestRunningState(); err != nil {
isRunning = atomic.NewBool(true)
logging.Error(err.Error())
} else {
isRunning = atomic.NewBool(b)
}

return &App{
vc: vaultClient,
wp: wrappedPolicy,
paused: &paused,
vc: vaultClient,
wp: wrappedPolicy,
isRunning: isRunning,
router: NewRouter(
&endpoints{
wp: wrappedPolicy,
vc: vaultClient,
store: store,
paused: &paused,
wp: wrappedPolicy,
vc: vaultClient,
store: store,
isRunning: isRunning,
}),
}, nil
}
Expand Down Expand Up @@ -99,7 +107,7 @@ func (app *App) Run() {
case <-ticker.C:
app.wp.lock.Lock()

if *app.paused != true {
if app.isRunning.Load() {
app.wp.policy.Scale(app.vc)
}

Expand Down
42 changes: 30 additions & 12 deletions app/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (
"github.com/datagovsg/nomad-parametric-autoscaler/resources"
"github.com/datagovsg/nomad-parametric-autoscaler/types"
"github.com/gin-gonic/gin"
"go.uber.org/atomic"
)

type endpoints struct {
wp *WrappedPolicy
vc *resources.VaultClient
store *Store
paused *bool
wp *WrappedPolicy
vc *resources.VaultClient
store *Store
isRunning *atomic.Bool
}

// GetPolicy returns JSON version of current policy
Expand Down Expand Up @@ -74,16 +75,33 @@ func (ep *endpoints) UpdatePolicy(c *gin.Context) {
// PausePolicy is a utility endpoint that pauses the app and skips
// the checking-scaling step.
func (ep *endpoints) PausePolicy(c *gin.Context) {
*ep.paused = true
c.JSON(200, gin.H{
"message": "Nomad AutoScaler paused",
})
if err := ep.store.SaveRunningState(false); err == nil {
ep.isRunning.Store(false)
c.JSON(200, gin.H{
"message": "Nomad AutoScaler resumed",
})
} else {
c.JSON(400, gin.H{
"message": err.Error(),
})
}
}

// ResumePolicy is a utility endpoint that resumes the app's checking-scaling cycle
func (ep *endpoints) ResumePolicy(c *gin.Context) {
*ep.paused = false
c.JSON(200, gin.H{
"message": "Nomad AutoScaler resumed",
})
if err := ep.store.SaveRunningState(true); err == nil {
ep.isRunning.Store(true)
c.JSON(200, gin.H{
"message": "Nomad AutoScaler resumed",
})
} else {
c.JSON(400, gin.H{
"message": err.Error(),
})
}
}

// GetState returns running state of server
func (ep *endpoints) GetState(c *gin.Context) {
c.JSON(200, ep.isRunning.Load())
}
12 changes: 7 additions & 5 deletions app/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ func NewRouter(ep *endpoints) *gin.Engine {
router.Use(corsMiddleware(&conf))

// Core endpoints
router.GET("/state", ep.GetPolicy)
router.GET("/status", ep.GetResourceStatus)
router.GET("/policy", ep.GetPolicy)
router.POST("/policy", ep.UpdatePolicy)

router.GET("/resource", ep.GetResourceStatus)
router.GET("/predefined", ep.GetPredefinedFeatures)
router.POST("/update", ep.UpdatePolicy)

// Helper endpoints
router.PUT("/pause", ep.PausePolicy)
router.PUT("/resume", ep.ResumePolicy)
router.GET("/state", ep.GetState)
router.PUT("/state/pause", ep.PausePolicy)
router.PUT("/state/resume", ep.ResumePolicy)

// Healthcheck
router.GET("/ping", func(c *gin.Context) {
Expand Down
54 changes: 43 additions & 11 deletions app/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ CREATE TABLE autoscaler (
state text
);
`

// createTablesSQL contains statement for table if doesnt exist
const createRunningStateTablesSQL = `
CREATE TABLE autoscaler_running_state (
timestamp TIMESTAMP,
state BOOLEAN
);
`

const (
dbhost = "POSTGRES_HOST"
dbport = "POSTGRES_PORT"
Expand Down Expand Up @@ -96,21 +105,16 @@ func (st *Store) Init() error {
}
}

// check if table exist else create it
if err := st.createTables(); err != nil {
logging.Warning(err.Error())
}
st.createTables()

return nil
}

// SaveState stores the state in compacted string form to the psql db
func (st Store) SaveState(state string) error {
submitTime := time.Now()
if _, err := st.db.Exec("INSERT INTO autoscaler(timestamp, state) VALUES ($1, $2)", submitTime, state); err != nil {
return err
}
return nil
_, err := st.db.Exec("INSERT INTO autoscaler(timestamp, state) VALUES ($1, $2)", submitTime, state)
return err
}

// GetLatestState reads the state column of the row with latest timestamp
Expand All @@ -125,6 +129,28 @@ func (st Store) GetLatestState() (string, error) {
return state, nil
}

// SaveState stores the state in compacted string form to the psql db
func (st Store) SaveRunningState(state bool) error {
submitTime := time.Now()
_, err := st.db.Exec("INSERT INTO autoscaler_running_state(timestamp, state) VALUES ($1, $2)", submitTime, state)
return err
}

// GetLatestState reads the state column of the row with latest timestamp
func (st Store) GetLatestRunningState() (bool, error) {
readStatement := "SELECT state FROM autoscaler_running_state WHERE timestamp = (SELECT MAX(timestamp) FROM autoscaler_running_state);"
state := []bool{}
if err := st.db.Select(&state, readStatement); err != nil {
return false, err
}

if len(state) < 1 {
return false, fmt.Errorf("state table is empty, using default state")
}

return state[0], nil
}

func (st Store) read(statement string) (string, error) {
state := []string{}
err := st.db.Select(&state, statement)
Expand All @@ -139,8 +165,14 @@ func (st Store) read(statement string) (string, error) {
return state[0], nil
}

func (st *Store) createTables() error {
func (st *Store) createTables() {
logging.Info("create table if does not exist")
_, err := st.db.Exec(createTablesSQL)
return err

if _, err := st.db.Exec(createRunningStateTablesSQL); err != nil {
logging.Warning(err.Error())
}

if _, err := st.db.Exec(createTablesSQL); err != nil {
logging.Warning(err.Error())
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ require (
github.com/ugorji/go/codec v0.0.0-20190204201341-e444a5086c43 // indirect
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 // indirect
go.opencensus.io v0.19.1 // indirect
go.uber.org/atomic v1.4.0
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect
golang.org/x/tools v0.0.0-20190619215442-4adf7a708c2d // indirect
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,8 @@ go.opencensus.io v0.19.1 h1:gPYKQ/GAQYR2ksU+qXNmq3CrOZWT1kkryvW6O0v1acY=
go.opencensus.io v0.19.1/go.mod h1:gug0GbSHa8Pafr0d2urOSgoXHZ6x/RUlaiT0d9pqb4A=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
Expand Down
15 changes: 8 additions & 7 deletions ui/src/App.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,21 @@ class App extends Component {
constructor(props) {
super(props);
this.sendUpdate = this.sendUpdate.bind(this);
this.refreshState = this.refreshState.bind(this);
this.refreshPolicy = this.refreshPolicy.bind(this);
}

componentDidMount() {
this.refreshState();
this.refreshPolicy();
}

async refreshState() {
async refreshPolicy() {
const predefinedUrl = new URL(
"/predefined",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);
const stateUrl = new URL(
"/state",

const policyUrl = new URL(
"/policy",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);

Expand All @@ -41,7 +42,7 @@ class App extends Component {
}

this.props.updatePossibleDefaultsList(firstResponse.data);
let secondResponse = await axios.get(stateUrl);
let secondResponse = await axios.get(policyUrl);
if (secondResponse.err) {
alert(secondResponse.err);
} else {
Expand All @@ -53,7 +54,7 @@ class App extends Component {
sendUpdate() {
const out = uiToServerConversion(this.props.state);
const reqUrl = new URL(
"/update",
"/policy",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);
out && axios.post(reqUrl, out);
Expand Down
79 changes: 41 additions & 38 deletions ui/src/components/StatusSwitch.jsx
Original file line number Diff line number Diff line change
@@ -1,51 +1,54 @@
import React from "react";
import React, { useState, useEffect } from "react";
import FormControlLabel from "@material-ui/core/FormControlLabel";
import Switch from "@material-ui/core/Switch";
import axios from "axios";

class StatusSwitch extends React.Component {
constructor(props) {
super(props);
const StatusSwitch = () => {
const [isRunning, setIsRunning] = useState(true);

this.state = {
checked: true,
status: "Running"
};
useEffect(() => {
const stateUrl = new URL(
"/state",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);

this.handleChange = this.handleChange.bind(this);
}
axios.get(stateUrl)
.then(rsp => setIsRunning(rsp.data))
.catch(function (error) {
console.log(error);
})
}, []);

handleChange(event) {
if (event.target.checked) {
const resumeUrl = new URL(
"/resume",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);
axios.put(resumeUrl);
} else {
const pauseUrl = new URL(
"/pause",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);
axios.put(pauseUrl);
const handleChange = async (event) => {
try {
if (event.target.checked) {
const resumeUrl = new URL(
"/state/resume",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);
await axios.put(resumeUrl)
setIsRunning(true)
} else {
const pauseUrl = new URL(
"/state/pause",
window.config.env.REACT_APP_NOPAS_ENDPOINT
);
await axios.put(pauseUrl)
setIsRunning(false)
}
} catch(err) {
console.log(err)
}

this.setState({
checked: event.target.checked,
status: event.target.checked ? "Running" : "Paused"
});
}

render() {
return (
<FormControlLabel
control={
<Switch checked={this.state.checked} onChange={this.handleChange} />
}
label={this.state.status}
/>
);
}
return (
<FormControlLabel
control={
<Switch checked={isRunning} onChange={handleChange} />
}
label={isRunning ? "Running" : "Paused"}
/>
);
}

export default StatusSwitch;

0 comments on commit ca55af4

Please sign in to comment.