Commit
PR comments, add support in AppenderatorDriverRealtimeIndexTask
jon-wei committed Mar 19, 2018
1 parent 278144e commit bbda06b
Showing 21 changed files with 759 additions and 370 deletions.
22 changes: 13 additions & 9 deletions api/src/main/java/io/druid/indexer/TaskStatusPlus.java
@@ -124,6 +124,12 @@ public TaskLocation getLocation()
return location;
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

@Nullable
@JsonProperty("metrics")
public Map<String, Object> getMetrics()
@@ -180,11 +186,15 @@ public boolean equals(Object o)
return false;
}

if (!Objects.equals(errorMsg, that.errorMsg)) {
if (!Objects.equals(dataSource, that.dataSource)) {
return false;
}

if (!Objects.equals(location, that.location)) {
if (!Objects.equals(metrics, that.metrics)) {
return false;
}

if (!Objects.equals(errorMsg, that.errorMsg)) {
return false;
}

@@ -202,16 +212,10 @@ public int hashCode()
state,
duration,
location,
dataSource,
metrics,
errorMsg,
context
);
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

}
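The equals/hashCode change above keeps the two methods in agreement: dataSource and metrics are now compared in equals, so they also feed hashCode. A minimal, purely illustrative value class (not Druid code) showing the contract the diff restores:

import java.util.Objects;

// Hypothetical value class: every field compared in equals() also feeds hashCode(),
// otherwise two equal instances could land in different hash buckets.
final class StatusKey
{
  private final String dataSource;
  private final String errorMsg;

  StatusKey(String dataSource, String errorMsg)
  {
    this.dataSource = dataSource;
    this.errorMsg = errorMsg;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    StatusKey that = (StatusKey) o;
    return Objects.equals(dataSource, that.dataSource) && Objects.equals(errorMsg, that.errorMsg);
  }

  @Override
  public int hashCode()
  {
    return Objects.hash(dataSource, errorMsg);
  }
}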
1 change: 1 addition & 0 deletions api/src/main/java/io/druid/utils/CircularBuffer.java
@@ -35,6 +35,7 @@ public E[] getBuffer()

public CircularBuffer(int capacity)
{
Preconditions.checkArgument(capacity > 0, "Capacity must be greater than 0.");
buffer = (E[]) new Object[capacity];
}

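The new Preconditions guard makes an invalid capacity fail at construction time rather than at first use. A minimal, hypothetical ring-buffer sketch (not the actual CircularBuffer implementation, whose body is not shown here) illustrating the failure mode the check prevents:

import com.google.common.base.Preconditions;

// Hypothetical minimal ring buffer. With capacity 0 the array allocation succeeds,
// but the first add() would throw ArrayIndexOutOfBoundsException; the up-front check
// surfaces the misconfiguration immediately with a clear message.
class RingBuffer<E>
{
  private final Object[] buffer;
  private int next;

  RingBuffer(int capacity)
  {
    Preconditions.checkArgument(capacity > 0, "Capacity must be greater than 0.");
    buffer = new Object[capacity];
  }

  void add(E item)
  {
    buffer[next] = item;
    next = (next + 1) % buffer.length;
  }
}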
@@ -60,7 +60,7 @@
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
import io.druid.indexing.common.task.IndexTask;
import io.druid.indexing.common.task.IndexTaskUtils;
import io.druid.indexing.common.task.RealtimeIndexTask;
import io.druid.indexing.common.task.TaskResource;
import io.druid.indexing.common.task.Tasks;
@@ -95,12 +95,7 @@
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthorizationUtils;
import io.druid.server.security.AuthorizerMapper;
import io.druid.server.security.ForbiddenException;
import io.druid.server.security.Resource;
import io.druid.server.security.ResourceAction;
import io.druid.server.security.ResourceType;
import io.druid.timeline.DataSegment;
import io.druid.utils.CircularBuffer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -435,14 +430,14 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception
catch (Exception e) {
log.error(e, "Encountered exception while running task.");
Map<String, Object> context = Maps.newHashMap();
List<String> savedParseExceptionMessages = IndexTask.getMessagesFromSavedParseExceptions(savedParseExceptions);
List<String> savedParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
if (savedParseExceptionMessages != null) {
context.put("unparseableEvents", savedParseExceptionMessages);
}
return TaskStatus.failure(
getId(),
getTaskCompletionMetrics(),
e.getMessage(),
Throwables.getStackTraceAsString(e),
getTaskCompletionContext()
);
}
@@ -713,67 +708,63 @@ public void run()
boolean isPersistRequired = false;

for (InputRow row : rows) {
try {
if (row != null && withinMinMaxRecordTime(row)) {
SequenceMetadata sequenceToUse = null;
for (SequenceMetadata sequence : sequences) {
if (sequence.canHandle(record)) {
sequenceToUse = sequence;
}
}

if (sequenceToUse == null) {
throw new ISE(
"WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
record.partition(),
record.offset(),
sequences
);
if (row != null && withinMinMaxRecordTime(row)) {
SequenceMetadata sequenceToUse = null;
for (SequenceMetadata sequence : sequences) {
if (sequence.canHandle(record)) {
sequenceToUse = sequence;
}
}

final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceToUse.getSequenceName(),
committerSupplier,
// skip segment lineage check as there will always be one segment
// for combination of sequence and segment granularity.
// It is necessary to skip it as the task puts messages polled from all the
// assigned Kafka partitions into a single Druid segment, thus ordering of
// messages among replica tasks across assigned partitions is not guaranteed
// which may cause replica tasks to ask for segments with different interval
// in different order which might cause SegmentAllocateAction to fail.
true,
// do not allow incremental persists to happen until all the rows from this batch
// of rows are indexed
false
if (sequenceToUse == null) {
throw new ISE(
"WTH?! cannot find any valid sequence for record with partition [%d] and offset [%d]. Current sequences: %s",
record.partition(),
record.offset(),
sequences
);
}

if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
if (!sequenceToUse.isCheckpointed()) {
sequenceToCheckpoint = sequenceToUse;
}
final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceToUse.getSequenceName(),
committerSupplier,
// skip segment lineage check as there will always be one segment
// for combination of sequence and segment granularity.
// It is necessary to skip it as the task puts messages polled from all the
// assigned Kafka partitions into a single Druid segment, thus ordering of
// messages among replica tasks across assigned partitions is not guaranteed
// which may cause replica tasks to ask for segments with different interval
// in different order which might cause SegmentAllocateAction to fail.
true,
// do not allow incremental persists to happen until all the rows from this batch
// of rows are indexed
false
);

if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
if (!sequenceToUse.isCheckpointed()) {
sequenceToCheckpoint = sequenceToUse;
}
isPersistRequired |= addResult.isPersistRequired();
} else {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}
isPersistRequired |= addResult.isPersistRequired();
} else {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}

if (addResult.getParseException() != null) {
throw addResult.getParseException();
}
fireDepartmentMetrics.incrementProcessed();
if (addResult.getParseException() != null) {
handleParseException(addResult.getParseException(), record);
} else {
fireDepartmentMetrics.incrementThrownAway();
fireDepartmentMetrics.incrementProcessed();
}
}
catch (ParseException e) {
handleParseException(e, record);
} else {
fireDepartmentMetrics.incrementThrownAway();
}
}
if (isPersistRequired) {
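A parse error from driver.add() is now routed to handleParseException(...) together with the offending Kafka record instead of being thrown out of the row loop. The body of that method is not shown in this view; the sketch below is an assumption about its shape, based on the logParseExceptions / maxParseExceptions / maxSavedParseExceptions tuning options that appear later in this diff (getter names and the counter field are guesses, not the committed code):

private void handleParseException(ParseException pe, ConsumerRecord<byte[], byte[]> record)
{
  // Count the row as unparseable rather than processed.
  fireDepartmentMetrics.incrementUnparseable();

  // Optionally log which record failed to parse.
  if (tuningConfig.isLogParseExceptions()) {
    log.error(pe, "Parse exception on record from partition[%d], offset[%d]", record.partition(), record.offset());
  }

  // Keep a bounded window of recent parse exceptions for the unparseable-events endpoint,
  // using the CircularBuffer imported above (assumed add() method).
  if (savedParseExceptions != null) {
    savedParseExceptions.add(pe);
  }

  // Fail the task once the configured limit is exceeded; the updated tests assert on this message.
  parseExceptionCount++;
  if (parseExceptionCount > tuningConfig.getMaxParseExceptions()) {
    throw new RuntimeException("Max parse exceptions exceeded, terminating task...");
  }
}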
@@ -936,7 +927,7 @@ public void onFailure(Throwable t)
}

Map<String, Object> context = Maps.newHashMap();
List<String> savedParseExceptionMessages = IndexTask.getMessagesFromSavedParseExceptions(savedParseExceptions);
List<String> savedParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
if (savedParseExceptionMessages != null) {
context.put("unparseableEvents", savedParseExceptionMessages);
}
@@ -1146,42 +1137,38 @@ public void run()
final Map<String, Set<SegmentIdentifier>> segmentsToMoveOut = new HashMap<>();

for (InputRow row : rows) {
try {
if (row != null && withinMinMaxRecordTime(row)) {
final String sequenceName = sequenceNames.get(record.partition());
final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceName,
committerSupplier,
false,
false
);

if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>())
.add(addResult.getSegmentIdentifier());
}
isPersistRequired |= addResult.isPersistRequired();
} else {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
if (row != null && withinMinMaxRecordTime(row)) {
final String sequenceName = sequenceNames.get(record.partition());
final AppenderatorDriverAddResult addResult = driver.add(
row,
sequenceName,
committerSupplier,
false,
false
);

if (addResult.isOk()) {
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
if (addResult.getNumRowsInSegment() > tuningConfig.getMaxRowsPerSegment()) {
segmentsToMoveOut.computeIfAbsent(sequenceName, k -> new HashSet<>())
.add(addResult.getSegmentIdentifier());
}
isPersistRequired |= addResult.isPersistRequired();
} else {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}

if (addResult.getParseException() != null) {
throw addResult.getParseException();
}
fireDepartmentMetrics.incrementProcessed();
if (addResult.getParseException() != null) {
handleParseException(addResult.getParseException(), record);
} else {
fireDepartmentMetrics.incrementThrownAway();
fireDepartmentMetrics.incrementProcessed();
}
}
catch (ParseException e) {
handleParseException(e, record);
} else {
fireDepartmentMetrics.incrementThrownAway();
}
}

@@ -1317,7 +1304,7 @@ public String apply(DataSegment input)
}

Map<String, Object> context = Maps.newHashMap();
List<String> savedParseExceptionMessages = IndexTask.getMessagesFromSavedParseExceptions(savedParseExceptions);
List<String> savedParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
if (savedParseExceptionMessages != null) {
context.put("unparseableEvents", savedParseExceptionMessages);
}
@@ -1358,11 +1345,10 @@ private void handleParseException(ParseException pe, ConsumerRecord<byte[], byte
}
}


private Map<String, Object> getTaskCompletionContext()
{
Map<String, Object> context = Maps.newHashMap();
List<String> buildSegmentsParseExceptionMessages = IndexTask.getMessagesFromSavedParseExceptions(savedParseExceptions);
List<String> buildSegmentsParseExceptionMessages = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
if (buildSegmentsParseExceptionMessages != null) {
Map<String, Object> unparseableEventsMap = Maps.newHashMap();
unparseableEventsMap.put("buildSegments", buildSegmentsParseExceptionMessages);
@@ -1449,17 +1435,7 @@ public boolean canRestore()
*/
private Access authorizationCheck(final HttpServletRequest req, Action action)
{
ResourceAction resourceAction = new ResourceAction(
new Resource(dataSchema.getDataSource(), ResourceType.DATASOURCE),
action
);

Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
if (!access.isAllowed()) {
throw new ForbiddenException(access.toString());
}

return access;
return IndexTaskUtils.datasourceAuthorizationCheck(req, action, getDataSource(), authorizerMapper);
}

@VisibleForTesting
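The deleted lines above are the logic that the new IndexTaskUtils.datasourceAuthorizationCheck call presumably centralizes, so every indexing task runs the same per-datasource check. A sketch of that utility, reconstructed from the removed code and the call site's arguments (the exact signature in IndexTaskUtils is an assumption):

public static Access datasourceAuthorizationCheck(
    final HttpServletRequest req,
    Action action,
    String dataSource,
    AuthorizerMapper authorizerMapper
)
{
  // Build the resource-action pair for the task's datasource.
  ResourceAction resourceAction = new ResourceAction(
      new Resource(dataSource, ResourceType.DATASOURCE),
      action
  );

  // Delegate to the configured authorizer and reject the request if not allowed.
  Access access = AuthorizationUtils.authorizeResourceAction(req, resourceAction, authorizerMapper);
  if (!access.isAllowed()) {
    throw new ForbiddenException(access.toString());
  }

  return access;
}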
@@ -1635,7 +1611,7 @@ public Response getUnparseableEvents(
)
{
authorizationCheck(req, Action.READ);
List<String> events = IndexTask.getMessagesFromSavedParseExceptions(savedParseExceptions);
List<String> events = IndexTaskUtils.getMessagesFromSavedParseExceptions(savedParseExceptions);
return Response.ok(events).build();
}

@@ -278,9 +278,9 @@ public boolean equals(Object o)
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) &&
Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod) &&
Objects.equals(logParseExceptions, that.logParseExceptions) &&
Objects.equals(maxParseExceptions, that.maxParseExceptions) &&
Objects.equals(maxSavedParseExceptions, that.maxSavedParseExceptions);
logParseExceptions == that.logParseExceptions &&
maxParseExceptions == that.maxParseExceptions &&
maxSavedParseExceptions == that.maxSavedParseExceptions;
}

@Override
@@ -42,6 +42,7 @@
import io.druid.data.input.impl.LongDimensionSchema;
import io.druid.data.input.impl.StringDimensionSchema;
import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexing.common.task.IndexTaskTest;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.core.NoopEmitter;
import io.druid.java.util.emitter.service.ServiceEmitter;
@@ -1105,7 +1106,7 @@ public void testMultipleParseExceptionsFailure() throws Exception

// Wait for task to exit
Assert.assertEquals(TaskState.FAILED, status.getStatusCode());
Assert.assertEquals("Max parse exceptions exceeded, terminating task...", status.getErrorMsg());
IndexTaskTest.checkTaskStatusErrorMsgForParseExceptionsExceeded(status);

// Check metrics
Assert.assertEquals(3, task.getFireDepartmentMetrics().processed());
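Because the task's errorMsg now carries a full stack trace (Throwables.getStackTraceAsString in the run() catch block above) rather than just e.getMessage(), the test can no longer compare the message verbatim. A plausible shape for the shared assertion helper, with the name taken from the call site and the body an assumption:

public static void checkTaskStatusErrorMsgForParseExceptionsExceeded(TaskStatus status)
{
  // The message now starts with the termination reason and is followed by a stack trace,
  // so assert on a substring instead of full equality.
  Assert.assertTrue(
      status.getErrorMsg(),
      status.getErrorMsg().contains("Max parse exceptions exceeded")
  );
}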