Skip to content

Commit

Permalink
Merge pull request #723 from mschoch/add-async-error
Browse files Browse the repository at this point in the history
add initial support for async error callback
  • Loading branch information
mschoch committed Jan 5, 2018
2 parents 0388805 + e756c7a commit d310649
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 6 deletions.
4 changes: 4 additions & 0 deletions index/scorch/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ package scorch

import "time"

// RegistryAsyncErrorCallbacks should be treated as read-only after
// process init()'ialization.
var RegistryAsyncErrorCallbacks = map[string]func(error){}

// RegistryEventCallbacks should be treated as read-only after
// process init()'ialization.
var RegistryEventCallbacks = map[string]func(Event){}
Expand Down
3 changes: 1 addition & 2 deletions index/scorch/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package scorch

import (
"fmt"
"log"
"os"
"sync/atomic"
"time"
Expand Down Expand Up @@ -48,7 +47,7 @@ OUTER:
// lets get started
err := s.planMergeAtSnapshot(ourSnapshot)
if err != nil {
log.Printf("merging err: %v", err)
s.fireAsyncError(fmt.Errorf("merging err: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}
Expand Down
6 changes: 3 additions & 3 deletions index/scorch/persister.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ OUTER:
close(ch)
}
if err != nil {
log.Printf("got err persisting snapshot: %v", err)
s.fireAsyncError(fmt.Errorf("got err persisting snapshot: %v", err))
_ = ourSnapshot.DecRef()
continue OUTER
}
Expand Down Expand Up @@ -446,13 +446,13 @@ func (p uint64Descending) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scorch) removeOldData() {
removed, err := s.removeOldBoltSnapshots()
if err != nil {
log.Printf("got err removing old bolt snapshots: %v", err)
s.fireAsyncError(fmt.Errorf("got err removing old bolt snapshots: %v", err))
}

if removed > 0 {
err = s.removeOldZapFiles()
if err != nil {
log.Printf("got err removing old zap files: %v", err)
s.fireAsyncError(fmt.Errorf("got err removing old zap files: %v", err))
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion index/scorch/scorch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ type Scorch struct {
rootBolt *bolt.DB
asyncTasks sync.WaitGroup

onEvent func(event Event)
onEvent func(event Event)
onAsyncError func(err error)
}

func NewScorch(storeName string,
Expand Down Expand Up @@ -100,6 +101,10 @@ func NewScorch(storeName string,
if ok {
rv.onEvent = RegistryEventCallbacks[ecbName]
}
aecbName, ok := config["asyncErrorCallbackName"].(string)
if ok {
rv.onAsyncError = RegistryAsyncErrorCallbacks[aecbName]
}
return rv, nil
}

Expand All @@ -109,6 +114,12 @@ func (s *Scorch) fireEvent(kind EventKind, dur time.Duration) {
}
}

func (s *Scorch) fireAsyncError(err error) {
if s.onAsyncError != nil {
s.onAsyncError(err)
}
}

func (s *Scorch) Open() error {
var ok bool
s.path, ok = s.config["path"].(string)
Expand Down

0 comments on commit d310649

Please sign in to comment.