This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

209 ES adaptor stability #233

Merged
merged 39 commits into experimental from 209-es-stability
Jan 25, 2017

Conversation

Contributor

@jipperinbham jipperinbham commented Jan 19, 2017

  • CHANGELOG.md updated
  • README.md updated

Switch from the elastigo library to the elastic library and create underlying clients for each major version of Elasticsearch.

fixes #209, #222, #167, and #159

@jipperinbham jipperinbham changed the title [WIP] 209 ES adaptor stability 209 ES adaptor stability Jan 21, 2017
Contributor

@trinchan trinchan left a comment

Initial look through, I'll give it another look later today


// Closer takes care of receiving on the done channel and then properly cleaning up
// the session and WaitGroup.
func Closer(done chan struct{}, wg *sync.WaitGroup, s client.Session) {

Contributor

By its name, Closer sounds like it should be an interface. Maybe it should be Close.

type Creator func(chan struct{}, *sync.WaitGroup, *ClientOptions) (client.Writer, error)

// Clients contains the map of versioned clients
var Clients = map[string]*VersionedClient{}

Contributor

This can be private if we're going to delegate access through Add below

Contributor Author

planning on addressing this in #234

@@ -0,0 +1,36 @@
package clients

Contributor

I can see us having/wanting to use this sort of registry for other db types outside of ES. Let's make an issue for making this generic and have the other adaptors use the registry as well. I guess we would need to be able to get the version information through an API call like we do for ES. Should be possible.

Contributor Author

issue created to track that work, #234
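
A minimal sketch of what a generic registry along these lines might look like, with the map unexported and access delegated through functions as suggested in the earlier comment. The simplified `Creator` signature, the adaptor/name keys, and the `Names` helper are assumptions for illustration, not the code from this PR or from #234.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Creator is a simplified, hypothetical stand-in for the PR's Creator type;
// here it only returns a description of the client it would build.
type Creator func() (string, error)

// registry holds constructors keyed by adaptor type, then by client name.
// It is unexported, so other packages must go through Add and Names.
var (
	mu       sync.RWMutex
	registry = map[string]map[string]Creator{}
)

// Add registers a constructor for the given adaptor type and client name,
// replacing any earlier entry with the same keys.
func Add(adaptor, name string, c Creator) {
	mu.Lock()
	defer mu.Unlock()
	if registry[adaptor] == nil {
		registry[adaptor] = map[string]Creator{}
	}
	registry[adaptor][name] = c
}

// Names returns the sorted client names registered for an adaptor type.
func Names(adaptor string) []string {
	mu.RLock()
	defer mu.RUnlock()
	names := make([]string, 0, len(registry[adaptor]))
	for n := range registry[adaptor] {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

func main() {
	Add("elasticsearch", "v1", func() (string, error) { return "es v1 writer", nil })
	Add("elasticsearch", "v2", func() (string, error) { return "es v2 writer", nil })
	fmt.Println(Names("elasticsearch"))
}
```

Keeping the map unexported means the locking cannot be bypassed by callers in other packages.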

var id string
if _, ok := msg.Data()["_id"]; ok {
	id = msg.ID()
	msg.Data().Delete("_id")

Contributor

Will we have to restore the _id field after the write for nodes further down in the pipeline that may expect _id to be present? I'm not sure whether msg.Data() returns a copy. I think not.

Contributor Author

msg.Data() does not return a copy, and I hadn't really thought about the use case of chaining the adaptors in such a way. It does bring up the question of whether msg.Data should be immutable: in instances where multiple Sink adaptors are attached to the same Source, this code could cause a race condition with a concurrent map read/write.

}

func (w *Writer) Close() {
	log.Infoln("flushing BulkProcessor")

Contributor

Would be nice to have this be a little more descriptive (that it's an ES bulk processor)

	log.Debugln("tests shutdown complete")
}

type CountResponse struct {

Contributor

Make this private

var id string
if _, ok := msg.Data()["_id"]; ok {
	id = msg.ID()
	msg.Data().Delete("_id")

Contributor

Same comment as with v1, do we need to restore?

Contributor Author

addressed by making a copy of the original msg.Data() in the adaptor.
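
A rough sketch of the copy-then-strip approach described here, using a plain `map[string]interface{}` as a stand-in for whatever `msg.Data()` returns. `splitID` is a hypothetical helper written for illustration, not the adaptor's actual code.

```go
package main

import "fmt"

// splitID returns the document ID (if present) and a shallow copy of doc
// without the "_id" key. The input map is never modified, so other sinks
// reading the same message still see "_id".
func splitID(doc map[string]interface{}) (string, map[string]interface{}) {
	body := make(map[string]interface{}, len(doc))
	var id string
	for k, v := range doc {
		if k == "_id" {
			id = fmt.Sprint(v)
			continue
		}
		body[k] = v
	}
	return id, body
}

func main() {
	doc := map[string]interface{}{"_id": "abc123", "name": "transporter"}
	id, body := splitID(doc)
	fmt.Println(id, len(body), len(doc))
}
```

The copy is shallow, so nested maps or slices are still shared with the original message; a sink that mutates nested values would need a deep copy.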

@@ -0,0 +1,91 @@
package v2

Contributor

I know you're aware of all the code duplication, just want to make sure we track it in an issue to investigate clever ways to do the versioning.

Contributor Author

👍

hostBits := strings.Split(e.uri.Host, ":")
if len(hostBits) > 1 {
client.SetPort(hostBits[1])
for _, vc := range clients.Clients {

Contributor

If we make the registry generic, I think a function to return the proper client given the adaptor type and version would be better than having this logic in the adaptor. Even for just elasticsearch, I think it would be better to have this in the registry.
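
One way the lookup could move into the registry is sketched below, keyed by major version only. `ForVersion`, the integer keys, and the simplified `Creator` type are assumptions for illustration; the PR's registry may match versions differently (for example, with version constraints).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// Creator is a hypothetical, simplified constructor type for this sketch.
type Creator func() string

// byMajor maps an Elasticsearch major version to its client constructor.
var byMajor = map[int]Creator{}

// Add registers the constructor for one major version.
func Add(major int, c Creator) { byMajor[major] = c }

// ForVersion parses a version string such as "2.4.1" and returns the
// constructor registered for its major version.
func ForVersion(v string) (Creator, error) {
	major, err := strconv.Atoi(strings.SplitN(v, ".", 2)[0])
	if err != nil {
		return nil, fmt.Errorf("invalid version %q: %v", v, err)
	}
	c, ok := byMajor[major]
	if !ok {
		return nil, fmt.Errorf("no client registered for version %q", v)
	}
	return c, nil
}

func main() {
	Add(1, func() string { return "v1 client" })
	Add(2, func() string { return "v2 client" })

	c, err := ForVersion("2.4.1")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(c())
}
```

With a function like this, the adaptor would only need to fetch the server version and pass it along, instead of ranging over the registered clients itself.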

@@ -96,6 +98,10 @@ func (l logger) Errorf(format string, args ...interface{}) {
	l.entry.Errorf(format, args...)
}

func (l logger) Printf(format string, args ...interface{}) {

Contributor

Why is this necessary if we have logger.Infof?

Contributor Author

this is to fulfill the Logger interface in the new elasticsearch lib, https://github.com/olivere/elastic/wiki/Logging

}

func newWriter(client *elastic.Client, done chan struct{}, wg *sync.WaitGroup) *Writer {
	p, _ := client.BulkProcessor().

Contributor

Not sure what we'd do with the errors, but right now we're not catching any of the errors from the BulkProcessor. You can use a BulkAfterFunc and .After(callback) to get at the errors though. Would be bad to throw away errors without any output.

Contributor Author

ah, thanks for the reminder, I wanted to log any errors so I'll get that in now

		BulkActions(1000).               // commit if # requests >= 1000
		BulkSize(2 << 20).               // commit if size of requests >= 2 MB
		FlushInterval(30 * time.Second). // commit every 30s
		Do()

Contributor

Same here with the error handling

e.pipe.Stop()
e.indexer.Stop()
}
log.With("path", e.path).Infoln("adaptor Stopping...")

Contributor

Ah, I noticed the e.path there; it will show the adaptor type, all good.


func newTransport(accessKeyID, secretAccessKey string) http.RoundTripper {
	t := http.DefaultTransport
	if accessKeyID != "" && secretAccessKey != "" {

Contributor

is there any way to test the creds and error sooner?

Contributor Author

what would we be testing here? when adding support for AWS signed auth, this seemed like the best/cleanest route to go.

Contributor

was just wondering if there was a way to bail out early if the creds were bad

Contributor Author

gotcha, I honestly don't know

@coveralls

Coverage increased (+4.2%) to 57.076% when pulling 278d25a on 209-es-stability into 1dc0c3d on experimental.

@jipperinbham jipperinbham merged commit a97392d into experimental Jan 25, 2017
@jipperinbham jipperinbham deleted the 209-es-stability branch January 25, 2017 16:05
jipperinbham added a commit that referenced this pull request Jan 27, 2017
addresses multiple issues with sending data to elasticsearch, handling of _id, major API version differences, and performance

fixes #209

3 participants