Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support new "generations" of existing segments #1

Merged
merged 3 commits into from Jul 27, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 36 additions & 0 deletions common/src/main/java/io/druid/timeline/SegmentGenerationUtils.java
@@ -0,0 +1,36 @@
package io.druid.timeline;

/**
 * Helpers for recognising "generation" versions of segments.
 * <p>
 * A segment whose version contains {@code _gen_} is considered to be a new generation of an existing segment. The
 * version it was generated from (its <em>source version</em>) is the part of the version string that precedes
 * {@code _gen_}.
 * <p>
 * A generated segment has the following properties:
 * <ul>
 * <li>
 * It overshadows the segments of its source version per partition: if there is no generated segment with a specific
 * partition number, then the source version segment with that partition stays active.
 * </li>
 * <li>
 * Its version is not used for allocating new segments internally: if it is the latest version for an interval, the
 * segment is allocated to the source version instead. Because of the previous property, the newly allocated segment
 * stays active until a generated version of the same partition is created.
 * </li>
 * </ul>
 */
public class SegmentGenerationUtils
{
  /** Marker separating the source version from the generation suffix inside a version string. */
  private static final String GENERATION_MARKER = "_gen_";

  private SegmentGenerationUtils()
  {
    // Static utility holder; not meant to be instantiated.
  }

  /**
   * Extracts the source version from a version string that denotes a generation of some other version.
   *
   * @param version version string to inspect; may be {@code null}
   *
   * @return the part of {@code version} preceding the first occurrence of {@code _gen_}, or {@code null} when
   * {@code version} is {@code null} or does not contain the marker
   */
  public static String getSourceVersion(String version)
  {
    // A missing version cannot be a generation of anything; report "no source version" instead of throwing.
    if (version == null) {
      return null;
    }

    final int markerIndex = version.indexOf(GENERATION_MARKER);
    return markerIndex >= 0 ? version.substring(0, markerIndex) : null;
  }
}
128 changes: 108 additions & 20 deletions common/src/main/java/io/druid/timeline/VersionedIntervalTimeline.java
Expand Up @@ -30,6 +30,7 @@
import io.druid.timeline.partition.ImmutablePartitionHolder;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import java.util.Collections;
import org.joda.time.Interval;

import java.util.ArrayList;
Expand Down Expand Up @@ -236,25 +237,8 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
overShadowed.put(versionEntry.getKey(), versionCopy);
}

for (Map.Entry<Interval, TimelineEntry> entry : completePartitionsTimeline.entrySet()) {
Map<VersionType, TimelineEntry> versionEntry = overShadowed.get(entry.getValue().getTrueInterval());
if (versionEntry != null) {
versionEntry.remove(entry.getValue().getVersion());
if (versionEntry.isEmpty()) {
overShadowed.remove(entry.getValue().getTrueInterval());
}
}
}

for (Map.Entry<Interval, TimelineEntry> entry : incompletePartitionsTimeline.entrySet()) {
Map<VersionType, TimelineEntry> versionEntry = overShadowed.get(entry.getValue().getTrueInterval());
if (versionEntry != null) {
versionEntry.remove(entry.getValue().getVersion());
if (versionEntry.isEmpty()) {
overShadowed.remove(entry.getValue().getTrueInterval());
}
}
}
removeActiveFromOverShadowed(completePartitionsTimeline, overShadowed);
removeActiveFromOverShadowed(incompletePartitionsTimeline, overShadowed);

for (Map.Entry<Interval, Map<VersionType, TimelineEntry>> versionEntry : overShadowed.entrySet()) {
for (Map.Entry<VersionType, TimelineEntry> entry : versionEntry.getValue().entrySet()) {
Expand All @@ -276,14 +260,73 @@ public Set<TimelineObjectHolder<VersionType, ObjectType>> findOvershadowed()
}
}

/**
 * Prunes from {@code overShadowed} whatever is still active according to {@code timeline}.
 * <p>
 * For every entry of the timeline, the entry's own version is dropped from the candidates registered under its true
 * interval. If the entry's version is a generation of a source version that is also a candidate, the source
 * candidate is narrowed down to the partitions that the generated entry actually contains.
 *
 * @param timeline     timeline whose entries are treated as active
 * @param overShadowed overshadowed candidates keyed by true interval, then by version; modified in place
 */
private void removeActiveFromOverShadowed(
    NavigableMap<Interval, TimelineEntry> timeline,
    Map<Interval, Map<VersionType, TimelineEntry>> overShadowed
)
{
  for (TimelineEntry activeEntry : timeline.values()) {
    final Map<VersionType, TimelineEntry> candidates = overShadowed.get(activeEntry.getTrueInterval());
    if (candidates == null) {
      continue;
    }

    // The active version itself is never overshadowed.
    candidates.remove(activeEntry.getVersion());
    if (candidates.isEmpty()) {
      overShadowed.remove(activeEntry.getTrueInterval());
    }

    final VersionType sourceVersion = getSourceVersion(activeEntry.getVersion());
    if (sourceVersion == null) {
      continue;
    }

    final TimelineEntry sourceEntry = candidates.get(sourceVersion);
    if (sourceEntry == null) {
      continue;
    }

    // The source version is among the overshadowed candidates. Keep only those of its partitions that also exist
    // in the generated version: the others can be present in the return value of lookup, because partitions from
    // the source version are explicitly added there when they are missing from the generated version.
    final PartitionHolder<ObjectType> coveredSourceChunks = new PartitionHolder<>(Collections.emptyList());
    for (PartitionChunk<ObjectType> sourceChunk : sourceEntry.partitionHolder) {
      if (activeEntry.partitionHolder.getChunk(sourceChunk.getChunkNumber()) != null) {
        coveredSourceChunks.add(sourceChunk);
      }
    }

    if (coveredSourceChunks.isEmpty()) {
      // None of the source partitions is replaced by the generated version, so the source is not overshadowed.
      candidates.remove(sourceEntry.getVersion());
      if (candidates.isEmpty()) {
        overShadowed.remove(activeEntry.getTrueInterval());
      }
    } else if (coveredSourceChunks.size() < sourceEntry.partitionHolder.size()) {
      // Only some of the source partitions are replaced: report just those as overshadowed.
      final TimelineEntry narrowedSourceEntry = new TimelineEntry(
          sourceEntry.trueInterval,
          sourceEntry.version,
          coveredSourceChunks
      );
      candidates.put(sourceEntry.version, narrowedSourceEntry);
    }
  }
}

/**
 * Checks whether the given version of the given interval is overshadowed, without naming a specific partition.
 * Delegates to the three-argument overload with a negative partition number.
 *
 * @param interval interval to check
 * @param version  version to check
 *
 * @return the result of the three-argument overload for a negative partition number
 */
public boolean isOvershadowed(Interval interval, VersionType version)
{
  // A negative partition number stands for "no partition specified".
  final int unspecifiedPartitionNum = -1;
  return isOvershadowed(interval, version, unspecifiedPartitionNum);
}

public boolean isOvershadowed(Interval interval, VersionType version, int partitionNum)
{
try {
lock.readLock().lock();

TimelineEntry entry = completePartitionsTimeline.get(interval);
if (entry != null) {
return versionComparator.compare(version, entry.getVersion()) < 0;
if (versionComparator.compare(version, entry.getVersion()) < 0) {
VersionType sourceVersion = getSourceVersion(entry.getVersion());

// Returns false (not overshadowed) only if the partition number was specified, the latest complete version is
// a generation of the currently checked version, and the latest version does not contain that partition.
return partitionNum < 0 ||
sourceVersion == null ||
versionComparator.compare(version, sourceVersion) != 0 ||
entry.partitionHolder.getChunk(partitionNum) != null;
} else {
return false;
}
}

Interval lower = completePartitionsTimeline.floorKey(
Expand Down Expand Up @@ -524,6 +567,8 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
return retVal;
}

complementWithSourceVersionSegments(retVal);

TimelineObjectHolder<VersionType, ObjectType> firstEntry = retVal.get(0);
if (interval.overlaps(firstEntry.getInterval()) && interval.getStart()
.isAfter(firstEntry.getInterval().getStart())) {
Expand Down Expand Up @@ -552,6 +597,49 @@ private List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inte
return retVal;
}

/**
 * Extends {@code holders} with the source version partitions that are missing from generated versions.
 * <p>
 * For each holder whose version is a generation of a source version, the partitions of the source version that have
 * no counterpart (same chunk number) in the holder are collected into one additional holder, which is appended to
 * the list. Holders whose version is not a generation, or whose source version has no timeline entry under the
 * holder's interval, are left alone.
 * <p>
 * NOTE(review): complementing holders are appended at the end of the list rather than next to the holder they
 * complement - confirm that callers do not depend on the ordering of the list.
 *
 * @param holders holders to complement; modified in place
 */
private void complementWithSourceVersionSegments(List<TimelineObjectHolder<VersionType, ObjectType>> holders)
{
  // Visit only the holders present on entry: complementing holders are appended while looping and must not be
  // processed themselves.
  final int originalCount = holders.size();

  for (int position = 0; position < originalCount; position++) {
    final TimelineObjectHolder<VersionType, ObjectType> generatedHolder = holders.get(position);

    final VersionType sourceVersion = getSourceVersion(generatedHolder.getVersion());
    if (sourceVersion == null) {
      continue;
    }

    final TreeMap<VersionType, TimelineEntry> versionsOfInterval =
        allTimelineEntries.get(generatedHolder.getInterval());
    if (versionsOfInterval == null) {
      continue;
    }

    final TimelineEntry sourceEntry = versionsOfInterval.get(sourceVersion);
    if (sourceEntry == null) {
      continue;
    }

    // Created lazily, so that nothing is appended when the generated version covers every source partition.
    TimelineObjectHolder<VersionType, ObjectType> complementingHolder = null;

    for (PartitionChunk<ObjectType> sourceChunk : sourceEntry.partitionHolder) {
      final boolean coveredByGeneration =
          generatedHolder.getObject().getChunk(sourceChunk.getChunkNumber()) != null;
      if (coveredByGeneration) {
        continue;
      }

      if (complementingHolder == null) {
        complementingHolder = new TimelineObjectHolder<>(sourceEntry.trueInterval, sourceEntry.version,
                                                         new PartitionHolder<>(Collections.emptyList()));
        holders.add(complementingHolder);
      }

      complementingHolder.getObject().add(sourceChunk);
    }
  }
}

@SuppressWarnings("unchecked")
private VersionType getSourceVersion(VersionType version) {
if (version instanceof String) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When could VersionType be something other than String?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently it is always String in practice, but it would not be nice to add implicit type constraints to the previous functionality of this class.

return (VersionType) SegmentGenerationUtils.getSourceVersion((String) version);
}
return null;
}

public class TimelineEntry
{
private final Interval trueInterval;
Expand Down
Expand Up @@ -96,8 +96,8 @@ public int compareTo(PartitionChunk<T> other)
if (other instanceof NumberedPartitionChunk) {
final NumberedPartitionChunk castedOther = (NumberedPartitionChunk) other;
return ComparisonChain.start()
.compare(chunks, castedOther.chunks)
.compare(chunkNumber, castedOther.chunkNumber)
.compare(chunks, castedOther.chunks)
.result();
} else {
throw new IllegalArgumentException("Cannot compare against something that is not a NumberedPartitionChunk.");
Expand Down
Expand Up @@ -38,17 +38,21 @@
import io.druid.indexing.overlord.SegmentPublishResult;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.logger.Logger;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
import io.druid.timeline.DataSegment;
import io.druid.timeline.SegmentGenerationUtils;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.LinearShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.PartitionChunk;
import java.util.Collections;
import java.util.HashSet;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.skife.jdbi.v2.FoldController;
Expand Down Expand Up @@ -267,6 +271,109 @@ private VersionedIntervalTimeline<String, DataSegment> getTimelineForIntervalsWi
return timeline;
}

/**
 * Builds the timeline that segment allocation works against for the given interval.
 * <p>
 * Used segments of the data source overlapping the interval are read from the segments table. Segments whose
 * version is a generation of a source version are not added to the timeline themselves; the segments of their
 * source version (for the same interval) are loaded through {@link #collectSourceSegments} and added instead.
 *
 * @param handle     database handle to run the queries on
 * @param dataSource data source whose segments are read
 * @param interval   interval that the returned segments must overlap
 *
 * @return timeline built from the non-generated used segments plus the collected source version segments
 *
 * @throws IOException if a segment payload cannot be deserialized
 */
private VersionedIntervalTimeline<String, DataSegment> getTimelineForAllocationWithHandle(
    final Handle handle,
    final String dataSource,
    final Interval interval
) throws IOException
{
  final Query<Map<String, Object>> usedSegmentsQuery = handle
      .createQuery(
          String.format(
              "SELECT payload FROM %s WHERE used = true AND dataSource = ? AND (start <= ? AND %2$send%2$s >= ?)",
              dbTables.getSegmentsTable(),
              connector.getQuoteString()
          )
      )
      .bind(0, dataSource)
      .bind(1, interval.getEnd().toString())
      .bind(2, interval.getStart().toString());

  try (final ResultIterator<byte[]> payloads = usedSegmentsQuery.map(ByteArrayMapper.FIRST).iterator()) {
    final List<DataSegment> timelineSegments = new ArrayList<>();
    // (interval, source version) pairs referenced by generated segments.
    final Set<Pair<Interval, String>> referencedSources = new HashSet<>();

    while (payloads.hasNext()) {
      final DataSegment usedSegment = jsonMapper.readValue(payloads.next(), DataSegment.class);
      final String sourceVersion = SegmentGenerationUtils.getSourceVersion(usedSegment.getVersion());

      if (sourceVersion == null) {
        timelineSegments.add(usedSegment);
      } else {
        // A generated segment only contributes a reference to its source version.
        referencedSources.add(new Pair<>(usedSegment.getInterval(), sourceVersion));
      }
    }

    final List<DataSegment> sourceSegments = collectSourceSegments(handle, dataSource, referencedSources);
    timelineSegments.addAll(sourceSegments);
    return VersionedIntervalTimeline.forSegments(timelineSegments);
  }
}

/**
 * Loads the segments that belong to the given (interval, source version) pairs.
 * <p>
 * The query narrows the candidates by data source, by a prefix match on the segment id and by the set of
 * versions. The exact (interval, version) match is then applied in Java on the deserialized segments. The query
 * does not filter on the {@code used} flag.
 *
 * @param handle         database handle to run the query on
 * @param dataSource     data source whose segments are read
 * @param sourceVersions (interval, version) pairs to load segments for
 *
 * @return the segments whose interval and version match one of {@code sourceVersions}; empty if
 * {@code sourceVersions} is empty
 *
 * @throws IOException if a segment payload cannot be deserialized
 */
private List<DataSegment> collectSourceSegments(
    final Handle handle,
    final String dataSource,
    final Set<Pair<Interval, String>> sourceVersions
) throws IOException
{
  if (sourceVersions.isEmpty()) {
    return Collections.emptyList();
  }

  // One identifier per requested pair; only their common prefix is used, to narrow the id match in SQL.
  final String[] identifiers = new String[sourceVersions.size()];
  int index = 0;

  for (Pair<Interval, String> sourceVersion : sourceVersions) {
    identifiers[index++] = DataSegment.makeDataSegmentIdentifier(dataSource, sourceVersion.lhs.getStart(),
                                                                 sourceVersion.lhs.getEnd(), sourceVersion.rhs, NoneShardSpec.instance());
  }

  // The trailing '%' turns the pattern into a prefix match. Without it, LIKE only matches ids of exactly the
  // prefix's length, so any segment whose id continues past the common prefix would never be returned. Characters
  // of the prefix that LIKE treats as wildcards (such as '_') can only widen the match, which is harmless because
  // the exact interval/version check below is done in Java.
  final String idPattern = org.apache.commons.lang.StringUtils.getCommonPrefix(identifiers) + "%";

  Query<Map<String, Object>> sql = handle
      .createQuery(
          String.format(
              "SELECT payload FROM %s WHERE dataSource = ? AND id LIKE ? AND version IN (%s)",
              dbTables.getSegmentsTable(),
              String.join(",", Collections.nCopies(sourceVersions.size(), "?"))
          )
      )
      .bind(0, dataSource)
      .bind(1, idPattern);

  // Positional parameters 0 and 1 are taken above; the version list starts at position 2.
  index = 0;

  for (Pair<Interval, String> sourceVersion : sourceVersions) {
    sql.bind(2 + index++, sourceVersion.rhs);
  }

  final List<DataSegment> results = new ArrayList<>();

  try (final ResultIterator<byte[]> dbSegments = sql
      .map(ByteArrayMapper.FIRST)
      .iterator()) {

    while (dbSegments.hasNext()) {
      final byte[] payload = dbSegments.next();

      DataSegment segment = jsonMapper.readValue(
          payload,
          DataSegment.class
      );

      // The SQL conditions are only a coarse filter: keep exactly the requested (interval, version) pairs.
      if (sourceVersions.contains(new Pair<>(segment.getInterval(), segment.getVersion()))) {
        results.add(segment);
      }
    }
  }

  return results;
}

/**
* Attempts to insert a set of segments to the database. Returns the set of segments actually added (segments
* with identifiers already in the database will not be added).
Expand Down Expand Up @@ -451,10 +558,10 @@ public SegmentIdentifier inTransaction(Handle handle, TransactionStatus transact

final SegmentIdentifier newIdentifier;

final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForIntervalsWithHandle(
final List<TimelineObjectHolder<String, DataSegment>> existingChunks = getTimelineForAllocationWithHandle(
handle,
dataSource,
ImmutableList.of(interval)
interval
).lookup(interval);

if (existingChunks.size() > 1) {
Expand Down
Expand Up @@ -78,7 +78,8 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
//Remove all segments in db that are overshadowed by served segments
for (DataSegment dataSegment : params.getAvailableSegments()) {
VersionedIntervalTimeline<String, DataSegment> timeline = timelines.get(dataSegment.getDataSource());
if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion())) {
if (timeline != null && timeline.isOvershadowed(dataSegment.getInterval(), dataSegment.getVersion(),
dataSegment.getShardSpec().getPartitionNum())) {
coordinator.removeSegment(dataSegment);
stats.addToGlobalStat("overShadowedCount", 1);
}
Expand Down