
[SAMZA-2557] Adding support for nested rows access via dot path.#1386

Merged
atoomula merged 15 commits into apache:master from b-slim:v3_working_poc_nested_rows on Jul 22, 2020

Conversation

Contributor

@b-slim b-slim commented Jun 18, 2020

This is an initial draft of how to support nested row access in Samza SQL.
There are multiple interconnected items.

  1. Added the actual definition of a ROW Calcite data type [EASY FINAL].

  2. Added a row converter from the Samza type system to the Calcite type system [okay for now, but will need more work for types like timestamps].

  3. Added a collector for the projects and filters that are pushed to the remote table scan [complex and needs discussion].

  • Why do we need this? Adding a nested row struct forces the addition of a project, and in general nothing stops the Calcite logical planner from adding such an identity project, so this is needed anyway.

  • How is this done? For now I chose to minimize the amount of rewriting and refactoring, and added a queue to collect the call stack between the remote table scan and the join node. When doing the join, the project and filter happen after the join lookup. We need to handle the case where the filter does not match, and null-pad the result or return null per the current convention. To be honest, I am still debating the filter push-down; there seems to be no real gain, since we have already done the lookup.

  4. Added a reference implementation for the getNestedField legacy UDF (this UDF has to be removed because it assumes everything is a SamzaRelRecord) [DONE].
  5. Need more code cleaning where the type is mixed up between Java String and Avro Utf8 as a key in the map [Follow up].
  6. Need more work on the union type system when we have more than 2 types [Follow up].


.context(Contexts.EMPTY_CONTEXT)
.costFactory(null)
.programs(
Programs.hep(ImmutableList.of(FilterJoinRule.FILTER_ON_JOIN), true, DefaultRelMetadataProvider.INSTANCE))
Contributor

Could we remove the FILTER_ON_JOIN optimization? It doesn't work well with remote joins. We should instead use the optimization for remote joins in the other PR that I sent.

Contributor Author

@atoomula this rule is not really the main part of this work, and I don't think we need to decide now which rule should be used to handle filters, or whether they should be pushed to the table scan; that is beyond the scope of this PR.
The main goal is to explore how we can handle the operator stack between the join and the remote table scan.
I added the rule to ensure that if a filter is there, things work as expected, and it seems to be working. But I would love to know what blind spot(s) I am missing about what does and does not work well with the remote table scan, if you think that will help drive this issue. Thanks.

Contributor

@srinipunuru srinipunuru left a comment

Overall I think it looks good, but I couldn't understand a few things. I will take another deeper look once you add the comments.

String remoteTableName = tableNode.getSourceName();
StreamTableJoinFunction joinFn = new SamzaSqlRemoteTableJoinFunction(context.getMsgConverter(remoteTableName),
context.getTableKeyConverter(remoteTableName), streamNode, tableNode, join.getJoinType(), queryId);
MessageStream operatorStack = context.getMessageStream(tableNode.getRelNode().getId());
Contributor

Looks like this is where the magic is happening. Can you add some comments describing what we are doing and why we are doing it?


Function<SamzaSqlRelMessage, SamzaSqlRelMessage> getFunction(Context context) {
Function<SamzaSqlRelMessage, SamzaSqlRelMessage> tailFn = null;
while (!_mapFnCallQueue.isEmpty()) {
Contributor

Can you add more comments on what we are doing here?

Contributor Author

@srinipunuru added comments at the top of the class; let me know if this is still unclear.

return tailFn == null ? Function.identity() : tailFn;
}
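The getFunction snippet above drains a queue of map functions and fuses them into one tail function. Here is a minimal, self-contained sketch of that idea; Integer stands in for SamzaSqlRelMessage, and the class and method names (`ComposeQueueSketch`, `compose`) are illustrative, not from the PR. A null intermediate result short-circuits the chain, matching the convention where a non-matching filter nulls out the row:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

public class ComposeQueueSketch {
  // Drain the queue and fuse the functions into one tail function.
  // A null intermediate result short-circuits the rest of the chain,
  // so a filter that "fails" (returns null) drops the row.
  static Function<Integer, Integer> compose(Queue<Function<Integer, Integer>> queue) {
    Function<Integer, Integer> tailFn = null;
    while (!queue.isEmpty()) {
      Function<Integer, Integer> head = queue.poll();
      Function<Integer, Integer> prev = tailFn;
      tailFn = prev == null ? head : v -> {
        Integer mid = prev.apply(v);
        return mid == null ? null : head.apply(mid);
      };
    }
    return tailFn == null ? Function.identity() : tailFn;
  }

  public static void main(String[] args) {
    Queue<Function<Integer, Integer>> mapFnCallQueue = new ArrayDeque<>();
    mapFnCallQueue.add(v -> v * 2);            // a queued "project"
    mapFnCallQueue.add(v -> v > 5 ? v : null); // a filter adapted to a map
    Function<Integer, Integer> fn = compose(mapFnCallQueue);
    System.out.println(fn.apply(4)); // 8 (projected, passes the filter)
    System.out.println(fn.apply(1)); // null (filtered out)
  }
}
```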

private static class FilterMapAdapter implements MapFunction<SamzaSqlRelMessage, SamzaSqlRelMessage> {
Contributor

Can you add comments here as well on why this adapter is required and what it does?

Contributor Author

Added comments; let me know if it is still unclear.

@b-slim b-slim force-pushed the v3_working_poc_nested_rows branch 5 times, most recently from d2cc0b5 to 03bd7c3 Compare June 30, 2020 16:01
@b-slim b-slim changed the title [WIP] Adding support for nested rows access via dot path. [SAMZA-2557] Adding support for nested rows access via dot path. Jul 2, 2020
*
* Note that this is needed because the remote table cannot expose a proper {@code MessageStream}.
* It is a workaround to minimize the amount of code changes to the current query translator {@link org.apache.samza.sql.translator.QueryTranslator}.
* In an ideal world, we would use the Calcite planner in the conventional way and combine functions via translation of RelNodes.
Contributor

I still do not get it. Why can't we use query optimization for remote tables, and if we see a filter/projection between a table (we can detect table vs. stream in the optimizer rule) and a join in the Calcite plan, push it up?

Contributor Author

The conventional way of using Calcite is to push operators toward the table scan as much as possible.
This PR does that by leveraging Calcite as is, with minimal code changes or copy-and-paste of internal code.
Pushing some things up in the remote table case and some things down in other cases shows a clear disconnect and an unclear design that someone other than the author would not be able to understand without spending hours stepping through the debugger.
The other question is how that would be better than the current approach of composing maps/filters and fusing them into the join operator; don't you think it is more optimal to fuse the lookup/project/filter within the join as one operator?
In addition, this approach will enable the case where the join condition is more than just key = c; we will be able to add more conjunctions in the near future.
In my opinion this is the cleanest way to work around the limitations of the remote table join operator, with no major surgery, while allowing filters/projects to be pushed and, in the future, handling richer join conditions.
Again, the proper fix would be to adopt the Calcite Convention pattern, where an operator can be pushed inside another operator when going from logical to physical; in this case the join would be transformed into a new join node that knows how to translate the project and filters within itself. That is kind of what is happening now, but without making this work major surgery, as someone has suggested.

Contributor

Ahh, now I get it. If we have a condition like "(p.key + 1) = pv.profileId", we cannot really solve it with a Calcite rule change; we can only solve it by changing our translator code (which is one of the things you did).
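To make the point concrete: with a computed key expression like p.key + 1, the lookup key must be evaluated from the stream-side row before the remote table lookup, which a plain column-equality rule cannot express. A hypothetical sketch, where a Map stands in for the remote table and all names are illustrative:

```java
import java.util.Map;
import java.util.function.Function;

public class ComputedJoinKeySketch {
  // Evaluate the key expression on the stream-side key, then look up
  // the remote table with the computed value.
  static Object lookup(Map<Integer, String> remoteTable, int streamKey,
                       Function<Integer, Integer> keyExpr) {
    return remoteTable.get(keyExpr.apply(streamKey));
  }

  public static void main(String[] args) {
    Map<Integer, String> profiles = Map.of(11, "profile-11");
    // Join condition "(p.key + 1) = pv.profileId" becomes keyExpr k -> k + 1.
    System.out.println(lookup(profiles, 10, k -> k + 1)); // profile-11
  }
}
```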

@b-slim b-slim force-pushed the v3_working_poc_nested_rows branch from 86528c7 to f08bf51 Compare July 2, 2020 17:21
Contributor

@atoomula atoomula left a comment

Largely fine with the changes. Thanks for digging through the different issues and resolving them. I added a few comments.

.build();
Planner planner = Frameworks.getPlanner(frameworkConfig);

SchemaPlus rootSchema = CalciteSchema.createRootSchema(true, false).plus();
Contributor

Just curious, is there any reason to set caching to false?

Contributor Author

We do not need caching; it is a historic optimization flag that in most cases is not really needed, because the schema changes all the time. In fact, false is the default in Calcite.

RelRoot relRoot = planner.rel(validatedSql);
LOG.info("query plan:\n" + RelOptUtil.toString(relRoot.rel, SqlExplainLevel.ALL_ATTRIBUTES));
return relRoot;
RelTraitSet relTraitSet = RelTraitSet.createEmpty();
Contributor

@atoomula atoomula commented Jul 20, 2020

Could you pull in the latest master branch code? I have pushed query optimization code which conflicts with your changes.

* Operator to extract nested rows or fields from a struct row type using a dotted path.
* The goal of this operator is two-fold.
* First, it is a temporary fix for https://issues.apache.org/jira/browse/CALCITE-4065 to extract a row from a row.
* Second, it will enable a smooth, backward-compatible migration from existing UDFs that rely on the legacy row format.
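A rough sketch of the dot-path idea, assuming nothing about the PR's actual implementation: a nested Map stands in for the struct row type, the path is split on '.' to walk the levels, and null is returned when the path cannot be resolved. The class and method names are hypothetical:

```java
import java.util.Map;

public class DotPathSketch {
  // Walk a dotted path such as "address.geo.zip" through nested maps.
  @SuppressWarnings("unchecked")
  static Object getByPath(Map<String, Object> row, String dottedPath) {
    Object current = row;
    for (String field : dottedPath.split("\\.")) {
      if (!(current instanceof Map)) {
        return null; // path descends past a leaf value
      }
      current = ((Map<String, Object>) current).get(field);
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, Object> row =
        Map.of("address", Map.of("geo", Map.of("zip", "94043")));
    System.out.println(getByPath(row, "address.geo.zip")); // 94043
    System.out.println(getByPath(row, "address.missing")); // null
  }
}
```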
Contributor

@atoomula atoomula commented Jul 20, 2020

Is this truly backward compatible with the existing GetNestedField UDF? Does this support all the types that are tested in GetSqlFieldUdf, especially nested map and array? https://github.com/apache/samza/blob/dcd4b558a2c702f5b5a320fdb9d0c3fcadabd09b/samza-sql/src/test/java/org/apache/samza/sql/fn/TestGetSqlFieldUdf.java

Contributor Author

It should be; I ran all the tests and they passed. If you have any test or use case in mind, please bring it up. This should work with all the types, and it will enforce type checking, as opposed to the old UDF, which is type-less in a bunch of corner cases. In a nutshell, a failure will happen if the type does not match.

Contributor

In the above test, TestGetSqlFieldUdf, can you take a look to see if this handles the tests from testMapAtLastField through testArrayAtAllIntermediateFields? I don't think we have any tests in TestSamzaSqlEndToEnd that test such complex types at intermediate fields.

Contributor Author

Added more tests to mimic the unit tests. As you can see, this does not cover how the legacy UDF handles maps, so such statements will definitely need to be rewritten when moving to new releases.

Contributor Author

see 3e95902

e -> convertToAvroObject(e.getValue(), getNonNullUnionSchema(schema).getValueType())));
// If you ask why we are not using String: some strings are wrapped in org.apache.avro.util.Utf8.
// TODO: looking at the Utf8 code base, it is not immutable; using it as a map key is asking for trouble!
final Map<Object, Object> outputMap = new HashMap<>();
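To show the pitfall the comment above describes, here is a self-contained sketch; FakeUtf8 is a hypothetical stand-in for org.apache.avro.util.Utf8 (a CharSequence wrapper whose equals does not match String), so a plain String lookup against a Utf8-keyed map misses:

```java
import java.util.HashMap;
import java.util.Map;

public class Utf8KeySketch {
  // Minimal stand-in for org.apache.avro.util.Utf8: wraps characters but
  // is never equal to an equivalent java.lang.String.
  static final class FakeUtf8 implements CharSequence {
    private final String s;
    FakeUtf8(String s) { this.s = s; }
    public int length() { return s.length(); }
    public char charAt(int i) { return s.charAt(i); }
    public CharSequence subSequence(int a, int b) { return s.subSequence(a, b); }
    @Override public boolean equals(Object o) {
      return o instanceof FakeUtf8 && ((FakeUtf8) o).s.equals(s);
    }
    @Override public int hashCode() { return s.hashCode(); }
    @Override public String toString() { return s; }
  }

  public static void main(String[] args) {
    Map<Object, Object> avroMap = new HashMap<>();
    avroMap.put(new FakeUtf8("id"), 42);

    System.out.println(avroMap.get("id"));               // null: String != Utf8
    System.out.println(avroMap.get(new FakeUtf8("id"))); // 42

    // Normalizing keys to String during conversion avoids the mismatch.
    Map<Object, Object> outputMap = new HashMap<>();
    avroMap.forEach((k, v) -> outputMap.put(k.toString(), v));
    System.out.println(outputMap.get("id"));             // 42
  }
}
```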
Contributor

In what scenarios do you expect the map key to be of Utf8 type? Considering that Avro mandates that the map key type be a string, isn't it fair to expect users to convert Utf8s to String?

Contributor Author

@atoomula the code as of today does that; in fact, the reason I ran into this is that I changed it to String and the tests failed. To avoid making this PR a fix-everything PR, I added the comment and the base fix for the null case needed by my work. But I agree we need to move away from the Avro string, especially since it is mutable.

@b-slim b-slim force-pushed the v3_working_poc_nested_rows branch from 9938a74 to 78e6da8 Compare July 21, 2020 15:44
@b-slim b-slim force-pushed the v3_working_poc_nested_rows branch from 78e6da8 to b917b7e Compare July 21, 2020 15:48
Contributor

@atoomula atoomula left a comment

LGTM

@atoomula atoomula merged commit f5faae0 into apache:master Jul 22, 2020
lakshmi-manasa-g pushed a commit to lakshmi-manasa-g/samza that referenced this pull request Feb 9, 2021
…che#1386)

* Working version still need to work on extracting nested fields udf

* working version end to end with Filter optimization

* left outer join test with filters

* adding more comments

* fix the type converter used by udfs

* refix the test

* Added GetNestedField built in operator to allow support backward compatibility

* fix java doc and minor change on the type cast

Not sure what this test is testing for it is a regular join between Stream and local table

* Adding more tests and some logging to help read the compiled code

* Adding some Type sanity to the Join functions

* fix minor WAR

* revert unwanted changes

* fix the tests

* Adding more tests for map type and fix the Avro conversion for type with one type only

* fix the style checks