Skip to content

Commit

Permalink
IGNITE-4274: Hadoop: added new property to control shuffle message size.
Browse files Browse the repository at this point in the history
  • Loading branch information
devozerov committed Dec 5, 2016
1 parent 214197c commit 6e8c35b
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 17 deletions.
Expand Up @@ -28,40 +28,40 @@ public enum HadoopJobProperty {
* <p> * <p>
* Setting it right allows to avoid rehashing. * Setting it right allows to avoid rehashing.
*/ */
COMBINER_HASHMAP_SIZE, COMBINER_HASHMAP_SIZE("ignite.combiner.hashmap.size"),


/** /**
* Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer.
* <p> * <p>
* Setting it right allows to avoid rehashing. * Setting it right allows to avoid rehashing.
*/ */
PARTITION_HASHMAP_SIZE, PARTITION_HASHMAP_SIZE("ignite.partition.hashmap.size"),


/** /**
* Specifies number of concurrently running mappers for external execution mode. * Specifies number of concurrently running mappers for external execution mode.
* <p> * <p>
* If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
*/ */
EXTERNAL_CONCURRENT_MAPPERS, EXTERNAL_CONCURRENT_MAPPERS("ignite.external.concurrent.mappers"),


/** /**
* Specifies number of concurrently running reducers for external execution mode. * Specifies number of concurrently running reducers for external execution mode.
* <p> * <p>
* If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. * If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}.
*/ */
EXTERNAL_CONCURRENT_REDUCERS, EXTERNAL_CONCURRENT_REDUCERS("ignite.external.concurrent.reducers"),


/** /**
* Delay in milliseconds after which Ignite server will reply job status. * Delay in milliseconds after which Ignite server will reply job status.
*/ */
JOB_STATUS_POLL_DELAY, JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"),


/** /**
* Size in bytes of single memory page which will be allocated for data structures in shuffle. * Size in bytes of single memory page which will be allocated for data structures in shuffle.
* <p> * <p>
* By default is {@code 32 * 1024}. * By default is {@code 32 * 1024}.
*/ */
SHUFFLE_OFFHEAP_PAGE_SIZE, SHUFFLE_OFFHEAP_PAGE_SIZE("ignite.shuffle.offheap.page.size"),


/** /**
* If set to {@code true} then input for combiner will not be sorted by key. * If set to {@code true} then input for combiner will not be sorted by key.
Expand All @@ -71,7 +71,7 @@ public enum HadoopJobProperty {
* <p> * <p>
* By default is {@code false}. * By default is {@code false}.
*/ */
SHUFFLE_COMBINER_NO_SORTING, SHUFFLE_COMBINER_NO_SORTING("ignite.shuffle.combiner.no.sorting"),


/** /**
* If set to {@code true} then input for reducer will not be sorted by key. * If set to {@code true} then input for reducer will not be sorted by key.
Expand All @@ -81,31 +81,40 @@ public enum HadoopJobProperty {
* <p> * <p>
* By default is {@code false}. * By default is {@code false}.
*/ */
SHUFFLE_REDUCER_NO_SORTING, SHUFFLE_REDUCER_NO_SORTING("ignite.shuffle.reducer.no.sorting"),

/**
* Defines approximate size in bytes of shuffle message which will be passed over wire from mapper to reducer.
* <p>
* Defaults to 128Kb.
*/
SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"),


/** /**
* Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter
* controls sleep duration between iterations through intermediate reducer maps. * controls sleep duration between iterations through intermediate reducer maps.
* <p> * <p>
* Defaults to {@code 0}. * Defaults to {@code 0}.
*/ */
SHUFFLE_JOB_THROTTLE; SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle");


/** */ /** Property name. */
private final String ptyName; private final String propName;


/** /**
* Constrcutor.
* *
* @param propName Property name.
*/ */
HadoopJobProperty() { HadoopJobProperty(String propName) {
ptyName = "ignite." + name().toLowerCase().replace('_', '.'); this.propName = propName;
} }


/** /**
* @return Property name. * @return Property name.
*/ */
public String propertyName() { public String propertyName() {
return ptyName; return propName;
} }


/** /**
Expand Down
Expand Up @@ -55,6 +55,7 @@
import org.apache.ignite.thread.IgniteThread; import org.apache.ignite.thread.IgniteThread;


import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING;
import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get;
Expand All @@ -64,7 +65,7 @@
*/ */
public class HadoopShuffleJob<T> implements AutoCloseable { public class HadoopShuffleJob<T> implements AutoCloseable {
/** */ /** */
private static final int MSG_BUF_SIZE = 128 * 1024; private static final int DFLT_SHUFFLE_MSG_SIZE = 128 * 1024;


/** */ /** */
private final HadoopJob job; private final HadoopJob job;
Expand Down Expand Up @@ -109,6 +110,9 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/** */ /** */
private final IgniteLogger log; private final IgniteLogger log;


/** Message size. */
private final int msgSize;

/** */ /** */
private final long throttle; private final long throttle;


Expand All @@ -128,6 +132,8 @@ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUn
this.mem = mem; this.mem = mem;
this.log = log.getLogger(HadoopShuffleJob.class); this.log = log.getLogger(HadoopShuffleJob.class);


msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE);

if (!F.isEmpty(locReducers)) { if (!F.isEmpty(locReducers)) {
for (int rdc : locReducers) { for (int rdc : locReducers) {
HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null);
Expand Down Expand Up @@ -320,7 +326,7 @@ private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException
continue; // Skip empty map and local node. continue; // Skip empty map and local node.


if (msgs[i] == null) if (msgs[i] == null)
msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE); msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize);


final int idx = i; final int idx = i;


Expand Down Expand Up @@ -425,7 +431,7 @@ private void send(final int idx, int newBufMinSize) {
}); });


msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx, msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx,
Math.max(MSG_BUF_SIZE, newBufMinSize)); Math.max(msgSize, newBufMinSize));
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
Expand Down

0 comments on commit 6e8c35b

Please sign in to comment.