improved counter caching robustness and semantics
cwensel committed Feb 24, 2015
1 parent 7741dc3 commit 831f9cb
Showing 13 changed files with 282 additions and 173 deletions.
@@ -50,6 +50,8 @@ public abstract class FlowStepJob<Config> implements Callable<Throwable>

/** Field stepName */
protected final String stepName;
+ /** Field jobConfiguration */
+ protected final Config jobConfiguration;
/** Field pollingInterval */
protected long pollingInterval = 1000;
/** Field recordStatsInterval */
@@ -67,8 +69,9 @@ public abstract class FlowStepJob<Config> implements Callable<Throwable>
/** Field throwable */
protected Throwable throwable;

- public FlowStepJob( ClientState clientState, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval )
+ public FlowStepJob( ClientState clientState, Config jobConfiguration, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval )
{
+ this.jobConfiguration = jobConfiguration;
this.flowStep = flowStep;
this.stepName = flowStep.getName();
this.pollingInterval = pollingInterval;
@@ -79,7 +82,10 @@ public FlowStepJob( ClientState clientState, BaseFlowStep<Config> flowStep, long pollingInterval, long statsStoreInterval )
this.flowStepStats.markPending();
}

- public abstract Config getConfig();
+ public Config getConfig()
+   {
+   return jobConfiguration;
+   }

protected abstract FlowStepStats createStepStats( ClientState clientState );
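With getConfig() now concrete and backed by the new jobConfiguration field, platform subclasses pass their configuration up through the constructor instead of each overriding the getter; HadoopFlowStepJob later in this diff does exactly that. A minimal hypothetical sketch of the new contract (the subclass name, the Properties config type, and the interval values are illustrative, not part of this commit):

```java
import java.util.Properties;

import cascading.flow.planner.BaseFlowStep;
import cascading.flow.planner.FlowStepJob;
import cascading.management.state.ClientState;

// Hypothetical subclass showing the new constructor contract; declared abstract
// so the remaining abstract members of FlowStepJob can stay elided for brevity.
public abstract class ExampleFlowStepJob extends FlowStepJob<Properties>
  {
  public ExampleFlowStepJob( ClientState clientState, BaseFlowStep<Properties> flowStep, Properties config )
    {
    // hand the configuration to the base class; the inherited getConfig() serves it
    super( clientState, config, flowStep, 1000, 60 * 1000 );
    }
  }
```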

40 changes: 27 additions & 13 deletions cascading-core/src/main/java/cascading/util/ShutdownUtil.java
@@ -34,6 +34,11 @@
*/
public class ShutdownUtil
{
+ /**
+  * System property set to true when we have entered shutdown
+  */
+ public static final String SHUTDOWN_EXECUTING = "cascading.jvm.shutdown.executing";

private static final Logger LOG = LoggerFactory.getLogger( ShutdownUtil.class );

public static abstract class Hook
@@ -93,23 +98,32 @@ public static synchronized void registerShutdownHook()
@Override
public void run()
{
-   // These are not threads, so each one will be run in priority order
-   // blocking until the previous is complete
-   while( !queue.isEmpty() )
-     {
-     Hook hook = null;
-
-     try
-       {
-       hook = queue.poll();
-
-       hook.execute();
-       }
-     catch( Exception exception )
-       {
-       LOG.error( "failed executing hook: {}, with exception: {}", hook, exception );
-       }
-     }
+   System.setProperty( SHUTDOWN_EXECUTING, "true" );
+
+   try
+     {
+     // These are not threads, so each one will be run in priority order
+     // blocking until the previous is complete
+     while( !queue.isEmpty() )
+       {
+       Hook hook = null;
+
+       try
+         {
+         hook = queue.poll();
+
+         hook.execute();
+         }
+       catch( Exception exception )
+         {
+         LOG.error( "failed executing hook: {}, with exception: {}", hook, exception );
+         }
+       }
+     }
+   finally
+     {
+     System.setProperty( SHUTDOWN_EXECUTING, "false" );
+     }
}
};
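Hooks and long-running client code can now ask whether JVM shutdown is underway before starting work that depends on live services. A minimal sketch, assuming only the SHUTDOWN_EXECUTING constant introduced above (the helper class itself is illustrative):

```java
import cascading.util.ShutdownUtil;

public class ShutdownCheck
  {
  /** Returns true once the registered shutdown hook has begun executing. */
  public static boolean isShutdownExecuting()
    {
    return Boolean.parseBoolean( System.getProperty( ShutdownUtil.SHUTDOWN_EXECUTING, "false" ) );
    }
  }
```

Note the property is flipped back to "false" in the finally block once all hooks have drained, so the check is only meaningful while hooks are actually running.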

@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;

+ import cascading.flow.hadoop.util.HadoopUtil;
import cascading.stats.CascadingStats;
import cascading.stats.FlowNodeStats;
import cascading.stats.FlowSliceStats;
@@ -39,12 +40,18 @@
*/
public class HadoopNodeCounterCache extends CounterCache<FlowNodeStats, Map<String, Map<String, Long>>>
{
+ public static final String NODE_COUNTER_MAX_AGE_PROPERTY = "cascading.node.counter.age.max.seconds";
+ public static final int DEFAULT_NODE_CACHED_AGE_MAX = 30; // don't re-fetch task reports for 30 seconds

private FlowNodeStats flowNodeStats;

protected HadoopNodeCounterCache( FlowNodeStats flowNodeStats, Configuration configuration )
{
super( flowNodeStats, configuration );
this.flowNodeStats = flowNodeStats;

+ // age matters here since we are aggregating task reports vs getting a pre-aggregated value at the node level
+ this.maxAge = configuration.getInt( NODE_COUNTER_MAX_AGE_PROPERTY, DEFAULT_NODE_CACHED_AGE_MAX );
}
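Since node-level counters are re-aggregated from individual task reports on every refresh, a longer cache age trades counter freshness for fewer round trips to the cluster. A sketch of raising the default through the Hadoop Configuration the cache reads from; the wrapper class is illustrative, and the property name is taken from the constant above:

```java
import org.apache.hadoop.conf.Configuration;

public class NodeCounterCacheSetup
  {
  public static Configuration withSlowerCounterRefresh()
    {
    Configuration configuration = new Configuration();

    // value of NODE_COUNTER_MAX_AGE_PROPERTY above; serve cached node counter
    // values for up to 60 seconds (default 30) before re-fetching task reports
    configuration.setInt( "cascading.node.counter.age.max.seconds", 60 );

    return configuration;
    }
  }
```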

@Override
@@ -53,41 +60,10 @@ protected FlowNodeStats getJobStatusClient()
return flowNodeStats;
}

- protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups )
-   {
-   return groups.keySet();
-   }
-
- protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group )
-   {
-   Set<String> results = new HashSet<>();
-
-   Map<String, Long> map = counters.get( group );
-
-   if( map != null )
-     results.addAll( map.keySet() );
-
-   return results;
-   }
-
- protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter )
-   {
-   return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() );
-   }
-
- protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
-   {
-   Map<String, Long> counterGroup = counters.get( groupName );
-
-   if( counterGroup == null )
-     return 0;
-
-   Long counterValue = counterGroup.get( counterName );
-
-   if( counterValue == null )
-     return 0;
-
-   return counterValue;
-   }
+ @Override
+ protected boolean areCountersAvailable( FlowNodeStats runningJob )
+   {
+   return !HadoopUtil.isLocal( (Configuration) runningJob.getFlowNode().getFlowStep().getConfig() );
+   }

protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException
@@ -131,4 +107,41 @@ protected Map<String, Map<String, Long>> getCounters( FlowNodeStats flowNodeStats ) throws IOException

return allCounters;
}

+ protected Collection<String> getGroupNames( Map<String, Map<String, Long>> groups )
+   {
+   return groups.keySet();
+   }
+
+ protected Set<String> getCountersFor( Map<String, Map<String, Long>> counters, String group )
+   {
+   Set<String> results = new HashSet<>();
+
+   Map<String, Long> map = counters.get( group );
+
+   if( map != null )
+     results.addAll( map.keySet() );
+
+   return results;
+   }
+
+ protected long getCounterValue( Map<String, Map<String, Long>> counters, Enum counter )
+   {
+   return getCounterValue( counters, counter.getDeclaringClass().getName(), counter.name() );
+   }
+
+ protected long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
+   {
+   Map<String, Long> counterGroup = counters.get( groupName );
+
+   if( counterGroup == null )
+     return 0;
+
+   Long counterValue = counterGroup.get( counterName );
+
+   if( counterValue == null )
+     return 0;
+
+   return counterValue;
+   }
}
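The relocated accessors resolve values out of the nested group-to-counter map and treat a missing group or counter as 0 rather than an error. A standalone sketch of that lookup behavior against plain maps; the group and counter names are made up for illustration:

```java
import java.util.HashMap;
import java.util.Map;

public class CounterLookupSketch
  {
  public static void main( String[] args )
    {
    Map<String, Map<String, Long>> counters = new HashMap<>();
    Map<String, Long> group = new HashMap<>();

    group.put( "Tuples_Read", 42L );
    counters.put( "example.Group", group );

    System.out.println( getCounterValue( counters, "example.Group", "Tuples_Read" ) ); // 42
    System.out.println( getCounterValue( counters, "missing.Group", "Tuples_Read" ) ); // 0, not an exception
    }

  // mirrors the null-safe lookup in HadoopNodeCounterCache above
  static long getCounterValue( Map<String, Map<String, Long>> counters, String groupName, String counterName )
    {
    Map<String, Long> counterGroup = counters.get( groupName );

    if( counterGroup == null )
      return 0;

    Long counterValue = counterGroup.get( counterName );

    return counterValue == null ? 0 : counterValue;
    }
  }
```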
@@ -97,7 +97,7 @@ protected boolean captureChildDetailInternal()
}
catch( IOException exception )
{
LOG.warn( "unable to get slice stats", exception );
LOG.warn( "unable to retrieve slice stats via task reports", exception );
}

return false;
@@ -40,6 +40,17 @@ protected HadoopStepCounterCache( CascadingStats stats, Configuration configuration )
super( stats, configuration );
}

+ @Override
+ protected boolean areCountersAvailable( RunningJob runningJob )
+   {
+   return true;
+   }
+
+ protected Counters getCounters( RunningJob runningJob ) throws IOException
+   {
+   return runningJob.getCounters();
+   }

protected Collection<String> getGroupNames( Counters groups )
{
return groups.getGroupNames();
@@ -76,9 +87,4 @@ protected long getCounterValue( Counters counters, String groupName, String counterName )

return counterValue.getValue();
}

- protected Counters getCounters( RunningJob runningJob ) throws IOException
-   {
-   return runningJob.getCounters();
-   }
}
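Together with HadoopNodeCounterCache above, the new areCountersAvailable hook splits the semantics cleanly: step counters arrive pre-aggregated from the RunningJob and are always available, while node counters must be aggregated from task reports and cannot be served in Hadoop local mode. Presumably the base CounterCache consults the predicate before going to the cluster; a hypothetical sketch of that gating pattern, since the base-class internals are not part of this diff:

```java
import java.io.IOException;

// Hypothetical shape of the availability gate; not the actual CounterCache code.
abstract class GatedCounterCache<JobStatus, CounterSource>
  {
  protected abstract boolean areCountersAvailable( JobStatus status );

  protected abstract CounterSource getCounters( JobStatus status ) throws IOException;

  protected CounterSource fetchIfAvailable( JobStatus status ) throws IOException
    {
    // skip the remote round trip entirely when counters cannot be served,
    // e.g. node-level counters while running in hadoop local mode
    if( !areCountersAvailable( status ) )
      return null;

    return getCounters( status );
    }
  }
```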
@@ -46,8 +46,6 @@ public class HadoopFlowStepJob extends FlowStepJob<JobConf>
{
/** static field to capture errors in hadoop local mode */
private static Throwable localError;
- /** Field currentConf */
- private final JobConf currentConf;
/** Field jobClient */
private JobClient jobClient;
/** Field runningJob */
@@ -65,19 +63,12 @@ public static long getJobPollingInterval( JobConf jobConf )

public HadoopFlowStepJob( ClientState clientState, BaseFlowStep<JobConf> flowStep, JobConf currentConf )
{
- super( clientState, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ) );
- this.currentConf = currentConf;
+ super( clientState, currentConf, flowStep, getJobPollingInterval( currentConf ), getStoreInterval( currentConf ) );

if( flowStep.isDebugEnabled() )
flowStep.logDebug( "using polling interval: " + pollingInterval );
}

- @Override
- public JobConf getConfig()
-   {
-   return currentConf;
-   }

@Override
protected FlowStepStats createStepStats( ClientState clientState )
{
@@ -105,8 +96,8 @@ protected void internalBlockOnStop() throws IOException

protected void internalNonBlockingStart() throws IOException
{
- jobClient = new JobClient( currentConf );
- runningJob = jobClient.submitJob( currentConf );
+ jobClient = new JobClient( jobConfiguration );
+ runningJob = jobClient.submitJob( jobConfiguration );

flowStep.logInfo( "submitted hadoop job: " + runningJob.getID() );

