Skip to content

Raise Readiness Only Once All Clients Have Connected To Hazelcast #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
62 changes: 42 additions & 20 deletions api/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ package api

import (
"encoding/json"
"hazeltest/maps"
"fmt"
log "github.com/sirupsen/logrus"
"hazeltest/client"
"hazeltest/logging"
"net/http"
"sync"
)
Expand All @@ -13,24 +16,26 @@ type liveness struct {
Up bool
}
type readiness struct {
Up bool
}
type status struct {
TestLoops []maps.TestLoopStatus
Up bool
atLeastOneClientRegistered bool
numClientsNotReady int
}

var (
l *liveness
r *readiness
s status
mutex sync.Mutex
l *liveness
r *readiness
s status
lp *logging.LogProvider
apiMutex sync.Mutex
)

func init() {

l = &liveness{true}
r = &readiness{false}
s = status{[]maps.TestLoopStatus{}}
r = &readiness{false, false, 0}
s = status{[]TestLoopStatus{}}

lp = &logging.LogProvider{ClientID: client.ClientID()}

}

Expand All @@ -48,13 +53,32 @@ func Serve() {

}

func Ready() {
func RaiseNotReady() {

apiMutex.Lock()
{
r.numClientsNotReady++
if !r.atLeastOneClientRegistered {
r.atLeastOneClientRegistered = true
}
lp.LogApiEvent(fmt.Sprintf("client has raised 'not ready', number of non-ready clients now %d", r.numClientsNotReady), log.InfoLevel)
}
apiMutex.Unlock()

}

func RaiseReady() {

mutex.Lock()
apiMutex.Lock()
{
r.Up = true
r.numClientsNotReady--
lp.LogApiEvent(fmt.Sprintf("client has raised readiness, number of non-ready clients now %d", r.numClientsNotReady), log.InfoLevel)
if r.numClientsNotReady == 0 && r.atLeastOneClientRegistered && !r.Up {
r.Up = true
lp.LogApiEvent("all clients ready", log.InfoLevel)
}
}
mutex.Unlock()
apiMutex.Unlock()

}

Expand All @@ -73,11 +97,9 @@ func statusHandler(w http.ResponseWriter, req *http.Request) {

func updateStatus(s *status) {

loops := maps.Loops

if len(loops) > 0 {
values := make([]maps.TestLoopStatus, 0, len(loops))
for _, v := range loops {
if len(Loops) > 0 {
values := make([]TestLoopStatus, 0, len(Loops))
for _, v := range Loops {
values = append(values, *v)
}
s.TestLoops = values
Expand Down
77 changes: 77 additions & 0 deletions api/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package api

import (
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
)

func TestReadinessCheck(t *testing.T) {

RaiseNotReady()

request := httptest.NewRequest(http.MethodGet, "localhost:8080/readiness", nil)
recorder := httptest.NewRecorder()

readinessHandler(recorder, request)

response := recorder.Result()
defer response.Body.Close()

data, err := ioutil.ReadAll(response.Body)

if err != nil {
t.Errorf("unexpected error occurred: %v", err)
}

if len(data) > 0 {
t.Errorf("expected nil payload to be returned, got: %s", data)
}

statusCode := response.StatusCode
expectedStatusCode := 503
if statusCode != expectedStatusCode {
t.Errorf("expected status code %d, got %d", expectedStatusCode, statusCode)
}

RaiseReady()

recorder = httptest.NewRecorder()

readinessHandler(recorder, request)

response = recorder.Result()
defer response.Body.Close()

statusCode = response.StatusCode
expectedStatusCode = 200
if statusCode != expectedStatusCode {
t.Errorf("expected status code %d, got %d", expectedStatusCode, statusCode)
}

data, err = ioutil.ReadAll(response.Body)

if err != nil {
t.Errorf("unexpected error occurred: %v", err)
}

var decodedData map[string]interface{}
err = json.Unmarshal(data, &decodedData)

if err != nil {
t.Errorf("got malformed json response: %s", data)
}

expectedKey := "Up"
if _, ok := decodedData[expectedKey]; !ok {
t.Errorf("expected key '%s' not present in returned json response", expectedKey)
}

ready := decodedData[expectedKey].(bool)
if !ready {
t.Error("api did not signal readiness")
}

}
45 changes: 45 additions & 0 deletions api/status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package api

import (
"github.com/google/uuid"
"sync"
)

type TestLoopStatus struct {
Source string
NumMaps int
NumRuns int
TotalRuns int
TotalRunsFinished int
}

type status struct {
TestLoops []TestLoopStatus
}

var (
Loops map[uuid.UUID]*TestLoopStatus
loopsMutex sync.Mutex
)

func init() {

Loops = make(map[uuid.UUID]*TestLoopStatus)

}

func InsertInitialTestLoopStatus(testLoopID uuid.UUID, status *TestLoopStatus) {

Loops[testLoopID] = status

}

func IncreaseTotalNumRunsCompleted(testLoopID uuid.UUID, increase int) {

loopsMutex.Lock()
{
Loops[testLoopID].TotalRunsFinished += increase
}
loopsMutex.Unlock()

}
23 changes: 9 additions & 14 deletions client/config/valueextractor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import (
)

var mapTestsPokedexWithNumMaps = map[string]interface{}{
"mapTests": map[string]interface{} {
"mapTests": map[string]interface{}{
"pokedex": map[string]interface{}{
"numMaps": 5,
},
},
}

var mapTestsPokedexWithEnabled = map[string]interface{}{
"mapTests": map[string]interface{} {
"mapTests": map[string]interface{}{
"pokedex": map[string]interface{}{
"enabled": true,
},
Expand All @@ -30,7 +30,7 @@ func TestExtractNestedNotMap(t *testing.T) {
_, err := config.ExtractConfigValue(sourceMap, "mapTests.pokedex")

if err == nil {
t.Error("Expected non-nil error value, received nil instead")
t.Error("expected non-nil error value, received nil instead")
}

}
Expand All @@ -46,7 +46,7 @@ func TestExctractKeyNotPresent(t *testing.T) {
}

if actual != nil {
t.Error("Expected nil payload value, got non-nil value instead")
t.Error("expected nil payload value, got non-nil value instead")
}

}
Expand All @@ -61,32 +61,27 @@ func TestExtractNestedInt(t *testing.T) {
if err != nil {
t.Errorf("Got non-nil error value: %s", err)
}

actualInt = actualInt.(int)

if expectedInt != actualInt {
t.Errorf("Expected: %d; got: %d", expectedInt, actualInt)
}



}

func TestExtractNestedBool(t *testing.T) {

sourceMap := mapTestsPokedexWithEnabled

expected := true
actual, err := config.ExtractConfigValue(sourceMap, "mapTests.pokedex.enabled")
result, err := config.ExtractConfigValue(sourceMap, "mapTests.pokedex.enabled")

if err != nil {
t.Errorf("Got non-nil error value: %s", err)
}

actual = actual.(bool)

if expected != actual {
t.Errorf("Expected: %t; got: %t", expected, actual)
if !(result.(bool)) {
t.Error("expected result to be 'true', but was 'false'")
}

}
}
3 changes: 0 additions & 3 deletions hazeltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ func main() {
logConfigurationError("HZ_MEMBERS", "environment variables", "HZ_MEMBERS environment variable must be provided")
}

// TODO Should only be set once all runners have successfully connected to Hazelcast
api.Ready()

hzMemberList := strings.Split(hzMembers, ",")

var wg sync.WaitGroup
Expand Down
11 changes: 11 additions & 0 deletions logging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
)

const ApiInfo = "api info"
const InternalStateInfo = "internal state info"
const TimingInfo = "timing info"
const IoError = "io error"
Expand Down Expand Up @@ -66,6 +67,16 @@ func (lp *LogProvider) LogIoEvent(msg string, level log.Level) {

}

func (lp *LogProvider) LogApiEvent(msg string, level log.Level) {

fields := log.Fields{
"kind": ApiInfo,
}

lp.doLog(msg, fields, level)

}

func (lp *LogProvider) LogTimingEvent(operation string, mapName string, tookMs int, level log.Level) {

fields := log.Fields{
Expand Down
7 changes: 6 additions & 1 deletion maps/loadrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/gob"
"errors"
"fmt"
"hazeltest/api"
"hazeltest/client"
"hazeltest/client/config"
"math/rand"
Expand Down Expand Up @@ -54,6 +55,8 @@ func (r loadRunner) runMapTests(hzCluster string, hzMembers []string) {
return
}

api.RaiseNotReady()

ctx := context.TODO()

clientID := client.ClientID()
Expand All @@ -64,14 +67,16 @@ func (r loadRunner) runMapTests(hzCluster string, hzMembers []string) {
}
defer hzClient.Shutdown(ctx)

api.RaiseReady()

lp.LogInternalStateEvent("initialized hazelcast client", log.InfoLevel)
lp.LogInternalStateEvent("starting load test loop", log.InfoLevel)

elements := populateLoadElements()

testLoop := testLoop[loadElement]{
id: uuid.New(),
source: "load",
source: "loadrunner",
hzClient: hzClient,
config: mapRunnerConfig,
elements: elements,
Expand Down
Loading