Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#972] fix(tez): Add output mapOutputByteCounter metrics #1016

Merged
5 commits merged on Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
Expand Down Expand Up @@ -95,6 +96,7 @@ public class WriteBufferManager<K, V> {
private final RssConf rssConf;
private final int shuffleId;
private final boolean isNeedSorted;
private final TezCounter mapOutputByteCounter;

/** WriteBufferManager */
public WriteBufferManager(
Expand All @@ -121,7 +123,8 @@ public WriteBufferManager(
long sendCheckTimeout,
int bitmapSplitNum,
int shuffleId,
boolean isNeedSorted) {
boolean isNeedSorted,
TezCounter mapOutputByteCounter) {
this.tezTaskAttemptID = tezTaskAttemptID;
this.maxMemSize = maxMemSize;
this.appId = appId;
Expand All @@ -147,6 +150,7 @@ public WriteBufferManager(
this.rssConf = rssConf;
this.shuffleId = shuffleId;
this.isNeedSorted = isNeedSorted;
this.mapOutputByteCounter = mapOutputByteCounter;
this.sendExecutorService =
Executors.newFixedThreadPool(1, ThreadUtils.getThreadFactory("send-thread"));
}
Expand Down Expand Up @@ -192,6 +196,7 @@ public void addRecord(int partitionId, K key, V value) throws InterruptedExcepti
&& inSendListBytes.get() <= maxMemSize * sendThreshold) {
sendBuffersToServers();
}
mapOutputByteCounter.increment(length);
}

private void sendBufferToServers(WriteBuffer<K, V> buffer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ public RssSorter(
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
true);
true,
mapOutputByteCounter);
LOG.info("Initialized WriteBufferManager.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public RssUnSorter(
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
false);
false,
mapOutputByteCounter);
LOG.info("Initialized WriteBufferManager.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.tez.runtime.library.common.sort.buffer;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -28,16 +29,29 @@
import java.util.stream.Collectors;

import com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.output.OutputTestHelpers;
import org.apache.tez.runtime.library.output.RssOrderedPartitionedKVOutputTest;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import org.apache.uniffle.client.api.ShuffleWriteClient;
Expand All @@ -57,14 +71,14 @@

public class WriteBufferManagerTest {
@Test
public void testWriteException() throws IOException, InterruptedException {
public void testWriteException(@TempDir File tmpDir) throws IOException, InterruptedException {
TezTaskAttemptID tezTaskAttemptID =
TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
long maxMemSize = 10240;
String appId = "application_1681717153064_3770270";
long taskAttemptId = 0;
Set<Long> successBlockIds = Sets.newConcurrentHashSet();
Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
final long maxMemSize = 10240;
final String appId = "application_1681717153064_3770270";
final long taskAttemptId = 0;
final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
RawComparator comparator = WritableComparator.get(BytesWritable.class);
long maxSegmentSize = 3 * 1024;
Expand All @@ -86,6 +100,23 @@ public void testWriteException() throws IOException, InterruptedException {
int bitmapSplitNum = 1;
int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);

Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
Path workingDir =
new Path(
System.getProperty(
"test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
RssOrderedPartitionedKVOutputTest.class.getName())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
conf.set(
TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);

WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
tezTaskAttemptID,
Expand All @@ -111,7 +142,8 @@ public void testWriteException() throws IOException, InterruptedException {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
true);
true,
mapOutputByteCounter);

Random random = new Random();
for (int i = 0; i < 1000; i++) {
Expand All @@ -122,6 +154,8 @@ public void testWriteException() throws IOException, InterruptedException {
bufferManager.addRecord(1, new BytesWritable(key), new BytesWritable(value));
}

assertEquals(1052000, mapOutputByteCounter.getValue());

boolean isException = false;
try {
bufferManager.waitSendFinished();
Expand All @@ -132,14 +166,14 @@ public void testWriteException() throws IOException, InterruptedException {
}

@Test
public void testWriteNormal() throws IOException, InterruptedException {
public void testWriteNormal(@TempDir File tmpDir) throws IOException, InterruptedException {
TezTaskAttemptID tezTaskAttemptID =
TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
long maxMemSize = 10240;
String appId = "appattempt_1681717153064_3770270_000001";
long taskAttemptId = 0;
Set<Long> successBlockIds = Sets.newConcurrentHashSet();
Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
final long maxMemSize = 10240;
final String appId = "appattempt_1681717153064_3770270_000001";
final long taskAttemptId = 0;
final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
writeClient.setMode(2);
RawComparator comparator = WritableComparator.get(BytesWritable.class);
Expand All @@ -162,6 +196,23 @@ public void testWriteNormal() throws IOException, InterruptedException {
int bitmapSplitNum = 1;
int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);

Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
Path workingDir =
new Path(
System.getProperty(
"test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
RssOrderedPartitionedKVOutputTest.class.getName())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
conf.set(
TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);

WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
tezTaskAttemptID,
Expand All @@ -187,7 +238,8 @@ public void testWriteNormal() throws IOException, InterruptedException {
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
true);
true,
mapOutputByteCounter);

Random random = new Random();
for (int i = 0; i < 1000; i++) {
Expand All @@ -198,6 +250,8 @@ public void testWriteNormal() throws IOException, InterruptedException {
int partitionId = random.nextInt(50);
bufferManager.addRecord(partitionId, new BytesWritable(key), new BytesWritable(value));
}

assertEquals(1052000, mapOutputByteCounter.getValue());
bufferManager.waitSendFinished();
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());

Expand All @@ -208,6 +262,8 @@ public void testWriteNormal() throws IOException, InterruptedException {
random.nextBytes(value);
bufferManager.addRecord(i, new BytesWritable(key), new BytesWritable(value));
}

assertEquals(1175900, mapOutputByteCounter.getValue());
assert (1 == bufferManager.getWaitSendBuffers().size());
assert (4928 == bufferManager.getWaitSendBuffers().get(0).getDataLength());

Expand All @@ -216,14 +272,15 @@ public void testWriteNormal() throws IOException, InterruptedException {
}

@Test
public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, InterruptedException {
public void testCommitBlocksWhenMemoryShuffleDisabled(@TempDir File tmpDir)
throws IOException, InterruptedException {
TezTaskAttemptID tezTaskAttemptID =
TezTaskAttemptID.fromString("attempt_1681717153064_3770270_1_00_000000_0");
long maxMemSize = 10240;
String appId = "application_1681717153064_3770270";
long taskAttemptId = 0;
Set<Long> successBlockIds = Sets.newConcurrentHashSet();
Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
final long maxMemSize = 10240;
final String appId = "application_1681717153064_3770270";
final long taskAttemptId = 0;
final Set<Long> successBlockIds = Sets.newConcurrentHashSet();
final Set<Long> failedBlockIds = Sets.newConcurrentHashSet();
MockShuffleWriteClient writeClient = new MockShuffleWriteClient();
writeClient.setMode(3);
RawComparator comparator = WritableComparator.get(BytesWritable.class);
Expand All @@ -245,6 +302,23 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte
int bitmapSplitNum = 1;
int shuffleId = getShuffleId(tezTaskAttemptID, 1, 2);

Configuration conf = new Configuration();
FileSystem localFs = FileSystem.getLocal(conf);
Path workingDir =
new Path(
System.getProperty(
"test.build.data", System.getProperty("java.io.tmpdir", tmpDir.toString())),
RssOrderedPartitionedKVOutputTest.class.getName())
.makeQualified(localFs.getUri(), localFs.getWorkingDirectory());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, Text.class.getName());
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS, Text.class.getName());
conf.set(
TezRuntimeConfiguration.TEZ_RUNTIME_PARTITIONER_CLASS, HashPartitioner.class.getName());
conf.setStrings(TezRuntimeFrameworkConfigs.LOCAL_DIRS, workingDir.toString());
OutputContext outputContext = OutputTestHelpers.createOutputContext(conf, workingDir);
TezCounter mapOutputByteCounter =
outputContext.getCounters().findCounter(TaskCounter.OUTPUT_BYTES);

WriteBufferManager<BytesWritable, BytesWritable> bufferManager =
new WriteBufferManager(
tezTaskAttemptID,
Expand All @@ -270,7 +344,8 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte
sendCheckTimeout,
bitmapSplitNum,
shuffleId,
true);
true,
mapOutputByteCounter);

Random random = new Random();
for (int i = 0; i < 10000; i++) {
Expand All @@ -283,6 +358,7 @@ public void testCommitBlocksWhenMemoryShuffleDisabled() throws IOException, Inte
}
bufferManager.waitSendFinished();

assertEquals(10520000, mapOutputByteCounter.getValue());
assertTrue(bufferManager.getWaitSendBuffers().isEmpty());
assertEquals(
writeClient.mockedShuffleServer.getFinishBlockSize(),
Expand Down