Support for document upsert #239

Merged
rhauch merged 3 commits into confluentinc:master from jdurani:master on Apr 12, 2019

Conversation

@jdurani
Contributor

commented Sep 20, 2018

In some cases, documents in Kafka topics are only partial (they contain only a subset of fields). With the "insert" behavior, documents are overwritten and fields set before the last insert are lost.

This adds the ability to "upsert" documents, that is, to add or update fields without replacing the whole document.
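
To make the difference concrete, here is a minimal Java sketch that uses an in-memory map as a stand-in for an Elasticsearch index. The class and method names are hypothetical and only illustrate the semantics described above; this is not the connector's code, and the merge shown is shallow (top-level fields only).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only: an in-memory stand-in for an index.
final class WriteSemanticsSketch {
    // Document ID -> document fields.
    private final Map<String, Map<String, Object>> index = new HashMap<>();

    // "insert": the new document replaces whatever was stored under the ID.
    void insert(String id, Map<String, Object> doc) {
        index.put(id, new HashMap<>(doc));
    }

    // "upsert": create the document if it is missing, otherwise add or
    // replace only the fields present in the partial document.
    void upsert(String id, Map<String, Object> partialDoc) {
        index.computeIfAbsent(id, key -> new HashMap<>()).putAll(partialDoc);
    }

    Map<String, Object> get(String id) {
        return index.get(id);
    }

    // Writes {"name": ...} first, then a partial record {"email": ...}.
    static Map<String, Object> demo(boolean useUpsert) {
        WriteSemanticsSketch store = new WriteSemanticsSketch();
        Map<String, Object> first = new HashMap<>();
        first.put("name", "alice");
        Map<String, Object> second = new HashMap<>();
        second.put("email", "alice@example.com");

        store.insert("1", first);
        if (useUpsert) {
            store.upsert("1", second);
        } else {
            store.insert("1", second);
        }
        return store.get("1");
    }

    public static void main(String[] args) {
        System.out.println("insert twice:       " + demo(false)); // "name" is lost
        System.out.println("insert then upsert: " + demo(true));  // both fields kept
    }
}
```

With two inserts the second partial record wipes out the earlier field; with an upsert as the second write, both fields survive.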

@ConfluentCLABot

commented Sep 20, 2018

It looks like @jdurani hasn't signed our Contributor License Agreement, yet.

The purpose of a CLA is to ensure that the guardian of a project's outputs has the necessary ownership or grants of rights over all contributions to allow them to distribute under the chosen licence.
Wikipedia

You can read and sign our full Contributor License Agreement here.

Once you've signed reply with [clabot:check] to prove it.

Appreciation of efforts,

clabot

@jdurani jdurani force-pushed the jdurani:master branch from d0e1de9 to 66fec9e Sep 20, 2018

@jdurani

Contributor Author

commented Sep 20, 2018

[clabot:check]

@ConfluentCLABot

commented Sep 20, 2018

@confluentinc It looks like @jdurani just signed our Contributor License Agreement. 👍

Always at your service,

clabot

@purbon

Member

commented Oct 22, 2018

related to #248

@purbon purbon self-requested a review Oct 30, 2018

@purbon

Member

commented Oct 30, 2018

Hi @jdurani, thanks a lot for your contribution. This is certainly a good feature to have in this connector. Would you mind adding a test for it as well? That would be great.

@jdurani

Contributor Author

commented Oct 31, 2018

Hello,
will do.

@jdurani jdurani force-pushed the jdurani:master branch 2 times, most recently from 91eaebb to c7ced8f Nov 6, 2018

@jdurani

Contributor Author

commented Nov 6, 2018

@purbon - Done. Unit test added.

@rhauch
Member

left a comment

@jdurani, thanks for the PR! I have a few questions below:

public static final String USE_UPDATE_CONFIG = "use.update";
private static final String USE_UPDATE_DOC = "Whether to use UPDATE (UPSERT) instead of "
+ "INSERT method. This allows documents to be updated rather than overwritten "
+ "(e.g. new fields added). Valid options are 'true', 'false'. Default to 'false'.";

@rhauch

rhauch Nov 9, 2018

Member

Rather than a boolean config property, how about a property that takes an enumeration so that we can add other options in the future. For example, update.method with allowed values of upsert and insert (default)? Not the best config name, but would be open to other options.

@jdurani

jdurani Nov 9, 2018

Author Contributor

Actually, I do not see any other ways of updating, but that is only my personal understanding at the moment :). I am OK with that. Since we are dealing with writing, I suggest 'write.method'. I find the word "update" misleading.

@jdurani

jdurani Nov 13, 2018

Author Contributor

@rhauch, I have added write.method string config. Allowed values are insert, update, upsert. Let me know if that's OK with you.

@jdurani

Contributor Author

commented Nov 9, 2018

Just for the record: I found out that performance might suffer. For example, we had to increase some timeouts and reduce the batch size to make this work for ES with limited resources. I think it would be nice to mention this somewhere in the documentation, at least for reference. Basically, there are two options: modify the Kafka Connect job properties, or optimize ES for inserts (since by default it's optimized for search).

@smehta

commented Nov 10, 2018

Thanks @jdurani for adding this feature. This will make our pipeline much simpler, currently we are doing aggregation before putting a message in the kafka topic to avoid full replace.

@jdurani jdurani force-pushed the jdurani:master branch 3 times, most recently from 717d025 to f6faa9e Nov 13, 2018

@purbon
Member

left a comment

@jdurani good job with this PR; it will certainly be of great benefit to the connector's users. I left a few comments on it. What do you think? I hope they make sense; otherwise I am more than happy to clarify, since writing does not always capture every detail 😄.

Again good job with the PR! thanks for your contribution.

@@ -148,6 +151,17 @@
+ "Elasticsearch rejects due to some malformation of the document itself, such as an index"
+ " mapping conflict or a field name containing illegal characters. Valid options are "
+ "'ignore', 'warn', and 'fail'.";
public static final String WRITE_METHOD_CONFIG = "write.method";
private static final String WRITE_METHOD_DOC = "What method to use for storing data to "

@purbon

purbon Nov 13, 2018

Member

While the 3 actions have clear separations, I sort of see them as confusing for the end user. When would a user want to have the connector working only with update mode? any use case in mind?

@purbon

purbon Nov 13, 2018

Member

I have a proposal for this, why not having 2 different configuration properties for this, something like:

To my mind this makes the interaction more intuitive for the user. Does that make sense to you?

@jdurani

jdurani Nov 13, 2018

Author Contributor

Regarding the 3 actions: I do not know for sure, but I can imagine that someone has one pipeline to insert documents and a separate one to update them. In such a case, upsert is not acceptable.
Actually, it is very easy to implement, so I did.

@jdurani

jdurani Nov 13, 2018

Author Contributor

Regarding the 2 configuration properties:
Not a bad idea, but I do not see the point. We would have one extra configuration property (doc_as_upsert) that applies to only one action.

Regarding action: I find it confusing. It introduces a new property with a possible value of delete, but delete is already implemented: it is triggered when the value is null.
Moreover, if each job defines one action, we would actually need 4 Connect jobs, one per action. At least that's how I understand it.

@purbon

purbon Nov 14, 2018

Member

Hi,
for our way of working, I agree delete does not make much sense. But at least I find the naming clearer than write.method. My proposal mimics the simple model of HTTP, with easy-to-understand actions (something like insert, update); the boolean value then changes one of these actions to do both. What do you think?

Speaking of use cases, from personal experience I mostly see either:

  • full insert pipeline, where every new event is a new document.
  • upsert pipeline, updates and insert mixed in the pipeline.

One example of this is CDC with something like http://maxwells-daemon.io

In the end, my question is about finding an interface that sticks with users and helps them understand what to do. I hope that makes sense.

@jdurani

jdurani Nov 20, 2018

Author Contributor

@purbon What do you think about my last comment?

@purbon

purbon Nov 29, 2018

Member

Thanks a lot for your kind description and feedback. I'm actually on the fence about this: option two keeps the actions basic and then asks one action to do the upsert, while option 1 is more opinionated as well as closer to what users might end up doing anyway.

I guess it is OK for now to have simple actions:

  • INSERT: the current behavior and the default.
  • UPDATE: update documents instead of overwriting them.
  • insert_or_update: another useful action, and the main goal of this PR.

We should note somewhere that updates are partial.

makes sense?

@rhauch do you agree with this?

@jdurani

jdurani Nov 29, 2018

Author Contributor

It sure does make sense to me.

My conclusion:

  • we go with one property - currently named write.method (I am open to suggestions for new name if you want)
  • we go with 3 acceptable values - INSERT, UPDATE, UPSERT (again, I am open to suggestions for different name if necessary)
  • do proper documentation - where? is doc in class ElasticsearchSinkConnectorConfig.java sufficient? If yes, I will update doc description for that property.

@purbon

purbon Nov 29, 2018

Member

Class for config is ElasticsearchSinkConnectorConfig yes.

@jdurani

jdurani Nov 29, 2018

Author Contributor

Doc modified. Let me know whether it's OK.

@plarsson

commented Nov 14, 2018

I think this change, allowing updates, will require a further change so that it's possible to configure how the _version in ES is handled. A valid use case could be an update coming from a different topic, partition, etc. See also #142

@purbon

Member

commented Nov 14, 2018

@plarsson kinda agree with you, I left you a comment in your issue, hope it helps.

@jdurani jdurani force-pushed the jdurani:master branch from f6faa9e to bccb147 Nov 28, 2018

@mauliksoneji

commented Dec 10, 2018

@purbon @jdurani Can you please give an ETA on when this PR will be merged.
We have the same use case of upserting documents in ES and want to start using this feature.

@jdurani

Contributor Author

commented Dec 10, 2018

For me, it's ready. Depends on @purbon and @rhauch

@purbon

Member

commented Jan 10, 2019

LGTM, but we should wait on @rhauch final thoughts as well.

@dkirrane

commented Apr 2, 2019

Any update on this?

@jdurani jdurani force-pushed the jdurani:master branch from 837d5dd to 3485cba Apr 2, 2019

@jdurani jdurani force-pushed the jdurani:master branch from 3485cba to 7ad54d8 Apr 2, 2019

@jdurani

Contributor Author

commented Apr 2, 2019

Hello guys! I have fixed this PR so it can be merged. @purbon , @rhauch, could you please confirm?

@rhauch
Member

left a comment

Thanks for the contribution, @jdurani! This will be a very nice feature. I do have a few questions/comments below.

// Overridden here so that ConfigDef.toEnrichedRst shows possible values correctly
@Override
public String toString() {
return validator.toString();

@rhauch

rhauch Apr 3, 2019

Member

This actually appears in the generated documentation, so it'd be good to have a more readable string here. Perhaps "One of insert, update, or delete", tho maybe use the literal values here.

@jdurani

jdurani Apr 4, 2019

Author Contributor

Why not. Will do.

@Override
public void ensureValid(String name, Object value) {
if (value instanceof String) {
value = ((String) value).toLowerCase(Locale.ROOT);

@rhauch

rhauch Apr 3, 2019

Member

The documentation already specifies lowercase values, so why lowercase the supplied value here? Why not just require the values to be lowercase to begin with, in which case a value like INSERT would be invalid?
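
The two options raised in this comment can be contrasted with a small, self-contained Java sketch. It is not the connector's validator: the class and method names are hypothetical, and the value set is simply the three literals under review at this point in the thread.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch; not the connector's validator.
final class WriteMethodValidationSketch {
    // Values under review at this point in the thread.
    static final Set<String> ALLOWED =
        new LinkedHashSet<>(Arrays.asList("insert", "update", "upsert"));

    // Lenient: lowercases the input first, so "INSERT" is accepted.
    static boolean isValidLenient(Object value) {
        return value instanceof String
            && ALLOWED.contains(((String) value).toLowerCase(Locale.ROOT));
    }

    // Strict: only the exact lowercase literals are accepted.
    static boolean isValidStrict(Object value) {
        return value instanceof String && ALLOWED.contains(value);
    }

    public static void main(String[] args) {
        for (String candidate : new String[] {"upsert", "INSERT", "replace"}) {
            System.out.println(candidate
                + " lenient=" + isValidLenient(candidate)
                + " strict=" + isValidStrict(candidate));
        }
    }
}
```

The strict variant keeps the accepted input identical to what the documentation lists, at the cost of rejecting values such as INSERT that the lenient variant would tolerate.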

+ ". Default to '" + WriteMethod.DEFAULT.toString() + "'.\n"
+ "If Elasticsearch has limited resources, " + WriteMethod.UPSERT.toString()
+ " may take significant amount of time. You may want to modify properties "
+ FLUSH_TIMEOUT_MS_CONFIG + ", " + READ_TIMEOUT_MS_CONFIG + " and " + BATCH_SIZE_CONFIG;

@rhauch

rhauch Apr 3, 2019

Member

As a user, I'd want to know whether this affected the existing delete behavior (no), and whether the default of insert matched previous behavior (yes). And this could be improved to follow the style of our documentation.

For example, maybe:

Method used for writing data to Elasticsearch, and one of insert, update or upsert. The default method is insert, in which the connector constructs a document from the record value and inserts that document into Elasticsearch, completely replacing any existing document with the same ID; this matches previous behavior. The upsert method will create a new document if one with the specified ID does not yet exist, or will update an existing document with the same ID by adding/replacing only those fields present in the record value. The update method will update an existing document with the same ID by adding/replacing only those fields present in the record value, but will fail if the document with the specified ID does not exist. The upsert method may require additional time and resources of Elasticsearch, so consider increasing the flush.timeout.ms, read.timeout.ms, and batch.size configuration properties.

However, I'm struggling to understand the value of update and envision a realistic use case where write.method=update might be useful. My concern is that even if an edge use case does exist, users with more conventional use cases might struggle by accidentally setting it to update instead of upsert. Unless I'm missing something, perhaps it's better to start without update yet keep all the other code as-is so that we can add it later if it does become useful.

@jdurani

jdurani Apr 4, 2019

Author Contributor

OK, I will replace current doc text with one you suggest.
I will keep the code as-is but valid values for write.method will be reduced to insert and upsert.
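
With the value set reduced to insert and upsert, the properties a user would touch to opt into upsert might be assembled as in the sketch below. The property names write.method, flush.timeout.ms, read.timeout.ms and batch.size come from this thread; the numeric values are placeholder assumptions rather than recommendations, and the class name is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the upsert-related connector properties.
final class UpsertConfigSketch {
    static Map<String, String> upsertProperties() {
        Map<String, String> props = new LinkedHashMap<>();
        // Opt into upsert; "insert" is described in the thread as the default.
        props.put("write.method", "upsert");
        // Placeholder values (assumptions): tune for your own cluster.
        props.put("flush.timeout.ms", "30000");
        props.put("read.timeout.ms", "10000");
        props.put("batch.size", "500");
        return props;
    }

    public static void main(String[] args) {
        // Print in key=value form, as in a connector properties file.
        upsertProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Earlier in the thread the author reports raising timeouts and lowering the batch size on a resource-limited cluster, so the right direction for batch.size likely depends on the deployment.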

@jdurani jdurani force-pushed the jdurani:master branch from 0ab054a to 93dd4e4 Apr 4, 2019

@jdurani

Contributor Author

commented Apr 4, 2019

@rhauch, @purbon - hope it's OK now.

@rhauch
Member

left a comment

Thanks, @jdurani. This is very close, but I have a few more suggestions around completely removing the WriteMethod.UPDATE literal, since it is not exposed and IMO we won't bring the functionality back as the use case is still not well defined.


public enum WriteMethod {
INSERT,
UPDATE, // ignored for now

@rhauch

rhauch Apr 12, 2019

Member

We should remove this UPDATE literal, because it is unnecessary for now and should be completely unused. After all, we may never need to add it back in the future.

@@ -351,6 +366,16 @@ private Index toIndexRequest(IndexableRecord record) {
return req.build();
}

private Update toUpdateRequest(IndexableRecord record, WriteMethod method) {
String payload = "{\"doc\":" + record.payload
+ ", \"doc_as_upsert\":" + (method == WriteMethod.UPSERT) + "}";

@rhauch

rhauch Apr 12, 2019

Member

Since we should remove the WriteMethod.UPDATE, IMO we should also change this to be more straightforward:

Suggested change
+ ", \"doc_as_upsert\":" + (method == WriteMethod.UPSERT) + "}";
+ ", \"doc_as_upsert\":true}";

And if we do that, let's also remove the WriteMethod method parameter from this method, since it will be unused. And maybe rename the method to toUpsertRequest to be more consistent with the approach.
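
As a rough sketch of where this suggestion leads, the payload construction could reduce to a single-argument helper like the one below. The class and helper names are hypothetical, the real method in the PR returns an Update request rather than a string, and the input is assumed to already be a valid JSON object (no escaping or validation is done here).

```java
// Hypothetical sketch of the simplified payload construction.
final class UpsertPayloadSketch {
    // Wraps the record's JSON document in an update body that always upserts.
    static String toUpsertPayload(String documentJson) {
        return "{\"doc\":" + documentJson + ", \"doc_as_upsert\":true}";
    }

    public static void main(String[] args) {
        // prints {"doc":{"email":"alice@example.com"}, "doc_as_upsert":true}
        System.out.println(toUpsertPayload("{\"email\":\"alice@example.com\"}"));
    }
}
```

Hard-coding true removes the only remaining branch on the write method inside this helper, which is the simplification the review asks for.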

@jdurani

Contributor Author

commented Apr 12, 2019

@rhauch This is kind of strange. In your previous comment you suggested to keep code around update as-is [1], now you want me to remove it. I also prefer to keep code as simple as possible (KISS) but I tried to follow your suggestions to finish this PR.
Nevertheless, I will remove that code.

[1] #239 (comment)

Unless I'm missing something, perhaps it's better to start without update yet keep all the other code as-is so that we can add it later if it does become useful.

@jdurani jdurani force-pushed the jdurani:master branch from 93dd4e4 to a6695a7 Apr 12, 2019

@jdurani jdurani force-pushed the jdurani:master branch from a6695a7 to c837047 Apr 12, 2019

@jdurani

Contributor Author

commented Apr 12, 2019

@rhauch Here we go :)

@rhauch

Member

commented Apr 12, 2019

@jdurani apologies if I wasn't clear. When I said:

Unless I'm missing something, perhaps it's better to start without update yet keep all the other code as-is so that we can add it later if it does become useful.

I was referring to all code other than what was related to update. IOW, keep the insert- and upsert-related code, but remove the update-related code. And I think my comments from today were in line with that objective. Again, sorry if I wasn't clear.

@rhauch

rhauch approved these changes Apr 12, 2019

Member

left a comment

Looks great, @jdurani! Thank you for all the work to complete this improvement.

@rhauch rhauch merged commit a2b1403 into confluentinc:master Apr 12, 2019

1 check passed

continuous-integration/jenkins/pr-merge This commit looks good
Details
@jdurani

Contributor Author

commented Apr 12, 2019

Great! I am happy we finally agreed! :)

@alexmorosmarco

commented May 4, 2019

Thanks so much for this work @jdurani . 👏👏👏
I was checking the connector documentation and could not find the upsert operation, so I was hesitant to use this connector.

On the other hand, are you aware that the configuration documentation is not up to date? (discovered in #307)
Is it in your hands to update the docs? Or maybe they are not maintained in this repo?

@alexmorosmarco

commented May 4, 2019

Hi @jdurani , @purbon & @rhauch.

I am writing to all of you because you have been really active on this topic and I have a closely related concern.

How are you handling the versioning?
If we use an insert or upsert over an existing document we can have versioning conflicts, so I have 2 questions:

  1. Where should I give the document version to Elasticsearch? I am used to doing it in the URL when using the Elasticsearch API (example: PUT twitter/_doc/1?version=2), but I do not know how to do it with the connector.
  2. In case I want to use external versioning, where can I configure it? (related issue #308)
@jdurani

Contributor Author

commented May 4, 2019

AFAIK, this was merged into the master branch, so I expect this feature in 5.3. Moreover, you are referencing the documentation for 5.2.1, which was released on 2019-04-03. This was merged on 2019-04-12.

@jdurani

Contributor Author

commented May 4, 2019

Maybe this line will be interesting for you.

In general, though, version handling was not the main goal of this PR.

@alexmorosmarco

commented May 5, 2019

Thanks. I see the merge to master on 2019-04-12, but I do not see any 5.3.0 release on the releases page. I also cannot select the documentation for version 5.3.0 (which I guess makes sense, as I understand no release has been created yet).

@jdurani and @rhauch, how does the release process work?
When can we expect the upsert write method to be available in a release?
Thanks so much.

@jdurani

Contributor Author

commented May 5, 2019

This is more a question for @rhauch, @purbon or maybe other members of Confluent Inc. I am just a guy from the community who decided to contribute a little bit :).

@shankao

commented May 6, 2019

In fact, this would be incredibly useful to have in the 5.3 release! Is there any idea when that would happen or what's the process to follow?

@alexmorosmarco

commented May 7, 2019

Hi @rhauch and @purbon
Thanks so much for your collaboration on this change.

We are really interested in using this feature.
Can you tell us which release it will be in, and when?
It is merged to master, but I do not know how the release process is managed.

@shankao

commented May 7, 2019

We couldn't wait, so, in case anyone is interested, here is a quick and dirty Dockerfile that builds from master. You'll need to git clone the 3 repos (kafka, common and the kafka connector) into the same host folder for it to work. Adapt it to your needs.

FROM centos:7 as build

RUN yum install -y wget unzip

RUN yum-config-manager --add-repo http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo
RUN yum-config-manager --enable epel-apache-maven
RUN yum install -y apache-maven

COPY gradle-5.4.1-bin.zip gradle-5.4.1-bin.zip
ENV JAVA_HOME=/usr/lib/jvm/java
RUN unzip -d /opt/gradle gradle-5.4.1-bin.zip
ENV PATH=$PATH:/opt/gradle/gradle-5.4.1/bin
RUN gradle -v

COPY kafka kafka
RUN (cd kafka && gradle wrapper)
RUN (cd kafka && ./gradlew installAll)

COPY common common
RUN (cd common && mvn install)

COPY kafka-connect-elasticsearch kafka-connect-elasticsearch
RUN (cd kafka-connect-elasticsearch && mvn clean package)

FROM debezium/connect:0.9
# Deploy Confluent Elasticsearch sink connector
ENV KAFKA_CONNECT_ES_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-elasticsearch
RUN mkdir $KAFKA_CONNECT_ES_DIR
COPY --from=build kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-5.3.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/ $KAFKA_CONNECT_ES_DIR