diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java index e7bf56584f57e..e713caa626501 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopJobProperty.java @@ -28,40 +28,40 @@ public enum HadoopJobProperty { *

* Setting it right allows to avoid rehashing. */ - COMBINER_HASHMAP_SIZE, + COMBINER_HASHMAP_SIZE("ignite.combiner.hashmap.size"), /** * Initial size for hashmap which stores output of mapper or combiner and will be used as input of reducer. *

* Setting it right allows to avoid rehashing. */ - PARTITION_HASHMAP_SIZE, + PARTITION_HASHMAP_SIZE("ignite.partition.hashmap.size"), /** * Specifies number of concurrently running mappers for external execution mode. *

* If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. */ - EXTERNAL_CONCURRENT_MAPPERS, + EXTERNAL_CONCURRENT_MAPPERS("ignite.external.concurrent.mappers"), /** * Specifies number of concurrently running reducers for external execution mode. *

* If not specified, defaults to {@code Runtime.getRuntime().availableProcessors()}. */ - EXTERNAL_CONCURRENT_REDUCERS, + EXTERNAL_CONCURRENT_REDUCERS("ignite.external.concurrent.reducers"), /** * Delay in milliseconds after which Ignite server will reply job status. */ - JOB_STATUS_POLL_DELAY, + JOB_STATUS_POLL_DELAY("ignite.job.status.poll.delay"), /** * Size in bytes of single memory page which will be allocated for data structures in shuffle. *

* By default is {@code 32 * 1024}. */ - SHUFFLE_OFFHEAP_PAGE_SIZE, + SHUFFLE_OFFHEAP_PAGE_SIZE("ignite.shuffle.offheap.page.size"), /** * If set to {@code true} then input for combiner will not be sorted by key. @@ -71,7 +71,7 @@ public enum HadoopJobProperty { *

* By default is {@code false}. */ - SHUFFLE_COMBINER_NO_SORTING, + SHUFFLE_COMBINER_NO_SORTING("ignite.shuffle.combiner.no.sorting"), /** * If set to {@code true} then input for reducer will not be sorted by key. @@ -81,7 +81,14 @@ public enum HadoopJobProperty { *

* By default is {@code false}. */ - SHUFFLE_REDUCER_NO_SORTING, + SHUFFLE_REDUCER_NO_SORTING("ignite.shuffle.reducer.no.sorting"), + + /** + * Defines approximate size in bytes of shuffle message which will be passed over wire from mapper to reducer. + *

+ * Defaults to 128Kb. + */ + SHUFFLE_MSG_SIZE("ignite.shuffle.message.size"), /** * Shuffle job throttle in milliseconds. When job is executed with separate shuffle thread, this parameter @@ -89,23 +96,25 @@ public enum HadoopJobProperty { *

* Defaults to {@code 0}. */ - SHUFFLE_JOB_THROTTLE; + SHUFFLE_JOB_THROTTLE("ignite.shuffle.job.throttle"); - /** */ - private final String ptyName; + /** Property name. */ + private final String propName; /** + * Constrcutor. * + * @param propName Property name. */ - HadoopJobProperty() { - ptyName = "ignite." + name().toLowerCase().replace('_', '.'); + HadoopJobProperty(String propName) { + this.propName = propName; } /** * @return Property name. */ public String propertyName() { - return ptyName; + return propName; } /** diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java index 8c731c0dce513..e5af8f116a07f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java @@ -55,6 +55,7 @@ import org.apache.ignite.thread.IgniteThread; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.PARTITION_HASHMAP_SIZE; +import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_MSG_SIZE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_JOB_THROTTLE; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.SHUFFLE_REDUCER_NO_SORTING; import static org.apache.ignite.internal.processors.hadoop.HadoopJobProperty.get; @@ -64,7 +65,7 @@ */ public class HadoopShuffleJob implements AutoCloseable { /** */ - private static final int MSG_BUF_SIZE = 128 * 1024; + private static final int DFLT_SHUFFLE_MSG_SIZE = 128 * 1024; /** */ private final HadoopJob job; @@ -109,6 +110,9 @@ public class HadoopShuffleJob implements AutoCloseable { /** */ private final IgniteLogger log; + /** Message size. */ + private final int msgSize; + /** */ private final long throttle; @@ -128,6 +132,8 @@ public HadoopShuffleJob(T locReduceAddr, IgniteLogger log, HadoopJob job, GridUn this.mem = mem; this.log = log.getLogger(HadoopShuffleJob.class); + msgSize = get(job.info(), SHUFFLE_MSG_SIZE, DFLT_SHUFFLE_MSG_SIZE); + if (!F.isEmpty(locReducers)) { for (int rdc : locReducers) { HadoopTaskInfo taskInfo = new HadoopTaskInfo(HadoopTaskType.REDUCE, job.id(), rdc, 0, null); @@ -320,7 +326,7 @@ private void collectUpdatesAndSend(boolean flush) throws IgniteCheckedException continue; // Skip empty map and local node. if (msgs[i] == null) - msgs[i] = new HadoopShuffleMessage(job.id(), i, MSG_BUF_SIZE); + msgs[i] = new HadoopShuffleMessage(job.id(), i, msgSize); final int idx = i; @@ -425,7 +431,7 @@ private void send(final int idx, int newBufMinSize) { }); msgs[idx] = newBufMinSize == 0 ? null : new HadoopShuffleMessage(job.id(), idx, - Math.max(MSG_BUF_SIZE, newBufMinSize)); + Math.max(msgSize, newBufMinSize)); } /** {@inheritDoc} */