Skip to content

Commit

Permalink
Merge branch '2.5.x' into master by sp-gupta
Browse files Browse the repository at this point in the history
  • Loading branch information
ConfluentJenkins committed Mar 20, 2023
2 parents 39a43f9 + 6c12fde commit 578d189
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,12 @@ public class BigQuerySinkConfig extends AbstractConfig {
private static final String BIGQUERY_CLUSTERING_FIELD_NAMES_DOC =
"List of fields on which data should be clustered by in BigQuery, separated by commas";

public static final String CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG = "convertDebeziumTimestampToInteger";
private static final ConfigDef.Type CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_TYPE = ConfigDef.Type.BOOLEAN;
private static final Boolean CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_DEFAULT = false;
private static final ConfigDef.Importance CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_IMPORTANCE =
ConfigDef.Importance.MEDIUM;

public static final String TIME_PARTITIONING_TYPE_CONFIG = "timePartitioningType";
private static final ConfigDef.Type TIME_PARTITIONING_TYPE_TYPE = ConfigDef.Type.STRING;
public static final String TIME_PARTITIONING_TYPE_DEFAULT = TimePartitioning.Type.DAY.name().toUpperCase();
Expand Down Expand Up @@ -795,6 +801,11 @@ public static ConfigDef getConfig() {
BIGQUERY_CLUSTERING_FIELD_NAMES_VALIDATOR,
BIGQUERY_CLUSTERING_FIELD_NAMES_IMPORTANCE,
BIGQUERY_CLUSTERING_FIELD_NAMES_DOC
).defineInternal(
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG,
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_TYPE,
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_DEFAULT,
CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_IMPORTANCE
).define(
TIME_PARTITIONING_TYPE_CONFIG,
TIME_PARTITIONING_TYPE_TYPE,
Expand Down Expand Up @@ -948,7 +959,7 @@ public SchemaConverter<Schema> getSchemaConverter() {
* @return a {@link RecordConverter} for BigQuery.
*/
public RecordConverter<Map<String, Object>> getRecordConverter() {
return new BigQueryRecordConverter(getBoolean(CONVERT_DOUBLE_SPECIAL_VALUES_CONFIG));
return new BigQueryRecordConverter(getBoolean(CONVERT_DOUBLE_SPECIAL_VALUES_CONFIG), getBoolean(CONVERT_DEBEZIUM_TIMESTAMP_TO_INTEGER_CONFIG));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ public class BigQueryRecordConverter implements RecordConverter<Map<String, Obje
Integer.class, Long.class, Float.class, Double.class, String.class)
);
private final boolean shouldConvertSpecialDouble;
private boolean shouldConvertDebeziumTimestampToInteger;

static {
// force registration
new DebeziumLogicalConverters();
new KafkaLogicalConverters();
}

public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial) {
public BigQueryRecordConverter(boolean shouldConvertDoubleSpecial, boolean shouldConvertDebeziumTimestampToInteger) {
this.shouldConvertSpecialDouble = shouldConvertDoubleSpecial;
this.shouldConvertDebeziumTimestampToInteger = shouldConvertDebeziumTimestampToInteger;
}

/**
Expand Down Expand Up @@ -230,6 +232,10 @@ private Object convertLogical(Object kafkaConnectObject,
Schema kafkaConnectSchema) {
LogicalTypeConverter converter =
LogicalConverterRegistry.getConverter(kafkaConnectSchema.name());

if(shouldConvertDebeziumTimestampToInteger && converter instanceof DebeziumLogicalConverters.TimestampConverter) {
return (Long) kafkaConnectObject;
}
return converter.convert(kafkaConnectObject);
}

Expand Down

0 comments on commit 578d189

Please sign in to comment.