Skip to content

Commit

Permalink
Fixed issue on Apache Tez where a split before and subsequent splicin…
Browse files Browse the repository at this point in the history
…g back into a c.p.HashJoin could create an invalid plan.
  • Loading branch information
cwensel committed Jul 14, 2015
1 parent f75648c commit 2d2d600
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,9 @@ Cascading Change Log

3.0.2 [unreleased]

Fixed issue on Apache Tez where a split before and subsequent splicing back into a c.p.HashJoin could create an
invalid plan.

Fixed issue with c.t.u.TupleHasher#ObjectHasher not being serializable.

Fixed issue where an unreachable YARN timeline server could cause the application to fail.
Expand Down
Expand Up @@ -46,6 +46,7 @@
import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinSameSourceTransformer;
import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceHashJoinToHashJoinTransformer;
import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceJoinSplitTransformer;
import cascading.flow.tez.planner.rule.transformer.BoundaryBalanceSplitToStreamedHashJoinTransformer;
import cascading.flow.tez.planner.rule.transformer.RemoveMalformedHashJoinNodeTransformer;

/**
Expand Down Expand Up @@ -74,6 +75,7 @@ public HashJoinHadoop2TezRuleRegistry()
addRule( new BoundaryBalanceCheckpointTransformer() );

// hash join
addRule( new BoundaryBalanceSplitToStreamedHashJoinTransformer() ); // testGroupBySplitGroupByJoin
addRule( new BoundaryBalanceHashJoinSameSourceTransformer() );
addRule( new BoundaryBalanceHashJoinToHashJoinTransformer() ); // force HJ into unique nodes
addRule( new BoundaryBalanceGroupBlockingHashJoinTransformer() ); // joinAfterEvery
Expand Down
@@ -0,0 +1,65 @@
/*
* Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cascading.flow.tez.planner.rule.expression;

import cascading.flow.planner.iso.expression.ElementCapture;
import cascading.flow.planner.iso.expression.ExpressionGraph;
import cascading.flow.planner.iso.expression.FlowElementExpression;
import cascading.flow.planner.iso.expression.PathScopeExpression;
import cascading.flow.planner.iso.expression.TypeExpression;
import cascading.flow.planner.rule.RuleExpression;
import cascading.pipe.Boundary;
import cascading.pipe.HashJoin;
import cascading.pipe.Pipe;

import static cascading.flow.planner.iso.expression.AndElementExpression.and;
import static cascading.flow.planner.iso.expression.NotElementExpression.not;

/**
*
*/
public class BalanceSplitToStreamedHashJoinExpression extends RuleExpression
{
private static final FlowElementExpression SHARED_SPLIT = new FlowElementExpression( Pipe.class, TypeExpression.Topo.Split );
public static final FlowElementExpression SHARED_HASHJOIN = new FlowElementExpression( HashJoin.class );

public BalanceSplitToStreamedHashJoinExpression()
{
super(
new ExpressionGraph(
and(
ElementCapture.Primary,
not( and( new FlowElementExpression( Pipe.class, TypeExpression.Topo.Split ), not( new FlowElementExpression( Boundary.class ) ) ) ),
not( new FlowElementExpression( HashJoin.class ) ) ) ),

new ExpressionGraph()
.arcs( SHARED_SPLIT, SHARED_HASHJOIN )
.arcs( SHARED_SPLIT, SHARED_HASHJOIN ),

new ExpressionGraph()
.arc(
and( ElementCapture.Secondary, new FlowElementExpression( Pipe.class ), not( new FlowElementExpression( Boundary.class ) ) ),
PathScopeExpression.NON_BLOCKING,
new FlowElementExpression( ElementCapture.Primary, HashJoin.class )
)
);
}
}
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2007-2015 Concurrent, Inc. All Rights Reserved.
*
* Project and contact information: http://www.cascading.org/
*
* This file is part of the Cascading project.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cascading.flow.tez.planner.rule.transformer;

import cascading.flow.planner.iso.expression.ElementCapture;
import cascading.flow.planner.rule.transformer.BoundaryElementFactory;
import cascading.flow.planner.rule.transformer.RuleInsertionTransformer;
import cascading.flow.tez.planner.rule.expression.BalanceSplitToStreamedHashJoinExpression;

import static cascading.flow.planner.rule.PlanPhase.BalanceAssembly;

/**
* Inserts Boundary after split that joins back into a HashJoin.
* <p/>
* this allows testGroupBySplitGroupByJoin to pass, and simplifies testJoinSamePipeAroundGroupBy by preventing
* the upstream operations to duplicate processing in parallel nodes
*/
public class BoundaryBalanceSplitToStreamedHashJoinTransformer extends RuleInsertionTransformer
{
public BoundaryBalanceSplitToStreamedHashJoinTransformer()
{
super(
BalanceAssembly,
new BalanceSplitToStreamedHashJoinExpression(),
ElementCapture.Secondary,
BoundaryElementFactory.BOUNDARY_PIPE
);
}
}
Expand Up @@ -2153,4 +2153,44 @@ public void testJoinSplitBeforeJoin() throws Exception
assertTrue( values.contains( new Tuple( "1\ta\t1\tA\t1\tA" ) ) );
assertTrue( values.contains( new Tuple( "1\ta\t1\tB\t1\tB" ) ) );
}

@Test
public void testGroupBySplitGroupByJoin() throws Exception
{
getPlatform().copyFromLocal( inputFileLower );

Tap source = getPlatform().getTextFile( new Fields( "offset", "line" ), inputFileLower );

Tap sink = getPlatform().getTextFile( new Fields( "line" ), getOutputPath( "sink" ), SinkMode.REPLACE );

Function splitter = new RegexSplitter( new Fields( "num", "char" ), " " );

Pipe pipeFirst = new Pipe( "first" );
pipeFirst = new Each( pipeFirst, new Fields( "line" ), splitter );
pipeFirst = new GroupBy( pipeFirst, new Fields( "num" ) );
pipeFirst = new Every( pipeFirst, new Fields( "char" ), new First( new Fields( "firstFirst" ) ), Fields.ALL );

Pipe pipeSecond = new Pipe( "second", pipeFirst );
pipeSecond = new Each( pipeSecond, new Identity() );
pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
pipeSecond = new Every( pipeSecond, new Fields( "firstFirst" ), new First( new Fields( "secondFirst" ) ), Fields.ALL );
pipeSecond = new GroupBy( pipeSecond, new Fields( "num" ) );
pipeSecond = new Every( pipeSecond, new Fields( "secondFirst" ), new First( new Fields( "thirdFirst" ) ), Fields.ALL );

Pipe splice = new HashJoin( pipeFirst, new Fields( "num" ), pipeSecond, new Fields( "num" ), Fields.size( 4 ) );

Flow flow = getPlatform().getFlowConnector().connect( source, sink, splice );

flow.complete();

validateLength( flow, 5, null );

List<Tuple> values = getSinkAsList( flow );

assertTrue( values.contains( new Tuple( "1\ta\t1\ta" ) ) );
assertTrue( values.contains( new Tuple( "2\tb\t2\tb" ) ) );
assertTrue( values.contains( new Tuple( "3\tc\t3\tc" ) ) );
assertTrue( values.contains( new Tuple( "4\td\t4\td" ) ) );
assertTrue( values.contains( new Tuple( "5\te\t5\te" ) ) );
}
}

0 comments on commit 2d2d600

Please sign in to comment.