
Add InputStats to track bytes processed by a task#13520

Merged
kfaraz merged 17 commits into apache:master from kfaraz:add_processed_bytes
Dec 13, 2022

Conversation


@kfaraz kfaraz commented Dec 7, 2022

This is based on #12750 and #10407

Description

  • Track bytes processed by a task and expose in task reports along with row stats
  • Supported for classic batch and streaming ingestion
  • Not supported for MSQ ingestion in this PR
  • Not supported for FirehoseToInputSourceReaderAdaptor
  • processedBytes measures the uncompressed input bytes. e.g. for the sample "wikipedia" datasource, it measures the total size of the unzipped wikipedia.json file (and not of wikipedia.json.gz)
  • Records counted as unparseable, thrownAway or processedWithError also count towards processedBytes

Implementation

  • Add class InputStats to track processed bytes
  • Add method InputSourceReader.read(InputStats) to read input rows while counting bytes.

Since we need to count the bytes, a simple wrapper around InputSourceReader or InputEntityReader (the way CountableInputSourceReader wraps it) would not suffice: InputSourceReader deals only in InputRows, by which point the byte information has already been lost.

  • Classic batch: Use the new InputSourceReader.read(inputStats) in AbstractBatchIndexTask
  • Streaming: Increment processedBytes in StreamChunkParser. This does not use the new InputSourceReader.read(inputStats) method.
  • Extend InputStats with RowIngestionMeters so that bytes can be exposed in task reports
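The counting hook described in the bullets above can be sketched in isolation. The following is a simplified, hypothetical stand-in (the names mirror the PR, but the real Druid code differs): bytes are accumulated as each raw record is handed out, while its size is still known and before parsing discards it.

```java
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ByteCountingReaderSketch {
    // Simplified stand-in for the InputStats accumulator added in this PR.
    static class InputStats {
        private long processedBytes;

        void incrementProcessedBytes(long delta) {
            processedBytes += delta;
        }

        long getProcessedBytes() {
            return processedBytes;
        }
    }

    // Hypothetical read(InputStats): counts the uncompressed size of each
    // raw record while it is still available, before parsing into rows.
    static Iterator<String> read(List<String> rawRecords, InputStats stats) {
        Iterator<String> delegate = rawRecords.iterator();
        return new Iterator<String>() {
            @Override public boolean hasNext() { return delegate.hasNext(); }
            @Override public String next() {
                String record = delegate.next();
                stats.incrementProcessedBytes(record.getBytes(StandardCharsets.UTF_8).length);
                return record;
            }
        };
    }

    public static void main(String[] args) {
        InputStats stats = new InputStats();
        Iterator<String> rows = read(List.of("abc", "defgh"), stats);
        while (rows.hasNext()) {
            rows.next();
        }
        System.out.println(stats.getProcessedBytes()); // prints 8 (3 + 5 bytes)
    }
}
```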

Tests and refactors

  • Update tests to verify the value of processedBytes
    • HdfsInputSourceTest
    • S3InputSourceTest
    • GCSInputSourceTest
    • OssInputSourceTest
    • SqlInputSourceTest
    • DruidSegmentReaderTest
    • KinesisIndexTaskTest
    • KafkaIndexTaskTest
  • Rename MutableRowIngestionMeters to SimpleRowIngestionMeters and remove duplicate class
  • Replace CacheTestSegmentCacheManager with NoopSegmentCacheManager
  • Refactor KafkaIndexTaskTest and KinesisIndexTaskTest

Pending items

  • docs

Further work

  • Support for MSQ ingestion

Release note

Track bytes processed by a task and publish them in the task report along with the row stats.

Sample row stats in a task report:

"rowStats": {
    "buildSegments": {
        "processed": 24433,
        "processedBytes": 11956576,
        "processedWithError": 0,
        "thrownAway": 0,
        "unparseable": 0
    }
}
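To illustrate how the new counter relates to the existing row counters, here is a small sketch that aggregates the sample "buildSegments" numbers above. The derived figures (total record count, average uncompressed bytes per record) are our own illustration, not part of the task report.

```java
import java.util.Locale;
import java.util.Map;

public class RowStatsSummary {
    public static void main(String[] args) {
        // Values copied from the sample task report above.
        Map<String, Long> buildSegments = Map.of(
            "processed", 24433L,
            "processedBytes", 11956576L,
            "processedWithError", 0L,
            "thrownAway", 0L,
            "unparseable", 0L
        );

        // processedBytes covers all records, including unparseable,
        // thrownAway and processedWithError ones, so sum every counter.
        long totalRecords = buildSegments.get("processed")
            + buildSegments.get("processedWithError")
            + buildSegments.get("thrownAway")
            + buildSegments.get("unparseable");
        double avgBytesPerRecord =
            (double) buildSegments.get("processedBytes") / totalRecords;

        System.out.println(String.format(
            Locale.ROOT, "records=%d avgBytes=%.1f", totalRecords, avgBytesPerRecord));
    }
}
```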

Screenshots

Batch ingestion
(screenshot)

Streaming ingestion
(screenshot)


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

Comment on lines 27 to 35

default void incrementProcessedBytes(long incrementByValue)
{
}

default long getProcessedBytes()
{
  return 0;
}
Contributor:
why are default methods needed here?

kfaraz (author):

RowIngestionMeters is marked as an extension point, which is where this is used. I can move the default impls there.

kfaraz (author):

I am not sure if anyone uses their own impl of RowIngestionMeters. I would have preferred not having these default impls altogether.

kfaraz (author):

Moved to RowIngestionMeters for now.
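The backward-compatibility concern behind those default implementations can be demonstrated with a stripped-down sketch. The interface below only mimics Druid's RowIngestionMeters extension point; the point is that an implementation written before this PR keeps compiling because the new byte methods default to no-ops.

```java
public class DefaultMethodSketch {
    // Stripped-down mimic of the RowIngestionMeters extension point,
    // showing only the byte-tracking additions from this PR.
    interface RowIngestionMeters {
        void incrementProcessed();
        long getProcessed();

        // New methods default to no-ops so implementations written
        // before this PR still compile unchanged.
        default void incrementProcessedBytes(long incrementByValue) {}
        default long getProcessedBytes() { return 0; }
    }

    // A "legacy" third-party implementation that predates the new methods.
    static class LegacyMeters implements RowIngestionMeters {
        private long processed;
        @Override public void incrementProcessed() { processed++; }
        @Override public long getProcessed() { return processed; }
    }

    public static void main(String[] args) {
        RowIngestionMeters meters = new LegacyMeters();
        meters.incrementProcessed();
        meters.incrementProcessedBytes(128); // silently a no-op here
        System.out.println(meters.getProcessed() + " " + meters.getProcessedBytes());
    }
}
```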

public interface InputSourceReader
{
-  CloseableIterator<InputRow> read() throws IOException;
+  default CloseableIterator<InputRow> read() throws IOException
kfaraz (author):

This method can now be removed as it is only used in tests.


  @Override
- public CloseableIterator<InputRow> read() throws IOException
+ public CloseableIterator<InputRow> read(InputStats inputStats) throws IOException
kfaraz (author):

Maybe throw UnsupportedOpException here?

@kfaraz kfaraz removed the WIP label Dec 12, 2022
@AmatyaAvadhanula (Contributor) left a comment:

Thank you @kfaraz! LGTM, +1 after builds pass


kfaraz commented Dec 13, 2022

Thanks a lot for the review, @AmatyaAvadhanula !
Thanks for doing the initial work on this, @somu-imply, @pjain1!

@kfaraz kfaraz merged commit 58a3acc into apache:master Dec 13, 2022
kfaraz added a commit that referenced this pull request Dec 15, 2022
Follow up to #13520

Bytes processed are currently tracked for intermediate stages in MSQ ingestion.
This patch adds the capability to track the bytes processed by an MSQ controller
task while reading from an external input source or a segment source.

Changes:
- Track `processedBytes` for every `InputSource` read in `ExternalInputSliceReader`
- Update `ChannelCounters` with the above obtained `processedBytes` when incrementing the input file count.
- Update task report structure in docs

The total input processed bytes can be obtained by summing the `processedBytes` as follows:

totalBytes = 0
for every root stage (i.e. a stage which does not have another stage as an input):
    for every worker in that stage:
        for every input channel: (i.e. channels with prefix "input", e.g. "input0", "input1", etc.)
            totalBytes += processedBytes
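That pseudocode could be realized as follows. The nested-map layout standing in for the per-stage counters is an assumed shape for illustration, not the actual MSQ report schema.

```java
import java.util.List;
import java.util.Map;

public class MsqInputBytesSketch {
    public static void main(String[] args) {
        // Assumed shape: root stage -> workers -> (channel name -> processedBytes).
        // Only channels whose name starts with "input" count, per the note above.
        Map<String, List<Map<String, Long>>> rootStages = Map.of(
            "stage0", List.of(
                Map.of("input0", 1000L, "input1", 2500L, "output", 900L),
                Map.of("input0", 4000L)
            )
        );

        long totalBytes = 0;
        for (List<Map<String, Long>> workers : rootStages.values()) {
            for (Map<String, Long> channels : workers) {
                for (Map.Entry<String, Long> e : channels.entrySet()) {
                    if (e.getKey().startsWith("input")) {
                        totalBytes += e.getValue();
                    }
                }
            }
        }
        System.out.println(totalBytes); // 1000 + 2500 + 4000 = 7500
    }
}
```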
@kfaraz kfaraz mentioned this pull request Jan 14, 2023
9 tasks
@clintropolis clintropolis added this to the 26.0 milestone Apr 10, 2023
@kfaraz kfaraz mentioned this pull request May 5, 2023
5 tasks