Skip to content

Commit

Permalink
IGNITE-21732: Sql. Split TableRowConverterImpl on two different imple…
Browse files Browse the repository at this point in the history
…mentations (#3687)
  • Loading branch information
AMashenkov committed May 2, 2024
1 parent cb34325 commit 78e1ae6
Show file tree
Hide file tree
Showing 6 changed files with 266 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.SchemaDescriptor;
Expand All @@ -38,6 +42,7 @@
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;

/**
* Adapter for row of older schema.
Expand Down Expand Up @@ -440,13 +445,56 @@ public Instant timestampValue(int colIdx) throws InvalidTypeException {
return mappedId < 0 ? (Instant) column.defaultValue() : super.timestampValue(mappedId);
}

@Override
public boolean hasNullValue(int colIdx) {
int mappedId = mapColumn(colIdx);

return mappedId < 0
? mapper.mappedColumn(colIdx).defaultValue() == null
: super.hasNullValue(mappedId);
}


@Override
public int elementCount() {
return newBinaryTupleSchema.elementCount();
}

/** {@inheritDoc} */
@Override
public BinaryTuple binaryTuple() {
public @Nullable BinaryTuple binaryTuple() {
// Underlying binary tuple can not be used directly.
return null;
}

/** {@inheritDoc} */
@Override
public ByteBuffer byteBuffer() {
// TODO: IGNITE-22156 Replace inheritance with delegation and drop this code.
int size = newBinaryTupleSchema.elementCount();
var builder = new BinaryTupleBuilder(size);

for (int col = 0; col < size; col++) {
Element element = newBinaryTupleSchema.element(col);

BinaryRowConverter.appendValue(builder, element, value(col));
}

return new BinaryTuple(size, builder.build()).byteBuffer();
}

/** {@inheritDoc} */
@Override
public int tupleSliceLength() {
throw new UnsupportedOperationException("Underlying binary can't be accessed directly.");
}

/** {@inheritDoc} */
@Override
public ByteBuffer tupleSlice() {
throw new UnsupportedOperationException("Underlying binary can't be accessed directly.");
}

private void ensureTypeConversionAllowed(ColumnType from, ColumnType to) throws InvalidTypeException {
if (!isSupportedColumnTypeChange(from, to)) {
throw new SchemaException(format("Type conversion is not allowed: {} -> {}", from, to));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine.exec;

import java.util.BitSet;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;

/**
* Converts rows to execution engine representation.
*/
public class ProjectedTableRowConverterImpl extends TableRowConverterImpl {
/**
* Mapping of required columns to their indexes in physical schema.
*/
private final int[] requiredColumnsMapping;

private final BinaryTupleSchema fullTupleSchema;

/** Constructor. */
ProjectedTableRowConverterImpl(
SchemaRegistry schemaRegistry,
BinaryTupleSchema fullTupleSchema,
SchemaDescriptor schemaDescriptor,
BitSet requiredColumns
) {
super(schemaRegistry, schemaDescriptor);

this.fullTupleSchema = fullTupleSchema;

int size = requiredColumns.cardinality();

requiredColumnsMapping = new int[size];

int requiredIndex = 0;
for (Column column : schemaDescriptor.columns()) {
if (requiredColumns.get(column.positionInRow())) {
requiredColumnsMapping[requiredIndex++] = column.positionInRow();
}
}
}

@Override
public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow tableRow, RowFactory<RowT> factory) {
InternalTuple tuple;
if (tableRow.schemaVersion() == schemaDescriptor.version()) {
BinaryTuple tableTuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());

tuple = new FormatAwareProjectedTuple(tableTuple, requiredColumnsMapping);
} else {
InternalTuple tableTuple = schemaRegistry.resolve(tableRow, schemaDescriptor);

tuple = new FieldDeserializingProjectedTuple(
fullTupleSchema,
tableTuple,
requiredColumnsMapping
);
}

return factory.create(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ public TableRowConverterFactoryImpl(

fullRowConverter = new TableRowConverterImpl(
schemaRegistry,
fullTupleSchema,
schemaDescriptor,
null
schemaDescriptor
);
}

Expand All @@ -66,7 +64,7 @@ public TableRowConverter create(@Nullable BitSet requiredColumns) {
return fullRowConverter;
}

return new TableRowConverterImpl(
return new ProjectedTableRowConverterImpl(
schemaRegistry,
fullTupleSchema,
schemaDescriptor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,29 @@

package org.apache.ignite.internal.sql.engine.exec;

import java.util.BitSet;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;
import org.jetbrains.annotations.Nullable;

/**
* Converts rows to execution engine representation.
*/
public class TableRowConverterImpl implements TableRowConverter {

private final SchemaRegistry schemaRegistry;
protected final SchemaRegistry schemaRegistry;

private final SchemaDescriptor schemaDescriptor;

private final BinaryTupleSchema fullTupleSchema;

/**
* Mapping of required columns to their indexes in physical schema.
*/
private final int[] requiredColumnsMapping;

private final boolean skipTrimming;
protected final SchemaDescriptor schemaDescriptor;

/** Constructor. */
TableRowConverterImpl(
SchemaRegistry schemaRegistry,
BinaryTupleSchema fullTupleSchema,
SchemaDescriptor schemaDescriptor,
@Nullable BitSet requiredColumns
SchemaDescriptor schemaDescriptor
) {
this.schemaRegistry = schemaRegistry;
this.schemaDescriptor = schemaDescriptor;
this.fullTupleSchema = fullTupleSchema;

this.skipTrimming = requiredColumns == null;

int size = requiredColumns == null
? schemaDescriptor.length()
: requiredColumns.cardinality();

requiredColumnsMapping = new int[size];

int requiredIndex = 0;
for (Column column : schemaDescriptor.columns()) {
if (requiredColumns == null || requiredColumns.get(column.positionInRow())) {
requiredColumnsMapping[requiredIndex++] = column.positionInRow();
}
}
}

/** {@inheritDoc} */
Expand All @@ -99,20 +66,11 @@ public <RowT> RowT toRow(
RowHandler.RowFactory<RowT> factory
) {
InternalTuple tuple;
if (tableRow.schemaVersion() == schemaDescriptor.version()) {
InternalTuple tableTuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());

tuple = skipTrimming
? tableTuple
: new FormatAwareProjectedTuple(tableTuple, requiredColumnsMapping);
if (tableRow.schemaVersion() == schemaDescriptor.version()) {
tuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());
} else {
InternalTuple tableTuple = schemaRegistry.resolve(tableRow, schemaDescriptor);

tuple = new FieldDeserializingProjectedTuple(
fullTupleSchema,
tableTuple,
requiredColumnsMapping
);
tuple = schemaRegistry.resolve(tableRow, schemaDescriptor);
}

return factory.create(tuple);
Expand Down

0 comments on commit 78e1ae6

Please sign in to comment.