From 161f38b58064404e78b421c8d9b6e493e6222000 Mon Sep 17 00:00:00 2001 From: ran Date: Mon, 31 May 2021 15:48:19 +0800 Subject: [PATCH] Fix kinesis sink backoff class not found (#10744) * copy the class `Backoff` to the project Kinesis connector (cherry picked from commit e19f6472183ac93dc058a96f7ac0537c22d90e18) --- .../org/apache/pulsar/io/kinesis/Backoff.java | 128 ++++++++++++++++++ .../apache/pulsar/io/kinesis/KinesisSink.java | 1 - 2 files changed, 128 insertions(+), 1 deletion(-) create mode 100644 pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java new file mode 100644 index 0000000000000..babacd4b40da7 --- /dev/null +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Backoff.java @@ -0,0 +1,128 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.kinesis; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Clock; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import lombok.Data; + + +/** + * All variables are in TimeUnit millis by default. + */ +@Data +public class Backoff { + public static final long DEFAULT_INTERVAL_IN_NANOSECONDS = TimeUnit.MILLISECONDS.toNanos(100); + public static final long MAX_BACKOFF_INTERVAL_NANOSECONDS = TimeUnit.SECONDS.toNanos(30); + private final long initial; + private final long max; + private final Clock clock; + private long next; + private long mandatoryStop; + + private long firstBackoffTimeInMillis; + private boolean mandatoryStopMade = false; + + private static final Random random = new Random(); + + @VisibleForTesting + Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop, + TimeUnit unitMandatoryStop, Clock clock) { + this.initial = unitInitial.toMillis(initial); + this.max = unitMax.toMillis(max); + this.next = this.initial; + this.mandatoryStop = unitMandatoryStop.toMillis(mandatoryStop); + this.clock = clock; + } + + public Backoff(long initial, TimeUnit unitInitial, long max, TimeUnit unitMax, long mandatoryStop, + TimeUnit unitMandatoryStop) { + this(initial, unitInitial, max, unitMax, mandatoryStop, unitMandatoryStop, Clock.systemDefaultZone()); + } + + public long next() { + long current = this.next; + if (current < max) { + this.next = Math.min(this.next * 2, this.max); + } + + // Check for mandatory stop + if (!mandatoryStopMade) { + long now = clock.millis(); + long timeElapsedSinceFirstBackoff = 0; + if (initial == current) { + firstBackoffTimeInMillis = now; + } else { + timeElapsedSinceFirstBackoff = now - firstBackoffTimeInMillis; + } + + if (timeElapsedSinceFirstBackoff + current > mandatoryStop) { + current = Math.max(initial, mandatoryStop - timeElapsedSinceFirstBackoff); + mandatoryStopMade = true; + } + } + + // Randomly decrease the timeout up to 10% to avoid simultaneous retries + // If current < 10 then current/10 < 1 and we get an exception from Random saying "Bound must be positive" + if (current > 10) { + current -= random.nextInt((int) current / 10); + } + return Math.max(initial, current); + } + + public void reduceToHalf() { + if (next > initial) { + this.next = Math.max(this.next / 2, this.initial); + } + } + + public void reset() { + this.next = this.initial; + this.mandatoryStopMade = false; + } + + @VisibleForTesting + long getFirstBackoffTimeInMillis() { + return firstBackoffTimeInMillis; + } + + public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts, + long defaultInterval, long maxBackoffInterval) { + long initialTimestampInNano = unitInitial.toNanos(initialTimestamp); + long currentTime = System.nanoTime(); + long interval = defaultInterval; + for (int i = 1; i < failedAttempts; i++) { + interval = interval * 2; + if (interval > maxBackoffInterval) { + interval = maxBackoffInterval; + break; + } + } + + // if the current time is less than the time at which next retry should occur, we should backoff + return currentTime < (initialTimestampInNano + interval); + } + + public static boolean shouldBackoff(long initialTimestamp, TimeUnit unitInitial, int failedAttempts) { + return Backoff.shouldBackoff(initialTimestamp, unitInitial, failedAttempts, + DEFAULT_INTERVAL_IN_NANOSECONDS, MAX_BACKOFF_INTERVAL_NANOSECONDS); + } +} diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index adc7c3c5f956b..ed7c70cd76d9f 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -47,7 +47,6 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; -import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.io.aws.AbstractAwsConnector; import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;