Skip to content
This repository has been archived by the owner on May 2, 2018. It is now read-only.

Commit

Permalink
Fixed GTG and Health check.
Browse files Browse the repository at this point in the history
  • Loading branch information
carlosroman committed Nov 22, 2016
1 parent eb235bb commit 8ba1141
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 49 deletions.
4 changes: 2 additions & 2 deletions people/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *PeopleHandler) HealthCheck() v1a.Check {

func (h *PeopleHandler) G2GCheck() gtg.Status {
count, err := h.service.getCount()
if err == nil && count > 0 {
if h.service.isInitialised() && err == nil && count > 0 {
return gtg.Status{GoodToGo: true}
}
return gtg.Status{GoodToGo: false}
Expand All @@ -84,7 +84,7 @@ func (h *PeopleHandler) GetPersonByUUID(writer http.ResponseWriter, req *http.Re

func (h *PeopleHandler) Reload(writer http.ResponseWriter, req *http.Request) {
go func() {
if err := h.service.loadDB(); err != nil {
if err := h.service.reloadDB(); err != nil {
log.Errorf("ERROR opening db: %v", err.Error())
}
}()
Expand Down
20 changes: 16 additions & 4 deletions people/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,19 @@ func TestHandlers(t *testing.T) {
"application/json",
""},
{"GTG unavailable - get GTG but no people",
newRequest("GET", status.GTGPath),
&dummyService{
found: false,
initialised: true},
http.StatusServiceUnavailable,
"application/json",
""},
{"GTG unavailable - get GTG count returns error",
newRequest("GET", status.GTGPath),
&dummyService{
found: false,
initialised: true,
people: []person{}},
err: errors.New("Count error")},
http.StatusServiceUnavailable,
"application/json",
""},
Expand All @@ -141,8 +149,7 @@ func TestHandlers(t *testing.T) {
&dummyService{
found: true,
initialised: true,
count: 2,
people: []person{}},
count: 2},
http.StatusOK,
"application/json",
"OK"},
Expand Down Expand Up @@ -232,6 +239,7 @@ type dummyService struct {
found bool
people []person
initialised bool
dataLoaded bool
count int
err error
loadDBCalled bool
Expand All @@ -258,11 +266,15 @@ func (s *dummyService) isInitialised() bool {
return s.initialised
}

func (s *dummyService) isDataLoaded() bool {
return s.dataLoaded
}

func (s *dummyService) Shutdown() error {
return s.err
}

func (s *dummyService) loadDB() error {
func (s *dummyService) reloadDB() error {
defer s.wg.Done()
s.loadDBCalled = true
return s.err
Expand Down
114 changes: 86 additions & 28 deletions people/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,47 @@ type PeopleService interface {
getPersonByUUID(uuid string) (person, bool, error)
getCount() (int, error)
isInitialised() bool
loadDB() error
isDataLoaded() bool
reloadDB() error
Shutdown() error
}

type peopleServiceImpl struct {
sync.RWMutex
repository tmereader.Repository
baseURL string
personLinks []personLink
taxonomyName string
maxTmeRecords int
initialised bool
cacheFileName string
db *bolt.DB
repository tmereader.Repository
baseURL string
personLinks []personLink
taxonomyName string
maxTmeRecords int
initialised bool
initialisedChannel chan bool
dataLoaded bool
dataLoadedChannel chan bool
cacheFileName string
db *bolt.DB
}

func NewPeopleService(repo tmereader.Repository, baseURL string, taxonomyName string, maxTmeRecords int, cacheFileName string) PeopleService {
s := &peopleServiceImpl{repository: repo, baseURL: baseURL, taxonomyName: taxonomyName, maxTmeRecords: maxTmeRecords, initialised: false, cacheFileName: cacheFileName}
s := &peopleServiceImpl{repository: repo, baseURL: baseURL, taxonomyName: taxonomyName, maxTmeRecords: maxTmeRecords, initialised: true, cacheFileName: cacheFileName}
s.initialisedChannel = make(chan bool)
s.dataLoadedChannel = make(chan bool)

go func(service *peopleServiceImpl) {
for initialised := range service.initialisedChannel {
s.Lock()
s.initialised = initialised
s.Unlock()
}
}(s)

go func(service *peopleServiceImpl) {
for loaded := range service.dataLoadedChannel {
s.Lock()
s.dataLoaded = loaded
s.Unlock()
}
}(s)

go func(service *peopleServiceImpl) {
err := service.loadDB()
if err != nil {
Expand All @@ -56,15 +79,36 @@ func (s *peopleServiceImpl) isInitialised() bool {
return s.initialised
}

func (s *peopleServiceImpl) isDataLoaded() bool {
s.RLock()
defer s.RUnlock()
return s.dataLoaded
}

func (s *peopleServiceImpl) Shutdown() error {
log.Info("Shutingdown...")
log.Info("Shuting down...")
s.Lock()
s.initialised = false
s.dataLoaded = false
defer func() {
s.Unlock()
log.Info("Closing channels")
close(s.initialisedChannel)
close(s.dataLoadedChannel)
}()
if s.db == nil {
return errors.New("DB not open")
}
return s.db.Close()
}

func (s *peopleServiceImpl) getCount() (int, error) {
s.RLock()
defer s.RUnlock()
if !s.isDataLoaded() {
return 0, nil
}

var count int
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(cacheBucket))
Expand All @@ -78,13 +122,17 @@ func (s *peopleServiceImpl) getCount() (int, error) {
}

func (s *peopleServiceImpl) getPeople() ([]personLink, bool) {
s.RLock()
defer s.RUnlock()
if len(s.personLinks) > 0 {
return s.personLinks, true
}
return s.personLinks, false
}

func (s *peopleServiceImpl) getPersonByUUID(uuid string) (person, bool, error) {
s.RLock()
defer s.RUnlock()
var cachedValue []byte
err := s.db.View(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(cacheBucket))
Expand All @@ -100,46 +148,49 @@ func (s *peopleServiceImpl) getPersonByUUID(uuid string) (person, bool, error) {
return person{}, false, err
}
if len(cachedValue) == 0 {
log.Infof("INFO No cached value for [%v]", uuid)
log.Infof("INFO No cached value for [%v].", uuid)
return person{}, false, nil
}
var cachedPerson person
err = json.Unmarshal(cachedValue, &cachedPerson)
if err != nil {
log.Errorf("ERROR unmarshalling cached value for [%v]: %v", uuid, err.Error())
log.Errorf("ERROR unmarshalling cached value for [%v]: %v.", uuid, err.Error())
return person{}, true, err
}
return cachedPerson, true, nil

}

func (s *peopleServiceImpl) openDB() error {
s.Lock()
log.Infof("Opening database '%v'.", s.cacheFileName)
if s.db == nil {
var err error
if s.db, err = bolt.Open(s.cacheFileName, 0600, &bolt.Options{Timeout: 1 * time.Second}); err != nil {
log.Errorf("ERROR opening cache file for init: %v", err.Error())
log.Errorf("ERROR opening cache file for init: %v.", err.Error())
s.Unlock()
return err
}
}
return s.createCacheBucket()
}

func (s *peopleServiceImpl) reloadDB() error {
return s.loadDB()
}

func (s *peopleServiceImpl) loadDB() error {
var wg sync.WaitGroup
wg.Add(1)
log.Info("Loading DB...")
c := make(chan []person)
go s.processPeople(c, &wg)
s.Lock()
s.initialised = false
defer func(w *sync.WaitGroup) {
close(c)
w.Wait()
s.initialised = true
s.Unlock()
}(&wg)

if err := s.openDB(); err != nil {
s.initialisedChannel <- false
return err
}

Expand All @@ -150,11 +201,11 @@ func (s *peopleServiceImpl) loadDB() error {
return err
}
if len(terms) < 1 {
log.Info("Finished fetching people from TME. Waiting subroutines to terminate")
log.Info("Finished fetching people from TME. Waiting subroutines to terminate.")
break
}

log.Infof("Terms length is: %v", len(terms))
wg.Add(1)
s.processTerms(terms, c)
responseCount += s.maxTmeRecords
}
Expand All @@ -176,7 +227,7 @@ func (s *peopleServiceImpl) processTerms(terms []interface{}, c chan<- []person)

func (s *peopleServiceImpl) processPeople(c <-chan []person, wg *sync.WaitGroup) {
for people := range c {
log.Infof("Processing batch of %v people", len(people))
log.Infof("Processing batch of %v people.", len(people))
if err := s.db.Batch(func(tx *bolt.Tx) error {
bucket := tx.Bucket([]byte(cacheBucket))
if bucket == nil {
Expand All @@ -194,20 +245,27 @@ func (s *peopleServiceImpl) processPeople(c <-chan []person, wg *sync.WaitGroup)
}
return nil
}); err != nil {
log.Errorf("ERROR storing to cache: %+v", err)
log.Errorf("ERROR storing to cache: %+v.", err)
}
wg.Done()
}

log.Info("Finished processing all people")
wg.Done()
log.Info("Finished processing all people.")
if(s.isInitialised()) {
s.dataLoadedChannel <- true
}
}

func (s *peopleServiceImpl) createCacheBucket() error {
defer s.Unlock()
return s.db.Update(func(tx *bolt.Tx) error {
log.Infof("Creating bucket '%s'", cacheBucket)
if err := tx.DeleteBucket([]byte(cacheBucket)); err != nil {
log.Warnf("Cache bucket [%v] could not be deleted\n", cacheBucket)
if tx.Bucket([]byte(cacheBucket)) != nil {
log.Infof("Deleting bucket '%v'.", cacheBucket)
if err := tx.DeleteBucket([]byte(cacheBucket)); err != nil {
log.Warnf("Cache bucket [%v] could not be deleted.", cacheBucket)
}
}
log.Infof("Creating bucket '%s'.", cacheBucket)
_, err := tx.CreateBucket([]byte(cacheBucket))
return err
})
Expand Down
Loading

0 comments on commit 8ba1141

Please sign in to comment.