Skip to content

Commit

Permalink
Merge pull request #12227 from cdapio/feature_release/CDAP-16856-mr-a…
Browse files Browse the repository at this point in the history
…gg-impl

CDAP-16856 mapreduce impl
  • Loading branch information
yaojiefeng committed May 29, 2020
2 parents 0000d65 + b542eb8 commit 2bbfd20
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 5 deletions.
Expand Up @@ -83,20 +83,25 @@ public static void setupTest() throws Exception {

@Test
public void testSimpleAggregator() throws Exception {
testSimpleAggregator(Engine.MAPREDUCE);
testSimpleAggregator(Engine.SPARK);
}

private void testSimpleAggregator(Engine engine) throws Exception {
Schema inputSchema = Schema.recordOf(
"input",
Schema.Field.of("id", Schema.of(Schema.Type.INT)),
Schema.Field.of("name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("used_name", Schema.of(Schema.Type.STRING)));
String userInput = "inputSource";
String output = "outputSink";
String userInput = "inputSource-" + engine;
String output = "outputSink-" + engine;
ETLBatchConfig config = ETLBatchConfig.builder()
.addStage(new ETLStage("users", MockSource.getPlugin(userInput, inputSchema)))
.addStage(new ETLStage("aggregator", DistinctReducibleAggregator.getPlugin("id,name")))
.addStage(new ETLStage("sink", MockSink.getPlugin(output)))
.addConnection("users", "aggregator")
.addConnection("aggregator", "sink")
.setEngine(Engine.SPARK)
.setEngine(engine)
.build();

AppRequest<ETLBatchConfig> appRequest = new AppRequest<>(APP_ARTIFACT, config);
Expand Down Expand Up @@ -136,7 +141,11 @@ public void testSimpleAggregator() throws Exception {

@Test
public void testFieldCountAgg() throws Exception {
Engine engine = Engine.SPARK;
testFieldCountAgg(Engine.MAPREDUCE);
testFieldCountAgg(Engine.SPARK);
}

private void testFieldCountAgg(Engine engine) throws Exception {
String source1Name = "pAggInput1-" + engine.name();
String source2Name = "pAggInput2-" + engine.name();
String sink1Name = "pAggOutput1-" + engine.name();
Expand Down
Expand Up @@ -41,6 +41,7 @@
import io.cdap.cdap.etl.api.batch.BatchAutoJoiner;
import io.cdap.cdap.etl.api.batch.BatchJoiner;
import io.cdap.cdap.etl.api.batch.BatchJoinerRuntimeContext;
import io.cdap.cdap.etl.api.batch.BatchReducibleAggregator;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;
import io.cdap.cdap.etl.api.join.JoinCondition;
import io.cdap.cdap.etl.api.join.JoinDefinition;
Expand Down Expand Up @@ -70,6 +71,7 @@
import io.cdap.cdap.etl.common.TrackedMultiOutputTransform;
import io.cdap.cdap.etl.common.TrackedTransform;
import io.cdap.cdap.etl.common.TransformExecutor;
import io.cdap.cdap.etl.common.plugin.AggregatorBridge;
import io.cdap.cdap.etl.common.plugin.JoinerBridge;
import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -163,7 +165,15 @@ private <IN, OUT> TrackedTransform<IN, OUT> getTransformation(StageSpec stageSpe
StageStatisticsCollector collector = isPipelineContainsCondition ?
new MapReduceStageStatisticsCollector(stageName, taskAttemptContext) : new NoopStageStatisticsCollector();
if (BatchAggregator.PLUGIN_TYPE.equals(pluginType)) {
BatchAggregator<?, ?, ?> batchAggregator = pluginInstantiator.newPluginInstance(stageName, macroEvaluator);
Object plugin = pluginInstantiator.newPluginInstance(stageName, macroEvaluator);
BatchAggregator<?, ?, ?> batchAggregator;
if (plugin instanceof BatchReducibleAggregator) {
BatchReducibleAggregator<?, ?, ?, ?> reducibleAggregator = (BatchReducibleAggregator<?, ?, ?, ?>) plugin;
batchAggregator = new AggregatorBridge<>(reducibleAggregator);
} else {
batchAggregator = (BatchAggregator<?, ?, ?>) plugin;
}

BatchRuntimeContext runtimeContext = createRuntimeContext(stageSpec);
batchAggregator.initialize(runtimeContext);
if (isMapPhase) {
Expand Down
@@ -0,0 +1,95 @@
/*
* Copyright © 2020 Cask Data, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package io.cdap.cdap.etl.common.plugin;

import io.cdap.cdap.etl.api.Emitter;
import io.cdap.cdap.etl.api.PipelineConfigurer;
import io.cdap.cdap.etl.api.batch.BatchAggregator;
import io.cdap.cdap.etl.api.batch.BatchAggregatorContext;
import io.cdap.cdap.etl.api.batch.BatchReducibleAggregator;
import io.cdap.cdap.etl.api.batch.BatchRuntimeContext;

import java.util.Iterator;

/**
* An implementation of {@link BatchAggregator} using a {@link BatchReducibleAggregator}.
*
* @param <GROUP_KEY> Type of group key
* @param <GROUP_VALUE> Type of values to group
* @param <AGG_VALUE> Type of agg values to group
* @param <OUT> Type of output object
*/
public class AggregatorBridge<GROUP_KEY, GROUP_VALUE, AGG_VALUE, OUT>
extends BatchAggregator<GROUP_KEY, GROUP_VALUE, OUT> {

private final BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGG_VALUE, OUT> aggregator;

public AggregatorBridge(BatchReducibleAggregator<GROUP_KEY, GROUP_VALUE, AGG_VALUE, OUT> aggregator) {
this.aggregator = aggregator;
}

@Override
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
aggregator.configurePipeline(pipelineConfigurer);
}

@Override
public void prepareRun(BatchAggregatorContext context) throws Exception {
aggregator.prepareRun(context);
}

@Override
public void initialize(BatchRuntimeContext context) throws Exception {
aggregator.initialize(context);
}

@Override
public void destroy() {
aggregator.destroy();
}

@Override
public void onRunFinish(boolean succeeded, BatchAggregatorContext context) {
aggregator.onRunFinish(succeeded, context);
}

@Override
public void groupBy(GROUP_VALUE groupValue, Emitter<GROUP_KEY> emitter) throws Exception {
aggregator.groupBy(groupValue, emitter);
}

@Override
public void aggregate(GROUP_KEY groupKey, Iterator<GROUP_VALUE> groupValues, Emitter<OUT> emitter) throws Exception {
// this condition should not happen, since the group key is generated with the groupBy with existing input, so the
// iterator will at least contain that value
if (!groupValues.hasNext()) {
return;
}

// create first agg value
AGG_VALUE aggVal = aggregator.initializeAggregateValue(groupValues.next());

// loop the iterator to combine the values, here the mergePartitions will not be used since the iterator already
// has all the values with the key from all partitions
while (groupValues.hasNext()) {
aggVal = aggregator.mergeValues(aggVal, groupValues.next());
}

// after we get the final aggVal, we can finalize the result
aggregator.finalize(groupKey, aggVal, emitter);
}
}

0 comments on commit 2bbfd20

Please sign in to comment.