Skip to content

Commit

Permalink
[BEAM-2287] This closes #3447
Browse files Browse the repository at this point in the history
  • Loading branch information
takidau committed Jun 30, 2017
2 parents 2096da2 + a13fce9 commit 7ba77dd
Show file tree
Hide file tree
Showing 6 changed files with 633 additions and 523 deletions.
10 changes: 10 additions & 0 deletions dsls/sql/src/main/java/org/apache/beam/dsls/sql/BeamSqlEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.beam.dsls.sql.planner.BeamQueryPlanner;
import org.apache.beam.dsls.sql.schema.BaseBeamTable;
import org.apache.beam.dsls.sql.schema.BeamSqlRecordType;
import org.apache.beam.dsls.sql.schema.BeamSqlUdaf;
import org.apache.beam.dsls.sql.utils.CalciteUtils;
import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.Enumerable;
Expand All @@ -32,6 +33,7 @@
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Statistic;
import org.apache.calcite.schema.Statistics;
import org.apache.calcite.schema.impl.AggregateFunctionImpl;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.apache.calcite.tools.Frameworks;

Expand All @@ -57,6 +59,14 @@ public void registerUdf(String functionName, Class<?> clazz, String methodName)
schema.add(functionName, ScalarFunctionImpl.create(clazz, methodName));
}

/**
* Register a UDAF function which can be used in GROUP-BY expression.
* See {@link BeamSqlUdaf} on how to implement a UDAF.
*/
public void registerUdaf(String functionName, Class<? extends BeamSqlUdaf> clazz) {
schema.add(functionName, AggregateFunctionImpl.create(clazz));
}

/**
* Registers a {@link BaseBeamTable} which can be used for all subsequent queries.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public PCollection<BeamSqlRow> buildBeamPipeline(PCollectionTuple inputPCollecti
PCollection<KV<BeamSqlRow, BeamSqlRow>> aggregatedStream = exCombineByStream.apply(
stageName + "combineBy",
Combine.<BeamSqlRow, BeamSqlRow, BeamSqlRow>perKey(
new BeamAggregationTransforms.AggregationCombineFn(getAggCallList(),
new BeamAggregationTransforms.AggregationAdaptor(getAggCallList(),
CalciteUtils.toBeamRecordType(input.getRowType()))))
.setCoder(KvCoder.of(keyCoder, aggCoder));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.dsls.sql.schema;

import java.io.Serializable;
import java.lang.reflect.ParameterizedType;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.Combine.CombineFn;

/**
* abstract class of aggregation functions in Beam SQL.
*
* <p>There're several constrains for a UDAF:<br>
* 1. A constructor with an empty argument list is required;<br>
* 2. The type of {@code InputT} and {@code OutputT} can only be Interger/Long/Short/Byte/Double
* /Float/Date/BigDecimal, mapping as SQL type INTEGER/BIGINT/SMALLINT/TINYINE/DOUBLE/FLOAT
* /TIMESTAMP/DECIMAL;<br>
* 3. Keep intermediate data in {@code AccumT}, and do not rely on elements in class;<br>
*/
public abstract class BeamSqlUdaf<InputT, AccumT, OutputT> implements Serializable {
public BeamSqlUdaf(){}

/**
* create an initial aggregation object, equals to {@link CombineFn#createAccumulator()}.
*/
public abstract AccumT init();

/**
* add an input value, equals to {@link CombineFn#addInput(Object, Object)}.
*/
public abstract AccumT add(AccumT accumulator, InputT input);

/**
* merge aggregation objects from parallel tasks, equals to
* {@link CombineFn#mergeAccumulators(Iterable)}.
*/
public abstract AccumT merge(Iterable<AccumT> accumulators);

/**
* extract output value from aggregation object, equals to
* {@link CombineFn#extractOutput(Object)}.
*/
public abstract OutputT result(AccumT accumulator);

/**
* get the coder for AccumT which stores the intermediate result.
* By default it's fetched from {@link CoderRegistry}.
*/
public Coder<AccumT> getAccumulatorCoder(CoderRegistry registry)
throws CannotProvideCoderException {
return registry.getCoder(
(Class<AccumT>) ((ParameterizedType) getClass()
.getGenericSuperclass()).getActualTypeArguments()[1]);
}
}

0 comments on commit 7ba77dd

Please sign in to comment.