Skip to content

Commit

Permalink
renamed references to task to slice to be consistent with the underly…
Browse files Browse the repository at this point in the history
…ing process model
  • Loading branch information
cwensel committed Feb 16, 2012
1 parent bcbd605 commit f4125d5
Show file tree
Hide file tree
Showing 10 changed files with 35 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGES.txt
Expand Up @@ -2,6 +2,8 @@ Cascading Change Log

2.0.0 [unreleased]

Added the c.m.UnitOfWork interface to give c.f.Flow and c.c.Cascade a common contract.

Changed c.t.h.TupleSerialization#setSerializations() to force TupleSerialization and o.a.h.i.s.WritableSerialization
are first in the "io.serializations" list.

Expand Down
21 changes: 17 additions & 4 deletions src/core/cascading/flow/FlowProcess.java
Expand Up @@ -96,13 +96,13 @@ public boolean isCounterStatusInitialized()
}

@Override
public int getNumConcurrentTasks()
public int getNumProcessSlices()
{
return 0;
}

@Override
public int getCurrentTaskNum()
public int getCurrentSliceNum()
{
return 0;
}
Expand Down Expand Up @@ -195,9 +195,22 @@ public void setCurrentSession( FlowSession currentSession )
currentSession.setCurrentProcess( this );
}

public abstract int getNumConcurrentTasks();
/**
* Method getNumProcessSlices returns the number of parallel slices or tasks allocated
* for this process execution.
* <p/>
* For MapReduce platforms, this is the same as the number of tasks for a given MapReduce job.
*
* @return an int
*/
public abstract int getNumProcessSlices();

public abstract int getCurrentTaskNum();
/**
* Method getCurrentSliceNum returns an integer representing which slice instance currently running.
*
* @return an int
*/
public abstract int getCurrentSliceNum();

/**
* Method getProperty should be used to return configuration parameters from the underlying system.
Expand Down
8 changes: 4 additions & 4 deletions src/core/cascading/flow/FlowProcessWrapper.java
Expand Up @@ -78,15 +78,15 @@ public void setCurrentSession( FlowSession currentSession )
}

@Override
public int getNumConcurrentTasks()
public int getNumProcessSlices()
{
return delegate.getNumConcurrentTasks();
return delegate.getNumProcessSlices();
}

@Override
public int getCurrentTaskNum()
public int getCurrentSliceNum()
{
return delegate.getCurrentTaskNum();
return delegate.getCurrentSliceNum();
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/core/cascading/management/SliceState.java
Expand Up @@ -34,10 +34,10 @@ public abstract class SliceState extends BaseState

protected SliceState( FlowProcess currentProcess )
{
initialize( currentProcess.getCurrentSession().getCascadingServices(), String.valueOf( currentProcess.getCurrentTaskNum() ) );
initialize( currentProcess.getCurrentSession().getCascadingServices(), String.valueOf( currentProcess.getCurrentSliceNum() ) );
this.sessionID = currentProcess.getCurrentSession().getID();
this.processID = currentProcess.getID();
this.sliceID = String.valueOf( currentProcess.getCurrentTaskNum() );
this.sliceID = String.valueOf( currentProcess.getCurrentSliceNum() );
}

@Override
Expand Down
4 changes: 2 additions & 2 deletions src/core/cascading/operation/filter/Limit.java
Expand Up @@ -80,8 +80,8 @@ public void prepare( FlowProcess flowProcess, OperationCall<Context> operationCa

operationCall.setContext( context );

int numTasks = flowProcess.getNumConcurrentTasks();
int taskNum = flowProcess.getCurrentTaskNum();
int numTasks = flowProcess.getNumProcessSlices();
int taskNum = flowProcess.getCurrentSliceNum();

context.limit = (long) Math.floor( (double) limit / (double) numTasks );

Expand Down
4 changes: 2 additions & 2 deletions src/hadoop/cascading/flow/hadoop/HadoopFlowProcess.java
Expand Up @@ -157,13 +157,13 @@ public int getCurrentNumReducers()
* @return int
*/
@Override
public int getCurrentTaskNum()
public int getCurrentSliceNum()
{
return getJobConf().getInt( "mapred.task.partition", 0 );
}

@Override
public int getNumConcurrentTasks()
public int getNumProcessSlices()
{
if( isMapper() )
return getCurrentNumMappers();
Expand Down
2 changes: 1 addition & 1 deletion src/hadoop/cascading/flow/hadoop/HadoopMapStreamGraph.java
Expand Up @@ -54,7 +54,7 @@ public HadoopMapStreamGraph( HadoopFlowProcess flowProcess, HadoopFlowStep step,
setTraps();
setScopes();

printGraph( step.getID(), "map", flowProcess.getCurrentTaskNum() );
printGraph( step.getID(), "map", flowProcess.getCurrentSliceNum() );
bind();
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ public HadoopReduceStreamGraph( HadoopFlowProcess flowProcess, HadoopFlowStep st
setTraps();
setScopes();

printGraph( step.getID(), "reduce", flowProcess.getCurrentTaskNum() );
printGraph( step.getID(), "reduce", flowProcess.getCurrentSliceNum() );

bind();
}
Expand Down
4 changes: 2 additions & 2 deletions src/local/cascading/flow/local/LocalFlowProcess.java
Expand Up @@ -62,13 +62,13 @@ public void setStepStats( LocalStepStats stepStats )
}

@Override
public int getNumConcurrentTasks()
public int getNumProcessSlices()
{
return 0;
}

@Override
public int getCurrentTaskNum()
public int getCurrentSliceNum()
{
return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/test/cascading/operation/filter/LimitFilterTest.java
Expand Up @@ -79,13 +79,13 @@ public FlowProcess copyWith( Object object )
}

@Override
public int getNumConcurrentTasks()
public int getNumProcessSlices()
{
return numTasks;
}

@Override
public int getCurrentTaskNum()
public int getCurrentSliceNum()
{
return taskNum;
}
Expand Down

0 comments on commit f4125d5

Please sign in to comment.