From 35e059a950b1938e40085331e0c02e2d5240d3a6 Mon Sep 17 00:00:00 2001 From: Steve Yen Date: Tue, 28 Oct 2014 12:16:27 -0700 Subject: [PATCH] first attempt at simple, memory-only feed for testing --- feed_nil.go | 1 + feed_simple.go | 146 +++++++++++++++++++++++++++++++++++++++++++++++++ stream.go | 17 +++++- 3 files changed, 162 insertions(+), 2 deletions(-) create mode 100644 feed_simple.go diff --git a/feed_nil.go b/feed_nil.go index 29a8211..4c4bcd0 100644 --- a/feed_nil.go +++ b/feed_nil.go @@ -16,6 +16,7 @@ type NILFeed struct { streams map[string]Stream } +// A NILFeed never feeds any data to its streams. func NewNILFeed(name string, streams map[string]Stream) *NILFeed { return &NILFeed{ name: name, diff --git a/feed_simple.go b/feed_simple.go new file mode 100644 index 0000000..f983172 --- /dev/null +++ b/feed_simple.go @@ -0,0 +1,146 @@ +// Copyright (c) 2014 Couchbase, Inc. +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the +// License. You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an "AS +// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language +// governing permissions and limitations under the License. + +package main + +import ( + "fmt" + log "github.com/couchbaselabs/clog" +) + +// A SimpleFeed is uses a local, in-memory channel as its Stream +// datasource. It's useful, amongst other things, for testing. +type SimpleFeed struct { + name string + streams map[string]Stream + closeCh chan bool + doneCh chan bool + doneErr error + doneMsg string + source Stream + pf StreamPartitionFunc +} + +func (t *SimpleFeed) Source() Stream { + return t.source +} + +func NewSimpleFeed(name string, source Stream, pf StreamPartitionFunc, + streams map[string]Stream) (*SimpleFeed, error) { + return &SimpleFeed{ + name: name, + streams: streams, + closeCh: make(chan bool), + doneCh: make(chan bool), + doneErr: nil, + doneMsg: "", + source: source, + pf: pf, + }, nil +} + +func (t *SimpleFeed) Name() string { + return t.name +} + +func (t *SimpleFeed) Start() error { + log.Printf("SimpleFeed.Start, name: %s", t.Name()) + go t.feed() + return nil +} + +func (t *SimpleFeed) feed() { + for { + select { + case <-t.closeCh: + t.doneErr = nil + t.doneMsg = "closeCh closed" + close(t.doneCh) + return + + case req, alive := <-t.source: + if !alive { + t.waitForClose("source closed", nil) + return + } + + stream, err := t.pf(req, t.streams) + if err != nil { + t.waitForClose("partition func error", + fmt.Errorf("error: SimpleFeed pf on req: %#v, err: %v", + req, err)) + return + } + + var doneChOrig chan error + var doneCh chan error + wantWaitForClose := "" + + switch req := req.(type) { + case *StreamEnd: + doneCh := make(chan error) + req.doneCh, doneChOrig = doneCh, req.doneCh + wantWaitForClose = "source stream end" + + case *StreamFlush: + doneCh := make(chan error) + req.doneCh, doneChOrig = doneCh, req.doneCh + + case *StreamRollback: + doneCh := make(chan error) + req.doneCh, doneChOrig = doneCh, req.doneCh + wantWaitForClose = "source stream rollback" + + case *StreamSnapshot: + doneCh := make(chan error) + req.doneCh, doneChOrig = doneCh, req.doneCh + + case *StreamUpdate: + case *StreamDelete: + } + + stream <- req + + if doneCh != nil { + err = <-doneCh + } + + if doneChOrig != nil { + if err != nil { + doneChOrig <- err + } + close(doneChOrig) + } + + if wantWaitForClose != "" { + t.waitForClose(wantWaitForClose, nil) + return + } + } + } +} + +func (t *SimpleFeed) waitForClose(msg string, err error) { + <-t.closeCh + t.doneErr = err + t.doneMsg = msg + close(t.doneCh) +} + +func (t *SimpleFeed) Close() error { + close(t.closeCh) + <-t.doneCh + return t.doneErr +} + +func (t *SimpleFeed) Streams() map[string]Stream { + return t.streams +} diff --git a/stream.go b/stream.go index 8d3d49d..655cc7e 100644 --- a/stream.go +++ b/stream.go @@ -13,25 +13,36 @@ package main type Stream chan StreamRequest -type StreamRequest interface { -} +type StreamPartitionFunc func(StreamRequest, map[string]Stream) (Stream, error) + +type StreamRequest interface{} + +// ---------------------------------------------- type StreamEnd struct { doneCh chan error } +// ---------------------------------------------- + type StreamFlush struct { doneCh chan error } +// ---------------------------------------------- + type StreamRollback struct { doneCh chan error } +// ---------------------------------------------- + type StreamSnapshot struct { doneCh chan error } +// ---------------------------------------------- + type StreamUpdate struct { id []byte body []byte @@ -45,6 +56,8 @@ func (s *StreamUpdate) Body() []byte { return s.body } +// ---------------------------------------------- + type StreamDelete struct { id []byte }