-
Notifications
You must be signed in to change notification settings - Fork 13k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-5187] [core] Create analog of Row and RowTypeInfo and RowCompator in core #2968
Conversation
Thanks for working on this @wuchong. I will review this next week. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @wuchong, Thanks for the PR.
I left some minor comments
public int getFieldIndex(String fieldName) { | ||
for (int i = 0; i < fieldNames.length; i++) { | ||
if (fieldNames[i].equals(fieldName)) { | ||
return i; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if we have more than field one with the same name?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The field names of Row is forced to be f0~fn
.
this.fieldNames = new String[types.length]; | ||
|
||
for (int i = 0; i < types.length; i++) { | ||
fieldNames[i] = "f" + i; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we somehow get real names of fields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think RowTypeInfo
is something like TupleTypeInfo
, which can't have custom field names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we had custom field names before and removed them at some point.
It might make sense to add support for optional names again now that Row
is becoming more user-facing. However, I think we can do this as a separate issue.
@Override | ||
public void writeWithKeyNormalization( | ||
Row record, | ||
DataOutputView target) throws IOException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be in one line
public int extractKeys(Object record, Object[] target, int index) { | ||
int len = comparators.length; | ||
int localIndex = index; | ||
for(int i=0;i<len;i++){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, add some spaces
/** | ||
* @return creates auxiliary fields for normalized key support | ||
*/ | ||
private static Tuple4<int[], Integer, Integer, Boolean> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's the same as <int[], int, int, boolean>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, the generic type of Tuple can not be primitive types.
private static final long serialVersionUID = 1L; | ||
|
||
/** Number of field. */ | ||
private final int arity; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fields.length
is similar to arity
. Maybe should avoid this field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point!
/** | ||
* Intermediate constructor for creating auxiliary fields. | ||
*/ | ||
private RowComparator( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this constructor is redundant. int[] normalizedKeyLengths, int numLeadingNormalizableKeys, int normalizableKeyPrefixLen, boolean invertNormKey
can be initialized direct form Tuple4<>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This constructor is used in duplicate
which I don't want to generate the Tuple4 again.
Hi @tonycox , thanks for your reviewing. I addressed some of your comments. |
normalizableKeyPrefixLen == Integer.MAX_VALUE || | ||
normalizableKeyPrefixLen > keyBytes; | ||
normalizableKeyPrefixLen == Integer.MAX_VALUE || | ||
normalizableKeyPrefixLen > keyBytes; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here must be only tabs instead of spaces
/** Temporary variable for directly passing orders to comparators. */ | ||
private boolean[] comparatorOrders = null; | ||
|
||
public RowTypeInfo(TypeInformation<?>... types) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add another one constructor with Collection<TypeInformation<?>> types
to set for example Seq(Types.INT, Types.LONG, Types.STRING)
from scala
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, Seq
is not a subclass of Collection
, provide Collection<TypeInformation<?>> types
doesn't fix the problem. I think it's better to adapt the Scala code where has used RowTypeInfo in this form: new RowTypeInfo(Types.INT, Types.LONG, Types.STRING)
. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
scala.collection.Seq
easily converts to java.util.List
with import scala.collection.JavaConversions._
in scala code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And when we set Array[TypeInformation[_]]
in constructor from scala it has conflict with TypeInformation<?>...
, so I propose cast it .toList
.
What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like the following ways of using in Scala, rather than import JavaConversions._
or manually cast it with .toList
. In this way, we only need TypeInformation<?>... types
constructor.
new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO)
new RowTypeInfo(Seq(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO): _*)
new RowTypeInfo(Array(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BIG_DEC_TYPE_INFO): _*)
|
||
@Override | ||
public String toString() { | ||
return Arrays.deepToString(fields); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about replacing all '[' and ']' from string?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In Row tests expected = 1,3,2,1,3,Hello world
, but actual = [1, 3, 2, 1, 3, Hello world]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which "Row test" do you mean ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example org.apache.flink.api.scala.stream.table.AggregationsITCase
or org.apache.flink.api.scala.batch.table.JoinITCase
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. The IT cases use Row as the result type and check the toString results. But IMO, the format 1,3,2,1,3,Hello world
without brackets is not a good design, which is hard to distinguish the boundary. And the format [1, 3, 2, 1, 3, Hello world]
with square brackets is not good too, because it looks like an array.
I would like to propose the format like Scala Case Class and Flink Java Tuple 's toString, which should be (1,3,2,1,3,Hello world)
. And we should also adapt the IT cases to the new Row's toString method. What do you think @tonycox @twalthr ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, the toString
method should return the same representation as before.
Hi @wuchong, maybe we should move |
Hi @tonycox , I agree we should move |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for porting Row
and all related classes to Java and moving them to fflink-core
.
I made just a few style comments. Looks good otherwise, IMO.
We should merge this rather soon because this change should go into 1.2 and we need a few follow-ups on this.
@@ -92,6 +92,8 @@ Internally, Flink makes the following distinctions between types: | |||
|
|||
* POJOs: classes that follow a certain bean-like pattern | |||
|
|||
* Row (unlimited length of fields) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tuples with arbitrary number of fields and support for null fields.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extend the description of the Java and Scala tuples (max 25 / 22 fields, null fields not supported) to make the difference to Row
clear.
this.fieldNames = new String[types.length]; | ||
|
||
for (int i = 0; i < types.length; i++) { | ||
fieldNames[i] = "f" + i; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, we had custom field names before and removed them at some point.
It might make sense to add support for optional names again now that Row
is becoming more user-facing. However, I think we can do this as a separate issue.
boolean[] orders, | ||
int logicalFieldOffset, | ||
ExecutionConfig config) { | ||
comparatorOrders = orders; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you add a new line here to separate parameters from the method body?
int len, | ||
DataInputView source, | ||
boolean[] nullMask) throws IOException { | ||
int b = 0x00; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line
TypeSerializer<Object>[] serializers, | ||
int[] normalizedKeyLengths, | ||
int numLeadingNormalizableKeys, int normalizableKeyPrefixLen, boolean invertNormKey) { | ||
this.arity = arity; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
new line
} | ||
|
||
/** | ||
* Get the number of field in the Row. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
field -> fields
|
||
@Override | ||
public String toString() { | ||
return Arrays.deepToString(fields); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree, the toString
method should return the same representation as before.
|
||
Row row = (Row) o; | ||
|
||
// Probably incorrect - comparing Object[] arrays with Arrays.equals |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you mean by this comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a IDEA code generation comment. We can remove this .
createRow("a3", "b3", "c3", "d3", "e3", "f3", "g3", "h3", "i3", "j3") | ||
}; | ||
|
||
@Before |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be @BeforeClass
createRow(1, 1.0, "b", new Tuple3<>(2, true, (short) 3), testPojo3) | ||
}; | ||
|
||
@Before |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can be @BeforeClass
@tonycox @wuchong Yes, we can move |
Hi @fhueske , thanks for reviewing. I have addressed the comments. |
@fhueske Yes. I already started https://github.com/tonycox/flink/tree/FLINK-5188 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
import org.apache.flink.types.Row; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ArrayList
is unused and needs to be removed.
Just noticed that the row comparator and serializer tests should be moved to I will do that before merging. |
…them to flink-core. This closes apache#2968.
merging |
…them to flink-core. This closes apache#2968.
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
General
Documentation
Tests & Build
mvn clean verify
has been executed successfully locally or a Travis build has passed