Skip to content

Commit

Permalink
Add partial changes from commit 969342c
Browse files Browse the repository at this point in the history
More error reporting and stats for ingestion tasks (apache#5418)
These changes are required for the tasks api improvement commit
  • Loading branch information
Surekha Saharan committed Jul 3, 2018
1 parent 3904ce8 commit 61234a4
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 51 deletions.
67 changes: 39 additions & 28 deletions api/src/main/java/io/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class TaskStatusPlus
private final Long duration;
private final TaskLocation location;
private final String dataSource;
@Nullable
private final String errorMsg;

@JsonCreator
public TaskStatusPlus(
Expand All @@ -47,7 +49,8 @@ public TaskStatusPlus(
@JsonProperty("statusCode") @Nullable TaskState state,
@JsonProperty("duration") @Nullable Long duration,
@JsonProperty("location") TaskLocation location,
@JsonProperty("dataSource") String dataSource
@JsonProperty("dataSource") String dataSource,
@JsonProperty("errorMsg") String errorMsg
)
{
if (state != null && state.isComplete()) {
Expand All @@ -61,6 +64,7 @@ public TaskStatusPlus(
this.duration = duration;
this.location = Preconditions.checkNotNull(location, "location");
this.dataSource = dataSource;
this.errorMsg = errorMsg;
}

@JsonProperty
Expand Down Expand Up @@ -108,46 +112,53 @@ public TaskLocation getLocation()
return location;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@Nullable
@JsonProperty("errorMsg")
public String getErrorMsg()
{
return errorMsg;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

final TaskStatusPlus that = (TaskStatusPlus) o;
if (!id.equals(that.id)) {
return false;
}
if (!createdTime.equals(that.createdTime)) {
return false;
}
if (!queueInsertionTime.equals(that.queueInsertionTime)) {
return false;
}
if (!Objects.equals(state, that.state)) {
return false;
}
if (!Objects.equals(duration, that.duration)) {
return false;
}
return location.equals(that.location);
TaskStatusPlus that = (TaskStatusPlus) o;
return Objects.equals(getId(), that.getId()) &&
Objects.equals(getType(), that.getType()) &&
Objects.equals(getCreatedTime(), that.getCreatedTime()) &&
Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) &&
getState() == that.getState() &&
Objects.equals(getDuration(), that.getDuration()) &&
Objects.equals(getLocation(), that.getLocation()) &&
Objects.equals(getDataSource(), that.getDataSource()) &&
Objects.equals(getErrorMsg(), that.getErrorMsg());
}

@Override
public int hashCode()
{
return Objects.hash(id, createdTime, queueInsertionTime, state, duration, location);
}

@JsonProperty
public String getDataSource()
{
return dataSource;
return Objects.hash(
getId(),
getType(),
getCreatedTime(),
getQueueInsertionTime(),
getState(),
getDuration(),
getLocation(),
getDataSource(),
getErrorMsg()
);
}

}
3 changes: 2 additions & 1 deletion api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ public void testSerde() throws IOException
TaskState.RUNNING,
1000L,
TaskLocation.create("testHost", 1010, -1),
"ds_test"
"ds_test",
null
);
final String json = mapper.writeValueAsString(status);
Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,40 +34,66 @@
*/
public class TaskStatus
{
public static final int MAX_ERROR_MSG_LENGTH = 100;

public static TaskStatus running(String taskId)
{
return new TaskStatus(taskId, TaskState.RUNNING, -1);
return new TaskStatus(taskId, TaskState.RUNNING, -1, null);
}

public static TaskStatus success(String taskId)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1);
return new TaskStatus(taskId, TaskState.SUCCESS, -1, null);
}

public static TaskStatus success(String taskId, String errorMsg)
{
return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg);
}

public static TaskStatus failure(String taskId)
{
return new TaskStatus(taskId, TaskState.FAILED, -1);
return new TaskStatus(taskId, TaskState.FAILED, -1, null);
}

public static TaskStatus failure(String taskId, String errorMsg)
{
return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg);
}

public static TaskStatus fromCode(String taskId, TaskState code)
{
return new TaskStatus(taskId, code, -1);
return new TaskStatus(taskId, code, -1, null);
}

// The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage.
// The full error message will be available via a TaskReport.
private static String truncateErrorMsg(String errorMsg)
{
if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_LENGTH) {
return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "...";
} else {
return errorMsg;
}
}

private final String id;
private final TaskState status;
private final long duration;
private final String errorMsg;

@JsonCreator
protected TaskStatus(
@JsonProperty("id") String id,
@JsonProperty("status") TaskState status,
@JsonProperty("duration") long duration
@JsonProperty("duration") long duration,
@JsonProperty("errorMsg") String errorMsg
)
{
this.id = id;
this.status = status;
this.duration = duration;
this.errorMsg = truncateErrorMsg(errorMsg);

// Check class invariants.
Preconditions.checkNotNull(id, "id");
Expand All @@ -92,6 +118,12 @@ public long getDuration()
return duration;
}

@JsonProperty("errorMsg")
public String getErrorMsg()
{
return errorMsg;
}

/**
* Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
* isSuccess, or isFailure will be true at any one time.
Expand Down Expand Up @@ -141,7 +173,18 @@ public boolean isFailure()

public TaskStatus withDuration(long _duration)
{
return new TaskStatus(id, status, _duration);
return new TaskStatus(id, status, _duration, errorMsg);
}

@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
.add("errorMsg", errorMsg)
.toString();
}

@Override
Expand All @@ -154,24 +197,15 @@ public boolean equals(Object o)
return false;
}
TaskStatus that = (TaskStatus) o;
return duration == that.duration &&
java.util.Objects.equals(id, that.id) &&
status == that.status;
return getDuration() == that.getDuration() &&
java.util.Objects.equals(getId(), that.getId()) &&
status == that.status &&
java.util.Objects.equals(getErrorMsg(), that.getErrorMsg());
}

@Override
public int hashCode()
{
return java.util.Objects.hash(id, status, duration);
}

@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("id", id)
.add("status", status)
.add("duration", duration)
.toString();
return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,9 @@ public Response getCompleteTasks(
status.getStatusCode(),
status.getDuration(),
TaskLocation.unknown(),
pair.rhs);
pair.rhs,
status.getErrorMsg()
);
}));

return Response.ok(completeTasks).build();
Expand Down Expand Up @@ -777,7 +779,8 @@ public TaskStatusPlus apply(TaskRunnerWorkItem workItem)
null,
null,
workItem.getLocation(),
workItem.getDataSource()
workItem.getDataSource(),
null
);
}
}
Expand Down

0 comments on commit 61234a4

Please sign in to comment.