Skip to content

Commit

Permalink
Fix CombiningFirehoseFactory with IngestSegmentFirehoseFactory in Ind…
Browse files Browse the repository at this point in the history
…exTask (#6065)

* Fix CombiningFirehoseFactory with IngestSegmentFirehoseFactory in IndexTask

* Make recursive
  • Loading branch information
jon-wei committed Jul 31, 2018
1 parent ea72907 commit 91943a2
Showing 1 changed file with 21 additions and 4 deletions.
Expand Up @@ -85,6 +85,7 @@
import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
import io.druid.segment.realtime.firehose.ChatHandler;
import io.druid.segment.realtime.firehose.ChatHandlerProvider;
import io.druid.segment.realtime.firehose.CombiningFirehoseFactory;
import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
import io.druid.server.security.Action;
import io.druid.server.security.AuthorizerMapper;
Expand Down Expand Up @@ -418,10 +419,7 @@ public TaskStatus run(final TaskToolbox toolbox)

final FirehoseFactory firehoseFactory = ingestionSchema.getIOConfig().getFirehoseFactory();

if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
// pass toolbox to Firehose
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
}
setFirehoseFactoryToolbox(firehoseFactory, toolbox);

final File firehoseTempDir = toolbox.getFirehoseTemporaryDir();
// Firehose temporary directory is automatically removed when this IndexTask completes.
Expand Down Expand Up @@ -477,6 +475,25 @@ public TaskStatus run(final TaskToolbox toolbox)
}
}

// pass toolbox to any IngestSegmentFirehoseFactory
private void setFirehoseFactoryToolbox(FirehoseFactory firehoseFactory, TaskToolbox toolbox)
{
if (firehoseFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) firehoseFactory).setTaskToolbox(toolbox);
return;
}

if (firehoseFactory instanceof CombiningFirehoseFactory) {
for (FirehoseFactory delegateFactory : ((CombiningFirehoseFactory) firehoseFactory).getDelegateFactoryList()) {
if (delegateFactory instanceof IngestSegmentFirehoseFactory) {
((IngestSegmentFirehoseFactory) delegateFactory).setTaskToolbox(toolbox);
} else if (delegateFactory instanceof CombiningFirehoseFactory) {
setFirehoseFactoryToolbox(delegateFactory, toolbox);
}
}
}
}

private Map<String, TaskReport> getTaskCompletionReports()
{
return TaskReport.buildTaskReports(
Expand Down

0 comments on commit 91943a2

Please sign in to comment.