[FLINK-7491] [Table API & SQL] add MultiSetTypeInfo; add built-in Collect Aggregate Function for Flink SQL. #4585
Conversation
Force-pushed 7bcb0db to a50ad5b
Hi @suez1224, thanks for the PR, I think we can use …
Force-pushed a50ad5b to 874bbca
Hi @suez1224, please read and fill out the template in the PR description. Thank you.
Force-pushed 874bbca to 924589a
Hi @fhueske, I've filled out the PR template. Please take a look. Thanks a lot.
Force-pushed 69a96ec to 138e78d
Thanks @suez1224, I'm quite busy at the moment but will try to have a look soon.
flink-core/pom.xml
Outdated
@@ -80,6 +80,13 @@ under the License.
			<!-- managed version -->
		</dependency>

		<!-- For multiset -->
		<dependency>
			<groupId>org.apache.commons</groupId>
We should not add additional dependencies to Flink just because of a new data type. There is also no reason behind choosing this particular library. Couldn't we just use a regular Java Map? Otherwise I would propose adding a class for our own type, as we did for org.apache.flink.types.Row. Calcite uses List, which is not very nice but would also work.
Thanks. Changed to use java.util.Map instead.
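The multiset-as-map idea can be sketched in plain Java (a hypothetical illustration with invented names, not the PR's actual code): each map entry pairs an element with its multiplicity, so no third-party collection library is needed.

```java
import java.util.HashMap;
import java.util.Map;

// A multiset of E represented as a Map<E, Integer> from element to count.
public class MultisetSketch {
    public static <E> void add(Map<E, Integer> multiset, E value) {
        // insert with count 1, or increment the existing count
        multiset.merge(value, 1, Integer::sum);
    }

    public static <E> void remove(Map<E, Integer> multiset, E value) {
        // decrement; drop the entry entirely when the count reaches 0
        multiset.computeIfPresent(value, (k, c) -> c > 1 ? c - 1 : null);
    }

    public static void main(String[] args) {
        Map<String, Integer> ms = new HashMap<>();
        add(ms, "a");
        add(ms, "a");
        add(ms, "b");
        System.out.println(ms.get("a")); // 2
        remove(ms, "a");
        System.out.println(ms.get("a")); // 1
    }
}
```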
@@ -211,6 +218,14 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImpl
    canonize(relType)
  }

  override def createMultisetType(elementType: RelDataType, maxCardinality: Long): RelDataType = {
    val relType = new MultisetRelDataType(
There are multiple locations where a new type has to be added, such as FlinkRelNode.
Added changes in FlinkRelNode and ExpressionReducer.
Force-pushed 29ba8e0 to d58071e
import scala.collection.JavaConverters._

/** The initial accumulator for Collect aggregate function */
class CollectAccumulator[E] extends JTuple1[util.Map[E, Integer]]
We can use a MapView here. This feature was recently added and automatically backs the Map with a MapState if possible. Otherwise, it uses a Java HashMap (as right now). The benefit of backing the accumulator by MapState is that only the keys and values that are accessed need to be deserialized. In contrast, a regular HashMap is completely de/serialized every time the accumulator is read. Using MapView would require that the accumulator is implemented as a POJO (instead of a Tuple1).
Check the MapView class for details and let me know if you have questions.
Please take another look, I've updated to use MapView.
def accumulate(accumulator: CollectAccumulator[E], value: E): Unit = {
  if (value != null) {
    if (accumulator.f0.containsKey(value)) {
      val add = (x: Integer, y: Integer) => x + y
add is not used, right?
Yes, removed.
override def getAccumulatorType: TypeInformation[CollectAccumulator[E]] = {
  new TupleTypeInfo(
    classOf[CollectAccumulator[E]],
    new GenericTypeInfo[util.Map[E, Integer]](classOf[util.Map[E, Integer]]))
Don't use a generic type here. This will result in a KryoSerializer, which can be quite inefficient and result in state that cannot be upgraded. Rather use MapTypeInformation.
Changed to use MapViewTypeInfo here. However, if E is not a basic type, I can only use GenericTypeInfo (please see ObjectCollectAggFunction). Is there a better way? @fhueske
We could have an abstract method getElementTypeInfo() that returns the type info for the elements. The basic types can be properly handled, and for Object we fall back to GenericType.
@fhueske Thanks. I think that's exactly what the current code does. Please take another look.
    elementType,
    isNullable) {

  override def toString = s"MULTISET($typeInfo)"
This should rather be s"MULTISET($elementType)". TypeInformation is a Flink concept, whereas RelDataType is in the Calcite context.
Done
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val sqlQuery =
  "SELECT b, COLLECT(b)" +
COLLECT should be added to the SQL documentation under "Built-in Functions" -> "Aggregate Functions". Moreover, we should add MULTISET to the supported data types.
It would also be nice if you could open a JIRA to add support for COLLECT to the Table API. We try to keep both in sync, and it helps if we have a list of things that need to be added.
Updated the documentation.
Table API ticket created: https://issues.apache.org/jira/browse/FLINK-7658?filter=-1
Force-pushed 4ce5223 to f07216a
Hi @suez1224, thanks for the update!
I added a few minor comments.
A major question is how null values are handled. I'm not familiar with the semantics of COLLECT, but if we want to support null values, we need to change some serialization code.
Best, Fabian
docs/dev/table/sql.md
Outdated
@@ -746,6 +746,7 @@ The SQL runtime is built on top of Flink's DataSet and DataStream APIs. Internal
| `Types.PRIMITIVE_ARRAY`| `ARRAY`    | e.g. `int[]`           |
| `Types.OBJECT_ARRAY`   | `ARRAY`    | e.g. `java.lang.Byte[]`|
| `Types.MAP`            | `MAP`      | `java.util.HashMap`    |
| `Types.MULTISET`       | `MULTISET` | `java.util.HashMap`    |
Should we explain how the HashMap is used to represent the multiset, i.e., that a multiset of String is a HashMap&lt;String, Integer&gt;?
done
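The documented mapping can be illustrated with a small hypothetical helper (illustrative names only, not part of the PR): a SQL MULTISET of String surfaces in user code as a HashMap&lt;String, Integer&gt; from element to count.

```java
import java.util.Map;

// Reading a COLLECT result that arrives as a Map<String, Integer>.
public class MultisetRepresentation {
    // An absent element has multiplicity 0.
    public static int multiplicity(Map<String, Integer> multiset, String element) {
        return multiset.getOrDefault(element, 0);
    }

    // Total number of collected elements, counting duplicates.
    public static int cardinality(Map<String, Integer> multiset) {
        return multiset.values().stream().mapToInt(Integer::intValue).sum();
    }
}
```

So COLLECT over the values "Hello", "Hello", "World" would be represented as the map {Hello=2, World=1}.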
public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {

  private static final long serialVersionUID = 1L;

rm newline
done
 * @param <T> The type of the elements in the Multiset.
 */
@PublicEvolving
public final class MultisetTypeInfo<T> extends MapTypeInfo<T, Integer> {
Add this to the org.apache.flink.table.api.Types class for easy creation of the TypeInformation.
Does SQL MULTISET also support null values? If yes, we would need to wrap the MapSerializer. Otherwise, the problem would be that we would need to rely on the key serializer to support null, which many serializers do not. A solution would be to wrap the MapSerializer and additionally serialize the count for null elements.
I took a look at the Calcite tests for the COLLECT function; null values will be ignored.
Great! That makes things a lot easier :-)
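Given that null values are ignored, the accumulate step can simply skip them, so the map's key serializer never has to cope with a null key. A minimal hypothetical sketch in plain Java (the PR's actual accumulate function is Scala):

```java
import java.util.HashMap;
import java.util.Map;

public class NullSafeCollect {
    // Nulls are ignored per COLLECT semantics; everything else is counted.
    public static <E> void accumulate(Map<E, Integer> acc, E value) {
        if (value != null) {
            acc.merge(value, 1, Integer::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Integer> acc = new HashMap<>();
        accumulate(acc, null);   // ignored, map stays empty
        accumulate(acc, "x");
        accumulate(acc, "x");
        System.out.println(acc); // {x=2}
    }
}
```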
// ------------------------------------------------------------------------

@Override
public boolean isBasicType() {
Implemented by MapTypeInfo, no need to override.
done
}

@Override
public boolean isTupleType() {
Implemented by MapTypeInfo, no need to override.
done
  }
  map
} else {
  null.asInstanceOf[util.Map[E, Integer]]
According to the specs of COLLECT, is null the correct return value, or an empty multiset?
Checked against the Calcite tests; it should return an empty multiset instead.
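That fix can be sketched in plain Java as follows (hypothetical names; the PR's getValue is Scala): when nothing has been collected, return an empty multiset rather than null.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CollectGetValue {
    public static <E> Map<E, Integer> getValue(Map<E, Integer> acc) {
        if (acc == null || acc.isEmpty()) {
            return Collections.emptyMap();  // empty multiset, never null
        }
        return new HashMap<>(acc);          // copy of the accumulated counts
    }
}
```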
@@ -1414,8 +1414,29 @@ object AggregateUtil {
    aggregates(index) = udagg.getFunction
    accTypes(index) = udagg.accType

  case unSupported: SqlAggFunction =>
    throw new TableException(s"unsupported Function: '${unSupported.getName}'")
  case other: SqlAggFunction =>
Change this case to case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT => to have a dedicated case for this built-in function. Also move it right after case _: SqlCountAggFunction to keep all built-in functions together.
done
@@ -1414,8 +1414,29 @@ object AggregateUtil {
    aggregates(index) = udagg.getFunction
    accTypes(index) = udagg.accType

  case unSupported: SqlAggFunction =>
Since we add a dedicated case for COLLECT, this catch-all case should now remain at the end of this match.
  case _ =>
    new ObjectCollectAggFunction
}
} else {
The else case can be removed because we keep the catch-all.
done
docs/dev/table/sql.md
Outdated
{% endhighlight %}
</td>
<td>
  <p>Returns a multiset of the <i>value</i>s.</p>
Be more specific about the handling of null values. Are they ignored? What is returned if only null values are added (null or an empty multiset)?
done
Force-pushed 851cd36 to 1741f10
Force-pushed 1741f10 to d597ab9
Force-pushed d597ab9 to 03a609a
  }
}

abstract class CollectAggFunction[E]
I don't think we need to make this class abstract. Instead, we should add a constructor that asks for the TypeInformation of the value. Then we don't need to subclass the aggregation function, and we avoid generic value types for most non-primitive fields.
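The suggestion can be sketched in plain Java (hypothetical names; Class&lt;E&gt; stands in for Flink's TypeInformation): one parameterized class replaces the per-type subclasses such as IntCollectAggFunction and ObjectCollectAggFunction.

```java
import java.util.HashMap;
import java.util.Map;

// A single collect function parameterized by the element type description
// passed through the constructor, instead of one subclass per SQL type.
public class ParameterizedCollect<E> {
    private final Class<E> elementType;           // stand-in for TypeInformation<E>
    private final Map<E, Integer> acc = new HashMap<>();

    public ParameterizedCollect(Class<E> elementType) {
        this.elementType = elementType;
    }

    public void accumulate(E value) {
        if (value != null) {                      // nulls ignored per COLLECT
            acc.merge(value, 1, Integer::sum);
        }
    }

    public Class<E> getElementType() {
        return elementType;
    }

    public Map<E, Integer> getValue() {
        return new HashMap<>(acc);
    }
}
```

The type description supplied at construction time lets the caller derive a proper accumulator/result type for any element type without subclassing.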
@@ -1410,6 +1410,26 @@ object AggregateUtil {
  case _: SqlCountAggFunction =>
    aggregates(index) = new CountAggFunction

  case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
    aggregates(index) = sqlTypeName match {
We can pass the actual TypeInformation of the argument type to the constructor of the CollectAggFunction here, and then don't need to distinguish the different argument types.
def testUnboundedGroupByCollect(): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
Add env.setStateBackend(this.getStateBackend) to enforce serialization through the MapView.
def testUnboundedGroupByCollectWithObject(): Unit = {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
Add env.setStateBackend(this.getStateBackend) to enforce serialization through the MapView.
  case _ =>
    new ObjectCollectAggFunction
}
We need to set accTypes(index) = aggregates(index).getAccumulatorType in order to activate the MapView feature.
Force-pushed 48c7b4c to 84f1cb9
@fhueske Addressed your comments. PTAL. Much appreciated.
    new FloatCollectAggFunction
  case DOUBLE =>
    new DoubleCollectAggFunction
  case TINYINT | SMALLINT | INTEGER | BIGINT | VARCHAR | CHAR | FLOAT | DOUBLE =>
I was rather thinking to remove the match case block completely and set aggregates(index) = new CollectAggFunction(FlinkTypeFactory.toTypeInfo(relDataType)).
…on to SQL. This closes apache#4585.
Merging.
What is the purpose of the change
This change adds the COLLECT aggregate function to the Flink SQL API.
Brief change log
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation