From 61234a455ca71e9e41b8a31100da1ac4c2184372 Mon Sep 17 00:00:00 2001 From: Surekha Saharan Date: Mon, 2 Jul 2018 22:49:31 -0700 Subject: [PATCH] Add partial changes from commit 969342cd287dc022c8e762b1353c433012363cb8 More error reporting and stats for ingestion tasks (#5418) These changes are required for the tasks api improvement commit --- .../java/io/druid/indexer/TaskStatusPlus.java | 67 ++++++++++------- .../io/druid/indexer/TaskStatusPlusTest.java | 3 +- .../io/druid/indexing/common/TaskStatus.java | 74 ++++++++++++++----- .../overlord/http/OverlordResource.java | 7 +- 4 files changed, 100 insertions(+), 51 deletions(-) diff --git a/api/src/main/java/io/druid/indexer/TaskStatusPlus.java b/api/src/main/java/io/druid/indexer/TaskStatusPlus.java index 61949cd02abc..1e628765a5bc 100644 --- a/api/src/main/java/io/druid/indexer/TaskStatusPlus.java +++ b/api/src/main/java/io/druid/indexer/TaskStatusPlus.java @@ -37,6 +37,8 @@ public class TaskStatusPlus private final Long duration; private final TaskLocation location; private final String dataSource; + @Nullable + private final String errorMsg; @JsonCreator public TaskStatusPlus( @@ -47,7 +49,8 @@ public TaskStatusPlus( @JsonProperty("statusCode") @Nullable TaskState state, @JsonProperty("duration") @Nullable Long duration, @JsonProperty("location") TaskLocation location, - @JsonProperty("dataSource") String dataSource + @JsonProperty("dataSource") String dataSource, + @JsonProperty("errorMsg") String errorMsg ) { if (state != null && state.isComplete()) { @@ -61,6 +64,7 @@ public TaskStatusPlus( this.duration = duration; this.location = Preconditions.checkNotNull(location, "location"); this.dataSource = dataSource; + this.errorMsg = errorMsg; } @JsonProperty @@ -108,46 +112,53 @@ public TaskLocation getLocation() return location; } + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @Nullable + @JsonProperty("errorMsg") + public String getErrorMsg() + { + return errorMsg; + } + @Override public boolean equals(Object o) { if (this == o) { return true; } - if (o == null || getClass() != o.getClass()) { return false; } - - final TaskStatusPlus that = (TaskStatusPlus) o; - if (!id.equals(that.id)) { - return false; - } - if (!createdTime.equals(that.createdTime)) { - return false; - } - if (!queueInsertionTime.equals(that.queueInsertionTime)) { - return false; - } - if (!Objects.equals(state, that.state)) { - return false; - } - if (!Objects.equals(duration, that.duration)) { - return false; - } - return location.equals(that.location); + TaskStatusPlus that = (TaskStatusPlus) o; + return Objects.equals(getId(), that.getId()) && + Objects.equals(getType(), that.getType()) && + Objects.equals(getCreatedTime(), that.getCreatedTime()) && + Objects.equals(getQueueInsertionTime(), that.getQueueInsertionTime()) && + getState() == that.getState() && + Objects.equals(getDuration(), that.getDuration()) && + Objects.equals(getLocation(), that.getLocation()) && + Objects.equals(getDataSource(), that.getDataSource()) && + Objects.equals(getErrorMsg(), that.getErrorMsg()); } @Override public int hashCode() { - return Objects.hash(id, createdTime, queueInsertionTime, state, duration, location); - } - - @JsonProperty - public String getDataSource() - { - return dataSource; + return Objects.hash( + getId(), + getType(), + getCreatedTime(), + getQueueInsertionTime(), + getState(), + getDuration(), + getLocation(), + getDataSource(), + getErrorMsg() + ); } - } diff --git a/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java b/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java index 200a6b4c701b..eeefe130e5be 100644 --- a/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java +++ b/api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java @@ -53,7 +53,8 @@ public void testSerde() throws IOException TaskState.RUNNING, 1000L, TaskLocation.create("testHost", 1010, -1), - "ds_test" + "ds_test", + null ); final String json = mapper.writeValueAsString(status); Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class)); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java index b0249440c7a1..61e64917ca82 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskStatus.java @@ -34,40 +34,66 @@ */ public class TaskStatus { + public static final int MAX_ERROR_MSG_LENGTH = 100; + public static TaskStatus running(String taskId) { - return new TaskStatus(taskId, TaskState.RUNNING, -1); + return new TaskStatus(taskId, TaskState.RUNNING, -1, null); } public static TaskStatus success(String taskId) { - return new TaskStatus(taskId, TaskState.SUCCESS, -1); + return new TaskStatus(taskId, TaskState.SUCCESS, -1, null); + } + + public static TaskStatus success(String taskId, String errorMsg) + { + return new TaskStatus(taskId, TaskState.SUCCESS, -1, errorMsg); } public static TaskStatus failure(String taskId) { - return new TaskStatus(taskId, TaskState.FAILED, -1); + return new TaskStatus(taskId, TaskState.FAILED, -1, null); + } + + public static TaskStatus failure(String taskId, String errorMsg) + { + return new TaskStatus(taskId, TaskState.FAILED, -1, errorMsg); } public static TaskStatus fromCode(String taskId, TaskState code) { - return new TaskStatus(taskId, code, -1); + return new TaskStatus(taskId, code, -1, null); + } + + // The error message can be large, so truncate it to avoid storing large objects in zookeeper/metadata storage. + // The full error message will be available via a TaskReport. + private static String truncateErrorMsg(String errorMsg) + { + if (errorMsg != null && errorMsg.length() > MAX_ERROR_MSG_LENGTH) { + return errorMsg.substring(0, MAX_ERROR_MSG_LENGTH) + "..."; + } else { + return errorMsg; + } } private final String id; private final TaskState status; private final long duration; + private final String errorMsg; @JsonCreator protected TaskStatus( @JsonProperty("id") String id, @JsonProperty("status") TaskState status, - @JsonProperty("duration") long duration + @JsonProperty("duration") long duration, + @JsonProperty("errorMsg") String errorMsg ) { this.id = id; this.status = status; this.duration = duration; + this.errorMsg = truncateErrorMsg(errorMsg); // Check class invariants. Preconditions.checkNotNull(id, "id"); @@ -92,6 +118,12 @@ public long getDuration() return duration; } + @JsonProperty("errorMsg") + public String getErrorMsg() + { + return errorMsg; + } + /** * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable, * isSuccess, or isFailure will be true at any one time. @@ -141,7 +173,18 @@ public boolean isFailure() public TaskStatus withDuration(long _duration) { - return new TaskStatus(id, status, _duration); + return new TaskStatus(id, status, _duration, errorMsg); + } + + @Override + public String toString() + { + return Objects.toStringHelper(this) + .add("id", id) + .add("status", status) + .add("duration", duration) + .add("errorMsg", errorMsg) + .toString(); } @Override @@ -154,24 +197,15 @@ public boolean equals(Object o) return false; } TaskStatus that = (TaskStatus) o; - return duration == that.duration && - java.util.Objects.equals(id, that.id) && - status == that.status; + return getDuration() == that.getDuration() && + java.util.Objects.equals(getId(), that.getId()) && + status == that.status && + java.util.Objects.equals(getErrorMsg(), that.getErrorMsg()); } @Override public int hashCode() { - return java.util.Objects.hash(id, status, duration); - } - - @Override - public String toString() - { - return Objects.toStringHelper(this) - .add("id", id) - .add("status", status) - .add("duration", duration) - .toString(); + return java.util.Objects.hash(getId(), status, getDuration(), getErrorMsg()); } } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 8be1dcdbe95e..91b22c82f77e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -621,7 +621,9 @@ public Response getCompleteTasks( status.getStatusCode(), status.getDuration(), TaskLocation.unknown(), - pair.rhs); + pair.rhs, + status.getErrorMsg() + ); })); return Response.ok(completeTasks).build(); @@ -777,7 +779,8 @@ public TaskStatusPlus apply(TaskRunnerWorkItem workItem) null, null, workItem.getLocation(), - workItem.getDataSource() + workItem.getDataSource(), + null ); } }