Permalink
Browse files

version 1.0.11

  • Loading branch information...
2 parents a935445 + 42d2c46 commit 9c180cf3ebc31de4511114effa301194f8c0a8c6 @cwensel committed May 22, 2009
View
@@ -1,5 +1,13 @@
Cascading Change Log
+1.0.11
+
+ Added clearer error message when c.t.Tap is used as both source and sink in a given Flow.
+
+ Demoted all DEBUG-related c.t.Tuple#print() calls to TRACE.
+
+ Fixed NPE when planner finds inconsistencies with c.t.Tap and c.p.Pipe names.
+
1.0.10
Updated planner error messages when field name collisions detected.
@@ -21,14 +21,6 @@
package cascading.flow;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import cascading.operation.AssertionLevel;
import cascading.pipe.Each;
import cascading.pipe.Every;
@@ -40,6 +32,14 @@
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/** Class FlowPlanner is the base class for all planner implementations. */
public class FlowPlanner
{
@@ -61,6 +61,8 @@
/** Must be called to determine if all elements of the base pipe assembly are available */
protected void verifyAssembly( Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps )
{
+ verifySourceNotSinks( sources, sinks );
+
verifyTaps( sources, true, true );
verifyTaps( sinks, false, true );
verifyTaps( traps, false, false );
@@ -75,6 +77,17 @@ protected ElementGraph createElementGraph( Pipe[] pipes, Map<String, Tap> source
return new ElementGraph( pipes, sources, sinks, traps, assertionLevel );
}
+ protected void verifySourceNotSinks( Map<String, Tap> sources, Map<String, Tap> sinks )
+ {
+ Collection<Tap> sourcesSet = sources.values();
+
+ for( Tap tap : sinks.values() )
+ {
+ if( sourcesSet.contains( tap ) )
+ throw new PlannerException( "tap may not be used as both source and sink in the same Flow: " + tap );
+ }
+ }
+
/**
* Method verifyTaps ...
*
@@ -90,9 +103,9 @@ protected void verifyTaps( Map<String, Tap> taps, boolean areSources, boolean ma
for( String tapName : taps.keySet() )
{
if( areSources && !taps.get( tapName ).isSource() )
- throw new PlannerException( "tap named: " + tapName + " is not a source: " + taps.get( tapName ) );
+ throw new PlannerException( "tap named: " + tapName + ", cannot be used as a source: " + taps.get( tapName ) );
else if( !areSources && !taps.get( tapName ).isSink() )
- throw new PlannerException( "tap named: " + tapName + " is not a sink: " + taps.get( tapName ) );
+ throw new PlannerException( "tap named: " + tapName + ", cannot be used as a sink: " + taps.get( tapName ) );
}
}
@@ -21,15 +21,6 @@
package cascading.flow;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import cascading.flow.hadoop.HadoopUtil;
import cascading.pipe.Every;
import cascading.pipe.Group;
@@ -44,6 +35,15 @@
import org.jgrapht.Graphs;
import org.jgrapht.alg.KShortestPaths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
* Class MultiMapReducePlanner is the core Hadoop MapReduce planner.
* <p/>
@@ -206,6 +206,9 @@ public Flow buildFlow( String flowName, Pipe[] pipes, Map<String, Tap> sources,
{
Throwable cause = exception.getCause();
+ if( cause == null )
+ cause = exception;
+
// captures pipegraph for debugging
// forward message in case cause or trace is lost
String message = String.format( "could not build flow from assembly: [%s]", cause.getMessage() );
@@ -147,13 +147,16 @@ public void map( Object key, Object value, OutputCollector output ) throws IOExc
if( tuple == null )
LOG.debug( "map skipping key and value" );
- if( key instanceof Tuple )
- LOG.debug( "map key: " + ( (Tuple) key ).print() );
- else
- LOG.debug( "map key: [" + key + "]" );
-
- if( tuple != null )
- LOG.debug( "map value: " + tuple.print() );
+ if( LOG.isTraceEnabled() )
+ {
+ if( key instanceof Tuple )
+ LOG.trace( "map key: " + ( (Tuple) key ).print() );
+ else
+ LOG.trace( "map key: [" + key + "]" );
+
+ if( tuple != null )
+ LOG.trace( "map value: " + tuple.print() );
+ }
}
// skip the key/value pair if null is returned from the source
@@ -146,10 +146,10 @@ else if( operator instanceof Every && ( (Every) operator ).isBuffer() )
public void reduce( Object key, Iterator values, OutputCollector output ) throws IOException
{
- if( LOG.isDebugEnabled() )
+ if( LOG.isTraceEnabled() )
{
- LOG.debug( "reduce fields: " + stackHead.getOutGroupingFields() );
- LOG.debug( "reduce key: " + ( (Tuple) key ).print() );
+ LOG.trace( "reduce fields: " + stackHead.getOutGroupingFields() );
+ LOG.trace( "reduce key: " + ( (Tuple) key ).print() );
}
stackTail.setLastOutput( output );
@@ -21,18 +21,18 @@
package cascading.pipe.cogroup;
-import java.util.Iterator;
-
import cascading.tuple.Tuple;
import org.apache.log4j.Logger;
+import java.util.Iterator;
+
/**
* Class InnerJoin will return an {@link Iterator} that will iterate over a given {@link Joiner} and return tuples that represent
* an inner join of the CoGrouper internal grouped tuple collections.
*/
public class InnerJoin implements Joiner
{
- /** Field LOG */
+ /** Field LOG */
private static final Logger LOG = Logger.getLogger( InnerJoin.class );
public Iterator<Tuple> getIterator( GroupClosure closure )
@@ -138,8 +138,8 @@ private Tuple makeResult( Comparable[] lastValues )
for( Comparable lastValue : lastValues )
result.addAll( lastValue );
- if( LOG.isDebugEnabled() )
- LOG.debug( "tuple: " + result.print() );
+ if( LOG.isTraceEnabled() )
+ LOG.trace( "tuple: " + result.print() );
return result;
}
@@ -33,7 +33,11 @@
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
-import cascading.pipe.*;
+import cascading.pipe.CoGroup;
+import cascading.pipe.Each;
+import cascading.pipe.Every;
+import cascading.pipe.GroupBy;
+import cascading.pipe.Pipe;
import cascading.pipe.cogroup.InnerJoin;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
@@ -46,7 +50,11 @@
import org.jgrapht.graph.SimpleDirectedGraph;
import java.io.IOException;
-import java.util.*;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
public class BuildJobsTest extends CascadingTestCase
{
@@ -1383,6 +1391,23 @@ private void splitMiddle( boolean before )
assertTrue( "not a TempHfs", operator instanceof TempHfs );
}
+ public void testSourceIsSink()
+ {
+ Tap tap = new Hfs( new TextLine( new Fields( "offset", "line" ) ), "foo/merge" );
+
+ Pipe pipe = new Pipe( "left" );
+
+ try
+ {
+ Flow flow = new FlowConnector().connect( tap, tap, pipe );
+// flow.writeDOT( "dupesource.dot" );
+ fail( "did not throw planner exception" );
+ }
+ catch( Exception exception )
+ {
+// exception.printStackTrace();
+ }
+ }
private int countDistance( SimpleDirectedGraph<FlowElement, Scope> graph, FlowElement lhs, FlowElement rhs )
{
View
@@ -1,2 +1,2 @@
-cascading.release.version=1.0.10
+cascading.release.version=1.0.11
cascading.hadoop.compatible.version=hadoop-0.18.3+

0 comments on commit 9c180cf

Please sign in to comment.