Skip to content

Commit

Permalink
[FLINK-947] Add parser to Expression API for exposing it to Java
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Mar 29, 2015
1 parent 659ddc0 commit d7d9b63
Show file tree
Hide file tree
Showing 31 changed files with 2,076 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,18 @@ public boolean hasField(String fieldName) {
*/
public abstract String[] getFieldNames();

/**
* True if this type has an inherent ordering of the fields, such that a user can
* always be sure in which order the fields will be in. This is true for Tuples and
* Case Classes. It is not true for Regular Java Objects, since there, the ordering of
* the fields can be arbitrary.
*
* This is used when translating a DataSet or DataStream to an Expression Table, when
* initially renaming the fields of the underlying type.
*/
public boolean hasDeterministicFieldOrder() {
return false;
}
/**
* Returns the field index of the composite field of the given name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,4 +272,9 @@ public String toString() {
bld.append('>');
return bld.toString();
}

@Override
public boolean hasDeterministicFieldOrder() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Package doc wohoooo
*/
package org.apache.flink.api.java.expressions;
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,12 @@


import org.apache.flink.api.expressions.ExpressionOperation;
import org.apache.flink.api.expressions.tree.EqualTo$;
import org.apache.flink.api.expressions.tree.Expression;
import org.apache.flink.api.expressions.tree.Literal$;
import org.apache.flink.api.expressions.tree.UnresolvedFieldReference$;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.scala.expressions.JavaBatchTranslator;
import org.apache.flink.api.java.expressions.ExpressionUtil;

/**
* This is extremely bare-bones. We need a parser that can parse expressions in a String
* and create the correct expression AST. Then we can use expressions like this:
*
* {@code in.select("'field0.avg, 'field1.count") }
* Very simple example that shows how the Java Expression API can be used.
*/
public class JavaExpressionExample {

Expand Down Expand Up @@ -60,17 +52,16 @@ public static void main(String[] args) throws Exception {
DataSet<WC> input = env.fromElements(
new WC("Hello", 1),
new WC("Ciao", 1),
new WC("Hello", 1)
);
new WC("Hello", 1));

ExpressionOperation<JavaBatchTranslator> expr = new JavaBatchTranslator().createExpressionOperation(
input,
new Expression[] { UnresolvedFieldReference$.MODULE$.apply("count"), UnresolvedFieldReference$.MODULE$.apply("word")});
ExpressionOperation expr = ExpressionUtil.from(input);

ExpressionOperation<JavaBatchTranslator> filtered = expr.filter(
EqualTo$.MODULE$.apply(UnresolvedFieldReference$.MODULE$.apply("word"), Literal$.MODULE$.apply("Hello")));
ExpressionOperation filtered = expr
.groupBy("word")
.select("word.count as count, word")
.filter("count = 2");

DataSet<WC> result = (DataSet<WC>) filtered.as(TypeExtractor.createTypeInfo(WC.class));
DataSet<WC> result = ExpressionUtil.toSet(filtered, WC.class);

result.print();
env.execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.expressions.analysis.{GroupByAnalyzer, SelectionAnalyzer,
PredicateAnalyzer}
import org.apache.flink.api.expressions.operations._
import org.apache.flink.api.expressions.parser.ExpressionParser
import org.apache.flink.api.expressions.tree.{ResolvedFieldReference,
UnresolvedFieldReference, Expression}

Expand Down Expand Up @@ -66,6 +67,21 @@ case class ExpressionOperation[A <: OperationTranslator](
this.copy(operation = Select(operation, analyzedFields))
}

/**
* Performs a selection operation. Similar to an SQL SELECT statement. The field expressions
* can contain complex expressions and aggregations.
*
* Example:
*
* {{{
* in.select("key, value.avg + " The average" as average, other.substring(0, 10)")
* }}}
*/
def select(fields: String): ExpressionOperation[A] = {
val fieldExprs = ExpressionParser.parseExpressionList(fields)
select(fieldExprs: _*)
}

/**
* Renames the fields of the expression result. Use this to disambiguate fields before
* joining to operations.
Expand All @@ -84,7 +100,21 @@ case class ExpressionOperation[A <: OperationTranslator](
case false => throw new ExpressionException("Only field expression allowed in as().")
}
this.copy(operation = As(operation, fields.toArray map { _.name }))
}

/**
* Renames the fields of the expression result. Use this to disambiguate fields before
* joining to operations.
*
* Example:
*
* {{{
* in.as("a, b")
* }}}
*/
def as(fields: String): ExpressionOperation[A] = {
val fieldExprs = ExpressionParser.parseExpressionList(fields)
as(fieldExprs: _*)
}

/**
Expand All @@ -110,13 +140,42 @@ case class ExpressionOperation[A <: OperationTranslator](
* Example:
*
* {{{
* in.filter('name === "Fred")
* in.filter("name === 'Fred'")
* }}}
*/
def filter(predicate: String): ExpressionOperation[A] = {
val predicateExpr = ExpressionParser.parseExpression(predicate)
filter(predicateExpr)
}

/**
* Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
* clause.
*
* Example:
*
* {{{
* in.filter(name === "Fred")
* }}}
*/
def where(predicate: Expression): ExpressionOperation[A] = {
filter(predicate)
}

/**
* Filters out elements that don't pass the filter predicate. Similar to a SQL WHERE
* clause.
*
* Example:
*
* {{{
* in.filter("name === 'Fred'")
* }}}
*/
def where(predicate: String): ExpressionOperation[A] = {
filter(predicate)
}

/**
* Groups the elements on some grouping keys. Use this before a selection with aggregations
* to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
Expand All @@ -143,9 +202,25 @@ case class ExpressionOperation[A <: OperationTranslator](
this.copy(operation = GroupBy(operation, analyzedFields))
}

/**
* Groups the elements on some grouping keys. Use this before a selection with aggregations
* to perform the aggregation on a per-group basis. Similar to a SQL GROUP BY statement.
*
* Example:
*
* {{{
* in.groupBy("key").select("key, value.avg")
* }}}
*/
def groupBy(fields: String): ExpressionOperation[A] = {
val fieldsExpr = ExpressionParser.parseExpressionList(fields)
groupBy(fieldsExpr: _*)
}

/**
* Joins to expression operations. Similar to an SQL join. The fields of the two joined
* operations must not overlap, use [[as]] to rename fields if necessary.
* operations must not overlap, use [[as]] to rename fields if necessary. You can use
* where and select clauses after a join to further specify the behaviour of the join.
*
* Example:
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.api.expressions.analysis

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.CompositeType

/**
* Analyzer for unary predicates, i.e. filter operations.
Expand All @@ -28,5 +27,6 @@ class PredicateAnalyzer(inputFields: Seq[(String, TypeInformation[_])]) extends
new ResolveFieldReferences(inputFields),
new InsertAutoCasts,
new TypeCheck,
new VerifyNoAggregates,
new VerifyBoolean)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.expressions.analysis

import org.apache.flink.api.expressions.ExpressionException
import org.apache.flink.api.expressions.tree.{Aggregation, Expression}

import scala.collection.mutable

/**
* Rule that verifies that an expression does not contain aggregate operations. Right now, join
* predicates and filter predicates cannot contain aggregates.
*/
class VerifyNoAggregates extends Rule {

def apply(expr: Expression) = {
val errors = mutable.MutableList[String]()

val result = expr.transformPre {
case agg: Aggregation=> {
errors +=
s"""Aggregations are not allowed in join/filter predicates."""
agg
}
}

if (errors.length > 0) {
throw new ExpressionException(
s"""Invalid expression "$expr": ${errors.mkString(" ")}""")
}

result

}
}

0 comments on commit d7d9b63

Please sign in to comment.