From a130344d81541c67dad5530372a013c46ee01660 Mon Sep 17 00:00:00 2001 From: QI Jiale Date: Thu, 23 Nov 2023 19:08:26 +0800 Subject: [PATCH 1/3] Support combine operation in sort for tez engine. --- client-tez/pom.xml | 6 + .../buffer/TezRssCombineOutputCollector.java | 38 +++++ .../common/sort/buffer/WriteBuffer.java | 134 ++++++++++++++--- .../sort/buffer/WriteBufferManager.java | 48 +++++- .../library/common/sort/impl/RssSorter.java | 9 +- .../library/common/sort/impl/RssUnSorter.java | 3 +- .../sort/buffer/WriteBufferManagerTest.java | 142 +++++++++++++++++- .../common/sort/buffer/WriteBufferTest.java | 2 + 8 files changed, 354 insertions(+), 28 deletions(-) create mode 100644 client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/TezRssCombineOutputCollector.java diff --git a/client-tez/pom.xml b/client-tez/pom.xml index edab397975..148069b9cc 100644 --- a/client-tez/pom.xml +++ b/client-tez/pom.xml @@ -129,6 +129,12 @@ hadoop-minicluster test + + org.apache.tez + tez-mapreduce + ${tez.version} + test + diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/TezRssCombineOutputCollector.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/TezRssCombineOutputCollector.java new file mode 100644 index 0000000000..ce4b06bac6 --- /dev/null +++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/TezRssCombineOutputCollector.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.runtime.library.common.sort.buffer; + +import java.io.IOException; + +import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.InMemoryWriter; + +public class TezRssCombineOutputCollector extends InMemoryWriter { + private WriteBuffer writer; + + public TezRssCombineOutputCollector(byte[] array) { + super(array); + } + + public synchronized void setWriter(WriteBuffer writer) { + this.writer = writer; + } + + public synchronized void append(Object key, Object value) throws IOException { + writer.addRecord((K) key, (V) value); + } +} diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBuffer.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBuffer.java index 774c54e9f5..748b536681 100644 --- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBuffer.java +++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBuffer.java @@ -19,13 +19,17 @@ import java.io.IOException; import java.io.OutputStream; -import java.util.Comparator; +import java.util.Arrays; +import java.util.Iterator; import java.util.List; import com.google.common.collect.Lists; +import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.serializer.Serializer; +import org.apache.hadoop.util.Progress; +import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,36 +93,36 @@ public void clear() { records.clear(); } - /** get data */ - public synchronized byte[] getData() { - int extraSize = 0; - for (Record record : records) { - extraSize += WritableUtils.getVIntSize(record.getKeyLength()); - extraSize += WritableUtils.getVIntSize(record.getValueLength()); - } - extraSize += WritableUtils.getVIntSize(-1); - extraSize += WritableUtils.getVIntSize(-1); - byte[] data = new byte[dataLength + extraSize]; - int offset = 0; + public synchronized void sort() { long startSort = System.currentTimeMillis(); if (this.isNeedSorted) { records.sort( - new Comparator>() { - @Override - public int compare(Record o1, Record o2) { - return comparator.compare( + (o1, o2) -> + comparator.compare( buffers.get(o1.getKeyIndex()).getBuffer(), o1.getKeyOffSet(), o1.getKeyLength(), buffers.get(o2.getKeyIndex()).getBuffer(), o2.getKeyOffSet(), - o2.getKeyLength()); - } - }); + o2.getKeyLength())); } long startCopy = System.currentTimeMillis(); sortTime += startCopy - startSort; + } + /** get data */ + public synchronized byte[] getData() { + int extraSize = 0; + for (Record record : records) { + extraSize += WritableUtils.getVIntSize(record.getKeyLength()); + extraSize += WritableUtils.getVIntSize(record.getValueLength()); + } + extraSize += WritableUtils.getVIntSize(-1); + extraSize += WritableUtils.getVIntSize(-1); + byte[] data = new byte[dataLength + extraSize]; + int offset = 0; + + final long startCopy = System.currentTimeMillis(); for (Record record : records) { offset = writeDataInt(data, offset, record.getKeyLength()); offset = writeDataInt(data, offset, record.getValueLength()); @@ -328,4 +332,96 @@ public int getSize() { return size; } } + + public static class BufferIterator implements TezRawKeyValueIterator { + private final WriteBuffer writeBuffer; + private final Iterator> iterator; + + private final DataInputBuffer previousKeyBuffer = new DataInputBuffer(); + private boolean hasPrevious = false; + private final DataInputBuffer keyBuffer = new DataInputBuffer(); + private final DataInputBuffer valueBuffer = new DataInputBuffer(); + private WriteBuffer.Record currentRecord; + + public BufferIterator(WriteBuffer writeBuffer) { + this.writeBuffer = writeBuffer; + this.iterator = writeBuffer.records.iterator(); + } + + @Override + public DataInputBuffer getKey() { + WriteBuffer.WrappedBuffer keyWrappedBuffer = + writeBuffer.buffers.get(currentRecord.getKeyIndex()); + byte[] rawData = keyWrappedBuffer.getBuffer(); + keyBuffer.reset(rawData, currentRecord.getKeyOffSet(), currentRecord.getKeyLength()); + return keyBuffer; + } + + @Override + public DataInputBuffer getValue() { + WriteBuffer.WrappedBuffer valueWrappedBuffer = + writeBuffer.buffers.get(currentRecord.getKeyIndex()); + byte[] rawData = valueWrappedBuffer.getBuffer(); + int valueOffset = currentRecord.getKeyOffSet() + currentRecord.getKeyLength(); + valueBuffer.reset(rawData, valueOffset, currentRecord.getValueLength()); + return valueBuffer; + } + + @Override + public boolean next() { + if (hasPrevious) { + int length = + Math.min(currentRecord.getKeyLength(), keyBuffer.getLength() - keyBuffer.getPosition()); + if (length > 0) { + byte[] prevKeyData = new byte[length]; + System.arraycopy(keyBuffer.getData(), keyBuffer.getPosition(), prevKeyData, 0, length); + previousKeyBuffer.reset(prevKeyData, 0, length); + } + } + + if (iterator.hasNext()) { + currentRecord = iterator.next(); + hasPrevious = true; + return true; + } + return false; + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public void close() throws IOException {} + + @Override + public Progress getProgress() { + return new Progress(); + } + + @Override + public boolean isSameKey() { + if (!hasPrevious) { + return false; + } + + if (previousKeyBuffer.getLength() != keyBuffer.getLength()) { + return false; + } + + byte[] prevKey = new byte[previousKeyBuffer.getLength()]; + byte[] currKey = new byte[keyBuffer.getLength()]; + System.arraycopy( + previousKeyBuffer.getData(), + previousKeyBuffer.getPosition(), + prevKey, + 0, + previousKeyBuffer.getLength()); + System.arraycopy( + keyBuffer.getData(), keyBuffer.getPosition(), currKey, 0, keyBuffer.getLength()); + + return Arrays.equals(prevKey, currKey); + } + } } diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java index af4d5cea4f..ddc6f4b6e4 100644 --- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java +++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManager.java @@ -43,6 +43,8 @@ import org.apache.tez.dag.records.TezDAGID; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezVertexID; +import org.apache.tez.runtime.library.common.combine.Combiner; +import org.apache.tez.runtime.library.common.sort.impl.TezRawKeyValueIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,6 +102,8 @@ public class WriteBufferManager { private final TezCounter mapOutputByteCounter; private final TezCounter mapOutputRecordCounter; + protected final Combiner combiner; + /** WriteBufferManager */ public WriteBufferManager( TezTaskAttemptID tezTaskAttemptID, @@ -128,7 +132,8 @@ public WriteBufferManager( int shuffleId, boolean isNeedSorted, TezCounter mapOutputByteCounter, - TezCounter mapOutputRecordCounter) { + TezCounter mapOutputRecordCounter, + Combiner combiner) { this.tezTaskAttemptID = tezTaskAttemptID; this.maxMemSize = maxMemSize; this.appId = appId; @@ -158,6 +163,8 @@ public WriteBufferManager( this.mapOutputRecordCounter = mapOutputRecordCounter; this.sendExecutorService = Executors.newFixedThreadPool(sendThreadNum, ThreadUtils.getThreadFactory("send-thread")); + + this.combiner = combiner; } /** add record */ @@ -240,9 +247,23 @@ public int compare(WriteBuffer o1, WriteBuffer o2) { sendShuffleBlocks(shuffleBlocks); } - private void prepareBufferForSend(List shuffleBlocks, WriteBuffer buffer) { + private void prepareBufferForSend( + List shuffleBlocks, WriteBuffer buffer) { buffers.remove(buffer.getPartitionId()); - ShuffleBlockInfo block = createShuffleBlock(buffer); + buffer.sort(); + ShuffleBlockInfo block; + if (combiner != null) { + try { + buffer = combineBuffer(buffer); + if (LOG.isDebugEnabled()) { + LOG.debug("Successfully finished combining."); + } + } catch (Exception e) { + Thread.currentThread().interrupt(); + LOG.error("Error occurred while combining in Sort:", e); + } + } + block = createShuffleBlock(buffer); buffer.clear(); shuffleBlocks.add(block); allBlockIds.add(block.getBlockId()); @@ -252,6 +273,27 @@ private void prepareBufferForSend(List shuffleBlocks, WriteBuf partitionToBlocks.get(block.getPartitionId()).add(block.getBlockId()); } + public WriteBuffer combineBuffer(WriteBuffer buffer) + throws IOException, InterruptedException { + TezRawKeyValueIterator kvIterator = new WriteBuffer.BufferIterator<>(buffer); + + TezRssCombineOutputCollector combineCollector = + new TezRssCombineOutputCollector<>(new byte[] {}); + + WriteBuffer newBuffer = + new WriteBuffer<>( + false, + buffer.getPartitionId(), + comparator, + maxSegmentSize, + keySerializer, + valSerializer); + + combineCollector.setWriter(newBuffer); + combiner.combine(kvIterator, combineCollector); + return newBuffer; + } + private void sendShuffleBlocks(List shuffleBlocks) { sendExecutorService.submit( new Runnable() { diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java index 94c9aa90da..2626582477 100644 --- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java +++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssSorter.java @@ -30,6 +30,8 @@ import org.apache.tez.common.RssTezUtils; import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.runtime.api.OutputContext; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; +import org.apache.tez.runtime.library.common.combine.Combiner; import org.apache.tez.runtime.library.common.sort.buffer.WriteBufferManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +51,8 @@ public class RssSorter extends ExternalSorter { private Set failedBlockIds = Sets.newConcurrentHashSet(); private Map> partitionToServers; + protected final Combiner combiner; + private int[] numRecordsPerPartition; /** Initialization */ @@ -137,6 +141,8 @@ public RssSorter( LOG.info("bitmapSplitNum is {}", bitmapSplitNum); } + this.combiner = TezRuntimeUtils.instantiateCombiner(this.conf, outputContext); + LOG.info("applicationAttemptId is {}", applicationAttemptId.toString()); bufferManager = @@ -167,7 +173,8 @@ public RssSorter( shuffleId, true, mapOutputByteCounter, - mapOutputRecordCounter); + mapOutputRecordCounter, + combiner); LOG.info("Initialized WriteBufferManager."); } diff --git a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java index 0248bb8a26..21093924fd 100644 --- a/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java +++ b/client-tez/src/main/java/org/apache/tez/runtime/library/common/sort/impl/RssUnSorter.java @@ -165,7 +165,8 @@ public RssUnSorter( shuffleId, false, mapOutputByteCounter, - mapOutputRecordCounter); + mapOutputRecordCounter, + null); LOG.info("Initialized WriteBufferManager."); } diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java index bef29fa0e7..0c1f5f8a40 100644 --- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java +++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; @@ -29,24 +30,32 @@ import java.util.function.Supplier; import java.util.stream.Collectors; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.RawComparator; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.serializer.SerializationFactory; import org.apache.hadoop.io.serializer.Serializer; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.MapReduceBase; +import org.apache.hadoop.mapred.OutputCollector; +import org.apache.hadoop.mapred.Reducer; +import org.apache.hadoop.mapred.Reporter; import org.apache.tez.common.RssTezUtils; import org.apache.tez.common.TezRuntimeFrameworkConfigs; import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.dag.records.TezTaskAttemptID; +import org.apache.tez.mapreduce.combine.MRCombiner; import org.apache.tez.runtime.api.OutputContext; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.common.TezRuntimeUtils; import org.apache.tez.runtime.library.output.OutputTestHelpers; import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutputTest; import org.apache.tez.runtime.library.partitioner.HashPartitioner; @@ -151,7 +160,8 @@ public void testWriteException(@TempDir File tmpDir) throws IOException, Interru shuffleId, true, mapOutputByteCounter, - mapOutputRecordCounter); + mapOutputRecordCounter, + null); Random random = new Random(); for (int i = 0; i < 1000; i++) { @@ -255,7 +265,8 @@ public void testWriteNormal(@TempDir File tmpDir) throws IOException, Interrupte shuffleId, true, mapOutputByteCounter, - mapOutputRecordCounter); + mapOutputRecordCounter, + null); Random random = new Random(); for (int i = 0; i < 1000; i++) { @@ -369,7 +380,8 @@ public void testCommitBlocksWhenMemoryShuffleDisabled(@TempDir File tmpDir) shuffleId, true, mapOutputByteCounter, - mapOutputRecordCounter); + mapOutputRecordCounter, + null); Random random = new Random(); for (int i = 0; i < 10000; i++) { @@ -472,7 +484,8 @@ public void testFailFastWhenFailedToSendBlocks(@TempDir File tmpDir) throws IOEx shuffleId, true, mapOutputByteCounter, - mapOutputRecordCounter); + mapOutputRecordCounter, + null); Random random = new Random(); RssException rssException = @@ -498,6 +511,110 @@ public void testFailFastWhenFailedToSendBlocks(@TempDir File tmpDir) throws IOEx assertTrue(mapOutputByteCounter.getValue() < 10520000); } + @Test + public void testCombineBuffer(@TempDir File tmpDir) throws Exception { + final long maxMemSize = 10240; + final long taskAttemptId = 0; + MockShuffleWriteClient writeClient = new MockShuffleWriteClient(); + writeClient.setMode(2); + WritableComparator comparator = WritableComparator.get(Text.class); + long maxSegmentSize = 3 * 1024; + SerializationFactory serializationFactory = new SerializationFactory(new JobConf()); + Serializer keySerializer = serializationFactory.getSerializer(Text.class); + Serializer valueSerializer = serializationFactory.getSerializer(IntWritable.class); + + long maxBufferSize = 14 * 1024 * 1024; + double memoryThreshold = 0.8f; + int sendThreadNum = 1; + double sendThreshold = 0.2f; + int batch = 50; + int numMaps = 1; + String storageType = "MEMORY"; + RssConf rssConf = new RssConf(); + Map> partitionToServers = new HashMap<>(); + long sendCheckInterval = 500L; + long sendCheckTimeout = 60 * 1000 * 10L; + + Configuration conf = new Configuration(); + FileSystem localFs = FileSystem.getLocal(conf); + Path workingDir = + new Path( + System.getProperty( + "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())), + RssOrderedPartitionedKVOutputTest.class.getName()) + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); + conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName()); + conf.set("mapred.combiner.class", Reduce.class.getName()); + OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir); + + WriteBufferManager bufferManager = + new WriteBufferManager<>( + null, + maxMemSize, + null, + taskAttemptId, + null, + null, + null, + comparator, + maxSegmentSize, + keySerializer, + valueSerializer, + maxBufferSize, + memoryThreshold, + sendThreadNum, + sendThreshold, + batch, + rssConf, + partitionToServers, + numMaps, + StorageType.withMemory(StorageType.valueOf(storageType)), + sendCheckInterval, + sendCheckTimeout, + 1, + 1, + true, + null, + null, + TezRuntimeUtils.instantiateCombiner(conf, outputContext)); + + WriteBuffer buffer = + new WriteBuffer( + true, 1, comparator, 10000, keySerializer, valueSerializer); + + List wordTable = + Lists.newArrayList( + "apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan", "tomato"); + Random random = new Random(); + for (int i = 0; i < 8; i++) { + buffer.addRecord(new Text(wordTable.get(i)), new IntWritable(1)); + } + for (int i = 0; i < 100; i++) { + int index = random.nextInt(wordTable.size()); + buffer.addRecord(new Text(wordTable.get(index)), new IntWritable(1)); + } + + buffer.sort(); + WriteBuffer newBuffer = bufferManager.combineBuffer(buffer); + + WriteBuffer.BufferIterator kvIterator1 = + new WriteBuffer.BufferIterator<>(buffer); + WriteBuffer.BufferIterator kvIterator2 = + new WriteBuffer.BufferIterator<>(newBuffer); + int count1 = 0; + while (kvIterator1.next()) { + count1++; + } + int count2 = 0; + while (kvIterator2.next()) { + count2++; + } + assertEquals(108, count1); + assertEquals(8, count2); + } + class MockShuffleServer { private List cachedBlockInfos = new ArrayList<>(); private List flushBlockInfos = new ArrayList<>(); @@ -660,4 +777,21 @@ public void unregisterShuffle(String appId, int shuffleId) {} @Override public void unregisterShuffle(String appId) {} } + + public static class Reduce extends MapReduceBase + implements Reducer { + + public void reduce( + Text key, + Iterator values, + OutputCollector output, + Reporter reporter) + throws IOException { + int sum = 0; + while (values.hasNext()) { + sum += values.next().get(); + } + output.collect(key, new IntWritable(sum)); + } + } } diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferTest.java index dc3ebf707a..f885c64480 100644 --- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferTest.java +++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferTest.java @@ -61,6 +61,7 @@ public void testReadWrite() throws IOException { valSerializer); long recordLength = buffer.addRecord(key, value); + buffer.sort(); assertEquals(20, buffer.getData().length); assertEquals(16, recordLength); assertEquals(1, buffer.getPartitionId()); @@ -134,6 +135,7 @@ public void testReadWrite() throws IOException { recordLength = buffer.addRecord(key, value); recordLenMap.putIfAbsent(keyStr, recordLength); + buffer.sort(); result = buffer.getData(); byteArrayInputStream = new ByteArrayInputStream(result); keyDeserializer.open(byteArrayInputStream); From b65baa80b76990448ea168ac45f8a52129760cee Mon Sep 17 00:00:00 2001 From: QI Jiale Date: Thu, 23 Nov 2023 19:57:08 +0800 Subject: [PATCH 2/3] fix checkstyle --- .../sort/buffer/WriteBufferManagerTest.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java index 0c1f5f8a40..0cb6b456c6 100644 --- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java +++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java @@ -549,6 +549,24 @@ public void testCombineBuffer(@TempDir File tmpDir) throws Exception { conf.set("mapred.combiner.class", Reduce.class.getName()); OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir); + WriteBuffer buffer = + new WriteBuffer( + true, 1, comparator, 10000, keySerializer, valueSerializer); + + List wordTable = + Lists.newArrayList( + "apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan", "tomato"); + Random random = new Random(); + for (int i = 0; i < 8; i++) { + buffer.addRecord(new Text(wordTable.get(i)), new IntWritable(1)); + } + for (int i = 0; i < 100; i++) { + int index = random.nextInt(wordTable.size()); + buffer.addRecord(new Text(wordTable.get(index)), new IntWritable(1)); + } + + buffer.sort(); + WriteBufferManager bufferManager = new WriteBufferManager<>( null, @@ -579,24 +597,7 @@ public void testCombineBuffer(@TempDir File tmpDir) throws Exception { null, null, TezRuntimeUtils.instantiateCombiner(conf, outputContext)); - - WriteBuffer buffer = - new WriteBuffer( - true, 1, comparator, 10000, keySerializer, valueSerializer); - - List wordTable = - Lists.newArrayList( - "apple", "banana", "fruit", "cherry", "Chinese", "America", "Japan", "tomato"); - Random random = new Random(); - for (int i = 0; i < 8; i++) { - buffer.addRecord(new Text(wordTable.get(i)), new IntWritable(1)); - } - for (int i = 0; i < 100; i++) { - int index = random.nextInt(wordTable.size()); - buffer.addRecord(new Text(wordTable.get(index)), new IntWritable(1)); - } - - buffer.sort(); + WriteBuffer newBuffer = bufferManager.combineBuffer(buffer); WriteBuffer.BufferIterator kvIterator1 = From 6dd9778540afea2d3889c97c8b96ac8f7f5de5ba Mon Sep 17 00:00:00 2001 From: QI Jiale Date: Thu, 23 Nov 2023 20:08:37 +0800 Subject: [PATCH 3/3] fix checkstyle --- .../sort/buffer/WriteBufferManagerTest.java | 64 ++++++++----------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java index 0cb6b456c6..9508ecddd9 100644 --- a/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java +++ b/client-tez/src/test/java/org/apache/tez/runtime/library/common/sort/buffer/WriteBufferManagerTest.java @@ -513,41 +513,18 @@ public void testFailFastWhenFailedToSendBlocks(@TempDir File tmpDir) throws IOEx @Test public void testCombineBuffer(@TempDir File tmpDir) throws Exception { - final long maxMemSize = 10240; - final long taskAttemptId = 0; MockShuffleWriteClient writeClient = new MockShuffleWriteClient(); writeClient.setMode(2); WritableComparator comparator = WritableComparator.get(Text.class); - long maxSegmentSize = 3 * 1024; SerializationFactory serializationFactory = new SerializationFactory(new JobConf()); Serializer keySerializer = serializationFactory.getSerializer(Text.class); Serializer valueSerializer = serializationFactory.getSerializer(IntWritable.class); - long maxBufferSize = 14 * 1024 * 1024; - double memoryThreshold = 0.8f; - int sendThreadNum = 1; - double sendThreshold = 0.2f; - int batch = 50; - int numMaps = 1; - String storageType = "MEMORY"; - RssConf rssConf = new RssConf(); - Map> partitionToServers = new HashMap<>(); - long sendCheckInterval = 500L; - long sendCheckTimeout = 60 * 1000 * 10L; - Configuration conf = new Configuration(); - FileSystem localFs = FileSystem.getLocal(conf); - Path workingDir = - new Path( - System.getProperty( - "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())), - RssOrderedPartitionedKVOutputTest.class.getName()) - .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, IntWritable.class.getName()); conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName()); conf.set("mapred.combiner.class", Reduce.class.getName()); - OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir); WriteBuffer buffer = new WriteBuffer( @@ -567,37 +544,46 @@ public void testCombineBuffer(@TempDir File tmpDir) throws Exception { buffer.sort(); + FileSystem localFs = FileSystem.getLocal(conf); + Path workingDir = + new Path( + System.getProperty( + "test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())), + RssOrderedPartitionedKVOutputTest.class.getName()) + .makeQualified(localFs.getUri(), localFs.getWorkingDirectory()); + WriteBufferManager bufferManager = new WriteBufferManager<>( null, - maxMemSize, + 10240, null, - taskAttemptId, + 0, null, null, null, comparator, - maxSegmentSize, + 3 * 1024, keySerializer, valueSerializer, - maxBufferSize, - memoryThreshold, - sendThreadNum, - sendThreshold, - batch, - rssConf, - partitionToServers, - numMaps, - StorageType.withMemory(StorageType.valueOf(storageType)), - sendCheckInterval, - sendCheckTimeout, + 14 * 1024 * 1024, + 0.8f, + 1, + 0.2f, + 50, + new RssConf(), + new HashMap<>(), + 1, + StorageType.withMemory(StorageType.valueOf("MEMORY")), + 500L, + 60 * 1000 * 10L, 1, 1, true, null, null, - TezRuntimeUtils.instantiateCombiner(conf, outputContext)); - + TezRuntimeUtils.instantiateCombiner( + conf, OutputTestHelpers.createOutputContext(conf, workingDir))); + WriteBuffer newBuffer = bufferManager.combineBuffer(buffer); WriteBuffer.BufferIterator kvIterator1 =