Conversation

@art4ul (Contributor) commented Apr 15, 2019

What is the purpose of the change

This pull request adds support for the UNNEST operator on MAP types.

Brief change log

  • The LogicalUnnestRule now handles the MapRelDataType and creates an appropriate ExplodeTableFunction for it

Verifying this change

This change added tests and can be verified as follows:

  • Added a test that validates that the following SQL query works correctly for a table with a MAP column (see the sketch after this list for the intended semantics):
    SELECT a,b,v FROM src CROSS JOIN UNNEST(c) as f (k,v)
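
For context, here is a minimal Scala sketch of the intended semantics, using hypothetical data (an illustration, not the added test itself). Assume src has columns a and b plus a MAP<STRING, STRING> column c.

// Sketch: UNNEST over a MAP column, as added by this PR.
// For a single input row with c = {"k1" -> "v1", "k2" -> "v2"}, the query below
// produces two output rows, one per map entry: (a, b, "v1") and (a, b, "v2").
val query: String =
  "SELECT a, b, v FROM src CROSS JOIN UNNEST(c) AS f (k, v)"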

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot (Collaborator) commented

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.

Details
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@art4ul (Contributor, Author) commented Apr 18, 2019

Guys! Could you please review this pull request?

@art4ul (Contributor, Author) commented Apr 22, 2019

@KurtYoung @JingsongLi Could you please review this pull request or connect me with whoever is responsible for this component? Thanks in advance!

@KurtYoung (Contributor) left a comment

Thanks for working on this, I left some comments. BTW, I noticed there are some formatting issues and I'm not sure I pointed out all of them, so please check.

case map: MapRelDataType =>
val keyTypeInfo = FlinkTypeFactory.toTypeInfo(map.keyType)
val valueTypeInfo = FlinkTypeFactory.toTypeInfo(map.valueType)
val componentTypeInfo = createTuple2TypeInformation(keyTypeInfo,valueTypeInfo)
Contributor

space after comma

Contributor

Is it more appropriate for us to return RowType here?

@art4ul (Contributor, Author) commented Apr 23, 2019

space after comma

Fixed

Is it more appropriate for us to return RowType here?

I need to return the typeInfo of the exploded type, which will be a field in the resulting Row. I'm going to use the following transformation from Map[K,V] into a sequence of tuples [(K,V), ...]. I need it in order to reuse the SQL syntax for mapping the result onto a tuple. I mean I want to support the following SQL query:

SELECT k, v FROM t1 , UNNEST(t1.c) as A (k,v)

This is the reason why I return TupleTypeInfo
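
For illustration, a minimal Scala sketch of the tuple-based approach described above (the class name is hypothetical, and this is not the code in the PR): each map entry is emitted as a (key, value) tuple, which the planner then maps onto the aliases (k, v).

import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.apache.flink.table.functions.TableFunction

// Sketch only: explode a map into one (key, value) tuple per entry.
class TupleMapExplodeTableFunc extends TableFunction[(Object, Object)] {

  def eval(map: JMap[Object, Object]): Unit = {
    map.asScala.foreach { case (key, value) =>
      collect((key, value))
    }
  }
}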

Contributor

I checked other systems' behavior; it seems more common to unnest a map field into two columns. In your case, it would be:

SELECT k, v FROM t1 , UNNEST(t1.c) as (k,v)

Contributor Author

@KurtYoung I'm sorry, but when I use SQL like in your example I get an org.apache.flink.table.api.SqlParserException in the method org.apache.flink.table.calcite.FlinkPlannerImpl.parse.

Contributor

Sorry for giving a wrong example. What I want to say is that we should make built-in table functions look easier and more explicit to the framework. In this very example, you relied on some implicit functionality which Flink currently supports. First, you relied on Flink implicitly converting a Tuple to a Row; second, you don't provide any valuable type-related information in your table function, and all the type information is only provided in the logical rule.

I'm not saying this is wrong, but I think there is another way to make all of this more accurate and explicit. For example, you can explicitly tell the framework that your table function returns the Row type, and give more type-related information to the framework. It will make this code more robust and less likely to break in the future. Can you give this a try? If it involves lots of changes, I'm also OK with the current version.
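
For example, a minimal Scala sketch of the Row-based shape suggested here (the class name and constructor parameters are illustrative, not the code that was eventually merged): the function emits Rows and reports an explicit RowTypeInfo, so the planner does not have to recover the types from the logical rule alone.

import java.util.{Map => JMap}

import scala.collection.JavaConverters._

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Sketch only: key and value types are passed in explicitly.
class RowMapExplodeTableFunc(
    keyType: TypeInformation[_],
    valueType: TypeInformation[_])
  extends TableFunction[Row] {

  def eval(map: JMap[Object, Object]): Unit = {
    map.asScala.foreach { case (key, value) =>
      // Each map entry becomes one two-field Row.
      collect(Row.of(key, value))
    }
  }

  // Explicitly tell the framework that each emitted record is a two-field Row.
  override def getResultType: TypeInformation[Row] =
    new RowTypeInfo(keyType, valueType)
}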

Contributor Author

@KurtYoung I got it, thank you a lot for the explanation. I now return the ROW type instead of the Tuple used previously.

val valueTypeInfo = FlinkTypeFactory.toTypeInfo(map.valueType)
val componentTypeInfo = createTuple2TypeInformation(keyTypeInfo,valueTypeInfo)
val componentType = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
.createTypeFromTypeInfo(componentTypeInfo,true)
Contributor

space after comma

Contributor Author

fixed

.createTypeFromTypeInfo(componentTypeInfo,true)

val explodeFunction = ExplodeFunctionUtil.explodeTableFuncFromType(map.typeInfo)
(componentType , explodeFunction)
Contributor

extra space before comma

Contributor Author

fixed

}
}

class MapExplodeTableFunc extends TableFunction[Object]{
Contributor

space before brace

Contributor

I'm not sure that making this table function return Object and letting the framework rely on information from somewhere else to behave correctly is a good choice. This will make the code fragile.

Contributor Author

space before brace

fixed

I'm not sure that making this table function return Object and letting the framework rely on information from somewhere else to behave correctly is a good choice. This will make the code fragile.

The UNNEST function should be able to explode any key and value types. This is the reason why I'm using the Object type here. Basically, I use the same approach that is used for array types (for arrays, the ObjectExplodeTableFunc is used).

Contributor

ObjectExplodeTableFunc is not used directly for every array that gets unnested. It's only used for arrays with an object element type, e.g. Array(Object[]).
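
For comparison, the array analogue being discussed looks roughly like the sketch below (paraphrased from memory, not verbatim Flink source): ObjectExplodeTableFunc only covers arrays with object elements, while primitive element types are handled by separate variants in ExplodeFunctionUtil.

import org.apache.flink.table.functions.TableFunction

// Rough sketch of the object-array explode function referred to above.
class ObjectExplodeTableFunc extends TableFunction[Object] {
  def eval(arr: Array[Object]): Unit = {
    arr.foreach(collect)
  }
}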

Contributor Author

Thanks for your comments, I've fixed the TableFunction and it now returns Row.

@JingsongLi (Contributor) left a comment

Thanks for working on this, basically LGTM. Just some formatting issues.

class MapExplodeTableFunc extends TableFunction[Object] {

def eval(map: util.Map[Object, Object]): Unit = {
map.asScala.foreach{ case (key,value) =>
Contributor

map.asScala.foreach { case (key, value)
A space is needed here.

Contributor Author

fixed


def eval(map: util.Map[Object, Object]): Unit = {
map.asScala.foreach{ case (key,value) =>
collect((key,value))
Contributor

collect((key, value))

Contributor Author

fixed

@JingsongLi (Contributor) commented

The PR component tag should be [table-planner] instead of [Table API]?

@art4ul changed the title from "[FLINK-12200] [Table API] Support UNNEST for MAP types" to "[FLINK-12200] [Table-planner] Support UNNEST for MAP types" on Apr 23, 2019
@art4ul (Contributor, Author) commented Apr 23, 2019

The PR component tag should be [table-planner] instead of [Table API]?

Thank you for your note, I've fixed the tag in the title.

@art4ul (Contributor, Author) commented Apr 23, 2019

@KurtYoung @JingsongLi Thank you guys for your feedback. I've fixed all the formatting issues and added some answers to your questions.

@JingsongLi (Contributor) commented

LGTM +1

@KurtYoung merged commit 2c39024 into apache:master on Apr 26, 2019
tianchen92 pushed a commit to tianchen92/flink that referenced this pull request May 13, 2019