
Improve UX of the CDC API #21536

Merged
merged 18 commits on Jun 23, 2022
Changes from 12 commits
@@ -19,6 +19,7 @@
import com.hazelcast.jet.annotation.EvolvingApi;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

/**
* Information pertaining to a single data change event (insert, delete or
@@ -64,9 +65,6 @@ public interface ChangeRecord {
* be easy to identify them because they have a separate {@code SYNC}
* operation instead of {@code INSERT}, however some databases emit {@code
* INSERT} events in both cases (a notable example is MySQL).
*
* @throws ParsingException if the timestamp field isn't present or
* is unparseable
Comment on lines -67 to -69

Contributor: The change record is marked with @EvolvingApi, so we can remove the throws ParsingException if it's not the case anymore.

Contributor Author: Sure, done

*/
long timestamp() throws ParsingException;

@@ -97,8 +95,6 @@ public interface ChangeRecord {
* @return {@link Operation#UNSPECIFIED} if this {@code ChangeRecord}
* doesn't have an operation field, otherwise the appropriate {@link
* Operation} that matches the CDC record's operation field
* @throws ParsingException if there is an operation field, but its
* value is not among the handled ones.
*/
@Nonnull
Operation operation() throws ParsingException;
@@ -107,8 +103,6 @@ public interface ChangeRecord {
* Returns the name of the database containing the record's table.
*
* @return name of the source database for the current record
* @throws ParsingException if the database name field isn't present
* or is unparseable
*/
@Nonnull
String database() throws ParsingException;
@@ -119,8 +113,6 @@
* MySQL).
*
* @return name of the source schema for the current record
* @throws ParsingException if the schema name field isn't present
* or is unparseable
*/
@Nonnull
String schema() throws ParsingException, UnsupportedOperationException;
@@ -129,8 +121,6 @@
* Returns the name of the table this record is part of.
*
* @return name of the source table for the current record
* @throws ParsingException if the table name field isn't present or
* is unparseable
*/
@Nonnull
String table() throws ParsingException;
@@ -145,7 +135,7 @@ public interface ChangeRecord {
* Returns the value part of the CDC event. It includes fields like the
* timestamp, operation, and database record data.
* <p>
* For <em>insert</em> and <em>update</em> operations the value describes
* For <em>sync</em>, <em>insert</em> and <em>update</em> operations the value describes
* the database record as it looks AFTER the event, so the latest image.
* <p>
* For <em>delete</em> operations the value describes the database record as
@@ -154,6 +144,25 @@
@Nonnull
RecordPart value();

/**
* Returns the new value of the record. For <em>sync</em>, <em>insert</em> and <em>update</em> operations the value
* describes the database record as it looks AFTER the event, so the latest image, for <em>delete</em> it returns null.
*
* @since 5.2
*/
@Nullable
RecordPart newValue();

/**
 * Returns the old value of the record. For <em>update</em> and <em>delete</em> operations the value
 * describes the database record as it looks <strong>before</strong> the event,
 * for <em>sync</em> and <em>insert</em> it returns null.
*
* @since 5.2
*/
@Nullable
RecordPart oldValue();

/**
* Returns the raw JSON string from the CDC event underlying this {@code
* ChangeRecord}. You can use it if higher-level parsing (see other
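Taken together with value(), the two new accessors let pipeline code pick the relevant row image without manual JSON inspection. A minimal sketch, assuming a CDC StreamSource<ChangeRecord> named source and a logger sink (neither is part of this diff):

import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.Operation;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamSource;

public class RowImageExample {
    // For deletes emit the pre-delete image, otherwise the latest image.
    // Jet drops null map results, so records without an image are filtered out.
    static Pipeline build(StreamSource<ChangeRecord> source) { // 'source' is assumed
        Pipeline p = Pipeline.create();
        p.readFrom(source)
                .withNativeTimestamps(0)
                .map(rec -> rec.operation() == Operation.DELETE ? rec.oldValue() : rec.newValue())
                .writeTo(Sinks.logger());
        return p;
    }
}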
@@ -28,11 +28,14 @@
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.StreamSource;
import org.apache.kafka.connect.source.SourceConnector;

import javax.annotation.Nonnull;
import java.util.Map.Entry;
import java.util.Properties;

import static com.hazelcast.internal.util.Preconditions.checkState;

/**
* Contains factory methods for creating Change Data Capture (CDC) sources.
* <p>
@@ -55,6 +58,7 @@ private DebeziumCdcSources() {
*
 * @param name the name of this source, must be unique, will be passed
* to the underlying Kafka Connect source
* @param connectorClass name of the Debezium connector class
* @return a builder you can use to set the source's properties and
* then construct it
*/
@@ -64,6 +68,24 @@ public static Builder<ChangeRecord> debezium(@Nonnull String name, @Nonnull Stri
(properties, eventTimePolicy) -> new ChangeRecordCdcSourceP(properties, eventTimePolicy));
}

/**
* Creates a CDC source that streams change data from a
* Debezium-supported database to a Hazelcast Jet pipeline.
*
 * @param name the name of this source, must be unique, will be passed
* to the underlying Kafka Connect source
* @param connectorClass class of the Debezium Connector which will be used for the CDC
* @return a builder you can use to set the source's properties and
* then construct it
*/
@Nonnull
public static Builder<ChangeRecord> debezium(
@Nonnull String name,
@Nonnull Class<? extends SourceConnector> connectorClass) {
Contributor: We can provide Class<?> but not Class<? extends SourceConnector>; the latter leaks the Kafka Connect API into the interface.

return new Builder<>(name, connectorClass.getName(),
(properties, eventTimePolicy) -> new ChangeRecordCdcSourceP(properties, eventTimePolicy));
}
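As an illustration, the typed overload might be called as follows; the connector (Debezium's io.debezium.connector.mysql.MySqlConnector) and the connection properties are assumptions for the sketch, not part of this diff:

StreamSource<ChangeRecord> source = DebeziumCdcSources
        .debezium("customers-cdc", MySqlConnector.class) // class reference instead of a string
        .setProperty("database.hostname", "127.0.0.1")   // hypothetical connection settings
        .setProperty("database.user", "debezium")
        .setProperty("database.password", "dbz")
        .build();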

/**
* Creates a CDC source that streams change data from a
* Debezium-supported database to a Hazelcast Jet pipeline.
@@ -73,6 +95,9 @@ public static Builder<ChangeRecord> debezium(@Nonnull String name, @Nonnull Stri
* raw Debezium data. Just returns a pair of JSON strings, one is
* the key of the Debezium CDC event, the other the value.
*
 * @param name the name of this source, must be unique, will be passed
* to the underlying Kafka Connect source
* @param connectorClass name of the Debezium connector class
* @return a builder you can use to set the source's properties and then construct it
*/
@Nonnull
@@ -81,6 +106,30 @@ public static Builder<Entry<String, String>> debeziumJson(@Nonnull String name,
(properties, eventTimePolicy) -> new JsonCdcSourceP(properties, eventTimePolicy));
}

/**
* Creates a CDC source that streams change data from a
* Debezium-supported database to a Hazelcast Jet pipeline.
* <p>
* Differs from the {@link #debezium(String, String) regular source}
* in that it does the least possible amount of processing of the
* raw Debezium data. Just returns a pair of JSON strings, one is
* the key of the Debezium CDC event, the other the value.
*
 * @param name the name of this source, must be unique, will be passed
* to the underlying Kafka Connect source
* @param connectorClass class of the Debezium Connector which will be used for the CDC
* @return a builder you can use to set the source's properties and then construct it
*/
@Nonnull
public static Builder<Entry<String, String>> debeziumJson(
@Nonnull String name,
@Nonnull Class<?> connectorClass) {
checkState(SourceConnector.class.isAssignableFrom(connectorClass), "connector class must be a subclass" +
" of SourceConnector");
return new Builder<>(name, connectorClass.getName(),
(properties, eventTimePolicy) -> new JsonCdcSourceP(properties, eventTimePolicy));
}
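Since this variant accepts a plain Class<?>, the SourceConnector constraint is enforced only at runtime by the checkState guard; a hedged sketch of both outcomes (MySqlConnector as above is an assumption):

// Passes the guard: MySqlConnector implements SourceConnector.
Builder<Entry<String, String>> ok =
        DebeziumCdcSources.debeziumJson("raw-cdc", MySqlConnector.class);

// Fails fast: String is no SourceConnector, so checkState throws
// IllegalStateException("connector class must be a subclass of SourceConnector").
Builder<Entry<String, String>> bad =
        DebeziumCdcSources.debeziumJson("raw-cdc", String.class);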

/**
* A builder to configure a CDC source that streams the change data from
* a Debezium-supported database to Hazelcast Jet.
@@ -62,6 +62,13 @@ public enum Operation {
this.id = id;
}

/**
 * Returns the Debezium operation code as a String.
*/
public String code() {
return String.valueOf(id);
}

/**
* Parses the string present in a CDC message into the corresponding
* {@code Operation} enum member.
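A small sketch of how code() pairs with the existing parse method (the single-letter values are Debezium's usual envelope codes, assumed here rather than shown in this diff):

String wire = Operation.INSERT.code(); // presumably "c" under Debezium's convention
Operation op = Operation.get(wire);    // parses back to Operation.INSERT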
@@ -24,7 +24,7 @@
* @since Jet 4.2
*/
@EvolvingApi
public class ParsingException extends Exception {
public class ParsingException extends RuntimeException {
Contributor: Why this change?

Contributor Author: Most of the operations no longer lazy-parse the JSON, so no exception is really expected. However, a user who already calls methods like .database() may have wrapped the invocation in a try-catch block. Therefore, I cannot completely remove "throws ParsingException" from those methods without breaking source compatibility, yet new users should not be required to deal with an exception that should never occur. One workaround is to change ParsingException to RuntimeException: by default the user won't have to deal with it, but old code will still work.

Contributor: I wonder if we should mark this class as deprecated then?

Contributor Author: It is still used for lazy parsing in RecordPartImpl, although I'm not against changing this to another exception type.


private static final long serialVersionUID = 1L;

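To make the compatibility argument above concrete, a hypothetical caller sketch: both snippets compile against the changed class, because catching an exception that is still declared in the throws clause stays legal even when it is unchecked.

// Legacy style keeps compiling unchanged:
try {
    String db = record.database();
} catch (ParsingException e) {
    // previously mandatory handling, still permitted
}

// New style simply calls the accessor, since the exception is now unchecked:
String table = record.table();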
@@ -16,6 +16,8 @@

package com.hazelcast.jet.cdc.impl;

import com.hazelcast.jet.cdc.Operation;
import com.hazelcast.jet.cdc.RecordPart;
import com.hazelcast.jet.impl.serialization.SerializerHookConstants;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
@@ -52,19 +54,36 @@ public int getTypeId() {

@Override
public void write(ObjectDataOutput out, ChangeRecordImpl record) throws IOException {
out.writeLong(record.timestamp());
out.writeLong(record.sequenceSource());
out.writeLong(record.sequenceValue());
out.writeString(record.operation().code());
out.writeUTF(record.getKeyJson());
out.writeUTF(record.getValueJson());
RecordPart oldValue = record.oldValue();
out.writeUTF(oldValue == null ? null : oldValue.toJson());
RecordPart newValue = record.newValue();
out.writeUTF(newValue == null ? null : newValue.toJson());
out.writeString(record.table());
out.writeString(record.schema());
out.writeString(record.database());
}

@Override
public ChangeRecordImpl read(ObjectDataInput in) throws IOException {
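// Read the fields in exactly the order write() emitted them; note that
// writeUTF is a deprecated alias of writeString, so the mixed calls round-trip.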
long timestamp = in.readLong();
long sequenceSource = in.readLong();
long sequenceValue = in.readLong();
String keyJson = in.readUTF();
String valueJson = in.readUTF();
return new ChangeRecordImpl(sequenceSource, sequenceValue, keyJson, valueJson);
Operation operation = Operation.get(in.readString());
String keyJson = in.readString();
String oldValueJson = in.readString();
String newValueJson = in.readString();
String table = in.readString();
String schema = in.readString();
String database = in.readString();
return new ChangeRecordImpl(
timestamp, sequenceSource, sequenceValue,
operation, keyJson, oldValueJson, newValueJson,
table, schema, database);
}
};
}
@@ -17,20 +17,17 @@
package com.hazelcast.jet.cdc.impl;

import com.hazelcast.jet.cdc.ChangeRecord;
import com.hazelcast.jet.cdc.Operation;
import com.hazelcast.jet.core.EventTimePolicy;
import io.debezium.transforms.ExtractNewRecordState;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Values;
import org.apache.kafka.connect.source.SourceRecord;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Supplier;

import static com.hazelcast.jet.impl.util.ExceptionUtil.rethrow;

@@ -39,7 +36,6 @@ public class ChangeRecordCdcSourceP extends CdcSourceP<ChangeRecord> {
public static final String DB_SPECIFIC_EXTRA_FIELDS_PROPERTY = "db.specific.extra.fields";

private final SequenceExtractor sequenceExtractor;
private final ExtractNewRecordState<SourceRecord> transform;

public ChangeRecordCdcSourceP(
@Nonnull Properties properties,
@@ -50,7 +46,6 @@ public ChangeRecordCdcSourceP(
try {
sequenceExtractor = newInstance(properties.getProperty(SEQUENCE_EXTRACTOR_CLASS_PROPERTY),
"sequence extractor ");
transform = initTransform(properties.getProperty(DB_SPECIFIC_EXTRA_FIELDS_PROPERTY));
} catch (Exception e) {
throw rethrow(e);
}
@@ -59,34 +54,44 @@ public ChangeRecordCdcSourceP(
@Nullable
@Override
protected ChangeRecord map(SourceRecord record) {
record = transform.apply(record);
if (record == null) {
return null;
}

long sequenceSource = sequenceExtractor.source(record.sourcePartition(), record.sourceOffset());
long sequenceValue = sequenceExtractor.sequence(record.sourceOffset());
String keyJson = Values.convertToString(record.keySchema(), record.key());
String valueJson = Values.convertToString(record.valueSchema(), record.value());
return new ChangeRecordImpl(sequenceSource, sequenceValue, keyJson, valueJson);
}

private static ExtractNewRecordState<SourceRecord> initTransform(String dbSpecificExtraFields) {
ExtractNewRecordState<SourceRecord> transform = new ExtractNewRecordState<>();
Struct value = (Struct) record.value();
Struct source = (Struct) value.get("source");

Map<String, String> config = new HashMap<>();
config.put("add.fields", String.join(",", extraFields(dbSpecificExtraFields)));
config.put("delete.handling.mode", "rewrite");
transform.configure(config);
Operation operation = Operation.get(value.getString("op"));
Schema valueSchema = record.valueSchema();

return transform;
Object before = value.get("before");
Object after = value.get("after");
Supplier<String> oldValueJson = before == null
? null
: () -> Values.convertToString(valueSchema.field("before").schema(), before);
Supplier<String> newValueJson = after == null
? null
: () -> Values.convertToString(valueSchema.field("after").schema(), after);
return new ChangeRecordImpl(
value.getInt64("ts_ms"),
sequenceSource,
sequenceValue,
operation,
keyJson,
oldValueJson,
newValueJson,
fieldOrNull(source, "table"),
fieldOrNull(source, "schema"),
fieldOrNull(source, "db")
);
}

private static Collection<String> extraFields(String dbSpecificExtraFields) {
Set<String> extraFields = new HashSet<>(Arrays.asList("db", "table", "op", "ts_ms"));
if (dbSpecificExtraFields != null) {
extraFields.addAll(Arrays.asList(dbSpecificExtraFields.split(",")));
}
return extraFields;
private static String fieldOrNull(Struct struct, String fieldName) {
return struct.schema().field(fieldName) != null
? struct.getString(fieldName)
: null;
}
}
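For reference, the Debezium envelope this map() implementation unpacks has roughly the following JSON shape (values invented for illustration): "before" is null for sync/insert events, "after" is null for deletes, and fieldOrNull covers connectors whose source block lacks one of the fields, e.g. MySQL publishes "db" but no "schema".

{
  "before": null,
  "after": { "id": 1004, "first_name": "Anne" },
  "source": { "db": "inventory", "table": "customers" },
  "op": "c",
  "ts_ms": 1655900000000
}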