Skip to content

Commit

Permalink
1908: new window function implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
adeneche committed Mar 6, 2015
1 parent 4b58ac8 commit fb293ba
Show file tree
Hide file tree
Showing 23 changed files with 1,074 additions and 831 deletions.
Expand Up @@ -39,4 +39,8 @@ protected NamedExpression[] aN(List<NamedExpression> exprs){
return exprs.toArray(new NamedExpression[exprs.size()]);
}

protected Order.Ordering[] aO(List<Order.Ordering> orderings) {
return orderings.toArray(new Order.Ordering[orderings.size()]);
}

}
Expand Up @@ -38,20 +38,23 @@
public class Window extends SingleInputOperator {
private final NamedExpression[] withins;
private final NamedExpression[] aggregations;
private final Order.Ordering[] orderings;
private final long start;
private final long end;


@JsonCreator
public Window(@JsonProperty("withins") NamedExpression[] withins,
@JsonProperty("aggregations") NamedExpression[] aggregations,
@JsonProperty("orderings") Order.Ordering[] orderings,
@JsonProperty("start") Long start,
@JsonProperty("end") Long end) {
super();
this.withins = withins;
this.start = start == null ? Long.MIN_VALUE : start;
this.end = end == null ? Long.MIN_VALUE : end;
this.aggregations = aggregations;
this.orderings = orderings;
}

public NamedExpression[] getWithins() {
Expand All @@ -70,6 +73,10 @@ public NamedExpression[] getAggregations() {
return aggregations;
}

public Order.Ordering[] getOrderings() {
return orderings;
}

@Override
public <T, X, E extends Throwable> T accept(LogicalVisitor<T, X, E> logicalVisitor, X value) throws E {
return logicalVisitor.visitWindow(this, value);
Expand Down Expand Up @@ -99,9 +106,10 @@ public Builder addWithin(FieldReference within, LogicalExpression expr) {
}

public Window internalBuild() {
//TODO withins can actually be empty: over(), over(order by <expression>), ...
checkState(!withins.isEmpty(), "Withins in window must not be empty.");
checkState(!aggregations.isEmpty(), "Aggregations in window must not be empty.");
return new Window(aN(withins), aN(aggregations), start, end);
return new Window(aN(withins), aN(aggregations), aO(orderings), start, end);
}

public Builder addOrdering(Order.Ordering ordering) {
Expand Down
1 change: 1 addition & 0 deletions contrib/data/pom.xml
Expand Up @@ -33,5 +33,6 @@

<modules>
<module>tpch-sample-data</module>
<module>window-test-data</module>
</modules>
</project>
64 changes: 64 additions & 0 deletions contrib/data/window-test-data/pom.xml
@@ -0,0 +1,64 @@
<?xml version="1.0"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>drill-contrib-data-parent</artifactId>
<groupId>org.apache.drill.contrib.data</groupId>
<version>0.8.0-SNAPSHOT</version>
</parent>

<artifactId>window-test-data</artifactId>
<name>contrib/data/window-test-data</name>
<packaging>jar</packaging>

<dependencies>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.googlecode.maven-download-plugin</groupId>
<artifactId>download-maven-plugin</artifactId>
<version>1.2.0</version>
<executions>
<execution>
<id>install-tgz</id>
<phase>prepare-package</phase>
<goals>
<goal>wget</goal>
</goals>
<configuration>
<url>https://s3-us-west-2.amazonaws.com/denbucket/window_test_data_0.1.tgz</url>
<outputFileName>window.tgz</outputFileName>
<unpack>true</unpack>
<outputDirectory>${project.build.directory}/classes/window</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<pluginRepositories>
<pluginRepository>
<id>sonatype-public-repository</id>
<url>https://oss.sonatype.org/content/groups/public</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</pluginRepository>
</pluginRepositories>

</project>
6 changes: 6 additions & 0 deletions exec/java-exec/pom.xml
Expand Up @@ -38,6 +38,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.drill.contrib.data</groupId>
<artifactId>window-test-data</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<!-- <dependency> -->
<!-- <groupId>org.ow2.asm</groupId> -->
<!-- <artifactId>asm-commons</artifactId> -->
Expand Down
Expand Up @@ -197,4 +197,7 @@ public interface ExecConstants {
public static final String MAX_LOADING_CACHE_SIZE_CONFIG = "drill.exec.compile.cache_max_size";

public static final String DRILL_SYS_FILE_SUFFIX = ".sys.drill";

public static final String ENABLE_WINDOW_FUNCTIONS = "window.enable";
public static final OptionValidator ENABLE_WINDOW_FUNCTIONS_VALIDATOR = new BooleanValidator(ENABLE_WINDOW_FUNCTIONS, false);
}
Expand Up @@ -194,7 +194,7 @@ public PhysicalOperator visitWindow(Window window, Object value) throws Optimize

input = new Sort(input, ods, false);

return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getStart(), window.getEnd());
return new WindowPOP(input, window.getWithins(), window.getAggregations(), window.getOrderings(), window.getStart(), window.getEnd());
}

@Override
Expand Down
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.logical.data.Order;
import org.apache.drill.exec.physical.base.AbstractSingle;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.base.PhysicalVisitor;
Expand All @@ -31,24 +32,27 @@ public class WindowPOP extends AbstractSingle {

private final NamedExpression[] withins;
private final NamedExpression[] aggregations;
private final Order.Ordering[] orderings;
private final long start;
private final long end;

public WindowPOP(@JsonProperty("child") PhysicalOperator child,
@JsonProperty("within") NamedExpression[] withins,
@JsonProperty("aggregations") NamedExpression[] aggregations,
@JsonProperty("orderings") Order.Ordering[] orderings,
@JsonProperty("start") long start,
@JsonProperty("end") long end) {
super(child);
this.withins = withins;
this.aggregations = aggregations;
this.orderings = orderings;
this.start = start;
this.end = end;
}

@Override
protected PhysicalOperator getNewWithChild(PhysicalOperator child) {
return new WindowPOP(child, withins, aggregations, start, end);
return new WindowPOP(child, withins, aggregations, orderings, start, end);
}

@Override
Expand Down Expand Up @@ -76,4 +80,8 @@ public NamedExpression[] getAggregations() {
public NamedExpression[] getWithins() {
return withins;
}

public Order.Ordering[] getOrderings() {
return orderings;
}
}
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.window;

import org.eigenbase.sql.SqlCall;
import org.eigenbase.sql.SqlKind;
import org.eigenbase.sql.SqlNode;
import org.eigenbase.sql.SqlOperator;
import org.eigenbase.sql.util.SqlBasicVisitor;
import org.eigenbase.util.Util;

/**
* Visitor which looks for an over clause inside a tree of {@link
* SqlNode} objects.
*/
public class OverFinder extends SqlBasicVisitor<Void> {

public boolean findOver(SqlNode node) {
try {
node.accept(this);
return false;
} catch (Util.FoundOne e) {
Util.swallow(e, null);
return true;
}
}

@Override
public Void visit(SqlCall call) {
final SqlOperator operator = call.getOperator();

if (operator.getKind().equals(SqlKind.OVER)) {
throw new Util.FoundOne(call);
}

return super.visit(call);
}
}

0 comments on commit fb293ba

Please sign in to comment.