[Feature][Connector-V2] Support extract partition from SeaTunnelRow fields #3085

Merged
merged 12 commits into from
Oct 18, 2022

Conversation

TaoZex
Contributor

@TaoZex TaoZex commented Oct 13, 2022

Purpose of this pull request

issue:#2787

The key is specified by field name. The partition that a Kafka message is sent to is determined by that key.

Check list

@TaoZex
Contributor Author

TaoZex commented Oct 13, 2022

@hailin0 PTAL

List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
key = element.getField(fields.indexOf(keyField)).toString();
Member

Don't we need to check for null?


@Override
public ProducerRecord<byte[], byte[]> serializeRowByKey(String key, SeaTunnelRow row) {
return new ProducerRecord<>(topic, key.getBytes(), jsonSerializationSchema.serialize(row));
Member

Should we check whether the key is null?

Member

check key is null?

I think we do not need to check whether the key is null, because if the key is null, Kafka will send the message to a random partition (randomly selecting a new partition per topic.metadata.refresh.ms).

The third property "partitioner.class" defines what class to use to determine which Partition in the Topic the message is to be sent to. This is optional, but for any non-trivial implementation you are going to want to implement a partitioning scheme. More about the implementation of this class later. If you include a value for the key but haven't defined a partitioner.class Kafka will use the default partitioner. If the key is null, then the Producer will assign the message to a random Partition.

Member

I am sorry, you do need to check whether the key is null, because you call key.getBytes(). You can update it to:
return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));
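
As a minimal sketch of the null check suggested above, the conversion can be isolated in a small helper. The class and method names here are hypothetical and not part of the PR, and the explicit UTF-8 charset is an assumption (the PR code calls `getBytes()` with the platform default charset).

```java
import java.nio.charset.StandardCharsets;

class NullSafeKey {
    // Hypothetical helper: keeps a null key as null instead of throwing
    // a NullPointerException on key.getBytes().
    public static byte[] toKeyBytes(String key) {
        return key == null ? null : key.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(toKeyBytes(null) == null);    // prints "true"
        System.out.println(toKeyBytes("user-1").length); // prints "6"
    }
}
```

The resulting byte array (or null) could then be passed as the key argument of `ProducerRecord`, as in the one-line fix above.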

@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| key | string | no | - |
Member

Suggested change
| key | string | no | - |
| partition_key | string | no |- |

@@ -50,6 +51,21 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### key [string]

Determine the partition of the kafka send message based on the key.
Member

Add a note that this option supports configuring a field name.

@@ -50,6 +51,23 @@ In AT_LEAST_ONCE, producer will wait for all outstanding messages in the Kafka b

NON does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated.

### partition_key [string]

Determine the partition of the kafka send message based on the key.
Member

It would be better to replace this with "Configure which field is used as the key of the Kafka message."

- Add kafka sink doc
- New feature : Kafka specified partition to send
- New feature : Determine the partition that kafka send based on the message content
- New feature : Determine the partition that kafka send messag based on the message content
- New feature : Determine the partition of the kafka send message based on the field name
Member

Would it be better to replace this with "Configure which field is used as the key of the Kafka message"?


Object field = element.getField(fields.indexOf(keyField));
//If the field is null, send the message to the same partition
if (field == null) {
key = "null";
Member

This is not a good approach, because there may be many null values. If you convert null to "null", all rows with a null value will be written to the same partition.

You can keep the null and update the code in seaTunnelRowSerializer.serializeRowByKey(key, element) like this:

return new ProducerRecord<>(topic, key == null ? null : key.getBytes(), jsonSerializationSchema.serialize(row));

If the key is null, Kafka will send the message to a random partition (randomly selecting a new partition per topic.metadata.refresh.ms).
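
To illustrate why mapping null to the string "null" concentrates those rows in one place, here is a rough sketch of key-hash partitioning. The `partitionFor` function is a stand-in and an assumption: it uses `Arrays.hashCode` for simplicity, whereas Kafka's default partitioner uses a different hash of the key bytes. The point it shows holds for any deterministic hash: identical key bytes always land on the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class SameKeySamePartition {
    // Stand-in for a hash-based partitioner (not Kafka's actual algorithm).
    public static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Math.floorMod(Arrays.hashCode(bytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 8;
        Set<Integer> used = new HashSet<>();
        // Simulate 1000 rows whose null field was converted to the key "null".
        for (int i = 0; i < 1000; i++) {
            used.add(partitionFor("null", numPartitions));
        }
        System.out.println("partitions used: " + used.size()); // prints "partitions used: 1"
    }
}
```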

Contributor Author

That's a good approach, thanks for your advice.

@@ -74,6 +92,7 @@ public KafkaSinkWriter(
List<KafkaSinkState> kafkaStates) {
this.context = context;
this.pluginConfig = pluginConfig;
this.seaTunnelRowType = seaTunnelRowType;
Member

Suggested change
this.seaTunnelRowType = seaTunnelRowType;
this.partitionExtractor = createPartitionExtractor(pluginConfig, seaTunnelRowType);

Add this method:

    private Function<SeaTunnelRow, String> createPartitionExtractor(Config pluginConfig,
                                                                    SeaTunnelRowType seaTunnelRowType) {
        if (!pluginConfig.hasPath(PARTITION_KEY)){
            return row -> null;
        }
        String partitionKey = pluginConfig.getString(PARTITION_KEY);
        List<String> fieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
        if (!fieldNames.contains(partitionKey)) {
            return row -> partitionKey;
        }
        int partitionFieldIndex = seaTunnelRowType.indexOf(partitionKey);
        return row -> {
            Object partitionFieldValue = row.getField(partitionFieldIndex);
            if (partitionFieldValue != null) {
                return partitionFieldValue.toString();
            }
            return null;
        };
    }
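
The three branches of the suggested method can be tried out in isolation with a simplified sketch. This is not the SeaTunnel API: rows are modeled as `Object[]`, the schema as a `String[]` of field names, and a null `partitionKey` stands in for "option not configured"; all names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class PartitionExtractorSketch {
    // Simplified stand-in for createPartitionExtractor (assumed types, see lead-in).
    public static Function<Object[], String> createExtractor(String partitionKey, String[] fieldNames) {
        if (partitionKey == null) {
            // Option not configured: no key, Kafka chooses the partition.
            return row -> null;
        }
        List<String> names = Arrays.asList(fieldNames);
        int index = names.indexOf(partitionKey);
        if (index < 0) {
            // Not a field name: use the configured value itself as a constant key.
            return row -> partitionKey;
        }
        // Field name: use the field value as the key, keeping null as null.
        return row -> {
            Object value = row[index];
            return value == null ? null : value.toString();
        };
    }

    public static void main(String[] args) {
        String[] schema = {"id", "name"};
        Object[] row = {42, null};
        System.out.println(createExtractor(null, schema).apply(row));    // prints "null"
        System.out.println(createExtractor("fixed", schema).apply(row)); // prints "fixed"
        System.out.println(createExtractor("id", schema).apply(row));    // prints "42"
        System.out.println(createExtractor("name", schema).apply(row));  // prints "null"
    }
}
```

Resolving the field index once when the extractor is built, rather than on every row, is the main design benefit of the reviewer's suggestion over the original per-row lookup.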

Comment on lines 72 to 79
String keyField = pluginConfig.getString(PARTITION_KEY);
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
key = element.getField(fields.indexOf(keyField)).toString();
} else {
key = keyField;
}
Member

Suggested change
String keyField = pluginConfig.getString(PARTITION_KEY);
List<String> fields = Arrays.asList(seaTunnelRowType.getFieldNames());
String key;
if (fields.contains(keyField)) {
key = element.getField(fields.indexOf(keyField)).toString();
} else {
key = keyField;
}
String key = partitionExtractor.apply(element);

Member

Hi, @TaoZex I think @hailin0 gave you a good suggestion.

Contributor Author

Thanks to @EricJoy2048 and @hailin0 for the good suggestion, I will fix it.

hailin0
hailin0 previously approved these changes Oct 17, 2022
Member

@hailin0 hailin0 left a comment

LGTM

@@ -21,6 +21,7 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
| bootstrap.servers | string | yes | - |
| kafka.* | kafka producer config | no | - |
| semantic | string | no | NON |
| partition_key | string | no | - |
Member

Suggested change
| partition_key | string | no | - |
| partition_key | string | no | - |

checkstyle

Member

@EricJoy2048 EricJoy2048 left a comment

+1

@dijiekstra
Contributor

LGTM

Member

@ashulin ashulin left a comment

+1

@ashulin ashulin merged commit 385e1f4 into apache:dev Oct 18, 2022
7 participants