[BEAM-2676] move BeamSqlRow and BeamSqlRowType to sdk/java/core #3675
Changes Unknown when pulling 5b42e63 on XuMingmin:BEAM-2676_2 into ** on apache:DSL_SQL**. |
Thanks for doing this refactoring!
Some high level comments about the end state. Also, for ease of reviewing, could you separate out the bulk renaming from the rest of the (more interesting) changes? (If you want, you can rebase on robertwb@ab0ca82 ).
throws CoderException, IOException {
  nullListCoder.encode(value.getNullFields(), outStream);
  for (int idx = 0; idx < value.size(); ++idx) {
    if (value.getNullFields().contains(idx)) {
Seems like there should be an isNull(idx) method.
Also, I wonder if this should be a BitSet instead.
+1
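A minimal sketch of the two suggestions above (an isNull(idx) accessor backed by a BitSet), assuming a simplified record class. SimpleRecord and its members are hypothetical stand-ins for illustration, not the class under review:

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;

// Hypothetical, simplified stand-in for the record class discussed in the review.
final class SimpleRecord {
  private final List<Object> values;
  private final BitSet nullFields;

  SimpleRecord(int size) {
    values = new ArrayList<>(Collections.<Object>nCopies(size, null));
    nullFields = new BitSet(size);
    nullFields.set(0, size); // every field starts out null
  }

  void setField(int idx, Object value) {
    values.set(idx, value);
    nullFields.set(idx, value == null); // keep the bit in sync with the stored value
  }

  // Callers ask the record directly instead of reaching into the null set.
  boolean isNull(int idx) {
    return nullFields.get(idx);
  }

  Object getField(int idx) {
    return values.get(idx);
  }

  int size() {
    return values.size();
  }
}
```

With this shape, the encode loop can test record.isNull(idx) rather than record.getNullFields().contains(idx), and the BitSet lookup is constant time where a List<Integer> lookup is linear.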
*/
public class BeamSqlRowCoder extends CustomCoder<BeamSqlRow> {
  private BeamSqlRowType tableSchema;
public class BeamSqlRecordCoder extends CustomCoder<BeamSqlRecord> {
We don't need a separate BeamSqlRecordCoder, just have the BeamSqlRecordTypeProvider's Coder create a vanilla BeamRecordCoder with the right list of coders.
I provided a BeamSqlRecordCoder here because I couldn't find an existing Coder for short/float/Date/Boolean, so this Coder does some conversion itself. SerializableCoder doesn't fit because it's not deterministic.
It would be preferable (and more useful) to add such coders to the SDK rather than hide the conversions here in BeamSqlRecordCoder. If you wanted to avoid exposing these coders you can make them inner classes of BeamSqlRecordTypeProvider (with a TODO to consider making them top-level).
makes sense to me, don't want to bring in the Coder tasks here.
BTW, BeamRecordType would take this list of coders in its constructor so that BeamRecordCoder could be created given a BeamRecordType.
Yes, given ShortCoder/FloatCoder/..., BeamSqlRecordCoder can be created with BeamSqlRecordType:
BeamSqlRecordCoder of(BeamSqlRecordType type)
If we get rid of BeamSqlRecord (below) we can get rid of BeamSqlRecordCoder as BeamRecordCoder would just take any BeamRecordType as its parameter.
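The idea in this thread (a record type that holds one coder per field, so a generic record coder can be built from any record type) could look roughly like the sketch below. FieldCoder, ShortFieldCoder, Utf8FieldCoder, RecordType, and RecordCoder are hypothetical simplified classes, not Beam's actual API, and null handling is left out:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal coder interface; Beam's real Coder API is richer.
interface FieldCoder {
  void encode(Object value, DataOutputStream out) throws IOException;
  Object decode(DataInputStream in) throws IOException;
}

// Illustrative coder for short values (one of the types said to be missing).
final class ShortFieldCoder implements FieldCoder {
  @Override public void encode(Object value, DataOutputStream out) throws IOException {
    out.writeShort((Short) value);
  }
  @Override public Object decode(DataInputStream in) throws IOException {
    return in.readShort();
  }
}

// Illustrative coder for string values.
final class Utf8FieldCoder implements FieldCoder {
  @Override public void encode(Object value, DataOutputStream out) throws IOException {
    out.writeUTF((String) value);
  }
  @Override public Object decode(DataInputStream in) throws IOException {
    return in.readUTF();
  }
}

// The record type owns the per-field coders, so it can hand out a record coder.
final class RecordType {
  private final List<FieldCoder> fieldCoders;

  RecordType(List<FieldCoder> fieldCoders) {
    this.fieldCoders = new ArrayList<>(fieldCoders);
  }

  List<FieldCoder> getFieldCoders() {
    return fieldCoders;
  }

  RecordCoder getRecordCoder() {
    return RecordCoder.of(this);
  }
}

// One generic coder that works for any RecordType; no SQL-specific subclass needed.
final class RecordCoder {
  private final RecordType type;

  private RecordCoder(RecordType type) {
    this.type = type;
  }

  static RecordCoder of(RecordType type) {
    return new RecordCoder(type);
  }

  byte[] encode(List<Object> values) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    List<FieldCoder> coders = type.getFieldCoders();
    for (int i = 0; i < coders.size(); i++) {
      coders.get(i).encode(values.get(i), out); // delegate each field to its own coder
    }
    out.flush();
    return bytes.toByteArray();
  }

  List<Object> decode(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    List<Object> values = new ArrayList<>();
    for (FieldCoder coder : type.getFieldCoders()) {
      values.add(coder.decode(in));
    }
    return values;
  }
}
```

In this arrangement a SQL-flavored record type would only need to supply the right list of field coders; the record coder itself stays generic.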
public abstract class BeamSqlRowType implements Serializable {
  public abstract List<String> getFieldsName();
  public abstract List<Integer> getFieldsType();
public class BeamSqlRecord extends BeamRecord {
One of the main points of the proposal was to not have a separate Record type--just use a BeamRecord that happens to have a BeamSqlRecordTypeProvider dataType.
BeamSqlRecord is a helper class for SQL that avoids writing (BeamSqlRowType) BeamRecord.getDataType(); everywhere. The Beam SQL code relies heavily on BeamSqlRecordTypeProvider, which has more functions than BeamRecordTypeProvider.
To avoid a new BeamSqlRecord, maybe I can just add the cast line as a utility function. Any thoughts?
If BeamSqlRecord is just a helper class, it shouldn't be in the public API. If that means a couple of casts to BeamSqlRecordTypeProvider then that's OK. However, the only methods I see on this class are validateValueType (rarely used) and getFieldsType.
Of course getFieldsType (should this be getFieldTypes?) is the interesting one. However, nearly every use of this method is to create a new BeamSqlRecordTypeProvider. It would actually be better to add methods on BeamRecordType that slice/concatenate/etc. to create new BeamRecordTypes (with overloads in BeamSqlRecordTypes to create BeamSqlRecordTypes).
Done right, simple SQL statements such as selections, joins, and projections should work on generic BeamRecords. Aggregations and comparisons would still (likely) require knowing the specific types (as SQL Types) and hence an actual BeamSqlRecordType, but that's OK as we're already in the domain of throwing errors if it's not the right type (and a generic BeamRecordType schema would always throw a "wrong type" error). We don't, of course, have to get this working now.
The naming still seems to be in flux. I'd drop the Provider suffix, and get rid of Row everywhere. The suite of types would then be:
BeamRecord
BeamRecordCoder
BeamSqlRecordType extends BeamRecordType
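To illustrate the slice/concatenate suggestion, here is a rough sketch assuming a record type that is just parallel lists of field names and integer type codes. FieldListType and its method names are hypothetical, chosen for illustration only:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified record type: parallel lists of field names and type codes.
final class FieldListType {
  private final List<String> names;
  private final List<Integer> types;

  FieldListType(List<String> names, List<Integer> types) {
    if (names.size() != types.size()) {
      throw new IllegalArgumentException("names and types must have the same length");
    }
    this.names = new ArrayList<>(names);
    this.types = new ArrayList<>(types);
  }

  List<String> getFieldNames() {
    return names;
  }

  List<Integer> getFieldTypes() {
    return types;
  }

  // Projection: build a new type from the chosen field indices, in the given order.
  FieldListType slice(List<Integer> indices) {
    List<String> slicedNames = new ArrayList<>();
    List<Integer> slicedTypes = new ArrayList<>();
    for (int idx : indices) {
      slicedNames.add(names.get(idx));
      slicedTypes.add(types.get(idx));
    }
    return new FieldListType(slicedNames, slicedTypes);
  }

  // Join: build a new type with this type's fields followed by the other's.
  FieldListType concat(FieldListType other) {
    List<String> allNames = new ArrayList<>(names);
    allNames.addAll(other.names);
    List<Integer> allTypes = new ArrayList<>(types);
    allTypes.addAll(other.types);
    return new FieldListType(allNames, allTypes);
  }
}
```

With methods like these on the type itself, join and projection code would not need to pull out the raw type list and rebuild a type by hand, which is where most of the casts come from.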
BTW, in the short term, I'm OK with just adding lots of casts with a TODO to do this properly with polymorphism to unblock getting this in.
Let's merge the discussion of BeamSqlRecord and BeamSqlRecordCoder.
A BeamSqlRecordHelper class would be introduced, with two methods:
public static BeamSqlRecordType getSqlRecordType(BeamRecord);
public static BeamRecordCoder getSqlRecordCoder(BeamSqlRecordType);
- I don't like casts scattered through the code; since we know a cast is required, let's centralize it so it is better documented;
- As mentioned above, I'll prepare some inner Coders to cover the SQL types, and create a separate task to expose them in sdk/core.
Thanks for the naming suggestion; I'll do it after we clear up all the questions. A simple rename would touch hundreds of lines.
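A sketch of how such a helper might centralize the cast, using stub classes in place of the real ones. StubRecordType, StubSqlRecordType, StubRecord, and SqlRecordHelper are hypothetical names for illustration, and the error handling is an assumption rather than something stated in the thread:

```java
import java.util.ArrayList;
import java.util.List;

// Stub standing in for the generic record type named in the thread.
class StubRecordType {}

// Stub standing in for the SQL-specific record type.
class StubSqlRecordType extends StubRecordType {
  private final List<Integer> fieldTypes;

  StubSqlRecordType(List<Integer> fieldTypes) {
    this.fieldTypes = new ArrayList<>(fieldTypes);
  }

  List<Integer> getFieldsType() {
    return fieldTypes;
  }
}

// Stub standing in for the record class; it only carries its data type here.
class StubRecord {
  private final StubRecordType dataType;

  StubRecord(StubRecordType dataType) {
    this.dataType = dataType;
  }

  StubRecordType getDataType() {
    return dataType;
  }
}

// The single place where the downcast happens, with a descriptive failure.
final class SqlRecordHelper {
  private SqlRecordHelper() {}

  static StubSqlRecordType getSqlRecordType(StubRecord record) {
    StubRecordType type = record.getDataType();
    if (!(type instanceof StubSqlRecordType)) {
      throw new IllegalArgumentException(
          "record does not carry a SQL record type: " + type.getClass().getName());
    }
    return (StubSqlRecordType) type;
  }
}
```

Call sites then read SqlRecordHelper.getSqlRecordType(input).getFieldsType(), and a record with a non-SQL type fails with a clear message rather than a bare ClassCastException.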
Changes Unknown when pulling 129ae96 on XuMingmin:BEAM-2676_2 into ** on apache:DSL_SQL**. |
As discussed, I removed BeamSqlRecord, and now there are only:
and BeamSqlRecordHelper to handle the type cast and coder in SQL. Sorry for the large diff; in practice it's not feasible to separate out the renaming step. Let's finish this first, then I'll move on to the windowInfo fields.
Phew...looking good! Agree that the window info stuff should be a separate PR; filed BEAM-2722.
The main remaining point is that I think BeamRecordType should hold a list of Coders, and as such can provide a BeamRecordCoder, which will simplify things. Other than that just minor comments.
Also, having read the whole thing, could you try and build on top of these commits now :).
@Override
public void verifyDeterministic()
    throws org.apache.beam.sdk.coders.Coder.NonDeterministicException {
}
Recursively call on all members of coderArray.
+1
I happened to notice that DoubleCoder is not deterministic.
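The "recursively call on all members" suggestion could be sketched as follows. ComponentCoder, NotDeterministicException, and CompositeCoder are hypothetical stand-ins, not Beam's Coder or its NonDeterministicException:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical checked exception standing in for the SDK's non-determinism error.
class NotDeterministicException extends Exception {
  NotDeterministicException(String message) {
    super(message);
  }
}

// Hypothetical minimal coder contract: only the determinism check is modeled.
interface ComponentCoder {
  void verifyDeterministic() throws NotDeterministicException;
}

// A composite coder is deterministic only if every component coder is.
final class CompositeCoder implements ComponentCoder {
  private final List<ComponentCoder> coderArray;

  CompositeCoder(List<ComponentCoder> coderArray) {
    this.coderArray = new ArrayList<>(coderArray);
  }

  @Override
  public void verifyDeterministic() throws NotDeterministicException {
    for (ComponentCoder coder : coderArray) {
      coder.verifyDeterministic(); // the first non-deterministic component aborts the check
    }
  }
}
```

An empty method body, as in the excerpt above, would silently report the composite as deterministic even when one of its field coders is not.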
@@ -174,7 +137,7 @@ public boolean getBoolean(String fieldName) {
  }

  public Object getFieldValue(int fieldIdx) {
    if (nullFields.contains(fieldIdx)) {
    if (nullFields.get(fieldIdx)) {
      return null;
Curious, is dataValues.get(fieldIdx) already null here?
+1, this is needless.
    return dataType;
  }

  public void setDataType(BeamSqlRowType dataType) {
  public void setDataType(BeamRecordType dataType) {
Why is this ever needed? Seems it should be set at creation, never changed.
remove..
  }

  public List<Integer> getNullFields() {
    return nullFields;
  public void setNullFields(BitSet nullFields) {
Similarly, couldn't this always be inferred based on what fields were set?
Actually, is there any reason to explicitly store this set, rather than have getNullFields() implicitly compute it based on what is actually null at the time? (Happy with deferring to a future JIRA.)
I would keep a BitSet to indicate null fields. It's useful when encoding/decoding.
But other than during encoding/decoding there's no reason to keep it around, right?
I think so. The information is required for decoding because we skip null values when encoding.
Calculating it on the fly is possible, though; it's a trade-off between a loop scan and storing a BitSet.
Yes, we must store it in the encoded representation. But between the choice of setting it to all-true in the constructor and updating it as fields are mutated (we're not doing this correctly btw, see setDataValues and possibly elsewhere) vs. computing it on decode, I prefer the locality of the latter as it's just as cheap (and really cheap compared to the actual serialization), but your call.
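A sketch of the second option (deriving the null BitSet from the values at encode time, writing it into the encoded form, and using it only while decoding) might look like this. NullAwareStringListCoder is a hypothetical class that handles string fields only, and its wire layout is an assumption made for the example:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical coder for a list of nullable strings; the null mask lives only in the bytes.
final class NullAwareStringListCoder {
  private NullAwareStringListCoder() {}

  // Derive the null mask from the values instead of tracking it as mutable state.
  static BitSet nullFieldsOf(List<String> values) {
    BitSet nulls = new BitSet(values.size());
    for (int i = 0; i < values.size(); i++) {
      if (values.get(i) == null) {
        nulls.set(i);
      }
    }
    return nulls;
  }

  static byte[] encode(List<String> values) throws IOException {
    BitSet nulls = nullFieldsOf(values);
    byte[] mask = nulls.toByteArray();
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(values.size());
    out.writeInt(mask.length);
    out.write(mask);
    for (int i = 0; i < values.size(); i++) {
      if (!nulls.get(i)) {
        out.writeUTF(values.get(i)); // null fields are skipped entirely
      }
    }
    out.flush();
    return bytes.toByteArray();
  }

  static List<String> decode(byte[] data) throws IOException {
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    int size = in.readInt();
    byte[] mask = new byte[in.readInt()];
    in.readFully(mask);
    BitSet nulls = BitSet.valueOf(mask);
    List<String> values = new ArrayList<>(size);
    for (int i = 0; i < size; i++) {
      // The mask tells the decoder which positions have no bytes to read.
      values.add(nulls.get(i) ? null : in.readUTF());
    }
    return values;
  }
}
```

The scan in nullFieldsOf is one pass over the fields per record, which is the "loop scan" side of the trade-off mentioned above; in exchange the record needs no null-tracking field that must be kept in sync when values change.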
BitSet nullFields = nullListCoder.decode(inStream);

BeamRecord record = new BeamRecord(recordType);
record.setNullFields(nullFields);
(See other comment.) Isn't this inferred by the setting (or not) below?
remove..
throws CoderException, IOException {
  nullListCoder.encode(value.getNullFields(), outStream);
  for (int idx = 0; idx < value.size(); ++idx) {
    if (value.getNullFields().get(idx)) {
use isNull here
+1
for (int i = 0; i < fieldsIndices.size(); i++) {
  int fieldIndex = fieldsIndices.get(i);
  int fieldRet = 0;
  SqlTypeName fieldType = CalciteUtils.getFieldType(row1.getDataType(), fieldIndex);
  SqlTypeName fieldType = CalciteUtils.getFieldType(
Based on the implementation below, couldn't you just call row1.get(fieldIndex).compareTo(row2.get(fieldIndex)) iff they're instances of Comparable, and raise UnsupportedOperationException otherwise?
Right, the existing types are all comparable.
Any need to query fieldIndex at all, vs instanceof Comparable?
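The Comparable-based comparison suggested here could be sketched as below, with rows modeled as plain lists of field values. FieldListComparator is a hypothetical class; sort direction and null ordering, which a real SQL comparator would need, are left out:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical comparator over rows represented as lists of field values.
final class FieldListComparator implements Comparator<List<Object>> {
  private final List<Integer> fieldIndices;

  FieldListComparator(List<Integer> fieldIndices) {
    this.fieldIndices = new ArrayList<>(fieldIndices);
  }

  @Override
  @SuppressWarnings({"unchecked", "rawtypes"})
  public int compare(List<Object> row1, List<Object> row2) {
    for (int fieldIndex : fieldIndices) {
      Object a = row1.get(fieldIndex);
      Object b = row2.get(fieldIndex);
      // No per-type switch: any Comparable value works, anything else is rejected.
      // A null value is not an instance of Comparable, so nulls are rejected too.
      if (!(a instanceof Comparable) || !(b instanceof Comparable)) {
        throw new UnsupportedOperationException(
            "field " + fieldIndex + " is not comparable");
      }
      int result = ((Comparable) a).compareTo(b);
      if (result != 0) {
        return result; // the first differing field decides the order
      }
    }
    return 0;
  }
}
```

Note that compareTo can still throw ClassCastException if the two rows hold different types at the same index, so this relies on both rows sharing a schema.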
@@ -119,21 +120,21 @@ public BeamSqlRow apply(BeamSqlRow input) {
    return keyOfRecord;
  }

  private BeamSqlRowType exTypeOfKeyRecord(BeamSqlRowType dataType) {
  private BeamSqlRecordType exTypeOfKeyRecord(BeamSqlRecordType dataType) {
Reference BEAM-2721 here.
// build the type
// the name of the join field is not important
List<String> names = new ArrayList<>(joinColumns.size());
List<Integer> types = new ArrayList<>(joinColumns.size());
for (int i = 0; i < joinColumns.size(); i++) {
  names.add("c" + i);
  types.add(isLeft
      ? input.getDataType().getFieldsType().get(joinColumns.get(i).getKey()) :
      input.getDataType().getFieldsType().get(joinColumns.get(i).getValue()));
      ? BeamSqlRecordHelper.getSqlRecordType(input).getFieldsType()
Reference BEAM-2721 here.
Ping on these two.
types.addAll(leftRow.getDataType().getFieldsType());
types.addAll(rightRow.getDataType().getFieldsType());
BeamSqlRowType type = BeamSqlRowType.create(names, types);
types.addAll(BeamSqlRecordHelper.getSqlRecordType(leftRow).getFieldsType());
Reference BEAM-2721 here
create a separate PR BEAM-2723 for the windowInfo fields, will do it after this.
@@ -243,7 +230,7 @@ public BeamSqlRowComparator(List<Integer> fieldsIndices,
    }
  }

  public static <T extends Number & Comparable> int numberCompare(T a, T b) {
  public static <T extends Comparable> int compare(T a, T b) {
Is this even needed?
Thanks for bearing with me--LGTM! The remainder of the comments are optional.
I really appreciate your review, @robertwb. I will address the two in another PR, with a clean base. --Seems
Changes Unknown when pulling 706781d on XuMingmin:BEAM-2676_2 into ** on apache:DSL_SQL**. |
@robertwb could you merge this PR so I can start on the remaining changes?
create a new PR to get rid of the huge rebase work after #3666.
Following the discussion in BEAM-2676, the changes are outlined as:
- BeamRecord and BeamRecordTypeProvider are marked as @Experimental;
- BeamRecord is moved to sdk/core, which includes a default BeamRecordTypeProvider to define type information; a BeamRecordCoder is also provided as its Coder;
- BeamSqlRecord extends BeamRecord, and BeamSqlRecordTypeProvider extends BeamRecordTypeProvider, to support SQL types; a new Coder, BeamSqlRecordCoder, is provided to align better;
is provided to align better;