Skip to content

Commit

Permalink
Fixed issue on MapReduce that prevented counters from registering if incremented on a pipe assembly branch feeding the accumulated side of a c.p.HashJoin.
Browse files Browse the repository at this point in the history
  • Loading branch information
cwensel committed Sep 14, 2016
1 parent b606d86 commit d0de40d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 3 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,9 @@ Cascading Change Log

3.1.2 [unreleased]

Fixed issue on MapReduce that prevented counters from registering if incremented on a pipe assembly branch feeding the
accumulated side of a c.p.HashJoin.

3.1.1

Fixed issue with the MapReduce planner where a c.p.Checkpoint after a c.p.GroupBy merge would fail the planner.
Expand Down
Expand Up @@ -46,6 +46,7 @@
import cascading.pipe.HashJoin;
import cascading.tap.Tap;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

/**
*
Expand Down Expand Up @@ -85,7 +86,7 @@ protected void buildGraph()
// accumulated paths
for( Object source : tributaries )
{
HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
final HadoopFlowProcess hadoopProcess = (HadoopFlowProcess) flowProcess;
JobConf conf = hadoopProcess.getJobConf();

// allows client side config to be used cluster side
Expand All @@ -95,7 +96,16 @@ protected void buildGraph()
throw new IllegalStateException( "accumulated source conf property missing for: " + ( (Tap) source ).getIdentifier() );

conf = getSourceConf( hadoopProcess, conf, property );
flowProcess = new HadoopFlowProcess( hadoopProcess, conf );

// the reporter isn't provided until after the #run method is called
flowProcess = new HadoopFlowProcess( hadoopProcess, conf )
{
@Override
public Reporter getReporter()
{
return hadoopProcess.getReporter();
}
};

handleHead( (Tap) source, flowProcess );
}
Expand Down
Expand Up @@ -20,6 +20,7 @@

package cascading.stats;

import java.util.HashMap;
import java.util.Map;

import cascading.PlatformTestCase;
Expand All @@ -29,17 +30,20 @@
import cascading.flow.FlowConnector;
import cascading.flow.FlowRuntimeProps;
import cascading.flow.SliceCounters;
import cascading.operation.Function;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.state.Counter;
import cascading.pipe.Each;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import org.junit.Test;

import static data.InputData.inputFileApache;
import static data.InputData.*;

/**
*
Expand Down Expand Up @@ -123,4 +127,51 @@ public void testStatsCounters() throws Exception

cascadeStats.captureDetail();
}

/**
 * Verifies that counters incremented on both branches of a HashJoin — including the
 * branch feeding the accumulated (right-hand) side — are registered and visible in
 * the FlowStats, and that their sum matches the slice-level Tuples_Read counter.
 *
 * @throws Exception if the platform copy, flow execution, or validation fails
 */
@Test
public void testStatsOnJoin() throws Exception
{
  getPlatform().copyFromLocal( inputFileLower );
  getPlatform().copyFromLocal( inputFileUpper );

  Tap sourceLower = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );
  Tap sourceUpper = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileUpper );

  // parameterized map matches the FlowConnector#connect( Map<String, Tap>, ... ) signature
  Map<String, Tap> sources = new HashMap<>();

  sources.put( "lower", sourceLower );
  sources.put( "upper", sourceUpper );

  Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "join" ), SinkMode.REPLACE );

  Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

  // streamed (left-hand) side of the join increments FIRST
  Pipe pipeLower = new Each( new Pipe( "lower" ), new Fields( "line" ), splitter );
  pipeLower = new Each( pipeLower, new Counter( TestEnum.FIRST ) );

  // accumulated (right-hand) side of the join increments SECOND
  Pipe pipeUpper = new Each( new Pipe( "upper" ), new Fields( "line" ), splitter );
  pipeUpper = new Each( pipeUpper, new Counter( TestEnum.SECOND ) );

  Pipe splice = new HashJoin( pipeLower, new Fields( "num" ), pipeUpper, new Fields( "num" ), Fields.size( 4 ) );

  Map<Object, Object> properties = getProperties();

  Flow flow = getPlatform().getFlowConnector( properties ).connect( sources, sink, splice );

  flow.complete();

  validateLength( flow, 5 );

  FlowStats flowStats = flow.getFlowStats();

  assertNotNull( flowStats.getID() );

  long firstCounter = flowStats.getCounterValue( TestEnum.FIRST );
  long secondCounter = flowStats.getCounterValue( TestEnum.SECOND );

  assertEquals( 5, firstCounter );

  // NOTE: the previous assertNotSame( 0, secondCounter ) was vacuous — the int 0 boxes
  // to an Integer and the long to a Long, which are never the same reference, so it
  // could not fail even when the counter was zero. Assert the value directly instead.
  assertTrue( "accumulated side counters did not fire", secondCounter != 0 );

  assertEquals( firstCounter + secondCounter, flowStats.getCounterValue( SliceCounters.Tuples_Read ) );
}
}

0 comments on commit d0de40d

Please sign in to comment.