Skip to content

Commit

Permalink
Suggested changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Moritz Mack committed Nov 30, 2021
1 parent 8e81417 commit 18b88e7
Showing 1 changed file with 4 additions and 3 deletions.
Expand Up @@ -885,13 +885,13 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
producerConfiguration(); // verify Kinesis producer configuration can be build
createProducerConfiguration(); // verify Kinesis producer configuration can be built

input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}

private KinesisProducerConfiguration producerConfiguration() {
private KinesisProducerConfiguration createProducerConfiguration() {
Properties props = getProducerProperties();
if (props == null) {
props = new Properties();
Expand Down Expand Up @@ -930,7 +930,8 @@ private void setupSharedProducer() {
synchronized (producerRefCount) {
if (producer == null) {
producer =
spec.getAWSClientsProvider().createKinesisProducer(spec.producerConfiguration());
spec.getAWSClientsProvider()
.createKinesisProducer(spec.createProducerConfiguration());
producerRefCount.set(0);
}
}
Expand Down

0 comments on commit 18b88e7

Please sign in to comment.