Skip to content

Commit

Permalink
Merge pull request #1176 from sreekanth-cb/alice_backport
Browse files Browse the repository at this point in the history
Merge pull request #1119 from sreekanth-cb/concurrentmap_fix
  • Loading branch information
sreekanth-cb committed Apr 4, 2019
2 parents 5d49173 + 10622e4 commit 822d01d
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 97 deletions.
8 changes: 8 additions & 0 deletions index.go
Expand Up @@ -129,6 +129,14 @@ func (b *Batch) Merge(o *Batch) {
}
}

func (b *Batch) SetPersistedCallback(f index.BatchCallback) {
b.internal.SetPersistedCallback(f)
}

func (b *Batch) PersistedCallback() index.BatchCallback {
return b.internal.PersistedCallback()
}

// An Index implements all the indexing and searching
// capabilities of bleve. An Index can be created
// using the New() and Open() methods.
Expand Down
16 changes: 14 additions & 2 deletions index/index.go
Expand Up @@ -248,9 +248,12 @@ type DocIDReader interface {
Close() error
}

type BatchCallback func(error)

type Batch struct {
IndexOps map[string]*document.Document
InternalOps map[string][]byte
IndexOps map[string]*document.Document
InternalOps map[string][]byte
persistedCallback BatchCallback
}

func NewBatch() *Batch {
Expand All @@ -276,6 +279,14 @@ func (b *Batch) DeleteInternal(key []byte) {
b.InternalOps[string(key)] = nil
}

func (b *Batch) SetPersistedCallback(f BatchCallback) {
b.persistedCallback = f
}

func (b *Batch) PersistedCallback() BatchCallback {
return b.persistedCallback
}

func (b *Batch) String() string {
rv := fmt.Sprintf("Batch (%d ops, %d internal ops)\n", len(b.IndexOps), len(b.InternalOps))
for k, v := range b.IndexOps {
Expand All @@ -298,6 +309,7 @@ func (b *Batch) String() string {
func (b *Batch) Reset() {
b.IndexOps = make(map[string]*document.Document)
b.InternalOps = make(map[string][]byte)
b.persistedCallback = nil
}

func (b *Batch) Merge(o *Batch) {
Expand Down
9 changes: 7 additions & 2 deletions index/scorch/event_test.go
Expand Up @@ -22,8 +22,13 @@ import (
)

func TestEventBatchIntroductionStart(t *testing.T) {
testConfig := CreateConfig("TestEventBatchIntroductionStart")
err := InitTest(testConfig)
if err != nil {
t.Fatal(err)
}
defer func() {
err := DestroyTest()
err := DestroyTest(testConfig)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -70,4 +75,4 @@ func TestEventBatchIntroductionStart(t *testing.T) {
if count != 1 {
t.Fatalf("expected to see 1 batch introduction event event, saw %d", count)
}
}
}
14 changes: 10 additions & 4 deletions index/scorch/field_dict_test.go
Expand Up @@ -23,15 +23,21 @@ import (
)

func TestIndexFieldDict(t *testing.T) {
cfg := CreateConfig("TestIndexFieldDict")
err := InitTest(cfg)
if err != nil {
t.Fatal(err)
}
defer func() {
err := DestroyTest()
err := DestroyTest(cfg)
if err != nil {
t.Fatal(err)
t.Log(err)
}
}()


analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewScorch(Name, testConfig, analysisQueue)
idx, err := NewScorch(Name, cfg, analysisQueue)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -181,4 +187,4 @@ func TestIndexFieldDict(t *testing.T) {
if !reflect.DeepEqual(expectedTerms, terms) {
t.Errorf("expected %#v, got %#v", expectedTerms, terms)
}
}
}
9 changes: 7 additions & 2 deletions index/scorch/introducer.go
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
)
Expand All @@ -30,8 +31,9 @@ type segmentIntroduction struct {
ids []string
internal map[string][]byte

applied chan error
persisted chan error
applied chan error
persisted chan error
persistedCallback index.BatchCallback
}

type persistIntroduction struct {
Expand Down Expand Up @@ -213,6 +215,9 @@ func (s *Scorch) introduceSegment(next *segmentIntroduction) error {
if next.persisted != nil {
s.rootPersisted = append(s.rootPersisted, next.persisted)
}
if next.persistedCallback != nil {
s.persistedCallbacks = append(s.persistedCallbacks, next.persistedCallback)
}
// swap in new index snapshot
newSnapshot.epoch = s.nextSnapshotEpoch
s.nextSnapshotEpoch++
Expand Down
7 changes: 7 additions & 0 deletions index/scorch/persister.go
Expand Up @@ -30,6 +30,7 @@ import (
"time"

"github.com/RoaringBitmap/roaring"
"github.com/blevesearch/bleve/index"
"github.com/blevesearch/bleve/index/scorch/segment"
"github.com/blevesearch/bleve/index/scorch/segment/zap"
"github.com/boltdb/bolt"
Expand Down Expand Up @@ -105,6 +106,7 @@ OUTER:

var ourSnapshot *IndexSnapshot
var ourPersisted []chan error
var ourPersistedCallbacks []index.BatchCallback

// check to see if there is a new snapshot to persist
s.rootLock.Lock()
Expand All @@ -113,6 +115,8 @@ OUTER:
ourSnapshot.AddRef()
ourPersisted = s.rootPersisted
s.rootPersisted = nil
ourPersistedCallbacks = s.persistedCallbacks
s.persistedCallbacks = nil
atomic.StoreUint64(&s.iStats.persistSnapshotSize, uint64(ourSnapshot.Size()))
atomic.StoreUint64(&s.iStats.persistEpoch, ourSnapshot.epoch)
}
Expand Down Expand Up @@ -140,6 +144,9 @@ OUTER:
atomic.AddUint64(&s.stats.TotPersistLoopErr, 1)
continue OUTER
}
for i := range ourPersistedCallbacks {
ourPersistedCallbacks[i](err)
}

atomic.StoreUint64(&s.stats.LastPersistedEpoch, ourSnapshot.epoch)

Expand Down
34 changes: 25 additions & 9 deletions index/scorch/reader_test.go
Expand Up @@ -24,15 +24,21 @@ import (
)

func TestIndexReader(t *testing.T) {
cfg := CreateConfig("TestIndexReader")
err := InitTest(cfg)
if err != nil {
t.Fatal(err)
}
defer func() {
err := DestroyTest()
err := DestroyTest(cfg)
if err != nil {
t.Fatal(err)
t.Log(err)
}
}()


analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewScorch(Name, testConfig, analysisQueue)
idx, err := NewScorch(Name, cfg, analysisQueue)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -214,15 +220,20 @@ func TestIndexReader(t *testing.T) {
}

func TestIndexDocIdReader(t *testing.T) {
cfg := CreateConfig("TestIndexDocIdReader")
err := InitTest(cfg)
if err != nil {
t.Fatal(err)
}
defer func() {
err := DestroyTest()
err := DestroyTest(cfg)
if err != nil {
t.Fatal(err)
t.Log(err)
}
}()

analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewScorch(Name, testConfig, analysisQueue)
idx, err := NewScorch(Name, cfg, analysisQueue)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -325,15 +336,20 @@ func TestIndexDocIdReader(t *testing.T) {
}

func TestIndexDocIdOnlyReader(t *testing.T) {
cfg := CreateConfig("TestIndexDocIdOnlyReader")
err := InitTest(cfg)
if err != nil {
t.Fatal(err)
}
defer func() {
err := DestroyTest()
err := DestroyTest(cfg)
if err != nil {
t.Fatal(err)
t.Log(err)
}
}()

analysisQueue := index.NewAnalysisQueue(1)
idx, err := NewScorch(Name, testConfig, analysisQueue)
idx, err := NewScorch(Name, cfg, analysisQueue)
if err != nil {
t.Fatal(err)
}
Expand Down
18 changes: 10 additions & 8 deletions index/scorch/scorch.go
Expand Up @@ -54,6 +54,7 @@ type Scorch struct {
rootLock sync.RWMutex
root *IndexSnapshot // holds 1 ref-count on the root
rootPersisted []chan error // closed when root is persisted
persistedCallbacks []index.BatchCallback
nextSnapshotEpoch uint64
eligibleForRemoval []uint64 // Index snapshot epochs that are safe to GC.
ineligibleForRemoval map[string]bool // Filenames that should not be GC'ed yet.
Expand Down Expand Up @@ -355,7 +356,7 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
atomic.AddUint64(&s.stats.TotBatchesEmpty, 1)
}

err = s.prepareSegment(newSegment, ids, batch.InternalOps)
err = s.prepareSegment(newSegment, ids, batch.InternalOps, batch.PersistedCallback())
if err != nil {
if newSegment != nil {
_ = newSegment.Close()
Expand All @@ -375,16 +376,17 @@ func (s *Scorch) Batch(batch *index.Batch) (err error) {
}

func (s *Scorch) prepareSegment(newSegment segment.Segment, ids []string,
internalOps map[string][]byte) error {
internalOps map[string][]byte, persistedCallback index.BatchCallback) error {

// new introduction
introduction := &segmentIntroduction{
id: atomic.AddUint64(&s.nextSegmentID, 1),
data: newSegment,
ids: ids,
obsoletes: make(map[uint64]*roaring.Bitmap),
internal: internalOps,
applied: make(chan error),
id: atomic.AddUint64(&s.nextSegmentID, 1),
data: newSegment,
ids: ids,
obsoletes: make(map[uint64]*roaring.Bitmap),
internal: internalOps,
applied: make(chan error),
persistedCallback: persistedCallback,
}

if !s.unsafeBatch {
Expand Down

0 comments on commit 822d01d

Please sign in to comment.