Skip to content

Commit

Permalink
using public fields instead of getters
Browse files Browse the repository at this point in the history
  • Loading branch information
steveyen committed Oct 28, 2014
1 parent 35e059a commit 36edd74
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 39 deletions.
6 changes: 3 additions & 3 deletions feed_dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,13 @@ func (t *DCPFeed) feed() (int, error) {
if uprEvent.Opcode == gomemcached.UPR_MUTATION {
// TODO: Handle dispatch to streams correctly.
t.streams[""] <- &StreamUpdate{
id: uprEvent.Key,
body: uprEvent.Value,
Id: uprEvent.Key,
Body: uprEvent.Value,
}
} else if uprEvent.Opcode == gomemcached.UPR_DELETION {
// TODO: Handle dispatch to streams correctly.
t.streams[""] <- &StreamDelete{
id: uprEvent.Key,
Id: uprEvent.Key,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions feed_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,21 +87,21 @@ func (t *SimpleFeed) feed() {
switch req := req.(type) {
case *StreamEnd:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh
req.DoneCh, doneChOrig = doneCh, req.DoneCh
wantWaitForClose = "source stream end"

case *StreamFlush:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh
req.DoneCh, doneChOrig = doneCh, req.DoneCh

case *StreamRollback:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh
req.DoneCh, doneChOrig = doneCh, req.DoneCh
wantWaitForClose = "source stream rollback"

case *StreamSnapshot:
doneCh := make(chan error)
req.doneCh, doneChOrig = doneCh, req.doneCh
req.DoneCh, doneChOrig = doneCh, req.DoneCh

case *StreamUpdate:
case *StreamDelete:
Expand Down
6 changes: 3 additions & 3 deletions feed_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,13 @@ func (t *TAPFeed) feed() (int, error) {
if op.Opcode == memcached.TapMutation {
// TODO: Handle dispatch to streams correctly.
t.streams[""] <- &StreamUpdate{
id: op.Key,
body: op.Value,
Id: op.Key,
Body: op.Value,
}
} else if op.Opcode == memcached.TapDeletion {
// TODO: Handle dispatch to streams correctly.
t.streams[""] <- &StreamDelete{
id: op.Key,
Id: op.Key,
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions pindex_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,23 +61,23 @@ func RunBleveStream(mgr PIndexManager, pindex *PIndex, stream Stream,

switch m := m.(type) {
case *StreamUpdate:
bindex.Index(string(m.Id()), m.Body())
bindex.Index(string(m.Id), m.Body)

case *StreamDelete:
bindex.Delete(string(m.Id()))
bindex.Delete(string(m.Id))

case *StreamEnd:
// Perhaps the datasource exited or is restarting? We'll
// keep our stream open in case a new feed is hooked up.
if m.doneCh != nil {
close(m.doneCh)
if m.DoneCh != nil {
close(m.DoneCh)
}

case *StreamFlush:
// TODO: Need to delete all records here. So, why not
// implement this the same as rollback to zero?
if m.doneCh != nil {
close(m.doneCh)
if m.DoneCh != nil {
close(m.DoneCh)
}

case *StreamRollback:
Expand All @@ -89,8 +89,8 @@ func RunBleveStream(mgr PIndexManager, pindex *PIndex, stream Stream,
os.RemoveAll(pindex.Path)

// First, respond to the feed so that it can unblock.
if m.doneCh != nil {
close(m.doneCh)
if m.DoneCh != nil {
close(m.DoneCh)
}

// Because, here the manager/janitor will synchronously
Expand All @@ -101,8 +101,8 @@ func RunBleveStream(mgr PIndexManager, pindex *PIndex, stream Stream,

case *StreamSnapshot:
// TODO: Need to ACK some snapshot?
if m.doneCh != nil {
close(m.doneCh)
if m.DoneCh != nil {
close(m.DoneCh)
}
}
}
Expand Down
26 changes: 7 additions & 19 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,36 @@ type StreamRequest interface{}
// ----------------------------------------------

type StreamEnd struct {
doneCh chan error
DoneCh chan error
}

// ----------------------------------------------

type StreamFlush struct {
doneCh chan error
DoneCh chan error
}

// ----------------------------------------------

type StreamRollback struct {
doneCh chan error
DoneCh chan error
}

// ----------------------------------------------

type StreamSnapshot struct {
doneCh chan error
DoneCh chan error
}

// ----------------------------------------------

type StreamUpdate struct {
id []byte
body []byte
}

func (s *StreamUpdate) Id() []byte {
return s.id
}

func (s *StreamUpdate) Body() []byte {
return s.body
Id []byte
Body []byte
}

// ----------------------------------------------

type StreamDelete struct {
id []byte
}

func (s *StreamDelete) Id() []byte {
return s.id
Id []byte
}

0 comments on commit 36edd74

Please sign in to comment.