Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,9 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
+ " `physical_1` STRING,\n"
+ " `physical_2` INT,\n"
// metadata fields are out of order on purpose
// offset is ignored because it might not be deterministic
+ " `timestamp-type` STRING METADATA VIRTUAL,\n"
+ " `timestamp` TIMESTAMP(3) METADATA,\n"
+ " `offset` BIGINT METADATA VIRTUAL,\n"
+ " `leader-epoch` INT METADATA VIRTUAL,\n"
+ " `headers` MAP<STRING, BYTES> METADATA,\n"
+ " `partition` INT METADATA VIRTUAL,\n"
Expand Down Expand Up @@ -371,12 +371,12 @@ public void testKafkaSourceSinkWithMetadata() throws Exception {
headers3.put("k2", new byte[]{(byte) 0x20});

final List<Row> expected = Arrays.asList(
Row.of("data 1", 1, "CreateTime", LocalDateTime.parse("2020-03-08T13:12:11.123"), 0L, 0, headers1, 0, topic, true),
Row.of("data 2", 2, "CreateTime", LocalDateTime.parse("2020-03-09T13:12:11.123"), 1L, 0, Collections.emptyMap(), 0, topic, false),
Row.of("data 3", 3, "CreateTime", LocalDateTime.parse("2020-03-10T13:12:11.123"), 2L, 0, headers3, 0, topic, true)
Row.of("data 1", 1, "CreateTime", LocalDateTime.parse("2020-03-08T13:12:11.123"), 0, headers1, 0, topic, true),
Row.of("data 2", 2, "CreateTime", LocalDateTime.parse("2020-03-09T13:12:11.123"), 0, Collections.emptyMap(), 0, topic, false),
Row.of("data 3", 3, "CreateTime", LocalDateTime.parse("2020-03-10T13:12:11.123"), 0, headers3, 0, topic, true)
);

assertThat(result, deepEqualTo(expected));
assertThat(result, deepEqualTo(expected, true));

// ------------- cleanup -------------------

Expand Down
111 changes: 2 additions & 109 deletions flink-core/src/main/java/org/apache/flink/types/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@

import java.io.Serializable;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import static org.apache.flink.types.RowUtils.deepEqualsInternal;

/**
* A row is a fixed-length, null-aware composite type for storing multiple values in a deterministic
Expand Down Expand Up @@ -291,109 +289,4 @@ public static Row join(Row first, Row... remainings) {
public static boolean deepEquals(Row row1, Row row2) {
return deepEqualsInternal(row1, row2);
}

/**
* Compares two {@link List}s of {@link Row} for deep equality. This method supports all conversion
* classes of the table ecosystem.
*
* <p>The current implementation of {@link Row#equals(Object)} is not able to compare all deeply
* nested row structures that might be created in the table ecosystem. For example, it does not
* support comparing arrays stored in the values of a map. We might update the {@link #equals(Object)}
* with this implementation in future versions.
*/
public static boolean deepEquals(List<Row> l1, List<Row> l2) {
return deepEqualsInternal(l1, l2);
}

private static boolean deepEqualsInternal(Object o1, Object o2) {
if (o1 == o2) {
return true;
} else if (o1 == null || o2 == null) {
return false;
} else if (o1 instanceof Row && o2 instanceof Row) {
return deepEqualsRow((Row) o1, (Row) o2);
} else if (o1 instanceof Object[] && o2 instanceof Object[]) {
return deepEqualsArray((Object[]) o1, (Object[]) o2);
} else if (o1 instanceof Map && o2 instanceof Map) {
return deepEqualsMap((Map<?, ?>) o1, (Map<?, ?>) o2);
} else if (o1 instanceof List && o2 instanceof List) {
return deepEqualsList((List<?>) o1, (List<?>) o2);
}
return Objects.deepEquals(o1, o2);
}

private static boolean deepEqualsRow(Row row1, Row row2) {
if (row1.getKind() != row2.getKind()) {
return false;
}
if (row1.getArity() != row2.getArity()) {
return false;
}
for (int pos = 0; pos < row1.getArity(); pos++) {
final Object f1 = row1.getField(pos);
final Object f2 = row2.getField(pos);
if (!deepEqualsInternal(f1, f2)) {
return false;
}
}
return true;
}

private static boolean deepEqualsArray(Object[] a1, Object[] a2) {
if (a1.getClass() != a2.getClass()) {
return false;
}
if (a1.length != a2.length) {
return false;
}
for (int pos = 0; pos < a1.length; pos++) {
final Object e1 = a1[pos];
final Object e2 = a2[pos];
if (!deepEqualsInternal(e1, e2)) {
return false;
}
}
return true;
}

private static <K, V> boolean deepEqualsMap(Map<K, V> m1, Map<?, ?> m2) {
// copied from HashMap.equals but with deepEquals comparision
if (m1.size() != m2.size()) {
return false;
}
try {
for (Map.Entry<K, V> e : m1.entrySet()) {
K key = e.getKey();
V value = e.getValue();
if (value == null) {
if (!(m2.get(key) == null && m2.containsKey(key))) {
return false;
}
} else {
if (!deepEqualsInternal(value, m2.get(key))) {
return false;
}
}
}
} catch (ClassCastException | NullPointerException unused) {
return false;
}
return true;
}

private static <E> boolean deepEqualsList(List<E> l1, List<?> l2) {
if (l1.size() != l2.size()) {
return false;
}
final Iterator<E> i1 = l1.iterator();
final Iterator<?> i2 = l2.iterator();
while (i1.hasNext() && i2.hasNext()) {
final E o1 = i1.next();
final Object o2 = i2.next();
if (!deepEqualsInternal(o1, o2)) {
return false;
}
}
return true;
}
}
177 changes: 177 additions & 0 deletions flink-core/src/main/java/org/apache/flink/types/RowUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.types;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
* Utilities to deal with {@link Row} instances.
*
* <p>This class exists to keep the {@link Row} class itself slim.
*/
@PublicEvolving
public final class RowUtils {

/**
* Compares two {@link List}s of {@link Row} for deep equality. This method supports all conversion
* classes of the table ecosystem.
*/
public static boolean compareRows(List<Row> l1, List<Row> l2) {
return compareRows(l1, l2, false);
}

/**
* Compares two {@link List}s of {@link Row} for deep equality. This method supports all conversion
* classes of the table ecosystem. The top-level lists can be compared with or without order.
*/
public static boolean compareRows(List<Row> l1, List<Row> l2, boolean ignoreOrder) {
if (l1 == l2) {
return true;
} else if (l1 == null || l2 == null) {
return false;
}
if (ignoreOrder) {
return deepEqualsListUnordered(l1, l2);
} else {
return deepEqualsListOrdered(l1, l2);
}
}

static boolean deepEqualsInternal(Object o1, Object o2) {
if (o1 == o2) {
return true;
} else if (o1 == null || o2 == null) {
return false;
} else if (o1 instanceof Row && o2 instanceof Row) {
return deepEqualsRow((Row) o1, (Row) o2);
} else if (o1 instanceof Object[] && o2 instanceof Object[]) {
return deepEqualsArray((Object[]) o1, (Object[]) o2);
} else if (o1 instanceof Map && o2 instanceof Map) {
return deepEqualsMap((Map<?, ?>) o1, (Map<?, ?>) o2);
} else if (o1 instanceof List && o2 instanceof List) {
return deepEqualsListOrdered((List<?>) o1, (List<?>) o2);
}
return Objects.deepEquals(o1, o2);
}

private static boolean deepEqualsRow(Row row1, Row row2) {
if (row1.getKind() != row2.getKind()) {
return false;
}
if (row1.getArity() != row2.getArity()) {
return false;
}
for (int pos = 0; pos < row1.getArity(); pos++) {
final Object f1 = row1.getField(pos);
final Object f2 = row2.getField(pos);
if (!deepEqualsInternal(f1, f2)) {
return false;
}
}
return true;
}

private static boolean deepEqualsArray(Object[] a1, Object[] a2) {
if (a1.getClass() != a2.getClass()) {
return false;
}
if (a1.length != a2.length) {
return false;
}
for (int pos = 0; pos < a1.length; pos++) {
final Object e1 = a1[pos];
final Object e2 = a2[pos];
if (!deepEqualsInternal(e1, e2)) {
return false;
}
}
return true;
}

private static <K, V> boolean deepEqualsMap(Map<K, V> m1, Map<?, ?> m2) {
// copied from HashMap.equals but with deepEquals comparision
if (m1.size() != m2.size()) {
return false;
}
try {
for (Map.Entry<K, V> e : m1.entrySet()) {
K key = e.getKey();
V value = e.getValue();
if (value == null) {
if (!(m2.get(key) == null && m2.containsKey(key))) {
return false;
}
} else {
if (!deepEqualsInternal(value, m2.get(key))) {
return false;
}
}
}
} catch (ClassCastException | NullPointerException unused) {
return false;
}
return true;
}

private static <E> boolean deepEqualsListOrdered(List<E> l1, List<?> l2) {
if (l1.size() != l2.size()) {
return false;
}
final Iterator<E> i1 = l1.iterator();
final Iterator<?> i2 = l2.iterator();
while (i1.hasNext() && i2.hasNext()) {
final E o1 = i1.next();
final Object o2 = i2.next();
if (!deepEqualsInternal(o1, o2)) {
return false;
}
}
return true;
}

private static <E> boolean deepEqualsListUnordered(List<E> l1, List<?> l2) {
final List<?> l2Mutable = new LinkedList<>(l2);
for (E e1 : l1) {
final Iterator<?> iterator = l2Mutable.iterator();
boolean found = false;
while (iterator.hasNext()) {
final Object e2 = iterator.next();
if (deepEqualsInternal(e1, e2)) {
found = true;
iterator.remove();
break;
}
}
if (!found) {
return false;
}
}
return l2Mutable.size() == 0;
}

private RowUtils() {
// no instantiation
}
}
Loading