diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 0f549aaa74e2..f8f84e9e9c28 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -71,6 +71,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.between; import static org.apache.kafka.common.config.ConfigDef.ValidString.in; import static org.apache.kafka.common.config.ConfigDef.parseType; +import static org.apache.kafka.streams.internals.UpgradeFromValues.UPGRADE_FROM_35; /** * Configuration for a {@link KafkaStreams} instance. @@ -762,7 +763,8 @@ public class StreamsConfig extends AbstractConfig { UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" + UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" + UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" + - UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\" (for upgrading from the corresponding old version)."; + UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" + + UPGRADE_FROM_35 + "(for upgrading from the corresponding old version)."; /** {@code windowstore.changelog.additional.retention.ms} */ @SuppressWarnings("WeakerAccess") diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java index a5ae71a33d4e..e803e918701a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java +++ b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java @@ -36,7 +36,8 @@ public enum UpgradeFromValues { UPGRADE_FROM_31("3.1"), UPGRADE_FROM_32("3.2"), UPGRADE_FROM_33("3.3"), - UPGRADE_FROM_34("3.4"); + UPGRADE_FROM_34("3.4"), + UPGRADE_FROM_35("3.5"); private final String value; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java index 02a0cb8bab90..76e824776e99 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java @@ -125,6 +125,7 @@ public RebalanceProtocol rebalanceProtocol() { case UPGRADE_FROM_32: case UPGRADE_FROM_33: case UPGRADE_FROM_34: + case UPGRADE_FROM_35: // we need to add new version when new "upgrade.from" values become available // This config is for explicitly sending FK response to a requested partition @@ -183,6 +184,7 @@ public int configuredMetadataVersion(final int priorVersion) { case UPGRADE_FROM_32: case UPGRADE_FROM_33: case UPGRADE_FROM_34: + case UPGRADE_FROM_35: // we need to add new version when new "upgrade.from" values become available // This config is for explicitly sending FK response to a requested partition