diff --git a/core/src/main/java/org/apache/druid/data/input/AbstractInputSource.java b/core/src/main/java/org/apache/druid/data/input/AbstractInputSource.java index 1dee243c2006..1e056ea0e8ed 100644 --- a/core/src/main/java/org/apache/druid/data/input/AbstractInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/AbstractInputSource.java @@ -35,30 +35,33 @@ public abstract class AbstractInputSource implements InputSource public InputSourceReader reader( InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, - File temporaryDirectory + File temporaryDirectory, + InputStats inputStats ) { if (needsFormat()) { return formattableReader( inputRowSchema, Preconditions.checkNotNull(inputFormat, "inputFormat"), - temporaryDirectory + temporaryDirectory, + inputStats ); } else { - return fixedFormatReader(inputRowSchema, temporaryDirectory); + return fixedFormatReader(inputRowSchema, temporaryDirectory, inputStats); } } protected InputSourceReader formattableReader( InputRowSchema inputRowSchema, InputFormat inputFormat, - File temporaryDirectory + File temporaryDirectory, + InputStats inputStats ) { throw new UnsupportedOperationException("Implement this method properly if needsFormat() = true"); } - protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, File temporaryDirectory) + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, File temporaryDirectory, InputStats inputStats) { throw new UnsupportedOperationException("Implement this method properly if needsFormat() = false"); } diff --git a/core/src/main/java/org/apache/druid/data/input/CountableInputEntity.java b/core/src/main/java/org/apache/druid/data/input/CountableInputEntity.java new file mode 100644 index 000000000000..d5c9a3084686 --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/CountableInputEntity.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import com.google.common.base.Predicate; +import com.google.common.io.CountingInputStream; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + +/** + * Can be used to count the number of bytes read from the base InputEntity. + */ +public class CountableInputEntity implements InputEntity +{ + private final InputStats inputStats; + private final InputEntity baseInputEntity; + + public CountableInputEntity(InputEntity baseInputEntity, InputStats inputStats) + { + this.baseInputEntity = baseInputEntity; + this.inputStats = inputStats; + } + + @Nullable @Override public URI getUri() + { + return baseInputEntity.getUri(); + } + + @Override public InputStream open() throws IOException + { + return new BytesCountingInputStream(baseInputEntity.open(), inputStats); + } + + @Override public CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException + { + final CleanableFile cleanableFile = baseInputEntity.fetch(temporaryDirectory, fetchBuffer); + inputStats.incrementProcessedBytes(cleanableFile.file().length()); + return cleanableFile; + } + + @Override public Predicate<Throwable> getRetryCondition() + { + return baseInputEntity.getRetryCondition(); + } + + @Override public InputEntity getBaseInputEntity() + { + return baseInputEntity; + } + + static class BytesCountingInputStream extends FilterInputStream + { + private final InputStats inputStats; + + /** + * Wraps another input stream, counting the number of bytes read. + * + * Similar to {@link CountingInputStream} but does not reset the count on a call to + * {@link CountingInputStream#reset()}. + * + * @param in the input stream to be wrapped + */ + public BytesCountingInputStream(@Nullable InputStream in, InputStats inputStats) + { + super(in); + this.inputStats = inputStats; + } + + @Override public int read() throws IOException + { + int result = in.read(); + if (result != -1) { + inputStats.incrementProcessedBytes(1); + } + return result; + } + + @Override public int read(byte[] b, int off, int len) throws IOException + { + int result = in.read(b, off, len); + if (result != -1) { + inputStats.incrementProcessedBytes(result); + } + return result; + } + + @Override public long skip(long n) throws IOException + { + long result = in.skip(n); + inputStats.incrementProcessedBytes(result); + return result; + } + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java b/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java index 35322c2a0fdf..85358eac27ee 100644 --- a/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java +++ b/core/src/main/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptor.java @@ -97,7 +97,7 @@ public boolean needsFormat() } @Override - protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory, InputStats inputStats) { return new FirehoseToInputSourceReaderAdaptor(firehoseFactory, inputRowParser, temporaryDirectory); } diff --git a/core/src/main/java/org/apache/druid/data/input/InputEntity.java b/core/src/main/java/org/apache/druid/data/input/InputEntity.java index 71f14d95a3c3..8c72ccc68fcd 100644 ---
a/core/src/main/java/org/apache/druid/data/input/InputEntity.java +++ b/core/src/main/java/org/apache/druid/data/input/InputEntity.java @@ -73,14 +73,13 @@ interface CleanableFile extends Closeable /** * Fetches the input entity into the local storage. * This method might be preferred instead of {@link #open()}, for example - * + * <p>
* - {@link InputFormat} requires expensive random access on remote storage. * - Holding a connection until you consume the entire InputStream is expensive. * * @param temporaryDirectory to store temp data. This directory will be removed automatically once * the task finishes. * @param fetchBuffer is used to fetch remote entity into local storage. - * * @see FileUtils#copyLarge */ default CleanableFile fetch(File temporaryDirectory, byte[] fetchBuffer) throws IOException @@ -125,4 +124,9 @@ default Predicate getRetryCondition() { return Predicates.alwaysFalse(); } + + default InputEntity getBaseInputEntity() + { + return this; + } } diff --git a/core/src/main/java/org/apache/druid/data/input/InputSource.java b/core/src/main/java/org/apache/druid/data/input/InputSource.java index b0144c51eef5..6e444681cc4a 100644 --- a/core/src/main/java/org/apache/druid/data/input/InputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/InputSource.java @@ -76,5 +76,5 @@ public interface InputSource * @param inputFormat to parse data. It can be null if {@link #needsFormat()} = true * @param temporaryDirectory to store temp data. It will be cleaned up automatically once the task is finished. */ - InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory); + InputSourceReader reader(InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, File temporaryDirectory, InputStats inputStats); } diff --git a/core/src/main/java/org/apache/druid/data/input/InputStats.java b/core/src/main/java/org/apache/druid/data/input/InputStats.java new file mode 100644 index 000000000000..ebae06c6f5fa --- /dev/null +++ b/core/src/main/java/org/apache/druid/data/input/InputStats.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import java.util.concurrent.atomic.AtomicLong; + +public class InputStats +{ + private final AtomicLong processedBytes = new AtomicLong(0); + + public void incrementProcessedBytes(long incrementByValue) + { + processedBytes.getAndAdd(incrementByValue); + } + + public AtomicLong getProcessedBytes() + { + return processedBytes; + } +} diff --git a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java index d27bf4faa01d..daaac27d5966 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/CloudObjectInputSource.java @@ -22,11 +22,13 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.primitives.Ints; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.utils.CollectionUtils; @@ -143,13 +145,15 @@ public boolean needsFormat() protected InputSourceReader formattableReader( InputRowSchema inputRowSchema, InputFormat inputFormat, - @Nullable File temporaryDirectory + @Nullable File temporaryDirectory, + InputStats inputStats ) { return new InputEntityIteratingReader( inputRowSchema, inputFormat, - createSplits(inputFormat, null).flatMap(split -> split.get().stream()).map(this::createEntity).iterator(), + createSplits(inputFormat, null).flatMap(split -> split.get().stream()).map( + cloudObjectLocation -> new CountableInputEntity(createEntity(cloudObjectLocation), inputStats)).iterator(), temporaryDirectory ); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java index 21480fd1b1d8..989a936bb619 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/HttpInputSource.java @@ -23,10 +23,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.metadata.PasswordProvider; @@ -105,16 +107,20 @@ public SplittableInputSource withSplit(InputSplit split) protected InputSourceReader formattableReader( InputRowSchema inputRowSchema, InputFormat inputFormat, - @Nullable File temporaryDirectory + @Nullable File temporaryDirectory, + InputStats inputStats ) { return new InputEntityIteratingReader( inputRowSchema, inputFormat, - createSplits(inputFormat, null).map(split -> new HttpEntity( - split.get(), - httpAuthenticationUsername, - httpAuthenticationPasswordProvider + createSplits(inputFormat, null).map(split -> new CountableInputEntity( + new 
HttpEntity( + split.get(), + httpAuthenticationUsername, + httpAuthenticationPasswordProvider + ), + inputStats )).iterator(), temporaryDirectory ); @@ -131,8 +137,8 @@ public boolean equals(Object o) } HttpInputSource source = (HttpInputSource) o; return Objects.equals(uris, source.uris) && - Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) && - Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider); + Objects.equals(httpAuthenticationUsername, source.httpAuthenticationUsername) && + Objects.equals(httpAuthenticationPasswordProvider, source.httpAuthenticationPasswordProvider); } @Override diff --git a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java index 1e7b59fcf4a9..b061e742e328 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/InlineInputSource.java @@ -23,9 +23,11 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; import org.apache.druid.java.util.common.StringUtils; import javax.annotation.Nullable; @@ -65,13 +67,14 @@ public boolean needsFormat() protected InputSourceReader formattableReader( InputRowSchema inputRowSchema, InputFormat inputFormat, - @Nullable File temporaryDirectory + @Nullable File temporaryDirectory, + InputStats inputStats ) { return new InputEntityIteratingReader( inputRowSchema, inputFormat, - Stream.of(new ByteEntity(StringUtils.toUtf8(data))).iterator(), + Stream.of(new CountableInputEntity(new ByteEntity(StringUtils.toUtf8(data)), inputStats)).iterator(), temporaryDirectory ); } diff --git a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java index 04b7dc7b2cd9..f2e74be4dfe7 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/LocalInputSource.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.Iterators; import org.apache.commons.io.FileUtils; @@ -33,11 +34,14 @@ import org.apache.commons.io.filefilter.TrueFileFilter; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; +import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.java.util.common.IAE; import org.apache.druid.utils.CollectionUtils; @@ -107,7 +111,7 @@ public 
Set<File> getFiles() public Stream<InputSplit<List<File>>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) { return Streams.sequentialStreamFrom(getSplitFileIterator(getSplitHintSpecOrDefault(splitHintSpec))) - .map(InputSplit::new); + .map(InputSplit::new); } @Override @@ -184,14 +188,21 @@ public boolean needsFormat() protected InputSourceReader formattableReader( InputRowSchema inputRowSchema, InputFormat inputFormat, - @Nullable File temporaryDirectory + @Nullable File temporaryDirectory, + InputStats inputStats ) { //noinspection ConstantConditions return new InputEntityIteratingReader( inputRowSchema, inputFormat, - Iterators.transform(getFileIterator(), FileEntity::new), + Iterators.transform(getFileIterator(), new Function<File, InputEntity>() + { + @Nullable @Override public InputEntity apply(@Nullable File file) + { + return new CountableInputEntity(new FileEntity(file), inputStats); + } + }), temporaryDirectory ); } @@ -207,8 +218,8 @@ public boolean equals(Object o) } LocalInputSource that = (LocalInputSource) o; return Objects.equals(baseDir, that.baseDir) && - Objects.equals(filter, that.filter) && - Objects.equals(files, that.files); + Objects.equals(filter, that.filter) && + Objects.equals(files, that.files); } @Override diff --git a/core/src/main/java/org/apache/druid/java/util/metrics/InputStatsMonitor.java b/core/src/main/java/org/apache/druid/java/util/metrics/InputStatsMonitor.java new file mode 100644 index 000000000000..dc0382b3cb66 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/metrics/InputStatsMonitor.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; + +import java.util.Map; + +public class InputStatsMonitor extends AbstractMonitor +{ + private final InputStats inputStats; + private long lastReportedValue; + private final Map<String, String[]> dimensions; + + public InputStatsMonitor(InputStats inputStats, Map<String, String[]> dimensions) + { + this.inputStats = inputStats; + this.dimensions = ImmutableMap.copyOf(dimensions); + this.lastReportedValue = 0; + } + + @Override + public boolean doMonitor(ServiceEmitter emitter) + { + final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); + MonitorUtils.addDimensionsToBuilder(builder, dimensions); + final long currentValue = inputStats.getProcessedBytes().get(); + emitter.emit(builder.build("ingest/events/processedBytes", currentValue - lastReportedValue)); + lastReportedValue = currentValue; + return true; + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/CountableInputEntityTest.java b/core/src/test/java/org/apache/druid/data/input/CountableInputEntityTest.java new file mode 100644 index 000000000000..9f850ef12ea1 --- /dev/null +++ b/core/src/test/java/org/apache/druid/data/input/CountableInputEntityTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.data.input; + +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.FileEntity; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +public class CountableInputEntityTest +{ + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private CountableInputEntity countableInputEntity; + private InputStats inputStats; + private byte[] bytes; + private final int numBytes = 100; + + @Before + public void setUp() + { + inputStats = new InputStats(); + bytes = new byte[numBytes]; + } + + @Test + public void testWithFileEntity() throws IOException + { + final File sourceFile = folder.newFile("testWithFileEntity"); + final OutputStreamWriter outputStreamWriter = new OutputStreamWriter( + new FileOutputStream(sourceFile), + StandardCharsets.UTF_8 + ); + char[] chars = new char[numBytes]; + Arrays.fill(chars, ' '); + outputStreamWriter.write(chars); + outputStreamWriter.flush(); + outputStreamWriter.close(); + final FileEntity fileEntity = new FileEntity(sourceFile); + countableInputEntity = new CountableInputEntity(fileEntity, inputStats); + + final byte[] intermediateBuffer = new byte[numBytes / 2]; + countableInputEntity.open().read(intermediateBuffer); + Assert.assertEquals(numBytes / 2, inputStats.getProcessedBytes().intValue()); + + countableInputEntity.fetch(folder.newFolder(), intermediateBuffer); + Assert.assertEquals((numBytes / 2) + numBytes, inputStats.getProcessedBytes().intValue()); + } + + @Test + public void testWithByteEntity() throws IOException + { + final byte[] intermediateBuffer = new byte[numBytes]; + final ByteEntity byteEntity = new ByteEntity(bytes); + countableInputEntity = new CountableInputEntity(byteEntity, inputStats); + countableInputEntity.open().read(intermediateBuffer); + Assert.assertEquals(numBytes, inputStats.getProcessedBytes().intValue()); + + final byte[] smallIntermediateBuffer = new byte[25]; + final ByteEntity byteEntity1 = new ByteEntity(bytes); + countableInputEntity = new CountableInputEntity(byteEntity1, inputStats); + countableInputEntity.fetch(folder.newFolder(), smallIntermediateBuffer); + Assert.assertEquals(numBytes + numBytes, inputStats.getProcessedBytes().intValue()); + } +} diff --git a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java index bd8cbe1fe755..115be410bac2 100644 --- a/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java +++ b/core/src/test/java/org/apache/druid/data/input/FirehoseFactoryToInputSourceAdaptorTest.java @@ -74,7 +74,8 @@ public void testUnimplementedInputFormat() throws IOException Collections.emptyList() ), null, - null + null, + new InputStats() ); final List result = new ArrayList<>(); try (CloseableIterator iterator = reader.read()) { diff --git a/core/src/test/java/org/apache/druid/data/input/impl/NoopInputSource.java b/core/src/test/java/org/apache/druid/data/input/impl/NoopInputSource.java index d1d18a827a07..b86325125107 100644 --- a/core/src/test/java/org/apache/druid/data/input/impl/NoopInputSource.java +++ 
b/core/src/test/java/org/apache/druid/data/input/impl/NoopInputSource.java @@ -23,6 +23,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; import javax.annotation.Nullable; import java.io.File; @@ -51,7 +52,8 @@ public boolean needsFormat() public InputSourceReader reader( InputRowSchema inputRowSchema, @Nullable InputFormat inputFormat, - @Nullable File temporaryDirectory + @Nullable File temporaryDirectory, + InputStats inputStats ) { throw new UnsupportedOperationException(); diff --git a/core/src/test/java/org/apache/druid/java/util/metrics/InputStatsMonitorTest.java b/core/src/test/java/org/apache/druid/java/util/metrics/InputStatsMonitorTest.java new file mode 100644 index 000000000000..23f782b7bb51 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/metrics/InputStatsMonitorTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.java.util.metrics; + +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.java.util.emitter.core.Event; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class InputStatsMonitorTest +{ + private InputStatsMonitor inputStatsMonitor; + private InputStats inputStats; + private Map<String, String[]> dimensions; + private List<Event> events; + private ServiceEmitter emitter; + + @Before + public void setUp() + { + inputStats = new InputStats(); + dimensions = ImmutableMap.of("k1", new String[] {"v1"}); + events = new ArrayList<>(); + inputStatsMonitor = new InputStatsMonitor(inputStats, dimensions); + emitter = new ServiceEmitter("", "", new TestEmitter(events)); + } + + @Test + public void testInputStatsMonitor() + { + inputStats.incrementProcessedBytes(10); + inputStatsMonitor.doMonitor(emitter); + Assert.assertEquals(10L, events.get(0).toMap().get("value")); + Assert.assertEquals("v1", ((List) events.get(0).toMap().get("k1")).get(0)); + inputStats.incrementProcessedBytes(100); + inputStatsMonitor.doMonitor(emitter); + Assert.assertEquals(100L, events.get(1).toMap().get("value")); + } + + static class TestEmitter extends NoopEmitter + { + private final List<Event> events; + + public TestEmitter(List<Event> events) + { + this.events = events; + } + + @Override + public void emit(Event event) + { + events.add(event); + } + + public List<Event> getEvents() + { + return events; + } + } +} diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 2ddfd7462073..6c365b915108 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -135,6 +135,12 @@ If SQL is enabled, the Broker will emit the following metrics for SQL. |`sqlQuery/time`|Milliseconds taken to complete a SQL.|id, nativeQueryIds, dataSource, remoteAddress, success.|< 1s| |`sqlQuery/bytes`|number of bytes returned in SQL response.|id, nativeQueryIds, dataSource, remoteAddress, success.| | + +## Ingestion Metrics (General) + +|Metric|Description|Dimensions|Normal Value| +|------|-----------|----------|------------| +|`ingest/events/processedBytes`|Number of bytes read from the source during ingestion. Includes bytes read to determine partitions, where applicable.|dataSource, taskId, taskType, supervisorId (where applicable).|Varies| + ## Ingestion Metrics (Kafka Indexing Service) These metrics are applicable for the Kafka Indexing Service.
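
Before the per-extension wiring below, here is a minimal end-to-end sketch of how the pieces added so far fit together. It is not part of the patch; the class name, sample data, and schema arguments are illustrative, and the InputRowSchema, TimestampSpec, DimensionsSpec, and CsvInputFormat constructor calls simply mirror the test code elsewhere in this diff. An InputStats instance is handed to InputSource.reader(...); the source wraps each entity in a CountableInputEntity, which counts every byte pulled through the returned iterator; the running total can then be read back directly or emitted periodically by InputStatsMonitor.

import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InlineInputSource;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

import java.util.Collections;

// Hypothetical caller, not part of this patch.
public class InputStatsUsageSketch
{
  public static void main(String[] args) throws Exception
  {
    final InputStats inputStats = new InputStats();
    final InlineInputSource inputSource =
        new InlineInputSource("2000-01-01T00:00:00Z,foo\n2000-01-02T00:00:00Z,bar\n");
    final InputSourceReader reader = inputSource.reader(
        new InputRowSchema(
            new TimestampSpec("time", "auto", null),
            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("dim1"))),
            Collections.emptyList() // no metric columns
        ),
        new CsvInputFormat(ImmutableList.of("time", "dim1"), "|", false, null, 0),
        null,      // temporaryDirectory: the inline source never fetches to disk
        inputStats // threaded through to the CountableInputEntity wrappers
    );
    try (CloseableIterator<InputRow> iterator = reader.read()) {
      while (iterator.hasNext()) {
        iterator.next();
      }
    }
    // Both inline rows have now been counted, byte for byte.
    System.out.println("processed bytes: " + inputStats.getProcessedBytes().get());
  }
}

In the tasks themselves this wiring is invisible to users: AbstractBatchIndexTask and SeekableStreamIndexTask create the InputStats, register an InputStatsMonitor in run(), and pass the stats object down into the reader, which is what produces the ingest/events/processedBytes numbers documented above.
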
diff --git a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java index d38bc6627d98..6926500f04ca 100644 --- a/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java +++ b/extensions-contrib/aliyun-oss-extensions/src/test/java/org/apache/druid/data/input/aliyun/OssInputSourceTest.java @@ -43,6 +43,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -460,7 +461,8 @@ public void testReader() throws IOException InputSourceReader reader = inputSource.reader( someSchema, new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), - temporaryFolder.newFolder() + temporaryFolder.newFolder(), + new InputStats() ); CloseableIterator iterator = reader.read(); @@ -503,7 +505,8 @@ public void testCompressedReader() throws IOException InputSourceReader reader = inputSource.reader( someSchema, new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), - temporaryFolder.newFolder() + temporaryFolder.newFolder(), + new InputStats() ); CloseableIterator iterator = reader.read(); diff --git a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java index e68a9a44a386..4ba2e4eae4a0 100644 --- a/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java +++ b/extensions-core/google-extensions/src/test/java/org/apache/druid/data/input/google/GoogleCloudStorageInputSourceTest.java @@ -35,6 +35,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -232,7 +233,8 @@ public void testReader() throws IOException InputSourceReader reader = inputSource.reader( someSchema, new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), - null + null, + new InputStats() ); CloseableIterator iterator = reader.read(); @@ -275,7 +277,8 @@ public void testCompressedReader() throws IOException InputSourceReader reader = inputSource.reader( someSchema, new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), - null + null, + new InputStats() ); CloseableIterator iterator = reader.read(); diff --git a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java index b8c798a77b5c..78dad23e230f 100644 --- a/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java +++ 
b/extensions-core/hdfs-storage/src/main/java/org/apache/druid/inputsource/hdfs/HdfsInputSource.java @@ -27,11 +27,13 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.SplitHintSpec; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.data.input.impl.SplittableInputSource; @@ -154,7 +156,8 @@ List getInputPaths() protected InputSourceReader formattableReader( InputRowSchema inputRowSchema, InputFormat inputFormat, - @Nullable File temporaryDirectory + @Nullable File temporaryDirectory, + InputStats inputStats ) { try { @@ -166,7 +169,7 @@ protected InputSourceReader formattableReader( return new InputEntityIteratingReader( inputRowSchema, inputFormat, - Iterators.transform(cachedPaths.iterator(), path -> new HdfsInputEntity(configuration, path)), + Iterators.transform(cachedPaths.iterator(), path -> new CountableInputEntity(new HdfsInputEntity(configuration, path), inputStats)), temporaryDirectory ); } diff --git a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java index 170791e7d16c..19e257e2a92d 100644 --- a/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java +++ b/extensions-core/hdfs-storage/src/test/java/org/apache/druid/inputsource/hdfs/HdfsInputSourceTest.java @@ -29,6 +29,7 @@ import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.CsvInputFormat; import org.apache.druid.data.input.impl.DimensionsSpec; @@ -234,7 +235,7 @@ private static Path createFile(MiniDFSCluster dfsCluster, String pathSuffix, Str @Test public void readsSplitsCorrectly() throws IOException { - InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null); + InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null, new InputStats()); Map actualTimestampToValue = new HashMap<>(); try (CloseableIterator iterator = reader.read()) { @@ -310,7 +311,7 @@ public void setup() @Test public void readsSplitsCorrectly() throws IOException { - InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null); + InputSourceReader reader = target.formattableReader(INPUT_ROW_SCHEMA, INPUT_FORMAT, null, new InputStats()); try (CloseableIterator iterator = reader.read()) { Assert.assertFalse(iterator.hasNext()); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java index b65910e6cc91..f71346f7261b 100644 --- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/IncrementalPublishingKafkaIndexTaskRunner.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -65,14 +66,16 @@ public class IncrementalPublishingKafkaIndexTaskRunner extends SeekableStreamInd KafkaIndexTask task, @Nullable InputRowParser parser, AuthorizerMapper authorizerMapper, - LockGranularity lockGranularityToUse + LockGranularity lockGranularityToUse, + InputStats inputStats ) { super( task, parser, authorizerMapper, - lockGranularityToUse + lockGranularityToUse, + inputStats ); this.task = task; } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java index fb063532356d..5317f2a5aa69 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java @@ -128,7 +128,8 @@ protected SeekableStreamIndexTaskRunner createTaskRunner() this, dataSchema.getParser(), authorizerMapper, - lockGranularityToUse + lockGranularityToUse, + getInputStats() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index 0fe57015fe29..1d11d2eb24d8 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -70,7 +70,8 @@ protected SeekableStreamIndexTaskRunner createTaskRunner() this, dataSchema.getParser(), authorizerMapper, - lockGranularityToUse + lockGranularityToUse, + getInputStats() ); } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java index 2c65cd036dc0..59c038a8d5c0 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskRunner.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskToolbox; @@ -59,14 +60,16 @@ public class KinesisIndexTaskRunner extends SeekableStreamIndexTaskRunner parser, AuthorizerMapper authorizerMapper, - LockGranularity lockGranularityToUse + LockGranularity lockGranularityToUse, + InputStats inputStats ) { super( task, 
parser, authorizerMapper, - lockGranularityToUse + lockGranularityToUse, + inputStats ); this.task = task; } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java index 9efaf4fd09c3..79cd263c9931 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/data/input/s3/S3InputSourceTest.java @@ -44,6 +44,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.impl.CloudObjectLocation; import org.apache.druid.data.input.impl.CsvInputFormat; @@ -515,7 +516,8 @@ public void testReader() throws IOException InputSourceReader reader = inputSource.reader( someSchema, new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), - temporaryFolder.newFolder() + temporaryFolder.newFolder(), + new InputStats() ); CloseableIterator iterator = reader.read(); @@ -559,7 +561,8 @@ public void testCompressedReader() throws IOException InputSourceReader reader = inputSource.reader( someSchema, new CsvInputFormat(ImmutableList.of("time", "dim1", "dim2"), "|", false, null, 0), - temporaryFolder.newFolder() + temporaryFolder.newFolder(), + new InputStats() ); CloseableIterator iterator = reader.read(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java index f8596ec17ac6..80684b3d72fb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractBatchIndexTask.java @@ -22,6 +22,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import org.apache.druid.data.input.FirehoseFactory; import org.apache.druid.data.input.InputFormat; @@ -29,6 +30,7 @@ import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.indexing.common.LockGranularity; import org.apache.druid.indexing.common.TaskLock; @@ -47,6 +49,8 @@ import org.apache.druid.java.util.common.granularity.Granularity; import org.apache.druid.java.util.common.granularity.GranularityType; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.metrics.InputStatsMonitor; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.incremental.ParseExceptionHandler; import org.apache.druid.segment.incremental.RowIngestionMeters; @@ -96,9 +100,12 @@ public abstract class AbstractBatchIndexTask extends AbstractTask private TaskLockHelper taskLockHelper; + private final InputStats inputStats; + protected AbstractBatchIndexTask(String id, String dataSource, Map context) 
{ super(id, dataSource, context); + this.inputStats = new InputStats(); } protected AbstractBatchIndexTask( @@ -110,6 +117,7 @@ protected AbstractBatchIndexTask( ) { super(id, groupId, taskResource, dataSource, context); + this.inputStats = new InputStats(); } /** @@ -122,6 +130,8 @@ protected AbstractBatchIndexTask( @Override public TaskStatus run(TaskToolbox toolbox) throws Exception { + toolbox.addMonitor(new InputStatsMonitor(inputStats, getMetricsDimensions())); + synchronized (this) { if (stopped) { return TaskStatus.failure(getId()); @@ -162,7 +172,8 @@ public static FilteringCloseableInputRowIterator inputSourceReader( @Nullable InputFormat inputFormat, Predicate rowFilter, RowIngestionMeters ingestionMeters, - ParseExceptionHandler parseExceptionHandler + ParseExceptionHandler parseExceptionHandler, + InputStats inputStats ) throws IOException { final List metricsNames = Arrays.stream(dataSchema.getAggregators()) @@ -176,7 +187,8 @@ public static FilteringCloseableInputRowIterator inputSourceReader( metricsNames ), inputFormat, - tmpDir + tmpDir, + inputStats ) ); return new FilteringCloseableInputRowIterator( @@ -250,6 +262,11 @@ public TaskLockHelper getTaskLockHelper() return Preconditions.checkNotNull(taskLockHelper, "taskLockHelper is not initialized yet"); } + public InputStats getInputStats() + { + return inputStats; + } + /** * Determine lockGranularity to use and try to acquire necessary locks. * This method respects the value of 'forceTimeChunkLock' in task context. @@ -554,4 +571,13 @@ private LockGranularityDetermineResult( this.segments = segments; } } + + public Map getMetricsDimensions() + { + return ImmutableMap.of( + DruidMetrics.TASK_ID, new String[] {getId()}, + DruidMetrics.TASK_TYPE, new String[] {getType()}, + DruidMetrics.DATASOURCE, new String[] {getDataSource()} + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5e93675372fd..8685f88dcb6e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -729,7 +729,8 @@ private Map> collectIntervalsAndShardSp inputSource.needsFormat() ? 
getInputFormat(ingestionSchema) : null, rowFilter, determinePartitionsMeters, - determinePartitionsParseExceptionHandler + determinePartitionsParseExceptionHandler, + getInputStats() )) { while (inputRowIterator.hasNext()) { final InputRow inputRow = inputRowIterator.next(); @@ -882,7 +883,8 @@ private TaskStatus generateAndPublishSegments( new DefaultIndexTaskInputRowIteratorBuilder(), buildSegmentsMeters, buildSegmentsParseExceptionHandler, - pushTimeout + pushTimeout, + getInputStats() ); // If we use timeChunk lock, then we don't have to specify what segments will be overwritten because diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java index 05ac79e733d6..a7fb1b8e9098 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/InputSourceProcessor.java @@ -25,6 +25,7 @@ import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputSource; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; import org.apache.druid.indexer.partitions.DynamicPartitionsSpec; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexing.common.task.batch.parallel.iterator.IndexTaskInputRowIteratorBuilder; @@ -68,7 +69,8 @@ public static SegmentsAndCommitMetadata process( IndexTaskInputRowIteratorBuilder inputRowIteratorBuilder, RowIngestionMeters buildSegmentsMeters, ParseExceptionHandler parseExceptionHandler, - long pushTimeout + long pushTimeout, + InputStats inputStats ) throws IOException, InterruptedException, ExecutionException, TimeoutException { @Nullable @@ -84,7 +86,8 @@ public static SegmentsAndCommitMetadata process( inputFormat, AbstractBatchIndexTask.defaultRowFilter(granularitySpec), buildSegmentsMeters, - parseExceptionHandler + parseExceptionHandler, + inputStats ); final HandlingInputRowIterator iterator = inputRowIteratorBuilder .delegate(inputRowIterator) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java index 53019dda5b4b..0599ae979277 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialDimensionDistributionTask.java @@ -204,7 +204,8 @@ public TaskStatus runTask(TaskToolbox toolbox) throws Exception inputFormat, AbstractBatchIndexTask.defaultRowFilter(granularitySpec), buildSegmentsMeters, - parseExceptionHandler + parseExceptionHandler, + getInputStats() ); HandlingInputRowIterator iterator = new RangePartitionIndexTaskInputRowIteratorBuilder(partitionDimension, SKIP_NULL) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java index 98dae9926cf3..052b1b8e32f4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialHashSegmentGenerateTask.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; import org.apache.druid.indexer.partitions.HashedPartitionsSpec; import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.actions.TaskActionClient; @@ -29,6 +30,7 @@ import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.common.task.batch.parallel.iterator.DefaultIndexTaskInputRowIteratorBuilder; import org.apache.druid.indexing.common.task.batch.partition.HashPartitionAnalysis; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.indexing.granularity.GranularitySpec; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; @@ -176,4 +178,15 @@ public static HashPartitionAnalysis createHashPartitionAnalysisFromPartitionsSpe intervals.forEach(interval -> partitionAnalysis.updateBucket(interval, numBucketsPerInterval)); return partitionAnalysis; } + + @Override + public Map getMetricsDimensions() + { + return ImmutableMap.of( + DruidMetrics.TASK_ID, new String[] {getId()}, + DruidMetrics.TASK_TYPE, new String[] {getType()}, + DruidMetrics.DATASOURCE, new String[] {getDataSource()}, + DruidMetrics.SUPERVISOR_ID, new String[] {getSupervisorTaskId()} + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java index 89b9f80fcf46..56fede399bae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialRangeSegmentGenerateTask.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import org.apache.druid.indexer.partitions.PartitionsSpec; import org.apache.druid.indexer.partitions.SingleDimensionPartitionsSpec; import org.apache.druid.indexing.common.TaskToolbox; @@ -32,6 +33,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.iterator.RangePartitionIndexTaskInputRowIteratorBuilder; import org.apache.druid.indexing.common.task.batch.partition.RangePartitionAnalysis; import org.apache.druid.indexing.worker.ShuffleDataSegmentPusher; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.BucketNumberedShardSpec; import org.apache.druid.timeline.partition.PartitionBoundaries; @@ -179,4 +181,15 @@ private GenericPartitionStat createPartitionStat(TaskToolbox toolbox, DataSegmen null // sizeBytes is not supported yet ); } + + @Override + public Map getMetricsDimensions() + { + return ImmutableMap.of( + DruidMetrics.TASK_ID, new String[] {getId()}, + DruidMetrics.TASK_TYPE, new String[] {getType()}, + DruidMetrics.DATASOURCE, new String[] {getDataSource()}, + DruidMetrics.SUPERVISOR_ID, new String[] {getSupervisorTaskId()} + ); + } } diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java index 27160c96303c..d7961f776ee8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentGenerateTask.java @@ -190,7 +190,8 @@ private List generateSegments( inputRowIteratorBuilder, buildSegmentsMeters, parseExceptionHandler, - pushTimeout + pushTimeout, + getInputStats() ); return pushed.getSegments(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index 099b24af37ca..2c7eca1d8c9f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; @@ -40,6 +41,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.query.DruidMetrics; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; @@ -357,4 +359,15 @@ private static Pair> mergeSegmentsInSamePartition( ); } } + + @Override + public Map getMetricsDimensions() + { + return ImmutableMap.of( + DruidMetrics.TASK_ID, new String[] {getId()}, + DruidMetrics.TASK_TYPE, new String[] {getType()}, + DruidMetrics.DATASOURCE, new String[] {getDataSource()}, + DruidMetrics.SUPERVISOR_ID, new String[] {getSupervisorTaskId()} + ); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java index 543de6168e69..9572d8304cd1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java @@ -339,7 +339,8 @@ private Set generateAndPushSegments( return true; }, rowIngestionMeters, - parseExceptionHandler + parseExceptionHandler, + getInputStats() ) ) { driver.startJob(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 4388c705fb4f..8655f6e9d306 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -29,11 +29,13 @@ import com.google.common.collect.Iterators; import 
org.apache.druid.client.coordinator.CoordinatorClient; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; import org.apache.druid.data.input.InputFileAttribute; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.MaxSizeSplitHintSpec; import org.apache.druid.data.input.SegmentsSplitHintSpec; import org.apache.druid.data.input.SplitHintSpec; @@ -166,12 +168,12 @@ public List getMetrics() } @Override - protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory) + protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory, InputStats inputStats) { final SegmentLoader segmentLoader = segmentLoaderFactory.manufacturate(temporaryDirectory); final List> timeline = createTimeline(); - final Iterator entityIterator = FluentIterable + final Iterator entityIterator = FluentIterable .from(timeline) .transformAndConcat(holder -> { //noinspection ConstantConditions @@ -179,7 +181,7 @@ protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nu //noinspection ConstantConditions return FluentIterable .from(partitionHolder) - .transform(chunk -> new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval())); + .transform(chunk -> new CountableInputEntity(new DruidSegmentInputEntity(segmentLoader, chunk.getObject(), holder.getInterval()), inputStats)); }).iterator(); final List effectiveDimensions = ReingestionTimelineUtils.getDimensionsToReingest( dimensions, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java index 6460ae43d55d..b8b33e2ff9b5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java @@ -80,8 +80,8 @@ public class DruidSegmentReader extends IntermediateRowParsingReader 0) { reader = new TimedShutoffInputSourceReader(reader, DateTimes.nowUtc().plusMillis(samplerConfig.getTimeoutMs())); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java index ef49ce3fcea0..e11143da1b75 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java @@ -20,10 +20,12 @@ package org.apache.druid.indexing.seekablestream; import org.apache.druid.data.input.AbstractInputSource; +import org.apache.druid.data.input.CountableInputEntity; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; import org.apache.druid.data.input.impl.ByteEntity; import org.apache.druid.data.input.impl.InputEntityIteratingReader; import org.apache.druid.indexing.overlord.sampler.SamplerException; @@ -99,13 +101,14 @@ public 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
index ef49ce3fcea0..e11143da1b75 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSource.java
@@ -20,10 +20,12 @@
 package org.apache.druid.indexing.seekablestream;
 
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.CountableInputEntity;
 import org.apache.druid.data.input.InputEntity;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.indexing.overlord.sampler.SamplerException;
@@ -99,13 +101,14 @@ public boolean needsFormat()
   protected InputSourceReader formattableReader(
       InputRowSchema inputRowSchema,
       InputFormat inputFormat,
-      @Nullable File temporaryDirectory
+      @Nullable File temporaryDirectory,
+      InputStats inputStats
   )
   {
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
-        createEntityIterator(),
+        createEntityIterator(inputStats),
         temporaryDirectory
     );
   }
@@ -114,7 +117,7 @@ protected InputSourceReader formattableReader(
    * Returns an iterator converting each byte array from RecordSupplier into a ByteEntity. Note that the
    * returned iterator will be blocked until the RecordSupplier gives any data.
    */
-  CloseableIterator<InputEntity> createEntityIterator()
+  CloseableIterator<InputEntity> createEntityIterator(InputStats inputStats)
   {
     return new CloseableIterator<InputEntity>()
     {
@@ -144,7 +147,7 @@ public boolean hasNext()
       @Override
       public InputEntity next()
       {
-        return new ByteEntity(bytesIterator.next());
+        return new CountableInputEntity(new ByteEntity(bytesIterator.next()), inputStats);
       }
 
       @Override
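InputStats is referenced throughout this patch but its definition is not included in this section. Judging only from the call sites visible here (new InputStats(), incrementProcessedBytes(long)), a minimal thread-safe shape could look like the sketch below; the getProcessedBytes() accessor is an assumption.

package org.apache.druid.data.input;

import java.util.concurrent.atomic.AtomicLong;

// Minimal sketch inferred from call sites in this patch; not the actual implementation.
public class InputStats
{
  private final AtomicLong processedBytes = new AtomicLong(0);

  public void incrementProcessedBytes(long incrementBy)
  {
    processedBytes.addAndGet(incrementBy);
  }

  // Assumed accessor: something like this is needed for a monitor to read the count.
  public long getProcessedBytes()
  {
    return processedBytes.get();
  }
}

An atomic counter keeps the increment cheap enough to sit on the per-record hot path that SeekableStreamIndexTaskRunner adds below.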
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 54292d8c9fb8..f7ebfec9ecbc 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -24,7 +24,9 @@
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
+import com.google.common.collect.ImmutableMap;
 import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
 import org.apache.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
@@ -39,6 +41,8 @@
 import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.java.util.metrics.InputStatsMonitor;
+import org.apache.druid.query.DruidMetrics;
 import org.apache.druid.query.NoopQueryRunner;
 import org.apache.druid.query.Query;
 import org.apache.druid.query.QueryRunner;
@@ -73,6 +77,7 @@ public abstract class SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType>
   private final Supplier<SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType>> runnerSupplier;
+  private final InputStats inputStats;
 
   @MonotonicNonNull
   protected AuthorizerMapper authorizerMapper;
@@ -98,6 +103,7 @@ public SeekableStreamIndexTask(
     this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
     this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
     this.context = context;
+    this.inputStats = new InputStats();
     this.runnerSupplier = Suppliers.memoize(this::createTaskRunner);
     this.lockGranularityToUse = getContextValue(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, Tasks.DEFAULT_FORCE_TIME_CHUNK_LOCK)
                                 ? LockGranularity.TIME_CHUNK
@@ -142,6 +148,7 @@ public SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> getIOConfig()
   @Override
   public TaskStatus run(final TaskToolbox toolbox)
   {
+    toolbox.addMonitor(new InputStatsMonitor(inputStats, getMetricsDimensions()));
     return getRunner().run(toolbox);
   }
@@ -274,4 +281,18 @@ public SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOffsetType> getRunner()
   {
     return runnerSupplier.get();
   }
+
+  public InputStats getInputStats()
+  {
+    return inputStats;
+  }
+
+  public Map<String, String[]> getMetricsDimensions()
+  {
+    return ImmutableMap.of(
+        DruidMetrics.TASK_ID, new String[] {getId()},
+        DruidMetrics.TASK_TYPE, new String[] {getType()},
+        DruidMetrics.DATASOURCE, new String[] {getDataSource()}
+    );
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 2a2bf2d7f05f..49591adbb7e0 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -40,6 +40,7 @@
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.impl.InputRowParser;
 import org.apache.druid.discovery.DiscoveryDruidNode;
 import org.apache.druid.discovery.LookupNodeService;
@@ -231,11 +232,14 @@ public enum Status
   private volatile CopyOnWriteArrayList<SequenceMetadata<PartitionIdType, SequenceOffsetType>> sequences;
   private volatile Throwable backgroundThreadException;
 
+  private final InputStats inputStats;
+
   public SeekableStreamIndexTaskRunner(
       final SeekableStreamIndexTask<PartitionIdType, SequenceOffsetType> task,
       @Nullable final InputRowParser<ByteBuffer> parser,
       final AuthorizerMapper authorizerMapper,
-      final LockGranularity lockGranularityToUse
+      final LockGranularity lockGranularityToUse,
+      final InputStats inputStats
   )
   {
     Preconditions.checkNotNull(task);
@@ -257,6 +261,7 @@ public SeekableStreamIndexTaskRunner(
     this.sequences = new CopyOnWriteArrayList<>();
     this.ingestionState = IngestionState.NOT_STARTED;
     this.lockGranularityToUse = lockGranularityToUse;
+    this.inputStats = inputStats;
 
     resetNextCheckpointTime();
   }
@@ -610,6 +615,8 @@ public void run()
           SequenceMetadata sequenceToCheckpoint = null;
           for (OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType> record : records) {
+            final long processedBytes = record.getData().parallelStream().mapToLong(value -> value.length).reduce(0, Long::sum);
+            getInputStats().incrementProcessedBytes(processedBytes);
             final boolean shouldProcess = verifyRecordInRange(record.getPartitionId(), record.getSequenceNumber());
 
             log.trace(
@@ -1858,6 +1865,11 @@ private boolean verifyRecordInRange(
     return isMoreToReadBeforeReadingRecord(recordSequenceNumber.get(), endOffsets.get(partition));
   }
 
+  public InputStats getInputStats()
+  {
+    return inputStats;
+  }
+
   /**
    * checks if the input seqNum marks end of shard. Used by Kinesis only
    */
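InputStatsMonitor is attached in run() above, but its source is also not part of this section. A plausible sketch of such a monitor follows, assuming Druid's AbstractMonitor base class, the getProcessedBytes() accessor from the earlier sketch, and an ingest/input/bytes metric name; the accessor and metric name are assumptions, not taken from this diff.

package org.apache.druid.java.util.metrics;

import org.apache.druid.data.input.InputStats;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.Map;

// Sketch only: the metric name and getProcessedBytes() are assumptions.
public class InputStatsMonitor extends AbstractMonitor
{
  private final InputStats inputStats;
  private final Map<String, String[]> dimensions;

  public InputStatsMonitor(InputStats inputStats, Map<String, String[]> dimensions)
  {
    this.inputStats = inputStats;
    this.dimensions = dimensions;
  }

  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    final ServiceMetricEvent.Builder builder = ServiceMetricEvent.builder();
    // Attach taskId, taskType, and dataSource, matching getMetricsDimensions() above.
    for (Map.Entry<String, String[]> entry : dimensions.entrySet()) {
      builder.setDimension(entry.getKey(), entry.getValue());
    }
    emitter.emit(builder.build("ingest/input/bytes", inputStats.getProcessedBytes()));
    return true;
  }
}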
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
index 5c0447704b9b..567d26852e7b 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -30,6 +30,7 @@
 import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -164,7 +165,7 @@ protected SeekableStreamSamplerFirehose(InputRowParser<ByteBuffer> parser)
         createRecordSupplier(),
         ioConfig.isUseEarliestSequenceNumber()
     );
-    this.entityIterator = inputSource.createEntityIterator();
+    this.entityIterator = inputSource.createEntityIterator(new InputStats()); // not counting sampler read bytes
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java
index 9b6ae0d3e36f..0fb65551e2c4 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/input/GeneratorInputSourceTest.java
@@ -26,6 +26,7 @@
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -126,7 +127,7 @@ public void testReader() throws IOException
         timestampIncrement
     );
 
-    InputSourceReader reader = inputSource.fixedFormatReader(null, null);
+    InputSourceReader reader = inputSource.fixedFormatReader(null, null, new InputStats());
     CloseableIterator<InputRow> iterator = reader.read();
 
     InputRow first = iterator.next();
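Not part of the patch, but a sketch of how the byte counting could be verified end to end through CountableInputEntity, again assuming the getProcessedBytes() accessor from the earlier sketch:

import com.google.common.io.ByteStreams;
import org.apache.druid.data.input.CountableInputEntity;
import org.apache.druid.data.input.InputStats;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;

public class CountableInputEntityTest
{
  @Test
  public void testOpenCountsBytes() throws IOException
  {
    final byte[] payload = StringUtils.toUtf8("some test payload");
    final InputStats inputStats = new InputStats();
    final CountableInputEntity entity = new CountableInputEntity(new ByteEntity(payload), inputStats);

    try (InputStream in = entity.open()) {
      ByteStreams.exhaust(in); // read the stream fully so every byte is counted
    }
    Assert.assertEquals(payload.length, inputStats.getProcessedBytes()); // assumed accessor
  }
}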
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
index 99ab9a480e9e..6a4d376af15a 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java
@@ -41,6 +41,7 @@
 import org.apache.druid.data.input.InputRowListPlusRawValues;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.MapBasedInputRow;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InputRowParser;
@@ -282,7 +283,7 @@ private static InputRow ir(String dt, String dim1, String dim2, float met)
   private static class MockExceptionInputSource extends AbstractInputSource
   {
     @Override
-    protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
+    protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory, InputStats inputStats)
     {
       return new InputSourceReader()
       {
@@ -334,7 +335,7 @@ public boolean needsFormat()
   private static class MockInputSource extends AbstractInputSource
   {
     @Override
-    protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
+    protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory, InputStats inputStats)
     {
       return new InputSourceReader()
       {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
index 10d6bf857a45..bfde0983bc61 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/RecordSupplierInputSourceTest.java
@@ -26,6 +26,7 @@
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
 import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -81,7 +82,8 @@ public void testRead() throws IOException
             Collections.emptyList()
         ),
         inputFormat,
-        temporaryFolder.newFolder()
+        temporaryFolder.newFolder(),
+        new InputStats()
     );
 
     int read = 0;
diff --git a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
index b1e49af02bf1..78e6e9eb8ea7 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidMetrics.java
@@ -34,6 +34,7 @@ public class DruidMetrics
   public static final String ID = "id";
   public static final String TASK_ID = "taskId";
   public static final String STATUS = "status";
+  public static final String SUPERVISOR_ID = "supervisorId";
 
   // task metrics
   public static final String TASK_TYPE = "taskType";
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
index c7dfbb7fa365..d1e7fc1b4457 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlInputSource.java
@@ -25,10 +25,12 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.druid.data.input.AbstractInputSource;
+import org.apache.druid.data.input.CountableInputEntity;
 import org.apache.druid.data.input.InputFormat;
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.SplitHintSpec;
 import org.apache.druid.data.input.impl.InputEntityIteratingReader;
 import org.apache.druid.data.input.impl.SplittableInputSource;
@@ -110,14 +112,18 @@ public SplittableInputSource<String> withSplit(InputSplit<String> split)
   }
 
   @Override
-  protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory)
+  protected InputSourceReader fixedFormatReader(InputRowSchema inputRowSchema, @Nullable File temporaryDirectory,
+      InputStats inputStats
+  )
   {
     final SqlInputFormat inputFormat = new SqlInputFormat(objectMapper);
     return new InputEntityIteratingReader(
         inputRowSchema,
         inputFormat,
         createSplits(inputFormat, null)
-            .map(split -> new SqlEntity(split.get(), sqlFirehoseDatabaseConnector, foldCase, objectMapper)).iterator(),
+            .map(split -> new CountableInputEntity(
+                new SqlEntity(split.get(), sqlFirehoseDatabaseConnector, foldCase, objectMapper), inputStats))
+            .iterator(),
         temporaryDirectory
     );
   }
@@ -139,8 +145,8 @@ public boolean equals(Object o)
     }
     SqlInputSource that = (SqlInputSource) o;
     return foldCase == that.foldCase &&
-        sqls.equals(that.sqls) &&
-        sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector);
+           sqls.equals(that.sqls) &&
+           sqlFirehoseDatabaseConnector.equals(that.sqlFirehoseDatabaseConnector);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index f0508df0d309..140e7ff0cd4d 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -57,7 +57,7 @@ public class SqlReader extends IntermediateRowParsingReader<Map<String, Object>>
   )
   {
     this.inputRowSchema = inputRowSchema;
-    this.source = (SqlEntity) source;
+    this.source = (SqlEntity) source.getBaseInputEntity();
     this.temporaryDirectory = temporaryDirectory;
     this.objectMapper = objectMapper;
   }
diff --git a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
index 7afa88894f26..da50eb39bbf5 100644
--- a/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/input/SqlInputSourceTest.java
@@ -32,6 +32,7 @@
 import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.InputStats;
 import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimestampSpec;
@@ -153,7 +154,7 @@ public void testSingleSplit() throws Exception
     testUtils.createAndUpdateTable(TABLE_NAME_1, 10);
     final File tempDir = createFirehoseTmpDir("testSingleSplit");
     SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST1, true, testUtils.getDerbyFirehoseConnector(), mapper);
-    InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
+    InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir, new InputStats());
     CloseableIterator<InputRow> resultIterator = sqlReader.read();
     final List<Row> rows = new ArrayList<>();
     while (resultIterator.hasNext()) {
@@ -173,7 +174,7 @@ public void testMultipleSplits() throws Exception
     testUtils.createAndUpdateTable(TABLE_NAME_2, 10);
     final File tempDir = createFirehoseTmpDir("testMultipleSplit");
     SqlInputSource sqlInputSource = new SqlInputSource(SQLLIST2, true, testUtils.getDerbyFirehoseConnector(), mapper);
-    InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir);
+    InputSourceReader sqlReader = sqlInputSource.fixedFormatReader(INPUT_ROW_SCHEMA, tempDir, new InputStats());
     CloseableIterator<InputRow> resultIterator = sqlReader.read();
     final List<Row> rows = new ArrayList<>();
     while (resultIterator.hasNext()) {
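Note the SqlReader change above: because SqlInputSource now hands out CountableInputEntity wrappers, the reader must unwrap via getBaseInputEntity() before casting to SqlEntity. A defensive variant of that cast (hypothetical helper, not in the patch, which simply casts the unwrapped entity) might look like:

import org.apache.druid.data.input.InputEntity;
import org.apache.druid.metadata.input.SqlEntity;

class SqlEntityUnwrapper
{
  // Hypothetical helper; the patch itself just calls source.getBaseInputEntity() and casts.
  static SqlEntity asSqlEntity(InputEntity source)
  {
    final InputEntity base = source.getBaseInputEntity();
    if (base instanceof SqlEntity) {
      return (SqlEntity) base;
    }
    throw new IllegalArgumentException("Expected SqlEntity, got " + base.getClass().getName());
  }
}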
14fd910f4a78..0a0a184c3775 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -165,6 +165,7 @@ Splunk
 SqlFirehose
 SqlParameter
 StatsD
+supervisorId
 TCP
 TGT
 TLS