Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-13236] Properly close kinesis producer on teardown #15955

Merged
merged 5 commits into from Dec 7, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,6 @@
import com.google.auto.value.AutoValue;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -884,28 +883,79 @@ public PDone expand(PCollection<byte[]> input) {
getPartitionKey() == null || (getPartitioner() == null),
"only one of either withPartitionKey() or withPartitioner() is possible");
checkArgument(getAWSClientsProvider() != null, "withAWSClientsProvider() is required");
createProducerConfiguration(); // verify Kinesis producer configuration can be built

input.apply(ParDo.of(new KinesisWriterFn(this)));
return PDone.in(input.getPipeline());
}

private static class KinesisWriterFn extends DoFn<byte[], Void> {
private KinesisProducerConfiguration createProducerConfiguration() {
Properties props = getProducerProperties();
if (props == null) {
props = new Properties();
}
return KinesisProducerConfiguration.fromProperties(props);
}

private static class KinesisWriterFn extends DoFn<byte[], Void> {
private static final int MAX_NUM_FAILURES = 10;

/** Usage count of static, shared Kinesis producer. */
private static int producerRefCount = 0;

/** Static, shared Kinesis producer. */
echauchot marked this conversation as resolved.
Show resolved Hide resolved
private static IKinesisProducer producer;
mosche marked this conversation as resolved.
Show resolved Hide resolved

private final KinesisIO.Write spec;
private static transient IKinesisProducer producer;

private transient KinesisPartitioner partitioner;
private transient LinkedBlockingDeque<KinesisWriteException> failures;
private transient List<Future<UserRecordResult>> putFutures;

KinesisWriterFn(KinesisIO.Write spec) {
this.spec = spec;
initKinesisProducer();
}

/**
* Initialize statically shared Kinesis producer if required and count usage.
*
* <p>NOTE: If there is, for whatever reasons, another instance of a {@link KinesisWriterFn}
* with different producer properties or even a different implementation of {@link
* AWSClientsProvider}, these changes will be silently discarded in favor of an existing
* producer instance.
*/
private void setupSharedProducer() {
synchronized (KinesisWriterFn.class) {
if (producer == null) {
producer =
spec.getAWSClientsProvider()
.createKinesisProducer(spec.createProducerConfiguration());
producerRefCount = 0;
}
producerRefCount++;
}
}

/**
* Discard statically shared producer if it is not used anymore according to the usage count.
*/
private void teardownSharedProducer() {
IKinesisProducer obsolete = null;
synchronized (KinesisWriterFn.class) {
if (--producerRefCount == 0) {
obsolete = producer;
producer = null;
}
}
if (obsolete != null) {
obsolete.flushSync(); // should be a noop, but just in case
obsolete.destroy();
}
}

@Setup
public void setup() {
setupSharedProducer();
// Use custom partitioner if it exists
if (spec.getPartitioner() != null) {
partitioner = spec.getPartitioner();
Expand All @@ -917,30 +967,6 @@ public void startBundle() {
putFutures = Collections.synchronizedList(new ArrayList<>());
/** Keep only the first {@link MAX_NUM_FAILURES} occurred exceptions */
failures = new LinkedBlockingDeque<>(MAX_NUM_FAILURES);
initKinesisProducer();
}

private synchronized void initKinesisProducer() {
echauchot marked this conversation as resolved.
Show resolved Hide resolved
// Init producer config
Properties props = spec.getProducerProperties();
if (props == null) {
props = new Properties();
}
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromProperties(props);
// Fix to avoid the following message "WARNING: Exception during updateCredentials" during
// producer.destroy() call. More details can be found in this thread:
// https://github.com/awslabs/amazon-kinesis-producer/issues/10
config.setCredentialsRefreshDelay(100);
mosche marked this conversation as resolved.
Show resolved Hide resolved

// Init Kinesis producer
if (producer == null) {
producer = spec.getAWSClientsProvider().createKinesisProducer(config);
}
}

private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {
is.defaultReadObject();
initKinesisProducer();
}

/**
Expand Down Expand Up @@ -1067,10 +1093,7 @@ private void checkForFailures(String message) throws IOException {

@Teardown
public void teardown() throws Exception {
if (producer != null && producer.getOutstandingRecordsCount() > 0) {
producer.flushSync();
}
producer = null;
teardownSharedProducer();
}
}
}
Expand Down