PHOENIX-7001: Initial implementation of Change Data Capture (CDC) feature
This commit is a squash of the changes listed below, beginning with PRs apache#1662 and
apache#1694, trimmed down by removing non-functional changes for ease of review. On top
of that, the changes have been reworked for the current state of master, especially the
new submodules.

* 4c9827a Hari.. PHOENIX-7008 Shallow grammar support for CREATE CDC (apache#1662)
* e5220e0 TheN..  PHOENIX-7054 Shallow grammar support for DROP CDC and ALTER CDC  (apache#1694)
* 581e613 Hari.. PHOENIX-7008 Implementation for CREATE CDC  (apache#1681)
* e2ef886 Hari.. PHOENIX-7008 Tweaks, fixes and additional test coverage for CREATE CDC (apache#1703)
* 5d3fd40 TheN.. PHOENIX-7074 DROP CDC Implementation (apache#1713)
* 7420443 Hari.. PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (apache#1766)
* da6ddad Kadi.. Add an extra delete mutation for CDC
* 93d586e Kadi.. Add an extra delete mutation during rebuild for CDC index
* f07898f Hari.. PHOENIX-7008: Addressing Jira spec and review feedback changes (apache#1802)
* 87a2ea1 Hari.. PHOENIX-7008: Fix for parser gap and fix for failing test (apache#1812)
* e395780 TheN.. PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (apache#1813)

Co-authored-by: Saurabh Rai <saurabh.rai@salesforce.com>
haridsv and Saurabh Rai committed Mar 29, 2024
1 parent ff37830 commit 818087f
Showing 69 changed files with 4,236 additions and 401 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -32,3 +32,6 @@ phoenix-hbase-compat-1.3.0/
phoenix-hbase-compat-1.4.0/
phoenix-hbase-compat-1.5.0/
*/hbase.log

# Vim swap files
.*.sw*
49 changes: 49 additions & 0 deletions phoenix-core-client/src/main/antlr3/PhoenixSQL.g
@@ -81,6 +81,11 @@ tokens
IF='if';
CONSTRAINT='constraint';
TABLES='tables';
CDC='cdc';
PRE='pre';
POST='post';
CHANGE='change';
LATEST='latest';
ALL='all';
INDEX='index';
INCLUDE='include';
@@ -188,6 +193,8 @@ import java.lang.Boolean;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.Stack;
import java.sql.SQLException;
import org.apache.phoenix.expression.function.CountAggregateFunction;
@@ -202,6 +209,7 @@ import org.apache.phoenix.schema.IllegalDataException;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.PTable.IndexType;
import org.apache.phoenix.schema.PTable.CDCChangeScope;
import org.apache.phoenix.schema.stats.StatisticsCollectionScope;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.schema.types.PDate;
@@ -425,14 +433,17 @@ oneStatement returns [BindableStatement ret]
| s=create_schema_node
| s=create_view_node
| s=create_index_node
| s=create_cdc_node
| s=cursor_open_node
| s=cursor_close_node
| s=cursor_fetch_node
| s=declare_cursor_node
| s=drop_table_node
| s=drop_index_node
| s=drop_cdc_node
| s=alter_index_node
| s=alter_table_node
| s=alter_cdc_node
| s=show_node
| s=show_create_table_node
| s=trace_node
@@ -554,6 +565,31 @@ create_index_node returns [CreateIndexStatement ret]
}
;

create_cdc_node returns [CreateCDCStatement ret]
: CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name
(INCLUDE LPAREN v=cdc_change_scopes RPAREN)?
(p=fam_properties)?
{
ret = factory.createCDC(o, t, v, p, ex != null, getBindCount());
}
;

cdc_name returns [NamedNode ret]
: name=identifier {$ret = factory.cdcName(name); }
;

cdc_change_scopes returns [Set<CDCChangeScope> ret]
@init { ret = new HashSet<>(); }
: v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )*
;

cdc_change_scope returns [CDCChangeScope ret]
: v=(PRE | POST | CHANGE)
{
ret = CDCChangeScope.valueOf(v.getText().toUpperCase());
}
;

// Parse a create sequence statement.
create_sequence_node returns [CreateSequenceStatement ret]
: CREATE SEQUENCE (IF NOT ex=EXISTS)? t=from_table_name
@@ -653,13 +689,26 @@ drop_index_node returns [DropIndexStatement ret]
{ret = factory.dropIndex(i, t, ex!=null); }
;

// Parse a drop CDC statement
drop_cdc_node returns [DropCDCStatement ret]
: DROP CDC (IF ex=EXISTS)? o=cdc_name ON t=from_table_name
{ret = factory.dropCDC(o, t, ex!=null); }
;

// Parse an alter index statement
alter_index_node returns [AlterIndexStatement ret]
: ALTER INDEX (IF ex=EXISTS)? i=index_name ON t=from_table_name
((s=(USABLE | UNUSABLE | REBUILD (isRebuildAll=ALL)? | DISABLE | ACTIVE)) (async=ASYNC)? ((SET?)p=fam_properties)?)
{ret = factory.alterIndex(factory.namedTable(null, TableName.create(t.getSchemaName(), i.getName())), t.getTableName(), ex!=null, PIndexState.valueOf(SchemaUtil.normalizeIdentifier(s.getText())), isRebuildAll!=null, async!=null, p); }
;

// Parse an alter CDC statement
alter_cdc_node returns [AlterCDCStatement ret]
: ALTER CDC (IF ex=EXISTS)? o=cdc_name ON t=from_table_name
((SET?) p=fam_properties)?
{ret = factory.alterCDC(factory.namedTable(null, TableName.create(t.getSchemaName(), o.getName())), t.getTableName(), ex!=null, p); }
;

// Parse a trace statement.
trace_node returns [TraceStatement ret]
: TRACE ((flag = ON ( WITH SAMPLING s = sampling_rate)?) | flag = OFF)
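
For reference, a minimal JDBC sketch that exercises the new DDL grammar above. The
connection URL, table name, and CDC name are illustrative, and the property in the
ALTER statement is only one example of the fam_properties tail; none of this code is
part of the commit itself.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CdcDdlExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // CREATE CDC with the optional INCLUDE clause; PRE, POST and CHANGE
                // map to PTable.CDCChangeScope via the cdc_change_scope rule.
                stmt.execute("CREATE CDC IF NOT EXISTS MY_CDC ON MY_TABLE INCLUDE (PRE, POST)");
                // ALTER CDC takes the same optional SET <properties> tail as ALTER INDEX;
                // TTL is used here purely as a sample property.
                stmt.execute("ALTER CDC IF EXISTS MY_CDC ON MY_TABLE SET TTL=864000");
                // DROP CDC mirrors DROP INDEX: the CDC name first, then the base table.
                stmt.execute("DROP CDC IF EXISTS MY_CDC ON MY_TABLE");
            }
        }
    }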
@@ -270,4 +270,4 @@ public ExplainPlan getExplainPlan() throws SQLException {

};
}
}
}
@@ -224,7 +224,7 @@ private static void projectAllIndexColumns(StatementContext context, TableRef ta
ColumnRef ref = null;
try {
indexColumn = index.getColumnForColumnName(indexColName);
//TODO could should we do this more efficiently than catching the expcetion ?
// TODO: Should we do this more efficiently than catching the exception ?
} catch (ColumnNotFoundException e) {
if (IndexUtil.shouldIndexBeUsedForUncoveredQuery(tableRef)) {
//Projected columns have the same name as in the data table
@@ -28,6 +28,12 @@
import java.util.Map;
import java.util.Set;

import org.apache.phoenix.expression.function.PhoenixRowTimestampFunction;
import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.parse.NamedTableNode;
import org.apache.phoenix.parse.TerminalParseNode;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
@@ -82,6 +88,7 @@
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ParseNodeUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
@@ -248,8 +255,41 @@ public QueryPlan compileUnionAll(SelectStatement select) throws SQLException {
return plan;
}

private QueryPlan getExistingDataPlanForCDC() {
if (dataPlans != null) {
for (QueryPlan plan : dataPlans.values()) {
if (plan.getTableRef().getTable().getType() == PTableType.CDC) {
return plan;
}
}
}
return null;
}

public QueryPlan compileSelect(SelectStatement select) throws SQLException{
StatementContext context = new StatementContext(statement, resolver, bindManager, scan, sequenceManager);
QueryPlan dataPlanForCDC = getExistingDataPlanForCDC();
if (dataPlanForCDC != null) {
TableRef cdcTableRef = dataPlanForCDC.getTableRef();
PTable cdcTable = cdcTableRef.getTable();
NamedTableNode cdcDataTableName = NODE_FACTORY.namedTable(null,
NODE_FACTORY.table(cdcTable.getSchemaName().getString(),
cdcTable.getParentTableName().getString()),
select.getTableSamplingRate());
ColumnResolver dataTableResolver = FromCompiler.getResolver(cdcDataTableName,
statement.getConnection());
TableRef cdcDataTableRef = dataTableResolver.getTables().get(0);
Set<PTable.CDCChangeScope> cdcIncludeScopes =
cdcTable.getCDCIncludeScopes();
String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
cdcHint.length() - 1));
}
context.setCDCDataTableRef(cdcDataTableRef);
context.setCDCTableRef(cdcTableRef);
context.setCDCIncludeScopes(cdcIncludeScopes);
}
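// Illustrative sketch, not part of the committed code: a query such as
//   SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM MY_CDC
// (names hypothetical) carries its change scopes in the hint, which overrides
// the scopes declared via CREATE CDC ... INCLUDE (...) for this query only.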
if (select.isJoin()) {
JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
return compileJoinQuery(context, joinTable, false, false, null);
@@ -716,6 +756,38 @@ protected QueryPlan compileSingleFlatQuery(
TableRef tableRef = context.getCurrentTable();
PTable table = tableRef.getTable();

if (table.getType() == PTableType.CDC) {
List<AliasedNode> selectNodes = select.getSelect();
// For CDC queries, if a single wildcard projection is used, automatically insert
// PHOENIX_ROW_TIMESTAMP() as a projection at the beginning.
ParseNode selectNode = selectNodes.size() == 1 ? selectNodes.get(0).getNode() : null;
if (selectNode instanceof TerminalParseNode
&& ((TerminalParseNode) selectNode).isWildcardNode()) {
List<AliasedNode> tmpSelectNodes = Lists.newArrayListWithExpectedSize(
selectNodes.size() + 1);
tmpSelectNodes.add(NODE_FACTORY.aliasedNode(null,
NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
Collections.emptyList())));
tmpSelectNodes.add(NODE_FACTORY.aliasedNode(null,
((TerminalParseNode) selectNode).getRewritten()));
selectNodes = tmpSelectNodes;
}
List<OrderByNode> orderByNodes = select.getOrderBy();
// For CDC queries, if no ORDER BY is specified, add default ordering.
if (orderByNodes.size() == 0) {
orderByNodes = Lists.newArrayListWithExpectedSize(1);
orderByNodes.add(NODE_FACTORY.orderBy(
NODE_FACTORY.function(PhoenixRowTimestampFunction.NAME,
Collections.emptyList()),
false, SortOrder.getDefault() == SortOrder.ASC));
}
select = NODE_FACTORY.select(select.getFrom(),
select.getHint(), select.isDistinct(), selectNodes, select.getWhere(),
select.getGroupBy(), select.getHaving(), orderByNodes, select.getLimit(),
select.getOffset(), select.getBindCount(), select.isAggregate(),
select.hasSequence(), select.getSelects(), select.getUdfParseNodes());
}
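// Illustrative sketch, not part of the committed code: with the two rewrites
// above, a plain "SELECT * FROM MY_CDC" (name hypothetical) compiles as if it
// were written as
//   SELECT PHOENIX_ROW_TIMESTAMP(), * FROM MY_CDC
//   ORDER BY PHOENIX_ROW_TIMESTAMP() ASC
// so every change row exposes, and is ordered by, its commit timestamp.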

ParseNode viewWhere = null;
if (table.getViewStatement() != null) {
viewWhere = new SQLParser(table.getViewStatement()).parseQuery().getWhere();
@@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -46,6 +45,7 @@
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -84,7 +84,10 @@ public class StatementContext {
private QueryLogger queryLogger;
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;

private String cdcIncludeScopes;
private TableRef cdcTableRef;
private TableRef cdcDataTableRef;

public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
}
@@ -378,4 +381,27 @@ public boolean getRetryingPersistentCache(long cacheId) {
return retrying;
}
}
public String getEncodedCdcIncludeScopes() {
return cdcIncludeScopes;
}

public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
}

public TableRef getCDCDataTableRef() {
return cdcDataTableRef;
}

public void setCDCDataTableRef(TableRef cdcDataTableRef) {
this.cdcDataTableRef = cdcDataTableRef;
}

public TableRef getCDCTableRef() {
return cdcTableRef;
}

public void setCDCTableRef(TableRef cdcTableRef) {
this.cdcTableRef = cdcTableRef;
}
}
@@ -22,6 +22,7 @@
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
@@ -54,25 +55,27 @@
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.ScanUtil;
import org.apache.phoenix.util.SchemaUtil;

import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;

public class TupleProjectionCompiler {
public static final PName PROJECTED_TABLE_SCHEMA = PNameFactory.newName(".");
public static final EnumSet<PTableType> PROJECTED_TABLE_TYPES = EnumSet.of(PTableType.TABLE,
PTableType.INDEX, PTableType.VIEW, PTableType.CDC);
private static final ParseNodeFactory NODE_FACTORY = new ParseNodeFactory();

public static PTable createProjectedTable(SelectStatement select, StatementContext context) throws SQLException {
Preconditions.checkArgument(!select.isJoin());
// Non-group-by or group-by aggregations will create its own projected result.
if (select.getInnerSelectStatement() != null
if (select.getInnerSelectStatement() != null
|| select.getFrom() == null
|| select.isAggregate()
|| select.isDistinct()
|| (context.getResolver().getTables().get(0).getTable().getType() != PTableType.TABLE
&& context.getResolver().getTables().get(0).getTable().getType() != PTableType.INDEX && context.getResolver().getTables().get(0).getTable().getType() != PTableType.VIEW))
|| ! PROJECTED_TABLE_TYPES.contains(
context.getResolver().getTables().get(0).getTable().getType())) {
return null;
}

List<PColumn> projectedColumns = new ArrayList<PColumn>();
boolean isWildcard = false;
@@ -86,7 +89,7 @@ public static PTable createProjectedTable(SelectStatement select, StatementConte
if (node instanceof WildcardParseNode) {
if (((WildcardParseNode) node).isRewrite()) {
TableRef parentTableRef = FromCompiler.getResolver(
NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
NODE_FACTORY.namedTable(null, TableName.create(table.getSchemaName().getString(),
table.getParentTableName().getString())), context.getConnection()).resolveTable(
table.getSchemaName().getString(),
table.getParentTableName().getString());
@@ -162,8 +165,8 @@ public static PTable createProjectedTable(SelectStatement select, StatementConte
// add IndexUncoveredDataColumnRef
position = projectedColumns.size() + (hasSaltingColumn ? 1 : 0);
for (IndexUncoveredDataColumnRef sourceColumnRef : visitor.indexColumnRefSet) {
PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(),
sourceColumnRef.getColumn().getFamilyName(), position++,
PColumn column = new ProjectedColumn(sourceColumnRef.getColumn().getName(),
sourceColumnRef.getColumn().getFamilyName(), position++,
sourceColumnRef.getColumn().isNullable(), sourceColumnRef, sourceColumnRef.getColumn().getColumnQualifierBytes());
projectedColumns.add(column);
}
@@ -135,6 +135,7 @@ public static long getMaxLookbackInMillis(Configuration conf){
public static final String EMPTY_COLUMN_QUALIFIER_NAME = "_EmptyCQName";
public static final String INDEX_ROW_KEY = "_IndexRowKey";
public static final String READ_REPAIR_TRANSFORMING_TABLE = "_ReadRepairTransformingTable";
public static final String CDC_DATA_TABLE_DEF = "_CdcDataTableDef";
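// NOTE (assumption, not in the original commit): presumably the Scan attribute
// used to pass the CDC data table definition to the server-side
// CDCGlobalIndexRegionScanner introduced with PHOENIX-7015.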

public static final String MAX_LOOKBACK_AGE = "MAX_LOOKBACK_AGE";
/**
@@ -184,4 +185,4 @@ public static long getMaxLookbackInMillis(Configuration conf){

/** Exposed for testing */
public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server";
}
}
@@ -172,8 +172,8 @@ public SQLException newException(SQLExceptionInfo info) {
AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expression not allowed in an index."),
NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index."),
STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index."),
/**

/**
* Transaction exceptions.
*/
TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to conflict with other mutations."),
@@ -360,6 +360,7 @@ public SQLException newException(SQLExceptionInfo info) {
VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
+ " only if none of the parents have indexes in the parent hierarchy"),
MAX_LOOKBACK_AGE_SUPPORTED_FOR_TABLES_ONLY(10957, "44A39", "Max lookback age can only be set for tables"),
UNKNOWN_INCLUDE_CHANGE_SCOPE(10958, "44A40", "Unknown change scope for CDC INCLUDE"),

/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
@@ -440,7 +441,9 @@ public SQLException newException(SQLExceptionInfo info) {

CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY(1133, "XCL33", "IMMUTABLE_ROWS property can be changed only if the table storage scheme is ONE_CELL_PER_KEYVALUE_COLUMN"),
CANNOT_ALTER_TABLE_PROPERTY_ON_VIEW(1134, "XCL34", "Altering this table property on a view is not allowed"),


CANNOT_DROP_CDC_INDEX(1153, "XCL53",
"Cannot drop the index associated with CDC"),
IMMUTABLE_TABLE_PROPERTY_INVALID(1135, "XCL35", "IMMUTABLE table property cannot be used with CREATE IMMUTABLE TABLE statement "),

MAX_COLUMNS_EXCEEDED(1136, "XCL36", "The number of columns exceed the maximum supported by the table's qualifier encoding scheme"),