Make SegmentLoader extensible and customizable #11398

Merged
merged 17 commits into from
Jul 22, 2021
Original file line number Diff line number Diff line change
@@ -606,6 +606,18 @@ public void cleanup(DataSegment segment)
{
throw new UnsupportedOperationException("unused");
}

@Override
public boolean reserve(DataSegment segment)
{
throw new UnsupportedOperationException();
}

@Override
public boolean release(DataSegment segment)
{
throw new UnsupportedOperationException();
}
},
DataSegment.builder()
.dataSource("ds")
Original file line number Diff line number Diff line change
@@ -33,6 +33,9 @@
* {@link Segment} that is also a {@link ReferenceCountingSegment}, allowing query engines that operate directly on
* segments to track references so that dropping a {@link Segment} can be done safely to ensure there are no in-flight
* queries.
*
* Extensions can extend this class for populating {@link org.apache.druid.timeline.VersionedIntervalTimeline} with
* a custom implementation through SegmentLoader.
*/
public class ReferenceCountingSegment extends ReferenceCountingCloseableObject<Segment>
implements SegmentReference, Overshadowable<ReferenceCountingSegment>
@@ -67,7 +70,7 @@ public static ReferenceCountingSegment wrapSegment(
);
}

private ReferenceCountingSegment(
protected ReferenceCountingSegment(
Segment baseSegment,
int startRootPartitionId,
int endRootPartitionId,
@@ -172,4 +175,13 @@ public Optional<Closeable> acquireReferences()
{
return incrementReferenceAndDecrementOnceCloseable();
}

@Override
public <T> T as(Class<T> clazz)
{
if (isClosed()) {
return null;
}
return baseObject.as(clazz);
}
}
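
The `as(Class<T>)` override above delegates to the wrapped segment and returns null once the wrapper is closed. A minimal sketch of that lookup pattern follows, assuming hypothetical stand-in types (`ToySegment`, `ToyIndex`, `ToyBaseSegment`, `ToyClosableWrapper`) that are not Druid classes; it only illustrates the delegation and the closed check, not the real reference counting.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for illustration only; these are not Druid classes.
interface ToySegment {
    // Returns a view of this segment as the requested type, or null if unsupported.
    <T> T as(Class<T> clazz);
}

final class ToyIndex {
    final String name;

    ToyIndex(String name) {
        this.name = name;
    }
}

final class ToyBaseSegment implements ToySegment {
    private final ToyIndex index = new ToyIndex("idx");

    @Override
    public <T> T as(Class<T> clazz) {
        if (clazz.equals(ToyIndex.class)) {
            // Class.cast gives a checked cast instead of an unchecked (T) cast.
            return clazz.cast(index);
        }
        return null;
    }
}

// Wrapper that delegates as() to the base segment until it has been closed.
final class ToyClosableWrapper implements ToySegment {
    private final ToySegment base;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    ToyClosableWrapper(ToySegment base) {
        this.base = base;
    }

    void close() {
        closed.set(true);
    }

    @Override
    public <T> T as(Class<T> clazz) {
        if (closed.get()) {
            return null;
        }
        return base.as(clazz);
    }
}
```

Callers of such a method have to treat null as "not supported or no longer available" and fall back accordingly.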
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
package org.apache.druid.segment;

import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.join.table.IndexedTable;
import org.apache.druid.timeline.SegmentId;
import org.easymock.EasyMock;
import org.joda.time.Days;
@@ -43,6 +44,7 @@ public class ReferenceCountingSegmentTest
private final Interval dataInterval = new Interval(DateTimes.nowUtc().minus(Days.days(1)), DateTimes.nowUtc());
private QueryableIndex index;
private StorageAdapter adapter;
private IndexedTable indexedTable;
private int underlyingSegmentClosedCount;

@Before
@@ -51,6 +53,7 @@ public void setUp()
underlyingSegmentClosedCount = 0;
index = EasyMock.createNiceMock(QueryableIndex.class);
adapter = EasyMock.createNiceMock(StorageAdapter.class);
indexedTable = EasyMock.createNiceMock(IndexedTable.class);

segment = ReferenceCountingSegment.wrapRootGenerationSegment(
new Segment()
@@ -79,6 +82,19 @@ public StorageAdapter asStorageAdapter()
return adapter;
}

@Override
public <T> T as(Class<T> clazz)
{
if (clazz.equals(QueryableIndex.class)) {
return (T) asQueryableIndex();
} else if (clazz.equals(StorageAdapter.class)) {
return (T) asStorageAdapter();
} else if (clazz.equals(IndexedTable.class)) {
return (T) indexedTable;
}
return null;
}

@Override
public void close()
{
@@ -159,4 +175,13 @@ public void testExposesWrappedSegment()
Assert.assertEquals(adapter, segment.asStorageAdapter());
}

@Test
public void testSegmentAs()
{
Assert.assertSame(index, segment.as(QueryableIndex.class));
Assert.assertSame(adapter, segment.as(StorageAdapter.class));
Assert.assertSame(indexedTable, segment.as(IndexedTable.class));
Assert.assertNull(segment.as(String.class));
}

}
Original file line number Diff line number Diff line change
@@ -30,18 +30,58 @@
public interface SegmentCacheManager
{
/**
* Checks whether a segment is already cached.
* Checks whether a segment is already cached. It can return false even if {@link #reserve(DataSegment)}
* has succeeded for a segment that has not been downloaded yet.
*/
boolean isSegmentCached(DataSegment segment);

/**
* This method fetches the files for the given segment if the segment is not downloaded already.
* This method fetches the files for the given segment if the segment is not downloaded already. It
* is not required to call {@link #reserve(DataSegment)} before calling this method. If the caller has not reserved
* the space explicitly via {@link #reserve(DataSegment)}, the implementation should reserve space on the caller's
* behalf.
* If the space has already been explicitly reserved:
* - the implementation should use only the reserved space to store segment files.
* - the implementation should not release the location in case of download errors; releasing it is left to the caller.
* @throws SegmentLoadingException if there is an error in downloading files
*/
File getSegmentFiles(DataSegment segment) throws SegmentLoadingException;

/**
* Cleanup the cache space used by the segment
* Tries to reserve the space for a segment on any location. When the space has been reserved,
* {@link #getSegmentFiles(DataSegment)} should download the segment on the reserved location or
* fail otherwise.
*
* This function is useful for custom extensions. Extensions can try to reserve the space first and,
* if not successful, make some space by cleaning up other segments, etc. This function also improves
* concurrency for extensions: reserving space is a cheap operation, so it can be put inside a lock if
* an extension requires that. getSegment should not be put inside a lock since it is a time-consuming
* operation, on account of downloading the files.
*
* @param segment - Segment to reserve
* @return True if enough space found to store the segment, false otherwise
*/
/*
* We only return a boolean result instead of a pointer to
* {@link StorageLocation} since we don't want callers to operate on {@code StorageLocation} directly outside {@code SegmentLoader}.
* {@link SegmentLoader} operates on the {@code StorageLocation} objects in a thread-safe manner.
*/
boolean reserve(DataSegment segment);
Contributor

Should isSegmentCached still return false after reserve is called? It would be worth documenting.

Contributor Author

Yes, I will document it.
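
The javadoc for `reserve` describes a pattern in which an extension tries to reserve space and, on failure, frees space by cleaning up other segments. The following is a minimal sketch of that bookkeeping under stated assumptions: `ToyReservingCache`, its byte-based capacity, and the oldest-first eviction order are all hypothetical and not part of Druid. Only the cheap reservation step is modeled, which is the part that can reasonably run under a lock; the slow download would happen afterwards, outside the lock.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model; names, sizes, and eviction policy are illustrative only.
final class ToyReservingCache {
    private final long capacity;
    private long reservedBytes = 0;
    private final Map<String, Long> reserved = new HashMap<>();   // segment id -> size
    private final Deque<String> oldestFirst = new ArrayDeque<>(); // eviction order

    ToyReservingCache(long capacity) {
        this.capacity = capacity;
    }

    // Cheap bookkeeping only: succeeds if the segment fits in the remaining space.
    synchronized boolean reserve(String id, long size) {
        if (reserved.containsKey(id)) {
            return true;
        }
        if (reservedBytes + size > capacity) {
            return false;
        }
        reserved.put(id, size);
        oldestFirst.addLast(id);
        reservedBytes += size;
        return true;
    }

    // Gives back the space held for a segment; false if nothing was reserved.
    synchronized boolean release(String id) {
        Long size = reserved.remove(id);
        if (size == null) {
            return false;
        }
        oldestFirst.remove(id);
        reservedBytes -= size;
        return true;
    }

    // Reserves space, evicting the oldest reservations until the new segment fits.
    synchronized boolean reserveWithEviction(String id, long size) {
        if (size > capacity) {
            return false; // can never fit, so do not evict anything
        }
        while (!reserve(id, size)) {
            String victim = oldestFirst.peekFirst();
            if (victim == null) {
                return false;
            }
            release(victim);
        }
        return true;
    }

    synchronized boolean isReserved(String id) {
        return reserved.containsKey(id);
    }
}
```

A real extension would also have to delete the evicted segments' files before releasing their space; the sketch skips that step.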


/**
* Reverts the effects of {@link #reserve(DataSegment)} by releasing the location reserved for this segment.
* Callers that explicitly reserve the space via {@link #reserve(DataSegment)} should use this method to release the space.
*
* Implementations can throw an error if the space is being released while data is still present. Callers
* are expected to ensure that any data is removed via {@link #cleanup(DataSegment)}.
* @param segment - Segment to release the location for.
* @return - True if any location was reserved and released, false otherwise.
*/
boolean release(DataSegment segment);
Contributor

Can you clarify the contract between this method and getSegmentFiles? For example, what should happen when release is called if reserve was not called but getSegmentFiles was called?

Contributor Author

Sure. If reserve is not called, getSegmentFiles will reserve the space. I will document this.
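
One way to read the contract discussed in this thread is sketched below with an in-memory model. `ToySegmentCacheManager` is hypothetical, uses plain string ids instead of `DataSegment`, and has unlimited space; the choice to throw from `release` while data is present is only one of the behaviors the javadoc permits, and the `failDownload` flag exists purely to simulate a download error.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the documented contract; not the Druid implementation.
final class ToySegmentCacheManager {
    // segment id -> TRUE if the caller reserved explicitly, FALSE if reserved implicitly
    private final Map<String, Boolean> reservations = new HashMap<>();
    private final Set<String> downloaded = new HashSet<>();

    synchronized boolean isSegmentCached(String id) {
        return downloaded.contains(id); // a reservation alone does not count as cached
    }

    synchronized boolean reserve(String id) {
        reservations.put(id, Boolean.TRUE);
        return true; // this toy has unlimited space
    }

    // failDownload simulates an error while fetching the segment files.
    synchronized String getSegmentFiles(String id, boolean failDownload) {
        if (downloaded.contains(id)) {
            return "/cache/" + id;
        }
        boolean implicit = !reservations.containsKey(id);
        if (implicit) {
            reservations.put(id, Boolean.FALSE); // reserve on the caller's behalf
        }
        if (failDownload) {
            if (implicit) {
                reservations.remove(id); // undo only what this call reserved itself
            }
            throw new IllegalStateException("download failed for " + id);
        }
        downloaded.add(id);
        return "/cache/" + id;
    }

    synchronized boolean release(String id) {
        if (downloaded.contains(id)) {
            throw new IllegalStateException("data still present; call cleanup first");
        }
        return reservations.remove(id) != null;
    }

    synchronized void cleanup(String id) {
        downloaded.remove(id);
        // Explicit reservations survive cleanup; implicit ones go away with the data.
        reservations.remove(id, Boolean.FALSE);
    }
}
```

Under this reading, a caller that never called `reserve` gets false from `release` after `cleanup`, because the implicit reservation has already been dropped.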


/**
* Cleanup the cache space used by the segment. It will not release the space if the space has been
* explicitly reserved via {@link #reserve(DataSegment)}.
*/
void cleanup(DataSegment segment);
}
Original file line number Diff line number Diff line change
@@ -19,24 +19,34 @@

package org.apache.druid.segment.loading;

import org.apache.druid.segment.Segment;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;

/**
* Loading segments from deep storage to local storage. Internally, this class can delegate the download to
* {@link SegmentCacheManager}. Implementations must be thread-safe.
*/
@UnstableApi
public interface SegmentLoader
{

/**
* Builds a {@link Segment} by downloading if necessary
* Returns a {@link ReferenceCountingSegment} that will be added by the {@link org.apache.druid.server.SegmentManager}
* to the {@link org.apache.druid.timeline.VersionedIntervalTimeline}. This method can be called multiple times
* by the {@link org.apache.druid.server.SegmentManager}, and the implementation can return either the same
* {@link ReferenceCountingSegment} or a different one. Callers should not assume any particular behavior.
*
* Returning a {@code ReferenceCountingSegment} lets custom implementations keep track of the reference count for
* the segments they create. That way, custom implementations can know whether a segment
* is in use.
* @param segment - Segment to load
* @param lazy - Whether column metadata de-serialization is to be deferred to access time. Setting this flag to true can speed up segment loading
* @param loadFailed - Callback to invoke if lazy loading fails during column access.
* @throws SegmentLoadingException - If there is an error in loading the segment
*/
Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException;
Contributor

Is the SegmentLoader guaranteed to return the same ReferenceCountingSegment instance across multiple calls of getSegment? Should it?

Contributor Author

It can do either and the caller is not supposed to depend on that behavior. From the caller's perspective, it is going to get a segment object wrapped inside ReferenceCountingSegment. Implementations can have optimizations to save on repeated expensive work.

Contributor

I think this should be documented in the javadoc.
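
The point made in this thread, that a loader may hand back the same wrapper on repeated calls and that the wrapper's reference count tells the loader whether a segment is in use, can be sketched as follows. All names here (`ToyHandle`, `ToyRefCountedSegment`, `ToyCachingLoader`) are hypothetical simplifications and not Druid classes; the loader shown picks the "same instance" option, which is only one of the two permitted behaviors, and the closed check is not race-free.

```java
import java.io.Closeable;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical handle type; narrows close() so that it throws no checked exception.
interface ToyHandle extends Closeable {
    @Override
    void close();
}

// Hypothetical simplification of a reference-counted segment wrapper.
final class ToyRefCountedSegment {
    final String id;
    private final AtomicInteger references = new AtomicInteger(0);
    private final AtomicBoolean closed = new AtomicBoolean(false);

    ToyRefCountedSegment(String id) {
        this.id = id;
    }

    // Returns a handle to close when the query finishes, or empty if already closed.
    // Simplified: the closed check and the increment are not atomic together.
    Optional<ToyHandle> acquire() {
        if (closed.get()) {
            return Optional.empty();
        }
        references.incrementAndGet();
        AtomicBoolean released = new AtomicBoolean(false);
        ToyHandle handle = () -> {
            // Guard against double close so the count is decremented only once.
            if (released.compareAndSet(false, true)) {
                references.decrementAndGet();
            }
        };
        return Optional.of(handle);
    }

    boolean isInUse() {
        return references.get() > 0;
    }

    void close() {
        closed.set(true);
    }
}

// One permitted behavior: reuse the same wrapper for repeated calls with the same id.
final class ToyCachingLoader {
    private final Map<String, ToyRefCountedSegment> wrappers = new ConcurrentHashMap<>();

    ToyRefCountedSegment getSegment(String id) {
        return wrappers.computeIfAbsent(id, ToyRefCountedSegment::new);
    }
}
```

A caller should treat the returned wrapper as opaque and should not compare instances across calls, since another loader could equally return a fresh wrapper each time.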


/**
* cleanup any state used by this segment
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.ReferenceCountingSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.SegmentLazyLoadFailCallback;
import org.apache.druid.timeline.DataSegment;
@@ -46,7 +47,7 @@ public SegmentLocalCacheLoader(SegmentCacheManager cacheManager, IndexIO indexIO
}

@Override
public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
public ReferenceCountingSegment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFailCallback loadFailed) throws SegmentLoadingException
{
final File segmentFiles = cacheManager.getSegmentFiles(segment);
File factoryJson = new File(segmentFiles, "factory.json");
@@ -63,7 +64,8 @@ public Segment getSegment(DataSegment segment, boolean lazy, SegmentLazyLoadFail
factory = new MMappedQueryableSegmentizerFactory(indexIO);
}

return factory.factorize(segment, segmentFiles, lazy, loadFailed);
Segment segmentObject = factory.factorize(segment, segmentFiles, lazy, loadFailed);
return ReferenceCountingSegment.wrapSegment(segmentObject, segment.getShardSpec());
}

@Override