Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deserialize dimensions in group by queries to their respective types when reading from their serialized format #16511

Merged
merged 11 commits into from
Jun 14, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.segment.serde.ComplexMetrics;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
import org.apache.druid.server.ResourceIdPopulatingQueryRunner;
import org.apache.druid.timeline.SegmentId;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -433,7 +434,8 @@ public void setup()
String queryName = schemaQuery[1];

schemaInfo = GeneratorBasicSchemas.SCHEMA_MAP.get(schemaName);
query = SCHEMA_QUERY_MAP.get(schemaName).get(queryName);
query = (GroupByQuery) ResourceIdPopulatingQueryRunner.populateResourceId(SCHEMA_QUERY_MAP.get(schemaName)
.get(queryName));

generator = new DataGenerator(
schemaInfo.getColumnSchemas(),
Expand Down Expand Up @@ -762,12 +764,12 @@ public void queryMultiQueryableIndexWithSerde(Blackhole blackhole, QueryableInde
//noinspection unchecked
QueryRunner<ResultRow> theRunner = new FinalizeResultsQueryRunner<>(
toolChest.mergeResults(
new SerializingQueryRunner<>(
new DefaultObjectMapper(new SmileFactory(), null),
new SerializingQueryRunner(
toolChest.decorateObjectMapper(new DefaultObjectMapper(new SmileFactory(), null), query),
ResultRow.class,
toolChest.mergeResults(
(queryPlus, responseContext) -> toolChest.mergeResults(
factory.mergeRunners(state.executorService, makeMultiRunners(state))
)
).run(QueryPlus.wrap(ResourceIdPopulatingQueryRunner.populateResourceId(query)))
)
),
(QueryToolChest) toolChest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,22 @@
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.groupby.ResultRow;

public class SerializingQueryRunner<T> implements QueryRunner<T>
public class SerializingQueryRunner implements QueryRunner<ResultRow>
{
static {
NullHandling.initializeForTests();
}

private final ObjectMapper smileMapper;
private final QueryRunner<T> baseRunner;
private final Class<T> clazz;
private final QueryRunner<ResultRow> baseRunner;
private final Class<ResultRow> clazz;

public SerializingQueryRunner(
ObjectMapper smileMapper,
Class<T> clazz,
QueryRunner<T> baseRunner
Class<ResultRow> clazz,
QueryRunner<ResultRow> baseRunner
)
{
this.smileMapper = smileMapper;
Expand All @@ -51,16 +52,16 @@ public SerializingQueryRunner(
}

@Override
public Sequence<T> run(
final QueryPlus<T> queryPlus,
public Sequence<ResultRow> run(
final QueryPlus<ResultRow> queryPlus,
final ResponseContext responseContext
)
{
return Sequences.map(
baseRunner.run(queryPlus, responseContext),
input -> {
try {
return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input), clazz);
return JacksonUtils.readValue(smileMapper, smileMapper.writeValueAsBytes(input.getArray()), clazz);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.druid.query.aggregation;

import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
Expand Down Expand Up @@ -106,4 +110,27 @@ public byte[] toBytes(@Nullable SerializablePairLongDouble inPair)
}
};
}

/**
 * Builds the {@link TypeStrategy} for this serde's complex type.
 *
 * <p>The returned strategy wraps {@link #getObjectStrategy()}, is typed as the complex column type
 * named by {@link #getTypeName()}, and hashes/compares values through the pair's own
 * {@code hashCode()} and {@code equals()} implementations.
 *
 * @return a newly constructed type strategy (a fresh instance is created on every call)
 */
@Override
public TypeStrategy<SerializablePairLongDouble> getTypeStrategy()
{
  return new ObjectStrategyComplexTypeStrategy<>(
      getObjectStrategy(),
      ColumnType.ofComplex(getTypeName()),
      new Hash.Strategy<SerializablePairLongDouble>()
      {
        @Override
        public int hashCode(SerializablePairLongDouble o)
        {
          // Kept null-safe so it stays consistent with equals(); null hashes to 0.
          return o == null ? 0 : o.hashCode();
        }

        @Override
        public boolean equals(SerializablePairLongDouble a, SerializablePairLongDouble b)
        {
          // fastutil's Hash.Strategy contract requires equals() to tolerate null arguments;
          // two nulls are equal, and null never equals a non-null value.
          return a == null ? b == null : a.equals(b);
        }
      }
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.druid.query.aggregation;

import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
Expand Down Expand Up @@ -107,4 +111,27 @@ public byte[] toBytes(@Nullable SerializablePairLongFloat inPair)
}
};
}

/**
 * Builds the {@link TypeStrategy} for this serde's complex type.
 *
 * <p>The returned strategy wraps {@link #getObjectStrategy()}, is typed as the complex column type
 * named by {@link #getTypeName()}, and hashes/compares values through the pair's own
 * {@code hashCode()} and {@code equals()} implementations.
 *
 * @return a newly constructed type strategy (a fresh instance is created on every call)
 */
@Override
public TypeStrategy<SerializablePairLongFloat> getTypeStrategy()
{
  return new ObjectStrategyComplexTypeStrategy<>(
      getObjectStrategy(),
      ColumnType.ofComplex(getTypeName()),
      new Hash.Strategy<SerializablePairLongFloat>()
      {
        @Override
        public int hashCode(SerializablePairLongFloat o)
        {
          // Kept null-safe so it stays consistent with equals(); null hashes to 0.
          return o == null ? 0 : o.hashCode();
        }

        @Override
        public boolean equals(SerializablePairLongFloat a, SerializablePairLongFloat b)
        {
          // fastutil's Hash.Strategy contract requires equals() to tolerate null arguments;
          // two nulls are equal, and null never equals a non-null value.
          return a == null ? b == null : a.equals(b);
        }
      }
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@

package org.apache.druid.query.aggregation;

import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.cell.NativeClearedByteBufferProvider;
import org.apache.druid.segment.writeout.SegmentWriteOutMedium;
Expand Down Expand Up @@ -106,4 +110,27 @@ public byte[] toBytes(@Nullable SerializablePairLongLong inPair)
}
};
}

/**
 * Builds the {@link TypeStrategy} for this serde's complex type.
 *
 * <p>The returned strategy wraps {@link #getObjectStrategy()}, is typed as the complex column type
 * named by {@link #getTypeName()}, and hashes/compares values through the pair's own
 * {@code hashCode()} and {@code equals()} implementations.
 *
 * @return a newly constructed type strategy (a fresh instance is created on every call)
 */
@Override
public TypeStrategy<SerializablePairLongLong> getTypeStrategy()
{
  return new ObjectStrategyComplexTypeStrategy<>(
      getObjectStrategy(),
      ColumnType.ofComplex(getTypeName()),
      new Hash.Strategy<SerializablePairLongLong>()
      {
        @Override
        public int hashCode(SerializablePairLongLong o)
        {
          // Kept null-safe so it stays consistent with equals(); null hashes to 0.
          return o == null ? 0 : o.hashCode();
        }

        @Override
        public boolean equals(SerializablePairLongLong a, SerializablePairLongLong b)
        {
          // fastutil's Hash.Strategy contract requires equals() to tolerate null arguments;
          // two nulls are equal, and null never equals a non-null value.
          return a == null ? b == null : a.equals(b);
        }
      }
  );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

package org.apache.druid.query.aggregation;

import it.unimi.dsi.fastutil.Hash;
import org.apache.druid.collections.SerializablePair;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.segment.GenericColumnSerializer;
import org.apache.druid.segment.column.ColumnBuilder;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.ObjectStrategyComplexTypeStrategy;
import org.apache.druid.segment.column.TypeStrategy;
import org.apache.druid.segment.data.GenericIndexed;
import org.apache.druid.segment.data.ObjectStrategy;
import org.apache.druid.segment.serde.ComplexColumnPartSupplier;
Expand Down Expand Up @@ -130,7 +134,7 @@ public void deserializeColumn(ByteBuffer buffer, ColumnBuilder columnBuilder)
}

@Override
public ObjectStrategy<?> getObjectStrategy()
public ObjectStrategy<SerializablePairLongString> getObjectStrategy()
{
return new ObjectStrategy<SerializablePairLongString>()
{
Expand Down Expand Up @@ -165,6 +169,29 @@ public byte[] toBytes(SerializablePairLongString val)
};
}

/**
 * Builds the {@link TypeStrategy} for this serde's complex type.
 *
 * <p>The returned strategy wraps {@link #getObjectStrategy()}, is typed as the complex column type
 * named by {@link #getTypeName()}, and hashes/compares values through the pair's own
 * {@code hashCode()} and {@code equals()} implementations.
 *
 * @return a newly constructed type strategy (a fresh instance is created on every call)
 */
@Override
public TypeStrategy<SerializablePairLongString> getTypeStrategy()
{
  return new ObjectStrategyComplexTypeStrategy<>(
      getObjectStrategy(),
      ColumnType.ofComplex(getTypeName()),
      new Hash.Strategy<SerializablePairLongString>()
      {
        @Override
        public int hashCode(SerializablePairLongString o)
        {
          // Kept null-safe so it stays consistent with equals(); null hashes to 0.
          return o == null ? 0 : o.hashCode();
        }

        @Override
        public boolean equals(SerializablePairLongString a, SerializablePairLongString b)
        {
          // fastutil's Hash.Strategy contract requires equals() to tolerate null arguments;
          // two nulls are equal, and null never equals a non-null value.
          return a == null ? b == null : a.equals(b);
        }
      }
  );
}

@Override
public GenericColumnSerializer<?> getSerializer(SegmentWriteOutMedium segmentWriteOutMedium, String column)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ public boolean isVectorize()
return vectorize;
}

@SuppressWarnings("unused")
public boolean isIntermediateResultAsMapCompat()
{
return intermediateResultAsMapCompat;
Expand Down
Loading
Loading