diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java index 0460507d7..2a93dead5 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/WayangAggregateVisitor.java @@ -18,6 +18,7 @@ package org.apache.wayang.api.sql.calcite.converter; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -51,7 +52,7 @@ Operator visit(final WayangAggregate wayangRelNode) { final List aggregateCalls = ((Aggregate) wayangRelNode).getAggCallList(); final int groupCount = wayangRelNode.getGroupCount(); - final Set groupingFields = wayangRelNode.getGroupSet().asSet(); + final HashSet groupingFields = new HashSet<>(wayangRelNode.getGroupSet().asSet()); final MapOperator mapOperator = new MapOperator<>( new AggregateAddCols(aggregateCalls), diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java index a7afcf850..468c87f2d 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateAddCols.java @@ -18,17 +18,19 @@ package org.apache.wayang.api.sql.calcite.converter.functions; import java.util.List; +import java.util.stream.Collectors; import org.apache.calcite.rel.core.AggregateCall; - +import org.apache.calcite.sql.SqlKind; import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.core.function.FunctionDescriptor; public class AggregateAddCols implements FunctionDescriptor.SerializableFunction { - final List aggregateCalls; + final List>> aggregateCalls; public AggregateAddCols(final List aggregateCalls) { - this.aggregateCalls = aggregateCalls; + this.aggregateCalls = aggregateCalls.stream().map(call -> new Tuple2<>(call.getAggregation().getKind(), call.getArgList())).collect(Collectors.toList()); } @Override @@ -42,13 +44,16 @@ public Record apply(final Record record) { } int i = l; - for (final AggregateCall aggregateCall : aggregateCalls) { - switch (aggregateCall.getAggregation().kind) { + for (final Tuple2> aggregateCall : aggregateCalls) { + final SqlKind kind = aggregateCall.field0; + final List argList = aggregateCall.field1; + + switch (kind) { case COUNT: resValues[i] = 1; break; default: - resValues[i] = record.getField(aggregateCall.getArgList().get(0)); + resValues[i] = record.getField(argList.get(0)); } i++; } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java index c60f64f5f..d2dc84bd0 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java @@ -21,19 +21,22 @@ import java.util.List; import java.util.Optional; import java.util.function.BiFunction; +import java.util.stream.Collectors; import org.apache.calcite.rel.core.AggregateCall; import org.apache.calcite.runtime.SqlFunctions; +import org.apache.calcite.sql.SqlKind; import org.apache.wayang.basic.data.Record; import org.apache.wayang.core.function.FunctionDescriptor; public class AggregateFunction implements FunctionDescriptor.SerializableBinaryOperator { - - final List aggregateCalls; + final List aggregateKinds; public AggregateFunction(final List aggregateCalls) { - this.aggregateCalls = aggregateCalls; + this.aggregateKinds = aggregateCalls.stream() + .map(call -> call.getAggregation().getKind()) + .collect(Collectors.toList()); } @Override @@ -42,16 +45,16 @@ public Record apply(final Record record1, final Record record2) { final Object[] resValues = new Object[l]; boolean countDone = false; - for (int i = 0; i < l - aggregateCalls.size() - 1; i++) { + for (int i = 0; i < l - aggregateKinds.size() - 1; i++) { resValues[i] = record1.getField(i); } - int counter = l - aggregateCalls.size() - 1; - for (final AggregateCall aggregateCall : aggregateCalls) { + int counter = l - aggregateKinds.size() - 1; + for (final SqlKind kind : aggregateKinds) { final Object field1 = record1.getField(counter); final Object field2 = record2.getField(counter); - switch (aggregateCall.getAggregation().kind) { + switch (kind) { case SUM: resValues[counter] = this.castAndMap(field1, field2, null, Long::sum, Integer::sum, Double::sum); break; @@ -85,7 +88,7 @@ public Record apply(final Record record1, final Record record2) { } break; default: - throw new IllegalStateException("Unsupported operation: " + aggregateCall.getAggregation().kind); + throw new IllegalStateException("Unsupported operation: " + kind); } counter++; } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java index 7274ab78d..451db8d56 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateGetResult.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; import java.util.Set; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -30,32 +31,35 @@ import org.apache.wayang.core.function.FunctionDescriptor; public class AggregateGetResult implements FunctionDescriptor.SerializableFunction { - private final List aggregateCallList; - private final Set groupingfields; - - public AggregateGetResult(final List aggregateCalls, final Set groupingfields) { - this.aggregateCallList = aggregateCalls; - this.groupingfields = groupingfields; - } - - @Override - public Record apply(final Record record) { - final int recordSize = record.size(); - final int aggregateCallOffset = recordSize - aggregateCallList.size() - 1; - - final Object[] fields = groupingfields.stream() - .map(record::getField) - .toArray(); - - final Object[] aggregateCallFields = IntStream.range(0, aggregateCallList.size()) - .mapToObj(i -> aggregateCallList.get(i).getAggregation().getKind().equals(SqlKind.AVG) - ? record.getDouble(i + aggregateCallOffset) / record.getDouble(recordSize - 1) - : record.getField(i + aggregateCallOffset)) - .toArray(); - - final Object[] combinedFields = Stream.concat(Arrays.stream(fields), Arrays.stream(aggregateCallFields)) - .toArray(); - - return new Record(combinedFields); - } + private final List aggregateKindList; + private final Set groupingfields; + + public AggregateGetResult(final List aggregateCalls, final Set groupingfields) { + this.aggregateKindList = aggregateCalls.stream() + .map(call -> call.getAggregation().getKind()) + .collect(Collectors.toList()); + this.groupingfields = groupingfields; + } + + @Override + public Record apply(final Record record) { + final int recordSize = record.size(); + final int aggregateCallOffset = recordSize - aggregateKindList.size() - 1; + + final Object[] fields = groupingfields.stream() + .map(record::getField) + .toArray(); + + final Object[] aggregateCallFields = IntStream.range(0, aggregateKindList.size()) + .mapToObj(i -> aggregateKindList.get(i).equals(SqlKind.AVG) + ? record.getDouble(i + aggregateCallOffset) + / record.getDouble(recordSize - 1) + : record.getField(i + aggregateCallOffset)) + .toArray(); + + final Object[] combinedFields = Stream.concat(Arrays.stream(fields), Arrays.stream(aggregateCallFields)) + .toArray(); + + return new Record(combinedFields); + } } \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateKeyExtractor.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateKeyExtractor.java index 374a047ad..ad3af7efe 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateKeyExtractor.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateKeyExtractor.java @@ -19,16 +19,16 @@ package org.apache.wayang.api.sql.calcite.converter.functions; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; -import java.util.Set; import org.apache.wayang.basic.data.Record; import org.apache.wayang.core.function.FunctionDescriptor; public class AggregateKeyExtractor implements FunctionDescriptor.SerializableFunction { - private final Set indexSet; + private final HashSet indexSet; - public AggregateKeyExtractor(final Set indexSet) { + public AggregateKeyExtractor(final HashSet indexSet) { this.indexSet = indexSet; } diff --git a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java index 3ecd1a24b..cfe5cd8e4 100644 --- a/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java +++ b/wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/SortFilter.java @@ -39,9 +39,9 @@ public SortFilter(final int fetch, final int offset) { @Override public boolean test(final Record record) { - final boolean test = increment >= offset && increment <= fetch; increment++; - + final boolean test = increment >= offset && increment <= fetch; + return test; } } \ No newline at end of file diff --git a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java index 7ff81456a..f226e69bf 100755 --- a/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java +++ b/wayang-api/wayang-api-sql/src/test/java/org/apache/wayang/api/sql/SqlToWayangRelTest.java @@ -358,9 +358,27 @@ public void javaLimit() throws Exception { final List result = r.stream().collect(Collectors.toList()); + assert (result.size() == 1); assert (result.get(0).equals(new Record(2, "a", "a", 2))); } + @Test + public void javaLimitNoSort() throws Exception { + final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); + + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT col1, col2, col3 from fs.exampleSort LIMIT 2"); + + final Collection r = t.field0; + final WayangPlan wayangPlan = t.field1; + + sqlContext.execute(wayangPlan); + + final List result = r.stream().collect(Collectors.toList()); + + assert (result.size() == 2); + } + @Test public void javaSort() throws Exception { final SqlContext sqlContext = createSqlContext("/data/exampleSort.csv"); @@ -460,6 +478,26 @@ public void sparkFilter() throws Exception { assert (result.stream().anyMatch(rec -> rec.equals(new Record("test1", "test1")))); } + @Test + public void sparkAggregate() throws Exception { + final SqlContext sqlContext = this.createSqlContext("/data/largeLeftTableIndex.csv"); + final Tuple2, WayangPlan> t = this.buildCollectorAndWayangPlan(sqlContext, + "SELECT largeLeftTableIndex.NAMEC, COUNT(*) FROM fs.largeLeftTableIndex GROUP BY NAMEC"); + final Collection result = t.field0; + final WayangPlan wayangPlan = t.field1; + + // except reduce by + PlanTraversal.upstream().traverse(wayangPlan.getSinks()).getTraversedNodes().forEach(node -> { + node.addTargetPlatform(Spark.platform()); + }); + + sqlContext.execute(wayangPlan); + + final Record rec = result.stream().findFirst().get(); + assert (rec.size() == 2); + assert (rec.getInt(1) == 3); + } + // tests sql-apis ability to serialize projections and joins @Test public void sparkInnerJoin() throws Exception { diff --git a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java index 9d5b611a9..4a7df7d3c 100644 --- a/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java +++ b/wayang-platforms/wayang-jdbc-template/src/main/java/org/apache/wayang/jdbc/execution/JdbcExecutor.java @@ -19,6 +19,7 @@ package org.apache.wayang.jdbc.execution; import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.data.Tuple2; import org.apache.wayang.basic.operators.TableSource; import org.apache.wayang.core.api.Job; import org.apache.wayang.core.api.exception.WayangException; @@ -69,150 +70,162 @@ public class JdbcExecutor extends ExecutorTemplate { private final FunctionCompiler functionCompiler = new FunctionCompiler(); - public JdbcExecutor(JdbcPlatformTemplate platform, Job job) { + public JdbcExecutor(final JdbcPlatformTemplate platform, final Job job) { super(job.getCrossPlatformExecutor()); this.platform = platform; this.connection = this.platform.createDatabaseDescriptor(job.getConfiguration()).createJdbcConnection(); } @Override - public void execute(ExecutionStage stage, OptimizationContext optimizationContext, ExecutionState executionState) { - // TODO: Load ChannelInstances from executionState? (as of now there is no input into PostgreSQL). - Collection startTasks = stage.getStartTasks(); - Collection termTasks = stage.getTerminalTasks(); + public void execute(final ExecutionStage stage, final OptimizationContext optimizationContext, final ExecutionState executionState) { + final Tuple2 pair = this.createSqlQuery(stage, optimizationContext); + final String query = pair.field0; + final SqlQueryChannel.Instance queryChannel = pair.field1; - // Verify that we can handle this instance. - assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported"; - ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; - assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; - ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; - assert startTask.getOperator() instanceof TableSource : "Invalid JDBC stage: Start task has to be a TableSource"; - - // Extract the different types of ExecutionOperators from the stage. - TableSource tableOp = (TableSource) startTask.getOperator(); - SqlQueryChannel.Instance tipChannelInstance = this.instantiateOutboundChannel(startTask, optimizationContext); - Collection filterTasks = new ArrayList<>(4); - ExecutionTask projectionTask = null; - Collection joinTasks = new ArrayList<>(); - Set allTasks = stage.getAllTasks(); - assert allTasks.size() <= 3; - ExecutionTask nextTask = this.findJdbcExecutionOperatorTaskInStage(startTask, stage); - while (nextTask != null) { - // Evaluate the nextTask. - if (nextTask.getOperator() instanceof JdbcFilterOperator) { - filterTasks.add(nextTask); - } else if (nextTask.getOperator() instanceof JdbcProjectionOperator) { - assert projectionTask == null; //Allow one projection operator per stage for now. - projectionTask = nextTask; - } else if (nextTask.getOperator() instanceof JdbcJoinOperator) { - joinTasks.add(nextTask); - } else { - throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); - } - - // Move the tipChannelInstance. - tipChannelInstance = this.instantiateOutboundChannel(nextTask, optimizationContext, tipChannelInstance); - - // Go to the next nextTask. - nextTask = this.findJdbcExecutionOperatorTaskInStage(nextTask, stage); - } - - // Create the SQL query. - String tableName = this.getSqlClause(tableOp); - Collection conditions = filterTasks.stream() - .map(ExecutionTask::getOperator) - .map(this::getSqlClause) - .collect(Collectors.toList()); - String projection = projectionTask == null ? "*" : this.getSqlClause(projectionTask.getOperator()); - Collection joins = joinTasks.stream() - .map(ExecutionTask::getOperator) - .map(this::getSqlClause) - .collect(Collectors.toList()); - String query = this.createSqlQuery(tableName, conditions, projection, joins); - tipChannelInstance.setSqlQuery(query); + queryChannel.setSqlQuery(query); // Return the tipChannelInstance. - executionState.register(tipChannelInstance); + executionState.register(queryChannel); } /** - * Retrieves the follow-up {@link ExecutionTask} of the given {@code task} unless it is not comprising a + * Retrieves the follow-up {@link ExecutionTask} of the given {@code task} + * unless it is not comprising a * {@link JdbcExecutionOperator} and/or not in the given {@link ExecutionStage}. * - * @param task whose follow-up {@link ExecutionTask} is requested; should have a single follower + * @param task whose follow-up {@link ExecutionTask} is requested; should have + * a single follower * @param stage in which the follow-up {@link ExecutionTask} should be * @return the said follow-up {@link ExecutionTask} or {@code null} if none */ - private ExecutionTask findJdbcExecutionOperatorTaskInStage(ExecutionTask task, ExecutionStage stage) { + private ExecutionTask findJdbcExecutionOperatorTaskInStage(final ExecutionTask task, final ExecutionStage stage) { assert task.getNumOuputChannels() == 1; final Channel outputChannel = task.getOutputChannel(0); final ExecutionTask consumer = WayangCollections.getSingle(outputChannel.getConsumers()); - return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? - consumer : - null; + return consumer.getStage() == stage && consumer.getOperator() instanceof JdbcExecutionOperator ? consumer + : null; } /** - * Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}. + * Instantiates the outbound {@link SqlQueryChannel} of an + * {@link ExecutionTask}. * - * @param task whose outbound {@link SqlQueryChannel} should be instantiated - * @param optimizationContext provides information about the {@link ExecutionTask} + * @param task whose outbound {@link SqlQueryChannel} should be + * instantiated + * @param optimizationContext provides information about the + * {@link ExecutionTask} * @return the {@link SqlQueryChannel.Instance} */ - private SqlQueryChannel.Instance instantiateOutboundChannel(ExecutionTask task, - OptimizationContext optimizationContext) { + private SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, + final OptimizationContext optimizationContext) { assert task.getNumOuputChannels() == 1 : String.format("Illegal task: %s.", task); assert task.getOutputChannel(0) instanceof SqlQueryChannel : String.format("Illegal task: %s.", task); - SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0); - OptimizationContext.OperatorContext operatorContext = optimizationContext.getOperatorContext(task.getOperator()); + final SqlQueryChannel outputChannel = (SqlQueryChannel) task.getOutputChannel(0); + final OptimizationContext.OperatorContext operatorContext = optimizationContext + .getOperatorContext(task.getOperator()); return outputChannel.createInstance(this, operatorContext, 0); } /** - * Instantiates the outbound {@link SqlQueryChannel} of an {@link ExecutionTask}. + * Instantiates the outbound {@link SqlQueryChannel} of an + * {@link ExecutionTask}. * - * @param task whose outbound {@link SqlQueryChannel} should be instantiated - * @param optimizationContext provides information about the {@link ExecutionTask} - * @param predecessorChannelInstance preceeding {@link SqlQueryChannel.Instance} to keep track of lineage + * @param task whose outbound {@link SqlQueryChannel} + * should be instantiated + * @param optimizationContext provides information about the + * {@link ExecutionTask} + * @param predecessorChannelInstance preceeding {@link SqlQueryChannel.Instance} + * to keep track of lineage * @return the {@link SqlQueryChannel.Instance} */ - private SqlQueryChannel.Instance instantiateOutboundChannel(ExecutionTask task, - OptimizationContext optimizationContext, - SqlQueryChannel.Instance predecessorChannelInstance) { + private SqlQueryChannel.Instance instantiateOutboundChannel(final ExecutionTask task, + final OptimizationContext optimizationContext, + final SqlQueryChannel.Instance predecessorChannelInstance) { final SqlQueryChannel.Instance newInstance = this.instantiateOutboundChannel(task, optimizationContext); newInstance.getLineage().addPredecessor(predecessorChannelInstance.getLineage()); return newInstance; } /** - * Creates a SQL query. - * - * @param tableName the table to be queried - * @param conditions conditions for the {@code WHERE} clause - * @param projection projection for the {@code SELECT} clause - * @param joins join clauses for multiple {@code JOIN} clauses - * @return the SQL query + * Creates a query channel and the sql statement + * + * @param stage + * @param context + * @return a tuple containing the sql statement */ - protected String createSqlQuery(String tableName, Collection conditions, String projection, Collection joins) { - StringBuilder sb = new StringBuilder(1000); + protected Tuple2 createSqlQuery(final ExecutionStage stage, + final OptimizationContext context) { + final Collection startTasks = stage.getStartTasks(); + final Collection termTasks = stage.getTerminalTasks(); + + // Verify that we can handle this instance. + assert startTasks.size() == 1 : "Invalid jdbc stage: multiple sources are not currently supported"; + final ExecutionTask startTask = (ExecutionTask) startTasks.toArray()[0]; + assert termTasks.size() == 1 : "Invalid JDBC stage: multiple terminal tasks are not currently supported."; + final ExecutionTask termTask = (ExecutionTask) termTasks.toArray()[0]; + assert startTask.getOperator() instanceof TableSource + : "Invalid JDBC stage: Start task has to be a TableSource"; + + // Extract the different types of ExecutionOperators from the stage. + final TableSource tableOp = (TableSource) startTask.getOperator(); + SqlQueryChannel.Instance tipChannelInstance = this.instantiateOutboundChannel(startTask, context); + final Collection filterTasks = new ArrayList<>(4); + ExecutionTask projectionTask = null; + final Collection joinTasks = new ArrayList<>(); + final Set allTasks = stage.getAllTasks(); + assert allTasks.size() <= 3; + ExecutionTask nextTask = this.findJdbcExecutionOperatorTaskInStage(startTask, stage); + while (nextTask != null) { + // Evaluate the nextTask. + if (nextTask.getOperator() instanceof JdbcFilterOperator) { + filterTasks.add(nextTask); + } else if (nextTask.getOperator() instanceof JdbcProjectionOperator) { + assert projectionTask == null; // Allow one projection operator per stage for now. + projectionTask = nextTask; + } else if (nextTask.getOperator() instanceof JdbcJoinOperator) { + joinTasks.add(nextTask); + } else { + throw new WayangException(String.format("Unsupported JDBC execution task %s", nextTask.toString())); + } + + // Move the tipChannelInstance. + tipChannelInstance = this.instantiateOutboundChannel(nextTask, context, tipChannelInstance); + + // Go to the next nextTask. + nextTask = this.findJdbcExecutionOperatorTaskInStage(nextTask, stage); + } + + // Create the SQL query. + final String tableName = this.getSqlClause(tableOp); + final Collection conditions = filterTasks.stream() + .map(ExecutionTask::getOperator) + .map(this::getSqlClause) + .collect(Collectors.toList()); + final String projection = projectionTask == null ? "*" : this.getSqlClause(projectionTask.getOperator()); + final Collection joins = joinTasks.stream() + .map(ExecutionTask::getOperator) + .map(this::getSqlClause) + .collect(Collectors.toList()); + + final StringBuilder sb = new StringBuilder(1000); sb.append("SELECT ").append(projection).append(" FROM ").append(tableName); if (!joins.isEmpty()) { - String separator = " "; - for (String join : joins) { + final String separator = " "; + for (final String join : joins) { sb.append(separator).append(join); } } if (!conditions.isEmpty()) { sb.append(" WHERE "); String separator = ""; - for (String condition : conditions) { + for (final String condition : conditions) { sb.append(separator).append(condition); separator = " AND "; } } sb.append(';'); - return sb.toString(); + return new Tuple2<>(sb.toString(), tipChannelInstance); } /** @@ -221,7 +234,7 @@ protected String createSqlQuery(String tableName, Collection conditions, * @param operator for that the SQL clause should be generated * @return the SQL clause */ - private String getSqlClause(Operator operator) { + private String getSqlClause(final Operator operator) { return ((JdbcExecutionOperator) operator).createSqlClause(this.connection, this.functionCompiler); } @@ -229,7 +242,7 @@ private String getSqlClause(Operator operator) { public void dispose() { try { this.connection.close(); - } catch (SQLException e) { + } catch (final SQLException e) { this.logger.error("Could not close JDBC connection to PostgreSQL correctly.", e); } } @@ -239,14 +252,15 @@ public Platform getPlatform() { return this.platform; } - - private void saveResult(FileChannel.Instance outputFileChannelInstance, ResultSet rs) throws IOException, SQLException { + private void saveResult(final FileChannel.Instance outputFileChannelInstance, final ResultSet rs) + throws IOException, SQLException { // Output results. final FileSystem outFs = FileSystems.getFileSystem(outputFileChannelInstance.getSinglePath()).get(); - try (final OutputStreamWriter writer = new OutputStreamWriter(outFs.create(outputFileChannelInstance.getSinglePath()))) { + try (final OutputStreamWriter writer = new OutputStreamWriter( + outFs.create(outputFileChannelInstance.getSinglePath()))) { while (rs.next()) { - //System.out.println(rs.getInt(1) + " " + rs.getString(2)); - ResultSetMetaData rsmd = rs.getMetaData(); + // System.out.println(rs.getInt(1) + " " + rs.getString(2)); + final ResultSetMetaData rsmd = rs.getMetaData(); for (int i = 1; i <= rsmd.getColumnCount(); i++) { writer.write(rs.getString(i)); if (i < rsmd.getColumnCount()) { @@ -257,7 +271,7 @@ private void saveResult(FileChannel.Instance outputFileChannelInstance, ResultSe writer.write('\n'); } } - } catch (UncheckedIOException e) { + } catch (final UncheckedIOException e) { throw e.getCause(); } }