Skip to content

Commit

Permalink
TestManagerCreateSimpleFeed()
Browse files Browse the repository at this point in the history
  • Loading branch information
steveyen committed Oct 28, 2014
1 parent 36edd74 commit 6cef79e
Show file tree
Hide file tree
Showing 6 changed files with 142 additions and 22 deletions.
8 changes: 4 additions & 4 deletions feed_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ type SimpleFeed struct {
pf StreamPartitionFunc
}

func (t *SimpleFeed) Source() Stream {
return t.source
}

func NewSimpleFeed(name string, source Stream, pf StreamPartitionFunc,
streams map[string]Stream) (*SimpleFeed, error) {
return &SimpleFeed{
Expand Down Expand Up @@ -144,3 +140,7 @@ func (t *SimpleFeed) Close() error {
func (t *SimpleFeed) Streams() map[string]Stream {
return t.streams
}

func (t *SimpleFeed) Source() Stream {
return t.source
}
49 changes: 33 additions & 16 deletions manager_janitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,14 @@ func (mgr *Manager) JanitorOnce(reason string) error {
err = mgr.stopFeed(removeFeed)
if err != nil {
return fmt.Errorf("error: janitor removing feed name: %s, err: %v",
removeFeed.Name, err)
removeFeed.Name(), err)
}
}
// Then, (re-)create feeds that we're missing.
for addFeedName, targetPIndexes := range addFeeds {
err = mgr.startFeed(targetPIndexes)
for _, addFeedTargetPIndexes := range addFeeds {
err = mgr.startFeed(addFeedTargetPIndexes)
if err != nil {
return fmt.Errorf("error: janitor adding feed, addFeedName: %s, err: %v",
addFeedName, err)
return fmt.Errorf("error: janitor adding feed, err: %v", err)
}
}

Expand Down Expand Up @@ -356,6 +355,13 @@ func (mgr *Manager) stopFeed(feed Feed) error {

// --------------------------------------------------------

// TODO: need way to track dead cows (non-beef)
// TODO: need a way to collect these errors so REST api
// can show them to user ("hey, perhaps you deleted a bucket
// and should delete these related full-text indexes?
// or the couchbase cluster is just down.");
// perhaps as specialized clog writer?

func (mgr *Manager) startFeedByType(feedName, indexName, indexUUID,
sourceType, sourceName, sourceUUID string,
streams map[string]Stream) error {
Expand All @@ -365,6 +371,10 @@ func (mgr *Manager) startFeedByType(feedName, indexName, indexUUID,
sourceName, sourceUUID, streams)
}

if sourceType == "simple" {
return mgr.startSimpleFeed(feedName, streams)
}

if sourceType == "nil" {
return mgr.registerFeed(NewNILFeed(feedName, streams))
}
Expand All @@ -380,23 +390,30 @@ func (mgr *Manager) startTAPFeed(feedName, indexName, indexUUID,
return fmt.Errorf("error: could not prepare TAP stream to server: %s,"+
" bucketName: %s, indexName: %s, err: %v",
mgr.server, bucketName, indexName, err)
// TODO: need a way to collect these errors so REST api
// can show them to user ("hey, perhaps you deleted a bucket
// and should delete these related full-text indexes?
// or the couchbase cluster is just down.");
// perhaps as specialized clog writer?
// TODO: cleanup on error?
}

if err = feed.Start(); err != nil {
// TODO: need way to track dead cows (non-beef)
// TODO: cleanup?
return fmt.Errorf("error: could not start feed, server: %s, err: %v",
return fmt.Errorf("error: could not start tap feed, server: %s, err: %v",
mgr.server, err)
}
if err = mgr.registerFeed(feed); err != nil {
feed.Close()
return err
}
return nil
}

func (mgr *Manager) startSimpleFeed(feedName string,
streams map[string]Stream) error {
feed, err := NewSimpleFeed(feedName, make(Stream), EmptyPartitionFunc, streams)
if err != nil {
return err
}
if err = feed.Start(); err != nil {
return fmt.Errorf("error: could not start simple feed, server: %s, err: %v",
mgr.server, err)
}
if err = mgr.registerFeed(feed); err != nil {
// TODO: cleanup?
feed.Close()
return err
}

Expand Down
60 changes: 60 additions & 0 deletions manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,3 +692,63 @@ func TestManagerClosePIndex(t *testing.T) {
t.Errorf("meh callbacks were wrong")
}
}

func TestManagerCreateSimpleFeed(t *testing.T) {
testManagerSimpleFeed(t, func(mgr *Manager, sf *SimpleFeed) {
err := sf.Close()
if err != nil {
t.Errorf("expected simple feed close to work")
}
})
}

func TestManagerSimpleFeedCloseSource(t *testing.T) {
testManagerSimpleFeed(t, func(mgr *Manager, sf *SimpleFeed) {
close(sf.Source())
err := sf.Close()
if err != nil {
t.Errorf("expected simple feed close after source close to work")
}
})
}

func testManagerSimpleFeed(t *testing.T, andThen func(*Manager, *SimpleFeed)) {
emptyDir, _ := ioutil.TempDir("./tmp", "test")
defer os.RemoveAll(emptyDir)
cfg := NewCfgMem()
meh := &TestMEH{}
m := NewManager(VERSION, cfg, NewUUID(), nil, ":1000", emptyDir, "some-datasource", meh)
if err := m.Start(true); err != nil {
t.Errorf("expected Manager.Start() to work, err: %v", err)
}
if err := m.CreateIndex("simple", "sourceName", "sourceUUID",
"bleve", "foo", ""); err != nil {
t.Errorf("expected simple CreateIndex() to work")
}
m.PlannerNOOP("test")
m.JanitorNOOP("test")
feeds, pindexes := m.CurrentMaps()
if len(feeds) != 1 || len(pindexes) != 1 {
t.Errorf("expected to be 1 feed and 1 pindex, got feeds: %+v, pindexes: %+v",
feeds, pindexes)
}
if meh.lastPIndex == nil {
t.Errorf("expected to be meh.lastPIndex")
}
feedName := FeedName(meh.lastPIndex)
feed, exists := feeds[feedName]
if !exists || feed == nil {
t.Errorf("expected there to be feed: %s", feedName)
}
sf, ok := feed.(*SimpleFeed)
if !ok || sf == nil {
t.Errorf("expected feed to be simple")
}
if sf.Source() == nil {
t.Errorf("expected simple feed source to be there")
}
if sf.Streams() == nil {
t.Errorf("expected simple feed streams to be there")
}
andThen(m, sf)
}
4 changes: 2 additions & 2 deletions pindex_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ func (pindex *PIndex) Run(mgr PIndexManager) {
close, cleanup, err = RunBleveStream(mgr, pindex, pindex.Stream,
pindex.Impl.(bleve.Index))
if err != nil {
log.Printf("error: RunBleveStream, close: %b, cleanup: %b, err: %v",
log.Printf("error: RunBleveStream, close: %t, cleanup: %t, err: %v",
close, cleanup, err)
} else {
log.Printf("done: RunBleveStream, close: %b, cleanup: %b",
log.Printf("done: RunBleveStream, close: %t, cleanup: %t",
close, cleanup)
}
} else {
Expand Down
15 changes: 15 additions & 0 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

package main

import (
"fmt"
)

type Stream chan StreamRequest

type StreamPartitionFunc func(StreamRequest, map[string]Stream) (Stream, error)
Expand Down Expand Up @@ -53,3 +57,14 @@ type StreamUpdate struct {
type StreamDelete struct {
Id []byte
}

// ----------------------------------------------

// This partition func sends all stream requests to the "" partition.
func EmptyPartitionFunc(req StreamRequest, streams map[string]Stream) (Stream, error) {
stream, exists := streams[""]
if !exists || stream == nil {
return nil, fmt.Errorf("error: no empty/all partition in streams: %#v", streams)
}
return stream, nil
}
28 changes: 28 additions & 0 deletions stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.

package main

import (
"testing"
)

func TestEmptyPartitionFunc(t *testing.T) {
stream := make(Stream)
s, err := EmptyPartitionFunc(nil, map[string]Stream{"": stream})
if err != nil || s != stream {
t.Errorf("expected TestEmptyPartitionFunc to work")
}
s, err = EmptyPartitionFunc(nil, map[string]Stream{"foo": stream})
if err == nil || s == stream {
t.Errorf("expected TestEmptyPartitionFunc to not work")
}
}

0 comments on commit 6cef79e

Please sign in to comment.