diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java index 6d7783904..a7afcf850 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java @@ -26,8 +26,8 @@ public class AggregateAddCols implements FunctionDescriptor.SerializableFunction { final List aggregateCalls; - - public AggregateAddCols(final List aggregateCalls){ + + public AggregateAddCols(final List aggregateCalls) { this.aggregateCalls = aggregateCalls; } @@ -36,7 +36,7 @@ public Record apply(final Record record) { final int l = record.size(); final int newRecordSize = l + aggregateCalls.size() + 1; final Object[] resValues = new Object[newRecordSize]; - + for (int i = 0; i < l; i++) { resValues[i] = record.getField(i); } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java index bc046bd66..c60f64f5f 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java @@ -17,6 +17,7 @@ */ package org.apache.wayang.api.sql.calcite.converter.functions; +import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.function.BiFunction; @@ -39,7 +40,7 @@ public AggregateFunction(final List aggregateCalls) { public Record apply(final Record record1, final Record record2) { final int l = record1.size(); final Object[] resValues = new Object[l]; - final boolean countDone = false; + boolean countDone = false; for (int i = 0; i < l - aggregateCalls.size() - 1; i++) { resValues[i] = record1.getField(i); @@ -61,25 +62,33 @@ public Record apply(final Record record1, final Record record2) { case MAX: resValues[counter] = this.castAndMap(field1, field2, SqlFunctions::greatest, SqlFunctions::greatest, SqlFunctions::greatest, SqlFunctions::greatest); + break; case COUNT: // since aggregates inject an extra column for counting before, // see AggregateAddCols. the column we operate on are integer counts, // which means we can eagerly get the fields as integers and simply sum assert (field1 instanceof Integer && field2 instanceof Integer) : "Expected to find integers for count but found: " + field1 + " and " + field2; - Object obj = Integer.class.cast(field1) + Integer.class.cast(field2); - resValues[counter] = obj; + final Object count = Integer.class.cast(field1) + Integer.class.cast(field2); + resValues[counter] = count; break; case AVG: - throw new UnsupportedOperationException("Averages not currently supported"); - // resValues[counter] = this.castAndMap(field1, field2, null, null, null, null); - // break; + assert (field1 instanceof Integer && field2 instanceof Integer) + : "Expected to find integers for count but found: " + field1 + " and " + field2; + final Object avg = Integer.class.cast(field1) + Integer.class.cast(field2); + + resValues[counter] = avg; + + if (!countDone) { + resValues[l - 1] = record1.getInt(l - 1) + record2.getInt(l - 1); + countDone = true; + } + break; default: throw new IllegalStateException("Unsupported operation: " + aggregateCall.getAggregation().kind); } counter++; } - return new Record(resValues); } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java index caf6b68e5..7274ab78d 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java @@ -18,10 +18,14 @@ package org.apache.wayang.api.sql.calcite.converter.functions; +import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.calcite.rel.core.AggregateCall; +import org.apache.calcite.sql.SqlKind; import org.apache.wayang.basic.data.Record; import org.apache.wayang.core.function.FunctionDescriptor; @@ -36,31 +40,22 @@ public AggregateGetResult(final List aggregateCalls, final Set aggregateCallList.get(i).getAggregation().getKind().equals(SqlKind.AVG) + ? record.getDouble(i + aggregateCallOffset) / record.getDouble(recordSize - 1) + : record.getField(i + aggregateCallOffset)) + .toArray(); + + final Object[] combinedFields = Stream.concat(Arrays.stream(fields), Arrays.stream(aggregateCallFields)) + .toArray(); + + return new Record(combinedFields); } } \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index daa35f30a..7ff81456a 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -77,586 +77,602 @@ public class SqlToWayangRelTest { - /** - * Method for building {@link WayangPlan}s useful for testing, benchmarking and - * other - * usages where you want to handle the intermediate {@link WayangPlan} - * - * @param sql sql query string with the {@code ;} cut off - * @param udfJars - * @return a {@link WayangPlan} of a given sql string - * @throws SqlParseException - * @throws SQLException - */ - public Tuple2, WayangPlan> buildCollectorAndWayangPlan(final SqlContext context, - final String sql, final String... udfJars) throws SqlParseException, SQLException { - final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); - final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); - - final Optimizer optimizer = Optimizer.create( - SchemaUtils.getSchema(context.getConfiguration()), - configProperties, - relDataTypeFactory); - - final SqlNode sqlNode = optimizer.parseSql(sql); - final SqlNode validatedSqlNode = optimizer.validate(sqlNode); - final RelNode relNode = optimizer.convert(validatedSqlNode); - - final RuleSet rules = RuleSets.ofList( - CoreRules.FILTER_INTO_JOIN, - WayangRules.WAYANG_TABLESCAN_RULE, - WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_PROJECT_RULE, - WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE, - WayangRules.WAYANG_SORT_RULE); - - final RelNode wayangRel = optimizer.optimize( - relNode, - relNode.getTraitSet().plus(WayangConvention.INSTANCE), - rules); - - final Collection collector = new ArrayList<>(); - - final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, context.getConfiguration(), - collector); - - return new Tuple2<>(collector, wayangPlan); - } - - @Test - public void javaJoinTest() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - assert (result.stream() - .anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1")))); - } - - @Test - public void javaMultiConditionJoin() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - final boolean checkEq = result.stream() - .allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2"))); - - assert (checkEq); - } - - @Test - public void aggregateCountInJavaWithIntegers() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - final Record rec = result.stream().findFirst().get(); - assert (rec.size() == 2); - assert (rec.getInt(1) == 3); - } - - @Test - public void aggregateCountInJava() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC"); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - // except reduce by - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); - - sqlContext.execute(wayangPlan); - - final Record rec = result.stream().findFirst().get(); - assert (rec.size() == 2); - assert (rec.getInt(1) == 3); - } - - @Test - public void filterIsNull() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NULL)" // - ); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - assert (result.size() == 0); - } - - @Test - public void filterIsNotValue() throws Exception { - final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA <> 'test1')" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - sqlContext.execute(wayangPlan); - - assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1"))); - } - - @Test - public void filterIsNotNull() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + /** + * Method for building {@link WayangPlan}s useful for testing, benchmarking and + * other + * usages where you want to handle the intermediate {@link WayangPlan} + * + * @param sql sql query string with the {@code ;} cut off + * @param udfJars + * @return a {@link WayangPlan} of a given sql string + * @throws SqlParseException + * @throws SQLException + */ + public Tuple2, WayangPlan> buildCollectorAndWayangPlan(final SqlContext context, + final String sql, final String... udfJars) throws SqlParseException, SQLException { + final Properties configProperties = Optimizer.ConfigProperties.getDefaults(); + final RelDataTypeFactory relDataTypeFactory = new JavaTypeFactoryImpl(); + + final Optimizer optimizer = Optimizer.create( + SchemaUtils.getSchema(context.getConfiguration()), + configProperties, + relDataTypeFactory); + + final SqlNode sqlNode = optimizer.parseSql(sql); + final SqlNode validatedSqlNode = optimizer.validate(sqlNode); + final RelNode relNode = optimizer.convert(validatedSqlNode); + + final RuleSet rules = RuleSets.ofList( + CoreRules.FILTER_INTO_JOIN, + WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, + WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, + WayangRules.WAYANG_JOIN_RULE, + WayangRules.WAYANG_AGGREGATE_RULE, + WayangRules.WAYANG_SORT_RULE); + + final RelNode wayangRel = optimizer.optimize( + relNode, + relNode.getTraitSet().plus(WayangConvention.INSTANCE), + rules); + + final Collection collector = new ArrayList<>(); + + final WayangPlan wayangPlan = optimizer.convertWithConfig(wayangRel, context.getConfiguration(), + collector); + + return new Tuple2<>(collector, wayangPlan); + } + + @Test + public void javaJoinTest() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEA = exampleRefToRef.NAMEA"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + assert (result.stream() + .anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2", "test1", "test1")))); + } + + @Test + public void javaMultiConditionJoin() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex JOIN fs.exampleRefToRef ON largeLeftTableIndex.NAMEB = exampleRefToRef.NAMEB AND largeLeftTableIndex.NAMEC = exampleRefToRef.NAMEB"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + final boolean checkEq = result.stream() + .allMatch(rec -> rec.equals(new Record("", "test2", "test2", "", "test2"))); + + assert (checkEq); + } + + @Test + public void aggregateCountInJavaWithIntegers() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/exampleInt.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT exampleInt.NAMEC, COUNT(*) FROM fs.exampleInt GROUP BY NAMEC"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + final Record rec = result.stream().findFirst().get(); + assert (rec.size() == 2); + assert (rec.getInt(1) == 3); + } + + @Test + public void aggregateCountInJava() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); + + sqlContext.execute(wayangPlan); + + final Record rec = result.stream().findFirst().get(); + assert (rec.size() == 2); + assert (rec.getInt(1) == 3); + } + + @Test + public void filterIsNull() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NULL)" // + ); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + assert (result.size() == 0); + } + + @Test + public void javaAverage() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/exampleSort.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT AVG(col1) FROM fs.exampleSort" // + ); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + assert (result.size() == 1); + assert (result.stream().findFirst().get().getDouble(0) == 0.875f); + } + + @Test + public void filterNotEqualsValue() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NOT NULL)" // - ); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA <> 'test1')" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + assert (!result.stream().anyMatch(record -> record.getField(0).equals("test1"))); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); + @Test + public void filterIsNotNull() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - assert (!result.stream().anyMatch(record -> record.getField(0).equals(null))); - } + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA IS NOT NULL)" // + ); - @Test - public void javaReduceBy() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( - sqlContext, - "select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA"); + assert (!result.stream().anyMatch(record -> record.getField(0).equals(null))); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; + @Test + public void javaReduceBy() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Java.platform()); - }); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( + sqlContext, + "select exampleSmallA.COLA, count(*) from fs.exampleSmallA group by exampleSmallA.COLA"); - sqlContext.execute(wayangPlan); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; - assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2)))); - } + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Java.platform()); + }); - @Test - public void javaCrossJoin() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + sqlContext.execute(wayangPlan); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( - sqlContext, - "select * from fs.exampleSmallA cross join fs.exampleSmallB"); + assert (result.stream().anyMatch(rec -> rec.equals(new Record("item1", 2)))); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; + @Test + public void javaCrossJoin() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - sqlContext.execute(wayangPlan); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan( + sqlContext, + "select * from fs.exampleSmallA cross join fs.exampleSmallB"); - final List shouldBe = List.of( - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "item1", "item2", "item3"), - new Record("item1", "item2", "x", "x", "x"), - new Record("item1", "item2", "x", "x", "x")); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + sqlContext.execute(wayangPlan); - assert (resultTally.equals(shouldBeTally)); - } + final List shouldBe = List.of( + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "item1", "item2", "item3"), + new Record("item1", "item2", "x", "x", "x"), + new Record("item1", "item2", "x", "x", "x")); - @Test - public void filterWithNotLike() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA NOT LIKE '_est1')" // - ); + assert (resultTally.equals(shouldBeTally)); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); + @Test + public void filterWithNotLike() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - assert (!result.stream().anyMatch(record -> record.getString(0).equals("test1"))); - } + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE (largeLeftTableIndex.NAMEA NOT LIKE '_est1')" // + ); - @Test - public void filterWithLike() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex WHERE largeLeftTableIndex.NAMEA LIKE '_est1'" // - ); + assert (!result.stream().anyMatch(record -> record.getString(0).equals("test1"))); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); + @Test + public void filterWithLike() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2")))); - } + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex WHERE largeLeftTableIndex.NAMEA LIKE '_est1'" // + ); - @Test - public void javaLimit() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT col1, col2, col3, count(*) as total from fs.exampleSort group by col1, col2, col3 order by col1 desc, col2, col3 desc LIMIT 1"); + assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1", "test2")))); + } - final Collection r = t.field0; - final WayangPlan wayangPlan = t.field1; + @Test + public void javaLimit() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); - sqlContext.execute(wayangPlan); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT col1, col2, col3, count(*) as total from fs.exampleSort group by col1, col2, col3 order by col1 desc, col2, col3 desc LIMIT 1"); - final List result = r.stream().collect(Collectors.toList()); + final Collection r = t.field0; + final WayangPlan wayangPlan = t.field1; - assert (result.get(0).equals(new Record(2, "a", "a", 2))); - } + sqlContext.execute(wayangPlan); - @Test - public void javaSort() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); + final List result = r.stream().collect(Collectors.toList()); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT col1, col2, col3, count(*) as total from fs.exampleSort group by col1, col2, col3 order by col1 desc, col2, col3 desc"); + assert (result.get(0).equals(new Record(2, "a", "a", 2))); + } - final Collection r = t.field0; - final WayangPlan wayangPlan = t.field1; + @Test + public void javaSort() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); - sqlContext.execute(wayangPlan); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT col1, col2, col3, count(*) as total from fs.exampleSort group by col1, col2, col3 order by col1 desc, col2, col3 desc"); - final List result = r.stream().collect(Collectors.toList()); + final Collection r = t.field0; + final WayangPlan wayangPlan = t.field1; - assert (result.get(0).equals(new Record(2, "a", "a", 2))); - assert (result.get(1).equals(new Record(1, "a", "b", 1))); - assert (result.get(2).equals(new Record(1, "a", "a", 1))); - assert (result.get(3).equals(new Record(1, "b", "b", 1))); - assert (result.get(4).equals(new Record(0, "a", "b", 1))); - assert (result.get(5).equals(new Record(0, "a", "a", 1))); - assert (result.get(6).equals(new Record(0, "b", "b", 1))); - } + sqlContext.execute(wayangPlan); - @Test - public void joinWithLargeLeftTableIndexCorrect() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final List result = r.stream().collect(Collectors.toList()); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON na.NAMEB = nb.NAMEA " // - ); + assert (result.get(0).equals(new Record(2, "a", "a", 2))); + assert (result.get(1).equals(new Record(1, "a", "b", 1))); + assert (result.get(2).equals(new Record(1, "a", "a", 1))); + assert (result.get(3).equals(new Record(1, "b", "b", 1))); + assert (result.get(4).equals(new Record(0, "a", "b", 1))); + assert (result.get(5).equals(new Record(0, "a", "a", 1))); + assert (result.get(6).equals(new Record(0, "b", "b", 1))); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); + @Test + public void joinWithLargeLeftTableIndexCorrect() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), - new Record("test2", "", "test2", "", "test2", "test2"), - new Record("", "test2", "test2", "test2", "", "test2")); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON na.NAMEB = nb.NAMEA " // + ); - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); - assert (resultTally.equals(shouldBeTally)); - } + final List shouldBe = List.of( + new Record("test1", "test1", "test2", "test1", "test1", "test2"), + new Record("test2", "", "test2", "", "test2", "test2"), + new Record("", "test2", "test2", "test2", "", "test2")); - // Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2, - // item3, item4} join on =($1,$3) would be =(item2, item4) in the join set - // however from the r set we need to factor in the - // offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume - // that it is always ordered as =(lRef,rRef), lRef < rRef. - // it may also be =($3,$1) - @Test - public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // - ); + assert (resultTally.equals(shouldBeTally)); + } - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); + // Imagine case: l = {item1, item2}, r = {item3,item4}, j = {item1, item2, + // item3, item4} join on =($1,$3) would be =(item2, item4) in the join set + // however from the r set we need to factor in the + // offset, $3 -> 3 - l.size() = $1, r($1) = "item4" we cannot naively assume + // that it is always ordered as =(lRef,rRef), lRef < rRef. + // it may also be =($3,$1) + @Test + public void joinWithLargeLeftTableIndexMirrorAlias() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), - new Record("test2", "", "test2", "", "test2", "test2"), - new Record("", "test2", "test2", "test2", "", "test2")); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // + ); - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); - assert (resultTally.equals(shouldBeTally)); - } - - // @Test - public void sparkFilter() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + final List shouldBe = List.of( + new Record("test1", "test1", "test2", "test1", "test1", "test2"), + new Record("test2", "", "test2", "", "test2", "test2"), + new Record("", "test2", "test2", "test2", "", "test2")); - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na WHERE na.NAMEA = 'test1'" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Spark.platform()); - }); - - sqlContext.execute(wayangPlan); - - assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); - } - - // tests sql-apis ability to serialize projections and joins - @Test - public void sparkInnerJoin() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - - PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { - node.addTargetPlatform(Spark.platform()); - }); - - sqlContext.execute(wayangPlan); - - final List shouldBe = List.of( - new Record("test1", "test1", "test2", "test1", "test1", "test2"), - new Record("test2", "", "test2", "", "test2", "test2"), - new Record("", "test2", "test2", "test2", "", "test2")); - - final Map resultTally = result.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - final Map shouldBeTally = shouldBe.stream() - .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); - - assert (resultTally.equals(shouldBeTally)); - } - - // @Test - public void rexSerializationTest() throws Exception { - // create filterPredicateImpl for serialisation - final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); - final RexBuilder rb = new RexBuilder(typeFactory); - final RexNode leftOperand = rb.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0); - final RexNode rightOperand = rb.makeLiteral("test"); - final RexNode cond = rb.makeCall(SqlStdOperatorTable.EQUALS, leftOperand, rightOperand); - final SerializablePredicate fpImpl = new FilterPredicateImpl(cond); - - final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); - objectOutputStream.writeObject(fpImpl); - objectOutputStream.close(); - - final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( - byteArrayOutputStream.toByteArray()); - final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); - final Object deserializedObject = objectInputStream.readObject(); - objectInputStream.close(); - - assert (((FilterPredicateImpl) deserializedObject).test(new Record("test"))); - } - - @Test - public void exampleFilterTableRefToTableRef() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT * FROM fs.exampleRefToRef WHERE exampleRefToRef.NAMEA = exampleRefToRef.NAMEB" // - ); - - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); - } - - @Test - public void exampleMinWithStrings() throws Exception { - final SqlContext sqlContext = createSqlContext("/data/exampleMin.csv"); - - final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, - "SELECT MIN(exampleMin.NAME) FROM fs.exampleMin" // - ); - final Collection result = t.field0; - final WayangPlan wayangPlan = t.field1; - sqlContext.execute(wayangPlan); - - assert (result.stream().findAny().get().getString(0).equals("AA")); - } - - public void test_simple_sql() throws Exception { - final WayangTable customer = WayangTableBuilder.build("customer") - .addField("id", SqlTypeName.INTEGER) - .addField("name", SqlTypeName.VARCHAR) - .addField("age", SqlTypeName.INTEGER) - .withRowCount(100) - .build(); - - final WayangTable orders = WayangTableBuilder.build("orders") - .addField("id", SqlTypeName.INTEGER) - .addField("cid", SqlTypeName.INTEGER) - .addField("price", SqlTypeName.DECIMAL) - .addField("quantity", SqlTypeName.INTEGER) - .withRowCount(100) - .build(); - - final WayangSchema wayangSchema = WayangSchemaBuilder.build("exSchema") - .addTable(customer) - .addTable(orders) - .build(); - - final Optimizer optimizer = Optimizer.create(wayangSchema); - - // String sql = "select c.name, c.age from customer c where (c.age < 40 or c.age - // > 60) and \'alex\' = c.name"; - // String sql = "select c.age from customer c"; - final String sql = "select c.name, c.age, o.price from customer c join orders o on c.id = o.cid where c.age > 40 " - + - "and o" + - ".price < 100"; - - final SqlNode sqlNode = optimizer.parseSql(sql); - final SqlNode validatedSqlNode = optimizer.validate(sqlNode); - final RelNode relNode = optimizer.convert(validatedSqlNode); - - print("After parsing", relNode); - - final RuleSet rules = RuleSets.ofList( - WayangRules.WAYANG_TABLESCAN_RULE, - WayangRules.WAYANG_PROJECT_RULE, - WayangRules.WAYANG_FILTER_RULE, - WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, - WayangRules.WAYANG_JOIN_RULE, - WayangRules.WAYANG_AGGREGATE_RULE); - - final RelNode wayangRel = optimizer.optimize( - relNode, - relNode.getTraitSet().plus(WayangConvention.INSTANCE), - rules); - - print("After rel to wayang conversion", wayangRel); - - // WayangPlan plan = optimizer.convert(wayangRel); - - // print("After Translating to WayangPlan", plan); - - } - - private SqlContext createSqlContext(final String tableResourceName) - throws IOException, ParseException, SQLException { - final String calciteModel = "{\r\n" + // - " \"calcite\": {\r\n" + // - " \"version\": \"1.0\",\r\n" + // - " \"defaultSchema\": \"wayang\",\r\n" + // - " \"schemas\": [\r\n" + // - " {\r\n" + // - " \"name\": \"fs\",\r\n" + // - " \"type\": \"custom\",\r\n" + // - " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + // - " \"operand\": {\r\n" + // - " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() - + "\"\r\n" + // - " }\r\n" + // - " }\r\n" + // - " ]\r\n" + // - " },\r\n" + // - " \"separator\": \";\"\r\n" + // - " }\r\n" + // - " \r\n" + // - " \r\n" + // - ""; - - final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel); - final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON) - .setProperties(); - assert (configuration != null) - : "Could not get configuration with calcite model: " + calciteModel; - - final String dataPath = this.getClass().getResource(tableResourceName).getPath(); - assert (dataPath != null && dataPath != "") - : "Could not get table resource from path: " + tableResourceName; - - configuration.setProperty("wayang.fs.table.url", dataPath); - - configuration.setProperty( - "wayang.ml.executions.file", - "mle" + ".txt"); - - configuration.setProperty( - "wayang.ml.optimizations.file", - "mlo" + ".txt"); - - configuration.setProperty("wayang.ml.experience.enabled", "false"); - - return new SqlContext(configuration); - } - - private void print(final String header, final WayangPlan plan) { - final StringWriter sw = new StringWriter(); - sw.append(header).append(":").append("\n"); - - final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()) - .getTraversedNodes(); - operators.forEach(o -> sw.append(o.toString())); - - System.out.println(sw.toString()); - } - - private void print(final String header, final RelNode relTree) { - final StringWriter sw = new StringWriter(); - - sw.append(header).append(":").append("\n"); - - final RelWriterImpl relWriter = new RelWriterImpl(new PrintWriter(sw), SqlExplainLevel.ALL_ATTRIBUTES, - true); - - relTree.explain(relWriter); - - System.out.println(sw.toString()); - } + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + + assert (resultTally.equals(shouldBeTally)); + } + + // @Test + public void sparkFilter() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na WHERE na.NAMEA = 'test1'" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Spark.platform()); + }); + + sqlContext.execute(wayangPlan); + + assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); + } + + // tests sql-apis ability to serialize projections and joins + @Test + public void sparkInnerJoin() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/largeLeftTableIndex.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.largeLeftTableIndex AS na INNER JOIN fs.largeLeftTableIndex AS nb ON nb.NAMEB = na.NAMEA " // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Spark.platform()); + }); + + sqlContext.execute(wayangPlan); + + final List shouldBe = List.of( + new Record("test1", "test1", "test2", "test1", "test1", "test2"), + new Record("test2", "", "test2", "", "test2", "test2"), + new Record("", "test2", "test2", "test2", "", "test2")); + + final Map resultTally = result.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + final Map shouldBeTally = shouldBe.stream() + .collect(Collectors.toMap(rec -> rec, rec -> 1, Integer::sum)); + + assert (resultTally.equals(shouldBeTally)); + } + + // @Test + public void rexSerializationTest() throws Exception { + // create filterPredicateImpl for serialisation + final RelDataTypeFactory typeFactory = new JavaTypeFactoryImpl(); + final RexBuilder rb = new RexBuilder(typeFactory); + final RexNode leftOperand = rb.makeInputRef(typeFactory.createSqlType(SqlTypeName.VARCHAR), 0); + final RexNode rightOperand = rb.makeLiteral("test"); + final RexNode cond = rb.makeCall(SqlStdOperatorTable.EQUALS, leftOperand, rightOperand); + final SerializablePredicate fpImpl = new FilterPredicateImpl(cond); + + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + final ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream); + objectOutputStream.writeObject(fpImpl); + objectOutputStream.close(); + + final ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream( + byteArrayOutputStream.toByteArray()); + final ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream); + final Object deserializedObject = objectInputStream.readObject(); + objectInputStream.close(); + + assert (((FilterPredicateImpl) deserializedObject).test(new Record("test"))); + } + + @Test + public void exampleFilterTableRefToTableRef() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleRefToRef.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT * FROM fs.exampleRefToRef WHERE exampleRefToRef.NAMEA = exampleRefToRef.NAMEB" // + ); + + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); + } + + @Test + public void exampleMinWithStrings() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleMin.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT MIN(exampleMin.NAME) FROM fs.exampleMin" // + ); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + sqlContext.execute(wayangPlan); + + assert (result.stream().findAny().get().getString(0).equals("AA")); + } + + public void test_simple_sql() throws Exception { + final WayangTable customer = WayangTableBuilder.build("customer") + .addField("id", SqlTypeName.INTEGER) + .addField("name", SqlTypeName.VARCHAR) + .addField("age", SqlTypeName.INTEGER) + .withRowCount(100) + .build(); + + final WayangTable orders = WayangTableBuilder.build("orders") + .addField("id", SqlTypeName.INTEGER) + .addField("cid", SqlTypeName.INTEGER) + .addField("price", SqlTypeName.DECIMAL) + .addField("quantity", SqlTypeName.INTEGER) + .withRowCount(100) + .build(); + + final WayangSchema wayangSchema = WayangSchemaBuilder.build("exSchema") + .addTable(customer) + .addTable(orders) + .build(); + + final Optimizer optimizer = Optimizer.create(wayangSchema); + + // String sql = "select c.name, c.age from customer c where (c.age < 40 or c.age + // > 60) and \'alex\' = c.name"; + // String sql = "select c.age from customer c"; + final String sql = "select c.name, c.age, o.price from customer c join orders o on c.id = o.cid where c.age > 40 " + + + "and o" + + ".price < 100"; + + final SqlNode sqlNode = optimizer.parseSql(sql); + final SqlNode validatedSqlNode = optimizer.validate(sqlNode); + final RelNode relNode = optimizer.convert(validatedSqlNode); + + print("After parsing", relNode); + + final RuleSet rules = RuleSets.ofList( + WayangRules.WAYANG_TABLESCAN_RULE, + WayangRules.WAYANG_PROJECT_RULE, + WayangRules.WAYANG_FILTER_RULE, + WayangRules.WAYANG_TABLESCAN_ENUMERABLE_RULE, + WayangRules.WAYANG_JOIN_RULE, + WayangRules.WAYANG_AGGREGATE_RULE); + + final RelNode wayangRel = optimizer.optimize( + relNode, + relNode.getTraitSet().plus(WayangConvention.INSTANCE), + rules); + + print("After rel to wayang conversion", wayangRel); + + // WayangPlan plan = optimizer.convert(wayangRel); + + // print("After Translating to WayangPlan", plan); + + } + + private SqlContext createSqlContext(final String tableResourceName) + throws IOException, ParseException, SQLException { + final String calciteModel = "{\r\n" + // + " \"calcite\": {\r\n" + // + " \"version\": \"1.0\",\r\n" + // + " \"defaultSchema\": \"wayang\",\r\n" + // + " \"schemas\": [\r\n" + // + " {\r\n" + // + " \"name\": \"fs\",\r\n" + // + " \"type\": \"custom\",\r\n" + // + " \"factory\": \"org.apache.calcite.adapter.file.FileSchemaFactory\",\r\n" + // + " \"operand\": {\r\n" + // + " \"directory\": \"" + "/" + this.getClass().getResource("/data").getPath() + + "\"\r\n" + // + " }\r\n" + // + " }\r\n" + // + " ]\r\n" + // + " },\r\n" + // + " \"separator\": \";\"\r\n" + // + " }\r\n" + // + " \r\n" + // + " \r\n" + // + ""; + + final JSONObject calciteModelJSON = (JSONObject) new JSONParser().parse(calciteModel); + final Configuration configuration = new ModelParser(new Configuration(), calciteModelJSON) + .setProperties(); + assert (configuration != null) + : "Could not get configuration with calcite model: " + calciteModel; + + final String dataPath = this.getClass().getResource(tableResourceName).getPath(); + assert (dataPath != null && dataPath != "") + : "Could not get table resource from path: " + tableResourceName; + + configuration.setProperty("wayang.fs.table.url", dataPath); + + configuration.setProperty( + "wayang.ml.executions.file", + "mle" + ".txt"); + + configuration.setProperty( + "wayang.ml.optimizations.file", + "mlo" + ".txt"); + + configuration.setProperty("wayang.ml.experience.enabled", "false"); + + return new SqlContext(configuration); + } + + private void print(final String header, final WayangPlan plan) { + final StringWriter sw = new StringWriter(); + sw.append(header).append(":").append("\n"); + + final Collection operators = PlanTraversal.upstream().traverse(plan.getSinks()) + .getTraversedNodes(); + operators.forEach(o -> sw.append(o.toString())); + + System.out.println(sw.toString()); + } + + private void print(final String header, final RelNode relTree) { + final StringWriter sw = new StringWriter(); + + sw.append(header).append(":").append("\n"); + + final RelWriterImpl relWriter = new RelWriterImpl(new PrintWriter(sw), SqlExplainLevel.ALL_ATTRIBUTES, + true); + + relTree.explain(relWriter); + + System.out.println(sw.toString()); + } }