Skip to content

Commit

Permalink
Support setting KEY_BASED batch builder for Pulsar Sinks
Browse files Browse the repository at this point in the history
- include batchBuilder in ProducerSpec -> ProducerConfig.ProducerConfigBuilder conversion
  since it was missing

- support setting batch builder with "--batch-builder KEY_BASED" argument
  • Loading branch information
lhotari committed Aug 19, 2021
1 parent f48e002 commit 3631202
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
ProducerConfig.ProducerConfigBuilder builder = ProducerConfig.builder()
.maxPendingMessages(conf.getMaxPendingMessages())
.maxPendingMessagesAcrossPartitions(conf.getMaxPendingMessagesAcrossPartitions())
.batchBuilder(conf.getBatchBuilder())
.useThreadLocalProducers(conf.getUseThreadLocalProducers())
.cryptoConfig(CryptoUtils.convertFromSpec(conf.getCryptoSpec()));
pulsarSinkConfig.setProducerConfig(builder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,13 @@ public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSource
sinkSpecBuilder.setProducerSpec(ProducerConfigUtils.convert(sourceConfig.getProducerConfig()));
}

if (sourceConfig.getBatchBuilder() != null) {
Function.ProducerSpec.Builder builder = sinkSpecBuilder.getProducerSpec() != null
? sinkSpecBuilder.getProducerSpec().toBuilder()
: Function.ProducerSpec.newBuilder();
sinkSpecBuilder.setProducerSpec(builder.setBatchBuilder(sourceConfig.getBatchBuilder()).build());
}

sinkSpecBuilder.setForwardSourceMessageProperty(true);

functionDetailsBuilder.setSink(sinkSpecBuilder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,37 @@ public void testValidateConfig() throws IOException {
assertTrue(e.getMessage().contains("Could not validate source config: Field 'configParameter' cannot be null!"));
}

@Test
public void testSupportsBatchBuilderWhenProducerConfigIsNull() {
SourceConfig sourceConfig = createSourceConfig();
sourceConfig.setProducerConfig(null);
sourceConfig.setBatchBuilder("KEY_BASED");
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
}

@Test
public void testSupportsBatchBuilderWhenProducerConfigExists() {
SourceConfig sourceConfig = createSourceConfig();
sourceConfig.setBatchBuilder("KEY_BASED");
sourceConfig.getProducerConfig().setMaxPendingMessages(123456);
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
assertEquals(functionDetails.getSink().getProducerSpec().getMaxPendingMessages(), 123456);
}

@Test
public void testSupportsBatchBuilderDefinedInProducerConfigWhenTopLevelBatchBuilderIsUndefined() {
SourceConfig sourceConfig = createSourceConfig();
sourceConfig.setBatchBuilder(null);
sourceConfig.getProducerConfig().setBatchBuilder("KEY_BASED");
Function.FunctionDetails functionDetails =
SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
assertEquals(functionDetails.getSink().getProducerSpec().getBatchBuilder(), "KEY_BASED");
}

private SourceConfig createSourceConfigWithBatch() {
SourceConfig sourceConfig = createSourceConfig();
BatchSourceConfig batchSourceConfig = createBatchSourceConfig();
Expand Down

0 comments on commit 3631202

Please sign in to comment.