Permalink
Browse files

version 1.0.12

  • Loading branch information...
2 parents 9c180cf + fcca330 commit 3e61900414fcdeaf73e1d4bcdfcbc8c1b8ffca89 @cwensel committed Jun 13, 2009
View
@@ -1,5 +1,16 @@
Cascading Change Log
+1.0.12
+
+ Fixed bug where the c.f.FlowPlanner did not detect that tails were not bound to sinks, or that some tail references
+ were missing.
+
+ Fixed j.u.ConcurrentModificationException when using a c.c.CascadeConnector on c.f.Flows using a c.t.MultiSink
+ c.t.Tap.
+
+ Fixed bug where c.f.s.StackException was being wrapped preventing failures within sink c.t.Tap instances from
+ causing the c.f.Flow to fail. This mainly affected Flows using traps.
+
1.0.11
Added clearer error message when c.t.Tap is used as both source and sink in a given Flow.
@@ -21,13 +21,6 @@
package cascading.cascade;
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
import cascading.flow.Flow;
import cascading.tap.MultiTap;
import cascading.tap.Tap;
@@ -41,6 +34,14 @@
import org.jgrapht.graph.SimpleDirectedGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Set;
+
/**
* Class CascadeConnector is used to construct a new {@link Cascade} instance from a collection of {@link Flow} instance.
* <p/>
@@ -112,16 +113,21 @@ private void makeTapGraph( SimpleDirectedGraph<Tap, Flow.FlowHolder> tapGraph, F
{
for( Flow flow : flows )
{
- Collection<Tap> sources = new HashSet<Tap>( flow.getSources().values() );
+ LinkedList<Tap> sources = new LinkedList<Tap>( flow.getSources().values() );
Collection<Tap> sinks = flow.getSinks().values();
// account for MultiTap sources
- for( Tap source : sources )
+ ListIterator<Tap> iterator = sources.listIterator();
+ while( iterator.hasNext() )
{
+ Tap source = iterator.next();
+
if( source instanceof MultiTap )
{
- sources.remove( source );
- Collections.addAll( sources, ( (MultiTap) source ).getTaps() );
+ iterator.remove();
+
+ for( Tap tap : ( (MultiTap) source ).getTaps() )
+ iterator.add( tap );
}
}
@@ -21,16 +21,6 @@
package cascading.flow;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.Writer;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import cascading.operation.AssertionLevel;
import cascading.pipe.Every;
import cascading.pipe.Group;
@@ -50,6 +40,16 @@
import org.jgrapht.traverse.DepthFirstIterator;
import org.jgrapht.traverse.TopologicalOrderIterator;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/** Class ElementGraph represents the executable FlowElement graph. */
public class ElementGraph extends SimpleDirectedGraph<FlowElement, Scope>
{
@@ -169,7 +169,16 @@ private void addExtents( Map<String, Tap> sources, Map<String, Tap> sinks )
for( String sink : sinks.keySet() )
{
- Scope scope = addEdge( sinks.get( sink ), tail );
+ Scope scope = null;
+
+ try
+ {
+ scope = addEdge( sinks.get( sink ), tail );
+ }
+ catch( IllegalArgumentException exception )
+ {
+ throw new ElementGraphException( "missing pipe for sink tap: [" + sink + "]" );
+ }
if( scope == null )
throw new ElementGraphException( "cannot sink to the same path from multiple branches: [" + Util.join( sinks.values() ) + "]" );
@@ -28,6 +28,7 @@
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.tap.Tap;
+import cascading.util.Util;
import org.apache.log4j.Logger;
import org.jgrapht.GraphPath;
import org.jgrapht.Graphs;
@@ -158,14 +159,21 @@ protected void verifyPipeAssemblyEndPoints( Map<String, Tap> sources, Map<String
throw new PlannerException( pipe, "pipe name not found in either sink or source map: " + tailName );
if( tailNames.contains( tailName ) && !tails.contains( pipe ) )
- LOG.warn( "duplicate tail name found: " + tailName );
+ LOG.warn( "duplicate tail name found, not an error but tails should have unique names: " + tailName );
// throw new PlannerException( pipe, "duplicate tail name found: " + tailName );
tailNames.add( tailName );
tails.add( pipe );
}
}
+ tailNames.removeAll( sinks.keySet() );
+ Set<String> remainingSinks = new HashSet<String>( sinks.keySet() );
+ remainingSinks.removeAll( tailNames );
+
+ if( tailNames.size() != 0 )
+ throw new PlannerException( "not all tail pipes bound to sink taps, remaining tail pipe names: [" + Util.join( tailNames, ", " ) + "], remaining sinks: [" + Util.join( remainingSinks, ", " ) + "]" );
+
// handle heads
Set<Pipe> heads = new HashSet<Pipe>();
Set<String> headNames = new HashSet<String>();
@@ -180,13 +188,21 @@ protected void verifyPipeAssemblyEndPoints( Map<String, Tap> sources, Map<String
throw new PlannerException( head, "pipe name not found in either sink or source map: " + headName );
if( headNames.contains( headName ) && !heads.contains( head ) )
- LOG.warn( "duplicate head name found: " + headName );
+ LOG.warn( "duplicate tail name found, not an error but heads should have unique names: " + headName );
// throw new PlannerException( pipe, "duplicate head name found: " + headName );
headNames.add( headName );
heads.add( head );
}
}
+
+ headNames.removeAll( sources.keySet() );
+ Set<String> remainingSources = new HashSet<String>( sources.keySet() );
+ remainingSources.removeAll( headNames );
+
+ if( headNames.size() != 0 )
+ throw new PlannerException( "not all head pipes bound to source taps, remaining head pipe names: [" + Util.join( headNames, ", " ) + "], remaining sources: [" + Util.join( remainingSources, ", " ) + "]" );
+
}
protected void verifyTraps( Map<String, Tap> traps, Pipe[] pipes, Map<String, Tap> sources, Map<String, Tap> sinks )
@@ -33,7 +33,13 @@
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.TuplePair;
-import cascading.tuple.hadoop.*;
+import cascading.tuple.hadoop.GroupingComparator;
+import cascading.tuple.hadoop.GroupingPartitioner;
+import cascading.tuple.hadoop.ReverseTupleComparator;
+import cascading.tuple.hadoop.ReverseTuplePairComparator;
+import cascading.tuple.hadoop.TupleComparator;
+import cascading.tuple.hadoop.TuplePairComparator;
+import cascading.tuple.hadoop.TupleSerialization;
import cascading.util.Util;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
@@ -44,7 +50,12 @@
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -364,7 +375,7 @@ private void cleanTap( JobConf jobConf, Tap tap )
}
catch( IOException exception )
{
- logWarn( "unable to remove temporary path: " + this.sink, exception );
+ // ignore exception
}
}
@@ -21,8 +21,6 @@
package cascading.flow.stack;
-import java.io.IOException;
-
import cascading.CascadingException;
import cascading.flow.FlowElement;
import cascading.flow.FlowException;
@@ -33,6 +31,8 @@
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
+import java.io.IOException;
+
/**
*
*/
@@ -69,11 +69,11 @@ private void operateGroup( TupleEntry tupleEntry )
}
catch( IOException exception )
{
- throw new FlowException( "failed writing output", exception );
+ throw new StackException( "failed writing output", exception );
}
catch( OutOfMemoryError error )
{
- throw new FlowException( "out of memory, try increasing task memory allocation", error );
+ throw new StackException( "out of memory, try increasing task memory allocation", error );
}
catch( Throwable throwable )
{
@@ -23,9 +23,7 @@
import cascading.CascadingException;
-/**
- *
- */
+/** Class StackException is an Exception holder that wraps a fatal exception within the pipeline stack. */
public class StackException extends CascadingException
{
public StackException()
@@ -22,8 +22,13 @@
package cascading.flow.stack;
import cascading.CascadingException;
-import cascading.flow.*;
+import cascading.flow.FlowElement;
+import cascading.flow.FlowException;
+import cascading.flow.FlowProcess;
+import cascading.flow.Scope;
+import cascading.flow.StepCounters;
import cascading.tap.Tap;
+import cascading.tap.TapException;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
@@ -79,7 +84,15 @@ private void operateSink( TupleEntry tupleEntry )
}
catch( OutOfMemoryError error )
{
- throw new FlowException( "out of memory, try increasing task memory allocation", error );
+ throw new StackException( "out of memory, try increasing task memory allocation", error );
+ }
+ catch( IOException exception )
+ {
+ throw new StackException( "io exception writing to tap: " + sink.toString(), exception );
+ }
+ catch( TapException exception )
+ {
+ throw new StackException( "exception writing to tap: " + sink.toString(), exception );
}
catch( Throwable throwable )
{
@@ -22,8 +22,13 @@
package cascading.flow.stack;
import cascading.CascadingException;
-import cascading.flow.*;
+import cascading.flow.FlowElement;
+import cascading.flow.FlowException;
+import cascading.flow.FlowProcess;
+import cascading.flow.Scope;
+import cascading.flow.StepCounters;
import cascading.tap.Tap;
+import cascading.tap.TapException;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryCollector;
@@ -70,6 +75,11 @@ private void operateSink( Tuple key, Iterator values )
operateSink( (TupleEntry) values.next() );
}
+ /**
+ * Throws a StackException to flag a hard failure
+ *
+ * @param tupleEntry
+ */
private void operateSink( TupleEntry tupleEntry )
{
try
@@ -88,7 +98,15 @@ private void operateSink( TupleEntry tupleEntry )
}
catch( OutOfMemoryError error )
{
- throw new FlowException( "out of memory, try increasing task memory allocation", error );
+ throw new StackException( "out of memory, try increasing task memory allocation", error );
+ }
+ catch( IOException exception )
+ {
+ throw new StackException( "io exception writing to tap: " + sink.toString(), exception );
+ }
+ catch( TapException exception )
+ {
+ throw new StackException( "exception writing to tap: " + sink.toString(), exception );
}
catch( Throwable throwable )
{
@@ -21,11 +21,19 @@
package cascading.pipe;
+import cascading.CascadingException;
import cascading.flow.FlowCollector;
import cascading.flow.FlowElement;
import cascading.flow.FlowProcess;
import cascading.flow.Scope;
-import cascading.operation.*;
+import cascading.operation.Assertion;
+import cascading.operation.AssertionLevel;
+import cascading.operation.ConcreteCall;
+import cascading.operation.Filter;
+import cascading.operation.FilterCall;
+import cascading.operation.Function;
+import cascading.operation.FunctionCall;
+import cascading.operation.ValueAssertion;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
@@ -399,11 +407,11 @@ public void operate( FlowProcess flowProcess, TupleEntry input )
handle( flowProcess, input, arguments );
}
- catch( AssertionException exception )
+ catch( CascadingException exception )
{
throw exception;
}
- catch( Exception exception )
+ catch( Throwable exception )
{
throw new OperatorException( Each.this, "operator Each failed executing operation", exception );
}
Oops, something went wrong. Retry.

0 comments on commit 3e61900

Please sign in to comment.