Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.druid.jackson.CommaListJoinDeserializer;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.server.coordination.DruidServerMetadata;
import org.apache.druid.timeline.ClusterGroupTuples;
import org.apache.druid.timeline.CompactionState;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.ShardSpec;
Expand Down Expand Up @@ -54,8 +55,8 @@ private DataSegmentWithLocation(
@JsonProperty("dimensions") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable
List<String> dimensions,
@JsonProperty("metrics") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List<String> metrics,
@JsonProperty("projections") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable
List<String> projections,
@JsonProperty("projections") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List<String> projections,
@JsonProperty("clusterGroups") @Nullable ClusterGroupTuples clusterGroups,
@JsonProperty("shardSpec") @Nullable ShardSpec shardSpec,
@JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState,
@JsonProperty("binaryVersion") Integer binaryVersion,
Expand All @@ -74,6 +75,7 @@ private DataSegmentWithLocation(
dimensions,
metrics,
projections,
clusterGroups,
shardSpec,
lastCompactionState,
binaryVersion,
Expand All @@ -98,6 +100,7 @@ public DataSegmentWithLocation(
dataSegment.getDimensions(),
dataSegment.getMetrics(),
dataSegment.getProjections(),
dataSegment.getClusterGroups(),
dataSegment.getShardSpec(),
null,
dataSegment.getBinaryVersion(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.segment.loading;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import org.apache.druid.utils.CollectionUtils;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* A {@link PartialLoadSpec} that requests partial loading of a clustered segment's cluster groups. The base class
* carries the common {@code fingerprint} and {@code delegate} wire fields; this subtype adds the resolved
* {@code clusterGroupIndices} (positions into {@link org.apache.druid.timeline.ClusterGroupTuples#getTuples()}) that
* the historical should range-read into the local segment.
*/
@JsonTypeName(PartialClusterGroupLoadSpec.TYPE)
public class PartialClusterGroupLoadSpec extends PartialLoadSpec
{
public static final String TYPE = "partialClusterGroup";

/**
* Builds the raw wire-form {@link Map} representation of a {@link PartialClusterGroupLoadSpec} request. Used by the
* coordinator-side matcher (which doesn't instantiate the typed class because doing so would require plumbing an
* {@link ObjectMapper} through every matcher just to satisfy the constructor's lazy-delegate supplier).
*/
public static Map<String, Object> wireForm(
Map<String, Object> delegate,
List<Integer> clusterGroupIndices,
String fingerprint
)
{
return Map.of(
"type", TYPE,
"delegate", delegate,
"clusterGroupIndices", clusterGroupIndices,
"fingerprint", fingerprint
);
}

private final List<Integer> clusterGroupIndices;

@JsonCreator
public PartialClusterGroupLoadSpec(
@JsonProperty("delegate") Map<String, Object> delegate,
@JsonProperty("clusterGroupIndices") List<Integer> clusterGroupIndices,
@JsonProperty("fingerprint") String fingerprint,
@JacksonInject ObjectMapper jsonMapper
)
{
super(delegate, fingerprint, jsonMapper);
Preconditions.checkArgument(
!CollectionUtils.isNullOrEmpty(clusterGroupIndices),
"clusterGroupIndices must not be null or empty"
);
this.clusterGroupIndices = List.copyOf(clusterGroupIndices);
}

@JsonProperty
public List<Integer> getClusterGroupIndices()
{
return clusterGroupIndices;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
PartialClusterGroupLoadSpec that = (PartialClusterGroupLoadSpec) o;
return Objects.equals(getDelegate(), that.getDelegate())
&& Objects.equals(clusterGroupIndices, that.clusterGroupIndices)
&& Objects.equals(getFingerprint(), that.getFingerprint());
}

@Override
public int hashCode()
{
return Objects.hash(getDelegate(), clusterGroupIndices, getFingerprint());
}

@Override
public String toString()
{
return "PartialClusterGroupLoadSpec{" +
"delegate=" + getDelegate() +
", clusterGroupIndices=" + clusterGroupIndices +
", fingerprint=" + getFingerprint() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@
* A {@link PartialLoadSpec} that requests partial loading of a segment's projections. The base class carries the
* common {@code fingerprint} and {@code delegate} wire fields; this subtype adds the resolved projection names that
* the historical should range-read into the local segment.
* <p>
* The historical-side partial-load path inspects this wrapper at mount time. Until that path exists, the base
* class's default {@link #loadSegment} performs a full download via the inner delegate, and the announcement layer
* stamps the fingerprint + full size on the response so the coordinator's reconciler counts the replica as a
* satisfying full-fallback rather than re-queuing the load.
*/
@JsonTypeName(PartialProjectionLoadSpec.TYPE)
public class PartialProjectionLoadSpec extends PartialLoadSpec
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.timeline;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
* Typed clustering tuples carried on {@link DataSegment#getClusterGroups()} for clustered base-table segments. Each
* entry in {@link #tuples()} is one cluster group's clustering-column values, in the order declared by
* {@link #clusteringColumns()}. Optionally carries the clustering {@link VirtualColumns} when the segment was
* clustered on a virtual-column expression, so that matching for things like partial load rules and query time
* segment pruning can make use of this information.
* <p>
* The compact constructor validates {@code clusteringColumns}, interns the virtual columns through
* {@link DataSegment#virtualColumnInterner()}, and canonicalizes every tuple value to its declared
* {@link ColumnType} via {@link #coerceValue} so {@link Object#equals} works across the JSON/programmatic boundary.
*/
public record ClusterGroupTuples(
@JsonProperty("clusteringColumns") RowSignature clusteringColumns,
@JsonProperty("virtualColumns") @JsonInclude(JsonInclude.Include.NON_EMPTY) VirtualColumns virtualColumns,
@JsonProperty("tuples") List<List<Object>> tuples
)
{
@JsonCreator
public ClusterGroupTuples
{
if (clusteringColumns == null || clusteringColumns.size() == 0) {
throw InvalidInput.exception("clusteringColumns must not be null or empty");
}
virtualColumns = internVirtualColumns(virtualColumns);
tuples = canonicalizeTuples(clusteringColumns, tuples);
}

/**
* Convenience constructor for callers that don't carry clustering virtual columns. Equivalent to passing
* {@code null} for the virtual columns argument.
*/
public ClusterGroupTuples(RowSignature clusteringColumns, @Nullable List<List<Object>> tuples)
{
this(clusteringColumns, null, tuples);
}

/**
* Canonicalize {@code raw} for the declared clustering column {@code type}. This is intentionally narrow: its job
* is to unbreak Jackson's number-type narrowing (e.g., an Integer arriving for a LONG column gets normalized to a
* Long), not to do general value coercion. Rules:
* <ul>
* <li>{@code null} → {@code null}.</li>
* <li>STRING: {@link Objects#toString} on any non-null value (stringifying numeric operator input is benign).</li>
* <li>LONG / DOUBLE / FLOAT: require {@link Number}; return via the matching primitive accessor. Strings,
* Booleans, etc. are rejected — typed rule authoring should produce typed JSON, and silently parsing
* strings risks accepting operator typos that change the matched set.</li>
* </ul>
* Unsupported column types (anything that isn't STRING/LONG/DOUBLE/FLOAT) are rejected.
* <p>
* Used by:
* <ul>
* <li>{@link ClusterGroupTuples}'s compact constructor to canonicalize segment-side tuples (strict).</li>
* <li>Operator-supplied rule tuples in future cluster-group partial-load matchers, which can catch the
* exception and treat it as "no match for this segment" rather than a hard failure.</li>
* </ul>
*/
@Nullable
public static Object coerceValue(String columnName, ColumnType type, @Nullable Object raw)
{
if (raw == null) {
return null;
}
if (ColumnType.STRING.equals(type)) {
return raw instanceof String ? raw : Objects.toString(raw);
}
if (ColumnType.LONG.equals(type)) {
if (raw instanceof Number) {
return ((Number) raw).longValue();
}
throw cannotCoerce(raw, columnName, "LONG");
}
if (ColumnType.DOUBLE.equals(type)) {
if (raw instanceof Number) {
return ((Number) raw).doubleValue();
}
throw cannotCoerce(raw, columnName, "DOUBLE");
}
if (ColumnType.FLOAT.equals(type)) {
if (raw instanceof Number) {
return ((Number) raw).floatValue();
}
throw cannotCoerce(raw, columnName, "FLOAT");
}
throw InvalidInput.exception(
"Unsupported clustering column type [%s] for column [%s]; supported types are STRING, LONG, DOUBLE, FLOAT",
type,
columnName
);
}

private static VirtualColumns internVirtualColumns(@Nullable VirtualColumns virtualColumns)
{
if (virtualColumns == null || virtualColumns.isEmpty()) {
return VirtualColumns.EMPTY;
}
return VirtualColumns.create(
Arrays.stream(virtualColumns.getVirtualColumns())
.map(DataSegment.virtualColumnInterner()::intern)
.toList()
);
}

private static List<List<Object>> canonicalizeTuples(
RowSignature clusteringColumns,
@Nullable List<List<Object>> tuples
)
{
final List<List<Object>> source = tuples == null ? Collections.emptyList() : tuples;
final int numCols = clusteringColumns.size();
final List<List<Object>> coerced = new ArrayList<>(source.size());
for (int t = 0; t < source.size(); t++) {
final List<Object> tuple = source.get(t);
if (tuple == null || tuple.size() != numCols) {
throw InvalidInput.exception(
"tuple[%s] has size [%s] but clusteringColumns size is [%s]",
t,
tuple == null ? "null" : tuple.size(),
numCols
);
}
final Object[] out = new Object[numCols];
for (int i = 0; i < numCols; i++) {
final String name = clusteringColumns.getColumnName(i);
final ColumnType type = clusteringColumns.getColumnType(i).orElseThrow(
() -> InvalidInput.exception("clusteringColumn[%s] has no declared type", name)
);
out[i] = coerceValue(name, type, tuple.get(i));
}
coerced.add(Collections.unmodifiableList(Arrays.asList(out)));
}
return Collections.unmodifiableList(coerced);
}

private static DruidException cannotCoerce(Object raw, String columnName, String targetType)
{
return InvalidInput.exception(
"Cannot coerce value [%s] of type [%s] for column [%s] to %s",
raw,
raw.getClass().getName(),
columnName,
targetType
);
}
}
Loading
Loading