From dbf8d8687c40034d7def2143a3cdd72a89f2647c Mon Sep 17 00:00:00 2001
From: Greg Hogan
Date: Thu, 25 May 2017 17:20:56 -0400
Subject: [PATCH] [FLINK-6722] [table] Activate strict checkstyle

---
 flink-libraries/flink-table/pom.xml                |  34 +++
 .../flink/table/annotation/TableType.java          |   1 +
 .../flink/table/api/java/package-info.java         |   1 +
 .../org/apache/flink/table/explain/Node.java       |  60 +++--
 .../flink/table/explain/PlanJsonParser.java        |  48 ++--
 .../resources/tableSourceConverter.properties      |   2 +-
 .../java/batch/TableEnvironmentITCase.java         |  49 +++--
 .../api/java/batch/TableSourceITCase.java          |   4 +
 .../java/batch/sql/GroupingSetsITCase.java         |  10 +-
 .../table/api/java/batch/sql/SqlITCase.java        |  16 +-
 .../table/api/java/stream/sql/SqlITCase.java       |  32 +--
 .../api/java/stream/utils/StreamTestData.java      |   3 +
 .../java/utils/UserDefinedAggFunctions.java        | 205 ++++++++++--------
 .../utils/UserDefinedScalarFunctions.java          |  22 +-
 .../java/utils/UserDefinedTableFunctions.java      |   7 +
 .../table/api/scala/batch/ExplainTest.scala        |   8 +-
 16 files changed, 325 insertions(+), 177 deletions(-)

diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml
index a34fa1b4992ea..bbed554b49cfc 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -251,6 +251,40 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Then we can check the style of
+					the code, ensuring it is readable. Finally, tests can be run.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
index 3845eaee1ef88..2d2a7af05232c 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/annotation/TableType.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.table.catalog.TableSourceConverter;
+
 import java.lang.annotation.Documented;
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
index 3dbf50f3613e0..50d41a2c9ff6a 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/api/java/package-info.java
@@ -61,6 +61,7 @@
 * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toAppendStream(Table, java.lang.Class)}}, or
 * {@link org.apache.flink.table.api.java.StreamTableEnvironment#toRetractStream(Table, java.lang.Class)}}.
 */
+
package org.apache.flink.table.api.java;

import org.apache.flink.table.api.Table;
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
index 4616728973902..6317d0cbac4ac 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/Node.java
@@ -20,77 +20,93 @@
 
 import java.util.List;
 
+/**
+ * Field hierarchy of an execution plan.
+ */
 public class Node {
 	private int id;
 	private String type;
 	private String pact;
 	private String contents;
 	private int parallelism;
-	private String driver_strategy;
+	private String driverStrategy;
 	private List predecessors;
-	private List global_properties;
-	private List local_properties;
+	private List globalProperties;
+	private List localProperties;
 	private List estimates;
 	private List costs;
-	private List compiler_hints;
+	private List compilerHints;
 
 	public int getId() {
 		return id;
 	}
+
 	public String getType() {
 		return type;
 	}
+
 	public String getPact() {
 		return pact;
 	}
+
 	public String getContents() {
 		return contents;
	}
+
 	public int getParallelism() {
 		return parallelism;
 	}
-	public String getDriver_strategy() {
-		return driver_strategy;
+
+	public String getDriverStrategy() {
+		return driverStrategy;
 	}
+
 	public List getPredecessors() {
 		return predecessors;
 	}
-	public List getGlobal_properties() {
-		return global_properties;
+
+	public List getGlobalProperties() {
+		return globalProperties;
 	}
-	public List getLocal_properties() {
-		return local_properties;
+
+	public List getLocalProperties() {
+		return localProperties;
 	}
+
 	public List getEstimates() {
 		return estimates;
 	}
+
 	public List getCosts() {
 		return costs;
 	}
-	public List getCompiler_hints() {
-		return compiler_hints;
+
+	public List getCompilerHints() {
+		return compilerHints;
 	}
 }
 
 class Predecessors {
-	private String ship_strategy;
-	private String exchange_mode;
+	private String shipStrategy;
+	private String exchangeMode;
 
-	public String getShip_strategy() {
-		return ship_strategy;
+	public String getShipStrategy() {
+		return shipStrategy;
 	}
-	public String getExchange_mode() {
-		return exchange_mode;
+
+	public String getExchangeMode() {
+		return exchangeMode;
 	}
 }
 
-class Global_properties {
+class GlobalProperties {
 	private String name;
 	private String value;
 
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
@@ -103,6 +119,7 @@ class LocalProperty {
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
@@ -115,6 +132,7 @@ class Estimates {
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
@@ -127,18 +145,20 @@ class Costs {
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
 }
 
-class Compiler_hints {
+class CompilerHints {
 	private String name;
 	private String value;
 
 	public String getValue() {
 		return value;
 	}
+
 	public String getName() {
 		return name;
 	}
diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
index f13c042186a2d..ee9b9dadf1368 100644
--- a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
+++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/explain/PlanJsonParser.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.table.explain;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.util.LinkedHashMap;
 import java.util.List;
 
+/**
+ * Utility for converting an execution plan from JSON to a human-readable string.
+ */
 public class PlanJsonParser {
 
 	public static String getSqlExecutionPlan(String t, Boolean extended) throws Exception {
 		ObjectMapper objectMapper = new ObjectMapper();
+		objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
 
-		//not every node is same, ignore the unknown field
+		// not every node is same, ignore the unknown field
 		objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
 		PlanTree tree = objectMapper.readValue(t, PlanTree.class);
@@ -43,7 +48,7 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce
 		for (int index = 0; index < tree.getNodes().size(); index++) {
 			Node tempNode = tree.getNodes().get(index);
 
-			//input with operation such as join or union is coordinate, keep the same indent
+			// input with operation such as join or union is coordinate, keep the same indent
 			if ((tempNode.getPact().equals("Data Source")) && (map.containsKey(tempNode.getPact()))) {
 				tabCount = map.get(tempNode.getPact());
 			}
@@ -57,15 +62,15 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce
 			printTab(tabCount + 1, pw);
 			String content = tempNode.getContents();
 
-			//drop the hashcode of object instance
+			// drop the hashcode of object instance
 			int dele = tempNode.getContents().indexOf("@");
 			if (dele > -1) {
 				content = tempNode.getContents().substring(0, dele);
 			}
 
-			//replace with certain content if node is dataSource to pass
-			//unit tests, because java and scala use different api to
-			//get input element
+			// replace with certain content if node is dataSource to pass
+			// unit tests, because java and scala use different api to
+			// get input element
 			if (tempNode.getPact().equals("Data Source")) {
 				content = "collect elements with CollectionInputFormat";
 			}
@@ -74,35 +79,35 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce
 			List predecessors = tempNode.getPredecessors();
 			if (predecessors != null) {
 				printTab(tabCount + 1, pw);
-				pw.print("ship_strategy : " + predecessors.get(0).getShip_strategy() + "\n");
+				pw.print("ship_strategy : " + predecessors.get(0).getShipStrategy() + "\n");
 
-				String mode = predecessors.get(0).getExchange_mode();
+				String mode = predecessors.get(0).getExchangeMode();
 				if (mode != null) {
 					printTab(tabCount + 1, pw);
 					pw.print("exchange_mode : " + mode + "\n");
 				}
 			}
 
-			if (tempNode.getDriver_strategy() != null) {
+			if (tempNode.getDriverStrategy() != null) {
 				printTab(tabCount + 1, pw);
-				pw.print("driver_strategy : " + tempNode.getDriver_strategy() + "\n");
+				pw.print("driver_strategy : " + tempNode.getDriverStrategy() + "\n");
 			}
 
-			if (tempNode.getGlobal_properties() != null) {
+			if (tempNode.getGlobalProperties() != null) {
 				printTab(tabCount + 1, pw);
-				pw.print(tempNode.getGlobal_properties().get(0).getName() + " : "
-					+ tempNode.getGlobal_properties().get(0).getValue() + "\n");
+				pw.print(tempNode.getGlobalProperties().get(0).getName() + " : "
+					+ tempNode.getGlobalProperties().get(0).getValue() + "\n");
 			}
 
 			if (extended) {
-				List globalProperties = tempNode.getGlobal_properties();
+				List globalProperties = tempNode.getGlobalProperties();
 				for (int i = 1; i < globalProperties.size(); i++) {
 					printTab(tabCount + 1, pw);
 					pw.print(globalProperties.get(i).getName() + " : "
 						+ globalProperties.get(i).getValue() + "\n");
 				}
 
-				List localProperties = tempNode.getLocal_properties();
+				List localProperties = tempNode.getLocalProperties();
 				for (int i = 0; i < localProperties.size(); i++) {
 					printTab(tabCount + 1, pw);
 					pw.print(localProperties.get(i).getName() + " : "
@@ -123,11 +128,11 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce
 						+ costs.get(i).getValue() + "\n");
 				}
 
-				List compilerHintses = tempNode.getCompiler_hints();
-				for (int i = 0; i < compilerHintses.size(); i++) {
+				List compilerHints = tempNode.getCompilerHints();
+				for (int i = 0; i < compilerHints.size(); i++) {
 					printTab(tabCount + 1, pw);
-					pw.print(compilerHintses.get(i).getName() + " : "
-						+ compilerHintses.get(i).getValue() + "\n");
+					pw.print(compilerHints.get(i).getName() + " : "
+						+ compilerHints.get(i).getValue() + "\n");
 				}
 			}
 			tabCount++;
@@ -138,8 +143,9 @@ public static String getSqlExecutionPlan(String t, Boolean extended) throws Exce
 	}
 
 	private static void printTab(int tabCount, PrintWriter pw) {
-		for (int i = 0; i < tabCount; i++)
+		for (int i = 0; i < tabCount; i++) {
 			pw.print("\t");
+		}
 	}
 }
diff --git a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
index 1632e1264d616..d548f485fe227 100644
--- a/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
+++ b/flink-libraries/flink-table/src/main/resources/tableSourceConverter.properties
@@ -26,4 +26,4 @@
 # which offers converters instead of put all information into the
 # tableSourceConverter.properties of flink-table module.
 ################################################################################
-scan.packages=org.apache.flink.table.sources
\ No newline at end of file
+scan.packages=org.apache.flink.table.sources
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index 3bb283f906278..aac7e1173daa3 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -18,38 +18,43 @@
 
 package org.apache.flink.table.api.java.batch;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.calcite.tools.RuleSets;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.typeutils.GenericTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase;
-import org.apache.flink.types.Row;
 import org.apache.flink.table.calcite.CalciteConfig;
 import org.apache.flink.table.calcite.CalciteConfigBuilder;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
+import org.apache.calcite.tools.RuleSets;
+
 import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Integration tests for {@link BatchTableEnvironment}.
+ */
 @RunWith(Parameterized.class)
 public class TableEnvironmentITCase extends TableProgramsCollectionTestBase {
 
@@ -401,7 +406,7 @@ public void testGenericRowWithAlias() throws Exception {
 		BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config());
 
 		// use null value the enforce GenericType
-		DataSet dataSet = env.fromElements(Row.of((Integer)null));
+		DataSet dataSet = env.fromElements(Row.of((Integer) null));
 		assertTrue(dataSet.getType() instanceof GenericTypeInfo);
 		assertTrue(dataSet.getType().getTypeClass().equals(Row.class));
 
@@ -482,10 +487,16 @@ public void testCustomCalciteConfig() {
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Non-static class.
+	 */
 	public class MyNonStatic {
 		public int number;
 	}
 
+	/**
+	 * Small POJO.
+	 */
 	@SuppressWarnings("unused")
 	public static class SmallPojo {
 
@@ -506,6 +517,9 @@ public SmallPojo(String name, int age, double salary, String department, Integer
 		public Integer[] roles;
 	}
 
+	/**
+	 * POJO with generic fields.
+	 */
 	@SuppressWarnings("unused")
 	public static class PojoWithGeneric {
 		public String name;
@@ -531,6 +545,9 @@ public String toString() {
 		}
 	}
 
+	/**
+	 * Small POJO with private fields.
+	 */
 	@SuppressWarnings("unused")
 	public static class PrivateSmallPojo {
 
@@ -581,6 +598,9 @@ public void setDepartment(String department) {
 		}
 	}
 
+	/**
+	 * Another small POJO.
+	 */
 	@SuppressWarnings("unused")
 	public static class SmallPojo2 {
 
@@ -606,6 +626,9 @@ public String toString() {
 		}
 	}
 
+	/**
+	 * Another small POJO with private fields.
+	 */
 	@SuppressWarnings("unused")
 	public static class PrivateSmallPojo2 {
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
index a7ccb7e94a8b9..864d4f8c7a1be 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableSourceITCase.java
@@ -27,12 +27,16 @@
 import org.apache.flink.table.sources.BatchTableSource;
 import org.apache.flink.table.utils.CommonTestData;
 import org.apache.flink.types.Row;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.List;
 
+/**
+ * Integration tests for {@link BatchTableSource}.
+ */
 @RunWith(Parameterized.class)
 public class TableSourceITCase extends TableProgramsCollectionTestBase {
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
index 3f611d507ec77..6c1a753169203 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/GroupingSetsITCase.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.java.batch.sql;
 
-import java.util.Comparator;
-import java.util.List;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
@@ -33,11 +31,15 @@
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.Row;
+
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Comparator;
+import java.util.List;
+
 /**
  * This test should be replaced by a DataSetAggregateITCase.
  * We should only perform logical unit tests here.
@@ -46,8 +48,8 @@
 @RunWith(Parameterized.class)
 public class GroupingSetsITCase extends TableProgramsClusterTestBase {
 
-	private final static String TABLE_NAME = "MyTable";
-	private final static String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
+	private static final String TABLE_NAME = "MyTable";
+	private static final String TABLE_WITH_NULLS_NAME = "MyTableWithNulls";
 	private BatchTableEnvironment tableEnv;
 
 	public GroupingSetsITCase(TestExecutionMode mode, TableConfigMode tableConfigMode) {
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
index 114226cb0d20e..f4e5daf658aeb 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/sql/SqlITCase.java
@@ -22,17 +22,18 @@
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.typeutils.MapTypeInfo;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.table.api.java.BatchTableEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.BatchTableEnvironment;
+import org.apache.flink.table.api.scala.batch.utils.TableProgramsCollectionTestBase;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -42,6 +43,9 @@
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Integration tests for batch SQL.
+ */
 @RunWith(Parameterized.class)
 public class SqlITCase extends TableProgramsCollectionTestBase {
 
@@ -136,7 +140,7 @@ public void testJoin() throws Exception {
 		DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.get5TupleDataSet(env);
 
 		tableEnv.registerDataSet("t1", ds1, "a, b, c");
-		tableEnv.registerDataSet("t2",ds2, "d, e, f, g, h");
+		tableEnv.registerDataSet("t2", ds2, "d, e, f, g, h");
 
 		String sqlQuery = "SELECT c, g FROM t1, t2 WHERE b = e";
 		Table result = tableEnv.sql(sqlQuery);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
index d827cd6450621..92702213c7b67 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/SqlITCase.java
@@ -18,27 +18,31 @@
 
 package org.apache.flink.table.api.java.stream.sql;
 
-import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.types.Row;
-import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.api.java.stream.utils.StreamTestData;
+import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
+import org.apache.flink.types.Row;
+
 import org.junit.Test;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.RowTypeInfo;
 
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Integration tests for streaming SQL.
+ */
 public class SqlITCase extends StreamingMultipleProgramsTestBase {
-	
+
 	@Test
 	public void testRowRegisterRowWithNames() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -49,17 +53,17 @@ public void testRowRegisterRowWithNames() throws Exception {
 		data.add(Row.of(1, 1L, "Hi"));
 		data.add(Row.of(2, 2L, "Hello"));
 		data.add(Row.of(3, 2L, "Hello world"));
-		
+
 		TypeInformation[] types = {
 			BasicTypeInfo.INT_TYPE_INFO,
 			BasicTypeInfo.LONG_TYPE_INFO,
 			BasicTypeInfo.STRING_TYPE_INFO};
-		String names[] = {"a","b","c"};
-		
+		String[] names = {"a", "b", "c"};
+
 		RowTypeInfo typeInfo = new RowTypeInfo(types, names);
-		
+
 		DataStream ds = env.fromCollection(data).returns(typeInfo);
-		
+
 		Table in = tableEnv.fromDataStream(ds, "a,b,c");
 		tableEnv.registerTable("MyTableRow", in);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
index 139801fa1d5c0..a23bc5a74a389 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/utils/StreamTestData.java
@@ -27,6 +27,9 @@
 import java.util.Collections;
 import java.util.List;
 
+/**
+ * Test data.
+ */
 public class StreamTestData {
 
 	public static DataStream<Tuple3<Integer, Long, String>> getSmall3TupleDataSet(StreamExecutionEnvironment env) {
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
index a51a4af81993c..94c5c90c9ef9e 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -22,98 +23,116 @@
 
 import java.util.Iterator;
 
+/**
+ * Test aggregator functions.
+ */
 public class UserDefinedAggFunctions {
-	// Accumulator for test requiresOver
-	public static class Accumulator0 extends Tuple2{}
-
-	// Test for requiresOver
-	public static class OverAgg0 extends AggregateFunction {
-		@Override
-		public Accumulator0 createAccumulator() {
-			return new Accumulator0();
-		}
-
-		@Override
-		public Long getValue(Accumulator0 accumulator) {
-			return 1L;
-		}
-
-		//Overloaded accumulate method
-		public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
-		}
-
-		@Override
-		public boolean requiresOver() {
-			return true;
-		}
-	}
-
-	// Accumulator for WeightedAvg
-	public static class WeightedAvgAccum extends Tuple2 {
-		public long sum = 0;
-		public int count = 0;
-	}
-
-	// Base class for WeightedAvg
-	public static class WeightedAvg extends AggregateFunction {
-		@Override
-		public WeightedAvgAccum createAccumulator() {
-			return new WeightedAvgAccum();
-		}
-
-		@Override
-		public Long getValue(WeightedAvgAccum accumulator) {
-			if (accumulator.count == 0)
-				return null;
-			else
-				return accumulator.sum/accumulator.count;
-		}
-
-		//Overloaded accumulate method
-		public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight) {
-			accumulator.sum += iValue * iWeight;
-			accumulator.count += iWeight;
-		}
-
-		//Overloaded accumulate method
-		public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
-			accumulator.sum += iValue * iWeight;
-			accumulator.count += iWeight;
-		}
-	}
-
-	// A WeightedAvg class with merge method
-	public static class WeightedAvgWithMerge extends WeightedAvg {
-		public void merge(WeightedAvgAccum acc, Iterable it) {
-			Iterator iter = it.iterator();
-			while (iter.hasNext()) {
-				WeightedAvgAccum a = iter.next();
-				acc.count += a.count;
-				acc.sum += a.sum;
-			}
-		}
-	}
-
-	// A WeightedAvg class with merge and reset method
-	public static class WeightedAvgWithMergeAndReset extends WeightedAvgWithMerge {
-		public void resetAccumulator(WeightedAvgAccum acc) {
-			acc.count = 0;
-			acc.sum = 0L;
-		}
-	}
-
-	// A WeightedAvg class with retract method
-	public static class WeightedAvgWithRetract extends WeightedAvg {
-		//Overloaded retract method
-		public void retract(WeightedAvgAccum accumulator, long iValue, int iWeight) {
-			accumulator.sum -= iValue * iWeight;
-			accumulator.count -= iWeight;
-		}
-
-		//Overloaded retract method
-		public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
-			accumulator.sum -= iValue * iWeight;
-			accumulator.count -= iWeight;
-		}
-	}
+	/**
+	 * Accumulator for test requiresOver.
+	 */
+	public static class Accumulator0 extends Tuple2{}
+
+	/**
+	 * Test for requiresOver.
+	 */
+	public static class OverAgg0 extends AggregateFunction {
+		@Override
+		public Accumulator0 createAccumulator() {
+			return new Accumulator0();
+		}
+
+		@Override
+		public Long getValue(Accumulator0 accumulator) {
+			return 1L;
+		}
+
+		//Overloaded accumulate method
+		public void accumulate(Accumulator0 accumulator, long iValue, int iWeight) {
+		}
+
+		@Override
+		public boolean requiresOver() {
+			return true;
+		}
+	}
+
+	/**
+	 * Accumulator for WeightedAvg.
+	 */
+	public static class WeightedAvgAccum extends Tuple2 {
+		public long sum = 0;
+		public int count = 0;
+	}
+
+	/**
+	 * Base class for WeightedAvg.
+	 */
+	public static class WeightedAvg extends AggregateFunction {
+		@Override
+		public WeightedAvgAccum createAccumulator() {
+			return new WeightedAvgAccum();
+		}
+
+		@Override
+		public Long getValue(WeightedAvgAccum accumulator) {
+			if (accumulator.count == 0) {
+				return null;
+			} else {
+				return accumulator.sum / accumulator.count;
+			}
+		}
+
+		// overloaded accumulate method
+		public void accumulate(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+			accumulator.sum += iValue * iWeight;
+			accumulator.count += iWeight;
+		}
+
+		//Overloaded accumulate method
+		public void accumulate(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+			accumulator.sum += iValue * iWeight;
+			accumulator.count += iWeight;
+		}
+	}
+
+	/**
+	 * A WeightedAvg class with merge method.
+	 */
+	public static class WeightedAvgWithMerge extends WeightedAvg {
+		public void merge(WeightedAvgAccum acc, Iterable it) {
+			Iterator iter = it.iterator();
+			while (iter.hasNext()) {
+				WeightedAvgAccum a = iter.next();
+				acc.count += a.count;
+				acc.sum += a.sum;
+			}
+		}
+	}
+
+	/**
+	 * A WeightedAvg class with merge and reset method.
+	 */
+	public static class WeightedAvgWithMergeAndReset extends WeightedAvgWithMerge {
+		public void resetAccumulator(WeightedAvgAccum acc) {
+			acc.count = 0;
+			acc.sum = 0L;
+		}
+	}
+
+	/**
+	 * A WeightedAvg class with retract method.
+	 */
+	public static class WeightedAvgWithRetract extends WeightedAvg {
+		//Overloaded retract method
+		public void retract(WeightedAvgAccum accumulator, long iValue, int iWeight) {
+			accumulator.sum -= iValue * iWeight;
+			accumulator.count -= iWeight;
+		}
+
+		//Overloaded retract method
+		public void retract(WeightedAvgAccum accumulator, int iValue, int iWeight) {
+			accumulator.sum -= iValue * iWeight;
+			accumulator.count -= iWeight;
+		}
+	}
 }
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
index 1e5fabe942c6f..214dbeaa55f5d 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
@@ -15,25 +15,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
-import java.util.Arrays;
 import org.apache.flink.table.functions.ScalarFunction;
 
+import java.util.Arrays;
+
+/**
+ * Test scalar functions.
+ */
 public class UserDefinedScalarFunctions {
 
+	/**
+	 * Increment input.
+	 */
 	public static class JavaFunc0 extends ScalarFunction {
 		public long eval(Long l) {
 			return l + 1;
 		}
 	}
 
+	/**
+	 * Concatenate inputs as strings.
+	 */
 	public static class JavaFunc1 extends ScalarFunction {
 		public String eval(Integer a, int b, Long c) {
 			return a + " and " + b + " and " + c;
 		}
 	}
 
+	/**
+	 * Append product to string.
+	 */
 	public static class JavaFunc2 extends ScalarFunction {
 		public String eval(String s, Integer... a) {
 			int m = 1;
@@ -44,6 +58,9 @@ public String eval(String s, Integer... a) {
 		}
 	}
 
+	/**
+	 * Test overloading.
+	 */
 	public static class JavaFunc3 extends ScalarFunction {
 		public int eval(String a, int... b) {
 			return b.length;
@@ -54,6 +71,9 @@ public String eval(String c) {
 		}
 	}
 
+	/**
+	 * Concatenate arrays as strings.
+	 */
 	public static class JavaFunc4 extends ScalarFunction {
 		public String eval(Integer[] a, String[] b) {
 			return Arrays.toString(a) + " and " + Arrays.toString(b);
diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
index 3af8646c4f331..63c07ed9bbe53 100644
--- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
+++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedTableFunctions.java
@@ -15,12 +15,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.table.api.java.utils;
 
 import org.apache.flink.table.functions.TableFunction;
 
+/**
+ * Test functions.
+ */
 public class UserDefinedTableFunctions {
 
+	/**
+	 * Emit inputs as long.
+	 */
 	public static class JavaTableFunc0 extends TableFunction {
 		public void eval(Integer a, Long b, Long c) {
 			collect(a.longValue());
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
index 1a6b31491e57a..10af4d7ac3fb0 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/ExplainTest.scala
@@ -42,7 +42,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -57,7 +57,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
    val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testFilter1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -102,7 +102,7 @@ class ExplainTest
     val result = tEnv.explain(table).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion0.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
   }
 
   @Test
@@ -117,7 +117,7 @@ class ExplainTest
     val result = tEnv.explain(table, true).replaceAll("\\r\\n", "\n")
     val source = scala.io.Source.fromFile(testFilePath +
       "../../src/test/scala/resources/testUnion1.out").mkString.replaceAll("\\r\\n", "\n")
-    assertEquals(result, source)
+    assertEquals(source, result)
  }
 }
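
Aside (not part of the patch): the PlanJsonParser change above stops matching the plan JSON's snake_case keys with underscore-named Java members and instead lets Jackson do the translation via a naming strategy. A minimal standalone sketch of that mechanism, assuming only Jackson 2.7+ on the classpath; the class and field names below are illustrative, not taken from the patch:

	import com.fasterxml.jackson.databind.ObjectMapper;
	import com.fasterxml.jackson.databind.PropertyNamingStrategy;

	public class SnakeCaseMappingDemo {

		// Illustrative POJO mirroring the camelCase renames in Node.java.
		public static class PlanNode {
			public String driverStrategy;
			public String shipStrategy;
		}

		public static void main(String[] args) throws Exception {
			ObjectMapper mapper = new ObjectMapper();
			// Same setting the patch adds in PlanJsonParser: snake_case JSON
			// keys such as "driver_strategy" bind to camelCase Java fields.
			mapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);

			PlanNode node = mapper.readValue(
				"{\"driver_strategy\": \"Hash Join\", \"ship_strategy\": \"FORWARD\"}",
				PlanNode.class);

			System.out.println(node.driverStrategy); // Hash Join
			System.out.println(node.shipStrategy);   // FORWARD
		}
	}

This is why the renamed getters (getDriverStrategy and friends) can drop their underscores without breaking deserialization of the execution-plan JSON.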