Reconcile terminology and method naming to 'used/unused segments'; Rename MetadataSegmentManager to MetadataSegmentsManager #7306

Merged
merged 67 commits on Jan 27, 2020
Commits
87556bc
Reconcile terminology and method naming to 'used/unused segments'; Do…
leventov Mar 19, 2019
e44e199
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Mar 20, 2019
25de70d
Fix brace
leventov Mar 20, 2019
fe13d9c
Import order
leventov Mar 21, 2019
050bd3b
Rename withKillDataSourceWhitelist to withSpecificDataSourcesToKill
leventov Mar 21, 2019
562f5b4
Fix tests
leventov Mar 21, 2019
a6138da
Fix tests by adding proper methods without interval parameters to Ind…
leventov Mar 25, 2019
cb6fe7f
More aligned names of DruidCoordinatorHelpers, rename several Coordin…
leventov Mar 28, 2019
164f783
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Mar 28, 2019
d959bb0
Rename ClientCompactTaskQuery to ClientCompactionTaskQuery for consis…
leventov Mar 28, 2019
01c4494
More variable and method renames
leventov Mar 28, 2019
06ec389
Rename MetadataSegments to SegmentsMetadata
leventov Apr 1, 2019
09fab95
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Apr 1, 2019
f4774e1
Javadoc update
leventov Apr 1, 2019
8d5200d
Simplify SegmentsMetadata.getUnusedSegmentIntervals(), more javadocs
leventov Apr 1, 2019
255f320
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Apr 9, 2019
f34eef0
Update Javadoc of VersionedIntervalTimeline.iterateAllObjects()
leventov Apr 9, 2019
2ebb23c
Reorder imports
leventov Apr 9, 2019
ba8ed62
Rename SegmentsMetadata.tryMark... methods to mark... and make them t…
leventov Apr 19, 2019
64b0404
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Apr 19, 2019
231a529
Complete merge
leventov Apr 19, 2019
a113231
Add CollectionUtils.newTreeSet(); Refactor DruidCoordinatorRuntimePar…
leventov Apr 22, 2019
abad3f3
Remove MetadataSegmentManager
leventov Apr 22, 2019
cb1f3b0
Rename millisLagSinceCoordinatorBecomesLeaderBeforeCanMarkAsUnusedOve…
leventov Apr 22, 2019
a5840b8
Fix tests, refactor DruidCluster creation in tests into DruidClusterB…
leventov Apr 26, 2019
f4283d2
Merge with master
leventov May 3, 2019
559f2c8
Merge remote-tracking branch 'upstream/master' into used-segments
leventov May 8, 2019
93c12cd
Merge remote-tracking branch 'upstream/master' into used-segments
leventov May 9, 2019
dfc0dbb
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jul 19, 2019
553b804
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jul 22, 2019
886e580
Fix inspections
leventov Jul 22, 2019
77fc5e8
Fix SQLMetadataSegmentManagerEmptyTest and rename it to SqlSegmentsMe…
leventov Jul 22, 2019
1258fa0
Rename SegmentsAndMetadata to SegmentsAndCommitMetadata to reduce the…
leventov Jul 23, 2019
15cb9ae
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jul 24, 2019
c2c7547
Rename DruidCoordinatorHelper to CoordinatorDuty, refactor DruidCoord…
leventov Jul 25, 2019
9cd239e
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jul 26, 2019
a723553
Unused import
leventov Jul 26, 2019
26a8381
Optimize imports
leventov Jul 29, 2019
afcd301
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jul 29, 2019
1557acc
Rename IndexerSQLMetadataStorageCoordinator.getDataSourceMetadata() t…
leventov Jul 29, 2019
96bcab7
Unused import
leventov Jul 29, 2019
d934853
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jul 31, 2019
c3e488e
Update terminology in datasource-view.tsx
leventov Jul 31, 2019
f65e654
Fix label in datasource-view.spec.tsx.snap
leventov Jul 31, 2019
5e8f3e4
Fix lint errors in datasource-view.tsx
leventov Aug 1, 2019
697f0d5
Doc improvements
leventov Aug 1, 2019
66a23f9
Another attempt to please TSLint
leventov Aug 2, 2019
d3882c4
Another attempt to please TSLint
leventov Aug 2, 2019
4b9d992
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Aug 2, 2019
b77a327
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Aug 8, 2019
d355aa6
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Aug 12, 2019
718bb23
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Aug 28, 2019
19b7f3a
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Aug 28, 2019
2a6bcff
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Sep 11, 2019
22e71d1
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Nov 14, 2019
c575e6e
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Nov 14, 2019
c5d22a0
Style fixes
leventov Nov 14, 2019
285ebc9
Fix IndexerSQLMetadataStorageCoordinator.createUsedSegmentsSqlQueryFo…
leventov Nov 14, 2019
53f572a
Try to fix docs build issue
leventov Nov 14, 2019
da7667c
Javadoc and spelling fixes
leventov Nov 15, 2019
91a5c8c
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Nov 20, 2019
a566b6d
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Nov 21, 2019
6da0b9d
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Dec 8, 2019
d0bf20a
Rename SegmentsMetadata to SegmentsMetadataManager, address other com…
leventov Jan 22, 2020
10587c1
Merge remote-tracking branch 'upstream/master' into used-segments
leventov Jan 22, 2020
2ad6dd5
Address more comments
leventov Jan 23, 2020
6745f4a
Fix a bug in DataSourceOptimizer
leventov Jan 26, 2020
@@ -30,9 +30,12 @@
import org.apache.druid.timeline.partition.ImmutablePartitionHolder;
import org.apache.druid.timeline.partition.PartitionChunk;
import org.apache.druid.timeline.partition.PartitionHolder;
import org.apache.druid.utils.CollectionUtils;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@@ -44,28 +47,43 @@
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.StreamSupport;

/**
* VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
*
* It associates a jodatime Interval and a generically-typed version with the object that is being stored.
* It associates an {@link Interval} and a generically-typed version with the object that is being stored.
*
* In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
* with a timeline entry remains unchanged when chunking occurs.
*
* After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
* recent objects (according to the version) that match the given interval. The intent is that objects represent
* a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
* at in order to get a correct answer about that time period.
* After loading objects via the {@link #add} method, the {@link #lookup(Interval)} method can be used to get the list
* of the most recent objects (according to the version) that match the given interval. The intent is that objects
* represent a certain time period and when you do a {@link #lookup(Interval)}, you are asking for all of the objects
* that you need to look at in order to get a correct answer about that time period.
*
* The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because
* they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods
* to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if
* so, remove the overshadowed elements and you have effectively updated your data set without any user impact.
* The {@link #findOvershadowed} method returns a list of objects that will never be returned by a call to {@link
* #lookup} because they are overshadowed by some other object. This can be used in conjunction with the {@link #add}
* and {@link #remove} methods to achieve "atomic" updates. First add new items, then check if those items caused
* anything to be overshadowed, if so, remove the overshadowed elements and you have effectively updated your data set
* without any user impact.
*/
public class VersionedIntervalTimeline<VersionType, ObjectType> implements TimelineLookup<VersionType, ObjectType>
{
public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
{
return forSegments(segments.iterator());
}

public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
{
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
addSegments(timeline, segments);
return timeline;
}

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);

final NavigableMap<Interval, TimelineEntry> completePartitionsTimeline = new TreeMap<Interval, TimelineEntry>(
@@ -75,6 +93,7 @@ public class VersionedIntervalTimeline<VersionType, ObjectType> implements Timel
Comparators.intervalsByStartThenEnd()
);
private final Map<Interval, TreeMap<VersionType, TimelineEntry>> allTimelineEntries = new HashMap<>();
private final AtomicInteger numObjects = new AtomicInteger();

private final Comparator<? super VersionType> versionComparator;

@@ -85,18 +104,6 @@ public VersionedIntervalTimeline(
this.versionComparator = versionComparator;
}

public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterable<DataSegment> segments)
{
return forSegments(segments.iterator());
}

public static VersionedIntervalTimeline<String, DataSegment> forSegments(Iterator<DataSegment> segments)
{
final VersionedIntervalTimeline<String, DataSegment> timeline = new VersionedIntervalTimeline<>(Ordering.natural());
addSegments(timeline, segments);
return timeline;
}

public static void addSegments(
VersionedIntervalTimeline<String, DataSegment> timeline,
Iterator<DataSegment> segments
@@ -115,6 +122,32 @@ public Map<Interval, TreeMap<VersionType, TimelineEntry>> getAllTimelineEntries(
return allTimelineEntries;
}

/**
* Returns a lazy collection with all objects in this VersionedIntervalTimeline to be used for iteration or {@link
* Collection#stream()} transformation. The order of objects in this collection is unspecified.
*
* Note: iteration over the returned collection may not be as trivially cheap as, for example, iteration over an
* ArrayList. Try (to some reasonable extent) to organize the code so that it iterates the returned collection only
* once rather than several times.
*/
public Collection<ObjectType> iterateAllObjects()
{
return CollectionUtils.createLazyCollectionFromStream(
() -> allTimelineEntries
[Review comment]
Contributor: "allTimelineEntries" makes me think there's another collection holding a subset of "all", but there isn't one.
Member Author: It follows the naming of "allTimelineEntries". I think the logic behind this naming is that it includes overshadowed objects. I've extended the Javadoc of iterateAllObjects() to make it clearer.

.values()
.stream()
.flatMap((TreeMap<VersionType, TimelineEntry> entryMap) -> entryMap.values().stream())
.flatMap((TimelineEntry entry) -> StreamSupport.stream(entry.getPartitionHolder().spliterator(), false))
.map(PartitionChunk::getObject),
numObjects.get()
);
egor-ryashin marked this conversation as resolved.
}
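The Javadoc of iterateAllObjects() above warns that iterating the returned collection re-runs work, because the collection is only a view over a stream pipeline. As a rough, self-contained sketch of what a helper like CollectionUtils.createLazyCollectionFromStream() might look like (the class name LazyCollectionSketch is hypothetical, not Druid's actual implementation):

```java
import java.util.AbstractCollection;
import java.util.Iterator;
import java.util.function.Supplier;
import java.util.stream.Stream;

// A Collection view over a Supplier<Stream>: the stream pipeline is re-created and
// re-executed on every iteration, while size() stays O(1) via a precomputed count.
// This is why the Javadoc advises iterating the returned collection only once.
class LazyCollectionSketch<E> extends AbstractCollection<E> {
    private final Supplier<Stream<E>> streamSupplier;
    private final int size;

    LazyCollectionSketch(Supplier<Stream<E>> streamSupplier, int size) {
        this.streamSupplier = streamSupplier;
        this.size = size;
    }

    @Override
    public Iterator<E> iterator() {
        // each call re-runs the whole stream pipeline
        return streamSupplier.get().iterator();
    }

    @Override
    public int size() {
        return size;
    }
}
```

In iterateAllObjects() the precomputed count would come from the numObjects counter, which the VersionedIntervalTimeline mutators maintain.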

public int getNumObjects()
{
return numObjects.get();
}

public void add(final Interval interval, VersionType version, PartitionChunk<ObjectType> object)
{
addAll(Iterators.singletonIterator(object), o -> interval, o -> version);
@@ -143,15 +176,19 @@ private void addAll(
TreeMap<VersionType, TimelineEntry> versionEntry = new TreeMap<>(versionComparator);
versionEntry.put(version, entry);
allTimelineEntries.put(interval, versionEntry);
numObjects.incrementAndGet();
} else {
entry = exists.get(version);

if (entry == null) {
entry = new TimelineEntry(interval, version, new PartitionHolder<>(object));
exists.put(version, entry);
numObjects.incrementAndGet();
} else {
PartitionHolder<ObjectType> partitionHolder = entry.getPartitionHolder();
partitionHolder.add(object);
if (partitionHolder.add(object)) {
numObjects.incrementAndGet();
}
}
}

@@ -174,6 +211,7 @@ private void addAll(
}
}

@Nullable
public PartitionChunk<ObjectType> remove(Interval interval, VersionType version, PartitionChunk<ObjectType> chunk)
{
try {
@@ -189,7 +227,11 @@ public PartitionChunk<ObjectType> remove(Interval interval, VersionType version,
return null;
}

PartitionChunk<ObjectType> retVal = entry.getPartitionHolder().remove(chunk);
PartitionChunk<ObjectType> removedChunk = entry.getPartitionHolder().remove(chunk);
if (removedChunk == null) {
return null;
}
numObjects.decrementAndGet();
if (entry.getPartitionHolder().isEmpty()) {
versionEntries.remove(version);
if (versionEntries.isEmpty()) {
Expand All @@ -201,7 +243,7 @@ public PartitionChunk<ObjectType> remove(Interval interval, VersionType version,

remove(completePartitionsTimeline, interval, entry, false);

return retVal;
return removedChunk;
}
finally {
lock.writeLock().unlock();
@@ -217,9 +259,7 @@ public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType vers
if (entry.getKey().equals(interval) || entry.getKey().contains(interval)) {
TimelineEntry foundEntry = entry.getValue().get(version);
if (foundEntry != null) {
return new ImmutablePartitionHolder<ObjectType>(
foundEntry.getPartitionHolder()
);
return new ImmutablePartitionHolder<>(foundEntry.getPartitionHolder());
}
}
}
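The class Javadoc above describes the add()/findOvershadowed()/remove() atomic-update pattern. As a toy model of the overshadowing idea only (OvershadowSketch is hypothetical, covers a single interval with string versions, and is not Druid's real data structure):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Within one interval, only the entry with the highest version is visible to
// lookup(); everything below it is "overshadowed" and can be removed after an
// add() without any user-visible change, which is the "atomic" update pattern.
class OvershadowSketch {
    private final TreeMap<String, String> entriesByVersion = new TreeMap<>();

    void add(String version, String object) {
        entriesByVersion.put(version, object);
    }

    // the most recent object according to version ordering
    String lookup() {
        return entriesByVersion.isEmpty() ? null : entriesByVersion.lastEntry().getValue();
    }

    // all objects that lookup() can never return
    List<String> findOvershadowed() {
        if (entriesByVersion.isEmpty()) {
            return new ArrayList<>();
        }
        return new ArrayList<>(entriesByVersion.headMap(entriesByVersion.lastKey()).values());
    }

    void remove(String version) {
        entriesByVersion.remove(version);
    }
}
```

First add the new item, then query findOvershadowed() and remove the results; lookup() callers never observe an intermediate state in which the data set is missing.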
@@ -23,7 +23,7 @@
*/
public class ImmutablePartitionHolder<T> extends PartitionHolder<T>
{
public ImmutablePartitionHolder(PartitionHolder partitionHolder)
public ImmutablePartitionHolder(PartitionHolder<T> partitionHolder)
{
super(partitionHolder);
}
@@ -35,7 +35,7 @@ public PartitionChunk<T> remove(PartitionChunk<T> tPartitionChunk)
}

@Override
public void add(PartitionChunk<T> tPartitionChunk)
public boolean add(PartitionChunk<T> tPartitionChunk)
{
throw new UnsupportedOperationException();
}
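The ImmutablePartitionHolder diff above only generifies the constructor and adapts add() to the new boolean signature; the class itself follows the common read-only-wrapper idiom of extending the mutable type and overriding its mutators to throw. A minimal sketch with a plain JDK list (ImmutableListSketch is hypothetical and deliberately overrides only two mutators, so it is not a complete immutable wrapper):

```java
import java.util.ArrayList;
import java.util.List;

// Read-only wrapper idiom: copy the source during construction, then reject
// mutation attempts at runtime by throwing UnsupportedOperationException.
class ImmutableListSketch<T> extends ArrayList<T> {
    ImmutableListSketch(List<T> source) {
        super(source); // copy while still "mutable", before handing out references
    }

    @Override
    public boolean add(T element) {
        throw new UnsupportedOperationException();
    }

    @Override
    public T remove(int index) {
        throw new UnsupportedOperationException();
    }
}
```

Unlike compile-time immutability (an interface with no mutators), this idiom fails only at runtime, which is a known trade-off of the approach.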
@@ -22,72 +22,62 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.List;
import java.util.SortedSet;
import java.util.Spliterator;
import java.util.TreeSet;
import java.util.TreeMap;

/**
* An object that clumps together multiple other objects which each represent a shard of some space.
*/
public class PartitionHolder<T> implements Iterable<PartitionChunk<T>>
{
private final TreeSet<PartitionChunk<T>> holderSet;
private final TreeMap<PartitionChunk<T>, PartitionChunk<T>> holderMap;

public PartitionHolder(PartitionChunk<T> initialChunk)
{
this.holderSet = new TreeSet<>();
this.holderMap = new TreeMap<>();
add(initialChunk);
}

public PartitionHolder(List<PartitionChunk<T>> initialChunks)
{
this.holderSet = new TreeSet<>();
this.holderMap = new TreeMap<>();
for (PartitionChunk<T> chunk : initialChunks) {
add(chunk);
}
}

public PartitionHolder(PartitionHolder partitionHolder)
public PartitionHolder(PartitionHolder<T> partitionHolder)
{
this.holderSet = new TreeSet<>();
this.holderSet.addAll(partitionHolder.holderSet);
this.holderMap = new TreeMap<>();
this.holderMap.putAll(partitionHolder.holderMap);
}

public void add(PartitionChunk<T> chunk)
public boolean add(PartitionChunk<T> chunk)
{
holderSet.add(chunk);
return holderMap.putIfAbsent(chunk, chunk) == null;
[Review comment]
Contributor: I suppose TreeMap was chosen because of the putIfAbsent() method? Anyway, I suspect we don't call add(chunk) with the same chunk twice, or am I missing something?
Member Author: I just replaced TreeSet with TreeMap; I'm not sure how it relates to putIfAbsent(). As for calling add(chunk) with the same chunk twice: I don't know. This is unprovable from the point of view of the VersionedIntervalTimeline abstraction, which may contain any objects, and anybody may call its add() method with anything. So VersionedIntervalTimeline assumes that objects can be repeated and increments the numObjects counter only when PartitionHolder.add() returns true.

}

@Nullable
public PartitionChunk<T> remove(PartitionChunk<T> chunk)
{
if (!holderSet.isEmpty()) {
// Somewhat funky implementation in order to return the removed object as it exists in the set
SortedSet<PartitionChunk<T>> tailSet = holderSet.tailSet(chunk, true);
if (!tailSet.isEmpty()) {
PartitionChunk<T> element = tailSet.first();
if (chunk.equals(element)) {
holderSet.remove(element);
return element;
}
}
}
return null;
return holderMap.remove(chunk);
}

public boolean isEmpty()
{
return holderSet.isEmpty();
return holderMap.isEmpty();
}

public boolean isComplete()
{
if (holderSet.isEmpty()) {
if (holderMap.isEmpty()) {
return false;
}

Iterator<PartitionChunk<T>> iter = holderSet.iterator();
Iterator<PartitionChunk<T>> iter = holderMap.keySet().iterator();

PartitionChunk<T> curr = iter.next();

@@ -117,7 +107,7 @@ public boolean isComplete()
public PartitionChunk<T> getChunk(final int partitionNum)
{
final Iterator<PartitionChunk<T>> retVal = Iterators.filter(
holderSet.iterator(),
holderMap.keySet().iterator(),
input -> input.getChunkNumber() == partitionNum
);

@@ -127,13 +117,13 @@ public PartitionChunk<T> getChunk(final int partitionNum)
@Override
public Iterator<PartitionChunk<T>> iterator()
{
return holderSet.iterator();
return holderMap.keySet().iterator();
}

@Override
public Spliterator<PartitionChunk<T>> spliterator()
{
return holderSet.spliterator();
return holderMap.keySet().spliterator();
}

public Iterable<T> payloads()
@@ -153,7 +143,7 @@ public boolean equals(Object o)

PartitionHolder that = (PartitionHolder) o;

if (!holderSet.equals(that.holderSet)) {
if (!holderMap.equals(that.holderMap)) {
return false;
}

@@ -163,14 +153,14 @@
@Override
public int hashCode()
{
return holderSet.hashCode();
return holderMap.hashCode();
}

@Override
public String toString()
{
return "PartitionHolder{" +
"holderSet=" + holderSet +
"holderMap=" + holderMap +
'}';
}
}
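The switch from TreeSet to TreeMap above lets add() report via putIfAbsent() whether a chunk was newly inserted, and lets remove() hand back the stored instance directly instead of the old tailSet() search. Together with the earlier VersionedIntervalTimeline changes, this is what makes an exact numObjects counter possible. A self-contained sketch of that bookkeeping (HolderSketch is a hypothetical stand-in for PartitionHolder, keyed by chunk number rather than by the chunk itself):

```java
import java.util.TreeMap;

// Sketch of the add/remove bookkeeping: the counter changes only when the map
// actually changes, so duplicate add() calls and misses on remove() are no-ops.
class HolderSketch {
    private final TreeMap<Integer, String> holderMap = new TreeMap<>();
    private int numObjects = 0;

    // true only when the chunk was not present before
    boolean add(int chunkNumber, String chunk) {
        boolean added = holderMap.putIfAbsent(chunkNumber, chunk) == null;
        if (added) {
            numObjects++;
        }
        return added;
    }

    // the stored instance, or null if absent
    String remove(int chunkNumber) {
        String removed = holderMap.remove(chunkNumber);
        if (removed != null) {
            numObjects--;
        }
        return removed;
    }

    int getNumObjects() {
        return numObjects;
    }
}
```

This mirrors how the PR's VersionedIntervalTimeline.addAll() increments its AtomicInteger only when PartitionHolder.add() returns true, and remove() decrements it only when a chunk was actually removed.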