
add configurable ColumnTypeMergePolicy to SegmentMetadataCache #14319

Conversation

@clintropolis (Member) commented May 19, 2023

Description

This PR adds a new interface to control how SegmentMetadataCache chooses a ColumnType when segments disagree about a column's type while computing SQL schemas. The policy is exposed as druid.sql.planner.metadataColumnTypeMergePolicy. The PR also adds a new 'least restrictive type' mode, which chooses the type that data across all segments can best be coerced into, and makes it the default behavior.

This is a behavior change around when segment-driven schema migrations take effect for the SQL schema. With latestInterval, the SQL schema is updated as soon as the first job with the new schema has published segments; with leastRestrictive, the schema is only updated once all segments are reindexed to the new type. The benefit of leastRestrictive is that it eliminates a number of type coercion errors that can happen in SQL when types vary across segments under latestInterval, because the newest type may not be able to correctly represent older data, such as when the segments contain a mix of ARRAY and numeric types, or other combinations that lead to odd query plans.

I am not at all attached to these names, so if something more intuitive comes to mind, feel free to suggest it.

Release note

A new Broker configuration, druid.sql.planner.metadataColumnTypeMergePolicy, adds configurable modes for how column types are computed for the SQL table schema when segments disagree. A new leastRestrictive mode chooses the most appropriate type that data across all segments can best be coerced into, and is now the default behavior. This is a subtle behavior change around when segment-driven schema migrations take effect for the SQL schema. With latestInterval, the SQL schema is updated as soon as the first job with the new schema has published segments in the latest time interval of the data; with the new leastRestrictive mode, the schema is only updated once all segments are reindexed to the new type. However, leastRestrictive is likely to have "better" query-time behavior and eliminates some query-time errors and other oddities that can occur when using latestInterval.
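
For example, the Broker runtime.properties entry would look something like this (the JSON object value matches the documentation change further down in this PR; treat the exact lines as an illustrative sketch):

druid.sql.planner.metadataColumnTypeMergePolicy={"type":"leastRestrictive"}
# or, to keep the previous behavior:
druid.sql.planner.metadataColumnTypeMergePolicy={"type":"latestInterval"}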


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@abhishekrb19 (Contributor) left a comment

Thanks for adding this new capability. Looks good overall. I have a few questions and comments.

* chosen, at least for systems that are continuously updated with 'current' data.
*
* Since {@link ColumnTypeMergePolicy} are used to compute the SQL schema, at least in systems using SQL schemas which
* are poartially or fully computed by this cache, this merge policy can result in query time errors if incompatible
Contributor

Suggested change
* are poartially or fully computed by this cache, this merge policy can result in query time errors if incompatible
* are partially or fully computed by this cache, this merge policy can result in query time errors if incompatible

* types are mixed if the chosen type is more restrictive than the types of some segments. If data is likely to vary
* in type across segments, consider using {@link LeastRestrictiveTypeMergePolicy} instead.
*/
public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy
Contributor

nit: should we align this class name with the policy name - NewestFirstTypeMergePolicy or LatestTypeMergePolicy?

Member Author

Hmm, I tried to capture in the javadocs my hesitation to name the actual class something like that, because the only reason it is 'newestFirst' is the external iterator ordering passed to it; if the iterator changes, this policy would take on the behavior of whatever order the iterator has. So I wanted to make it super clear that this doesn't actually do anything to ensure it is newest first, and instead just uses the first non-null type it finds. This way, if someone refactors this cache, it should hopefully be easier to notice that something might need to be done to keep the behavior of this mode from changing.

However, the type name is user facing, so it seemed important to make it a name that people could understand, so I did choose a name that captures the behavior of policy + iterator there.
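
For reference, a minimal sketch of that "first non-null type wins" idea (the merge method name and signature here are illustrative, not necessarily the exact ones in this PR):

public static class FirstTypeMergePolicy implements ColumnTypeMergePolicy
{
  @Override
  public ColumnType merge(ColumnType existingType, ColumnType newType)
  {
    // keep whichever non-null type was seen first; any "newest first" behavior
    // comes entirely from the ordering of the iterator feeding this method
    if (existingType != null) {
      return existingType;
    }
    return newType;
  }
}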

Contributor

That makes sense. Thanks for clarifying.



/**
* ColumnTypeMergePolicy defines the rules of which type to use when faced with the possibility of different types
Contributor

nice commentary, the code is easy to follow 👍

@@ -152,7 +152,7 @@ public static ColumnType ofComplex(@Nullable String complexTypeName)
* inference
*/
@Nullable
public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other)
public static ColumnType leastRestrictiveType(@Nullable ColumnType type, @Nullable ColumnType other) throws IncompatibleTypeException
Contributor

Good call on the new exception type. May also want to update the javadoc to reflect IllegalArgumentException -> IncompatibleTypeException

{
public IncompatibleTypeException(ColumnType type, ColumnType other)
{
super("Cannot implicitly cast %s to %s", type, other);
Contributor

Suggested change
super("Cannot implicitly cast %s to %s", type, other);
super("Cannot implicitly cast [%s] to [%s]", type, other);

nit: I think it'll also be helpful to pass in the column name so it's clear which column has incompatible types

Member Author

I considered this, but I was also planning to use this for some expression type casting errors, which don't necessarily have a column they belong to; rather, it might be the output type of an entire expression tree. So I left the column name out of the exception for now (and am adding it in the place where expression types would use it).

The cache isn't actually logging this error; instead it eats it and basically falls back to the old logic (I'm updating it to just use the other merge policy in the catch). I guess it could warn, but I was afraid that would be too chatty in the event you actually did have segments of incompatible types, so debug would probably be the most appropriate level.
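
Roughly, that fallback could look like the following (a sketch assuming a two-argument merge method; the actual cache code may structure this differently):

ColumnType merged;
try {
  merged = ColumnType.leastRestrictiveType(existingType, newType);
}
catch (IncompatibleTypeException e) {
  // the types cannot be widened into a common type; rather than failing the
  // schema refresh, fall back to the "first non-null type" behavior
  merged = new FirstTypeMergePolicy().merge(existingType, newType);
}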

@@ -113,6 +94,7 @@ public String toString()
", metadataSegmentCacheEnable=" + metadataSegmentCacheEnable +
", metadataSegmentPollPeriod=" + metadataSegmentPollPeriod +
", awaitInitializationOnStart=" + awaitInitializationOnStart +
", columnTypeMergePolicy=" + metadataColumnTypeMergePolicy +
Contributor

Suggested change
", columnTypeMergePolicy=" + metadataColumnTypeMergePolicy +
", metadataColumnTypeMergePolicy=" + metadataColumnTypeMergePolicy +


/**
* Resolves types using {@link ColumnType#leastRestrictiveType(ColumnType, ColumnType)} to find the ColumnType that
* can best represent all data contained across all segments.
Contributor

Is the type actually resolved from all the segments, or limited to the most recent MAX_SEGMENTS_PER_QUERY segments per query? (hardcoded to 15000)

Contributor

Also, I wonder what the performance implications of choosing the leastRestrictive strategy are, given that this policy has to scan many/all segments per data source. Should we call out any gotchas in the documentation explicitly?

Member Author

We iterate all segments for both modes; this PR didn't change that part. MAX_SEGMENTS_PER_QUERY defines the batch size of how many segments at a time we issue segment metadata queries for. Naively, I think there shouldn't be any real performance difference between the policies, since we always iterate all segments and columns regardless of the policy; the policy just chooses how we accumulate them into the final RowSignature.
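
To make that concrete, the accumulation has roughly this shape regardless of policy (a simplified sketch with illustrative names, again assuming a two-argument merge method; the real cache batches segment metadata queries and does more bookkeeping):

Map<String, ColumnType> columnTypes = new LinkedHashMap<>();
for (SegmentAnalysis analysis : segmentAnalyses) {
  // every segment is visited in both modes
  analysis.getColumns().forEach((columnName, columnAnalysis) -> {
    ColumnType observed = columnAnalysis.getTypeSignature();
    if (observed == null) {
      return;
    }
    // the policy only decides how two observed types collapse into one
    columnTypes.merge(columnName, observed, mergePolicy::merge);
  });
}
RowSignature.Builder builder = RowSignature.builder();
columnTypes.forEach(builder::add);
RowSignature rowSignature = builder.build();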


@dkoepke (Contributor) commented May 22, 2023

Regarding naming:

For newestFirst, "newest" without context might be confusing because it can refer either to when the segment was created/published OR the interval it covers. I believe the behavior is to select the segment with the latest interval (since the segments are sorted by segment ID, and the interval comes first in the ID), so a policy name like latestInterval might be clearer?

For leastRestrictive, perhaps bestFit is clearer? My concern is that "least restrictive", while correct, can also refer to a policy that just chooses the widest possible type.

@clintropolis (Member Author)

For leastRestrictive, perhaps bestFit is clearer? My concern is that "least restrictive", while correct, can also refer to a policy that just chooses the widest possible type.

'leastRestrictive' is effectively a rather permissive widening of types; see https://github.com/apache/druid/blob/master/processing/src/main/java/org/apache/druid/segment/column/ColumnType.java#L139 for the logic backing it, so I'm not sure it's really wrong to interpret it that way.
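
For a sense of how permissive that widening is (behavior as I read the linked leastRestrictiveType logic; worth double-checking against the actual code):

// scalars widen toward the more general scalar
ColumnType.leastRestrictiveType(ColumnType.LONG, ColumnType.DOUBLE);       // DOUBLE
ColumnType.leastRestrictiveType(ColumnType.LONG, ColumnType.STRING);       // STRING
// a scalar mixed with an array widens to an array of the widened element type
ColumnType.leastRestrictiveType(ColumnType.LONG, ColumnType.DOUBLE_ARRAY); // ARRAY<DOUBLE>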

@clintropolis (Member Author)

For newestFirst, "newest" without context might be confusing because it can refer either to when the segment was created/published OR the interval it covers. I believe the behavior is to select the segment with the latest interval (since the segments are sorted by segment ID, and the interval comes first in the ID), so a policy name like latestInterval might be clearer?

latestInterval is probably clearer; I can make the change soon unless anyone has better ideas or different opinions.

@@ -1954,6 +1954,7 @@ The Druid SQL server is configured through the following properties on the Broke
|`druid.sql.http.enable`|Whether to enable JSON over HTTP querying at `/druid/v2/sql/`.|true|
|`druid.sql.planner.maxTopNLimit`|Maximum threshold for a [TopN query](../querying/topnquery.md). Higher limits will be planned as [GroupBy queries](../querying/groupbyquery.md) instead.|100000|
|`druid.sql.planner.metadataRefreshPeriod`|Throttle for metadata refreshes.|PT1M|
|`druid.sql.planner.metadataColumnTypeMergePolicy`|Defines how column types will be chosen when faced with differences between segments when computing the SQL schema. Options are specified as a JSON object, with valid choices of `{"type":"leastRestrictive"}` or `{"type":"latestInterval"}`. For `leastRestrictive`, Druid will automatically widen the type computed for the schema to a type which data across all segments can be converted into, however planned schema migrations can only take effect once all segments have been re-ingested to the new schema. With `latestInterval`, the column type in most recent time chunks defines the type for the schema. |`{"type":"leastRestrictive"}`|
Contributor

Any reason not to just have this configured with simpler values like leastRestrictive or latestInterval?

Member Author

Honest answer? I forget how to do that off the top of my head 😅

@@ -112,4 +113,12 @@ public static <T extends TypeDescriptor> boolean either(
return (typeSignature1 != null && typeSignature1.is(typeDescriptor)) ||
(typeSignature2 != null && typeSignature2.is(typeDescriptor));
}

public static class IncompatibleTypeException extends IAE
Contributor

Q - would this exception return a 5xx to the user? I am wondering if we should change the exception so that the end user gets a 400 instead, since it's really a bad query.

Member Author

Hmm, in this particular PR this isn't really user facing, because it happens inside the Broker's segment metadata cache that powers the SQL schema and runs in the background. This logic does not run during user queries.

The callers of ExpressionTypeConversion that also now throw this exception instead of IAE are, I guess, potentially user facing (though it could also happen in ingest-time transforms). However, I'm not really sure this is an exception that should be thrown directly to users; rather, it should probably be caught by something and decorated with additional context indicating, for example, what the offending expression was.

@JsonCreator
static ColumnTypeMergePolicy fromString(String type)
{
if (LeastRestrictiveTypeMergePolicy.NAME.equals(type)) {
Contributor

Can we use a case-insensitive comparison here?

Member Author

done
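
(Presumably a tweak along these lines, swapping equals for a case-insensitive check in the snippet above; illustrative:)

if (LeastRestrictiveTypeMergePolicy.NAME.equalsIgnoreCase(type)) {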

@abhishekagarwal87 merged commit 4096f51 into apache:master May 24, 2023
@clintropolis deleted the segment-metadata-type-merge-matters branch May 24, 2023 18:57
@abhishekagarwal87 added this to the 27.0 milestone Jul 19, 2023