Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -742,7 +743,7 @@ public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSeque
});
}

public Set<Shard> getShards(String stream)
private Set<Shard> getShards(String stream)
{
if (useListShards) {
return getShardsUsingListShards(stream);
Expand Down Expand Up @@ -782,7 +783,8 @@ private Set<Shard> getShardsUsingDescribeStream(String stream)
* This makes the method resilient to LimitExceeded exceptions (compared to 100 shards, 10 TPS of describeStream)
*
* @param stream name of stream
* @return Immutable set of shards
*
* @return Set of Shard ids
*/
private Set<Shard> getShardsUsingListShards(String stream)
{
Expand All @@ -803,11 +805,11 @@ private Set<Shard> getShardsUsingListShards(String stream)
public Set<String> getPartitionIds(String stream)
{
return wrapExceptions(() -> {
ImmutableSet.Builder<String> partitionIds = ImmutableSet.builder();
Set<String> partitionIds = new TreeSet<>();
for (Shard shard : getShards(stream)) {
partitionIds.add(shard.getShardId());
}
return partitionIds.build();
return partitionIds;
});
}

Expand Down Expand Up @@ -870,25 +872,6 @@ public boolean isAnyFetchActive()
.anyMatch(fetch -> (fetch != null && !fetch.isDone()));
}

/**
* Fetches records from the specified shard to determine if it is empty.
* @param stream to which shard belongs
* @param shardId of the closed shard
* @return true if the closed shard is empty, false otherwise.
*/
public boolean isClosedShardEmpty(String stream, String shardId)
{
String shardIterator = kinesis.getShardIterator(stream,
shardId,
ShardIteratorType.TRIM_HORIZON.toString())
.getShardIterator();
GetRecordsRequest request = new GetRecordsRequest().withShardIterator(shardIterator)
.withLimit(1);
GetRecordsResult shardData = kinesis.getRecords(request);

return shardData.getRecords().isEmpty() && shardData.getNextShardIterator() == null;
}

/**
* Check that a {@link PartitionResource} has been assigned to this record supplier, and if so call
* {@link PartitionResource#seek} to move it to the latest offsets. Note that this method does not restart background
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@

package org.apache.druid.indexing.kinesis.supervisor;

import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.ByteEntity;
Expand Down Expand Up @@ -66,7 +64,6 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.stream.Collectors;

/**
Expand All @@ -91,11 +88,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
private final AWSCredentialsConfig awsCredentialsConfig;
private volatile Map<String, Long> currentPartitionTimeLag;

// Maintain sets of currently closed shards to find ignorable (closed and empty) shards
// Poll closed shards once and store the result to avoid redundant costly calls to kinesis
private final Set<String> emptyClosedShardIds = new TreeSet<>();
private final Set<String> nonEmptyClosedShardIds = new TreeSet<>();

public KinesisSupervisor(
final TaskStorage taskStorage,
final TaskMaster taskMaster,
Expand Down Expand Up @@ -425,52 +417,6 @@ protected boolean supportsPartitionExpiration()
return true;
}

@Override
protected boolean shouldSkipIgnorablePartitions()
{
return spec.getSpec().getTuningConfig().isSkipIgnorableShards();
}

/**
* A kinesis shard is considered to be an ignorable partition if it is both closed and empty
* @return set of shards ignorable by kinesis ingestion
*/
@Override
protected Set<String> computeIgnorablePartitionIds()
{
updateClosedShardCache();
return ImmutableSet.copyOf(emptyClosedShardIds);
}

private synchronized void updateClosedShardCache()
{
final KinesisRecordSupplier kinesisRecordSupplier = (KinesisRecordSupplier) recordSupplier;
final String stream = spec.getSource();
final Set<Shard> allActiveShards = kinesisRecordSupplier.getShards(stream);
final Set<String> activeClosedShards = allActiveShards.stream()
.filter(shard -> isShardClosed(shard))
.map(Shard::getShardId)
.collect(Collectors.toSet());

// clear stale shards
emptyClosedShardIds.retainAll(activeClosedShards);
nonEmptyClosedShardIds.retainAll(activeClosedShards);

for (String closedShardId : activeClosedShards) {
// Try to utilize cache
if (emptyClosedShardIds.contains(closedShardId) || nonEmptyClosedShardIds.contains(closedShardId)) {
continue;
}

// Check if it is closed using kinesis and add to cache
if (kinesisRecordSupplier.isClosedShardEmpty(stream, closedShardId)) {
emptyClosedShardIds.add(closedShardId);
} else {
nonEmptyClosedShardIds.add(closedShardId);
}
}
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> expiredPartitionIds
Expand Down Expand Up @@ -536,15 +482,4 @@ private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadat

return new KinesisDataSourceMetadata(newSequences);
}

/**
* A shard is considered closed iff it has an ending sequence number.
*
* @param shard to be checked
* @return if shard is closed
*/
private boolean isShardClosed(Shard shard)
{
return shard.getSequenceNumberRange().getEndingSequenceNumber() != null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
private final Duration repartitionTransitionDuration;
private final Duration offsetFetchPeriod;
private final boolean useListShards;
private final boolean skipIgnorableShards;

public static KinesisSupervisorTuningConfig defaultConfig()
{
Expand Down Expand Up @@ -77,7 +76,6 @@ public static KinesisSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -114,8 +112,7 @@ public KinesisSupervisorTuningConfig(
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration,
@JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod,
@JsonProperty("useListShards") Boolean useListShards,
@JsonProperty("skipIgnorableShards") Boolean skipIgnorableShards
@JsonProperty("useListShards") Boolean useListShards
)
{
super(
Expand Down Expand Up @@ -163,7 +160,6 @@ public KinesisSupervisorTuningConfig(
DEFAULT_OFFSET_FETCH_PERIOD
);
this.useListShards = (useListShards != null ? useListShards : false);
this.skipIgnorableShards = (skipIgnorableShards != null ? skipIgnorableShards : false);
}

@Override
Expand Down Expand Up @@ -220,12 +216,6 @@ public boolean isUseListShards()
return useListShards;
}

@JsonProperty
public boolean isSkipIgnorableShards()
{
return skipIgnorableShards;
}

@Override
public String toString()
{
Expand Down Expand Up @@ -259,7 +249,6 @@ public String toString()
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", useListShards=" + isUseListShards() +
", skipIgnorableShards=" + isSkipIgnorableShards() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,6 @@ public void testConvert()
null,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -1107,45 +1106,6 @@ public void getPartitionTimeLag() throws InterruptedException
verifyAll();
}

@Test
public void testIsClosedShardEmpty()
{
AmazonKinesis mockKinesis = EasyMock.mock(AmazonKinesis.class);
KinesisRecordSupplier target = new KinesisRecordSupplier(mockKinesis,
recordsPerFetch,
0,
2,
false,
100,
5000,
5000,
5,
true,
false
);
Record record = new Record();

final String shardWithoutRecordsAndNullNextIterator = "0";
setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNullNextIterator, new ArrayList<>(), null);

final String shardWithRecordsAndNullNextIterator = "1";
setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNullNextIterator, Collections.singletonList(record), null);

final String shardWithoutRecordsAndNonNullNextIterator = "2";
setupMockKinesisForShardId(mockKinesis, shardWithoutRecordsAndNonNullNextIterator, new ArrayList<>(), "nextIterator");

final String shardWithRecordsAndNonNullNextIterator = "3";
setupMockKinesisForShardId(mockKinesis, shardWithRecordsAndNonNullNextIterator, Collections.singletonList(record), "nextIterator");

EasyMock.replay(mockKinesis);

// A closed shard is empty only when the records are empty and the next iterator is null
Assert.assertTrue(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNullNextIterator));
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNullNextIterator));
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithoutRecordsAndNonNullNextIterator));
Assert.assertFalse(target.isClosedShardEmpty(STREAM, shardWithRecordsAndNonNullNextIterator));
}

@Test
public void testIsOffsetAvailable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@

package org.apache.druid.indexing.kinesis.supervisor;

import com.amazonaws.services.kinesis.model.SequenceNumberRange;
import com.amazonaws.services.kinesis.model.Shard;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -107,7 +105,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -208,7 +205,6 @@ public void setupTest()
null,
null,
null,
null,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
Expand Down Expand Up @@ -3970,7 +3966,6 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
);

Expand Down Expand Up @@ -4924,55 +4919,6 @@ private void testShardMergePhaseThree(List<Task> phaseTwoTasks) throws Exception
Assert.assertEquals(expectedPartitionOffsets, supervisor.getPartitionOffsets());
}

@Test
public void testGetIgnorablePartitionIds()
{
supervisor = getTestableSupervisor(1, 2, true, "PT1H", null, null);
supervisor.setupRecordSupplier();
supervisor.tryInit();
String stream = supervisor.getKinesisSupervisorSpec().getSource();
SequenceNumberRange openShardRange = new SequenceNumberRange().withEndingSequenceNumber(null);
SequenceNumberRange closedShardRange = new SequenceNumberRange().withEndingSequenceNumber("non-null");

Shard openShard = new Shard().withShardId("openShard")
.withSequenceNumberRange(openShardRange);
Shard emptyClosedShard = new Shard().withShardId("emptyClosedShard")
.withSequenceNumberRange(closedShardRange);
Shard nonEmptyClosedShard = new Shard().withShardId("nonEmptyClosedShard")
.withSequenceNumberRange(closedShardRange);

EasyMock.expect(supervisorRecordSupplier.getShards(stream))
.andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
.andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once()
.andReturn(ImmutableSet.of(openShard, emptyClosedShard)).once()
.andReturn(ImmutableSet.of(openShard)).once()
.andReturn(ImmutableSet.of(openShard, nonEmptyClosedShard, emptyClosedShard)).once();

// The following calls happen twice, once during the first call since there was no cache,
// and once during the last since the cache was cleared prior to it
EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, emptyClosedShard.getShardId()))
.andReturn(true).times(2);
EasyMock.expect(supervisorRecordSupplier.isClosedShardEmpty(stream, nonEmptyClosedShard.getShardId()))
.andReturn(false).times(2);

EasyMock.replay(supervisorRecordSupplier);

// ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
// {empty-closed, nonEmpty-closed} added to cache
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open, empty-closed}, IgnorableShards = {empty-closed}
// {nonEmpty-closed} removed from cache
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open}, IgnorableShards = {}
// {empty-closed} removed from cache
Assert.assertEquals(new HashSet<>(), supervisor.computeIgnorablePartitionIds());
// ActiveShards = {open, empty-closed, nonEmpty-closed}, IgnorableShards = {empty-closed}
// {empty-closed, nonEmpty-closed} re-added to cache
Assert.assertEquals(Collections.singleton(emptyClosedShard.getShardId()), supervisor.computeIgnorablePartitionIds());
}

private TestableKinesisSupervisor getTestableSupervisor(
int replicas,
int taskCount,
Expand Down Expand Up @@ -5082,7 +5028,6 @@ public KinesisIndexTaskClient build(
null,
null,
null,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,11 @@ boolean isOffsetAvailable(StreamPartition<PartitionIdType> partition,
SequenceOffsetType getPosition(StreamPartition<PartitionIdType> partition);

/**
* returns the set of all available partitions under the given stream
* returns the set of partitions under the given stream
*
* @param stream name of stream
*
* @return set of partition ids belonging to the stream
* @return set of partitions
*/
Set<PartitionIdType> getPartitionIds(String stream);

Expand Down
Loading