Skip to content

Commit

Permalink
[BEAM-2161] This closes #2919
Browse files Browse the repository at this point in the history
  • Loading branch information
jbonofre committed May 9, 2017
2 parents 4ea38d8 + b12c7b4 commit 3e678a7
Show file tree
Hide file tree
Showing 25 changed files with 1,501 additions and 6 deletions.
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlAndExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlEqualExpression;
Expand All @@ -38,6 +39,15 @@
import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlModExpression;
import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlMultiplyExpression;
import org.apache.beam.dsls.sql.interpreter.operator.arithmetic.BeamSqlPlusExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlCharLengthExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlConcatExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlInitCapExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlLowerExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlOverlayExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlPositionExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlSubstringExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlTrimExpression;
import org.apache.beam.dsls.sql.interpreter.operator.string.BeamSqlUpperExpression;
import org.apache.beam.dsls.sql.rel.BeamFilterRel;
import org.apache.beam.dsls.sql.rel.BeamProjectRel;
import org.apache.beam.dsls.sql.rel.BeamRelNode;
Expand Down Expand Up @@ -82,6 +92,7 @@ public BeamSQLFnExecutor(BeamRelNode relNode) {
static BeamSqlExpression buildExpression(RexNode rexNode) {
if (rexNode instanceof RexLiteral) {
RexLiteral node = (RexLiteral) rexNode;

return BeamSqlPrimitive.of(node.getTypeName(), node.getValue());
} else if (rexNode instanceof RexInputRef) {
RexInputRef node = (RexInputRef) rexNode;
Expand Down Expand Up @@ -124,6 +135,28 @@ static BeamSqlExpression buildExpression(RexNode rexNode) {
case "MOD":
return new BeamSqlModExpression(subExps);

// string operators
case "||":
return new BeamSqlConcatExpression(subExps);
case "POSITION":
return new BeamSqlPositionExpression(subExps);
case "CHAR_LENGTH":
case "CHARACTER_LENGTH":
return new BeamSqlCharLengthExpression(subExps);
case "UPPER":
return new BeamSqlUpperExpression(subExps);
case "LOWER":
return new BeamSqlLowerExpression(subExps);
case "TRIM":
return new BeamSqlTrimExpression(subExps);
case "SUBSTRING":
return new BeamSqlSubstringExpression(subExps);
case "OVERLAY":
return new BeamSqlOverlayExpression(subExps);
case "INITCAP":
return new BeamSqlInitCapExpression(subExps);


case "IS NULL":
return new BeamSqlIsNullExpression(subExps.get(0));
case "IS NOT NULL":
Expand Down
Expand Up @@ -41,6 +41,18 @@ public BeamSqlExpression(List<BeamSqlExpression> operands, SqlTypeName outputTyp
this.outputType = outputType;
}

public BeamSqlExpression op(int idx) {
return operands.get(idx);
}

public SqlTypeName opType(int idx) {
return op(idx).getOutputType();
}

public <T> T opValueEvaluated(int idx, BeamSQLRow row) {
return (T) op(idx).evaluate(row).getValue();
}

/**
* assertion to make sure the input and output are supported in this expression.
*/
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.dsls.sql.exception.BeamSqlUnsupportedException;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;

/**
* {@link BeamSqlPrimitive} is a special, self-reference {@link BeamSqlExpression}.
Expand Down Expand Up @@ -86,9 +87,8 @@ public boolean accept() {
case BOOLEAN:
return value instanceof Boolean;
case CHAR:
return value instanceof Character;
case VARCHAR:
return value instanceof String;
return value instanceof String || value instanceof NlsString;
default:
throw new BeamSqlUnsupportedException(outputType.name());
}
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.dsls.sql.interpreter.operator.string;

import java.util.List;

import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* 'CHAR_LENGTH' operator.
*/
public class BeamSqlCharLengthExpression extends BeamSqlStringUnaryExpression {
public BeamSqlCharLengthExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.INTEGER);
}

@Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
return BeamSqlPrimitive.of(SqlTypeName.INTEGER, str.length());
}
}
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.dsls.sql.interpreter.operator.string;

import java.util.List;

import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* String concat operator.
*/
public class BeamSqlConcatExpression extends BeamSqlExpression {

protected BeamSqlConcatExpression(List<BeamSqlExpression> operands, SqlTypeName outputType) {
super(operands, outputType);
}

public BeamSqlConcatExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.VARCHAR);
}

@Override public boolean accept() {
if (operands.size() != 2) {
return false;
}

for (BeamSqlExpression exp : getOperands()) {
if (!SqlTypeName.CHAR_TYPES.contains(exp.getOutputType())) {
return false;
}
}

return true;
}

@Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
String left = opValueEvaluated(0, inputRecord);
String right = opValueEvaluated(1, inputRecord);

return BeamSqlPrimitive.of(SqlTypeName.VARCHAR,
new StringBuilder(left.length() + right.length())
.append(left).append(right).toString());
}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.dsls.sql.interpreter.operator.string;

import java.util.List;

import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* 'INITCAP' operator.
*/
public class BeamSqlInitCapExpression extends BeamSqlStringUnaryExpression {
public BeamSqlInitCapExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.VARCHAR);
}

@Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);

StringBuilder ret = new StringBuilder(str);
boolean isInit = true;
for (int i = 0; i < str.length(); i++) {
if (Character.isWhitespace(str.charAt(i))) {
isInit = true;
continue;
}

if (isInit) {
ret.setCharAt(i, Character.toUpperCase(str.charAt(i)));
isInit = false;
} else {
ret.setCharAt(i, Character.toLowerCase(str.charAt(i)));
}
}
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, ret.toString());
}
}
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.dsls.sql.interpreter.operator.string;

import java.util.List;

import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* 'LOWER' operator.
*/
public class BeamSqlLowerExpression extends BeamSqlStringUnaryExpression {
public BeamSqlLowerExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.VARCHAR);
}

@Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, str.toLowerCase());
}
}
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.dsls.sql.interpreter.operator.string;

import java.util.List;

import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlExpression;
import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive;
import org.apache.beam.dsls.sql.schema.BeamSQLRow;
import org.apache.calcite.sql.type.SqlTypeName;

/**
* 'OVERLAY' operator.
*
* <p>
* OVERLAY(string1 PLACING string2 FROM integer [ FOR integer2 ])
* </p>
*/
public class BeamSqlOverlayExpression extends BeamSqlExpression {
public BeamSqlOverlayExpression(List<BeamSqlExpression> operands) {
super(operands, SqlTypeName.VARCHAR);
}

@Override public boolean accept() {
if (operands.size() < 3 || operands.size() > 4) {
return false;
}

if (!SqlTypeName.CHAR_TYPES.contains(opType(0))
|| !SqlTypeName.CHAR_TYPES.contains(opType(1))
|| !SqlTypeName.INT_TYPES.contains(opType(2))) {
return false;
}

if (operands.size() == 4 && !SqlTypeName.INT_TYPES.contains(opType(3))) {
return false;
}

return true;
}

@Override public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) {
String str = opValueEvaluated(0, inputRecord);
String replaceStr = opValueEvaluated(1, inputRecord);
int idx = opValueEvaluated(2, inputRecord);
// the index is 1 based.
idx -= 1;
int length = replaceStr.length();
if (operands.size() == 4) {
length = opValueEvaluated(3, inputRecord);
}

StringBuilder result = new StringBuilder(
str.length() + replaceStr.length() - length);
result.append(str.substring(0, idx))
.append(replaceStr)
.append(str.substring(idx + length));

return BeamSqlPrimitive.of(SqlTypeName.VARCHAR, result.toString());
}
}

0 comments on commit 3e678a7

Please sign in to comment.