Using elasticsearch generated ID's #97

Open
idarlington opened this issue Jul 10, 2017 · 12 comments

@idarlington

I noticed that documents indexed in Elasticsearch have their IDs in the format topic+partition+offset.

I would prefer to use IDs generated by Elasticsearch. It seems topic+partition+offset is not always unique, so I am losing data.

How can I change that?

idarlington changed the title from "Using elasticsearch document ids" to "Using elasticsearch generated ID's" on Jul 10, 2017
@ewencp
Contributor

ewencp commented Jul 10, 2017

@idarlington topic+partition+offset should be unique within a Kafka cluster -- that is how you uniquely identify a single record, which is why the connector uses it. You could have conflicting IDs if you're pulling data from two Kafka clusters where the same topic name is used. If this is the cause of your data loss, you could add a RegexRouter transformation to your connector to prepend a cluster name to the topic name, so you'll have globally unique topic names.

Currently the ID is controlled by the key.ignore setting. If it is true then you get the topic+partition+offset format. If it is false, the ID will be the Kafka key, which allows you to update documents in Elasticsearch. I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.
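
For illustration, a minimal sink configuration toggling that setting might look like this (the topic name and connection URL are placeholders, not taken from this issue):

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
topics=my-topic
connection.url=http://localhost:9200
type.name=docs
# true -> IDs are topic+partition+offset; false -> the Kafka record key is used as the ID
key.ignore=true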

@idarlington
Author

Thanks @ewencp.

I am currently operating a single cluster. I noticed that the ID is now topic+partition+logSize.

From the offset checker, it seems the offset is not updating:

Group           Topic                          Pid Offset          logSize         Lag             Owner
my_group        gritServer                     0   81848           142559          60711           none
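
(For reference, the output above presumably came from a command along these lines; the ZooKeeper address is a placeholder:

bin/kafka-consumer-offset-checker --zkconnect localhost:2181 --group my_group --topic gritServer)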

Also, if the server is restarted, does the offset value change? I currently have my log.dir in /tmp.

Finally, can you point me to an example of the RegexRouter transformation? Thank you.

@ewencp
Contributor

ewencp commented Jul 10, 2017

Interesting that the offset does not seem to be correct. I'm looking at the code where that ID is generated, and it's definitely using the Kafka offset for the last part. That doesn't rule out an issue in the framework, but I don't think anything has changed there. Are the logSize and Lag from that command increasing? If the offset isn't, that would imply that offsets are not being successfully committed. Could you check your Connect logs for any errors or messages about offset commits failing?

This is probably not your problem, since you seem to have tracked the issue down to the offset, but the docs don't have an example of the regex router, so here is a quick one:

transforms=Rename
transforms.Rename.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.Rename.regex=(.*)
transforms.Rename.replacement=prefix-$1

That will prefix the topic names with a constant prefix before the connector processes them.

Also, note that keeping log.dir in /tmp is a bad idea. It works fine when you're just developing/testing locally and don't care about the data, but depending on how your /tmp is managed, files can be deleted seemingly at random and your data will disappear.
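
For example, in server.properties you would point the data directory somewhere persistent; the path below is just an illustration:

# server.properties -- keep Kafka's data out of /tmp
log.dirs=/var/lib/kafka-logs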

@idarlington
Author

Thanks @ewencp

Yes, the logSize and Lag are increasing while the offset isn't. By the way, I am using Confluent 3.1.2 with ES 2.0.0.

I can't find any errors in the logs. These are mostly the contents:

[2017-07-10 18:09:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:10:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:11:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:12:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:13:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:14:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:15:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:16:53,926] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:17:53,927] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)
[2017-07-10 18:18:53,928] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

I will update log.dir then; I hope that moving the contents of the current directory and updating its value in server.properties will suffice.

@ewencp
Contributor

ewencp commented Jul 10, 2017

I'm also wondering if you possibly got into a bad state for the topic such that the consumer cannot make progress. If you increase the log level, you should see more messages indicating the sink task's progress. Increasing all the way to TRACE, even if only for WorkerSinkTask, would give log info about both the messages as they are consumed (including even logging the key and value) and the offsets that are actually being committed.

You might also just want to check that all the files we'd expect to be in the Kafka directory are there, i.e. that something didn't get deleted.
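
For example, adding something like the following to etc/kafka/connect-log4j.properties scopes TRACE to just the sink task machinery (the file location assumes the Confluent layout shown later in this thread):

log4j.logger.org.apache.kafka.connect.runtime.WorkerSinkTask=TRACE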

@idarlington
Author

idarlington commented Jul 11, 2017

@ewencp

I have increased the log level to TRACE; this is a snippet:

[2017-07-11 07:39:54,670] TRACE Executing batch 450 of 1 records (io.confluent.connect.elasticsearch.bulk.BulkProcessor:347)
[2017-07-11 07:39:54,671] DEBUG POST method created based on client request (io.searchbox.client.http.JestHttpClient:99)
[2017-07-11 07:39:54,671] DEBUG Request method=POST url=http://localhost:9200/_bulk (io.searchbox.client.http.JestHttpClient:84)
[2017-07-11 07:39:54,671] DEBUG CookieSpec selected: default (org.apache.http.client.protocol.RequestAddCookies:122)
[2017-07-11 07:39:54,672] DEBUG Auth cache not set in the context (org.apache.http.client.protocol.RequestAuthCache:76)
[2017-07-11 07:39:54,672] DEBUG Connection request: [route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:249)
[2017-07-11 07:39:54,672] DEBUG Connection leased: [id: 0][route: {}->http://localhost:9200][total kept alive: 1; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:282)
[2017-07-11 07:39:54,672] DEBUG http-outgoing-0: set socket timeout to 3000 (org.apache.http.impl.conn.DefaultManagedHttpClientConnection:90)
[2017-07-11 07:39:54,673] DEBUG Executing request POST /_bulk HTTP/1.1 (org.apache.http.impl.execchain.MainClientExec:255)
[2017-07-11 07:39:54,673] DEBUG Target auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:260)
[2017-07-11 07:39:54,673] DEBUG Proxy auth state: UNCHALLENGED (org.apache.http.impl.execchain.MainClientExec:266)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> POST /_bulk HTTP/1.1 (org.apache.http.headers:135)
[2017-07-11 07:39:54,673] DEBUG http-outgoing-0 >> Content-Length: 2216 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Host: localhost:9200 (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Connection: Keep-Alive (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111) (org.apache.http.headers:138)
[2017-07-11 07:39:54,674] DEBUG http-outgoing-0 >> Accept-Encoding: gzip,deflate (org.apache.http.headers:138)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "POST /_bulk HTTP/1.1[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Length: 2216[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,675] DEBUG http-outgoing-0 >> "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Host: localhost:9200[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "Connection: Keep-Alive[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,676] DEBUG http-outgoing-0 >> "User-Agent: Apache-HttpClient/4.5.1 (Java/1.8.0_111)[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "Accept-Encoding: gzip,deflate[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,677] DEBUG http-outgoing-0 >> "{"index":{"_id":"gritServer+0+199211","_index":"grits","_type":"docs"}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,678] DEBUG http-outgoing-0 >> "{"data":[{"costSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","costSinceLast":3.03,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerSinceLast":[{"sourceName":"Sunhive_4","sourceType":"grid","powerSinceLast":141371.1,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","powerSinceLast":252830.4,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"energyTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","energyToday":1111338.2,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","energyToday":2078056.5,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"activeSource":[{"sourceName":"Sunhive_4","sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"timeTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","timeToday":1.3630964,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","sourceType":"","timeToday":0.0,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"current":[{"sourceName":"Sunhive_4","value":[227.49,215.29,189.65],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[227.49,227.49,227.49,227.49,227.49],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"voltage":[{"sourceName":"Sunhive_4","value":[233.744,232.89,231.452],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[233.744,233.744,233.744,233.744,233.744],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"powerfactor":[{"sourceName":"Sunhive_4","value":[0.984,0.981,0.977],"sourceType":"grid","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"},{"sourceName":"","value":[0.984,0.984,0.984,0.984,0.984],"sourceType":"","configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}],"time":"2017-07-11T08:39:46.893+0100","Type":"1","id":"70:B3:D5:43:09:E6","costTodaySource":[{"sourceName":"Sunhive_4","sourceType":"grid","costToday":27783.455,"configID_FK":"3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"}]}],"master":{"id":"70:B3:D5:43:09:E6","time":"2017-07-11T08:39:46.893+0100","configuration_IDs":["3Y5YFOYIA3KQKBZYVUB0EOM2JUC0YIK0"]}}[\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Type: application/json; charset=UTF-8[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,685] DEBUG http-outgoing-0 << "Content-Length: 181[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire:72)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << "{"took":7,"errors":false,"items":[{"index":{"_index":"grits","_type":"docs","_id":"gritServer+0+199211","_version":7,"_shards":{"total":2,"successful":1,"failed":0},"status":200}}]}" (org.apache.http.wire:86)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << HTTP/1.1 200 OK (org.apache.http.headers:124)
[2017-07-11 07:39:54,686] DEBUG http-outgoing-0 << Content-Type: application/json; charset=UTF-8 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG http-outgoing-0 << Content-Length: 181 (org.apache.http.headers:127)
[2017-07-11 07:39:54,687] DEBUG Connection can be kept alive indefinitely (org.apache.http.impl.execchain.MainClientExec:284)
[2017-07-11 07:39:54,687] DEBUG Connection [id: 0][route: {}->http://localhost:9200] can be kept alive indefinitely (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:314)
[2017-07-11 07:39:54,688] DEBUG Connection released: [id: 0][route: {}->http://localhost:9200][total kept alive: 2; route allocated: 2 of 2; total allocated: 2 of 20] (org.apache.http.impl.conn.PoolingHttpClientConnectionManager:320)
[2017-07-11 07:39:54,688] DEBUG Bulk operation was successfull (io.searchbox.core.Bulk:143)

tree output:

├── bin
│   ├── camus-config
│   ├── camus-run
│   ├── confluent-rebalancer
│   ├── connect-distributed
│   ├── connect-standalone
│   ├── control-center-3_0_0-reset
│   ├── control-center-3_0_1-reset
│   ├── control-center-console-consumer
│   ├── control-center-reset
│   ├── control-center-run-class
│   ├── control-center-set-acls
│   ├── control-center-start
│   ├── kafka-acls
│   ├── kafka-avro-console-consumer
│   ├── kafka-avro-console-producer
│   ├── kafka-configs
│   ├── kafka-console-consumer
│   ├── kafka-console-producer
│   ├── kafka-consumer-groups
│   ├── kafka-consumer-offset-checker
│   ├── kafka-consumer-perf-test
│   ├── kafka-mirror-maker
│   ├── kafka-preferred-replica-election
│   ├── kafka-producer-perf-test
│   ├── kafka-reassign-partitions
│   ├── kafka-replay-log-producer
│   ├── kafka-replica-verification
│   ├── kafka-rest-run-class
│   ├── kafka-rest-start
│   ├── kafka-rest-stop
│   ├── kafka-rest-stop-service
│   ├── kafka-run-class
│   ├── kafka-server-start
│   ├── kafka-server-stop
│   ├── kafka-simple-consumer-shell
│   ├── kafka-streams-application-reset
│   ├── kafka-topics
│   ├── kafka-verifiable-consumer
│   ├── kafka-verifiable-producer
│   ├── schema-registry-run-class
│   ├── schema-registry-start
│   ├── schema-registry-stop
│   ├── schema-registry-stop-service
│   ├── support-metrics-bundle
│   ├── windows
│   ├── zookeeper-security-migration
│   ├── zookeeper-server-start
│   ├── zookeeper-server-stop
│   └── zookeeper-shell
├── etc
│   ├── camus
│   ├── confluent-common
│   ├── confluent-control-center
│   ├── confluent-rebalancer
│   ├── kafka
│   ├── kafka-connect-elasticsearch
│   ├── kafka-connect-hdfs
│   ├── kafka-connect-jdbc
│   ├── kafka-connect-replicator
│   ├── kafka-rest
│   ├── rest-utils
│   └── schema-registry

etc/kafka

.
├── connect-console-sink.properties
├── connect-console-source.properties
├── connect-distributed.properties
├── connect-file-sink.properties
├── connect-file-source.properties
├── connect-log4j.properties
├── connect-standalone.properties
├── consumer.properties
├── log4j.properties
├── producer.properties
├── server.properties
├── tools-log4j.properties
└── zookeeper.properties

@ewencp
Contributor

ewencp commented Jul 11, 2017

@idarlington Ok, so it looks like it's definitely making requests. We might want a larger snippet, though, because the TRACE and DEBUG output is so verbose -- most of that is from the underlying library making the request and doesn't include much output from Connect itself. We'd probably want more lines that include io.confluent or org.apache.kafka logs.
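
Something along these lines would pull out the relevant lines (the log file name is a placeholder):

grep -E 'io\.confluent|org\.apache\.kafka' connect.log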

@idarlington
Author

Hi @ewencp, I apologize for the late reply.

You can find a larger log output here

Thanks.

@ewencp
Contributor

ewencp commented Jul 21, 2017

@idarlington I'm not seeing any duplicates in that log? In fact it looks like requests are working as expected. I've grepped out a bit to help show what the connector is doing:

 $ grep _id log | grep '>>' | cut -f 6 -d ' '
"{"index":{"_id":"gritServer+0+199576","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199577","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199578","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199579","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199580","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199581","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199582","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199583","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199584","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199585","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199586","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199587","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199588","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199589","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199590","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199591","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199592","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199593","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199594","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199595","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199596","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199597","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199598","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199599","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199600","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199601","_index":"grits","_type":"docs"}}[\n]"
"POST
"{"index":{"_id":"gritServer+0+199603","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199604","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199605","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199606","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199607","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199608","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199609","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199610","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199611","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199612","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199613","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199614","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199615","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199616","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199617","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199618","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199619","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199620","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199621","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199622","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199623","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199624","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199625","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199626","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199627","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199628","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199629","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199630","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199631","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199632","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199633","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199634","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199635","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199636","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199637","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199638","_index":"grits","_type":"docs"}}[\n]"
"{"index":{"_id":"gritServer+0+199639","_index":"grits","_type":"docs"}}[\n]"

(It's missing one and has POST in there because the log is a bit mangled.) So for gritServer-0, we're seeing what we expect -- each message processed once, and in order.

Looking at the offset commits:

[2017-07-11 07:43:40,673] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} (io.confluent.connect.elasticsearch.ElasticsearchSinkT metadata=''}}  (org.apache.kafka.clients.consumer.KafkaConsumer:1160)
[2017-07-11 07:43:40,675] TRACE Sending offset-commit request with {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199607, metadata=''}} to coordinator 45.76.39.129:9092 (id: 2147483647 rack: null) for group connect-elasticsearch-sink (org.aconnect-elasticsearch-sink committed offset 199607 for partition gritServer-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:640)
...snip...
[2017-07-11 07:43:58,194] TRACE Flushing data to Elasticsearch with the following offsets: {dailyData-0=OffsetAndMetadata{offset=0, metadata=''}, gritServer-0=OffsetAndMetadata{offset=199640, metadata=''}} (io.confluent.connect.SinkTask:130)
[2017-07-11 07:43:58,194] INFO WorkerSinkTask{id=elasticsearch-sink-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:262)

You can see that the offset being committed for gritServer-0 is definitely increasing. What's notable is that dailyData-0 is not increasing and none of the IDs reported above have that in their name, which indicates no messages are being seen for it. Is there actually any data flowing into that topic partition? The log clearly shows that fetch requests are being sent:

[2017-07-11 07:43:29,127] TRACE Added fetch request for partition dailyData-0 at offset 0 (org.apache.kafka.clients.consumer.internals.Fetcher:639)

but it seems they are not receiving any data in response.

It looks like things are running fine; there just isn't any data in one of the topic partitions.
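
One quick way to confirm would be to check that partition's end offset (the broker address is a placeholder); if it reports 0, nothing has been produced to it:

bin/kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic dailyData --time -1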

@panda87

panda87 commented Jul 23, 2017

@ewencp I want to add another input to this thread.
Using our own predefined _id actually degrades indexing time and performance.
Elastic also recommends using their generated IDs instead of ours, so that each bulk request does not have to check whether the _id already exists.

I understand the design, but for some cases, like basic logs, we don't need the _id validation and the resulting degradation; we would rather just index at a lower performance cost.

@synhershko

> I don't think there's a way to just use Elasticsearch IDs currently, though I think that's the behavior you would see if your data doesn't have keys.

The way to do that is to send null as the document ID in the indexing / bulk request, as opposed to forcing it to be a string as you do now.

In the logging use case it is usually okay to have some messages added twice (or even completely dropped), so IMO there should be an opt-in configuration that allows you to rely on ES-generated IDs. This will result in much faster inserts to ES.
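
For illustration, omitting _id from the bulk action line is enough for Elasticsearch to auto-generate one; the index and type names below are placeholders:

curl -XPOST 'http://localhost:9200/_bulk' --data-binary '{"index":{"_index":"grits","_type":"docs"}}
{"field":"value"}
'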

@kelbyloden

See the note I added on issue #139 for a very minor code change I made to support allowing null document keys and thus auto-generating the IDs in Elasticsearch.
#139 (comment)
I've been running this for several months now at an average rate of 2.5 billion messages per day with no issues.
