[MINOR] test: address unchecked conversions (apache#624)
### What changes were proposed in this pull request?

Address unchecked-conversion warnings in the tests by replacing raw generic types with properly parameterized ones and by adding `@SafeVarargs` to the generic varargs test helper.
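
The same fix pattern repeats across the files below: give raw generic types their type arguments (explicit or via the diamond operator) and annotate generic varargs helpers. A minimal, self-contained sketch of that pattern (the `Buffer` and `concat` names below are made up for illustration and are not classes from this repository):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UncheckedConversionExample {

    static class Buffer<K, V> { }

    // Before: a raw constructor call; javac reports an unchecked conversion
    // when the result is assigned to a parameterized type.
    static Buffer<String, String> rawStyle() {
        return new Buffer();
    }

    // After: explicit type arguments (or simply new Buffer<>()), no warning.
    static Buffer<String, String> genericStyle() {
        return new Buffer<String, String>();
    }

    // Generic varargs helpers cause "unchecked generic array creation" warnings
    // at call sites; @SafeVarargs (valid on static methods) suppresses them and
    // documents that the method does not misuse its varargs array.
    @SafeVarargs
    static <T> List<T> concat(List<T>... lists) {
        List<T> out = new ArrayList<>();
        for (List<T> list : lists) {
            out.addAll(list);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(genericStyle());
        System.out.println(concat(Arrays.asList("a"), Arrays.asList("b")));
    }
}
```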

### Why are the changes needed?

Reduce compiler warnings in the build log.
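
For reference, a raw-type usage like the ones removed here is enough to trigger javac's unchecked-operations note during the build; the following is a hypothetical stand-alone reproduction, not code from this repository:

```java
import java.util.ArrayList;
import java.util.List;

public class RawTypeExample {
    public static void main(String[] args) {
        List<String> names = new ArrayList();   // raw type: unchecked conversion
        names.add("example");
        System.out.println(names);
    }
}

// Compiling with plain javac typically prints:
//   Note: RawTypeExample.java uses unchecked or unsafe operations.
//   Note: Recompile with -Xlint:unchecked for details.
// and with -Xlint:unchecked enabled, roughly:
//   warning: [unchecked] unchecked conversion
//     required: List<String>
//     found:    ArrayList
```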

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI.
kaijchen authored and advancedxy committed Mar 21, 2023
1 parent 6ae0d74 commit 19c44b8
Showing 12 changed files with 75 additions and 66 deletions.
@@ -59,7 +59,8 @@ public void testWriteException() throws Exception {
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager = new SortWriteBufferManager(
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
10240,
1L,
10,
@@ -109,7 +110,7 @@ public void testWriteException() throws Exception {
}
assertFalse(failedBlocks.isEmpty());
isException = false;
- manager = new SortWriteBufferManager(
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
100,
1L,
10,
@@ -158,7 +159,8 @@ public void testOnePartition() throws Exception {
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager = new SortWriteBufferManager(
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
10240,
1L,
10,
@@ -206,7 +208,8 @@ public void testWriteNormal() throws Exception {
Set<Long> failedBlocks = Sets.newConcurrentHashSet();
Counters.Counter mapOutputByteCounter = new Counters.Counter();
Counters.Counter mapOutputRecordCounter = new Counters.Counter();
- SortWriteBufferManager<BytesWritable, BytesWritable> manager = new SortWriteBufferManager(
+ SortWriteBufferManager<BytesWritable, BytesWritable> manager;
+ manager = new SortWriteBufferManager<BytesWritable, BytesWritable>(
10240,
1L,
10,
@@ -61,8 +61,8 @@ public void readTest() throws Exception {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);

- RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
- ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> handleMock = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> dependencyMock = mock(ShuffleDependency.class);
when(handleMock.getAppId()).thenReturn("appId");
when(handleMock.getShuffleId()).thenReturn(1);
when(handleMock.getDependency()).thenReturn(dependencyMock);
@@ -80,7 +80,7 @@ public void readTest() throws Exception {
when(dependencyMock.aggregator()).thenReturn(Option.empty());
when(dependencyMock.keyOrdering()).thenReturn(Option.empty());

- RssShuffleReader rssShuffleReaderSpy = spy(new RssShuffleReader<String, String>(0, 1, contextMock,
+ RssShuffleReader<String, String> rssShuffleReaderSpy = spy(new RssShuffleReader<>(0, 1, contextMock,
handleMock, basePath, 1000, conf, StorageType.HDFS.name(),
1000, 2, 10, blockIdBitmap, taskIdBitmap, new RssConf()));

@@ -84,8 +84,8 @@ public void checkBlockSendResultTest() {
Serializer kryoSerializer = new KryoSerializer(conf);
ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
when(mockPartitioner.numPartitions()).thenReturn(2);
@@ -99,7 +99,7 @@ public void checkBlockSendResultTest() {
WriteBufferManager bufferManagerSpy = spy(bufferManager);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());

- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, taskId, 1L,
+ RssShuffleWriter<?, ?, ?> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, taskId, 1L,
bufferManagerSpy, (new TaskMetrics()).shuffleWriteMetrics(),
manager, conf, mockShuffleWriteClient, mockHandle);

@@ -166,9 +166,9 @@ public void onError(Throwable e) {
manager.getEventLoop().start();

Partitioner mockPartitioner = mock(Partitioner.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
final ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
Serializer kryoSerializer = new KryoSerializer(conf);
when(mockDependency.serializer()).thenReturn(kryoSerializer);
@@ -205,20 +205,20 @@ public void onError(Throwable e) {
WriteBufferManager bufferManagerSpy = spy(bufferManager);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());

- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
bufferManagerSpy, shuffleWriteMetrics, manager, conf, mockShuffleWriteClient, mockHandle);

RssShuffleWriter<String, String, String> rssShuffleWriterSpy = spy(rssShuffleWriter);
doNothing().when(rssShuffleWriterSpy).sendCommit();

// case 1
- MutableList<Product2<String, String>> data = new MutableList();
- data.appendElem(new Tuple2("testKey1", "testValue1"));
- data.appendElem(new Tuple2("testKey2", "testValue2"));
- data.appendElem(new Tuple2("testKey3", "testValue3"));
- data.appendElem(new Tuple2("testKey4", "testValue4"));
- data.appendElem(new Tuple2("testKey5", "testValue5"));
- data.appendElem(new Tuple2("testKey6", "testValue6"));
+ MutableList<Product2<String, String>> data = new MutableList<>();
+ data.appendElem(new Tuple2<>("testKey1", "testValue1"));
+ data.appendElem(new Tuple2<>("testKey2", "testValue2"));
+ data.appendElem(new Tuple2<>("testKey3", "testValue3"));
+ data.appendElem(new Tuple2<>("testKey4", "testValue4"));
+ data.appendElem(new Tuple2<>("testKey5", "testValue5"));
+ data.appendElem(new Tuple2<>("testKey6", "testValue6"));
rssShuffleWriterSpy.write(data.iterator());

assertTrue(rssShuffleWriterSpy.getShuffleWriteMetrics().shuffleWriteTime() > 0);
@@ -255,7 +255,7 @@ public void onError(Throwable e) {
public void postBlockEventTest() throws Exception {
final WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
final ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
Partitioner mockPartitioner = mock(Partitioner.class);
final RssShuffleManager mockShuffleManager = mock(RssShuffleManager.class);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -275,14 +275,14 @@ public void onError(Throwable e) {
eventLoop.start();

when(mockShuffleManager.getEventLoop()).thenReturn(eventLoop);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
ShuffleWriteClient mockWriteClient = mock(ShuffleWriteClient.class);
SparkConf conf = new SparkConf();
conf.set(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(), "64")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.LOCALFILE.name());

- RssShuffleWriter writer = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> writer = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
mockBufferManager, mockMetrics, mockShuffleManager, conf, mockWriteClient, mockHandle);
List<ShuffleBlockInfo> shuffleBlockInfoList = createShuffleBlockList(1, 31);
writer.postBlockEvent(shuffleBlockInfoList);
@@ -22,6 +22,7 @@

import org.apache.commons.lang3.SystemUtils;
import org.apache.spark.SparkConf;
+ import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.spark.util.EventLoop;

public class TestUtils {
@@ -32,7 +33,7 @@ private TestUtils() {
public static RssShuffleManager createShuffleManager(
SparkConf conf,
Boolean isDriver,
- EventLoop loop,
+ EventLoop<AddBlockEvent> loop,
Map<String, Set<Long>> successBlockIds,
Map<String, Set<Long>> failBlockIds) {
return new RssShuffleManager(conf, isDriver, loop, successBlockIds, failBlockIds);
@@ -66,8 +66,8 @@ public void readTest() throws Exception {
writeTestData(writeHandler, 2, 5, expectedData,
blockIdBitmap, "key", KRYO_SERIALIZER, 0);

- RssShuffleHandle handleMock = mock(RssShuffleHandle.class);
- ShuffleDependency dependencyMock = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> handleMock = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> dependencyMock = mock(ShuffleDependency.class);
when(handleMock.getAppId()).thenReturn("appId");
when(handleMock.getDependency()).thenReturn(dependencyMock);
when(handleMock.getShuffleId()).thenReturn(1);
@@ -87,7 +87,7 @@ public void readTest() throws Exception {

Map<Integer, Roaring64NavigableMap> partitionToExpectBlocks = Maps.newHashMap();
partitionToExpectBlocks.put(0, blockIdBitmap);
- RssShuffleReader rssShuffleReaderSpy = spy(new RssShuffleReader<String, String>(
+ RssShuffleReader<String, String> rssShuffleReaderSpy = spy(new RssShuffleReader<>(
0,
1,
0,
@@ -111,7 +111,7 @@ public void readTest() throws Exception {
writeTestData(writeHandler1, 2, 4, expectedData,
blockIdBitmap1, "another_key", KRYO_SERIALIZER, 1);
partitionToExpectBlocks.put(1, blockIdBitmap1);
- RssShuffleReader rssShuffleReaderSpy1 = spy(new RssShuffleReader<String, String>(
+ RssShuffleReader<String, String> rssShuffleReaderSpy1 = spy(new RssShuffleReader<>(
0,
2,
0,
@@ -132,7 +132,7 @@ public void readTest() throws Exception {
));
validateResult(rssShuffleReaderSpy1.read(), expectedData, 18);

- RssShuffleReader rssShuffleReaderSpy2 = spy(new RssShuffleReader<String, String>(
+ RssShuffleReader<String, String> rssShuffleReaderSpy2 = spy(new RssShuffleReader<>(
0,
2,
0,
@@ -90,8 +90,8 @@ public void checkBlockSendResultTest() {

ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
Partitioner mockPartitioner = mock(Partitioner.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
when(mockPartitioner.numPartitions()).thenReturn(2);
TaskMemoryManager mockTaskMemoryManager = mock(TaskMemoryManager.class);
@@ -104,7 +104,7 @@ public void checkBlockSendResultTest() {
Maps.newHashMap(), mockTaskMemoryManager, new ShuffleWriteMetrics(), new RssConf());
WriteBufferManager bufferManagerSpy = spy(bufferManager);

- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
bufferManagerSpy, (new TaskMetrics()).shuffleWriteMetrics(),
manager, conf, mockShuffleWriteClient, mockHandle);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());
@@ -176,8 +176,8 @@ public void onError(Throwable e) {
Serializer kryoSerializer = new KryoSerializer(conf);
Partitioner mockPartitioner = mock(Partitioner.class);
final ShuffleWriteClient mockShuffleWriteClient = mock(ShuffleWriteClient.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
when(mockDependency.serializer()).thenReturn(kryoSerializer);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -211,7 +211,7 @@ public void onError(Throwable e) {
0, 0, bufferOptions, kryoSerializer,
partitionToServers, mockTaskMemoryManager, shuffleWriteMetrics, new RssConf());
WriteBufferManager bufferManagerSpy = spy(bufferManager);
- RssShuffleWriter rssShuffleWriter = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> rssShuffleWriter = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
bufferManagerSpy, shuffleWriteMetrics, manager, conf, mockShuffleWriteClient, mockHandle);
doReturn(1000000L).when(bufferManagerSpy).acquireMemory(anyLong());

@@ -220,13 +220,13 @@ public void onError(Throwable e) {
doNothing().when(rssShuffleWriterSpy).sendCommit();

// case 1
- MutableList<Product2<String, String>> data = new MutableList();
- data.appendElem(new Tuple2("testKey2", "testValue2"));
- data.appendElem(new Tuple2("testKey3", "testValue3"));
- data.appendElem(new Tuple2("testKey4", "testValue4"));
- data.appendElem(new Tuple2("testKey6", "testValue6"));
- data.appendElem(new Tuple2("testKey1", "testValue1"));
- data.appendElem(new Tuple2("testKey5", "testValue5"));
+ MutableList<Product2<String, String>> data = new MutableList<>();
+ data.appendElem(new Tuple2<>("testKey2", "testValue2"));
+ data.appendElem(new Tuple2<>("testKey3", "testValue3"));
+ data.appendElem(new Tuple2<>("testKey4", "testValue4"));
+ data.appendElem(new Tuple2<>("testKey6", "testValue6"));
+ data.appendElem(new Tuple2<>("testKey1", "testValue1"));
+ data.appendElem(new Tuple2<>("testKey5", "testValue5"));
rssShuffleWriterSpy.write(data.iterator());

assertTrue(shuffleWriteMetrics.writeTime() > 0);
@@ -265,7 +265,7 @@ public void onError(Throwable e) {
@Test
public void postBlockEventTest() throws Exception {
WriteBufferManager mockBufferManager = mock(WriteBufferManager.class);
- ShuffleDependency mockDependency = mock(ShuffleDependency.class);
+ ShuffleDependency<String, String, String> mockDependency = mock(ShuffleDependency.class);
ShuffleWriteMetrics mockMetrics = mock(ShuffleWriteMetrics.class);
Partitioner mockPartitioner = mock(Partitioner.class);
when(mockDependency.partitioner()).thenReturn(mockPartitioner);
@@ -290,14 +290,14 @@ public void onError(Throwable e) {
Maps.newConcurrentMap(),
Maps.newConcurrentMap()));

- RssShuffleHandle mockHandle = mock(RssShuffleHandle.class);
+ RssShuffleHandle<String, String, String> mockHandle = mock(RssShuffleHandle.class);
when(mockHandle.getDependency()).thenReturn(mockDependency);
ShuffleWriteClient mockWriteClient = mock(ShuffleWriteClient.class);
SparkConf conf = new SparkConf();
conf.set(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(), "64")
.set(RssSparkConfig.RSS_STORAGE_TYPE.key(), StorageType.MEMORY_LOCALFILE.name());
List<ShuffleBlockInfo> shuffleBlockInfoList = createShuffleBlockList(1, 31);
- RssShuffleWriter writer = new RssShuffleWriter("appId", 0, "taskId", 1L,
+ RssShuffleWriter<String, String, String> writer = new RssShuffleWriter<>("appId", 0, "taskId", 1L,
mockBufferManager, mockMetrics, mockShuffleManager, conf, mockWriteClient, mockHandle);
writer.postBlockEvent(shuffleBlockInfoList);
Thread.sleep(500);
@@ -324,7 +324,7 @@ public void onError(Throwable e) {

private void testTwoEvents(
List<AddBlockEvent> events,
- RssShuffleWriter writer,
+ RssShuffleWriter<String, String, String> writer,
int blockNum,
int blockLength,
int firstEventSize,
@@ -341,7 +341,7 @@ private void testTwoEvents(

private void testSingleEvent(
List<AddBlockEvent> events,
- RssShuffleWriter writer,
+ RssShuffleWriter<String, String, String> writer,
int blockNum,
int blockLength) throws InterruptedException {
List<ShuffleBlockInfo> shuffleBlockInfoList;
@@ -382,6 +382,7 @@ public void testSplit() {
assertEquals(1, dataSegments.get(1).getLength());
}

+ @SafeVarargs
public static byte[] generateData(Pair<Integer, Integer>... configEntries) {
ByteBuffer byteBuffer = ByteBuffer.allocate(configEntries.length * 40);
int total = 0;
@@ -33,7 +33,7 @@ public void combineByKeyTest() throws Exception {
}

@Override
- public Map runTest(SparkSession spark, String fileName) throws Exception {
+ public Map<String, Tuple2<Integer, Integer>> runTest(SparkSession spark, String fileName) throws Exception {
// take a rest to make sure shuffle server is registered
Thread.sleep(4000);
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
@@ -53,7 +53,7 @@ static JavaPairRDD<String, Integer> getRDD(JavaSparkContext jsc) {
return javaPairRDD1;
}

- static JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD(JavaPairRDD javaPairRDD1) {
+ static JavaPairRDD<String, Tuple2<Integer, Integer>> combineByKeyRDD(JavaPairRDD<String, Integer> javaPairRDD1) {
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD = javaPairRDD1
.combineByKey((Function<Integer, Tuple2<Integer, Integer>>) i -> new Tuple2<>(1, i),
(Function2<Tuple2<Integer, Integer>, Integer, Tuple2<Integer, Integer>>)
@@ -40,7 +40,7 @@ public void test() throws Exception {
}

@Override
- public Map runTest(SparkSession spark, String fileName) throws Exception {
+ public Map<String, Long> runTest(SparkSession spark, String fileName) throws Exception {
// take a rest to make sure shuffle server is registered
Thread.sleep(3000);

@@ -49,9 +49,9 @@ public Map runTest(SparkSession spark, String fileName) throws Exception {
.otherwise(functions.col("id")).as("key1"), functions.col("id").as("value1"));
df1.createOrReplaceTempView("table1");

- List list = spark.sql("select count(value1) from table1 group by key1").collectAsList();
+ List<?> list = spark.sql("select count(value1) from table1 group by key1").collectAsList();
Map<String, Long> result = new HashMap<>();
result.put("size", Long.valueOf(list.size()));
result.put("size", (long) list.size());

for (int stageId : spark.sparkContext().statusTracker().getJobInfo(0).get().stageIds()) {
long writeRecords = getFirstStageData(spark, stageId).shuffleWriteRecords();