Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.table.types.KeyValueDataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.utils.LogicalTypeGeneralization;
import org.apache.flink.table.types.utils.TypeConversions;

Expand Down Expand Up @@ -202,13 +201,10 @@ private Optional<ResolvedExpression> convertRowToExpectedType(
ResolvedExpression sourceExpression,
FieldsDataType targetDataType,
ExpressionResolver.PostResolverFactory postResolverFactory) {
DataType[] targetDataTypes = ((RowType) targetDataType.getLogicalType()).getFieldNames()
.stream()
.map(name -> targetDataType.getFieldDataTypes().get(name))
.toArray(DataType[]::new);
List<DataType> targetDataTypes = targetDataType.getChildren();
List<ResolvedExpression> resolvedChildren = sourceExpression.getResolvedChildren();

if (resolvedChildren.size() != targetDataTypes.length) {
if (resolvedChildren.size() != targetDataTypes.size()) {
return Optional.empty();
}

Expand All @@ -217,13 +213,13 @@ private Optional<ResolvedExpression> convertRowToExpectedType(
boolean typesMatch = resolvedChildren.get(i)
.getOutputDataType()
.getLogicalType()
.equals(targetDataTypes[i].getLogicalType());
.equals(targetDataTypes.get(i).getLogicalType());
if (typesMatch) {
castedChildren[i] = resolvedChildren.get(i);
}

ResolvedExpression child = resolvedChildren.get(i);
DataType targetChildDataType = targetDataTypes[i];
DataType targetChildDataType = targetDataTypes.get(i);

Optional<ResolvedExpression> castedChild = convertToExpectedType(
child,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -661,8 +661,9 @@ public static DataType ROW(Field... fields) {
.map(f -> Preconditions.checkNotNull(f, "Field definition must not be null."))
.map(f -> new RowField(f.name, f.dataType.getLogicalType(), f.description))
.collect(Collectors.toList());
final Map<String, DataType> fieldDataTypes = Stream.of(fields)
.collect(Collectors.toMap(f -> f.name, f -> f.dataType));
final List<DataType> fieldDataTypes = Stream.of(fields)
.map(f -> f.dataType)
.collect(Collectors.toList());
return new FieldsDataType(new RowType(logicalFields), fieldDataTypes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.DistinctType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;

import javax.annotation.Nullable;

import java.io.Serializable;

import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getFieldCount;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;

/**
* Base interface of an internal data structure representing data of {@link ArrayType}.
*
Expand Down Expand Up @@ -163,7 +172,9 @@ public interface ArrayData {
* @param pos position of the element to return
* @param elementType the element type of the array
* @return the element object at the specified position in this array data
* @deprecated Use {@link #createElementGetter(LogicalType)} for avoiding logical types during runtime.
*/
@Deprecated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you create an issue to use createElementGetter to replace get utility and remove this method? I think we should provide a clean interface before release.

static Object get(ArrayData array, int pos, LogicalType elementType) {
if (array.isNullAt(pos)) {
return null;
Expand Down Expand Up @@ -215,4 +226,103 @@ static Object get(ArrayData array, int pos, LogicalType elementType) {
throw new UnsupportedOperationException("Unsupported type: " + elementType);
}
}

/**
* Creates an accessor for getting elements in an internal array data structure at the
* given position.
*
* @param elementType the element type of the array
*/
static ElementGetter createElementGetter(LogicalType elementType) {
final ElementGetter elementGetter;
// ordered by type root definition
switch (elementType.getTypeRoot()) {
case CHAR:
case VARCHAR:
elementGetter = ArrayData::getString;
break;
case BOOLEAN:
elementGetter = ArrayData::getBoolean;
break;
case BINARY:
case VARBINARY:
elementGetter = ArrayData::getBinary;
break;
case DECIMAL:
final int decimalPrecision = getPrecision(elementType);
final int decimalScale = getScale(elementType);
elementGetter = (array, pos) -> array.getDecimal(pos, decimalPrecision, decimalScale);
break;
case TINYINT:
elementGetter = ArrayData::getByte;
break;
case SMALLINT:
elementGetter = ArrayData::getShort;
break;
case INTEGER:
case DATE:
case TIME_WITHOUT_TIME_ZONE:
case INTERVAL_YEAR_MONTH:
elementGetter = ArrayData::getInt;
break;
case BIGINT:
case INTERVAL_DAY_TIME:
elementGetter = ArrayData::getLong;
break;
case FLOAT:
elementGetter = ArrayData::getFloat;
break;
case DOUBLE:
elementGetter = ArrayData::getDouble;
break;
case TIMESTAMP_WITHOUT_TIME_ZONE:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
final int timestampPrecision = getPrecision(elementType);
elementGetter = (array, pos) -> array.getTimestamp(pos, timestampPrecision);
break;
case TIMESTAMP_WITH_TIME_ZONE:
throw new UnsupportedOperationException();
case ARRAY:
elementGetter = ArrayData::getArray;
break;
case MULTISET:
case MAP:
elementGetter = ArrayData::getMap;
break;
case ROW:
case STRUCTURED_TYPE:
final int rowFieldCount = getFieldCount(elementType);
elementGetter = (array, pos) -> array.getRow(pos, rowFieldCount);
break;
case DISTINCT_TYPE:
elementGetter = createElementGetter(((DistinctType) elementType).getSourceType());
break;
case RAW:
elementGetter = ArrayData::getRawValue;
break;
case NULL:
case SYMBOL:
case UNRESOLVED:
default:
throw new IllegalArgumentException();
}
if (!elementType.isNullable()) {
return elementGetter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the nullable attribute reliable now ?

Copy link
Member

@wuchong wuchong May 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I have to say it's tricky to guarantee the NOT NULL attributes in runtime, especially in streaming. AFAIK, StreamExecExpand will produce null values even if the field is NOT NULL.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of locations ignore nullability. We should gradually fix this. By default a type is nullable, so the check is performed in most of the cases. The implementation of the method is correct (given its input parameters), if there is a problem it should be the callers task.

Copy link
Member

@wuchong wuchong May 14, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think we should be cautious here. As you said, most of the cases are nullable, so we don't get much performance improvement from this. However, it may occure unexpected bugs if it is called in planner. We can add a TODO on this. When we fix all the nullable problem, we can update this without breaking compatibility.

if there is a problem it should be the callers task.

The problem is planner will also call this method, and I'm sure there are cases that null values exist in NOT NULL fields, then an exception will happen. This will be a bug then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no strong guarantee that our runtime would not generate null value for a not null field based on current codebase, and if there was, the bug would probably hard to trace out.

How much performance can we gain with the type nullable short-cut check ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So far only the converters will call this method. Let's fix upstream bugs gradually, once code is updated to this method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not only about performance but also correctness. A caller assumes the behavior when passing a nullable/not nullable type. It is harder to trace where a null comes from than a method that throws a hard exception when it encounters an unexpected null.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nullble problems are known issues but can't be fixed easily, they may need major rework (e.g. Expand node). I don't think we should let the queries failed.

If we want to keep this logic, then I would suggest only use this method in connectors, because we guarantee the NOT NULL constraint when writing values to sink (table.exec.sink.not-null-enforcer). But in planner, we shouldn't use this method now, because the NOT NULL information of field type is not trusted.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, we can add this information to the issue that removes the RowData.get method. So far the new method is only used in converters.

}
return (array, pos) -> {
if (array.isNullAt(pos)) {
return null;
}
return elementGetter.getElementOrNull(array, pos);
};
}

/**
* Accessor for getting the elements of an array during runtime.
*
* @see #createElementGetter(LogicalType)
*/
interface ElementGetter extends Serializable {
@Nullable Object getElementOrNull(ArrayData array, int pos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@

import org.apache.commons.lang3.ArrayUtils;

import java.util.Arrays;
import java.util.Objects;

/**
* An internal data structure representing data of {@link ArrayType}.
*
Expand Down Expand Up @@ -132,6 +135,27 @@ public boolean isNullAt(int pos) {
return !isPrimitiveArray && ((Object[]) array)[pos] == null;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
GenericArrayData that = (GenericArrayData) o;
return size == that.size &&
isPrimitiveArray == that.isPrimitiveArray &&
Objects.deepEquals(array, that.array);
}

@Override
public int hashCode() {
int result = Objects.hash(size, isPrimitiveArray);
result = 31 * result + Arrays.deepHashCode(new Object[]{array});
return result;
}

// ------------------------------------------------------------------------------------------
// Read-only accessor methods
// ------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.table.types.logical.MultisetType;

import java.util.Map;
import java.util.Objects;

/**
* An internal data structure representing data of {@link MapType} or {@link MultisetType}.
Expand Down Expand Up @@ -74,5 +75,52 @@ public ArrayData valueArray() {
Object[] values = map.values().toArray();
return new GenericArrayData(values);
}

@Override
public boolean equals(Object o) {
if (o == this) {
return true;
}
if (!(o instanceof GenericMapData)) {
return false;
}
// deepEquals for values of byte[]
return deepEquals(map, ((GenericMapData) o).map);
}

private static <K, V> boolean deepEquals(Map<K, V> m1, Map<?, ?> m2) {
// copied from HashMap.equals but with deepEquals comparision
if (m1.size() != m2.size()) {
return false;
}
try {
for (Map.Entry<K, V> e : m1.entrySet()) {
K key = e.getKey();
V value = e.getValue();
if (value == null) {
if (!(m2.get(key) == null && m2.containsKey(key))) {
return false;
}
} else {
if (!Objects.deepEquals(value, m2.get(key))) {
return false;
}
}
}
} catch (ClassCastException | NullPointerException unused) {
return false;
}
return true;
}

@Override
public int hashCode() {
int result = 0;
for (Object key : map.keySet()) {
// only include key because values can contain byte[]
result += 31 * key.hashCode();
}
return result;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.util.StringUtils;

import java.util.Arrays;
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -191,17 +192,22 @@ public RowData getRow(int pos, int numFields) {

@Override
public boolean equals(Object o) {
if (o instanceof GenericRowData) {
GenericRowData other = (GenericRowData) o;
return kind == other.kind && Arrays.equals(fields, other.fields);
} else {
if (this == o) {
return true;
}
if (!(o instanceof GenericRowData)) {
return false;
}
GenericRowData that = (GenericRowData) o;
return kind == that.kind &&
Arrays.deepEquals(fields, that.fields);
}

@Override
public int hashCode() {
return 31 * kind.hashCode() + Arrays.hashCode(fields);
int result = Objects.hash(kind);
result = 31 * result + Arrays.deepHashCode(fields);
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,14 @@ public interface RawValueData<T> {
* Creates an instance of {@link RawValueData} from a Java object.
*/
static <T> RawValueData<T> fromObject(T javaObject) {
return new BinaryRawValueData<>(javaObject);
return BinaryRawValueData.fromObject(javaObject);
}

/**
* Creates an instance of {@link RawValueData} from the given byte array.
*/
static <T> RawValueData<T> fromBytes(byte[] bytes) {
return BinaryRawValueData.fromBytes(bytes);
}

}
Loading