[BEAM-12400] MongoDBIO support for update within documents #14927

Merged
merged 8 commits into from Sep 6, 2021

pareshsarafmdb
Contributor

@pareshsarafmdb pareshsarafmdb commented Jun 2, 2021

The current MongoDBIO connector supports only insert and replace operations on a collection. In practice there is often a need to update an existing document: adding a new field, updating an existing field, pushing into an array or set, and so on. This PR adds support for such updates.



@aromanenko-dev aromanenko-dev changed the title MongoDBIO support for update within documents [BEAM-12400] MongoDBIO support for update within documents Jun 4, 2021
@aromanenko-dev aromanenko-dev self-requested a review June 4, 2021 10:24
@robertwb robertwb requested a review from chamikaramj June 8, 2021 19:55
@pabloem pabloem self-requested a review June 10, 2021 23:25
@pabloem
Member

pabloem commented Jun 11, 2021

Can you improve the Javadoc as well? It would be helpful if I could see what an example usage of this transform looks like.

@pareshsarafmdb
Contributor Author

Can you improve the Javadoc as well? It would be helpful if I could see what an example usage of this transform looks like.

I have added Javadoc for the update feature.

Comment on lines 938 to 942
// builder.add(DisplayData.item("isUpdate", isUpdate()));
// builder.add(DisplayData.item("updateKey", updateKey()));
// builder.add(DisplayData.item("updateOperator", updateOperator()));
// builder.add(Data.item("updateOptions", updateOptions()));
// builder.add(DisplayData.item("updateField", updateField()));
Member

Uncomment these? Maybe check that they are not null before adding them, I suppose.

Comment on lines 901 to 916
public Write withIsUpdate(boolean isUpdate) {
return builder().setIsUpdate(isUpdate).build();
}

public Write withUpdateKey(String updateKey) {
return builder().setUpdateKey(updateKey).build();
}

public Write withUpdateOperator(String updateOperator) {
return builder().setUpdateOperator(updateOperator).build();
}

public Write withUpdateField(String updateField) {
return builder().setUpdateField(updateField).build();
}

Member

Please also add javadoc to each public method

Contributor Author

@pareshsarafmdb can you squash your commits? then we'll merge. Thanks!

Hi @pabloem, since my changes are on master, I am having trouble squashing the commits. Would you be able to squash and merge by any chance? If not, I will look for alternatives.

* * .withUpdateKey("key-to-match")
* * .withUpdateField("field-to-update")
* * .withUpdateOperator("$set")
* * .withNumSplits(30))
Member

In this case, only one field can be updated. Is that correct? Would it make sense to support updating more than one field?
I see that MongoDB works with pairs of updateOperator + updateField. I wonder if it makes sense to support something like that instead?

e.g.:

  pipeline
    .apply(...)
    .apply(MongoDbIO.write()
      .withUri("mongodb://localhost:27017")
      .withDatabase("my-database")
      .withCollection("my-collection")
      .withIsUpdate(true)
      .withUpdateKey("key-to-match")
      .withUpdateFields(
                 "$set", "field1",
                 "$currentDate", "datefield1",
                 "$min", "special-minimum-field1")
      .withNumSplits(30))

or perhaps something like this:

      .withUpdateFields(
                 UpdateField.of("$set", "field1"),
                 UpdateField.of("$currentDate", "datefield1"),
                 UpdateField.of("$min", "special-minimum-field1")))

I also think it may be helpful to support upsert operations. WDYT?

I don't want to make this too complex - but it feels like these options could be supported easily, right?

Contributor Author

Hi Pablo, thanks for the input. I thought the same, but I am confused about how the actual values come in. With the single-field update, whatever value comes in from the pipeline is set as the value of the field. If we have to update multiple fields, how can that be done? I am not sure how this works in Beam. I hope my question is clear.

Upserts functionality can definitely be added.

Member

My guess is that we might have a document like this in the database:

{
 "name": "ironman",
 "status": "active",
 "age": 55,
 "location": "malibu",
 "lastUpdated": "(SOME TIMESTAMP)"
}

And perhaps the PCollection that comes as input contains the following element:

{
 "name": "ironman",
 "status": "inactive",
 "age": 56,
 "location": "malibu"
}

In this case, we could do:

      .withUpdateKey("name")
      .withUpdateFields(
                 UpdateField.of("$set", "status"),
                 UpdateField.of("$currentDate", "lastUpdated"),
                 UpdateField.of("$set", "age")))

Does that make sense? Another thing I wonder about is - what if we just want to update all the fields without having to list them all one by one on the updateFields attribute of the transform.

thoughts?
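
The mapping described above can be sketched in plain Java, with `java.util.Map` standing in for BSON documents. This is only an illustration of the idea under discussion, not the connector's code: the class `UpdateDocSketch` and both of its helper methods are hypothetical names. It assumes that the value for each `$set` field is read from the same-named field of the input element, and that `$currentDate` is given `true` rather than a value from the element, which is how MongoDB specifies that operator.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch: java.util.Map stands in for a BSON document. */
final class UpdateDocSketch {

  /** Builds the filter {key: element[key]} used to find the document to update. */
  static Map<String, Object> filter(String updateKey, Map<String, Object> element) {
    Map<String, Object> filter = new LinkedHashMap<>();
    filter.put(updateKey, element.get(updateKey));
    return filter;
  }

  /**
   * Builds {operator: {field: value, ...}, ...} from (operator, field) pairs
   * passed as a flat list: operator1, field1, operator2, field2, ...
   */
  static Map<String, Map<String, Object>> update(
      Map<String, Object> element, String... operatorFieldPairs) {
    if (operatorFieldPairs.length % 2 != 0) {
      throw new IllegalArgumentException("Expected (operator, field) pairs");
    }
    Map<String, Map<String, Object>> update = new LinkedHashMap<>();
    for (int i = 0; i < operatorFieldPairs.length; i += 2) {
      String operator = operatorFieldPairs[i];
      String field = operatorFieldPairs[i + 1];
      // $currentDate takes a flag, not a value from the input element.
      Object value = "$currentDate".equals(operator) ? Boolean.TRUE : element.get(field);
      // Entries that share an operator are merged into one sub-document.
      update.computeIfAbsent(operator, k -> new LinkedHashMap<>()).put(field, value);
    }
    return update;
  }

  public static void main(String[] args) {
    Map<String, Object> element = new LinkedHashMap<>();
    element.put("name", "ironman");
    element.put("status", "inactive");
    element.put("age", 56);
    element.put("location", "malibu");

    System.out.println(filter("name", element));
    System.out.println(
        update(element, "$set", "status", "$currentDate", "lastUpdated", "$set", "age"));
  }
}
```

Grouping by operator matters because an update document can carry each operator key only once, so the two `$set` entries end up in a single `$set` sub-document.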

Contributor Author

Thanks for the clarification! This makes sense. But in a real-time sync (CDC) scenario, it will mostly be rows coming in and being added to an array or set within MongoDB. For example, say we have a customer collection. New transactions made by the customer are captured and should be added to the transactions array within the customer document. So we definitely need an option to update with the full document. I think we can add both options: updating all the fields together and updating them one by one. That will be more feature-rich as well. What do you think?

Contributor Author

Another challenge: your example assumes that the field/column name in the source is the same as the field name to be updated in the target. In most cases it can be different, so it might look like this:
.withUpdateKey("name")
.withUpdateFields(
    UpdateField.of("$set", "status", "dest-status-field-in-mdb"),
    UpdateField.of("$currentDate", "lastUpdated", "dest-lastUpdated-field-in-mdb"),
    UpdateField.of("$set", "age", "dest-age-field-in-mdb")))

Member

This makes sense to me! : )

I suppose when we update the full document, then the field names must be the same?

Member

just for my information - is this something that you'll want to take on?

Contributor Author

Yeah. Have implemented most of it. Testing it now.

@pabloem pabloem self-requested a review June 15, 2021 21:36
Contributor

@aromanenko-dev aromanenko-dev left a comment

Thanks! I left my comments. Also, please add unit test(s) for this feature.

@@ -766,6 +800,14 @@ private MongoClient createClient(Read spec) {

abstract Builder setBatchSize(long batchSize);

abstract Builder setIsUpdate(boolean isUpdate);
Contributor

If all these arguments (isUpdate, updateKey, operator, updateField) are only ever used together for an update, would it be better to encapsulate them in a single configuration object and have only one new method, e.g. withUpdate(UpdateConfiguration)?

Contributor Author

Thank you for the feedback. This should make it easy and clean. I will work on it.
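
The suggestion above can be illustrated with a small immutable configuration object. This is a sketch under assumptions, not the class that was eventually merged: `UpdateConfigSketch`, its plain `String` field list, and `validate()` are hypothetical names chosen for the example. The idea is that the write transform would hold a single optional configuration, so a separate `isUpdate` flag is no longer needed and the related settings can be checked in one place.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/** Hypothetical sketch of an immutable update configuration; not the merged class. */
final class UpdateConfigSketch {
  private final String updateKey; // null until configured
  private final List<String> updateFields;

  private UpdateConfigSketch(String updateKey, List<String> updateFields) {
    this.updateKey = updateKey;
    this.updateFields = updateFields;
  }

  /** Starts with nothing configured. */
  static UpdateConfigSketch create() {
    return new UpdateConfigSketch(null, Collections.<String>emptyList());
  }

  /** Returns a copy with the key set; the receiver is left unchanged. */
  UpdateConfigSketch withUpdateKey(String updateKey) {
    return new UpdateConfigSketch(Objects.requireNonNull(updateKey, "updateKey"), updateFields);
  }

  /** Returns a copy with the given fields; the receiver is left unchanged. */
  UpdateConfigSketch withUpdateFields(String... fields) {
    return new UpdateConfigSketch(
        updateKey, Collections.unmodifiableList(Arrays.asList(fields.clone())));
  }

  /** Checks in one place that the settings that belong together are all present. */
  void validate() {
    if (updateKey == null || updateFields.isEmpty()) {
      throw new IllegalStateException("updateKey and at least one update field are required");
    }
  }

  String updateKey() {
    return updateKey;
  }

  List<String> updateFields() {
    return updateFields;
  }
}
```

With this shape, a caller would write something like `UpdateConfigSketch.create().withUpdateKey("accId").withUpdateFields("category", "balance")`, and the transform can treat "configuration present" as "perform updates".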

batch.add(new Document(ctx.element()));
if (batch.size() >= spec.batchSize()) {
flush();
if (spec.isUpdate()) {
Contributor

To avoid duplicating code, please adjust the flush() method accordingly.

Contributor Author

@aromanenko-dev @pabloem I have checked in the changes. There is now an UpdateConfiguration object that wraps all the update-related details. Sample below:
MongoDbIO.write()
    .withUri(options.getMongoDBUri())
    .withDatabase(options.getDatabase())
    .withCollection(options.getCollection())
    .withUpdateConfiguration(
        UpdateConfiguration.create()
            .withUpdateKey("accId")
            .withUpdateFields(
                UpdateField.of("$set", "category", "category"),
                UpdateField.of("$set", "balance", "balance"),
                UpdateField.of("$push", "transactions")))
The last entry, a push, updates the destination field with the full input document. Please review and let me know.

Contributor Author

@aromanenko-dev @pabloem gentle reminder to review

Contributor

@pareshsarafmdb Thanks, let me take a look one more time.
@pabloem Do you have any comments on this?

Member

(Pablo is on vacation until August, please ping again in early August.)

@pabloem
Member

pabloem commented Aug 9, 2021

I'll take a look now

@pabloem
Member

pabloem commented Aug 9, 2021

Run Java PreCommit

@pareshsarafmdb
Contributor Author

Run Java PreCommit

Can you please help me with how to do this? Thanks much.

@aromanenko-dev
Contributor

@pareshsarafmdb No worries, it's just a command to run Java PreCommit job. Actually, it fails (you can see the results by clicking on "Details" against Java ("Run Java PreCommit") check):

20:47:11 > Task :sdks:java:io:mongodb:compileJava
20:47:11 /home/jenkins/jenkins-slave/workspace/beam_PreCommit_Java_Phrase/src/sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/UpdateField.java:43: error: [argument.type.incompatible] incompatible argument for parameter sourceField of UpdateField.
20:47:11     return new UpdateField(updateOperator, null, destField);
20:47:11                                            ^
20:47:11   found   : null (NullType)
20:47:11   required: @Initialized @NonNull String

I'd recommend running ./gradlew :sdks:java:io:mongodb:check locally before pushing your changes to make sure that everything is fine.

@pareshsarafmdb
Contributor Author

I ran ./gradlew :sdks:java:io:mongodb:check locally and verified it again. There are no issues with the local build:
BUILD SUCCESSFUL in 3m 4s
58 actionable tasks: 58 executed

@pareshsarafmdb
Contributor Author

Also, looking at the console log, the error doesn't seem to be related to the changes I made. Can you please check, @aromanenko-dev?

@aromanenko-dev
Contributor

Run Java PreCommit

@aromanenko-dev
Contributor

@pareshsarafmdb Could you rebase against master, run ./gradlew :sdks:java:io:mongodb:compileJava locally and push it if it's ok?

Member

@pabloem pabloem left a comment

Just added minor stylistic comments.

I still wonder: what happens when users want to replace the whole old record with the whole new record?

Finally, I have one more question about a later improvement to consider for this IO - is it worth thinking later about passing a lambda to choose the destination collection?

Comment on lines 24 to 28
private String updateOperator;

private String sourceField;

private String destField;
Member

these can be final, I suppose?


import java.io.Serializable;

public class UpdateField implements Serializable {
Member

this class could also be an AutoValue class, maybe?

Contributor Author

@pabloem I have changed it to an AutoValue class with two methods, fieldUpdate and fullUpdate. Example below:
MongoDbIO.write()
    .withUri("mongodb://localhost:" + port)
    .withDatabase(DATABASE)
    .withCollection(collectionName)
    .withUpdateConfiguration(
        UpdateConfiguration.create()
            .withUpdateKey("id")
            .withUpdateFields(
                UpdateField.create().fieldUpdate("$set", "scientist", "scientist"),
                UpdateField.create().fieldUpdate("$set", "country", "country")));
Please review and let me know. @aromanenko-dev Please trigger the build; the error should hopefully be gone now.


/** for updating with entire input document. */
public static UpdateField of(String updateOperator, String destField) {
return new UpdateField(updateOperator, null, destField);
Member

is this used whenever we create a new field? Or whenever the source and dest fields have the same name?

Member

I wonder if it makes sense to add an option for whenever source and destination field have the same name : )

Contributor Author

This is used when we need to write the full source document into the destination document (rather than updating specific fields). Whenever we have to update specific fields, we need to specify which field in the source maps to which field in the destination document. Do you think there is a better way?

@pabloem
Member

pabloem commented Aug 10, 2021

the precommit seems to be failing due to a static check issue:

image

@pabloem
Member

pabloem commented Aug 10, 2021

... also sorry about the delay! : )

@pareshsarafmdb
Contributor Author

@pabloem If I understand correctly, you are asking what happens when a user wants to update a destination document with the entire source document. In that case there is an overloaded method that takes just the update operator and the destination field; it performs the full-document update. Does that make sense?

@aaltay
Member

aaltay commented Aug 27, 2021

@pareshsarafmdb - Could you look at the test issues?

@pabloem
Member

pabloem commented Aug 30, 2021

test issues are unrelated (at least Java Examples failures). This is on me to review. Sorry about the delay again.

Member

@pabloem pabloem left a comment

Just added a couple of comments on the API. Once that's resolved we can move forward.

Comment on lines 42 to 44
public static UpdateField create() {
return builder().build();
}
Member

This initial constructor creates an UpdateField object that is not valid, right? Doesn't it make sense to make this method private, and then make fullUpdate and fieldUpdate static constructor methods?

Contributor Author

Have done the change.
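
The shape requested in this thread, with no publicly reachable "empty" instance and two named static factories, might look roughly like the following. This is a sketch with plain Java fields rather than AutoValue, and `UpdateFieldSketch` is a hypothetical name; the merged `UpdateField` class may differ in detail. It assumes that a missing source field is what marks a full-document update.

```java
import java.util.Objects;

/** Hypothetical sketch; the merged UpdateField is an AutoValue class and may differ. */
final class UpdateFieldSketch {
  private final String updateOperator;
  private final String sourceField; // null means "use the whole input document"
  private final String destField;

  // Private: callers cannot build an instance that lacks an operator or destination.
  private UpdateFieldSketch(String updateOperator, String sourceField, String destField) {
    this.updateOperator = Objects.requireNonNull(updateOperator, "updateOperator");
    this.sourceField = sourceField;
    this.destField = Objects.requireNonNull(destField, "destField");
  }

  /** Copies one field of the input element into destField using the given operator. */
  static UpdateFieldSketch fieldUpdate(String updateOperator, String sourceField, String destField) {
    return new UpdateFieldSketch(
        updateOperator, Objects.requireNonNull(sourceField, "sourceField"), destField);
  }

  /** Applies the operator to destField using the entire input element as the value. */
  static UpdateFieldSketch fullUpdate(String updateOperator, String destField) {
    return new UpdateFieldSketch(updateOperator, null, destField);
  }

  boolean isFullUpdate() {
    return sourceField == null;
  }

  String updateOperator() {
    return updateOperator;
  }

  String sourceField() {
    return sourceField;
  }

  String destField() {
    return destField;
  }
}
```

Because every instance passes through one of the two factories, each `UpdateFieldSketch` is valid from the moment it exists, which is the property the review comment asks for.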

})
public abstract class UpdateConfiguration implements Serializable {

abstract @Nullable String updateKey();
Member

can we have more than one field as a key?

Contributor Author

Right now there is only one key, which is matched against the _id field in MongoDB. My thought was that we can enhance this in later releases.

/** Sets the limit of documents to find. */
public UpdateField fullUpdate(String updateOperator, String destField) {
return toBuilder().setUpdateOperator(updateOperator).setDestField(destField).build();
}
Member

In this case, we're inserting the full record into one field of the destination, right?

e.g.:
original

{
  "key": "mykey1",
  "value1": "myvalue1",
  "value2": ["1", "2"],
  "value3": "thisvalue"
}

newvalue

{
  "key": "mykey1",
  "value1": "myvalue1UPD",
  "value2": ["1", "2", "3", "4"],
  "value3": "thisvalueISUPDATED"
}

Given configuration UpdateField.fullUpdate("$push", "value4"):
result

{
  "key": "mykey1",
  "value1": "myvalue1",
  "value2": ["1", "2"],
  "value3": "thisvalue",
  "value4": {
    "key": "mykey1",
    "value1": "myvalue1UPD",
    "value2": ["1", "2", "3", "4"],
    "value3": "thisvalueISUPDATED"
  }
}

Is this correct / intended?

Contributor Author

Yes, that's right. That's intended.
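
One detail may be worth checking against a real database. MongoDB documents `$push` as appending a value to an array, creating the array when the field is absent and failing when the field exists but is not an array. Under that behavior, `value4` in the example above would end up as an array whose single element is the input document, rather than the document embedded directly. The toy model below only illustrates that shape: it uses `java.util.Map` in place of documents, involves no MongoDB driver, and `PushSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: java.util.Map stands in for a document; no MongoDB driver is involved. */
final class PushSketch {

  /** Mimics $push on a top-level field: append to the array, creating it if the field is absent. */
  @SuppressWarnings("unchecked")
  static void push(Map<String, Object> target, String field, Object value) {
    if (!target.containsKey(field)) {
      target.put(field, new ArrayList<Object>());
    }
    Object existing = target.get(field);
    if (!(existing instanceof List)) {
      throw new IllegalArgumentException("Field '" + field + "' is not an array");
    }
    ((List<Object>) existing).add(value);
  }

  public static void main(String[] args) {
    Map<String, Object> stored = new LinkedHashMap<>();
    stored.put("key", "mykey1");
    stored.put("value1", "myvalue1");

    Map<String, Object> incoming = new LinkedHashMap<>();
    incoming.put("key", "mykey1");
    incoming.put("value1", "myvalue1UPD");

    // First push creates value4 as a one-element array holding the input document.
    push(stored, "value4", incoming);
    System.out.println(stored);
  }
}
```

This matches the CDC use case mentioned earlier in the review, where each incoming transaction is appended to a `transactions` array inside the customer document.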

@pareshsarafmdb
Contributor Author

Just added minor stylistic comments.
I still wonder: What happens when users want to update the whole old record into the whole new record?
Finally, I have one more question about a later improvement to consider for this IO - is it worth thinking later about passing a lambda to choose the destination collection?

@pabloem If I understand what if user wants to update with entire source document into a destination document - In that case you have a overloaded method which just takes update operator and dest field. This does full document update. Did I make sense?

Yes, that's right.

@pabloem
Member

pabloem commented Sep 1, 2021

okay this LGTM. Happy to merge @aromanenko-dev any other comments? : )

@aromanenko-dev
Contributor

Thanks, I'm fine with code changes. LGTM.

However, before merging, please squash all commits and avoid merge commits; please use rebase instead.

Also, it's not good practice to create a PR from the master branch; it's much better to use a dedicated feature branch for that. Though I think we can live with this for now.

@pabloem
Member

pabloem commented Sep 2, 2021

@pareshsarafmdb can you squash your commits? then we'll merge. Thanks!

@pabloem
Member

pabloem commented Sep 6, 2021

Run Java PostCommit

@pabloem pabloem merged commit bcb56b7 into apache:master Sep 6, 2021
@pabloem
Member

pabloem commented Sep 6, 2021

squashed using Github's feature

meowcakes pushed a commit to meowcakes/beam that referenced this pull request Sep 6, 2021
…or update within documents

* Did changes for ability to update the existing documents using MongoDBIO write

* fixes related to formatting and mongodb update

* Updated javadoc for update enhancement

* address review comments - MongoDBIO update connector

* MongoDBIO - changed UpdateField to be autovalue class

* addressed review comments - javadoc, updatefield refactoring
dpcollins-google pushed a commit to dpcollins-google/beam that referenced this pull request Sep 16, 2021
calvinleungyk pushed a commit to calvinleungyk/beam that referenced this pull request Sep 22, 2021
@aelmaarouf

Hello,

I'm sorry to bring up this old discussion, but it's the only thing I have found so far (I can create an issue if you prefer).
I am having a hard time understanding the difference between withFindKey and withUpdateKey. Can anyone please explain it to me?
Here is my issue: I have a Mongo document like this

{
 "_id" : objectId,
 "patientId" : "u-u-id",
 "therapyDate" : 1283839993, // timestamp
 "value": x
 }

I know that the combination (patientId, therapyDate) is unique, so I am using the following update configuration:

.withFindKey("patientId")
.withFindKey("therapyDate")
.withUpdateFields( UpdateField.fieldUpdate("$set","value", "value"))

With this configuration, a new document is created instead of the existing one being updated. Can you point me in the right direction for updating this kind of document (without depending on _id as the update key)?
Thanks in advance.
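
No answer appears in the thread. One hypothesis worth checking: if `withFindKey` follows the same `with*` pattern shown for `withUpdateKey` earlier in this review (each call returns a new object built via `builder().setX(...).build()`), then the second call would replace the first rather than add to it, and only `therapyDate` would be used for matching. The sketch below demonstrates that effect with a hypothetical `FindConfigSketch` class and shows what a two-field filter would look like as a plain map. It is not the connector's code and does not claim that MongoDbIO supports compound keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of a single-valued, copy-on-write setting. */
final class FindConfigSketch {
  private final String findKey;

  private FindConfigSketch(String findKey) {
    this.findKey = findKey;
  }

  static FindConfigSketch create() {
    return new FindConfigSketch(null);
  }

  /** Returns a copy holding only the new key; any earlier key is discarded. */
  FindConfigSketch withFindKey(String findKey) {
    return new FindConfigSketch(findKey);
  }

  String findKey() {
    return findKey;
  }

  /** Builds a filter that matches on every listed key, e.g. {patientId: ..., therapyDate: ...}. */
  static Map<String, Object> compoundFilter(Map<String, Object> element, String... keys) {
    Map<String, Object> filter = new LinkedHashMap<>();
    for (String key : keys) {
      filter.put(key, element.get(key));
    }
    return filter;
  }

  public static void main(String[] args) {
    // Chaining the same setter twice keeps only the last value.
    System.out.println(create().withFindKey("patientId").withFindKey("therapyDate").findKey());

    Map<String, Object> element = new LinkedHashMap<>();
    element.put("patientId", "u-u-id");
    element.put("therapyDate", 1283839993L);
    element.put("value", 42);
    System.out.println(compoundFilter(element, "patientId", "therapyDate"));
  }
}
```

If the hypothesis holds, matching on both fields would need a filter like the one `compoundFilter` builds, which a single-key setting cannot express.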
