Skip to content

Commit

Permalink
[#992] fix(tez): convertTaskAttemptIdToLong should not consider appat…
Browse files Browse the repository at this point in the history
…temptId
  • Loading branch information
zhengchenyu committed Jul 17, 2023
1 parent 411b445 commit 3575e8d
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 22 deletions.
16 changes: 4 additions & 12 deletions client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -287,23 +287,15 @@ private static int mapVertexId(String vertexName) {
}
}

public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID, int appAttemptId) {
public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID) {
long lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > Constants.MAX_TASK_ATTEMPT_ID) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
}
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
long highBytes = (long) taskAttemptID.getId() - (appAttemptId - 1) * 1000;
long highBytes = taskAttemptID.getId();
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt "
+ taskAttemptID
+ " high bytes "
+ highBytes
+ " exceed, appAttemptId:"
+ appAttemptId);
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed.");
}
long id =
(highBytes << (Constants.TASK_ATTEMPT_ID_MAX_LENGTH + Constants.PARTITION_ID_MAX_LENGTH))
Expand All @@ -323,7 +315,7 @@ public static Roaring64NavigableMap fetchAllRssTaskIds(
for (InputAttemptIdentifier inputAttemptIdentifier : successMapTaskAttempts) {
String pathComponent = inputAttemptIdentifier.getPathComponent();
TezTaskAttemptID mapTaskAttemptID = IdUtils.convertTezTaskAttemptID(pathComponent);
long rssTaskId = RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID, appAttemptId);
long rssTaskId = RssTezUtils.convertTaskAttemptIdToLong(mapTaskAttemptID);
long mapTaskId = mapTaskAttemptID.getTaskID().getId();

LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.IdUtils;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
Expand Down Expand Up @@ -84,8 +83,7 @@ public RssSorter(
conf.getDouble(
RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID, IdUtils.getAppAttemptId());
long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);

long maxSegmentSize =
conf.getLong(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.IdUtils;
import org.apache.tez.common.RssTezConfig;
import org.apache.tez.common.RssTezUtils;
import org.apache.tez.dag.records.TezTaskAttemptID;
Expand Down Expand Up @@ -83,8 +82,7 @@ public RssUnSorter(
conf.getDouble(
RssTezConfig.RSS_CLIENT_SORT_MEMORY_USE_THRESHOLD,
RssTezConfig.RSS_CLIENT_DEFAULT_SORT_MEMORY_USE_THRESHOLD);
long taskAttemptId =
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID, IdUtils.getAppAttemptId());
long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptID);
long maxSegmentSize =
conf.getLong(
RssTezConfig.RSS_CLIENT_MAX_BUFFER_SIZE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void baskAttemptIdTest() {

boolean isException = false;
try {
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
} catch (RssException e) {
isException = true;
}
Expand All @@ -65,7 +65,7 @@ public void baskAttemptIdTest() {
tezTaskAttemptId = TezTaskAttemptID.getInstance(taskId, 2);
isException = false;
try {
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
} catch (RssException e) {
isException = true;
}
Expand All @@ -79,7 +79,7 @@ public void blockConvertTest() {
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
TezTaskID tId = TezTaskID.getInstance(vId, 389);
TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
long blockId = RssTezUtils.getBlockId(1, taskAttemptId, 0);
long newTaskAttemptId = RssTezUtils.getTaskAttemptId(blockId);
assertEquals(taskAttemptId, newTaskAttemptId);
Expand All @@ -95,7 +95,7 @@ public void testPartitionIdConvertBlock() {
TezVertexID vId = TezVertexID.getInstance(dagId, 35);
TezTaskID tId = TezTaskID.getInstance(vId, 389);
TezTaskAttemptID tezTaskAttemptId = TezTaskAttemptID.getInstance(tId, 2);
long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId, 1);
long taskAttemptId = RssTezUtils.convertTaskAttemptIdToLong(tezTaskAttemptId);
long mask = (1L << Constants.PARTITION_ID_MAX_LENGTH) - 1;
for (int partitionId = 0; partitionId <= 3000; partitionId++) {
for (int seqNo = 0; seqNo <= 10; seqNo++) {
Expand Down

0 comments on commit 3575e8d

Please sign in to comment.