Skip to content

Commit

Permalink
Fixed issue where a c.p.Merge could hide the streamed/accumulated nat…
Browse files Browse the repository at this point in the history
…ure of a stream when leading to a c.p.Group pipe. This could result in duplicate data passed to the c.p.GroupBy or c.p.CoGroup within a MapReduce job.
  • Loading branch information
cwensel committed Apr 21, 2014
1 parent eee173c commit e8d271f
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,9 @@ Cascading Change Log

2.5.4 [unreleased]

Fixed issue where a c.p.Merge could hide the streamed/accumulated nature of a stream when leading to a c.p.Group
pipe. This could result in duplicate data passed to the c.p.GroupBy or c.p.CoGroup within a MapReduce job.

Fixed issue where c.p.a.FirstBy only accepted a single field name.

Updated c.t.p.PartitionCollector in c.t.p.BasePartitionTap to be public.
Expand Down
Expand Up @@ -88,7 +88,7 @@ public static List<GraphPath<FlowElement, Scope>> getAllDirectPathsBetween( Simp
{
List<FlowElement> pathVertexList = Graphs.getPathVertexList( path );

for( int i = 1; i < pathVertexList.size(); i++ ) // skip the from, its a Tap or Group
for( int i = 1; i < pathVertexList.size() - 1; i++ ) // skip the from and to, its a Tap or Group
{
FlowElement flowElement = pathVertexList.get( i );

Expand Down
Expand Up @@ -818,6 +818,31 @@ else if( flowElement instanceof HashJoin )
}
else if( flowElement instanceof Tap || flowElement instanceof Group )
{
// added for JoinFieldedPipesPlatformTest.testJoinMergeGroupBy where Merge hides streamed nature of path
if( flowElement instanceof Group && !joins.isEmpty() )
{
List<Splice> splices = new ArrayList<Splice>( );

splices.addAll( merges );
splices.add( (Splice) flowElement );

Collections.reverse( splices );

for( Splice splice : splices )
{
Map<Integer, Integer> pathCounts = countOrderedDirectPathsBetween( elementGraph, lastSourceElement, splice, true );

if( isBothAccumulatedAndStreamedPath( pathCounts ) )
{
tapInsertions.add( (Pipe) flowElements.get( flowElements.indexOf( splice ) - 1 ) );
break;
}
}

if( !tapInsertions.isEmpty() )
break;
}

for( int j = 0; j < joins.size(); j++ )
{
HashJoin join = joins.get( j );
Expand Down
Expand Up @@ -21,6 +21,8 @@
package cascading;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -29,8 +31,11 @@
import java.util.Set;

import cascading.flow.Flow;
import cascading.flow.FlowDef;
import cascading.operation.Aggregator;
import cascading.operation.Function;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.aggregator.First;
import cascading.operation.expression.ExpressionFunction;
import cascading.operation.regex.RegexFilter;
Expand All @@ -40,6 +45,7 @@
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.HashJoin;
import cascading.pipe.Merge;
import cascading.pipe.Pipe;
import cascading.pipe.joiner.InnerJoin;
import cascading.pipe.joiner.Joiner;
Expand Down Expand Up @@ -1727,4 +1733,80 @@ public void testJoinNone() throws Exception
assertTrue( values.contains( new Tuple( "1\ta\t2\tB" ) ) );
assertTrue( values.contains( new Tuple( "2\tb\t2\tB" ) ) );
}

/**
 * Verifies that a Merge placed between a HashJoin and a GroupBy does not hide the streamed/accumulated
 * nature of a branch. When run against a cluster this could result in duplicate data reaching the GroupBy.
 * <p/>
 * The "rhs" pipe feeds both the HashJoin and the Merge; the test asserts the exact per-id counts produced
 * by the GroupBy/Count that follows the Merge.
 * <p/>
 * Commented code is for troubleshooting.
 *
 * @throws Exception if staging the input files, planning the flow, or running the flow fails
 */
@Test
public void testJoinMergeGroupBy() throws Exception
{
  getPlatform().copyFromLocal( inputFileNums10 );
  getPlatform().copyFromLocal( inputFileNums20 );

  Tap lhsTap = getPlatform().getTextFile( new Fields( "id" ), inputFileNums10 );
  Tap rhsTap = getPlatform().getTextFile( new Fields( "id2" ), inputFileNums20 );

  Pipe lhs = new Pipe( "lhs" );
  Pipe rhs = new Pipe( "rhs" );

  // Pipe joined = new CoGroup( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );
  Pipe joined = new HashJoin( lhs, new Fields( "id" ), rhs, new Fields( "id2" ) );

  // keep only "id2" so the joined branch carries the same field as the rhs branch it is merged with
  Pipe pruned = new Each( joined, new Fields( "id2" ), new Identity(), Fields.RESULTS );
  // pruned = new Checkpoint( pruned );
  Pipe merged = new Merge( pruned, rhs );
  Pipe grouped = new GroupBy( merged, new Fields( "id2" ) );
  // Pipe grouped = new GroupBy( Pipe.pipes( pruned, rhs ), new Fields( "id2" ) );
  Aggregator count = new Count( new Fields( "count" ) );
  Pipe counted = new Every( grouped, count );

  // the output directory name follows the join type so the CoGroup variant above can be toggled in
  String testJoinMerge = "testJoinMergeGroupBy/" + ( ( joined instanceof CoGroup ) ? "cogroup" : "hashjoin" );
  Tap sink = getPlatform().getDelimitedFile( Fields.ALL, true, "\t", null, getOutputPath( testJoinMerge ), SinkMode.REPLACE );

  FlowDef flowDef = FlowDef.flowDef()
    .addSource( rhs, rhsTap )
    .addSource( lhs, lhsTap )
    .addTailSink( counted, sink );

  Flow flow = getPlatform().getFlowConnector().connect( flowDef );

  // enable to dump the planned flow and its steps; left off so test runs do not litter the working directory
  // flow.writeDOT( "joinmerge.dot" );
  // flow.writeStepsDOT( "joinmerge-steps.dot" );

  flow.complete();

  validateLength( flow, 20 );

  List<Tuple> values = getSinkAsList( flow );
  List<Tuple> expected = new ArrayList<Tuple>();

  // ids 1-10 are expected with a count of 2, ids 11-20 with a count of 1
  for( int id = 1; id <= 20; id++ )
    expected.add( new Tuple( String.valueOf( id ), id <= 10 ? "2" : "1" ) );

  // sort both sides so the comparison does not depend on the order the sink returns tuples in
  Collections.sort( values );
  Collections.sort( expected );

  assertEquals( expected, values );
}
}

0 comments on commit e8d271f

Please sign in to comment.