Merge pull request #673 from confluentinc/pbadani/CC-21917
CC-21917 | Added support for the decimal.format config to be passed to the JSON formatter.
pbadani committed Aug 18, 2023
2 parents 7e7b15d + 68985ad commit 1087a61
Showing 3 changed files with 57 additions and 4 deletions.
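For context, the new json.decimal.format option sits alongside the connector's existing JSON format settings. A minimal sketch of how a user might opt into numeric decimals; the connector and format class names match the connector's published ones, but the bucket name and the surrounding property set are illustrative, not part of this commit:

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: where json.decimal.format fits in an S3 sink configuration.
public class S3SinkJsonDecimalExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
        props.put("format.class", "io.confluent.connect.s3.format.json.JsonFormat");
        props.put("s3.bucket.name", "my-bucket"); // placeholder value
        props.put("json.decimal.format", "NUMERIC"); // new in this commit; defaults to BASE64
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}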
@@ -32,6 +32,7 @@
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.util.ArrayList;
@@ -165,6 +166,13 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {
public static final String S3_PATH_STYLE_ACCESS_ENABLED_CONFIG = "s3.path.style.access.enabled";
public static final boolean S3_PATH_STYLE_ACCESS_ENABLED_DEFAULT = true;

public static final String DECIMAL_FORMAT_CONFIG = "json.decimal.format";
public static final String DECIMAL_FORMAT_DEFAULT = DecimalFormat.BASE64.name();
private static final String DECIMAL_FORMAT_DOC = "Controls the format in which the JSON"
+ " converter serializes decimals."
+ " The value is case insensitive and can be either 'BASE64' (default) or 'NUMERIC'.";
private static final String DECIMAL_FORMAT_DISPLAY = "Decimal Format";

public static final String STORE_KAFKA_KEYS_CONFIG = "store.kafka.keys";
public static final String STORE_KAFKA_HEADERS_CONFIG = "store.kafka.headers";
public static final String KEYS_FORMAT_CLASS_CONFIG = "keys.format.class";
@@ -449,6 +457,20 @@ public static ConfigDef newConfigDef() {
COMPRESSION_LEVEL_VALIDATOR
);

configDef.define(
DECIMAL_FORMAT_CONFIG,
ConfigDef.Type.STRING,
DECIMAL_FORMAT_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(
DecimalFormat.BASE64.name(), DecimalFormat.NUMERIC.name()),
Importance.MEDIUM,
DECIMAL_FORMAT_DOC,
group,
++orderInGroup,
Width.MEDIUM,
DECIMAL_FORMAT_DISPLAY
);

configDef.define(
S3_PART_RETRIES_CONFIG,
Type.INT,
@@ -768,6 +790,10 @@ public int getCompressionLevel() {
return getInt(COMPRESSION_LEVEL_CONFIG);
}

public String getJsonDecimalFormat() {
return getString(DECIMAL_FORMAT_CONFIG);
}

public CompressionCodecName parquetCompressionCodecName() {
return "none".equalsIgnoreCase(getString(PARQUET_CODEC_CONFIG))
? CompressionCodecName.fromConf(null)
@@ -39,6 +39,10 @@ public JsonFormat(S3Storage storage) {
"schemas.cache.size",
String.valueOf(storage.conf().get(S3SinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG))
);
converterConfig.put(
"decimal.format",
String.valueOf(storage.conf().getJsonDecimalFormat())
);
this.converter.configure(converterConfig, false);
}

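The converterConfig.put call above forwards the connector setting to the underlying JsonConverter as its decimal.format property. A minimal sketch of the behavioral difference, assuming Kafka's connect-json module (2.4+) on the classpath; the class and topic names are illustrative:

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;

// Illustrative sketch of what decimal.format changes in the serialized output.
public class DecimalFormatDemo {
    public static void main(String[] args) {
        Schema schema = Decimal.schema(2); // Connect decimal logical type, scale 2
        BigDecimal value = new BigDecimal("10.20");

        JsonConverter base64 = new JsonConverter();
        base64.configure(Map.of("schemas.enable", "false", "decimal.format", "BASE64"), false);
        // Default behavior: unscaled bytes, base64-encoded -> "A/w="
        System.out.println(new String(base64.fromConnectData("t", schema, value), StandardCharsets.UTF_8));

        JsonConverter numeric = new JsonConverter();
        numeric.configure(Map.of("schemas.enable", "false", "decimal.format", "NUMERIC"), false);
        // New option: plain JSON number -> 10.20
        System.out.println(new String(numeric.fromConnectData("t", schema, value), StandardCharsets.UTF_8));
    }
}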
@@ -17,10 +17,13 @@

import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;

import io.confluent.connect.s3.format.bytearray.ByteArrayFormat;
import io.confluent.connect.s3.format.parquet.ParquetFormat;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.After;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -47,6 +50,8 @@
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.avro.AvroDataConfig;

import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.DECIMAL_FORMAT_DEFAULT;
import static io.confluent.connect.s3.S3SinkConnectorConfig.HEADERS_FORMAT_CLASS_CONFIG;
import static io.confluent.connect.s3.S3SinkConnectorConfig.KEYS_FORMAT_CLASS_CONFIG;
import static org.junit.Assert.assertEquals;
@@ -279,7 +284,11 @@ public void testConfigurableCredentialProviderMissingConfigs() {
);

connectorConfig = new S3SinkConnectorConfig(properties);
assertThrows("are mandatory configuration properties", ConfigException.class, () -> connectorConfig.getCredentialsProvider());
assertThrows(
"are mandatory configuration properties",
ConfigException.class,
() -> connectorConfig.getCredentialsProvider()
);
}

@Test
@@ -306,7 +315,11 @@ public void testConfigurableAwsAssumeRoleCredentialsProviderMissingConfigs() {
AwsAssumeRoleCredentialsProvider credentialsProvider =
(AwsAssumeRoleCredentialsProvider) connectorConfig.getCredentialsProvider();

assertThrows("Missing required configuration", ConfigException.class, () -> credentialsProvider.configure(properties));
assertThrows(
"Missing required configuration",
ConfigException.class,
() -> credentialsProvider.configure(properties)
);
}

@Test
@@ -408,6 +421,16 @@ public void testInvalidLowCompressionLevel() {
connectorConfig = new S3SinkConnectorConfig(properties);
}

@Test
public void testJsonDecimalFormat() {
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(DecimalFormat.BASE64.name(), connectorConfig.getJsonDecimalFormat());

properties.put(S3SinkConnectorConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
connectorConfig = new S3SinkConnectorConfig(properties);
assertEquals(DecimalFormat.NUMERIC.name(), connectorConfig.getJsonDecimalFormat());
}

@Test
public void testValidCompressionLevels() {
IntStream.range(-1, 9).boxed().forEach(i -> {
@@ -457,7 +480,7 @@ public void testUnsupportedParquetCompressionType() {
}

@Test
-public void testValidTimezoneWithScheduleIntervalAccepted (){
+public void testValidTimezoneWithScheduleIntervalAccepted() {
properties.put(PartitionerConfig.TIMEZONE_CONFIG, "CET");
properties.put(S3SinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG, "30");
new S3SinkConnectorConfig(properties);
@@ -474,7 +497,7 @@ public void testEmptyTimezoneThrowsExceptionOnScheduleInterval() {
public void testEmptyTimezoneExceptionMessage() {
properties.put(PartitionerConfig.TIMEZONE_CONFIG, PartitionerConfig.TIMEZONE_DEFAULT);
properties.put(S3SinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG, "30");
-String expectedError = String.format(
+String expectedError = String.format(
"%s configuration must be set when using %s",
PartitionerConfig.TIMEZONE_CONFIG,
S3SinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG
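Finally, the CaseInsensitiveValidString validator wired up in newConfigDef() means a bad value fails fast at configuration time rather than at write time. A standalone sketch recreating that validation outside the connector's ConfigDef; the abbreviated doc string and class name are illustrative:

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

// Illustrative re-creation of the validator registered in newConfigDef() above.
public class DecimalFormatValidationSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define(
            "json.decimal.format",
            ConfigDef.Type.STRING,
            "BASE64",
            ConfigDef.CaseInsensitiveValidString.in("BASE64", "NUMERIC"),
            ConfigDef.Importance.MEDIUM,
            "JSON decimal serialization format");

        System.out.println(def.parse(Map.of("json.decimal.format", "numeric"))); // accepted: case insensitive
        def.parse(Map.of("json.decimal.format", "DECIMAL")); // throws ConfigException: invalid value
    }
}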
