Skip to content

Commit

Permalink
capture actual map/reduce begin/end execution times for a task
Browse files Browse the repository at this point in the history
  • Loading branch information
cwensel committed Mar 25, 2012
1 parent 3a14cab commit 88cc601
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 7 deletions.
2 changes: 1 addition & 1 deletion src/core/cascading/flow/StepCounters.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
/** Enum FlowCounters lists all counters */
public enum StepCounters
{
Tuples_Read, Tuples_Written, Tuples_Trapped
Tuples_Read, Tuples_Written, Tuples_Trapped, Process_Begin, Process_End
}
11 changes: 10 additions & 1 deletion src/hadoop/cascading/flow/hadoop/FlowMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowSession;
import cascading.flow.StepCounters;
import cascading.flow.stream.Duct;
import cascading.flow.stream.ElementDuct;
import cascading.flow.stream.SourceStage;
Expand Down Expand Up @@ -89,6 +90,7 @@ public void configure( JobConf jobConf )
public void run( RecordReader input, OutputCollector output, Reporter reporter ) throws IOException
{
currentProcess.setReporter( reporter );
currentProcess.increment( StepCounters.Process_Begin, System.currentTimeMillis() );
currentProcess.setOutputCollector( output );

streamGraph.prepare();
Expand Down Expand Up @@ -124,7 +126,14 @@ public void run( RecordReader input, OutputCollector output, Reporter reporter )
}
finally
{
streamGraph.cleanup();
try
{
streamGraph.cleanup();
}
finally
{
currentProcess.increment( StepCounters.Process_End, System.currentTimeMillis() );
}
}
}
}
21 changes: 16 additions & 5 deletions src/hadoop/cascading/flow/hadoop/FlowReducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import cascading.CascadingException;
import cascading.flow.FlowException;
import cascading.flow.FlowSession;
import cascading.flow.StepCounters;
import cascading.flow.stream.Duct;
import cascading.flow.stream.ElementDuct;
import cascading.tap.Tap;
Expand Down Expand Up @@ -100,6 +101,8 @@ public void reduce( Object key, Iterator values, OutputCollector output, Reporte

if( !calledPrepare )
{
currentProcess.increment( StepCounters.Process_Begin, System.currentTimeMillis() );

streamGraph.prepare();

calledPrepare = true;
Expand All @@ -123,13 +126,21 @@ public void reduce( Object key, Iterator values, OutputCollector output, Reporte
@Override
public void close() throws IOException
{
if( calledPrepare )
try
{
group.complete( group );
if( calledPrepare )
{
group.complete( group );

streamGraph.cleanup();
}
streamGraph.cleanup();
}

super.close();
super.close();
}
finally
{
if( currentProcess != null )
currentProcess.increment( StepCounters.Process_End, System.currentTimeMillis() );
}
}
}

0 comments on commit 88cc601

Please sign in to comment.