Skip to content

Commit

Permalink
first attempt at simple, memory-only feed for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
steveyen committed Oct 28, 2014
1 parent db1834d commit 35e059a
Show file tree
Hide file tree
Showing 3 changed files with 162 additions and 2 deletions.
1 change: 1 addition & 0 deletions feed_nil.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type NILFeed struct {
streams map[string]Stream
}

// A NILFeed never feeds any data to its streams.
func NewNILFeed(name string, streams map[string]Stream) *NILFeed {
return &NILFeed{
name: name,
Expand Down
146 changes: 146 additions & 0 deletions feed_simple.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright (c) 2014 Couchbase, Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the
// License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an "AS
// IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
// express or implied. See the License for the specific language
// governing permissions and limitations under the License.

package main

import (
"fmt"
log "github.com/couchbaselabs/clog"
)

// A SimpleFeed is uses a local, in-memory channel as its Stream
// datasource. It's useful, amongst other things, for testing.
type SimpleFeed struct {
name string
streams map[string]Stream
closeCh chan bool
doneCh chan bool
doneErr error
doneMsg string
source Stream
pf StreamPartitionFunc
}

func (t *SimpleFeed) Source() Stream {
return t.source
}

func NewSimpleFeed(name string, source Stream, pf StreamPartitionFunc,
streams map[string]Stream) (*SimpleFeed, error) {
return &SimpleFeed{
name: name,
streams: streams,
closeCh: make(chan bool),
doneCh: make(chan bool),
doneErr: nil,
doneMsg: "",
source: source,
pf: pf,
}, nil
}

func (t *SimpleFeed) Name() string {
return t.name
}

func (t *SimpleFeed) Start() error {
log.Printf("SimpleFeed.Start, name: %s", t.Name())
go t.feed()
return nil
}

func (t *SimpleFeed) feed() {
for {
select {
case <-t.closeCh:
t.doneErr = nil
t.doneMsg = "closeCh closed"
close(t.doneCh)
return

case req, alive := <-t.source:
if !alive {
t.waitForClose("source closed", nil)
return
}

stream, err := t.pf(req, t.streams)
if err != nil {
t.waitForClose("partition func error",
fmt.Errorf("error: SimpleFeed pf on req: %#v, err: %v",
req, err))
return
}

var doneChOrig chan error
var doneCh chan error
wantWaitForClose := ""

switch req := req.(type) {
case *StreamEnd:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh
wantWaitForClose = "source stream end"

case *StreamFlush:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh

case *StreamRollback:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh
wantWaitForClose = "source stream rollback"

case *StreamSnapshot:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh

case *StreamUpdate:
case *StreamDelete:
}

stream <- req

if doneCh != nil {
err = <-doneCh
}

if doneChOrig != nil {
if err != nil {
doneChOrig <- err
}
close(doneChOrig)
}

if wantWaitForClose != "" {
t.waitForClose(wantWaitForClose, nil)
return
}
}
}
}

func (t *SimpleFeed) waitForClose(msg string, err error) {
<-t.closeCh
t.doneErr = err
t.doneMsg = msg
close(t.doneCh)
}

func (t *SimpleFeed) Close() error {
close(t.closeCh)
<-t.doneCh
return t.doneErr
}

func (t *SimpleFeed) Streams() map[string]Stream {
return t.streams
}
17 changes: 15 additions & 2 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,36 @@ package main

type Stream chan StreamRequest

type StreamRequest interface {
}
type StreamPartitionFunc func(StreamRequest, map[string]Stream) (Stream, error)

type StreamRequest interface{}

// ----------------------------------------------

type StreamEnd struct {
doneCh chan error
}

// ----------------------------------------------

type StreamFlush struct {
doneCh chan error
}

// ----------------------------------------------

type StreamRollback struct {
doneCh chan error
}

// ----------------------------------------------

type StreamSnapshot struct {
doneCh chan error
}

// ----------------------------------------------

type StreamUpdate struct {
id []byte
body []byte
Expand All @@ -45,6 +56,8 @@ func (s *StreamUpdate) Body() []byte {
return s.body
}

// ----------------------------------------------

type StreamDelete struct {
id []byte
}
Expand Down

0 comments on commit 35e059a

Please sign in to comment.