-
Notifications
You must be signed in to change notification settings - Fork 216
update RethinkDB adaptor with new interfaces and tests #268
Conversation
pkg/adaptor/mongodb/mongodb.go
Outdated
s, err := m.client.Connect() | ||
if s, ok := s.(client.Closer); ok { | ||
s.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we were leaking a connection here which meant there was a goroutine that would run forever when used as a library, should fix #265
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
pkg/adaptor/mongodb/mongodb.go
Outdated
@@ -142,6 +166,9 @@ func (m *MongoDB) Start() (err error) { | |||
if err != nil { | |||
return err | |||
} | |||
if s, ok := s.(client.Closer); ok { | |||
defer s.Close() | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
more connection leakage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
@@ -57,7 +57,7 @@ func (r *Reader) Read(filterFn client.NsFilterFunc) client.MessageChanFunc { | |||
log.With("db", r.db).Infoln("Read completed") | |||
return | |||
} | |||
msg := message.From(ops.Insert, fmt.Sprintf("%s.%s", r.db, result.c), data.Data(result.doc)) | |||
msg := message.From(ops.Insert, result.c, data.Data(result.doc)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
switch to new namespace
) | ||
|
||
// Reader fulfills the client.Reader interface for use with both copying and tailing a RethinkDB | ||
// database. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this Reader can handle straight copies as well as fire up changefeeds to be used for tailing.
} | ||
iterationComplete := r.iterateTable(session, tables, out, done) | ||
var wg sync.WaitGroup | ||
func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may move this to a defined func rather than an anonymous one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yeah it is a little awkward to see an anonymous function like this, but it does help the flow control down below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea, part of me likes being to just read through the code without having to jump down/up to a func
pkg/adaptor/rethinkdb/rethinkdb.go
Outdated
} | ||
|
||
// prepareDocument moves the `id` field to the `_id` field, which is more | ||
// commonly used by downstream sinks. A transformer could be used to do the | ||
// same thing, but because transformers are not run for Delete messages, we | ||
// must do it here. | ||
func (r *RethinkDB) prepareDocument(doc map[string]interface{}) map[string]interface{} { | ||
func prepareDocument(doc map[string]interface{}) map[string]interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I kind of hate that we do this because it makes it quite irritating for someone who is wanting do to a rethinkdb -> rethinkdb pipeline.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. It should be the responsibility of the respective sink's Writer to massage the message into the form necessary
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍 I'm going to remove this function and add logic to the writer
@@ -106,6 +106,11 @@ func (l logger) Errorf(format string, args ...interface{}) { | |||
var origLogger = logrus.New() | |||
var baseLogger = logger{entry: logrus.NewEntry(origLogger)} | |||
|
|||
// Orig provides access to the underlying *logrus.Logger |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this to try and get the rethinkdb driver logging but it hasn't worked quite yet
@@ -12,12 +12,10 @@ nodes: | |||
type: mongo | |||
uri: mongodb://localhost/boom | |||
namespace: boom.foo | |||
debug: true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleanup
localmongo2: | ||
type: mongo | ||
uri: mongodb://localhost/boom | ||
namespace: boom.baz | ||
debug: true | ||
es: | ||
type: elasticsearch | ||
uri: https://nick:darling@haproxy1.dblayer.com:10291/thisgetsignored |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
everything below here is lovely glide updating :allthethings:
pkg/adaptor/mongodb/mongodb.go
Outdated
s, err := m.client.Connect() | ||
if s, ok := s.(client.Closer); ok { | ||
s.Close() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
pkg/adaptor/mongodb/mongodb.go
Outdated
@@ -142,6 +166,9 @@ func (m *MongoDB) Start() (err error) { | |||
if err != nil { | |||
return err | |||
} | |||
if s, ok := s.(client.Closer); ok { | |||
defer s.Close() | |||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
pkg/adaptor/rethinkdb/reader.go
Outdated
} | ||
}() | ||
log.With("db", r.db).Infoln("Read completed") | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is this function doing? doesn't appear to be doing anything
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yea... I had some issues with trying to still receive on the done channel when doing a tail but it's very likely that the wait group accomplishes the same thing since it gets propagated down to sendChanges
. I'll remove it and see what happens.
} | ||
iterationComplete := r.iterateTable(session, tables, out, done) | ||
var wg sync.WaitGroup | ||
func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, yeah it is a little awkward to see an anonymous function like this, but it does help the flow control down below.
pkg/adaptor/rethinkdb/rethinkdb.go
Outdated
} | ||
|
||
// prepareDocument moves the `id` field to the `_id` field, which is more | ||
// commonly used by downstream sinks. A transformer could be used to do the | ||
// same thing, but because transformers are not run for Delete messages, we | ||
// must do it here. | ||
func (r *RethinkDB) prepareDocument(doc map[string]interface{}) map[string]interface{} { | ||
func prepareDocument(doc map[string]interface{}) map[string]interface{} { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. It should be the responsibility of the respective sink's Writer to massage the message into the form necessary
LGTM 👍 |
dc01fb9
to
c79263e
Compare
brings the adaptor up-to-date with the new client interfaces and adds much needed testing, removed the concept of replacing the id field with _id but during writes will check for the presence of _id if id does not exist fixes #242
a9306b1
to
be740e9
Compare
fixes #242