Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-123] Fix all test code style #185

Merged
merged 1 commit into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparator;
Expand All @@ -35,6 +34,7 @@
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.response.SendShuffleDataResult;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
Expand Down Expand Up @@ -309,17 +309,20 @@ public RemoteStorageInfo fetchRemoteStorage(String appId) {
}

@Override
public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId, int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {
public void reportShuffleResult(Map<Integer, List<ShuffleServerInfo>> partitionToServers, String appId,
int shuffleId, long taskAttemptId, Map<Integer, List<Long>> partitionToBlockIds, int bitmapNum) {

}

@Override
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum, int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
public ShuffleAssignmentsInfo getShuffleAssignments(String appId, int shuffleId, int partitionNum,
int partitionNumPerRange, Set<String> requiredTags, int assignmentShuffleServerNumber) {
return null;
}

@Override
public Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet, String appId, int shuffleId, int partitionId) {
public Roaring64NavigableMap getShuffleResult(String clientType, Set<ShuffleServerInfo> shuffleServerInfoSet,
String appId, int shuffleId, int partitionId) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package org.apache.hadoop.mapred;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Random;

import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
Expand All @@ -27,13 +33,6 @@
import org.apache.hadoop.io.serializer.Serializer;
import org.junit.jupiter.api.Test;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.Random;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class SortWriteBufferTest {
Expand Down Expand Up @@ -90,13 +89,13 @@ public void testReadWrite() throws IOException {
keySerializer.serialize(key);
byte[] valueBytes = new byte[200];
Map<String, BytesWritable> valueMap = Maps.newConcurrentMap();
Map<String, Long> recordLenMap = Maps.newConcurrentMap();
Random random = new Random();
random.nextBytes(valueBytes);
value = new BytesWritable(valueBytes);
valueMap.putIfAbsent(keyStr, value);
valSerializer.serialize(value);
recordLength = buffer.addRecord(key, value);
Map<String, Long> recordLenMap = Maps.newConcurrentMap();
recordLenMap.putIfAbsent(keyStr, recordLength);

keyStr = "key1";
Expand All @@ -114,12 +113,11 @@ public void testReadWrite() throws IOException {
bigKey[1] = 'e';
bigKey[2] = 'y';
bigKey[3] = '4';
BytesWritable bigWritableKey = new BytesWritable(bigKey);
final BytesWritable bigWritableKey = new BytesWritable(bigKey);
valueBytes = new byte[253];
random.nextBytes(valueBytes);
BytesWritable bigWritableValue = new BytesWritable(valueBytes);
long bigRecordLength = buffer.addRecord(bigWritableKey, bigWritableValue);

final BytesWritable bigWritableValue = new BytesWritable(valueBytes);
final long bigRecordLength = buffer.addRecord(bigWritableKey, bigWritableValue);
keyStr = "key2";
key = new BytesWritable(keyStr.getBytes());
valueBytes = new byte[3100];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class RssMRUtilsTest {

@Test
public void TaskAttemptIdTest() {
public void taskAttemptIdTest() { // NOTE(review): PR diff shows "baskAttemptIdTest" — a typo in the camelCase rename of TaskAttemptIdTest; intended name is taskAttemptIdTest
long taskAttemptId = 0x1000ad12;
JobID jobID = new JobID();
TaskID taskId = new TaskID(jobID, TaskType.MAP, (int) taskAttemptId);
Expand Down Expand Up @@ -61,7 +61,7 @@ public void TaskAttemptIdTest() {
}

@Test
public void BlockConvertTest() {
public void blockConvertTest() {
JobID jobID = new JobID();
TaskID taskId = new TaskID(jobID, TaskType.MAP, 233);
TaskAttemptID taskAttemptID = new TaskAttemptID(taskId, 1);
Expand All @@ -76,7 +76,7 @@ public void BlockConvertTest() {

@Test
public void applyDynamicClientConfTest() {
JobConf conf = new JobConf();
final JobConf conf = new JobConf();
Map<String, String> clientConf = Maps.newHashMap();
String remoteStoragePath = "hdfs://path1";
String mockKey = "mapreduce.mockKey";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@

import com.google.common.collect.Sets;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.MapTaskCompletionEventsUpdate;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.RssMRUtils;
import org.apache.hadoop.mapreduce.TaskType;
import org.junit.jupiter.api.Test;
Expand All @@ -50,11 +50,11 @@ public void singlePassEventFetch() throws IOException {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum));
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum));

RssEventFetcher ef =
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
Expand All @@ -74,14 +74,14 @@ public void singlePassWithRepeatedSuccessEventFetch() throws IOException {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(70, 80, 90),
Sets.newHashSet(),
Sets.newHashSet()));

RssEventFetcher ef =
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
Expand All @@ -102,18 +102,18 @@ public void multiPassEventFetch() throws IOException {

TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
eq(MAX_EVENTS_TO_FETCH), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH,
MAX_EVENTS_TO_FETCH));
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(MAX_EVENTS_TO_FETCH * 2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH * 2, 3));
eq(MAX_EVENTS_TO_FETCH * 2), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(MAX_EVENTS_TO_FETCH * 2, 3));

RssEventFetcher ef =
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);

Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
Expand All @@ -134,12 +134,12 @@ public void missingEventFetch() throws IOException {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(45, 67), Sets.newHashSet()));

RssEventFetcher ef =
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
Expand All @@ -151,7 +151,7 @@ public void missingEventFetch() throws IOException {
ef.fetchAllRssTaskIds();
fail();
} catch (Exception e) {
assert(e.getMessage()
assert (e.getMessage()
.contains("TaskAttemptIDs are inconsistent with map tasks"));
}
}
Expand All @@ -164,12 +164,12 @@ public void extraEventFetch() throws IOException {
jobConf.setNumMapTasks(mapTaskNum);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getInconsistentCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(), Sets.newHashSet(101)));

RssEventFetcher ef =
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
long rssTaskId = RssMRUtils.convertTaskAttemptIdToLong(
Expand All @@ -181,7 +181,7 @@ public void extraEventFetch() throws IOException {
ef.fetchAllRssTaskIds();
fail();
} catch (Exception e) {
assert(e.getMessage()
assert (e.getMessage()
.contains("TaskAttemptIDs are inconsistent with map tasks"));
}
}
Expand All @@ -197,14 +197,14 @@ public void obsoletedAndTipFailedEventFetch() throws IOException {
Set<Integer> tipFailed = Sets.newHashSet(89);
TaskUmbilicalProtocol umbilical = mock(TaskUmbilicalProtocol.class);
when(umbilical.getMapCompletionEvents(any(JobID.class),
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
eq(0), eq(MAX_EVENTS_TO_FETCH), eq(tid)))
.thenReturn(getMockedCompletionEventsUpdate(0, mapTaskNum,
Sets.newHashSet(), obsoleted, tipFailed));

ExceptionReporter reporter = mock(ExceptionReporter.class);

RssEventFetcher ef =
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);
new RssEventFetcher(1, tid, umbilical, jobConf, MAX_EVENTS_TO_FETCH);

Roaring64NavigableMap expected = Roaring64NavigableMap.bitmapOf();
for (int mapIndex = 0; mapIndex < mapTaskNum; mapIndex++) {
Expand All @@ -227,21 +227,23 @@ public void obsoletedAndTipFailedEventFetch() throws IOException {

private void validate(Roaring64NavigableMap expected, Roaring64NavigableMap actual) {
assert (expected.getLongCardinality() == actual.getLongCardinality());
actual.forEach(taskId -> { assert(expected.contains(taskId)); });
actual.forEach(taskId -> {
assert (expected.contains(taskId));
});
}

private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx, int numEvents) {
int startIdx, int numEvents) {
return getMockedCompletionEventsUpdate(startIdx, numEvents,
Sets.newHashSet(), Sets.newHashSet(), Sets.newHashSet());
}

private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
int startIdx,
int numEvents,
Set<Integer> repeatedSuccEvents,
Set<Integer> obsoletedEvents,
Set<Integer> tipFailedEvents) {
int startIdx,
int numEvents,
Set<Integer> repeatedSuccEvents,
Set<Integer> obsoletedEvents,
Set<Integer> tipFailedEvents) {
ArrayList<TaskCompletionEvent> tceList = new ArrayList<org.apache.hadoop.mapred.TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
Expand All @@ -252,34 +254,34 @@ private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(
status = TaskCompletionEvent.Status.TIPFAILED;
}
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, status,
"http://somehost:8888");
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, status,
"http://somehost:8888");
tceList.add(tce);
}
obsoletedEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(tceList.size(),
new TaskAttemptID("12345", 1, TaskType.MAP, i, 0),
i, true, TaskCompletionEvent.Status.OBSOLETE,
"http://somehost:8888");
new TaskAttemptID("12345", 1, TaskType.MAP, i, 0),
i, true, TaskCompletionEvent.Status.OBSOLETE,
"http://somehost:8888");
tceList.add(tce);
});

// use new attempt number - 1
repeatedSuccEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(tceList.size(),
new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
i, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
i, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
});

// use new attempt number - 1
obsoletedEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(tceList.size(),
new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
i, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
i, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
});
TaskCompletionEvent[] events = {};
Expand All @@ -288,25 +290,25 @@ private MapTaskCompletionEventsUpdate getMockedCompletionEventsUpdate(


private MapTaskCompletionEventsUpdate getInconsistentCompletionEventsUpdate(
int startIdx, int numEvents, Set<Integer> missEvents, Set<Integer> extraEvents) {
int startIdx, int numEvents, Set<Integer> missEvents, Set<Integer> extraEvents) {
ArrayList<TaskCompletionEvent> tceList = new ArrayList<org.apache.hadoop.mapred.TaskCompletionEvent>(numEvents);
for (int i = 0; i < numEvents; ++i) {
int eventIdx = startIdx + i;
if (!missEvents.contains(eventIdx)) {
TaskCompletionEvent.Status status = TaskCompletionEvent.Status.SUCCEEDED;
TaskCompletionEvent tce = new TaskCompletionEvent(eventIdx,
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, status,
"http://somehost:8888");
new TaskAttemptID("12345", 1, TaskType.MAP, eventIdx, 0),
eventIdx, true, status,
"http://somehost:8888");
tceList.add(tce);
}
}

extraEvents.forEach(i -> {
TaskCompletionEvent tce = new TaskCompletionEvent(i,
new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
i, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
new TaskAttemptID("12345", 1, TaskType.MAP, i, 1),
i, true, TaskCompletionEvent.Status.SUCCEEDED,
"http://somehost:8888");
tceList.add(tce);
});
TaskCompletionEvent[] events = {};
Expand Down