Skip to content

Commit

Permalink
Use TaskReport uploads
Browse files Browse the repository at this point in the history
  • Loading branch information
jon-wei committed Apr 2, 2018
1 parent 4589808 commit d5f1e28
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 108 deletions.
21 changes: 3 additions & 18 deletions api/src/main/java/io/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;

public class TaskStatusPlus
Expand All @@ -42,9 +41,6 @@ public class TaskStatusPlus
@Nullable
private final String errorMsg;

@Nullable
private final Map<String, TaskReport> taskReports;

@JsonCreator
public TaskStatusPlus(
@JsonProperty("id") String id,
Expand All @@ -55,8 +51,7 @@ public TaskStatusPlus(
@JsonProperty("duration") @Nullable Long duration,
@JsonProperty("location") TaskLocation location,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("errorMsg") String errorMsg,
@JsonProperty("taskReports") Map<String, TaskReport> taskReports
@JsonProperty("errorMsg") String errorMsg
)
{
if (state != null && state.isComplete()) {
Expand All @@ -71,7 +66,6 @@ public TaskStatusPlus(
this.location = Preconditions.checkNotNull(location, "location");
this.dataSource = dataSource;
this.errorMsg = errorMsg;
this.taskReports = taskReports;
}

@JsonProperty
Expand Down Expand Up @@ -132,13 +126,6 @@ public String getErrorMsg()
return errorMsg;
}

@Nullable
@JsonProperty("taskReports")
public Map<String, TaskReport> getTaskReports()
{
return taskReports;
}

@Override
public boolean equals(Object o)
{
Expand All @@ -157,8 +144,7 @@ public boolean equals(Object o)
Objects.equals(getDuration(), that.getDuration()) &&
Objects.equals(getLocation(), that.getLocation()) &&
Objects.equals(getDataSource(), that.getDataSource()) &&
Objects.equals(getErrorMsg(), that.getErrorMsg()) &&
Objects.equals(getTaskReports(), that.getTaskReports());
Objects.equals(getErrorMsg(), that.getErrorMsg());
}

@Override
Expand All @@ -173,8 +159,7 @@ public int hashCode()
getDuration(),
getLocation(),
getDataSource(),
getErrorMsg(),
getTaskReports()
getErrorMsg()
);
}
}
1 change: 0 additions & 1 deletion api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ public void testSerde() throws IOException
1000L,
TaskLocation.create("testHost", 1010, -1),
"ds_test",
null,
null
);
final String json = mapper.writeValueAsString(status);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskReport;
import io.druid.indexing.common.TaskReportFileWriter;
import io.druid.indexing.common.task.IndexTaskTest;
import io.druid.client.cache.CacheConfig;
import io.druid.client.cache.MapCache;
Expand All @@ -63,7 +65,6 @@
import io.druid.indexing.common.actions.TaskActionToolbox;
import io.druid.indexing.common.config.TaskConfig;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTestTaskFileWriter;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.kafka.supervisor.KafkaSupervisor;
import io.druid.indexing.kafka.test.TestBroker;
Expand Down Expand Up @@ -208,6 +209,7 @@ public class KafkaIndexTaskTest
private List<ProducerRecord<byte[], byte[]>> records;
private final boolean isIncrementalHandoffSupported;
private final Set<Integer> checkpointRequestsHash = Sets.newHashSet();
private File reportsFile;

// This should be removed in versions greater that 0.12.x
// isIncrementalHandoffSupported should always be set to true in those later versions
Expand Down Expand Up @@ -327,6 +329,7 @@ public void setupTest() throws IOException
doHandoff = true;
topic = getTopicName();
records = generateRecords(topic);
reportsFile = File.createTempFile("KafkaIndexTaskTestReports-" + System.currentTimeMillis(), "json");
makeToolboxFactory();
}

Expand All @@ -340,7 +343,7 @@ public void tearDownTest()

runningTasks.clear();
}

reportsFile.delete();
destroyToolboxFactory();
}

Expand Down Expand Up @@ -1046,17 +1049,15 @@ public void testMultipleParseExceptionsSuccess() throws Exception
metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource())
);

IngestionStatsAndErrorsTaskReportData reportData = IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
status.getTaskReports()
);
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();

Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 4L,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 3L,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3L,
TaskMetricsUtils.ROWS_THROWN_AWAY, 1L
TaskMetricsUtils.ROWS_PROCESSED, 4,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 3,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3,
TaskMetricsUtils.ROWS_THROWN_AWAY, 1
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Expand Down Expand Up @@ -1123,17 +1124,15 @@ public void testMultipleParseExceptionsFailure() throws Exception
Assert.assertEquals(ImmutableSet.of(), publishedDescriptors());
Assert.assertNull(metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()));

IngestionStatsAndErrorsTaskReportData reportData = IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
status.getTaskReports()
);
IngestionStatsAndErrorsTaskReportData reportData = getTaskReportData();

Map<String, Object> expectedMetrics = ImmutableMap.of(
"buildSegments",
ImmutableMap.of(
TaskMetricsUtils.ROWS_PROCESSED, 3L,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0L,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3L,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0L
TaskMetricsUtils.ROWS_PROCESSED, 3,
TaskMetricsUtils.ROWS_PROCESSED_WITH_ERRORS, 0,
TaskMetricsUtils.ROWS_UNPARSEABLE, 3,
TaskMetricsUtils.ROWS_THROWN_AWAY, 0
)
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
Expand Down Expand Up @@ -2228,7 +2227,7 @@ public List<StorageLocationConfig> getLocations()
EasyMock.createNiceMock(DruidNode.class),
new LookupNodeService("tier"),
new DataNodeService("tier", 1, ServerType.INDEXER_EXECUTOR, 0),
new NoopTestTaskFileWriter()
new TaskReportFileWriter(reportsFile)
);
}

Expand Down Expand Up @@ -2350,4 +2349,17 @@ private SegmentDescriptor SD(final Task task, final String intervalString, final
final Interval interval = Intervals.of(intervalString);
return new SegmentDescriptor(interval, getLock(task, interval).getVersion(), partitionNum);
}

private IngestionStatsAndErrorsTaskReportData getTaskReportData() throws IOException
{
Map<String, TaskReport> taskReports = objectMapper.readValue(
reportsFile,
new TypeReference<Map<String, TaskReport>>()
{
}
);
return IngestionStatsAndErrorsTaskReportData.getPayloadFromTaskReports(
taskReports
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
package io.druid.indexing.common;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName("ingestionStatsAndErrors")
public class IngestionStatsAndErrorsTaskReport implements TaskReport
{
public static final String REPORT_KEY = "ingestionStatsAndErrors";
Expand Down Expand Up @@ -88,4 +90,10 @@ public String toString()
", payload=" + payload +
'}';
}
}

@JsonProperty("type")
private String getType()
{
return "ingestionStatsAndErrors";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.google.common.collect.Maps;
import io.druid.indexer.IngestionStatsAndErrorsTaskReport;

import java.util.Map;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,8 @@ public Response getCompleteTasks(
status.getDuration(),
TaskLocation.unknown(),
pair.rhs,
status.getErrorMsg(),
status.getTaskReports());
status.getErrorMsg()
);
}));

return Response.ok(completeTasks).build();
Expand Down Expand Up @@ -807,7 +807,6 @@ public TaskStatusPlus apply(TaskRunnerWorkItem workItem)
null,
workItem.getLocation(),
workItem.getDataSource(),
null,
null
);
}
Expand Down
Loading

0 comments on commit d5f1e28

Please sign in to comment.