Skip to content

Commit

Permalink
Adds backwards-compatible serde for SeekableStreamStartSequenceNumbers.
Browse files Browse the repository at this point in the history
This allows them to be deserialized by older Druid versions as
KafkaPartitions objects.

Fixes #7470.
  • Loading branch information
gianm committed Apr 19, 2019
1 parent 0a0fd63 commit 65569cd
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public String getStream()

/**
* Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
* SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public String getTopic()
Expand Down Expand Up @@ -182,7 +182,7 @@ public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> minus(

/**
* Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
* SeekableStreamEndSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,37 +50,71 @@ public class SeekableStreamStartSequenceNumbers<PartitionIdType, SequenceOffsetT
@JsonCreator
public SeekableStreamStartSequenceNumbers(
@JsonProperty("stream") final String stream,
// kept for backward compatibility
@JsonProperty("topic") final String topic,
@JsonProperty("partitionSequenceNumberMap")
final Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
// kept for backward compatibility
@JsonProperty("partitionOffsetMap") final Map<PartitionIdType, SequenceOffsetType> partitionOffsetMap,
@JsonProperty("exclusivePartitions") @Nullable final Set<PartitionIdType> exclusivePartitions
)
{
this.stream = Preconditions.checkNotNull(stream, "stream");
this.partitionSequenceNumberMap = Preconditions.checkNotNull(
partitionSequenceNumberMap,
"partitionIdToSequenceNumberMap"
);
this.stream = stream == null ? topic : stream;
this.partitionSequenceNumberMap = partitionOffsetMap == null ? partitionSequenceNumberMap : partitionOffsetMap;

Preconditions.checkNotNull(this.stream, "stream");
Preconditions.checkNotNull(this.partitionSequenceNumberMap, "partitionIdToSequenceNumberMap");

// exclusiveOffset can be null if this class is deserialized from metadata store. Note that only end offsets are
// stored in metadata store.
// The default is true because there was only Kafka indexing service before in which the end offset is always
// exclusive.
this.exclusivePartitions = exclusivePartitions == null ? Collections.emptySet() : exclusivePartitions;
}

public SeekableStreamStartSequenceNumbers(
String stream,
Map<PartitionIdType, SequenceOffsetType> partitionSequenceNumberMap,
Set<PartitionIdType> exclusivePartitions
)
{
this(stream, null, partitionSequenceNumberMap, null, exclusivePartitions);
}

@Override
@JsonProperty
public String getStream()
{
return stream;
}

/**
* Identical to {@link #getStream()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public String getTopic()
{
return stream;
}

@Override
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionSequenceNumberMap()
{
return partitionSequenceNumberMap;
}

/**
* Identical to {@link #getPartitionSequenceNumberMap()} ()}. Here for backwards compatibility, so a serialized
* SeekableStreamStartSequenceNumbers can be read by older Druid versions as a KafkaPartitions object.
*/
@JsonProperty
public Map<PartitionIdType, SequenceOffsetType> getPartitionOffsetMap()
{
return partitionSequenceNumberMap;
}

@Override
public SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> plus(
SeekableStreamSequenceNumbers<PartitionIdType, SequenceOffsetType> other
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.seekablestream;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.java.util.common.jackson.JacksonUtils;
import org.apache.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;

import java.util.Map;

public class SeekableStreamStartSequenceNumbersTest
{
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();

@Test
public void testSerde() throws Exception
{
final String stream = "theStream";
final Map<Integer, Long> offsetMap = ImmutableMap.of(1, 2L, 3, 4L);

final SeekableStreamStartSequenceNumbers<Integer, Long> partitions = new SeekableStreamStartSequenceNumbers<>(
stream,
offsetMap,
ImmutableSet.of(6)
);
final String serializedString = OBJECT_MAPPER.writeValueAsString(partitions);

// Check round-trip.
final SeekableStreamStartSequenceNumbers<Integer, Long> partitions2 = OBJECT_MAPPER.readValue(
serializedString,
new TypeReference<SeekableStreamStartSequenceNumbers<Integer, Long>>() {}
);

Assert.assertEquals("Round trip", partitions, partitions2);

// Check backwards compatibility.
final Map<String, Object> asMap = OBJECT_MAPPER.readValue(
serializedString,
JacksonUtils.TYPE_REFERENCE_MAP_STRING_OBJECT
);

Assert.assertEquals(stream, asMap.get("stream"));
Assert.assertEquals(stream, asMap.get("topic"));

// Jackson will deserialize the maps as string -> int maps, not int -> long.
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionSequenceNumberMap"), new TypeReference<Map<Integer, Long>>() {})
);
Assert.assertEquals(
offsetMap,
OBJECT_MAPPER.convertValue(asMap.get("partitionOffsetMap"), new TypeReference<Map<Integer, Long>>() {})
);
}
}

0 comments on commit 65569cd

Please sign in to comment.