Fix ConcurrentModificationException in IncrementalPublishingKafkaIndexTaskRunner #5907

Merged
merged 2 commits into from Jul 1, 2018
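For context on the bug being fixed: SequenceMetadata keeps its end offsets in a plain java.util.HashMap, and the Javadoc added in this patch notes that setEndOffsets can be called by both the main thread and the HTTP thread. HashMap iterators are fail-fast, so if one thread iterates endOffsets (as getMetadata and toString do) while another thread mutates it through setEndOffsets, the iterator throws ConcurrentModificationException. A minimal, hypothetical demo of that failure mode (not code from this PR):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone demo. A single thread is enough to show the failure mode,
// because HashMap iterators are fail-fast on structural modification.
public class ConcurrentModificationDemo
{
  public static void main(String[] args)
  {
    final Map<Integer, Long> endOffsets = new HashMap<>();
    endOffsets.put(0, 10L);
    endOffsets.put(1, 20L);

    // Iterate the map, as getMetadata() and toString() do ...
    for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
      // ... while it is structurally modified, as setEndOffsets() can do from the HTTP thread.
      // The next Iterator.next() call then throws ConcurrentModificationException.
      endOffsets.put(2, 30L);
    }
  }
}
```

The previous code only marked checkpointed as volatile, which gives visibility for the flag but no mutual exclusion for the map; the diff below guards every access with a single ReentrantLock instead.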
@@ -103,6 +103,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -1501,13 +1503,19 @@ public DateTime getStartTime(@Context final HttpServletRequest req)

private static class SequenceMetadata
{
/**
* Lock for accessing {@link #endOffsets} and {@link #checkpointed}. This lock is required because
* {@link #setEndOffsets} can be called by both the main thread and the HTTP thread.
*/
private final ReentrantLock lock = new ReentrantLock();

private final int sequenceId;
private final String sequenceName;
private final Map<Integer, Long> startOffsets;
private final Map<Integer, Long> endOffsets;
private final Set<Integer> assignments;
private final boolean sentinel;
private volatile boolean checkpointed;
private boolean checkpointed;

@JsonCreator
public SequenceMetadata(
@@ -1524,8 +1532,8 @@ public SequenceMetadata(
this.sequenceId = sequenceId;
this.sequenceName = sequenceName;
this.startOffsets = ImmutableMap.copyOf(startOffsets);
this.endOffsets = Maps.newHashMap(endOffsets);
this.assignments = Sets.newHashSet(startOffsets.keySet());
this.endOffsets = new HashMap<>(endOffsets);
this.assignments = new HashSet<>(startOffsets.keySet());
this.checkpointed = checkpointed;
this.sentinel = false;
}
@@ -1539,7 +1547,13 @@ public int getSequenceId()
@JsonProperty
public boolean isCheckpointed()
{
return checkpointed;
lock.lock();
try {
return checkpointed;
}
finally {
lock.unlock();
}
}

@JsonProperty
@@ -1557,7 +1571,13 @@ public Map<Integer, Long> getStartOffsets()
@JsonProperty
public Map<Integer, Long> getEndOffsets()
{
return endOffsets;
lock.lock();
try {
return endOffsets;
}
finally {
lock.unlock();
}
}

@JsonProperty
@@ -1568,19 +1588,30 @@ public boolean isSentinel()

void setEndOffsets(Map<Integer, Long> newEndOffsets)
{
endOffsets.putAll(newEndOffsets);
checkpointed = true;
lock.lock();
try {
endOffsets.putAll(newEndOffsets);
checkpointed = true;
}
finally {
lock.unlock();
}
}

void updateAssignments(Map<Integer, Long> nextPartitionOffset)
{
assignments.clear();
nextPartitionOffset.entrySet().forEach(partitionOffset -> {
if (Longs.compare(endOffsets.get(partitionOffset.getKey()), nextPartitionOffset.get(partitionOffset.getKey()))
> 0) {
assignments.add(partitionOffset.getKey());
}
});
lock.lock();
try {
assignments.clear();
nextPartitionOffset.forEach((key, value) -> {
if (Longs.compare(endOffsets.get(key), nextPartitionOffset.get(key)) > 0) {
assignments.add(key);
}
});
}
finally {
lock.unlock();
}
}

boolean isOpen()
@@ -1590,10 +1621,17 @@ boolean isOpen()

boolean canHandle(ConsumerRecord<byte[], byte[]> record)
{
return isOpen()
&& endOffsets.get(record.partition()) != null
&& record.offset() >= startOffsets.get(record.partition())
&& record.offset() < endOffsets.get(record.partition());
lock.lock();
try {
final Long partitionEndOffset = endOffsets.get(record.partition());
return isOpen()
&& partitionEndOffset != null
&& record.offset() >= startOffsets.get(record.partition())
&& record.offset() < partitionEndOffset;
}
finally {
lock.unlock();
}
}

private SequenceMetadata()
@@ -1615,15 +1653,21 @@ static SequenceMetadata getSentinelSequenceMetadata()
@Override
public String toString()
{
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
lock.lock();
try {
return "SequenceMetadata{" +
"sequenceName='" + sequenceName + '\'' +
", sequenceId=" + sequenceId +
", startOffsets=" + startOffsets +
", endOffsets=" + endOffsets +
", assignments=" + assignments +
", sentinel=" + sentinel +
", checkpointed=" + checkpointed +
'}';
}
finally {
lock.unlock();
}
}

Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
@@ -1635,28 +1679,40 @@ Supplier<Committer> getCommitterSupplier(String topic, Map<Integer, Long> lastPersistedOffsets)
@Override
public Object getMetadata()
{
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till offsets [%s] have been consumed, also make sure to call updateAssignments before using this committer",
endOffsets
);
lock.lock();

// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only sub set of segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of offsets and persist that
for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
lastPersistedOffsets.put(partitionOffset.getKey(), Math.max(
partitionOffset.getValue(),
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
));
}
try {
Preconditions.checkState(
assignments.isEmpty(),
"This committer can be used only once all the records till offsets [%s] have been consumed, also make"
+ " sure to call updateAssignments before using this committer",
endOffsets
);

// Publish metadata can be different from persist metadata as we are going to publish only
// subset of segments
return ImmutableMap.of(METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
);
// merge endOffsets for this sequence with globally lastPersistedOffsets
// This is done because this committer would be persisting only sub set of segments
// corresponding to the current sequence. Generally, lastPersistedOffsets should already
// cover endOffsets but just to be sure take max of offsets and persist that
for (Map.Entry<Integer, Long> partitionOffset : endOffsets.entrySet()) {
lastPersistedOffsets.put(
partitionOffset.getKey(),
Math.max(
partitionOffset.getValue(),
lastPersistedOffsets.getOrDefault(partitionOffset.getKey(), 0L)
)
);
}

// Publish metadata can be different from persist metadata as we are going to publish only
// subset of segments
return ImmutableMap.of(
METADATA_NEXT_PARTITIONS, new KafkaPartitions(topic, lastPersistedOffsets),
METADATA_PUBLISH_PARTITIONS, new KafkaPartitions(topic, endOffsets)
);
}
finally {
lock.unlock();
}
}

@Override
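Taken together, the change applies the standard lock-guarded mutable-state idiom to SequenceMetadata: every method that reads or writes endOffsets or checkpointed now acquires the same ReentrantLock in a try/finally block. A trimmed-down sketch of that idiom follows; the class name and the snapshotEndOffsets helper are hypothetical, since the PR itself returns the live map from getEndOffsets while holding the lock:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified version of the locking idiom the diff applies to SequenceMetadata.
class GuardedOffsets
{
  private final ReentrantLock lock = new ReentrantLock();
  private final Map<Integer, Long> endOffsets = new HashMap<>();
  private boolean checkpointed;

  // In the real task runner this is reachable from the HTTP thread as well as the main thread.
  void setEndOffsets(Map<Integer, Long> newEndOffsets)
  {
    lock.lock();
    try {
      endOffsets.putAll(newEndOffsets);
      checkpointed = true;
    }
    finally {
      lock.unlock();
    }
  }

  boolean isCheckpointed()
  {
    lock.lock();
    try {
      return checkpointed;
    }
    finally {
      lock.unlock();
    }
  }

  // Hypothetical helper: returning a defensive copy also keeps callers from iterating the
  // live map outside the lock (the PR instead returns the live map while holding the lock).
  Map<Integer, Long> snapshotEndOffsets()
  {
    lock.lock();
    try {
      return new HashMap<>(endOffsets);
    }
    finally {
      lock.unlock();
    }
  }
}
```

A single lock, rather than a swap to ConcurrentHashMap, keeps endOffsets and checkpointed consistent with each other, since setEndOffsets updates both inside one critical section.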