Skip to content

Commit

Permalink
[FLINK-5143] [table] Add EXISTS to list of supported SQL operators.
Browse files Browse the repository at this point in the history
This closes apache#2853.
  • Loading branch information
twalthr authored and alpinegizmo committed Nov 28, 2016
1 parent d8f2f75 commit d6f176e
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 10 deletions.
18 changes: 10 additions & 8 deletions docs/dev/table_api.md
Expand Up @@ -2800,39 +2800,41 @@ value NOT IN (value [, value]* )
<p>Whether <i>value</i> is not equal to every value in a list.</p>
</td>
</tr>
<!-- NOT SUPPORTED SO FAR

<tr>
<td>
{% highlight text %}
value IN (sub-query)
EXISTS (sub-query)
{% endhighlight %}
</td>
<td>
<p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
<p>Whether <i>sub-query</i> returns at least one row. Only supported if the operation can be rewritten in a join and group operation.</p>
</td>
</tr>

<!-- NOT SUPPORTED SO FAR
<tr>
<td>
{% highlight text %}
value NOT IN (sub-query)
value IN (sub-query)
{% endhighlight %}
</td>
<td>
<p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
<p>Whether <i>value</i> is equal to a row returned by sub-query.</p>
</td>
</tr>
<tr>
<td>
{% highlight text %}
EXISTS (sub-query)
value NOT IN (sub-query)
{% endhighlight %}
</td>
<td>
<p>Whether sub-query returns at least one row.</p>
<p>Whether <i>value</i> is not equal to every row returned by sub-query.</p>
</td>
</tr>-->
</tr>
-->

</tbody>
</table>
Expand Down
Expand Up @@ -215,7 +215,7 @@ class DataSetJoin(
}

private def joinTypeToString = joinType match {
case JoinRelType.INNER => "Join"
case JoinRelType.INNER => "InnerJoin"
case JoinRelType.LEFT=> "LeftOuterJoin"
case JoinRelType.RIGHT => "RightOuterJoin"
case JoinRelType.FULL => "FullOuterJoin"
Expand Down
Expand Up @@ -281,7 +281,8 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
SqlStdOperatorTable.CAST,
SqlStdOperatorTable.EXTRACT,
SqlStdOperatorTable.QUARTER,
SqlStdOperatorTable.SCALAR_QUERY
SqlStdOperatorTable.SCALAR_QUERY,
SqlStdOperatorTable.EXISTS
)

builtInSqlOperators.foreach(register)
Expand Down
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.scala.batch.sql

import org.apache.flink.api.scala._
import org.apache.flink.api.scala.table._
import org.apache.flink.api.table.utils.TableTestBase
import org.apache.flink.api.table.utils.TableTestUtil._
import org.junit.Test

class SetOperatorsTest extends TableTestBase {

@Test
def testExists(): Unit = {
val util = batchTestUtil()
util.addTable[(Long, Int, String)]("A", 'a_long, 'a_int, 'a_string)
util.addTable[(Long, Int, String)]("B", 'b_long, 'b_int, 'b_string)

val expected = unaryNode(
"DataSetCalc",
binaryNode(
"DataSetJoin",
batchTableNode(0),
unaryNode(
"DataSetAggregate",
unaryNode(
"DataSetCalc",
binaryNode(
"DataSetJoin",
batchTableNode(1),
unaryNode(
"DataSetAggregate",
batchTableNode(0),
term("groupBy", "a_long"),
term("select", "a_long")
),
term("where", "=(a_long, b_long)"),
term("join", "b_long", "b_int", "b_string", "a_long"),
term("joinType", "InnerJoin")
),
term("select", "a_long", "true AS $f0")
),
term("groupBy", "a_long"),
term("select", "a_long", "MIN($f0) AS $f1")
),
term("where", "=(a_long, a_long0)"),
term("join", "a_long", "a_int", "a_string", "a_long0", "$f1"),
term("joinType", "InnerJoin")
),
term("select", "a_int", "a_string")
)

util.verifySql(
"SELECT a_int, a_string FROM A WHERE EXISTS(SELECT * FROM B WHERE a_long = b_long)",
expected
)
}

}
Expand Up @@ -56,6 +56,10 @@ abstract class TableTestUtil {
def addTable[T: TypeInformation](name: String, fields: Expression*): Table
def verifySql(query: String, expected: String): Unit
def verifyTable(resultTable: Table, expected: String): Unit

// the print methods are for debugging purposes only
def printTable(resultTable: Table): Unit
def printSql(query: String): Unit
}

object TableTestUtil {
Expand Down Expand Up @@ -87,6 +91,7 @@ object TableTestUtil {
def streamTableNode(idx: Int): String = {
s"DataStreamScan(table=[[_DataStreamTable_$idx]])"
}

}

case class BatchTableTestUtil() extends TableTestUtil {
Expand Down Expand Up @@ -121,6 +126,16 @@ case class BatchTableTestUtil() extends TableTestUtil {
expected.split("\n").map(_.trim).mkString("\n"),
actual.split("\n").map(_.trim).mkString("\n"))
}

def printTable(resultTable: Table): Unit = {
val relNode = resultTable.getRelNode
val optimized = tEnv.optimize(relNode)
println(RelOptUtil.toString(optimized))
}

def printSql(query: String): Unit = {
printTable(tEnv.sql(query))
}
}

case class StreamTableTestUtil() extends TableTestUtil {
Expand Down Expand Up @@ -156,4 +171,15 @@ case class StreamTableTestUtil() extends TableTestUtil {
expected.split("\n").map(_.trim).mkString("\n"),
actual.split("\n").map(_.trim).mkString("\n"))
}

// the print methods are for debugging purposes only
def printTable(resultTable: Table): Unit = {
val relNode = resultTable.getRelNode
val optimized = tEnv.optimize(relNode)
println(RelOptUtil.toString(optimized))
}

def printSql(query: String): Unit = {
printTable(tEnv.sql(query))
}
}

0 comments on commit d6f176e

Please sign in to comment.