Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/querying/query-context-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ Unless otherwise noted, the following parameters apply to all query types, and t
|`setProcessingThreadNames`|`true`| Whether processing thread names will be set to `queryType_dataSource_intervals` while processing a query. This aids in interpreting thread dumps, and is on by default. Query overhead can be reduced slightly by setting this to `false`. This has a tiny effect in most scenarios, but can be meaningful in high-QPS, low-per-segment-processing-time scenarios. |
|`sqlPlannerBloat`|`1000`|Calcite parameter which controls whether to merge two Project operators when inlining expressions causes complexity to increase. Implemented as a workaround to exception `There are not enough rules to produce a node with desired properties: convention=DRUID, sort=[]` thrown after rejecting the merge of two projects.|
|`cloneQueryMode`|`excludeClones`| Indicates whether clone Historicals should be queried by brokers. Clone servers are created by the `cloneServers` Coordinator dynamic configuration. Possible values are `excludeClones`, `includeClones` and `preferClones`. `excludeClones` means that clone Historicals are not queried by the broker. `preferClones` indicates that when given a choice between the clone Historical and the original Historical which is being cloned, the broker chooses the clones. Historicals which are not involved in the cloning process will still be queried. `includeClones` means that broker queries any Historical without regarding clone status. This parameter only affects native queries. MSQ does not query Historicals directly.|
|`realtimeSegmentsOnly` |`false`| When set to true, only query realtime segments. Historical segments are excluded. |
|`realtimeSegmentsMode` |`include`| Controls whether realtime segments are queried. `include` queries all segments, including realtime. `exclude` skips realtime segments. `exclusive` queries only realtime segments. |
|`realtimeSegmentsOnly` |`false`| **Deprecated.** Use `realtimeSegmentsMode=exclusive` instead. When set to `true`, this is equivalent to `realtimeSegmentsMode=exclusive`. When set to `false`, this is equivalent to `realtimeSegmentsMode=include`.|

## Parameters by query type

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
import org.apache.druid.query.QueryContexts.Vectorize;
import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.TypedInFilter;
Expand Down Expand Up @@ -781,8 +782,48 @@ public boolean isPrePlanned()
return getBoolean(QueryContexts.CTX_PREPLANNED, QueryContexts.DEFAULT_PREPLANNED);
}

/**
* Returns the realtime segments mode for this query. If {@link QueryContexts#REALTIME_SEGMENTS_MODE} is absent
* or null, falls back to the deprecated {@code realtimeSegmentsOnly} boolean: {@code true} maps
* to {@link RealtimeSegmentsMode#EXCLUSIVE}; otherwise returns {@link RealtimeSegmentsMode#INCLUDE}.
* Throws {@link BadQueryContextException} if both fields are set simultaneously.
*/
public RealtimeSegmentsMode getRealtimeSegmentsMode()
{
RealtimeSegmentsMode mode = getEnum(
QueryContexts.REALTIME_SEGMENTS_MODE,
RealtimeSegmentsMode.class,
null
);
Comment thread
jtuglu1 marked this conversation as resolved.
boolean hasDeprecatedFlag = get(QueryContexts.REALTIME_SEGMENTS_ONLY) != null;
if (mode != null && hasDeprecatedFlag) {
throw new BadQueryContextException(
StringUtils.format(
"Cannot set both [%s] and deprecated [%s]; use [%s] only.",
QueryContexts.REALTIME_SEGMENTS_MODE,
QueryContexts.REALTIME_SEGMENTS_ONLY,
QueryContexts.REALTIME_SEGMENTS_MODE
)
);
}
if (mode != null) {
return mode;
}
if (hasDeprecatedFlag) {
// Backward-compat: honour the deprecated realtimeSegmentsOnly flag.
return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY, QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY)
? RealtimeSegmentsMode.EXCLUSIVE
: QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
}
return QueryContexts.DEFAULT_REALTIME_SEGMENTS_MODE;
}

/**
* @deprecated Use {@link #getRealtimeSegmentsMode()} instead.
*/
@Deprecated
public boolean isRealtimeSegmentsOnly()
{
return getBoolean(QueryContexts.REALTIME_SEGMENTS_ONLY, QueryContexts.DEFAULT_REALTIME_SEGMENTS_ONLY);
return getRealtimeSegmentsMode() == RealtimeSegmentsMode.EXCLUSIVE;
}
}
42 changes: 42 additions & 0 deletions processing/src/main/java/org/apache/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,20 @@ public class QueryContexts
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_COUPLED = "COUPLED";
public static final String NATIVE_QUERY_SQL_PLANNING_MODE_DECOUPLED = "DECOUPLED";

/**
* @deprecated Use {@link #REALTIME_SEGMENTS_MODE} instead.
*/
@Deprecated
public static final String REALTIME_SEGMENTS_ONLY = "realtimeSegmentsOnly";
/**
* @deprecated Use {@link #DEFAULT_REALTIME_SEGMENTS_MODE} instead.
*/
@Deprecated
public static final boolean DEFAULT_REALTIME_SEGMENTS_ONLY = false;

public static final String REALTIME_SEGMENTS_MODE = "realtimeSegmentsMode";
public static final RealtimeSegmentsMode DEFAULT_REALTIME_SEGMENTS_MODE = RealtimeSegmentsMode.INCLUDE;

public static final String CTX_PREPLANNED = "prePlanned";
public static final boolean DEFAULT_PREPLANNED = true;

Expand Down Expand Up @@ -233,6 +244,37 @@ public String toString()
}
}

/**
* Classifies segments by whether a historical replica exists
* (see {@link org.apache.druid.client.selector.ServerSelector#isRealtimeSegment()}: a segment is
* "realtime" only when it has realtime servers and zero historical servers).
*/
public enum RealtimeSegmentsMode
{
/** Query all segments, including realtime (default). */
INCLUDE,
/** Query only realtime segments. */
EXCLUSIVE,
/** Skip realtime segments; query only historical. */
EXCLUDE;

@JsonCreator
public static RealtimeSegmentsMode fromString(String str)
{
if (str == null) {
return null;
}
return RealtimeSegmentsMode.valueOf(StringUtils.toUpperCase(str));
}

@Override
@JsonValue
public String toString()
{
return StringUtils.toLowerCase(name());
}
}

private QueryContexts()
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ public void testNonLegacyIsNotLegacyContext()
}

@Test
@SuppressWarnings("deprecation")
public void testIsRealtimeSegmentsOnly()
{
assertFalse(QueryContext.empty().isRealtimeSegmentsOnly());
Expand All @@ -440,6 +441,77 @@ public void testIsRealtimeSegmentsOnly()
);
}

@Test
public void testGetRealtimeSegmentsMode()
{
assertEquals(
QueryContexts.RealtimeSegmentsMode.INCLUDE,
QueryContext.empty().getRealtimeSegmentsMode()
);
assertEquals(
QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclusive"))
.getRealtimeSegmentsMode()
);
assertEquals(
QueryContexts.RealtimeSegmentsMode.EXCLUDE,
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"))
.getRealtimeSegmentsMode()
);
assertEquals(
QueryContexts.RealtimeSegmentsMode.INCLUDE,
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "include"))
.getRealtimeSegmentsMode()
);
}

@Test
public void testGetRealtimeSegmentsModeBackwardCompat()
{
// realtimeSegmentsOnly=true maps to EXCLUSIVE
assertEquals(
QueryContexts.RealtimeSegmentsMode.EXCLUSIVE,
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, true))
.getRealtimeSegmentsMode()
);
// realtimeSegmentsOnly=false maps to INCLUDE (default)
assertEquals(
QueryContexts.RealtimeSegmentsMode.INCLUDE,
QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, false))
.getRealtimeSegmentsMode()
);
}

@Test
public void testGetRealtimeSegmentsModeConflictThrows()
{
BadQueryContextException e = assertThrows(
BadQueryContextException.class,
() -> QueryContext.of(ImmutableMap.of(
QueryContexts.REALTIME_SEGMENTS_ONLY, true,
QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"
)).getRealtimeSegmentsMode()
);
assertEquals(
"Cannot set both [realtimeSegmentsMode] and deprecated [realtimeSegmentsOnly]; use [realtimeSegmentsMode] only.",
e.getMessage()
);
}

@Test
public void testGetRealtimeSegmentsModeInvalidValue()
{
BadQueryContextException e = assertThrows(
BadQueryContextException.class,
() -> QueryContext.of(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "badvalue"))
.getRealtimeSegmentsMode()
);
assertEquals(
"Expected key [realtimeSegmentsMode] to be referring to one of the values [INCLUDE,EXCLUSIVE,EXCLUDE] of enum [RealtimeSegmentsMode], but got [badvalue]",
e.getMessage()
);
}

@Test
public void testSerialization() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryContext;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryContexts.RealtimeSegmentsMode;
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
Expand Down Expand Up @@ -444,7 +445,7 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
final Set<SegmentServerSelector> segments = new LinkedHashSet<>();
final SegmentPruner segmentPruner = ev.getSegmentPruner();

boolean isRealtimeSegmentOnly = query.context().isRealtimeSegmentsOnly();
RealtimeSegmentsMode realtimeSegmentsMode = query.context().getRealtimeSegmentsMode();
// Filter unneeded chunks based on partition dimension
for (TimelineObjectHolder<String, ServerSelector> holder : serversLookup) {
final Collection<PartitionChunk<ServerSelector>> filteredChunks;
Expand All @@ -458,8 +459,19 @@ private Set<SegmentServerSelector> computeSegmentsToQuery(
}
for (PartitionChunk<ServerSelector> chunk : filteredChunks) {
ServerSelector server = chunk.getObject();
if (isRealtimeSegmentOnly && !server.isRealtimeSegment()) {
continue; // Skip historical segments when only realtime segments are requested
switch (realtimeSegmentsMode) {
case EXCLUSIVE:
if (!server.isRealtimeSegment()) {
continue;
}
break;
case EXCLUDE:
if (server.isRealtimeSegment()) {
continue;
}
break;
case INCLUDE:
break;
}
final SegmentDescriptor segment = new SegmentDescriptor(
holder.getInterval(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3132,27 +3132,94 @@ public void testRealtimeSegmentsQueryContext()
selector.addServerAndUpdateSegment(new QueryableDruidServer(servers[0], null), dataSegment);
timeline.add(interval, "ver", new SingleElementPartitionChunk<>(selector));

final TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder()
// include (default): historical segment is included
final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
.context(ImmutableMap.of("realtimeSegmentsOnly", false))
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "include"))
.randomQueryId()
.build();

final TimeBoundaryQuery query2 = Druids.newTimeBoundaryQueryBuilder()
// exclusive: only realtime segments — historical segment is excluded
final TimeBoundaryQuery queryExclusive = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclusive"))
.randomQueryId()
.build();

// backward compat: realtimeSegmentsOnly=true maps to EXCLUSIVE
final TimeBoundaryQuery queryLegacyTrue = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
.context(ImmutableMap.of("realtimeSegmentsOnly", true))
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_ONLY, true))
.randomQueryId()
.build();

final ResponseContext responseContext = initializeResponseContext();

getDefaultQueryRunner().run(QueryPlus.wrap(query), responseContext);
getDefaultQueryRunner().run(QueryPlus.wrap(query2), responseContext);
getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);
getDefaultQueryRunner().run(QueryPlus.wrap(queryExclusive), responseContext);
getDefaultQueryRunner().run(QueryPlus.wrap(queryLegacyTrue), responseContext);

final Map<String, Integer> remainingResponseMap = (Map<String, Integer>) responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
Assert.assertEquals(1, remainingResponseMap.get(queryInclude.getId()).intValue());
Assert.assertEquals(0, remainingResponseMap.get(queryExclusive.getId()).intValue());
Assert.assertEquals(0, remainingResponseMap.get(queryLegacyTrue.getId()).intValue());
}

@Test
public void testRealtimeSegmentsModeExclude()
{
final Interval interval = Intervals.of("2016-01-01/2016-01-02");
final Interval queryInterval = Intervals.of("2016-01-01T14:00:00/2016-01-02T14:00:00");
final DataSegment dataSegment = new DataSegment(
"dataSource",
interval,
"ver",
ImmutableMap.of("type", "hdfs", "path", "/tmp"),
ImmutableList.of("product"),
ImmutableList.of("visited_sum"),
NoneShardSpec.instance(),
9,
12334
);
Comment thread
jtuglu1 marked this conversation as resolved.
Dismissed

// selector backed only by a realtime server — isRealtimeSegment() == true
final DruidServer realtimeServer = new DruidServer(
"rt1", "rt1", null, 10, null, ServerType.REALTIME, DruidServer.DEFAULT_TIER, 0
);
final ServerSelector realtimeSelector = new ServerSelector(
dataSegment,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
HistoricalFilter.IDENTITY_FILTER
);
realtimeSelector.addServerAndUpdateSegment(new QueryableDruidServer(realtimeServer, null), dataSegment);
timeline.add(interval, "ver", new SingleElementPartitionChunk<>(realtimeSelector));

// exclude: realtime-only segment is skipped
final TimeBoundaryQuery queryExclude = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "exclude"))
.randomQueryId()
.build();

// include: realtime-only segment is included
final TimeBoundaryQuery queryInclude = Druids.newTimeBoundaryQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(new MultipleIntervalSegmentSpec(ImmutableList.of(queryInterval)))
.context(ImmutableMap.of(QueryContexts.REALTIME_SEGMENTS_MODE, "include"))
.randomQueryId()
.build();

final ResponseContext responseContext = initializeResponseContext();
getDefaultQueryRunner().run(QueryPlus.wrap(queryExclude), responseContext);
getDefaultQueryRunner().run(QueryPlus.wrap(queryInclude), responseContext);

final Map<String, Integer> remainingResponseMap = (Map<String, Integer>) responseContext.get(ResponseContext.Keys.REMAINING_RESPONSES_FROM_QUERY_SERVERS);
Assert.assertEquals(1, remainingResponseMap.get(query.getId()).intValue());
Assert.assertEquals(0, remainingResponseMap.get(query2.getId()).intValue());
Assert.assertEquals(0, remainingResponseMap.get(queryExclude.getId()).intValue());
Assert.assertEquals(1, remainingResponseMap.get(queryInclude.getId()).intValue());
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ export const QUERY_CONTEXT_COMPLETIONS: JsonCompletionRule[] = [
documentation: 'Enable vectorized query execution',
},
{
value: 'realtimeSegmentsOnly',
documentation: 'Whether to query only realtime segments',
value: 'realtimeSegmentsMode',
documentation: 'Controls whether realtime segments are queried',
},
{
value: 'vectorSize',
Expand Down Expand Up @@ -152,4 +152,12 @@ export const QUERY_CONTEXT_COMPLETIONS: JsonCompletionRule[] = [
{ value: 'force', documentation: 'Force vectorized execution' },
],
},
{
path: '$.realtimeSegmentsMode',
completions: [
{ value: 'include', documentation: 'Query all segments, including realtime (default)' },
{ value: 'exclude', documentation: 'Skip realtime segments; query only historical' },
{ value: 'exclusive', documentation: 'Query only realtime segments' },
],
},
];
Loading