
Document the use of SMTs to customize _id field #82

Closed
wants to merge 7 commits

Conversation

Contributor

@emlaver emlaver commented Jun 15, 2022

Checklist

  • Tick to sign-off your agreement to the Developer Certificate of Origin (DCO) 1.1
  • Added tests for code changes or test/build only changes
  • Updated the change log file (CHANGES.md|CHANGELOG.md) or test/build only changes
  • Completed the PR template below:

Description

fixes #65

Approach

Add README section for using SMT to rename, replace, or filter out _id field.

Schema & API Changes

"No change"

Security and Privacy

"No change"

Testing

Monitoring and Logging

"No change"

Each SMT example was tested locally:

Rename name field to _id

Transforms config:

transforms=RenameField
transforms.RenameField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.RenameField.renames=name:_id

Pushed with producer:

echo '{"name":"newdoc","_rev":"1-15f65339921e497348be384867bb940f","hi":"new"}'  | ./bin/kafka-console-producer.sh --topic kafka_test_2 --bootstrap-server localhost:9092 -

Wiremock request JSON body output:

{"docs":[{"_rev":"1-15f65339921e497348be384867bb940f","hi":"new","_id":"newdoc"}]}

Remove _id field

Transforms config:

transforms=ReplaceField
transforms.ReplaceField.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.ReplaceField.exclude=_id

Producer:

echo '{"_id":"thiswillberemoved","_rev":"1-15f65339921e497348be384867bb940f","hi":"docshasnoid"}'  | ./bin/kafka-console-producer.sh --topic kafka_test_2 --bootstrap-server localhost:9092 -

Wiremock request body output:

{"docs":[{"_rev":"1-15f65339921e497348be384867bb940f","hi":"docshasnoid"}]}

Remove _id field when value is null

Transforms config:

transforms=dropNullRecords
transforms.dropNullRecords.type=org.apache.kafka.connect.transforms.Filter
transforms.dropNullRecords.predicate=isNullRecord

predicates=isNullRecord
predicates.isNullRecord.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone

Producer:

echo '{"_id": null,"_rev":"1-15f65339921e497348be384867bb940f","hi":"docshasnoid"}'  | ./bin/kafka-console-producer.sh --topic kafka_test_2 --bootstrap-server localhost:9092 -

Wiremock request body output:

{"docs":[{"_rev":"1-15f65339921e497348be384867bb940f","hi":"docshasnoid"}]}

@emlaver emlaver self-assigned this Jun 15, 2022
@emlaver emlaver force-pushed the 65-sink-connector-id-config branch from ac937a7 to 51ff203 Compare June 16, 2022 13:30
@emlaver emlaver marked this pull request as ready for review June 16, 2022 13:30
@emlaver emlaver force-pushed the 65-sink-connector-id-config branch from 51ff203 to 17c961a Compare June 17, 2022 15:46
@emlaver emlaver force-pushed the 65-sink-connector-id-config branch from ac07722 to a1890d0 Compare July 11, 2022 19:51
…ave it as the value of '_id' field. This implementation tries to cast the record's key to a map for schemaless records. If there's an exception, catch it, print a log message, and attempt to get string value of the key.
…ey to a string for schemaless records. If the record's key is a map, then the result string will look like "{key=value}"
@emlaver emlaver force-pushed the 65-sink-connector-id-config branch from a1890d0 to d216dca Compare July 11, 2022 21:26
Contributor Author

emlaver commented Jul 11, 2022

The main difference between commit 4c97796 and d216dca is that 4c97796 first tries to get message key as a Map object. If there's an exception, then it converts the message's key to string.
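The `"{key=value}"` stringification mentioned in the commit messages above is just Java's `AbstractMap.toString()` rendering. A minimal sketch of that behavior (the class name `KeyStringification` is hypothetical, not part of the connector):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyStringification {
    public static void main(String[] args) {
        // A schemaless record key deserialized as a map...
        Map<String, String> key = new LinkedHashMap<>();
        key.put("key", "value");
        // ...stringifies via AbstractMap.toString() as {key=value}
        System.out.println(String.valueOf(key)); // prints {key=value}
    }
}
```

This is why a map key converted to a string makes an awkward Cloudant `_id` unless it is transformed to a cleaner string first.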

Examples when using custom KeyToDocId SMT with schemaless message:

Schemaless example with message key hello6:

echo '"hello6": {"test":"1", "try": 0, "time": true}'  | ./bin/kafka-console-producer.sh --topic kafka_test1 --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:" -

Bulk docs request body:

{"docs":[{"try":0,"time":true,"_id":"hello6","test":"1"}]}

Schemaless example with no message key:

echo '{"test":"1", "try": 0, "time": true}'  | ./bin/kafka-console-producer.sh --topic kafka_test1 --bootstrap-server localhost:9092 -

Bulk docs request body (verified that doc ID is generated as expected):

{"docs":[{"try":0,"time":true,"test":"1"}]}

Schemaless example with null key:

echo 'null: {"test":"1", "try": 0, "time": true}'  | ./bin/kafka-console-producer.sh --topic kafka_test1 --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:" -

Bulk docs request body:

{"docs":[{"try":0,"time":true,"test":"1"}]}

Schemaless example with boolean:

echo 'true: {"test":"1", "try": 0, "time": true}'  | ./bin/kafka-console-producer.sh --topic kafka_test1 --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:" -

Bulk docs request body:

{"docs":[{"try":0,"time":true,"_id":"true","test":"1"}]}

Schemaless example with number:

echo '100: {"test":"1", "try": 0, "time": true}'  | ./bin/kafka-console-producer.sh --topic kafka_test1 --bootstrap-server localhost:9092 --property "parse.key=true" --property "key.separator=:" -

Bulk docs request body:

{"docs":[{"try":0,"time":true,"_id":"100","test":"1"}]}

Member

@ricellis ricellis left a comment

Some initial thoughts, mostly for the transform.
There are, I think, 6 cases:

  1. No valueSchema, null value
  2. No valueSchema, non-null value
  3. Map valueSchema, null value
  4. Map valueSchema, non-null value
  5. Struct valueSchema, null value
  6. Struct valueSchema, non-null value

I think your code only covers 1, 2, and 6 at present. I suspect the simplest code would boil everything down to passing a Struct value with the _id added and optionally a Struct schema (if the existing value schema is not null), but that might not be the most efficient in terms of rebuilding every value when potentially we could just add an entry to an existing map - did you identify whether the existing record values were immutable?
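The schemaless half of this matrix (cases 1 and 2) can be modeled without the Connect API at all. A sketch under the assumption that the key has already been stringified (`applySchemaless` is a hypothetical name, not the connector's actual method):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class SchemalessIdSketch {
    // Case 1: null value (e.g. a tombstone) -> start from an empty map.
    // Case 2: non-null value -> copy it, since the incoming map may be immutable.
    static Map<String, Object> applySchemaless(String id, Map<String, Object> value) {
        Map<String, Object> updated =
                (value == null) ? new LinkedHashMap<>() : new LinkedHashMap<>(value);
        updated.put("_id", id);
        return updated;
    }
}
```

The schema-ful cases (3–6) need the analogous copy at the `Struct`/`SchemaBuilder` level, which is the more expensive rebuild questioned above.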


public static final ConfigDef CONFIG_DEF = new ConfigDef();

private static final String PURPOSE = "Copy record key to _id value";
Member
Suggested change
private static final String PURPOSE = "Copy record key to _id value";
private static final String PURPOSE = "Copy record key to _id field in record value";

README.md Outdated
transforms=KeyToDocId
transforms.KeyToDocId.type=com.ibm.cloudant.kafka.transforms.KeyToDocId
```
Note: The `_id` must exist in your message value schema. If it does not exist, a warning message will print in the logging and the message will be unmodified.
Member
I think this statement is no longer valid based on current code.

Suggested change
Note: The `_id` must exist in your message value schema. If it does not exist, a warning message will print in the logging and the message will be unmodified.
Note: If a string `_id` field does not exist in the message value schema a modified schema will be created and included in the transformed message.

I also think we should make a note about the key being stringified into the _id, and suggest that it might be advisable for more complex keys to be transformed to a useful string first, since keys with brackets etc. are a little more awkward to use in Cloudant.

Contributor Author
Updated this note in 493f518. I also added comments noting that the record key needs to be of type string and that messages are left unmodified if not.

} else {
value = requireMapOrNull(record.value(), PURPOSE);
}
final Map<String, Object> updatedValue = new HashMap<>(value);
Member

I think we need to take some care here.
In this schemaless case we can use a Map for the value, but in the Schema case we'd have to conform to the type of the schema (i.e. a Struct in some cases and a Map in others). Right now you don't handle a null value in the schema case.

}

private R applyWithSchema(R record) {
final Struct value = requireStruct(record.value(), PURPOSE);
Member

I don't think this is valid; the message could reasonably have either a Map or Struct Schema.Type for JSON. Though I agree a Struct is more likely given the variable types possible in JSON, it is conceivable that someone uses e.g. a Map<String, String> or Map<String, Integer> if they have simple documents of a well-known type.
I also think null is valid here as mentioned earlier.

Contributor Author

@emlaver emlaver Jul 14, 2022

Good point, I'll update this and add tests. What if we just add record.value() == null to the existing if statement in the apply function? Should we be adding an _id if there are no other value fields?
Edited: this is a slippery slope where we won't know if there are SMTs later on that may add fields to the value before saving the message to Cloudant. Seems like it's best to just handle the null case similar to schemaless.

Member

Yeah I think it's possible that a message stream could be a bunch of keys with null values where the keys are relevant information to be stored.

}

private Schema makeUpdatedSchema(Schema schema) {
final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct());
Member

In some cases this might change the schema from Map to Struct - I think that's ok and even necessary for cases e.g. where an incoming message is Map<String, Integer> and to add a String, String we'd need a more accommodating schema. Given the subtlety of that though I think it is worth at least a comment.

Contributor Author

emlaver commented Jul 13, 2022

did you identify whether the existing record values were immutable?

value.schema() is immutable, so if we want to add _id we'll need to loop through the schema's fields and copy them into a new schema, which can be done in makeUpdatedSchema. Before looping through the schema fields we can check whether _id exists and add it if not:

// schema.fields() holds Field objects, so contains(idFieldName) with a String
// would never match; look the field up by name instead
if (schema.field(idFieldName) == null) {
            builder.field(
                    idFieldName,
                    SchemaBuilder.struct().name("_id").version(1).doc("Document _id schema")
                    .field(idFieldName, Schema.OPTIONAL_STRING_SCHEMA));
        }


public R apply(R record) {
if (record.key() != null
&& String.valueOf(record.key()).length() > 0) {
Contributor

A few things here:

  • Since you're doing a null check you can use record.key().toString()
  • You should hoist out the string value here and pass it down to the (with)Schema(less) variants
  • There are some caveats here about which converters are appropriate, which probably need documenting. For instance key.converter=org.apache.kafka.connect.storage.StringConverter will definitely work, since the toString() will be a no-op. Other converters might work if they convert to a type which returns a stable and usable String (this might include JsonConverter, although I'm honestly not sure what the toString implementation of jackson does for its types). ByteArrayConverter definitely won't work and will return something like [B@41d477ed.

An alternative view on the last point would be to enforce run-time type checking that the key is a string, but this might be too restrictive.
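The guard being discussed reduces to a plain predicate; `shouldTransform` is a hypothetical name, and the check mirrors the stricter option of accepting only `String` keys:

```java
public class StringKeyGuard {
    // Only transform when the key is a non-empty String; anything else
    // (null, byte[], Struct, Map, ...) leaves the record unmodified.
    static boolean shouldTransform(Object key) {
        return key instanceof String && !((String) key).isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(shouldTransform("hello6"));          // true
        System.out.println(shouldTransform(null));              // false
        System.out.println(shouldTransform(new byte[] {0x42})); // false: a byte[] key would otherwise stringify as [B@...
    }
}
```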

Contributor Author

Since you're doing a null check you can use record.key().toString()

I changed this in 493f518 to check whether it's null and whether it's an instance of String. We will only support record keys that are of type string.

You should hoist out the string value here and pass it down to the (with)Schema(less) variants

Now that we only work with keys that are string types, I just save it directly to the _id field as record.key()

An alternative view on the last point would be to enforce run-time type checking that the key is a string, but this might be too restrictive.

Discussion around this in the comments above
I've moved forward with only supporting keys that are strings. I updated the documentation in 493f518.

- Only continue with transform if the record's key is of string type, else leave message unmodified
- Add _id field to value schema if it doesn't exist
- Update logging messages
- README and overview doc variable update
- Struct value schema with string key
- Struct value schema with unsupported struct key
- Struct value schema with unsupported map key
- Struct value schema with null key
- Map value schema with string key
- Map value schema with null key
- Null value schema with string key
- Null value schema with map value and string key
- Null value schema with null key
Member

@ricellis ricellis left a comment

Some more clarity needed on tombstones. For the key->id part see my comment on the issue.

Comment on lines +73 to +82
3. If you have messages where the `_id` field is absent or `null` then Cloudant will generate a document ID. If you don't want this to happen then set an `_id` (see earlier examples) or filter out those documents.
If you have messages where the `_id` field is `null` then you'll need to use a transform and predicate to filter out and remove this field:
```
transforms=dropNullRecords
transforms.dropNullRecords.type=org.apache.kafka.connect.transforms.Filter
transforms.dropNullRecords.predicate=isNullRecord

predicates=isNullRecord
predicates.isNullRecord.type=org.apache.kafka.connect.transforms.predicates.RecordIsTombstone
```
Member

I'm a bit confused by this example.

From the text my intuition is that the example should be for filtering out docs with `_id=null` (though I think that could be made clearer by splitting the sentences).

3. If you have messages where the `_id` field is absent or `null` then Cloudant will generate
a document ID. If you don't want this to happen then set an `_id` (see earlier examples).
Alternatively filter out those documents. For example if you have messages where the `_id`
field is `null` then you'll need to use a transform and predicate to filter out and remove this
field:
...

However, the transform/predicate are dropping records that have a null value (which isn't the same). I think it is a useful example in its own right, but then we should update the doc to say:

For example if you have records where there is no value e.g. tombstones (and don't want the Cloudant sink connector to generate an empty doc with a generated ID) then you'll need to use a transform and predicate to filter out and remove these tombstone records:

Contributor Author

You're right, the example is for dropping the whole record. I did a bit more digging and what we want is a conditional transform that checks whether _id == null and, if so, drops the field. Unfortunately, there's no built-in SMT that handles conditionals, so we'd have to create a custom SMT.
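The conditional behavior described here — remove the `_id` entry only when its value is null, rather than dropping the whole record — is straightforward to express as the core of such a custom SMT. A pure-Java sketch of that core for the schemaless case (class and method names are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

public class DropNullIdSketch {
    // Unlike Filter + RecordIsTombstone (which drops the whole record when the
    // *value* is null), this removes only a null _id entry from the value map.
    static Map<String, Object> dropNullId(Map<String, Object> value) {
        Map<String, Object> updated = new HashMap<>(value);
        if (updated.containsKey("_id") && updated.get("_id") == null) {
            updated.remove("_id");
        }
        return updated;
    }
}
```

Wrapped in a `Transformation<R>` implementation this would run per record; the schema-ful case would additionally need the `_id` field to be optional in the rebuilt schema.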

Contributor Author

I think I may be wrong, as what I was reading was outdated. I'll re-review the Filter SMT docs tomorrow.

@emlaver emlaver closed this Jul 19, 2022
Contributor Author

emlaver commented Jul 19, 2022

New PR: #83

@emlaver emlaver deleted the 65-sink-connector-id-config branch April 4, 2023 19:48