CC-21917 | Added support for decimal.format config to be passed to json formatter. #673

Merged (1 commit, Aug 18, 2023)
S3SinkConnectorConfig.java

@@ -32,6 +32,7 @@
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.util.ArrayList;
@@ -165,6 +166,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
public static final String S3_PATH_STYLE_ACCESS_ENABLED_CONFIG = "s3.path.style.access.enabled";
public static final boolean S3_PATH_STYLE_ACCESS_ENABLED_DEFAULT = true;

public static final String DECIMAL_FORMAT_CONFIG = "json.decimal.format";
public static final String DECIMAL_FORMAT_DEFAULT = DecimalFormat.BASE64.name();
private static final String DECIMAL_FORMAT_DOC = "Controls which format the JSON converter"
+ " will serialize decimals in."
+ " This value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'.";
private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";

public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys";
public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers";
public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class";
@@ -449,6 +457,20 @@ public static ConfigDef newConfigDef() {
COMPRESSION_LEVEL_VALIDATOR
);

configDef.define(
DECIMAL_FORMAT_CONFIG,
ConfigDef.Type.STRING,
DECIMAL_FORMAT_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(
DecimalFormat.BASE64.name(), DecimalFormat.NUMERIC.name()),
Importance.MEDIUM,
DECIMAL_FORMAT_DOC,
group,
++orderInGroup,
Width.MEDIUM,
DECIMAL_FORMAT_DISPLAY
);

configDef.define(
S3_PART_RETRIES_CONFIG,
Type.INT,
@@ -768,6 +790,10 @@ public int getCompressionLevel() {
return getInt(COMPRESSION_LEVEL_CONFIG);
}

public String getJsonDecimalFormat() {
return getString(DECIMAL_FORMAT_CONFIG);
}

public CompressionCodecName parquetCompressionCodecName() {
return "none".equalsIgnoreCase(getString(PARQUET_CODEC_CONFIG))
? CompressionCodecName.fromConf(null)
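
A minimal, self-contained sketch of how the definition above behaves at parse time (the config name, default, and allowed values are copied from the diff; the stripped-down ConfigDef and class name are illustrative, not the connector's full definition):

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class DecimalFormatConfigSketch {
    public static void main(String[] args) {
        // Mirrors the define(...) call in the diff, minus the group/width/display metadata.
        ConfigDef def = new ConfigDef().define(
                "json.decimal.format",
                ConfigDef.Type.STRING,
                "BASE64",
                ConfigDef.CaseInsensitiveValidString.in("BASE64", "NUMERIC"),
                ConfigDef.Importance.MEDIUM,
                "Controls which format the JSON converter will serialize decimals in.");

        // Validation is case insensitive, so lowercase "numeric" passes through unchanged.
        Map<String, Object> parsed = def.parse(Map.of("json.decimal.format", "numeric"));
        System.out.println(parsed.get("json.decimal.format")); // numeric

        // Values outside the allowed set fail fast at connector startup, not at write time.
        try {
            def.parse(Map.of("json.decimal.format", "HEX"));
        } catch (ConfigException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```
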
JsonFormat.java

@@ -39,6 +39,10 @@ public JsonFormat(S3Storage storage) {
"schemas.cache.size",
String.valueOf(storage.conf().get(S3SinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG))
);
converterConfig.put(
"decimal.format",
String.valueOf(storage.conf().getJsonDecimalFormat())
);
this.converter.configure(converterConfig, false);
}

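
The decimal.format key set above is the standard JsonConverter option from KIP-481. A short standalone sketch of its effect on a Connect Decimal value (topic name, scale, and value are illustrative; schemas.enable is disabled only to keep the output readable):

```java
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

public class DecimalFormatDemo {
    static String serialize(String decimalFormat) {
        JsonConverter converter = new JsonConverter();
        Map<String, String> config = new HashMap<>();
        config.put("schemas.enable", "false");       // drop the schema envelope for brevity
        config.put("decimal.format", decimalFormat); // the setting this PR wires through
        converter.configure(config, false);          // false = configure as a value converter

        Schema schema = Decimal.schema(2);           // Connect's decimal logical type, scale 2
        byte[] json = converter.fromConnectData("orders", schema, new BigDecimal("10.25"));
        return new String(json, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(serialize("BASE64"));  // "BAE=" (base64 of the unscaled bytes)
        System.out.println(serialize("NUMERIC")); // 10.25
    }
}
```

With BASE64 the converter emits the base64-encoded unscaled bytes as a JSON string; with NUMERIC it emits a plain JSON number.
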
S3SinkConnectorConfigTest.java

@@ -17,10 +17,13 @@

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;

import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -47,6 +50,8 @@
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.avro.AvroDataConfig;

import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
import static org.junit.Assert.assertEquals;
@@ -279,7 +284,11 @@ public void testConfigurableCredentialProviderMissingConfigs() {
);

connectorConfig = new S3SinkConnectorConfig(properties);
- assertThrows("are mandatory configuration properties", ConfigException.class, () -> connectorConfig.getCredentialsProvider());
+ assertThrows(
+ "are mandatory configuration properties",
+ ConfigException.class,
+ () -> connectorConfig.getCredentialsProvider()
+ );
}

@Test
@@ -306,7 +315,11 @@ public void testConfigurableAwsAssumeRoleCredentialsProviderMissingConfigs() {
AwsAssumeRoleCredentialsProvider credentialsProvider =
(AwsAssumeRoleCredentialsProvider) connectorConfig.getCredentialsProvider();

- assertThrows("Missing required configuration", ConfigException.class, () -> credentialsProvider.configure(properties));
+ assertThrows(
+ "Missing required configuration",
+ ConfigException.class,
+ () -> credentialsProvider.configure(properties)
+ );
}

@Test
@@ -408,6 +421,16 @@ public void testInvalidLowCompressionLevel() {
connectorConfig = new S3SinkConnectorConfig(properties);
}

@Test
public void testJsonDecimalFormat() {
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(DecimalFormat.BASE64.name(), connectorConfig.getJsonDecimalFormat());

properties.put(S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
}

@Test
public void testValidCompressionLevels() {
IntStream.range(-1, 9).boxed().forEach(i -> {
@@ -457,7 +480,7 @@ public void testUnsupportedParquetCompressionType() {
}

@Test
- public void testValidTimezoneWithScheduleIntervalAccepted (){
+ public void testValidTimezoneWithScheduleIntervalAccepted() {
properties.put(PartitionerConfig.TIMEZONE_CONFIG, "CET");
properties.put(S3SinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG, "30");
new S3SinkConnectorConfig(properties);
@@ -474,7 +497,7 @@ public void testEmptyTimezoneThrowsExceptionOnScheduleInterval() {
public void testEmptyTimezoneExceptionMessage() {
properties.put(PartitionerConfig.TIMEZONE_CONFIG, PartitionerConfig.TIMEZONE_DEFAULT);
properties.put(S3SinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG, "30");
String expectedError = String.format(
"%s configuration must be set when using %s",
PartitionerConfig.TIMEZONE_CONFIG,
S3SinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG
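
Putting it together, a sink that uses the JSON format would opt into numeric decimals roughly like this (a sketch: format.class is the connector's existing format selector, the JsonFormat class path follows the package pattern visible in the test imports, and the many other required sink properties are omitted):

```java
import java.util.HashMap;
import java.util.Map;

public class S3JsonSinkPropsSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Existing property selecting the JSON output format.
        props.put("format.class", "io.confluent.connect.s3.format.json.JsonFormat");
        // New in this PR: case insensitive, defaults to BASE64 when unset.
        props.put("json.decimal.format", "NUMERIC");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```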