Emit state of replace and append for native batch tasks #12488

Merged
merged 17 commits into from
May 23, 2022

@loquisgon loquisgon commented Apr 28, 2022

A native batch ingestion job can be in one of three modes: REPLACE (new since tombstones were added), APPEND, and REPLACE_LEGACY (the current default for native batch ingestion). These modes are set with appropriate values for the following IOConfig flags: appendToExisting and dropExisting. The goal of this ticket is to emit Druid metrics that make it possible to assess the utilization of the different flavors of batch ingestion, in particular replace and tombstone creation.

Copying from the metrics.md documentation:

|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/count`|Count of `1` every time an ingestion job runs (includes compaction jobs). Aggregate using dimensions.|dataSource, taskId, taskType, taskIngestionMode|Always `1`.|
|`ingest/segments/count`|Count of final segments created by job (includes tombstones).|dataSource, taskId, taskType, taskIngestionMode|At least `1`.|
|`ingest/tombstones/count`|Count of tombstones created by job|dataSource, taskId, taskType, taskIngestionMode|Zero or more for replace. Always zero for non-replace tasks (always zero for legacy replace, see below).|

The taskIngestionMode dimension includes the following modes:
APPEND, REPLACE_LEGACY, and REPLACE. The APPEND mode indicates a native
ingestion job that is appending to existing segments; REPLACE a native ingestion
job replacing existing segments using tombstones;
and REPLACE_LEGACY the original replace before tombstones.

The mode is decided using the values
of the isAppendToExisting and isDropExisting flags in the
task's IOConfig as follows:

|`isAppendToExisting`|`isDropExisting`|mode|
|--------------------|----------------|----|
|`true`|`false`|`APPEND`|
|`true`|`true`|Invalid combination, exception thrown.|
|`false`|`false`|`REPLACE_LEGACY` (this is the default for native batch ingestion).|
|`false`|`true`|`REPLACE`|
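
The mapping in the table above can be sketched in Java as follows. This is a minimal illustration and not the code from this PR: the class name `IngestionModeSketch`, the method `fromFlags`, and the choice of `IllegalArgumentException` are assumptions made for the example; only the three mode names and the flag semantics come from the description.

```java
// Hypothetical sketch of the flag-to-mode mapping; names are illustrative only.
final class IngestionModeSketch {
  enum IngestionMode { REPLACE, APPEND, REPLACE_LEGACY }

  static IngestionMode fromFlags(boolean isAppendToExisting, boolean isDropExisting) {
    // Reject the invalid combination before looking at either flag alone.
    if (isAppendToExisting && isDropExisting) {
      throw new IllegalArgumentException(
          "appendToExisting and dropExisting cannot both be true");
    }
    if (isAppendToExisting) {
      return IngestionMode.APPEND;
    }
    if (isDropExisting) {
      return IngestionMode.REPLACE;
    }
    // Both flags false: the default for native batch ingestion.
    return IngestionMode.REPLACE_LEGACY;
  }

  public static void main(String[] args) {
    System.out.println(fromFlags(true, false));
    System.out.println(fromFlags(false, false));
    System.out.println(fromFlags(false, true));
  }
}
```

Validating the combination first means the result does not depend on the order in which the individual flags are tested.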

This PR has:

  • [ X] been self-reviewed.
  • [ X] added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [ X] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • [ X] been tested in a test Druid cluster.

@loquisgon loquisgon marked this pull request as draft April 28, 2022 18:05
@loquisgon loquisgon marked this pull request as ready for review April 29, 2022 01:34
`true` | `false` | `APPEND`|
`true` | `true ` | Invalid combination, exception thrown. |
`false` | `false` | `OVERWRITE` (this is the default for native batch ingestion). |
`false` | `true` | `REPLACE`|
Contributor

@zachjsh zachjsh May 10, 2022

Might want to add details here about the creation of tombstones for time-chunks within the date interval specified for ingestion that have no data, if in REPLACE mode, and the absence of this in OVERWRITE mode, if my understanding is correct?

Author

Well, this is to document the batch ingestion mode, not the semantics of the method, so I am not sure that the explanation of its semantics belongs here.

Contributor

@zachjsh zachjsh May 11, 2022

Oh ok, I guess the semantics are described elsewhere? If so, should we link to that? Not a big deal.

Author

Yes, in the documentation for the flag isDropExisting. The actual names APPEND, REPLACE, and OVERWRITE are not formally documented and I am introducing them here. But documenting them is out of scope for this ticket, I feel.

Author

Added some explanation about what the modes mean.

emitter.emit(buildEvent("compact/overwrite/count", 1));
break;
default:
throw new ISE("Invalid compact ingestion mode [%s]", mode);
Contributor

Should we just log error instead of possibly failing ingestion task with this thrown exception?

Author

@loquisgon loquisgon May 10, 2022

Integrating the BatchIngestionMode (see comment below) made this not required anymore.

|`isAppendToExisting` | `isdDropExisting` | mode |
|---------------------|-------------------|------|
`true` | `false` | `APPEND`|
`true` | `true ` | Invalid combination, exception thrown. |
Contributor

Looking at the code, it doesn't seem that we throw an exception in this case; rather, we treat it as APPEND, since we consider a job an append job if isAppendToExisting is true and short-circuit the check of isDropExisting.
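
To illustrate the concern: when the append check comes first and returns early, the `(true, true)` combination is never rejected. The sketch below is hypothetical (the class and method names are assumptions, not Druid code) and uses the mode names as they appear in this revision of the docs.

```java
// Hypothetical contrast between a short-circuited check and explicit validation.
final class ShortCircuitSketch {
  // The append check returns early, so dropExisting is never consulted
  // when appendToExisting is true.
  static String modeWithShortCircuit(boolean appendToExisting, boolean dropExisting) {
    if (appendToExisting) {
      return "APPEND";
    }
    return dropExisting ? "REPLACE" : "OVERWRITE";
  }

  // Validating the pair first makes the documented exception actually happen.
  static String modeWithValidation(boolean appendToExisting, boolean dropExisting) {
    if (appendToExisting && dropExisting) {
      throw new IllegalArgumentException("Invalid combination of flags");
    }
    return modeWithShortCircuit(appendToExisting, dropExisting);
  }

  public static void main(String[] args) {
    // Silently treated as an append job.
    System.out.println(modeWithShortCircuit(true, true));
  }
}
```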

Author

Yes, it does, but it was confusing where. Due to this great comment I decided to fully integrate the concept of BatchIngestionMode in the batch ingestion task internals, replacing all the confusing tests for append or drop existing etc. There were many changes but they should be straightforward to follow. The code is better now, IMO.

Contributor

suneet-s commented May 11, 2022

The goal of this ticket is to emit Druid metrics to be able to assess the utilization of the different flavors of batch ingestion

There are many different config flags that might be of interest, and these can change as devs add new config settings. Have you considered extending the RequestLogger interface to allow logging ingestion specs in addition to the current SQL and native queries? This would allow cluster operators to point these logs to another system (maybe even a Druid cluster) for running analytics on top of the entire ingestion spec.

@loquisgon
Author

@suneet-s No, I have not considered RequestLogger; I am not really familiar with it. But I see your point. The whole area of enriching and cleaning up the logging, metrics, and alerts for ingestion is ripe for improvement. However, I would suggest thinking a little about a decent starting point for what to do to unify these areas. So for now, I would like to focus on the scope as defined for this work, but I would strongly recommend a more extended approach that incorporates suggestions like the one you are making.

of the `isAppendToExisting` and `isDropExisting` flags in the
task's `IOConfig` as follows:

|`isAppendToExisting` | `isdDropExisting` | mode |
Contributor

Suggested change
|`isAppendToExisting` | `isdDropExisting` | mode |
|`isAppendToExisting` | `isDropExisting` | mode |

Author

Nice catch!

Author

fix in next commit


|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`ingest/compact/ovewrite/count`|Count of `1` every time a compaction overwrite job runs|dataSource, taskId, taskType|Always `1`.|
Contributor

Won't taskType always be compact ?

Author

Yes, taskType is compact for compaction, but for other batch tasks it can be something else, so for completeness and consistency I think it is better to leave that dimension there.

Author

Removed task type from metric.

@@ -221,6 +221,41 @@ Note: If the JVM does not support CPU time measurement for the current thread, i
|`worker/taskSlot/total/count`|Number of total task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|
|`worker/taskSlot/used/count`|Number of busy task slots on the reporting worker per emission period. This metric is only available if the WorkerTaskCountStatsMonitor module is included.|category, version.|Varies.|

## Batch ingestion metrics (Native parallel task)
Contributor

Why did we decide to have a separate metric name for each type vs a single metric name with the type being a dimension?

ingest/batch/count, ingest/batch/segments/count and a dimension type (or even 2 dimensions for isAppendToExisting and isDropExisting - in this case, you don't need to deal with the error case described on line 243)

`true` | `true  ` | Invalid combination, exception thrown. |

Contributor

In a similar vein, won't these metrics be useful for streaming ingestion as well - Why not make the metric name ingest/count and then the taskType dimension tells you whether or not it is streaming or batch?

Also, reading these docs, it's not clear to me whether a compaction job will also be counted as a batch job

Author

The goal of this feature is to gather data that lets us know the utilization of the various modes of batch ingest, in particular REPLACE; thus the categorization.

Author

The invalid combination is enabled by the fact that the IOConfig part of the spec exposes both dropExisting and appendToExisting today. If we were to deprecate those and expose instead batchProcessingMode with its three possible values then this invalid state would no longer be possible. But I think that it is better to do this later if we agree it is a good idea.

Author

@loquisgon loquisgon May 11, 2022

The taskType is already a dimension (i.e., for native batch types we have compact, index, and index_parallel; for streaming, index_kafka, etc.; for non-native, index_hadoop...), so yeah... maybe I should just drop the batch (and compact) from the metric name and just use the indexType dimension to filter. Is this what you are thinking, @suneet-s?

Contributor

I agree with @suneet-s's suggestion to drop the batch and compact from the metric name, as the task-type dimension already signifies this, and then the metric can be reused for other ingestion types, not just batch / compact.

Author

Yes, I agree too. Working on it now.

Contributor

@jon-wei jon-wei May 17, 2022

In a similar vein, won't these metrics be useful for streaming ingestion as well - Why not make the metric name ingest/count and then the taskType dimension tells you whether or not it is streaming or batch?

I had suggested a batch-specific name when discussing with Agustin since I think the semantics of the metric are batch specific, it's being emitted once per ingest spec posted, and I'm not sure what the equivalent for streaming would be. It's a metric indicating "number of times user did some action with a specific intent" vs. "detail of execution" (like a parallel subtask count for batch or number of streaming subtasks). Taking it out for a shared metric name seems fine to me though.

Author

@loquisgon loquisgon May 18, 2022

@suneet-s I have removed the batch type from the metric and added taskIngestionMode as a dimension. As a result the new metrics are much cleaner and the code is cleaner as well. Also, it was now simple to add the metrics to streaming as well; take a look.
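
The resulting shape, a single metric name with the mode carried as a dimension, can be sketched as below. The `MetricEvent` class and the `buildIngestCountEvent` helper are hypothetical stand-ins and not Druid's emitter API; only the metric name `ingest/count` and the dimension names are taken from the PR description.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: one metric name, with the ingestion mode as a dimension.
final class IngestMetricSketch {
  static final class MetricEvent {
    final String metric;
    final long value;
    final Map<String, String> dimensions;

    MetricEvent(String metric, long value, Map<String, String> dimensions) {
      this.metric = metric;
      this.value = value;
      this.dimensions = Collections.unmodifiableMap(new LinkedHashMap<>(dimensions));
    }
  }

  static MetricEvent buildIngestCountEvent(
      String dataSource, String taskId, String taskType, String taskIngestionMode) {
    Map<String, String> dims = new LinkedHashMap<>();
    dims.put("dataSource", dataSource);
    dims.put("taskId", taskId);
    dims.put("taskType", taskType);
    dims.put("taskIngestionMode", taskIngestionMode);
    // The value is always 1; aggregation happens over the dimensions.
    return new MetricEvent("ingest/count", 1L, dims);
  }

  public static void main(String[] args) {
    // "wikipedia" and "task-1" are made-up sample values.
    MetricEvent e = buildIngestCountEvent("wikipedia", "task-1", "index_parallel", "REPLACE");
    System.out.println(e.metric + " " + e.value + " " + e.dimensions);
  }
}
```

With this layout, counting replace jobs becomes a filter on `taskIngestionMode` rather than a lookup of a separate metric name per mode.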

|`ingest/compact/ovewrite/count`|Count of `1` every time a compaction overwrite job runs|dataSource, taskId, taskType|Always `1`.|
|`ingest/compact/replace/count`|Count of `1` every time a compaction replace job runs|dataSource, taskId, taskType|Always `1`.|

|`isDropExisting` | mode |
Contributor

Should this table have a description? Or maybe I missed it

Author

@zachjsh Simplified & cleaned up docs... see PR description or the metrics.md changes

…nsion. Move IngestionMode to AbstractTask to facilitate having mode as a dimension. Add metrics to streaming. Add missing coverage.

lgtm-com bot commented May 18, 2022

This pull request introduces 2 alerts when merging 1622b74 into 3e8d7a6 - view on LGTM.com

new alerts:

  • 2 for Dereferenced variable may be null

…ation of IngestionMode to make it more robust to null IOConfig and fix test.

lgtm-com bot commented May 18, 2022

This pull request introduces 1 alert when merging c5f139f into 3e8d7a6 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@loquisgon
Author

I don't understand the LGTM concern about the "new publishedSegmentsAndCommitMetadata". That variable already existed before I added the metric. Maybe it is complaining because the new code explicitly checks that the variable is not null?


lgtm-com bot commented May 18, 2022

This pull request introduces 1 alert when merging 8ba3f60 into 3e8d7a6 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

isAppendToExisting = BatchIOConfig.DEFAULT_APPEND_EXISTING;
isDropExisting = BatchIOConfig.DEFAULT_DROP_EXISTING;
} else {
isAppendToExisting = BatchIOConfig.DEFAULT_APPEND_EXISTING;
Contributor

How come this uses the default from BatchIOConfig instead of using ioConfig.isAppendToExisting()?

Author

@loquisgon loquisgon May 20, 2022

This is specific to CompactionIOConfig, which does not have an isAppendToExisting() flag/method. Added a comment to the code so this is clearer.

return ingestionMode;
}

protected static IngestionMode computeIngestionMode(@Nullable CompactionIOConfig ioConfig)
Contributor

Suggest distinguishing these two computeIngestionMode methods with different names since they both have nullable parameters

Author

Good call. Done in next commit.
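
A rough sketch of what distinctly named, null-tolerant methods could look like is shown below. Everything here is an assumption made for illustration: the method names `computeBatchIngestionMode` and `computeCompactionIngestionMode`, the stand-in `BatchConfig` and `CompactionConfig` classes, and the default values of `false` (chosen to match the documented default mode) are not taken from the merged code.

```java
// Hypothetical sketch; class, method, and field names are illustrative only.
final class ComputeModeSketch {
  enum IngestionMode { REPLACE, APPEND, REPLACE_LEGACY }

  // Assumed defaults, consistent with REPLACE_LEGACY being the documented default.
  static final boolean DEFAULT_APPEND_EXISTING = false;
  static final boolean DEFAULT_DROP_EXISTING = false;

  static final class BatchConfig {
    final boolean appendToExisting;
    final boolean dropExisting;

    BatchConfig(boolean appendToExisting, boolean dropExisting) {
      this.appendToExisting = appendToExisting;
      this.dropExisting = dropExisting;
    }
  }

  // Stand-in for a compaction config, which carries no append flag.
  static final class CompactionConfig {
    final boolean dropExisting;

    CompactionConfig(boolean dropExisting) {
      this.dropExisting = dropExisting;
    }
  }

  static IngestionMode computeBatchIngestionMode(BatchConfig config) {
    boolean append = config == null ? DEFAULT_APPEND_EXISTING : config.appendToExisting;
    boolean drop = config == null ? DEFAULT_DROP_EXISTING : config.dropExisting;
    return fromFlags(append, drop);
  }

  static IngestionMode computeCompactionIngestionMode(CompactionConfig config) {
    // No append flag to read, so the batch default is always used.
    boolean drop = config == null ? DEFAULT_DROP_EXISTING : config.dropExisting;
    return fromFlags(DEFAULT_APPEND_EXISTING, drop);
  }

  private static IngestionMode fromFlags(boolean append, boolean drop) {
    if (append && drop) {
      throw new IllegalArgumentException("Invalid combination of flags");
    }
    if (append) {
      return IngestionMode.APPEND;
    }
    return drop ? IngestionMode.REPLACE : IngestionMode.REPLACE_LEGACY;
  }

  public static void main(String[] args) {
    System.out.println(computeBatchIngestionMode(null));
    System.out.println(computeCompactionIngestionMode(new CompactionConfig(true)));
  }
}
```

One plausible benefit of distinct names: with two overloads that both accept a nullable argument of unrelated types, a call passing a bare `null` would be ambiguous to the Java compiler, whereas separately named methods avoid that.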


lgtm-com bot commented May 20, 2022

This pull request introduces 1 alert when merging 2945222 into c236227 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

…or if they are passed null. Other minor cleanup.

lgtm-com bot commented May 21, 2022

This pull request introduces 1 alert when merging 59aa6d7 into c236227 - view on LGTM.com

new alerts:

  • 1 for Dereferenced variable may be null

@loquisgon loquisgon merged commit 2f3d7a4 into apache:master May 23, 2022
@loquisgon loquisgon deleted the dropexisting_metric branch May 23, 2022 19:32
@abhishekagarwal87 abhishekagarwal87 added this to the 24.0.0 milestone Aug 26, 2022
6 participants