Skip to content

Commit

Permalink
Support both IndexTuningConfig and ParallelIndexTuningConfig for comp…
Browse files Browse the repository at this point in the history
…action task (#9222)

* Support both IndexTuningConfig and ParallelIndexTuningConfig for compaction task

* tuningConfig module

* fix tests
  • Loading branch information
jihoonson authored and jon-wei committed Jan 21, 2020
1 parent 0b0056b commit d541cbe
Show file tree
Hide file tree
Showing 11 changed files with 377 additions and 8 deletions.
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.guice;

import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.initialization.DruidModule;

import java.util.List;

public class IndexingServiceTuningConfigModule implements DruidModule
{
@Override
public List<? extends Module> getJacksonModules()
{
return ImmutableList.of(
new SimpleModule(IndexingServiceTuningConfigModule.class.getSimpleName())
.registerSubtypes(
new NamedType(IndexTuningConfig.class, "index"),
new NamedType(ParallelIndexTuningConfig.class, "index_parallel")
)
);
}

@Override
public void configure(Binder binder)
{
}
}
Expand Up @@ -51,6 +51,7 @@
import org.apache.druid.indexing.common.actions.RetrieveUsedSegmentsAction;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.IndexTask.IndexTuningConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIOConfig;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexIngestionSpec;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
Expand All @@ -74,6 +75,7 @@
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.TuningConfig;
import org.apache.druid.segment.indexing.granularity.GranularitySpec;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.segment.loading.SegmentLoadingException;
Expand Down Expand Up @@ -177,7 +179,7 @@ public CompactionTask(
@JsonProperty("dimensionsSpec") @Nullable final DimensionsSpec dimensionsSpec,
@JsonProperty("metricsSpec") @Nullable final AggregatorFactory[] metricsSpec,
@JsonProperty("segmentGranularity") @Nullable final Granularity segmentGranularity,
@JsonProperty("tuningConfig") @Nullable final ParallelIndexTuningConfig tuningConfig,
@JsonProperty("tuningConfig") @Nullable final TuningConfig tuningConfig,
@JsonProperty("context") @Nullable final Map<String, Object> context,
@JacksonInject ObjectMapper jsonMapper,
@JacksonInject AuthorizerMapper authorizerMapper,
Expand Down Expand Up @@ -213,10 +215,10 @@ public CompactionTask(
this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
this.tuningConfig = tuningConfig;
this.tuningConfig = tuningConfig != null ? getTuningConfig(tuningConfig) : null;
this.jsonMapper = jsonMapper;
this.segmentProvider = new SegmentProvider(dataSource, this.ioConfig.getInputSpec());
this.partitionConfigurationManager = new PartitionConfigurationManager(tuningConfig);
this.partitionConfigurationManager = new PartitionConfigurationManager(this.tuningConfig);
this.authorizerMapper = authorizerMapper;
this.chatHandlerProvider = chatHandlerProvider;
this.rowIngestionMetersFactory = rowIngestionMetersFactory;
Expand All @@ -227,6 +229,51 @@ public CompactionTask(
this.appenderatorsManager = appenderatorsManager;
}

@VisibleForTesting
static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig)
{
if (tuningConfig instanceof ParallelIndexTuningConfig) {
return (ParallelIndexTuningConfig) tuningConfig;
} else if (tuningConfig instanceof IndexTuningConfig) {
final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig;
return new ParallelIndexTuningConfig(
null,
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxRowsPerSegment(),
indexTuningConfig.getMaxBytesInMemory(),
indexTuningConfig.getMaxTotalRows(),
indexTuningConfig.getNumShards(),
null,
indexTuningConfig.getPartitionsSpec(),
indexTuningConfig.getIndexSpec(),
indexTuningConfig.getIndexSpecForIntermediatePersists(),
indexTuningConfig.getMaxPendingPersists(),
indexTuningConfig.isForceGuaranteedRollup(),
indexTuningConfig.isReportParseExceptions(),
indexTuningConfig.getPushTimeout(),
indexTuningConfig.getSegmentWriteOutMediumFactory(),
null,
null,
null,
null,
null,
null,
null,
null,
indexTuningConfig.isLogParseExceptions(),
indexTuningConfig.getMaxParseExceptions(),
indexTuningConfig.getMaxSavedParseExceptions()
);
} else {
throw new ISE(
"Unknown tuningConfig type: [%s], Must be either [%s] or [%s]",
tuningConfig.getClass().getName(),
ParallelIndexTuningConfig.class.getName(),
IndexTuningConfig.class.getName()
);
}
}

@JsonProperty
public CompactionIOConfig getIoConfig()
{
Expand Down Expand Up @@ -848,7 +895,7 @@ public static class Builder
@Nullable
private Granularity segmentGranularity;
@Nullable
private ParallelIndexTuningConfig tuningConfig;
private TuningConfig tuningConfig;
@Nullable
private Map<String, Object> context;

Expand Down Expand Up @@ -911,7 +958,7 @@ public Builder segmentGranularity(Granularity segmentGranularity)
return this;
}

public Builder tuningConfig(ParallelIndexTuningConfig tuningConfig)
public Builder tuningConfig(TuningConfig tuningConfig)
{
this.tuningConfig = tuningConfig;
return this;
Expand Down
Expand Up @@ -1171,7 +1171,6 @@ public boolean isAppendToExisting()
}
}

@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
Expand Down
Expand Up @@ -21,7 +21,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.indexer.partitions.PartitionsSpec;
Expand All @@ -35,7 +34,6 @@
import javax.annotation.Nullable;
import java.util.Objects;

@JsonTypeName("index_parallel")
public class ParallelIndexTuningConfig extends IndexTuningConfig
{
private static final int DEFAULT_MAX_NUM_CONCURRENT_SUB_TASKS = 1;
Expand Down
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.introspect.AnnotationIntrospectorPair;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import org.apache.druid.client.coordinator.CoordinatorClient;
import org.apache.druid.client.indexing.ClientCompactQuery;
Expand All @@ -36,6 +37,7 @@
import org.apache.druid.indexing.common.SegmentLoaderFactory;
import org.apache.druid.indexing.common.TestUtils;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
import org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.segment.IndexSpec;
Expand Down Expand Up @@ -168,6 +170,7 @@ private static ObjectMapper setupInjectablesInObjectMapper(ObjectMapper objectMa
)
);
objectMapper.setInjectableValues(injectableValues);
objectMapper.registerSubtypes(new NamedType(ParallelIndexTuningConfig.class, "index_parallel"));
return objectMapper;
}
}

0 comments on commit d541cbe

Please sign in to comment.