DruidInputSource: Fix issues in column projection, timestamp handling. #10267
Conversation
DruidInputSource, DruidSegmentReader changes:

1) Remove "dimensions" and "metrics". They are not necessary, because we can compute which columns we need to read based on what is going to be used by the timestamp, transform, dimensions, and metrics.
2) Start using ColumnsFilter (see below) to decide which columns we need to read.
3) Actually respect the "timestampSpec". Previously, it was ignored, and the timestamp of the returned InputRows was set to the `__time` column of the input datasource.

(1) and (2) together fix a bug in which the DruidInputSource would not properly read columns that are used as inputs to a transformSpec. (3) fixes a bug where the timestampSpec would be ignored if you attempted to set the column to something other than `__time`.

(1) and (3) are breaking changes.

Web console changes:

1) Remove "Dimensions" and "Metrics" from the Druid input source.
2) Set timestampSpec to `{"column": "__time", "format": "millis"}` for compatibility with the new behavior.

Other changes:

1) Add ColumnsFilter, a new class that allows input readers to determine which columns they need to read. Currently, it's only used by the DruidInputSource, but it could be used by other columnar input sources in the future.
2) Add a ColumnsFilter to InputRowSchema.
3) Remove the metric names from InputRowSchema (they were unused).
4) Add an InputRowSchemas.fromDataSchema method that computes the proper ColumnsFilter for the given timestamp, dimensions, transform, and metrics.
5) Add a "getRequiredColumns" method to TransformSpec to support the above.
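To make the new behavior concrete, an ingestion spec fragment written against this patch might look like the following sketch (the datasource name and interval are placeholder values, and unrelated fields are omitted); note the absence of "dimensions" and "metrics" on the input source, and the timestampSpec that the web console now generates:

```json
{
  "ioConfig": {
    "type": "index_parallel",
    "inputSource": {
      "type": "druid",
      "dataSource": "wikipedia",
      "interval": "2016-06-27/2016-06-28"
    }
  },
  "dataSchema": {
    "timestampSpec": {
      "column": "__time",
      "format": "millis"
    }
  }
}
```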
Fwiw, on this one, I think the likelihood of (1) causing problems is low. It's a breaking change because if you were previously specifying a column as an input to one of your dimensionsSpec or aggregators, but then explicitly not including it in the input source's "dimensions" or "metrics" list, it'll now actually get read. Previously it'd be treated as null. The new behavior is better and less brittle, but it is different.

(3) is still fairly low, but somewhat more likely. It's possible that someone had their timestampSpec set to something like

This patch as written just lets these things break, but we could cushion the fall, potentially:

What do people think?
Btw, this fixes #10266 too.
@@ -87,13 +91,21 @@
@Nullable
private final List<WindowedSegmentId> segmentIds;
private final DimFilter dimFilter;
Will it make sense to move DimFilter outside the InputSource in the task json? It seems more natural to me to put the filters alongside transforms, dimensions, and metrics, and leave only the datasource properties inside the InputSource section. On the flip side, it could make the compatibility situation more complicated than it is.
It's possible to specify a filter alongside transforms today! You can do it in two places:

- In the `transformSpec` (this works with any input source / format, see https://druid.apache.org/docs/latest/ingestion/index.html#filter)
- In the druid `inputSource` itself (of course, this only works with this input source)

It's a little silly to have both, perhaps, but there's a practical reason: specifying a filter in the druid `inputSource` is faster, because it is applied while creating the cursor that reads the data, and therefore it can use indexes, etc. The filter in the `transformSpec` is applied after the cursor generates rows.

But I think in the future, it'd be better to support pushing down the `transformSpec` filter into the cursor, and then we could deprecate the filter parameter in the `inputSource`, because it wouldn't be useful anymore.

For now, I suggest we leave it as-is.
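As a sketch of the two placements just described (the dimension and value are illustrative placeholders, and in practice you would typically pick one or the other rather than both):

```json
{
  "ioConfig": {
    "inputSource": {
      "type": "druid",
      "dataSource": "wikipedia",
      "interval": "2016-06-27/2016-06-28",
      "filter": { "type": "selector", "dimension": "channel", "value": "#en.wikipedia" }
    }
  },
  "transformSpec": {
    "filter": { "type": "selector", "dimension": "channel", "value": "#en.wikipedia" }
  }
}
```

The `inputSource` filter is applied while creating the cursor and can use indexes; the `transformSpec` filter runs only after the cursor generates rows.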
(1) sounds good to me.

I think I'm a little unclear on (3): is the feature flag controlling whether to use the old/new behavior, or is it just for whether the auto/millis check is executed?
LGTM. For the (3) backwards compatibility item, what makes sense to me, if we decide to feature flag this (I'm not sure that's worth doing; maybe it's enough to just show a clear error message and call this out in the release notes), would be to have the flag control whether we ignore (old mode) or respect the timestampSpec, and in the new mode we could have that `__time` with auto/millis format check.
 * @see InputRowSchema#getColumnsFilter()
 */
@VisibleForTesting
static ColumnsFilter createColumnsFilter(
👍
(What's the thumbs up for?)
I thought it was a good method 👍
Hmm, thinking about this some more, it may be best to not have it be an error. The risk of doing a sanity-check error is that something that worked before will stop working once you upgrade. It seems to me that any time someone had been specifying
I think it makes sense to add a flag, because of the potential for confusion around why something suddenly broke in a subtle way after an upgrade. Usually (hopefully) cluster operators read the release notes, but not all users will read them, and the people submitting indexing tasks might not be the same people that operate the cluster.

In new mode, I suggest we skip the check, because that will enable the full power of timestampSpec to be used (you could use it to switch a secondary timestamp to primary, for example). IMO the default should be new mode, but we should put something in the release notes that says if you have a bunch of users that might be relying on the old behavior, you can set this flag and get the old behavior back. I'd also consider adding a logged warning if the timestampSpec is anything other than

What do you think?

Separately, as a reviewer: would you prefer these changes to be made in this patch, or in a follow-up? I'm OK either way…
That sounds fine to me too, I don't really have a strong preference on that.
Ah, I was thinking that the auto/millis check in the new mode would only apply if the column being referenced was

The warning log sounds fine to me.
Let's do it in this patch; the additions, I'm guessing, won't be too large, and we can have everything related in one place.
@jon-wei I've pushed a new commit.
For this one, I left it as not-an-error.
For this one, I added a config

Either way, a warning is logged if you read from the
The new config LGTM; there are some CI failures.
@jon-wei thanks; I can't see the CI failures right now due to merge conflicts, so I just fixed them and pushed them up. I'll take another look once CI runs again.
I'm having a tough time figuring out why the integration tests aren't passing. I suppose it's related to the fact that I added testing there for the new
@gianm I saw some messages like this in the perfect rollup test: https://travis-ci.org/github/apache/druid/jobs/729522884
I didn't see the ClassCastException in the log for the failed batch test, though.
OK, got it all sorted out. The tests are passing now.
Minor nit
docs/configuration/index.md
Outdated
@@ -1249,6 +1249,7 @@ Additional peon configs include:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on middleManager restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, MiddleManagers will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.20.0.|false|
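For operators, opting back into the old behavior would mean setting the property above in the runtime properties, e.g.:

```properties
# Restore pre-upgrade behavior: ignore timestampSpec for the Druid input
# source and use the input datasource's __time column directly.
druid.indexer.task.ignoreTimestampSpecForDruidInputSource=true
```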
should be 0.20.1 at this point
I'll change it to 0.21.0, in the guess that this will be the next release.
docs/configuration/index.md
Outdated
@@ -1313,6 +1314,7 @@ then the value from the configuration below is used:
|`druid.indexer.task.gracefulShutdownTimeout`|Wait this long on Indexer restart for restorable tasks to gracefully exit.|PT5M|
|`druid.indexer.task.hadoopWorkingPath`|Temporary working directory for Hadoop tasks.|`/tmp/druid-indexing`|
|`druid.indexer.task.restoreTasksOnRestart`|If true, the Indexer will attempt to stop tasks gracefully on shutdown and restore them on restart.|false|
|`druid.indexer.task.ignoreTimestampSpecForDruidInputSource`|If true, tasks using the [Druid input source](../ingestion/native-batch.md#druid-input-source) will ignore the provided timestampSpec, and will use the `__time` column of the input datasource. This option is provided for compatibility with ingestion specs written before Druid 0.20.0.|false|
ditto
The web console changes (*.tsx) look good to me.

Thank you for updating the docs per my ask.

Fixed up some merge conflicts.

Fixed a conflict and updated the docs to say "before Druid 0.22.0" instead of "before Druid 0.21.0".
@gianm sorry for the delayed review. LGTM overall, but I left some minor comments. Please address them. Also, the build is failing because of the signature change of the `InputRowSchema()` constructor.
[ERROR] /home/jihoonson/Projects/druid/core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java:[382,13] cannot find symbol
[ERROR] symbol: variable Collections
[ERROR] location: class org.apache.druid.data.input.impl.JsonReaderTest
[ERROR] /home/jihoonson/Projects/druid/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java:[183,87] incompatible types: no instance(s) of type variable(s) T exist so that java.util.List<T> conforms to org.apache.druid.data.input.ColumnsFilter
[ERROR] /home/jihoonson/Projects/druid/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/StreamChunkParserTest.java:[206,87] incompatible types: no instance(s) of type variable(s) T exist so that java.util.List<T> conforms to org.apache.druid.data.input.ColumnsFilter
A spec that applies a filter and reads a subset of the original datasource's columns is shown below.

It is OK for the input and output datasources to be the same. In this case, newly generated data will overwrite the previous data for the intervals specified in the `granularitySpec`. Generally, if you are going to do this, it is a good idea to test out your reindexing by writing to a separate datasource before overwriting your main one.
Maybe good to suggest using auto compaction here instead of writing an ingestion spec.
Good idea. I added this.
Alternatively, if your goals can be satisfied by [compaction](compaction.md),
consider that instead as a simpler approach.
docs/ingestion/native-batch.md
Outdated
"partitionsSpec": {
  "type": "hashed",
  "numShards": 1
},
"forceGuaranteedRollup": true,
"maxNumConcurrentSubTasks": 1
This part was not in the previous example. Was it intentional to use a `hashed` partitionsSpec here? Seems unnecessary to me.
What I was thinking was:
- I want to include a full ingest spec, not just the inputSource part, so people have a full example.
- This spec uses rollup, so for a reindexing spec, it'd be good to use a partitionsSpec that guarantees rollup too.
Do you have a better suggestion for what to put in the example?
I see. It makes sense. If that's the case, I would suggest simply removing `numShards` from the spec. The parallel task will find the `numShards` automatically based on `targetRowsPerSegment`, which is 5 million by default.
OK, I'll do that.
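Following that suggestion, the tuningConfig fragment under discussion would presumably end up as (a sketch showing only the relevant fields):

```json
{
  "partitionsSpec": {
    "type": "hashed"
  },
  "forceGuaranteedRollup": true,
  "maxNumConcurrentSubTasks": 1
}
```

With `numShards` omitted, the parallel task derives the shard count from `targetRowsPerSegment` (5 million by default), per the comment above.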
@@ -160,6 +184,7 @@ public String getDataSource()

@Nullable
@JsonProperty
@JsonInclude(Include.NON_NULL)
Is it better to add this annotation at the class-level? Seems reasonable to not include any fields in JSON if they are null.
Interesting question. To answer it, I had to add some tests to make sure it worked properly. The answer is yes, it does work. I'll make the change and keep the new tests (look for them in DruidInputSourceTest).
return Collections.singletonList(new MapBasedInputRow(timestamp.getMillis(), dimensions, intermediateRow));
return Collections.singletonList(
    MapInputRowParser.parse(
        new InputRowSchema(
Would be better to cache `inputRowSchema`, since this function is called per row.
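The point generalizes beyond this Java code. As a hypothetical Python sketch of the same pattern (the names are illustrative stand-ins, not Druid's API), build the invariant object once at construction time rather than on every row:

```python
# Hypothetical illustration of hoisting an invariant object out of a per-row path.

class InputRowSchema:
    """Stand-in for a schema object that is wasteful to rebuild per row."""
    def __init__(self, timestamp_column):
        self.timestamp_column = timestamp_column

class Reader:
    def __init__(self, timestamp_column):
        # Build the schema once, at construction time...
        self._schema = InputRowSchema(timestamp_column)

    def parse(self, row):
        # ...instead of constructing InputRowSchema(...) here, which would
        # allocate a fresh object for every row parsed.
        return (row.get(self._schema.timestamp_column), row)

reader = Reader("__time")
rows = [{"__time": 1, "x": "a"}, {"__time": 2, "x": "b"}]
parsed = [reader.parse(r) for r in rows]
```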
@@ -129,7 +129,7 @@ public void testAutoCompactionDutySubmitAndVerifyCompaction() throws Exception
fullDatasourceName,
AutoCompactionSnapshot.AutoCompactionScheduleStatus.RUNNING,
0,
22482,
22481,
Do you know why this changed?
I guessed it was because the metadata changed. By that, I mean the `org.apache.druid.segment.Metadata` object stored in the segment, which contains the TimestampSpec. It adds up, I think, since -1 character is the difference between the old default `timestamp` + `auto` (13 chars) and the new default `__time` + `millis` (12 chars).
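The character arithmetic above can be checked directly:

```python
# Old default timestampSpec column + format vs. the new default, per the comment above.
old = len("timestamp") + len("auto")    # 9 + 4
new = len("__time") + len("millis")     # 6 + 6
print(old, new, old - new)
```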
Ah, that seems likely the reason. Thanks 👍
Thanks for the review @jihoonson. I've pushed up the changes.
@gianm thanks for the quick fix. The latest change LGTM. +1 after CI.
thanks!