Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-21732: Sql. Split TableRowConverterImpl on two different implementations #3687

Merged
merged 6 commits into from
May 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,18 @@

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.schema.BinaryRowConverter;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.SchemaDescriptor;
Expand All @@ -38,6 +42,7 @@
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.type.NativeTypeSpec;
import org.apache.ignite.sql.ColumnType;
import org.jetbrains.annotations.Nullable;

/**
* Adapter for row of older schema.
Expand Down Expand Up @@ -440,13 +445,56 @@ public Instant timestampValue(int colIdx) throws InvalidTypeException {
return mappedId < 0 ? (Instant) column.defaultValue() : super.timestampValue(mappedId);
}

@Override
public boolean hasNullValue(int colIdx) {
int mappedId = mapColumn(colIdx);

return mappedId < 0
? mapper.mappedColumn(colIdx).defaultValue() == null
: super.hasNullValue(mappedId);
}


@Override
public int elementCount() {
return newBinaryTupleSchema.elementCount();
}

/** {@inheritDoc} */
@Override
public BinaryTuple binaryTuple() {
public @Nullable BinaryTuple binaryTuple() {
// Underlying binary tuple can not be used directly.
return null;
}

/** {@inheritDoc} */
@Override
public ByteBuffer byteBuffer() {
// TODO: IGNITE-22156 Replace inheritance with delegation and drop this code.
int size = newBinaryTupleSchema.elementCount();
var builder = new BinaryTupleBuilder(size);

for (int col = 0; col < size; col++) {
Element element = newBinaryTupleSchema.element(col);

BinaryRowConverter.appendValue(builder, element, value(col));
}

return new BinaryTuple(size, builder.build()).byteBuffer();
}

/** {@inheritDoc} */
@Override
public int tupleSliceLength() {
throw new UnsupportedOperationException("Underlying binary can't be accessed directly.");
}

/** {@inheritDoc} */
@Override
public ByteBuffer tupleSlice() {
throw new UnsupportedOperationException("Underlying binary can't be accessed directly.");
}

private void ensureTypeConversionAllowed(ColumnType from, ColumnType to) throws InvalidTypeException {
if (!isSupportedColumnTypeChange(from, to)) {
throw new SchemaException(format("Type conversion is not allowed: {} -> {}", from, to));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.sql.engine.exec;

import java.util.BitSet;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;

/**
* Converts rows to execution engine representation.
*/
public class ProjectedTableRowConverterImpl extends TableRowConverterImpl {
/**
* Mapping of required columns to their indexes in physical schema.
*/
private final int[] requiredColumnsMapping;

private final BinaryTupleSchema fullTupleSchema;

/** Constructor. */
ProjectedTableRowConverterImpl(
SchemaRegistry schemaRegistry,
BinaryTupleSchema fullTupleSchema,
SchemaDescriptor schemaDescriptor,
BitSet requiredColumns
) {
super(schemaRegistry, schemaDescriptor);

this.fullTupleSchema = fullTupleSchema;

int size = requiredColumns.cardinality();

requiredColumnsMapping = new int[size];

int requiredIndex = 0;
for (Column column : schemaDescriptor.columns()) {
if (requiredColumns.get(column.positionInRow())) {
requiredColumnsMapping[requiredIndex++] = column.positionInRow();
}
}
}

@Override
public <RowT> RowT toRow(ExecutionContext<RowT> ectx, BinaryRow tableRow, RowFactory<RowT> factory) {
InternalTuple tuple;
if (tableRow.schemaVersion() == schemaDescriptor.version()) {
BinaryTuple tableTuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());

tuple = new FormatAwareProjectedTuple(tableTuple, requiredColumnsMapping);
} else {
InternalTuple tableTuple = schemaRegistry.resolve(tableRow, schemaDescriptor);

tuple = new FieldDeserializingProjectedTuple(
fullTupleSchema,
tableTuple,
requiredColumnsMapping
);
}

return factory.create(tuple);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ public TableRowConverterFactoryImpl(

fullRowConverter = new TableRowConverterImpl(
schemaRegistry,
fullTupleSchema,
schemaDescriptor,
null
schemaDescriptor
);
}

Expand All @@ -66,7 +64,7 @@ public TableRowConverter create(@Nullable BitSet requiredColumns) {
return fullRowConverter;
}

return new TableRowConverterImpl(
return new ProjectedTableRowConverterImpl(
schemaRegistry,
fullTupleSchema,
schemaDescriptor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,62 +17,29 @@

package org.apache.ignite.internal.sql.engine.exec;

import java.util.BitSet;
import org.apache.ignite.internal.lang.InternalTuple;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.BinaryTupleSchema;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.sql.engine.util.FieldDeserializingProjectedTuple;
import org.apache.ignite.internal.sql.engine.util.FormatAwareProjectedTuple;
import org.jetbrains.annotations.Nullable;

/**
* Converts rows to execution engine representation.
*/
public class TableRowConverterImpl implements TableRowConverter {

private final SchemaRegistry schemaRegistry;
protected final SchemaRegistry schemaRegistry;

private final SchemaDescriptor schemaDescriptor;

private final BinaryTupleSchema fullTupleSchema;

/**
* Mapping of required columns to their indexes in physical schema.
*/
private final int[] requiredColumnsMapping;

private final boolean skipTrimming;
protected final SchemaDescriptor schemaDescriptor;

/** Constructor. */
TableRowConverterImpl(
SchemaRegistry schemaRegistry,
BinaryTupleSchema fullTupleSchema,
SchemaDescriptor schemaDescriptor,
@Nullable BitSet requiredColumns
SchemaDescriptor schemaDescriptor
) {
this.schemaRegistry = schemaRegistry;
this.schemaDescriptor = schemaDescriptor;
this.fullTupleSchema = fullTupleSchema;

this.skipTrimming = requiredColumns == null;

int size = requiredColumns == null
? schemaDescriptor.length()
: requiredColumns.cardinality();

requiredColumnsMapping = new int[size];

int requiredIndex = 0;
for (Column column : schemaDescriptor.columns()) {
if (requiredColumns == null || requiredColumns.get(column.positionInRow())) {
requiredColumnsMapping[requiredIndex++] = column.positionInRow();
}
}
}

/** {@inheritDoc} */
Expand All @@ -99,20 +66,11 @@ public <RowT> RowT toRow(
RowHandler.RowFactory<RowT> factory
) {
InternalTuple tuple;
if (tableRow.schemaVersion() == schemaDescriptor.version()) {
InternalTuple tableTuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());

tuple = skipTrimming
? tableTuple
: new FormatAwareProjectedTuple(tableTuple, requiredColumnsMapping);
if (tableRow.schemaVersion() == schemaDescriptor.version()) {
tuple = new BinaryTuple(schemaDescriptor.length(), tableRow.tupleSlice());
} else {
InternalTuple tableTuple = schemaRegistry.resolve(tableRow, schemaDescriptor);

tuple = new FieldDeserializingProjectedTuple(
fullTupleSchema,
tableTuple,
requiredColumnsMapping
);
tuple = schemaRegistry.resolve(tableRow, schemaDescriptor);
}

return factory.create(tuple);
Expand Down