[Improvement] historical fast restart by lazy load columns metadata (20X faster) #6988

Merged: 9 commits, Dec 3, 2019
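The heart of this patch is replacing eagerly built `ColumnHolder` objects with memoized `Supplier<ColumnHolder>` values, so a column's metadata is deserialized on first access rather than during startup. A minimal, self-contained sketch of the pattern (the `Column` class and column names below are hypothetical stand-ins, not Druid APIs):

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import java.util.HashMap;
import java.util.Map;

public class LazyColumnMapSketch
{
  // Hypothetical stand-in for Druid's ColumnHolder; the constructor models
  // the expensive deserialization work done at historical startup.
  static class Column
  {
    Column(String name)
    {
      System.out.println("deserialized column: " + name);
    }
  }

  public static void main(String[] args)
  {
    boolean lazy = true;
    Map<String, Supplier<Column>> columns = new HashMap<>();

    for (String name : new String[]{"__time", "dim1", "metric1"}) {
      if (lazy) {
        // Defer the expensive build; memoize() guarantees it runs at most once.
        columns.put(name, Suppliers.memoize(() -> new Column(name)));
      } else {
        // Eager path: pay the full cost up front, as before this patch.
        Column column = new Column(name);
        columns.put(name, () -> column);
      }
    }

    // Nothing has been deserialized yet; the first get() triggers the work.
    columns.get("dim1").get();
  }
}
```

`Suppliers.memoize` is thread-safe and caches the result of the first `get()`, so concurrent queries touching the same column do not deserialize it twice; subsequent calls return the cached value.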
1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -1359,6 +1359,7 @@ These Historical configurations can be defined in the `historical/runtime.properties`
|`druid.segmentCache.announceIntervalMillis`|How frequently to announce segments while segments are loading from cache. Set this value to zero to wait for all segments to be loaded before announcing.|5000 (5 seconds)|
|`druid.segmentCache.numLoadingThreads`|How many segments to drop or load concurrently from deep storage. Note that loading a segment involves downloading it from deep storage, decompressing it, and loading it into a memory-mapped location, so the work is not entirely I/O-bound. Depending on CPU and network load, this config could be increased to a higher value.|Number of cores|
|`druid.segmentCache.numBootstrapThreads`|How many segments to load concurrently during historical startup.|`druid.segmentCache.numLoadingThreads`|
|`druid.segmentCache.lazyLoadOnStart`|Whether to load segment column metadata lazily during historical startup. Setting this to true can make historical startup significantly faster on HDDs (possibly around 20 times faster). The feature is generally unnecessary for historicals with SSDs, which are fast enough without it.|false|
|`druid.coordinator.loadqueuepeon.curator.numCallbackThreads`|Number of threads for executing callback actions associated with loading or dropping of segments. One might want to increase this number when noticing clusters are lagging behind w.r.t. balancing segments across historical nodes.|2|
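For example, to enable lazy loading on an HDD-backed historical, the flag would go in `historical/runtime.properties` (a minimal sketch; the thread count shown is illustrative):

```properties
# Defer column metadata deserialization until first access (new in this patch).
druid.segmentCache.lazyLoadOnStart=true

# Illustrative value; tune to your hardware as described in the table above.
druid.segmentCache.numBootstrapThreads=8
```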

In `druid.segmentCache.locations`, *freeSpacePercent* was added because the *maxSize* setting is only a theoretical limit and assumes that much space will always be available for storing segments. If a Druid bug leaves unaccounted segment files on disk, or some other process writes data to the disk, this check can make segment loading fail early, before the disk fills up completely, leaving the host otherwise usable.
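A sketch of a `druid.segmentCache.locations` entry using *freeSpacePercent* (the path and sizes are illustrative, and the key names are assumed from the config described above):

```properties
# Keep at least 5% of the disk free, in addition to the theoretical maxSize cap.
druid.segmentCache.locations=[{"path":"/mnt/druid/segment-cache","maxSize":300000000000,"freeSpacePercent":5.0}]
```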
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
@@ -1210,20 +1211,23 @@ private static class TestIndexIO extends IndexIO
columnNames.add(ColumnHolder.TIME_COLUMN_NAME);
columnNames.addAll(segment.getDimensions());
columnNames.addAll(segment.getMetrics());
final Map<String, ColumnHolder> columnMap = new HashMap<>(columnNames.size());
final Map<String, Supplier<ColumnHolder>> columnMap = new HashMap<>(columnNames.size());
final List<AggregatorFactory> aggregatorFactories = new ArrayList<>(segment.getMetrics().size());

for (String columnName : columnNames) {
if (MIXED_TYPE_COLUMN.equals(columnName)) {
columnMap.put(columnName, createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval())));
ColumnHolder columnHolder = createColumn(MIXED_TYPE_COLUMN_MAP.get(segment.getInterval()));
columnMap.put(columnName, () -> columnHolder);
} else if (DIMENSIONS.containsKey(columnName)) {
columnMap.put(columnName, createColumn(DIMENSIONS.get(columnName)));
ColumnHolder columnHolder = createColumn(DIMENSIONS.get(columnName));
columnMap.put(columnName, () -> columnHolder);
} else {
final Optional<AggregatorFactory> maybeMetric = AGGREGATORS.stream()
.filter(agg -> agg.getName().equals(columnName))
.findAny();
if (maybeMetric.isPresent()) {
columnMap.put(columnName, createColumn(maybeMetric.get()));
ColumnHolder columnHolder = createColumn(maybeMetric.get());
columnMap.put(columnName, () -> columnHolder);
aggregatorFactories.add(maybeMetric.get());
}
}
@@ -1245,7 +1249,8 @@ private static class TestIndexIO extends IndexIO
null,
columnMap,
null,
metadata
metadata,
false
)
);
}
@@ -1271,7 +1276,7 @@ void removeMetadata(File file)
index.getColumns(),
index.getFileMapper(),
null,
index.getDimensionHandlers()
() -> index.getDimensionHandlers()
)
);
}
@@ -275,7 +275,7 @@ private Segment loadSegment(DataSegment dataSegment, File tempSegmentDir)
final SegmentLoader loader = new SegmentLoaderFactory(getIndexIO(), getObjectMapper())
.manufacturate(tempSegmentDir);
try {
return loader.getSegment(dataSegment);
return loader.getSegment(dataSegment, false);
}
catch (SegmentLoadingException e) {
throw new RuntimeException(e);
131 changes: 86 additions & 45 deletions processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -24,7 +24,9 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -179,13 +181,17 @@ public void validateTwoSegments(final IndexableAdapter adapter1, final IndexableAdapter adapter2)
}

public QueryableIndex loadIndex(File inDir) throws IOException
{
return loadIndex(inDir, false);
}

public QueryableIndex loadIndex(File inDir, boolean lazy) throws IOException
{
final int version = SegmentUtils.getVersionFromDir(inDir);

final IndexLoader loader = indexLoaders.get(version);

if (loader != null) {
return loader.load(inDir, mapper);
return loader.load(inDir, mapper, lazy);
} else {
throw new ISE("Unknown index version[%s]", version);
}
@@ -406,7 +412,7 @@ public MMappedIndex mapDir(File inDir) throws IOException

interface IndexLoader
{
QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException;
QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException;
}

static class LegacyIndexLoader implements IndexLoader
@@ -421,11 +427,11 @@ static class LegacyIndexLoader implements IndexLoader
}

@Override
public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
{
MMappedIndex index = legacyHandler.mapDir(inDir);

Map<String, ColumnHolder> columns = new HashMap<>();
Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();

for (String dimension : index.getAvailableDimensions()) {
ColumnBuilder builder = new ColumnBuilder()
@@ -449,61 +455,61 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
if (index.getSpatialIndexes().get(dimension) != null) {
builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(index.getSpatialIndexes().get(dimension)));
}
columns.put(
dimension,
builder.build()
);
columns.put(dimension, getColumnHolderSupplier(builder, lazy));
}

for (String metric : index.getAvailableMetrics()) {
final MetricHolder metricHolder = index.getMetricHolder(metric);
if (metricHolder.getType() == MetricHolder.MetricType.FLOAT) {
columns.put(
metric,
new ColumnBuilder()
.setType(ValueType.FLOAT)
.setNumericColumnSupplier(
new FloatNumericColumnSupplier(
metricHolder.floatType,
LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
)
ColumnBuilder builder = new ColumnBuilder()
.setType(ValueType.FLOAT)
.setNumericColumnSupplier(
new FloatNumericColumnSupplier(
metricHolder.floatType,
LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
)
.build()
);
);
columns.put(metric, getColumnHolderSupplier(builder, lazy));
} else if (metricHolder.getType() == MetricHolder.MetricType.COMPLEX) {
columns.put(
metric,
new ColumnBuilder()
.setType(ValueType.COMPLEX)
.setComplexColumnSupplier(
new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType)
)
.build()
);
ColumnBuilder builder = new ColumnBuilder()
.setType(ValueType.COMPLEX)
.setComplexColumnSupplier(
new ComplexColumnPartSupplier(metricHolder.getTypeName(), metricHolder.complexType)
);
columns.put(metric, getColumnHolderSupplier(builder, lazy));
}
}

columns.put(
ColumnHolder.TIME_COLUMN_NAME,
new ColumnBuilder()
.setType(ValueType.LONG)
.setNumericColumnSupplier(
new LongNumericColumnSupplier(
index.timestamps,
LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
)
ColumnBuilder builder = new ColumnBuilder()
.setType(ValueType.LONG)
.setNumericColumnSupplier(
new LongNumericColumnSupplier(
index.timestamps,
LEGACY_FACTORY.getBitmapFactory().makeEmptyImmutableBitmap()
)
.build()
);
);
columns.put(ColumnHolder.TIME_COLUMN_NAME, getColumnHolderSupplier(builder, lazy));

return new SimpleQueryableIndex(
index.getDataInterval(),
index.getAvailableDimensions(),
new ConciseBitmapFactory(),
columns,
index.getFileMapper(),
null
null,
lazy
);
}

private Supplier<ColumnHolder> getColumnHolderSupplier(ColumnBuilder builder, boolean lazy)
{
if (lazy) {
return Suppliers.memoize(() -> builder.build());
} else {
ColumnHolder columnHolder = builder.build();
return () -> columnHolder;
}
}
}

static class V9IndexLoader implements IndexLoader
@@ -516,7 +522,7 @@ static class V9IndexLoader implements IndexLoader
}

@Override
public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy) throws IOException
{
log.debug("Mapping v9 index[%s]", inDir);
long startTime = System.currentTimeMillis();
@@ -576,25 +582,60 @@ public QueryableIndex load(File inDir, ObjectMapper mapper) throws IOException
}
}

Map<String, ColumnHolder> columns = new HashMap<>();
Map<String, Supplier<ColumnHolder>> columns = new HashMap<>();

for (String columnName : cols) {
if (Strings.isNullOrEmpty(columnName)) {
log.warn("Null or Empty Dimension found in the file : " + inDir);
continue;
}
columns.put(columnName, deserializeColumn(mapper, smooshedFiles.mapFile(columnName), smooshedFiles));

ByteBuffer colBuffer = smooshedFiles.mapFile(columnName);

if (lazy) {
columns.put(columnName, Suppliers.memoize(
() -> {
try {
return deserializeColumn(mapper, colBuffer, smooshedFiles);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
));
} else {
ColumnHolder columnHolder = deserializeColumn(mapper, colBuffer, smooshedFiles);
columns.put(columnName, () -> columnHolder);
}

}

columns.put(ColumnHolder.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time"), smooshedFiles));
ByteBuffer timeBuffer = smooshedFiles.mapFile("__time");

if (lazy) {
columns.put(ColumnHolder.TIME_COLUMN_NAME, Suppliers.memoize(
() -> {
try {
return deserializeColumn(mapper, timeBuffer, smooshedFiles);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
));
} else {
ColumnHolder columnHolder = deserializeColumn(mapper, timeBuffer, smooshedFiles);
columns.put(ColumnHolder.TIME_COLUMN_NAME, () -> columnHolder);
}

final QueryableIndex index = new SimpleQueryableIndex(
dataInterval,
dims,
segmentBitmapSerdeFactory.getBitmapFactory(),
columns,
smooshedFiles,
metadata
metadata,
lazy
);

log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
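One detail worth noting in the V9 loader above: `Supplier.get()` cannot throw checked exceptions, so the deferred `deserializeColumn` call has to wrap its `IOException` in an unchecked exception, here via Guava's `Throwables.propagate`. A self-contained sketch of that wrapping pattern (the `readColumnBytes` helper and file path are hypothetical):

```java
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

public class LazyCheckedExceptionSketch
{
  // Hypothetical helper that performs checked-exception I/O,
  // standing in for IndexIO.deserializeColumn().
  static byte[] readColumnBytes(String path) throws IOException
  {
    return Files.readAllBytes(Paths.get(path));
  }

  public static void main(String[] args)
  {
    Supplier<byte[]> lazyColumn = Suppliers.memoize(() -> {
      try {
        return readColumnBytes("/tmp/example-column.bin");
      }
      catch (IOException e) {
        // Supplier.get() declares no checked exceptions, so rethrow unchecked.
        throw Throwables.propagate(e);
      }
    });

    // The file has not been touched yet; get() would perform the read
    // (and rethrow any IOException as a RuntimeException).
    System.out.println("lazy column supplier created: " + lazyColumn);
  }
}
```

An I/O error surfaced this way shows up on first access to the column rather than at startup, which is the trade-off lazy loading makes.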