[BEAM-12680] Calcite SqlTransform no longer experimental
apilloud authored and calvinleungyk committed Sep 22, 2021
1 parent a8209a5 commit 30ca3fa
Showing 3 changed files with 17 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -56,6 +56,7 @@

* New highly anticipated feature X added to Python SDK ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
+ * The Beam Java API for Calcite SqlTransform is no longer experimental ([BEAM-12680](https://issues.apache.org/jira/browse/BEAM-12680)).

## I/Os

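For readers of the CHANGES.md entry above, a minimal usage sketch of the now-stable API: SqlTransform.query applied to a single schema-aware PCollection, which is exposed to the query under the implicit PCOLLECTION table name (see PCOLLECTION_NAME in the SqlTransform diff below). The schema, field names, and sample rows are invented for illustration, and a runner such as the direct runner plus the Beam SQL extension are assumed to be on the classpath.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SqlTransformExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Invented schema and rows, purely for illustration.
    Schema schema = Schema.builder().addStringField("word").addInt64Field("cnt").build();
    PCollection<Row> rows =
        pipeline.apply(
            Create.of(
                    Row.withSchema(schema).addValues("beam", 3L).build(),
                    Row.withSchema(schema).addValues("sql", 5L).build())
                .withRowSchema(schema));

    // A single input is visible to the query as the PCOLLECTION table.
    PCollection<Row> filtered =
        rows.apply(SqlTransform.query("SELECT word, cnt FROM PCOLLECTION WHERE cnt > 3"));

    pipeline.run().waitUntilFinish();
  }
}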
27 changes: 14 additions & 13 deletions SqlTransform.java
@@ -107,12 +107,7 @@
* }</pre>
*/
@AutoValue
- @Experimental
@AutoValue.CopyAnnotations
- @SuppressWarnings({
- "rawtypes", // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
- "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
- })
public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>> {
static final String PCOLLECTION_NAME = "PCOLLECTION";

@@ -122,9 +117,9 @@ public abstract class SqlTransform extends PTransform<PInput, PCollection<Row>>

abstract QueryParameters queryParameters();

- abstract List<UdfDefinition> udfDefinitions();
+ abstract @Experimental List<UdfDefinition> udfDefinitions();

- abstract List<UdafDefinition> udafDefinitions();
+ abstract @Experimental List<UdafDefinition> udafDefinitions();

abstract boolean autoLoading();

Expand Down Expand Up @@ -154,8 +149,9 @@ public PCollection<Row> expand(PInput input) {

tableProviderMap().forEach(sqlEnvBuilder::addSchema);

- if (defaultTableProvider() != null) {
- sqlEnvBuilder.setCurrentSchema(defaultTableProvider());
+ final @Nullable String defaultTableProvider = defaultTableProvider();
+ if (defaultTableProvider != null) {
+ sqlEnvBuilder.setCurrentSchema(defaultTableProvider);
}

sqlEnvBuilder.setQueryPlannerClassName(
@@ -239,6 +235,7 @@ public SqlTransform withDefaultTableProvider(String name, TableProvider tablePro
return withTableProvider(name, tableProvider).toBuilder().setDefaultTableProvider(name).build();
}

+ @Experimental
public SqlTransform withQueryPlannerClass(Class<? extends QueryPlanner> clazz) {
return toBuilder().setQueryPlannerClassName(clazz.getName()).build();
}
@@ -265,6 +262,7 @@ public SqlTransform withAutoLoading(boolean autoLoading) {
*
* <p>Refer to {@link BeamSqlUdf} for more about how to implement a UDF in BeamSql.
*/
+ @Experimental
public SqlTransform registerUdf(String functionName, Class<? extends BeamSqlUdf> clazz) {
return registerUdf(functionName, clazz, BeamSqlUdf.UDF_METHOD);
}
@@ -273,6 +271,7 @@ public SqlTransform registerUdf(String functionName, Class<? extends BeamSqlUdf>
* Register {@link SerializableFunction} as a UDF function used in this query. Note, {@link
* SerializableFunction} must have a constructor without arguments.
*/
+ @Experimental
public SqlTransform registerUdf(String functionName, SerializableFunction sfn) {
return registerUdf(functionName, sfn.getClass(), "apply");
}
@@ -288,6 +287,7 @@ private SqlTransform registerUdf(String functionName, Class<?> clazz, String met
}

/** register a {@link Combine.CombineFn} as UDAF function used in this query. */
+ @Experimental
public SqlTransform registerUdaf(String functionName, Combine.CombineFn combineFn) {
ImmutableList<UdafDefinition> newUdafs =
ImmutableList.<UdafDefinition>builder()
@@ -311,16 +311,17 @@ static Builder builder() {
}

@AutoValue.Builder
+ @AutoValue.CopyAnnotations
abstract static class Builder {
abstract Builder setQueryString(String queryString);

abstract Builder setQueryParameters(QueryParameters queryParameters);

abstract Builder setDdlStrings(List<String> ddlStrings);

- abstract Builder setUdfDefinitions(List<UdfDefinition> udfDefinitions);
+ abstract @Experimental Builder setUdfDefinitions(List<UdfDefinition> udfDefinitions);

- abstract Builder setUdafDefinitions(List<UdafDefinition> udafDefinitions);
+ abstract @Experimental Builder setUdafDefinitions(List<UdafDefinition> udafDefinitions);

abstract Builder setAutoLoading(boolean autoLoading);

@@ -335,7 +336,7 @@ abstract static class Builder {

@AutoValue
@AutoValue.CopyAnnotations
- @SuppressWarnings({"rawtypes"})
+ @Experimental
abstract static class UdfDefinition {
abstract String udfName();

@@ -350,7 +351,7 @@ static UdfDefinition of(String udfName, Class<?> clazz, String methodName) {

@AutoValue
@AutoValue.CopyAnnotations
- @SuppressWarnings({"rawtypes"})
+ @Experimental
abstract static class UdafDefinition {
abstract String udafName();

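To make the re-annotated surface above concrete, here is a minimal sketch of the two registration paths this commit keeps @Experimental while the rest of SqlTransform graduates: a BeamSqlUdf-backed scalar UDF and a Combine.CombineFn-backed UDAF. The CubicUdf and SumFn classes, the SQL function names, and the word/cnt fields are invented for illustration; the static eval entry point follows the BeamSqlUdf.UDF_METHOD convention referenced in the diff.

import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class UdfRegistrationExample {

  // Scalar UDF: BeamSqlUdf implementations provide a static method named by BeamSqlUdf.UDF_METHOD.
  public static class CubicUdf implements BeamSqlUdf {
    public static Long eval(Long input) {
      return input * input * input;
    }
  }

  // UDAF: any Combine.CombineFn can be registered; this one is a plain running sum of longs.
  public static class SumFn extends Combine.CombineFn<Long, Long, Long> {
    @Override
    public Long createAccumulator() {
      return 0L;
    }

    @Override
    public Long addInput(Long accumulator, Long input) {
      return accumulator + input;
    }

    @Override
    public Long mergeAccumulators(Iterable<Long> accumulators) {
      long sum = 0L;
      for (Long accumulator : accumulators) {
        sum += accumulator;
      }
      return sum;
    }

    @Override
    public Long extractOutput(Long accumulator) {
      return accumulator;
    }
  }

  public static PCollection<Row> applySql(PCollection<Row> input) {
    // registerUdf and registerUdaf remain @Experimental after this commit; query itself does not.
    return input.apply(
        SqlTransform.query(
                "SELECT word, cubic(totalSum(cnt)) AS cubedTotal FROM PCOLLECTION GROUP BY word")
            .registerUdf("cubic", CubicUdf.class)
            .registerUdaf("totalSum", new SumFn()));
  }
}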
2 changes: 2 additions & 0 deletions UdfUdafProvider.java
@@ -19,6 +19,7 @@

import java.util.Collections;
import java.util.Map;
+ import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -27,6 +28,7 @@
@SuppressWarnings({
"rawtypes" // TODO(https://issues.apache.org/jira/browse/BEAM-10556)
})
+ @Experimental
public interface UdfUdafProvider {
/** For UDFs implement {@link BeamSqlUdf}. */
default Map<String, Class<? extends BeamSqlUdf>> getBeamSqlUdfs() {
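And for the newly @Experimental UdfUdafProvider above, a sketch of a provider implementation that contributes the hypothetical CubicUdf from the previous example under the name "cubic". Only getBeamSqlUdfs, the method visible in this diff, is overridden; such providers are typically picked up through Java's ServiceLoader (for example via an @AutoService(UdfUdafProvider.class) registration) when SqlTransform auto-loading is enabled, and that wiring, like the UdfUdafProvider import, is assumed here rather than shown in the diff.

import java.util.Collections;
import java.util.Map;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;

// NOTE: the import for UdfUdafProvider is omitted because its package is not visible in this diff.
public class ExampleUdfUdafProvider implements UdfUdafProvider {

  // Expose the hypothetical CubicUdf (from the sketch above) as the SQL function "cubic".
  @Override
  public Map<String, Class<? extends BeamSqlUdf>> getBeamSqlUdfs() {
    return Collections.singletonMap("cubic", UdfRegistrationExample.CubicUdf.class);
  }
}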
