[FLINK-1623] Rename Expression API to Table API
Package name is now flink-table. ExpressionOperation is renamed to
Table.

This also adds more JavaDoc and ScalaDoc.
aljoscha committed Mar 29, 2015
1 parent d7d9b63 commit c9519c8
Showing 87 changed files with 1,031 additions and 827 deletions.
90 changes: 62 additions & 28 deletions docs/linq.md
@@ -1,5 +1,5 @@
---
title: "Language-Integrated Queries"
title: "Language-Integrated Queries (Table API)"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
@@ -23,58 +23,92 @@ under the License.
* This will be replaced by the TOC
{:toc}

-**Language-Integrated Queries are an experimental feature and can currently only be used with
-the Scala API**
+**Language-Integrated Queries are an experimental feature**

-Flink provides an API that allows specifying operations using SQL-like expressions.
-This Expression API can be enabled by importing
-`org.apache.flink.api.scala.expressions._`. This enables implicit conversions that allow
-converting a `DataSet` or `DataStream` to an `ExpressionOperation` on which relational queries
-can be specified. This example shows how a `DataSet` can be converted, how expression operations
-can be specified and how an expression operation can be converted back to a `DataSet`:
+Flink provides an API that allows specifying operations using SQL-like expressions. Instead of
+manipulating a `DataSet` or `DataStream` you work with a `Table` on which relational operations
+can be performed.

+The following dependency must be added to your project when using the Table API:

+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-table</artifactId>
+  <version>{{site.FLINK_VERSION_SHORT }}</version>
+</dependency>
+{% endhighlight %}
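For sbt users, an equivalent dependency declaration might look like the following sketch; the coordinates are taken from the Maven snippet above, and `flinkVersion` is a hypothetical placeholder you would define yourself:

{% highlight scala %}
// build.sbt (sketch): same coordinates as the Maven dependency above.
// flinkVersion stands in for the concrete Flink version you build against.
val flinkVersion = "<flink-version>"
libraryDependencies += "org.apache.flink" % "flink-table" % flinkVersion
{% endhighlight %}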

+## Scala Table API

+The Table API can be enabled by importing `org.apache.flink.api.scala.table._`. This enables
+implicit conversions that allow converting a DataSet or DataStream to a Table. This example
+shows how a DataSet can be converted, how relational queries can be specified and how a Table
+can be converted back to a DataSet:

{% highlight scala %}
import org.apache.flink.api.scala._
-import org.apache.flink.api.scala.expressions._
+import org.apache.flink.api.scala.table._

case class WC(word: String, count: Int)
val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))
-val expr = input.toExpression
-val result = expr.groupBy('word).select('word, 'count.sum).as[WC]
+val expr = input.toTable
+val result = expr.groupBy('word).select('word, 'count.sum).toSet[WC]
{% endhighlight %}

The expression DSL uses Scala symbols to refer to field names and we use code generation to
-transform expressions to efficient runtime code. Please not that the conversion to and from
-expression operations only works when using Scala case classes or Flink POJOs. Please check out
+transform expressions to efficient runtime code. Please note that the conversion to and from
+Tables only works when using Scala case classes or Flink POJOs. Please check out
the [programming guide](programming_guide.html) to learn the requirements for a class to be
considered a POJO.
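To make this concrete, here is a small sketch (not part of this commit) of element types that should be convertible under the rules referenced above; `WordCountPojo` is a hypothetical name:

{% highlight scala %}
// A case class works directly: its fields become the table fields
// 'word and 'count.
case class WC(word: String, count: Int)

// A POJO-style class should also qualify under the programming guide's
// rules: a public zero-argument constructor and public (var) fields.
class WordCountPojo {
  var word: String = _
  var count: Int = 0
}
{% endhighlight %}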

This is another example that shows how you
-can join to operations:
+can join two Tables:

{% highlight scala %}
case class MyResult(a: String, b: Int)

val input1 = env.fromElements(...).as('a, 'b)
val input2 = env.fromElements(...).as('c, 'd)
-val joined = input1.join(input2).where('b == 'a && 'd > 42).select('a, 'd).as[MyResult]
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").as[MyResult]
{% endhighlight %}

-Notice, how a `DataSet` can be converted to an expression operation by using `as` and specifying new
-names for the fields. This can also be used to disambiguate fields before a join operation.
+Notice how a DataSet can be converted to a Table by using `as` and specifying new
+names for the fields. This can also be used to disambiguate fields before a join operation. This
+example also shows that you can use Strings to specify relational expressions.
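As an aside (a sketch, not from the committed docs), the grouped aggregation from the first example could be written either way, reusing `env` and `input` from that example and assuming the String syntax mirrors the Symbol DSL one-to-one as the text states:

{% highlight scala %}
// Same query twice: once with the Symbol DSL, once with String expressions.
// Assumes the String parser accepts the same aggregation suffix (.sum)
// as the Symbol DSL shown earlier.
val bySymbols = input.toTable.groupBy('word).select('word, 'count.sum)
val byStrings = input.toTable.groupBy("word").select("word, count.sum")
{% endhighlight %}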

-The Expression API can be used with the Streaming API, since we also have implicit conversions to
-and from `DataStream`.
+Please refer to the Scaladoc (and Javadoc) for a full list of supported operations and a
+description of the expression syntax.

-The following dependency must be added to your project when using the Expression API:
+## Java Table API

-{% highlight xml %}
-<dependency>
-  <groupId>org.apache.flink</groupId>
-  <artifactId>flink-expressions</artifactId>
-  <version>{{site.FLINK_VERSION_SHORT }}</version>
-</dependency>
+When using Java, Tables can be converted to and from DataSet and DataStream using `TableEnvironment`.
+This example is equivalent to the Scala example above:

+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+TableEnvironment tableEnv = new TableEnvironment();

+DataSet<WC> input = env.fromElements(
+    new WC("Hello", 1),
+    new WC("Ciao", 1),
+    new WC("Hello", 1));

+Table table = tableEnv.toTable(input);

+Table filtered = table
+    .groupBy("word")
+    .select("word.count as count, word")
+    .filter("count = 2");

+DataSet<WC> result = tableEnv.toSet(filtered, WC.class);
{% endhighlight %}

-Please refer to the scaladoc for a full list of supported operations and a description of the
+When using Java, the embedded DSL for specifying expressions cannot be used. Only String expressions
+are supported. They support exactly the same feature set as the expression DSL.

+Please refer to the Javadoc for a full list of supported operations and a description of the
expression syntax.


@@ -90,7 +90,7 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
/**
 * Returns the TypeInformation for the elements of this DataSet.
 */
-def getType: TypeInformation[T] = set.getType
+def getType(): TypeInformation[T] = set.getType

/**
* Returns the execution environment associated with the current DataSet.

This file was deleted.

@@ -56,6 +56,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
*/
def getJavaStream: JavaStream[T] = javaStream

+/**
+ * Returns the TypeInformation for the elements of this DataStream.
+ */
+def getType(): TypeInformation[T] = javaStream.getType

/**
* Sets the parallelism of this operation. This must be greater than 1.
*/
@@ -27,8 +27,8 @@ under the License.
<relativePath>..</relativePath>
</parent>

-<artifactId>flink-expressions</artifactId>
-<name>flink-expressions</name>
+<artifactId>flink-table</artifactId>
+<name>flink-table</name>

<packaging>jar</packaging>

@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* <strong>Table API (Java)</strong><br>
*
* {@link org.apache.flink.api.java.table.TableEnvironment} can be used to create a
* {@link org.apache.flink.api.table.Table} from a {@link org.apache.flink.api.java.DataSet}
* or {@link org.apache.flink.streaming.api.datastream.DataStream}.
*
* <p>
* This can be used to perform SQL-like queries on data. Please have
* a look at {@link org.apache.flink.api.table.Table} to see which operations are supported and
* how query strings are written.
*
* <p>
* Example:
*
* <code>
* ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
*
* DataSet<WC> input = env.fromElements(
* new WC("Hello", 1),
* new WC("Ciao", 1),
* new WC("Hello", 1));
*
* Table table = TableUtil.from(input);
*
* Table filtered = table
* .groupBy("word")
* .select("word.count as count, word")
* .filter("count = 2");
*
* DataSet<WC> result = TableUtil.toSet(filtered, WC.class);
*
* result.print();
* env.execute();
* </code>
*
* <p>
* As seen above, a {@link org.apache.flink.api.table.Table} can be converted back to the
* underlying API representation using {@link org.apache.flink.api.java.table.TableEnvironment.toSet}
* or {@link org.apache.flink.api.java.table.TableEnvironment.toStream}.
*/
package org.apache.flink.api.java.table;
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
-* http://www.apache.org/licenses/LICENSE-2.0
+* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -17,6 +17,17 @@
*/

/**
- * Package doc wohoooo
+ * <strong>Table API</strong><br>
+ *
+ * This package contains the generic part of the Table API. It can be used with Flink Streaming
+ * and Flink Batch, from Scala as well as from Java.
+ *
+ * When using the Table API, a user creates a [[org.apache.flink.api.table.Table]] from
+ * a DataSet or DataStream. Relational operations can then be performed on it. A Table can
+ * also be converted back to a DataSet or DataStream.
+ *
+ * Packages [[org.apache.flink.api.scala.table]] and [[org.apache.flink.api.java.table]] contain
+ * the language-specific parts of the API. Refer to these packages for documentation on how
+ * the Table API can be used in Java and Scala.
*/
-package org.apache.flink.api.java.expressions;
+package org.apache.flink.api.table;
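Taken together, the rename means user code changes roughly as in this sketch, which mirrors the WordCount example from the updated docs (the object name `WordCountTable` is hypothetical):

{% highlight scala %}
import org.apache.flink.api.scala._
// was: import org.apache.flink.api.scala.expressions._
import org.apache.flink.api.scala.table._

object WordCountTable {
  case class WC(word: String, count: Int)

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(WC("hello", 1), WC("hello", 1), WC("ciao", 1))

    // was: input.toExpression, yielding an ExpressionOperation; now a Table
    val table = input.toTable
    val result = table.groupBy('word).select('word, 'count.sum).toSet[WC]

    result.print()
    env.execute()
  }
}
{% endhighlight %}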
