Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
long compressedLength = ir.partLength;
long decompressedLength = ir.rawLength;

compressedLength -= CryptoUtils.cryptoPadding(job);
decompressedLength -= CryptoUtils.cryptoPadding(job);

// Get the location for the map output - either in-memory or on-disk
MapOutput<K, V> mapOutput = merger.reserve(mapTaskId, decompressedLength,
id);
Expand All @@ -150,8 +153,7 @@ private boolean copyMapOutput(TaskAttemptID mapTaskId) throws IOException {
inStream = CryptoUtils.wrapIfNecessary(job, inStream);

try {
inStream.seek(ir.startOffset);

inStream.seek(ir.startOffset + CryptoUtils.cryptoPadding(job));
mapOutput.shuffle(LOCALHOST, inStream, compressedLength, decompressedLength, metrics, reporter);
} finally {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,11 @@ You can do this on a per-job basis, or by means of a cluster-wide setting in the
To set this property in NodeManager, set it in the `yarn-env.sh` file:

YARN_NODEMANAGER_OPTS="-Djavax.net.debug=all"

Encrypted Intermediate Data Spill files
---------------------------------------

This capability allows encryption of the intermediate files generated during the merge and shuffle phases.
It can be enabled by setting the `mapreduce.job.encrypted-intermediate-data` job property to `true`.

**NOTE:** Currently, enabling encrypted intermediate data spills restricts the number of job attempts to 1.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void testEncryptedMerger() throws Throwable {
jobConf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
Credentials credentials = UserGroupInformation.getCurrentUser().getCredentials();
TokenCache.setShuffleSecretKey(new byte[16], credentials);
TokenCache.setEncryptedSpillKey(new byte[16], credentials);
UserGroupInformation.getCurrentUser().addCredentials(credentials);
testInMemoryAndOnDiskMerger();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,24 +52,31 @@ public class TestMRIntermediateDataEncryption {

@Test
public void testSingleReducer() throws Exception {
doEncryptionTest(3, 1, 2);
doEncryptionTest(3, 1, 2, false);
}

/**
 * Runs the intermediate-data encryption test with uber mode enabled,
 * using 3 mappers, 1 reducer and 2 nodes.
 */
@Test
public void testUberMode() throws Exception {
  final int numMappers = 3;
  final int numReducers = 1;
  final int numNodes = 2;
  final boolean isUber = true;
  doEncryptionTest(numMappers, numReducers, numNodes, isUber);
}

@Test
public void testMultipleMapsPerNode() throws Exception {
doEncryptionTest(8, 1, 2);
doEncryptionTest(8, 1, 2, false);
}

@Test
public void testMultipleReducers() throws Exception {
doEncryptionTest(2, 4, 2);
doEncryptionTest(2, 4, 2, false);
}

public void doEncryptionTest(int numMappers, int numReducers, int numNodes) throws Exception {
doEncryptionTest(numMappers, numReducers, numNodes, 1000);
public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
boolean isUber) throws Exception {
doEncryptionTest(numMappers, numReducers, numNodes, 1000, isUber);
}

public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int numLines) throws Exception {
public void doEncryptionTest(int numMappers, int numReducers, int numNodes,
int numLines, boolean isUber) throws Exception {
MiniDFSCluster dfsCluster = null;
MiniMRClientCluster mrCluster = null;
FileSystem fileSystem = null;
Expand All @@ -85,7 +92,8 @@ public void doEncryptionTest(int numMappers, int numReducers, int numNodes, int
// Generate input.
createInput(fileSystem, numMappers, numLines);
// Run the test.
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem, numMappers, numReducers, numLines);
runMergeTest(new JobConf(mrCluster.getConfig()), fileSystem,
numMappers, numReducers, numLines, isUber);
} finally {
if (dfsCluster != null) {
dfsCluster.shutdown();
Expand All @@ -111,8 +119,9 @@ private void createInput(FileSystem fs, int numMappers, int numLines) throws Exc
}
}

private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, int numReducers, int numLines)
throws Exception {
private void runMergeTest(JobConf job, FileSystem fileSystem, int
numMappers, int numReducers, int numLines, boolean isUber)
throws Exception {
fileSystem.delete(OUTPUT, true);
job.setJobName("Test");
JobClient client = new JobClient(job);
Expand All @@ -133,6 +142,9 @@ private void runMergeTest(JobConf job, FileSystem fileSystem, int numMappers, in
job.setInt("mapreduce.map.maxattempts", 1);
job.setInt("mapreduce.reduce.maxattempts", 1);
job.setInt("mapred.test.num_lines", numLines);
if (isUber) {
job.setBoolean("mapreduce.job.ubertask.enable", true);
}
job.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
try {
submittedJob = client.submitJob(job);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,14 @@ public AMFeedback statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus)
throws IOException, InterruptedException {
StringBuffer buf = new StringBuffer("Task ");
buf.append(taskId);
buf.append(" making progress to ");
buf.append(taskStatus.getProgress());
String state = taskStatus.getStateString();
if (state != null) {
buf.append(" and state of ");
buf.append(state);
if (taskStatus != null) {
buf.append(" making progress to ");
buf.append(taskStatus.getProgress());
String state = taskStatus.getStateString();
if (state != null) {
buf.append(" and state of ");
buf.append(state);
}
}
LOG.info(buf.toString());
// ignore phase
Expand Down