KAFKA-12446: Call subtractor before adder if key is the same #10747

Merged
merged 12 commits on Apr 10, 2023
8 changes: 6 additions & 2 deletions docs/streams/developer-guide/dsl-api.html
Expand Up @@ -1024,7 +1024,9 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l
<li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The order of execution for the subtractor and adder is not defined.</li>
input record that was just received. The subtractor is called before the adder if and only if the extracted grouping key of the old and new value is the same.
The detection of this case depends on a correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor
and adder is not defined.</li>
<li>When a tombstone record &#8211; i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value &#8211; is received for a key (e.g., DELETE),
then only the subtractor is called. Note that, whenever the subtractor returns a <code class="docutils literal"><span class="pre">null</span></code> value itself,
then the corresponding key is removed from the resulting <code class="docutils literal"><span class="pre">KTable</span></code>. If that happens, any next input
Expand Down Expand Up @@ -1272,7 +1274,9 @@ <h4 class="anchor-heading"><a id="streams_concepts_globalktable" class="anchor-l
<li>When the first non-<code class="docutils literal"><span class="pre">null</span></code> value is received for a key (e.g., INSERT), then only the adder is called.</li>
<li>When subsequent non-<code class="docutils literal"><span class="pre">null</span></code> values are received for a key (e.g., UPDATE), then (1) the subtractor is
called with the old value as stored in the table and (2) the adder is called with the new value of the
input record that was just received. The order of execution for the subtractor and adder is not defined.</li>
input record that was just received. The subtractor is called before the adder if and only if the extracted grouping key of the old and new value is the same.
The detection of this case depends on a correct implementation of the equals() method of the extracted key type. Otherwise, the order of execution for the subtractor
and adder is not defined.</li>
<li>When a tombstone record &#8211; i.e. a record with a <code class="docutils literal"><span class="pre">null</span></code> value &#8211; is received for a key (e.g., DELETE),
then only the subtractor is called. Note that, whenever the subtractor returns a <code class="docutils literal"><span class="pre">null</span></code> value itself,
then the corresponding key is removed from the resulting <code class="docutils literal"><span class="pre">KTable</span></code>. If that happens, any next input
Expand Down
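As context for the documentation change above, here is a minimal, self-contained sketch of a table re-grouping whose adder and subtractor maintain per-region counts. The topic names and the String/Long types are illustrative assumptions, not part of this PR; the new ordering guarantee applies whenever the extracted grouping key (here, the region) is unchanged between the old and new record.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class RegionCountSketch {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Hypothetical input topic: userId -> region.
        final KTable<String, String> userRegions =
            builder.table("user-regions", Consumed.with(Serdes.String(), Serdes.String()));

        // Re-group by region; an UPDATE to a user's row may or may not change the grouping key.
        final KGroupedTable<String, Long> groupedByRegion = userRegions.groupBy(
            (userId, region) -> KeyValue.pair(region, 1L),
            Grouped.with(Serdes.String(), Serdes.Long()));

        // If the old and new record map to the same region (per String.equals), the
        // subtractor is now guaranteed to run before the adder. If the region changed,
        // the two calls target different keys and their relative order stays undefined.
        final KTable<String, Long> regionCounts = groupedByRegion.reduce(
            (aggValue, newValue) -> aggValue + newValue,  // adder
            (aggValue, oldValue) -> aggValue - oldValue); // subtractor

        regionCounts.toStream().to("region-counts", Produced.with(Serdes.String(), Serdes.Long()));
    }
}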
20 changes: 17 additions & 3 deletions docs/streams/upgrade-guide.html
Expand Up @@ -34,10 +34,10 @@ <h1>Upgrade Guide and API Changes</h1>
</div>

<p>
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.2 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"0.10.0" - "3.2"</code>) and during the second you remove it. This is required to safely handle 2 changes. The first is introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in foreign-key join serialization format.
Upgrading from any older version to {{fullDotVersion}} is possible: if upgrading from 3.4 or below, you will need to do two rolling bounces, where during the first rolling bounce phase you set the config <code>upgrade.from="older version"</code>
(possible values are <code>"0.10.0" - "3.4"</code>) and during the second you remove it. This is required to safely handle 3 changes. The first is the introduction of the new cooperative rebalancing protocol of the embedded consumer. The second is a change in the foreign-key join serialization format.
Note that you will remain on the old eager rebalancing protocol if you skip or delay the second rolling bounce, but you can safely switch over to cooperative at any time once the entire group is on 2.4+ by removing the config value and bouncing. For more details please refer to
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>:
<a href="https://cwiki.apache.org/confluence/x/vAclBg">KIP-429</a>. The third is a change in the serialization format for an internal repartition topic. For more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>:
</p>
<ul>
<li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to the version from which it is being upgraded.</li>
Expand Down Expand Up @@ -75,6 +75,20 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><
changelogs.
</p>

<p>
Downgrading from 3.5.x or a newer version to 3.4.x or an older version needs special attention:
since the 3.5.0 release, Kafka Streams uses a new serialization format for repartition topics.
This means that older versions of Kafka Streams are not able to recognize the bytes written by newer versions,
and hence it is harder to downgrade Kafka Streams with version 3.5.0 or newer to older versions in-flight. For
more details, please refer to <a href="https://cwiki.apache.org/confluence/x/P5VbDg">KIP-904</a>.

For a downgrade, first set the config <code>upgrade.from</code> to the version you are downgrading to.
This disables writing of the new serialization format in your application. It is important to wait in this state
for a few minutes to make sure that the application has finished processing any "in-flight" messages written
into the repartition topics in the new serialization format. Afterwards, you can downgrade your application to a
pre-3.5.x version.
</p>

<p>
Kafka Streams does not support running multiple instances of the same application as different processes on the same physical state directory. Starting in 2.8.0 (as well as 2.7.1 and 2.6.2),
this restriction will be enforced. If you wish to run more than one instance of Kafka Streams, you must configure them with different values for <code>state.dir</code>.
Expand Down
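For illustration, a minimal sketch of the two-bounce procedure described above; the application id and bootstrap servers are placeholder assumptions:

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class RollingBounceSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");     // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // First rolling bounce: run the new binaries, but keep writing the old wire
        // formats by declaring the version you are upgrading from.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_34);
        // ... start KafkaStreams with these props on every instance ...

        // Second rolling bounce: drop upgrade.from so the group switches to the
        // cooperative rebalancing protocol and the new serialization formats.
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
        // ... restart every instance again ...

        // Downgrade is the mirror image: set upgrade.from to the target version,
        // bounce, wait for in-flight repartition records in the new format to be
        // processed, and only then deploy the older binaries.
    }
}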
Expand Up @@ -398,6 +398,12 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_33 = "3.3";

/**
* Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 3.4.x}.
*/
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_34 = "3.4";

/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
*/
Expand Down Expand Up @@ -749,7 +755,7 @@ public class StreamsConfig extends AbstractConfig {
UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\" (for upgrading from the corresponding old version).";
UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\" (for upgrading from the corresponding old version).";

/** {@code windowstore.changelog.additional.retention.ms} */
@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -1104,7 +1110,8 @@ public class StreamsConfig extends AbstractConfig {
UPGRADE_FROM_30,
UPGRADE_FROM_31,
UPGRADE_FROM_32,
UPGRADE_FROM_33),
UPGRADE_FROM_33,
UPGRADE_FROM_34),
Importance.LOW,
UPGRADE_FROM_DOC)
.define(WINDOWED_INNER_CLASS_SERDE,
Expand Down
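A small sketch showing that the newly added constant passes the upgrade.from validator while arbitrary values are rejected when the StreamsConfig is constructed; the application id and bootstrap servers are placeholders:

import java.util.Properties;

import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.streams.StreamsConfig;

public class UpgradeFromValidationSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "validation-demo");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder

        // "3.4" is now part of the ValidString list, so this parses cleanly.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_34);
        new StreamsConfig(props);

        // A value missing from the validator list fails with a ConfigException.
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "9.9");
        try {
            new StreamsConfig(props);
        } catch (final ConfigException expected) {
            System.out.println(expected.getMessage());
        }
    }
}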
Expand Up @@ -18,13 +18,15 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.SerdeGetter;

import java.nio.ByteBuffer;

public class ChangedDeserializer<T> implements Deserializer<Change<T>>, WrappingNullableDeserializer<Change<T>, Void, T> {

private static final int NEWFLAG_SIZE = 1;
private static final int NEW_OLD_FLAG_SIZE = 1;

private Deserializer<T> inner;

Expand All @@ -46,16 +48,41 @@ public void setIfUnset(final SerdeGetter getter) {

@Override
public Change<T> deserialize(final String topic, final Headers headers, final byte[] data) {
// The format we need to deserialize is:
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
final ByteBuffer buffer = ByteBuffer.wrap(data);
final byte newOldFlag = buffer.get(data.length - NEW_OLD_FLAG_SIZE);

final byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
final byte[] newData;
final byte[] oldData;
if (newOldFlag == (byte) 0) {
newData = null;
final int oldDataLength = data.length - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];
buffer.get(oldData);
} else if (newOldFlag == (byte) 1) {
oldData = null;
final int newDataLength = data.length - NEW_OLD_FLAG_SIZE;
newData = new byte[newDataLength];
buffer.get(newData);
} else if (newOldFlag == (byte) 2) {
final int newDataLength = Math.toIntExact(ByteUtils.readUnsignedInt(buffer));
newData = new byte[newDataLength];

System.arraycopy(data, 0, bytes, 0, bytes.length);
final int oldDataLength = data.length - Integer.BYTES - newDataLength - NEW_OLD_FLAG_SIZE;
oldData = new byte[oldDataLength];

if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
return new Change<>(inner.deserialize(topic, headers, bytes), null);
buffer.get(newData);
buffer.get(oldData);
} else {
return new Change<>(null, inner.deserialize(topic, headers, bytes));
throw new StreamsException("Encountered unknown byte value `" + newOldFlag + "` for oldNewFlag in in ChangedDeserializer.");
}

return new Change<>(
inner.deserialize(topic, headers, newData),
inner.deserialize(topic, headers, oldData));
}

@Override
Expand Down
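To make the wire format tangible, here is a standalone sketch (plain java.nio, no Kafka internals) that encodes and then decodes the new flag=2 layout handled by the deserializer above; the string payloads are arbitrary examples:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class NewOldFlagDecodeSketch {

    public static void main(final String[] args) {
        final byte[] newValue = "new".getBytes(StandardCharsets.UTF_8);
        final byte[] oldValue = "old!".getBytes(StandardCharsets.UTF_8);

        // Encode: {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
        // (ByteUtils.writeUnsignedInt is big-endian, so putInt matches it for lengths < 2^31.)
        final ByteBuffer out = ByteBuffer.allocate(Integer.BYTES + newValue.length + oldValue.length + 1);
        out.putInt(newValue.length).put(newValue).put(oldValue).put((byte) 2);
        final byte[] data = out.array();

        // Decode, mirroring ChangedDeserializer: the flag is read from the trailing
        // byte with an absolute get, so the buffer position stays at the length prefix.
        final ByteBuffer in = ByteBuffer.wrap(data);
        final byte newOldFlag = in.get(data.length - 1);
        final int newDataLength = in.getInt();
        final byte[] decodedNew = new byte[newDataLength];
        final byte[] decodedOld = new byte[data.length - Integer.BYTES - newDataLength - 1];
        in.get(decodedNew).get(decodedOld);

        System.out.printf("flag=%d new=%s old=%s%n", newOldFlag,
            new String(decodedNew, StandardCharsets.UTF_8),
            new String(decodedOld, StandardCharsets.UTF_8));
    }
}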
Expand Up @@ -18,16 +18,20 @@

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.ByteUtils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.internals.SerdeGetter;

import java.nio.ByteBuffer;
import java.util.Map;

public class ChangedSerializer<T> implements Serializer<Change<T>>, WrappingNullableSerializer<Change<T>, Void, T> {

private static final int NEWFLAG_SIZE = 1;

private static final int NEW_OLD_FLAG_SIZE = 1;
private static final int UINT32_SIZE = 4;
private Serializer<T> inner;
private boolean isUpgrade;

public ChangedSerializer(final Serializer<T> inner) {
this.inner = inner;
Expand All @@ -45,34 +49,87 @@ public void setIfUnset(final SerdeGetter getter) {
}
}

@SuppressWarnings("checkstyle:cyclomaticComplexity")
private boolean isUpgrade(final Map<String, ?> configs) {
final Object upgradeFrom = configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom == null) {
return false;
}

switch ((String) upgradeFrom) {
case StreamsConfig.UPGRADE_FROM_0100:
case StreamsConfig.UPGRADE_FROM_0101:
case StreamsConfig.UPGRADE_FROM_0102:
case StreamsConfig.UPGRADE_FROM_0110:
case StreamsConfig.UPGRADE_FROM_10:
case StreamsConfig.UPGRADE_FROM_11:
case StreamsConfig.UPGRADE_FROM_20:
case StreamsConfig.UPGRADE_FROM_21:
case StreamsConfig.UPGRADE_FROM_22:
case StreamsConfig.UPGRADE_FROM_23:
case StreamsConfig.UPGRADE_FROM_24:
case StreamsConfig.UPGRADE_FROM_25:
case StreamsConfig.UPGRADE_FROM_26:
case StreamsConfig.UPGRADE_FROM_27:
case StreamsConfig.UPGRADE_FROM_28:
case StreamsConfig.UPGRADE_FROM_30:
case StreamsConfig.UPGRADE_FROM_31:
case StreamsConfig.UPGRADE_FROM_32:
case StreamsConfig.UPGRADE_FROM_33:
case StreamsConfig.UPGRADE_FROM_34:
return true;
default:
return false;
}
}

@Override
public void configure(final Map<String, ?> configs, final boolean isKey) {
this.isUpgrade = isUpgrade(configs);
}

/**
* @throws StreamsException if both old and new values of data are null, or if
* both values are not null
* both values are not null and the application is upgrading from version 3.4 or earlier
*/
@Override
public byte[] serialize(final String topic, final Headers headers, final Change<T> data) {
final byte[] serializedKey;

// only one of the old / new values would be not null
if (data.newValue != null) {
if (data.oldValue != null) {
final boolean oldValueIsNull = data.oldValue == null;
final boolean newValueIsNull = data.newValue == null;

final byte[] newData = inner.serialize(topic, headers, data.newValue);
final byte[] oldData = inner.serialize(topic, headers, data.oldValue);

final int newDataLength = newValueIsNull ? 0 : newData.length;
final int oldDataLength = oldValueIsNull ? 0 : oldData.length;

// The serialization format is:
// {BYTE_ARRAY oldValue}{BYTE newOldFlag=0}
// {BYTE_ARRAY newValue}{BYTE newOldFlag=1}
// {UINT32 newDataLength}{BYTE_ARRAY newValue}{BYTE_ARRAY oldValue}{BYTE newOldFlag=2}
final ByteBuffer buf;
if (!newValueIsNull && !oldValueIsNull) {
if (isUpgrade) {
throw new StreamsException("Both old and new values are not null (" + data.oldValue
+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
+ " : " + data.newValue + ") in ChangeSerializer, which is not allowed unless upgrading.");
} else {
final int capacity = UINT32_SIZE + newDataLength + oldDataLength + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
ByteUtils.writeUnsignedInt(buf, newDataLength);
buf.put(newData).put(oldData).put((byte) 2);
}

serializedKey = inner.serialize(topic, headers, data.newValue);
} else if (!newValueIsNull) {
final int capacity = newDataLength + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
buf.put(newData).put((byte) 1);
} else if (!oldValueIsNull) {
final int capacity = oldDataLength + NEW_OLD_FLAG_SIZE;
buf = ByteBuffer.allocate(capacity);
buf.put(oldData).put((byte) 0);
} else {
if (data.oldValue == null) {
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
}

serializedKey = inner.serialize(topic, headers, data.oldValue);
throw new StreamsException("Both old and new values are null in ChangeSerializer, which is not allowed.");
}

final ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
buf.put(serializedKey);
buf.put((byte) (data.newValue != null ? 1 : 0));

return buf.array();
}

Expand Down
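Finally, a standalone sketch of the serializer side: the three branches below mirror the flag values written by ChangedSerializer, under the assumption that the inner serializer has already produced the newData/oldData byte arrays:

import java.nio.ByteBuffer;

public class NewOldFlagEncodeSketch {

    // Mirrors the branching in ChangedSerializer.serialize: at most one of the two
    // arrays may be null, and both non-null is only legal in the post-upgrade format.
    static byte[] encode(final byte[] newData, final byte[] oldData) {
        if (newData != null && oldData != null) {
            // {UINT32 newDataLength}{newValue}{oldValue}{newOldFlag=2}
            return ByteBuffer.allocate(Integer.BYTES + newData.length + oldData.length + 1)
                .putInt(newData.length).put(newData).put(oldData).put((byte) 2).array();
        } else if (newData != null) {
            // {newValue}{newOldFlag=1}
            return ByteBuffer.allocate(newData.length + 1).put(newData).put((byte) 1).array();
        } else if (oldData != null) {
            // {oldValue}{newOldFlag=0}
            return ByteBuffer.allocate(oldData.length + 1).put(oldData).put((byte) 0).array();
        }
        throw new IllegalArgumentException("Both old and new values are null, which is not allowed.");
    }

    public static void main(final String[] args) {
        System.out.println(encode(new byte[]{1, 2}, null).length);        // 3 = 2 payload bytes + flag
        System.out.println(encode(null, new byte[]{1, 2, 3}).length);     // 4 = 3 payload bytes + flag
        System.out.println(encode(new byte[]{1}, new byte[]{2}).length);  // 7 = 4 + 1 + 1 + flag
    }
}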