Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#1398] fix(mr,tez): Make attempId computable and move it to taskAttemptId in BlockId layout. #1418

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -44,37 +44,40 @@ public class RssMRUtils {

private static final Logger LOG = LoggerFactory.getLogger(RssMRUtils.class);
private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
private static final int MAX_ATTEMPT_LENGTH = 6;
private static final int MAX_ATTEMPT_LENGTH = 4;
private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
private static final int MAX_SEQUENCE_NO =
(1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits - MAX_ATTEMPT_LENGTH;
private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;

// Class TaskAttemptId have two field id and mapId, rss taskAttemptID have 21 bits,
// mapId is 19 bits, id is 2 bits. MR have a trick logic, taskAttemptId will increase
// 1000 * (appAttemptId - 1), so we will decrease it.
// Class TaskAttemptId have two field id and mapId. MR have a trick logic, taskAttemptId will
// increase 1000 * (appAttemptId - 1), so we will decrease it.
public static long convertTaskAttemptIdToLong(TaskAttemptID taskAttemptID, int appAttemptId) {
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
int lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > LAYOUT.maxTaskAttemptId) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
}
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
int highBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000;
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
long lowBytes = taskAttemptID.getId() - (appAttemptId - 1) * 1000L;
if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed " + MAX_ATTEMPT_ID);
}
long highBytes = taskAttemptID.getTaskID().getId();
if (highBytes > MAX_TASK_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed");
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed " + MAX_TASK_ID);
}
return LAYOUT.getBlockId(highBytes, 0, lowBytes);
long taskAttemptId = (highBytes << (MAX_ATTEMPT_LENGTH)) + lowBytes;
return LAYOUT.getBlockId(0, 0, taskAttemptId);
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
}

public static TaskAttemptID createMRTaskAttemptId(
JobID jobID, TaskType taskType, long rssTaskAttemptId, int appAttemptId) {
if (appAttemptId < 1) {
throw new RssException("appAttemptId " + appAttemptId + " is wrong");
}
TaskID taskID = new TaskID(jobID, taskType, LAYOUT.getTaskAttemptId(rssTaskAttemptId));
int id = LAYOUT.getSequenceNo(rssTaskAttemptId) + 1000 * (appAttemptId - 1);
int task = LAYOUT.getTaskAttemptId(rssTaskAttemptId) >> MAX_ATTEMPT_LENGTH;
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
int attempt = (int) (rssTaskAttemptId & MAX_ATTEMPT_ID);
TaskID taskID = new TaskID(jobID, taskType, task);
int id = attempt + 1000 * (appAttemptId - 1);
return new TaskAttemptID(taskID, id);
}

Expand Down Expand Up @@ -228,27 +231,33 @@ public static String getString(Configuration rssJobConf, String key, String defa
}

public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo) {
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
if (taskAttemptId < 0 || taskAttemptId > LAYOUT.maxTaskAttemptId) {
throw new RssException(
"Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
"Can't support attemptId ["
+ taskAttemptId
+ "], the max value should be "
+ LAYOUT.maxTaskAttemptId);
}
if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
if (nextSeqNo < 0 || nextSeqNo > LAYOUT.maxSequenceNo) {
throw new RssException(
"Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
"Can't support sequence ["
+ nextSeqNo
+ "], the max value should be "
+ LAYOUT.maxSequenceNo);
}

int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
long taskId =
taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));

return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
if (partitionId < 0 || partitionId > LAYOUT.maxPartitionId) {
throw new RssException(
"Can't support partitionId ["
+ partitionId
+ "], the max value should be "
+ LAYOUT.maxPartitionId);
}
return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
}

public static long getTaskAttemptId(long blockId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task attempt id is derived from the block id, hence it is reduced in its bit size:

Suggested change
public static long getTaskAttemptId(long blockId) {
public static int getTaskAttemptId(long blockId) {

The caller of this method can continue to upcast the returned int to long, no problem.

int mapId = LAYOUT.getTaskAttemptId(blockId);
int attemptId = LAYOUT.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
return LAYOUT.getBlockId(attemptId, 0, mapId);
return LAYOUT.getTaskAttemptId(blockId);
}

public static int estimateTaskConcurrency(JobConf jobConf) {
Expand Down
44 changes: 21 additions & 23 deletions client-tez/src/main/java/org/apache/tez/common/RssTezUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,10 @@ public class RssTezUtils {

private static final Logger LOG = LoggerFactory.getLogger(RssTezUtils.class);
private static final BlockIdLayout LAYOUT = BlockIdLayout.DEFAULT;
private static final int MAX_ATTEMPT_LENGTH = 6;
private static final int MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
private static final int MAX_SEQUENCE_NO =
(1 << (LAYOUT.sequenceNoBits - MAX_ATTEMPT_LENGTH)) - 1;
public static final int MAX_ATTEMPT_LENGTH = 4;
public static final long MAX_ATTEMPT_ID = (1 << MAX_ATTEMPT_LENGTH) - 1;
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
private static final int MAX_TASK_LENGTH = LAYOUT.taskAttemptIdBits - MAX_ATTEMPT_LENGTH;
private static final int MAX_TASK_ID = (1 << MAX_TASK_LENGTH) - 1;

public static final String HOST_NAME = "hostname";

Expand Down Expand Up @@ -164,27 +164,25 @@ public static long getBlockId(int partitionId, long taskAttemptId, int nextSeqNo
partitionId,
taskAttemptId,
nextSeqNo);
long attemptId = taskAttemptId >> (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits);
if (attemptId < 0 || attemptId > MAX_ATTEMPT_ID) {
if (taskAttemptId < 0 || taskAttemptId > LAYOUT.maxTaskAttemptId) {
throw new RssException(
"Can't support attemptId [" + attemptId + "], the max value should be " + MAX_ATTEMPT_ID);
"Can't support attemptId ["
+ taskAttemptId
+ "], the max value should be "
+ LAYOUT.maxTaskAttemptId);
}
if (nextSeqNo < 0 || nextSeqNo > MAX_SEQUENCE_NO) {
if (nextSeqNo < 0 || nextSeqNo > LAYOUT.maxSequenceNo) {
throw new RssException(
"Can't support sequence [" + nextSeqNo + "], the max value should be " + MAX_SEQUENCE_NO);
"Can't support sequence ["
+ nextSeqNo
+ "], the max value should be "
+ LAYOUT.maxSequenceNo);
}

int atomicInt = (int) ((nextSeqNo << MAX_ATTEMPT_LENGTH) + attemptId);
long taskId =
taskAttemptId - (attemptId << (LAYOUT.partitionIdBits + LAYOUT.taskAttemptIdBits));

return LAYOUT.getBlockId(atomicInt, partitionId, taskId);
return LAYOUT.getBlockId(nextSeqNo, partitionId, taskAttemptId);
}

public static long getTaskAttemptId(long blockId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This task attempt id is derived from the block id, hence it is reduced in its bit size:

Suggested change
public static long getTaskAttemptId(long blockId) {
public static int getTaskAttemptId(long blockId) {

The caller of this method can continue to upcast the returned int to long, no problem.

int mapId = LAYOUT.getTaskAttemptId(blockId);
int attemptId = LAYOUT.getSequenceNo(blockId) & MAX_ATTEMPT_ID;
return LAYOUT.getBlockId(attemptId, 0, mapId);
return LAYOUT.getTaskAttemptId(blockId);
}

public static int estimateTaskConcurrency(Configuration jobConf, int mapNum, int reduceNum) {
Expand Down Expand Up @@ -277,16 +275,16 @@ private static int mapVertexId(String vertexName) {
}

public static long convertTaskAttemptIdToLong(TezTaskAttemptID taskAttemptID) {
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
int lowBytes = taskAttemptID.getTaskID().getId();
if (lowBytes > LAYOUT.maxTaskAttemptId) {
int lowBytes = taskAttemptID.getId();
if (lowBytes > MAX_ATTEMPT_ID || lowBytes < 0) {
throw new RssException("TaskAttempt " + taskAttemptID + " low bytes " + lowBytes + " exceed");
}
int highBytes = taskAttemptID.getId();
if (highBytes > MAX_ATTEMPT_ID || highBytes < 0) {
int highBytes = taskAttemptID.getTaskID().getId();
if (highBytes > MAX_TASK_ID || highBytes < 0) {
throw new RssException(
"TaskAttempt " + taskAttemptID + " high bytes " + highBytes + " exceed.");
}
long id = LAYOUT.getBlockId(highBytes, 0, lowBytes);
long id = (long) highBytes << MAX_ATTEMPT_LENGTH + lowBytes;
qijiale76 marked this conversation as resolved.
Show resolved Hide resolved
LOG.info("ConvertTaskAttemptIdToLong taskAttemptID:{}, id is {}, .", taskAttemptID, id);
return id;
}
Expand Down