Skip to content

Commit

Permalink
added access to hadoop job id via HadoopStepStats
Browse files Browse the repository at this point in the history
  • Loading branch information
cwensel committed Mar 19, 2010
1 parent eefb042 commit f807ee1
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/core/cascading/cascade/Cascade.java
Expand Up @@ -86,7 +86,7 @@ public class Cascade implements Runnable
/** Field jobGraph */
private final SimpleDirectedGraph<Flow, Integer> jobGraph;
/** Field cascadeStats */
private CascadeStats cascadeStats = new CascadeStats();
private final CascadeStats cascadeStats;
/** Field thread */
private Thread thread;
/** Field throwable */
Expand All @@ -104,7 +104,7 @@ public class Cascade implements Runnable
{
this.name = name;
this.jobGraph = jobGraph;

this.cascadeStats = new CascadeStats( getID() );
setIDOnFlow();
}

Expand Down
8 changes: 6 additions & 2 deletions src/core/cascading/flow/Flow.java
Expand Up @@ -92,7 +92,7 @@ public class Flow implements Runnable
/** Field skipStrategy */
private FlowSkipStrategy flowSkipStrategy = new FlowSkipIfSinkStale();
/** Field flowStats */
private final FlowStats flowStats = new FlowStats(); // don't use a listener to set values
private final FlowStats flowStats; // don't use a listener to set values
/** Field sources */
private Map<String, Tap> sources;
/** Field sinks */
Expand Down Expand Up @@ -204,13 +204,16 @@ public static long getJobPollingInterval( JobConf jobConf )
/** Used for testing. */
protected Flow()
{
this.name = "NA";
this.flowStats = new FlowStats( getID() );
}

protected Flow( Map<Object, Object> properties, JobConf jobConf, String name, ElementGraph pipeGraph, StepGraph stepGraph, Map<String, Tap> sources, Map<String, Tap> sinks, Map<String, Tap> traps )
{
this.name = name;
this.pipeGraph = pipeGraph;
this.stepGraph = stepGraph;
this.flowStats = new FlowStats( getID() );
setJobConf( jobConf );
setSources( sources );
setSinks( sinks );
Expand All @@ -223,6 +226,7 @@ protected Flow( Map<Object, Object> properties, JobConf jobConf, String name, St
{
this.name = name;
this.stepGraph = stepGraph;
this.flowStats = new FlowStats( getID() );
setJobConf( jobConf );
setSources( sources );
setSinks( sinks );
Expand Down Expand Up @@ -269,7 +273,7 @@ protected void setName( String name )
* Method getID returns the ID of this Flow object.
* <p/>
* The ID value is a long HEX String used to identify this instance globally. Subsequent Flow
* instances created with identical paramers will not return the same ID.
* instances created with identical parameters will not return the same ID.
*
* @return the ID (type String) of this Flow object.
*/
Expand Down
10 changes: 10 additions & 0 deletions src/core/cascading/flow/FlowStep.java
Expand Up @@ -107,6 +107,16 @@ protected FlowStep( String name, int id )
this.id = id;
}

/**
* Method getId returns the id of this FlowStep object.
*
* @return the id (type int) of this FlowStep object.
*/
public int getID()
{
return id;
}

/**
* Method getName returns the name of this FlowStep object.
*
Expand Down
9 changes: 8 additions & 1 deletion src/core/cascading/flow/FlowStepJob.java
Expand Up @@ -63,7 +63,7 @@ public class FlowStepJob implements Callable<Throwable>
/** Field throwable */
protected Throwable throwable;

public FlowStepJob( FlowStep flowStep, String stepName, JobConf currentConf )
public FlowStepJob( final FlowStep flowStep, String stepName, JobConf currentConf )
{
this.flowStep = flowStep;
this.stepName = stepName;
Expand All @@ -75,6 +75,13 @@ public FlowStepJob( FlowStep flowStep, String stepName, JobConf currentConf )

stepStats = new HadoopStepStats()
{

@Override
public Object getID()
{
return flowStep.getID();
}

@Override
protected JobClient getJobClient()
{
Expand Down
5 changes: 5 additions & 0 deletions src/core/cascading/flow/hadoop/HadoopStepStats.java
Expand Up @@ -181,6 +181,11 @@ public void setNumReducerTasks( int numReducerTasks )
this.numReducerTasks = numReducerTasks;
}

public String getJobID()
{
return getRunningJob().getJobID();
}

protected abstract JobClient getJobClient();

protected abstract RunningJob getRunningJob();
Expand Down
15 changes: 14 additions & 1 deletion src/core/cascading/stats/CascadeStats.java
Expand Up @@ -27,12 +27,25 @@

import cascading.cascade.Cascade;

/** Class Cascadetats collects {@link Cascade} specific statistics. */
/** Class CascadeStats collects {@link Cascade} specific statistics. */
public class CascadeStats extends CascadingStats
{
/** Field cascadeId */
String cascadeID;
/** Field flowStatsList */
List<FlowStats> flowStatsList = new LinkedList<FlowStats>(); // maintain order

public CascadeStats( String cascadeID )
{
this.cascadeID = cascadeID;
}

@Override
public Object getID()
{
return cascadeID;
}

/**
* Method addFlowStats add a child {@link cascading.flow.Flow} {2link FlowStats} instance.
*
Expand Down
8 changes: 8 additions & 0 deletions src/core/cascading/stats/CascadingStats.java
Expand Up @@ -63,6 +63,14 @@ public enum Status
{
}

/**
* Method getID returns the ID of this CascadingStats object.
*
* @return the ID (type Object) of this CascadingStats object.
*/
public abstract Object getID();


/**
* Method isFinished returns true if the current status show no work currently being executed. This method
* returns true if {@link #isSuccessful()}, {@link #isFailed()}, or {@link #isStopped()} returns true.
Expand Down
12 changes: 12 additions & 0 deletions src/core/cascading/stats/FlowStats.java
Expand Up @@ -31,8 +31,20 @@
/** Class FlowStats collects {@link Flow} specific statistics. */
public class FlowStats extends CascadingStats
{
String flowID;
List<StepStats> stepStatsList = new ArrayList<StepStats>();

public FlowStats( String flowID )
{
this.flowID = flowID;
}

@Override
public Object getID()
{
return flowID;
}

public void addStepStats( StepStats stepStats )
{
stepStatsList.add( stepStats );
Expand Down
14 changes: 14 additions & 0 deletions src/test/cascading/stats/CascadingStatsTest.java
Expand Up @@ -88,16 +88,22 @@ public void testStatsCounters() throws Exception

CascadeStats cascadeStats = cascade.getCascadeStats();

assertNotNull( cascadeStats.getID() );

assertEquals( 40, cascadeStats.getCounterValue( TestEnum.FIRST ) );
assertEquals( 20, cascadeStats.getCounterValue( TestEnum.SECOND ) );

FlowStats flowStats1 = flow1.getFlowStats();

assertNotNull( flowStats1.getID() );

assertEquals( 20, flowStats1.getCounterValue( TestEnum.FIRST ) );
assertEquals( 10, flowStats1.getCounterValue( TestEnum.SECOND ) );

FlowStats flowStats2 = flow2.getFlowStats();

assertNotNull( flowStats2.getID() );

assertEquals( 20, flowStats2.getCounterValue( TestEnum.FIRST ) );
assertEquals( 10, flowStats2.getCounterValue( TestEnum.SECOND ) );

Expand All @@ -107,6 +113,10 @@ public void testStatsCounters() throws Exception
assertEquals( 2, flowStats2.getStepsCount() );

HadoopStepStats stats1 = (HadoopStepStats) flowStats1.getStepStats().get( 0 );

assertNotNull( stats1.getID() );
assertNotNull( stats1.getJobID() );

assertEquals( 2, stats1.getNumMapTasks() );
assertEquals( 1, stats1.getNumReducerTasks() );

Expand All @@ -118,6 +128,10 @@ public void testStatsCounters() throws Exception
}

HadoopStepStats stats2 = (HadoopStepStats) flowStats2.getStepStats().get( 0 );

assertNotNull( stats2.getID() );
assertNotNull( stats2.getJobID() );

assertEquals( 2, stats2.getNumMapTasks() );
assertEquals( 1, stats2.getNumReducerTasks() );

Expand Down

0 comments on commit f807ee1

Please sign in to comment.