Skip to content

Commit

Permalink
PHOENIX-7008 Shallow grammar support for CREATE CDC (apache#1662)
Browse files Browse the repository at this point in the history
  • Loading branch information
haridsv committed Sep 19, 2023
1 parent bb8e9da commit 4c9827a
Show file tree
Hide file tree
Showing 7 changed files with 259 additions and 4 deletions.
41 changes: 41 additions & 0 deletions phoenix-core/src/main/antlr3/PhoenixSQL.g
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ tokens
IF='if';
CONSTRAINT='constraint';
TABLES='tables';
CDC='cdc';
PRE='pre';
POST='post';
LATEST='latest';
ALL='all';
INDEX='index';
INCLUDE='include';
Expand Down Expand Up @@ -187,6 +191,8 @@ import java.lang.Boolean;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.Stack;
import java.sql.SQLException;
import org.apache.phoenix.expression.function.CountAggregateFunction;
Expand All @@ -201,6 +207,7 @@ import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.CDCChangeScope;
import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
Expand Down Expand Up @@ -424,6 +431,7 @@ oneStatement returns [BindableStatement ret]
| s=create_schema_node
| s=create_view_node
| s=create_index_node
| s=create_cdc_node
| s=cursor_open_node
| s=cursor_close_node
| s=cursor_fetch_node
Expand Down Expand Up @@ -545,6 +553,39 @@ create_index_node returns [CreateIndexStatement ret]
}
;

create_cdc_node returns [CreateCDCStatement ret]
: CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name
LPAREN (tcol=column_name | tfunc=cdc_time_func) RPAREN
(INCLUDE LPAREN v=cdc_change_scopes RPAREN)?
(p=fam_properties)?
{
ret = factory.createCDC(o, t, tcol, tfunc, v, p, ex != null, getBindCount());
}
;

cdc_name returns [NamedNode ret]
: name=identifier {$ret = factory.cdcName(name); }
;

cdc_time_func returns [FunctionParseNode ret]
: field=identifier LPAREN l=zero_or_more_expressions RPAREN
{
ret = factory.function(field, l);
}
;

cdc_change_scopes returns [Set<CDCChangeScope> ret]
@init { ret = new HashSet<>(); }
: v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )*
;

cdc_change_scope returns [CDCChangeScope ret]
: v=(PRE | POST | LATEST | ALL)
{
ret = CDCChangeScope.valueOf(v.getText().toUpperCase());
}
;

// Parse a create sequence statement.
create_sequence_node returns [CreateSequenceStatement ret]
: CREATE SEQUENCE (IF NOT ex=EXISTS)? t=from_table_name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,4 @@ public ExplainPlan getExplainPlan() throws SQLException {

};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
import org.apache.phoenix.parse.CloseStatement;
import org.apache.phoenix.parse.ColumnDef;
import org.apache.phoenix.parse.ColumnName;
import org.apache.phoenix.parse.CreateCDCStatement;
import org.apache.phoenix.parse.CreateFunctionStatement;
import org.apache.phoenix.parse.CreateIndexStatement;
import org.apache.phoenix.parse.CreateSchemaStatement;
Expand All @@ -142,6 +143,7 @@
import org.apache.phoenix.parse.DeleteJarStatement;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.ExplainType;
import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.parse.ShowCreateTableStatement;
import org.apache.phoenix.parse.ShowCreateTable;
import org.apache.phoenix.parse.DropColumnStatement;
Expand Down Expand Up @@ -194,6 +196,7 @@
import org.apache.phoenix.schema.PDatum;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PNameFactory;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeyValueAccessor;
Expand Down Expand Up @@ -521,7 +524,7 @@ private int executeMutation(final CompilableStatement stmt, final boolean doRetr
GLOBAL_MUTATION_SQL_COUNTER.increment();
try {
return CallRunner
.run(
.run(
new CallRunner.CallableThrowable<Integer, SQLException>() {
@Override
public Integer call() throws SQLException {
Expand Down Expand Up @@ -1056,6 +1059,24 @@ public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqActio
}
}

private static class ExecutableCreateCDCStatement extends CreateCDCStatement
implements CompilableStatement {
public ExecutableCreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
ColumnName timeIdxColumn, FunctionParseNode tfunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
super(cdcObjName, dataTable, timeIdxColumn, tfunc, includeScopes, props, ifNotExists,
bindCount);
}

@Override
public MutationPlan compilePlan(PhoenixStatement stmt,
Sequence.ValueOp seqAction) throws SQLException {
return null;
}
}

private static class ExecutableCreateSchemaStatement extends CreateSchemaStatement implements CompilableStatement {
ExecutableCreateSchemaStatement(String schemaName, boolean ifNotExists) {
super(schemaName, ifNotExists);
Expand Down Expand Up @@ -1827,6 +1848,16 @@ public CreateTableStatement createTable(TableName tableName, ListMultimap<String
return new ExecutableCreateTableStatement(tableName, props, columns, pkConstraint, splits, tableType, ifNotExists, baseTableName, tableTypeIdNode, bindCount, immutableRows, null);
}

@Override
public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
return new ExecutableCreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc,
includeScopes, props, ifNotExists, bindCount);
}

@Override
public CreateSchemaStatement createSchema(String schemaName, boolean ifNotExists) {
return new ExecutableCreateSchemaStatement(schemaName, ifNotExists);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.parse;

import java.util.Set;

import org.apache.hadoop.hbase.util.Pair;

import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.phoenix.thirdparty.com.google.common.collect.ListMultimap;

public class CreateCDCStatement extends MutableStatement {
private final NamedNode cdcObjName;
private final TableName dataTable;
private final ColumnName timeIdxColumn;
private final FunctionParseNode timeIdxFunc;
private final Set<PTable.CDCChangeScope> includeScopes;
private final ListMultimap<String,Pair<String,Object>> props;
private final boolean ifNotExists;
private final int bindCount;

public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable, ColumnName timeIdxColumn,
FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String,
Pair<String, Object>> props, boolean ifNotExists, int bindCount) {
this.cdcObjName = cdcObjName;
this.dataTable = dataTable;
this.timeIdxColumn = timeIdxColumn;
this.timeIdxFunc = timeIdxFunc;
this.includeScopes = includeScopes;
this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
this.ifNotExists = ifNotExists;
this.bindCount = bindCount;
}

public NamedNode getCdcObjName() {
return cdcObjName;
}

public TableName getDataTable() {
return dataTable;
}

public ColumnName getTimeIdxColumn() {
return timeIdxColumn;
}

public FunctionParseNode getTimeIdxFunc() {
return timeIdxFunc;
}

public Set<PTable.CDCChangeScope> getIncludeScopes() {
return includeScopes;
}

public ListMultimap<String, Pair<String, Object>> getProps() {
return props;
}

public boolean isIfNotExists() {
return ifNotExists;
}

@Override
public int getBindCount() {
return bindCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.phoenix.thirdparty.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.hbase.CompareOperator;
Expand Down Expand Up @@ -347,6 +349,15 @@ public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode data
return new CreateIndexStatement(indexName, dataTable, ikConstraint, includeColumns, splits, props, ifNotExists, indexType, async, bindCount, udfParseNodes);
}

public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
return new CreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc, includeScopes,
props, ifNotExists, bindCount);
}

public CreateSequenceStatement createSequence(TableName tableName, ParseNode startsWith,
ParseNode incrementBy, ParseNode cacheSize, ParseNode minValue, ParseNode maxValue,
boolean cycle, boolean ifNotExits, int bindCount) {
Expand Down Expand Up @@ -426,6 +437,10 @@ public NamedNode indexName(String name) {
return new NamedNode(name);
}

public NamedNode cdcName(String name) {
return new NamedNode(name);
}

@Deprecated
public NamedTableNode namedTable(String alias, TableName name) {
return new NamedTableNode(alias, name);
Expand Down
22 changes: 22 additions & 0 deletions phoenix-core/src/main/java/org/apache/phoenix/schema/PTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1048,4 +1048,26 @@ public Map<String, Integer> values() {

}

enum CDCChangeScope {
/**
* Include only the pre image (state prior to the change) of the row.
*/
PRE,

/**
* Include only the post image (state past the change) of the row.
*/
POST,

/**
* Include only the latest image of the row.
*/
LATEST,

/**
* Include all images.
*/
ALL,
}

}

0 comments on commit 4c9827a

Please sign in to comment.