Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Updated c.s.h.HadoopStepStats counter caching strategy to make a fina…
…l attempt even if max timeouts have been met. Added "cascading.step.counter.timeout" property to allow tuning of timeout period.
  • Loading branch information
cwensel committed Mar 15, 2013
1 parent 4ba5846 commit e3ea525
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,9 @@ Cascading Change Log

2.1.6 [unreleased]

Updated c.s.h.HadoopStepStats counter caching strategy to make a final attempt even if max timeouts have been
met. Added "cascading.step.counter.timeout" property to allow tuning of timeout period.

2.1.5

Updated c.t.h.u.BytesComparator to implement c.t.Hasher as a convenience.
Expand Down
Expand Up @@ -54,6 +54,8 @@
/** Class HadoopStepStats provides Hadoop specific statistics and methods to underlying Hadoop facilities. */
public abstract class HadoopStepStats extends FlowStepStats
{
public static final String COUNTER_TIMEOUT_PROPERTY = "cascading.step.counter.timeout";

/** Field LOG */
private static final Logger LOG = LoggerFactory.getLogger( HadoopStepStats.class );
public static final int TIMEOUT_MAX = 3;
Expand Down Expand Up @@ -268,7 +270,7 @@ protected Counters cachedCounters()

protected synchronized Counters cachedCounters( boolean force )
{
if( ( !force && isFinished() ) || timeouts >= TIMEOUT_MAX )
if( !force && ( isFinished() || timeouts >= TIMEOUT_MAX ) )
return cachedCounters;

RunningJob runningJob = getRunningJob();
Expand All @@ -278,9 +280,11 @@ protected synchronized Counters cachedCounters( boolean force )

Future<Counters> future = runFuture( runningJob );

int timeout = ( (JobConf) getFlowStep().getConfig() ).getInt( COUNTER_TIMEOUT_PROPERTY, 5 );

try
{
Counters fetched = future.get( 5, TimeUnit.SECONDS );
Counters fetched = future.get( timeout, TimeUnit.SECONDS );

if( fetched != null )
cachedCounters = fetched;
Expand Down Expand Up @@ -310,9 +314,9 @@ protected synchronized Counters cachedCounters( boolean force )
timeouts++;

if( timeouts >= TIMEOUT_MAX )
LOG.warn( "fetching counters timed out final time: {}", timeouts );
LOG.warn( "fetching counters timed out after: {} seconds, final attempt: {}", timeout, timeouts );
else
LOG.warn( "fetching counters timed out: {}", timeouts );
LOG.warn( "fetching counters timed out after: {} seconds, attempts: {}", timeout, timeouts );
}

return cachedCounters;
Expand Down

0 comments on commit e3ea525

Please sign in to comment.