PHOENIX-7008: Addressing Jira spec and review feedback changes (apach…
haridsv committed Jan 24, 2024
1 parent 93d586e commit f07898f
Showing 17 changed files with 245 additions and 234 deletions.
231 changes: 146 additions & 85 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java

Large diffs are not rendered by default.

10 changes: 1 addition & 9 deletions phoenix-core/src/main/antlr3/PhoenixSQL.g
@@ -566,25 +566,17 @@ create_index_node returns [CreateIndexStatement ret]

create_cdc_node returns [CreateCDCStatement ret]
: CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name
LPAREN (tcol=column_name | tfunc=cdc_time_func) RPAREN
(INCLUDE LPAREN v=cdc_change_scopes RPAREN)?
(p=fam_properties)?
{
ret = factory.createCDC(o, t, tcol, tfunc, v, p, ex != null, getBindCount());
ret = factory.createCDC(o, t, v, p, ex != null, getBindCount());
}
;

cdc_name returns [NamedNode ret]
: name=identifier {$ret = factory.cdcName(name); }
;

cdc_time_func returns [FunctionParseNode ret]
: field=identifier LPAREN l=zero_or_more_expressions RPAREN
{
ret = factory.function(field, l);
}
;

cdc_change_scopes returns [Set<CDCChangeScope> ret]
@init { ret = new HashSet<>(); }
: v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )*
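With this grammar change, CREATE CDC no longer takes a time-index column or cdc_time_func argument; only the data table and an optional INCLUDE change-scope list remain. A minimal sketch of the revised statement issued over JDBC, assuming hypothetical table/CDC names, a placeholder JDBC URL, and PRE/POST as valid scope keywords:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateCdcExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // No time-index column or function any more: just the CDC name,
                // the data table, and an optional INCLUDE (<change scopes>) clause.
                stmt.execute("CREATE CDC IF NOT EXISTS MY_CDC ON MY_TABLE INCLUDE (PRE, POST)");
            }
        }
    }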
@@ -360,7 +360,7 @@ public SQLException newException(SQLExceptionInfo info) {
VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
+ " only if none of the parents have indexes in the parent hierarchy"),
UNKNOWN_INDEX_TYPE(1098, "44A39", "Unknown INDEX type: "),
UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for INCLUDE: "),
UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for CDC INCLUDE"),

/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
@@ -469,8 +469,6 @@ public SQLException newException(SQLExceptionInfo info) {
"Missing ENCODED_QUALIFIER."),
EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+ "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
INVALID_TABLE_TYPE_FOR_CDC(1152, "XCL52",
"Invalid table type for creating CDC."),


/**
@@ -65,6 +65,7 @@ public class SQLExceptionInfo {
private final int phoenixColumnSizeBytes;
private final int maxPhoenixColumnSizeBytes;
private final String haGroupInfo;
private final String cdcChangeScope;

public static class Builder {
private Throwable rootCause;
@@ -83,6 +84,7 @@ public static class Builder {
private int maxPhoenixColumnSizeBytes;
private String haGroupInfo;
private PTableType tableType;
private String cdcChangeScope;

public Builder(SQLExceptionCode code) {
this.code = code;
@@ -163,6 +165,11 @@ public Builder setHaGroupInfo(String haGroupInfo) {
return this;
}

public Builder setCdcChangeScope(String cdcChangeScope) {
this.cdcChangeScope = cdcChangeScope;
return this;
}

public SQLExceptionInfo build() {
return new SQLExceptionInfo(this);
}
@@ -190,6 +197,7 @@ private SQLExceptionInfo(Builder builder) {
maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes;
phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes;
haGroupInfo = builder.haGroupInfo;
cdcChangeScope = builder.cdcChangeScope;
}

@Override
@@ -235,6 +243,9 @@ public String toString() {
if (haGroupInfo != null) {
builder.append(" ").append(HA_GROUP_INFO).append("=").append(haGroupInfo);
}
if (cdcChangeScope != null) {
builder.append(": ").append(cdcChangeScope);
}

return builder.toString();
}
@@ -306,4 +317,8 @@ public int getPhoenixColumnSizeBytes() {
public String getHaGroupInfo() {
return haGroupInfo;
}

public String getCdcChangeScope() {
return cdcChangeScope;
}
}
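The new cdcChangeScope field pairs with the reworded UNKNOWN_INCLUDE_CHANGE_SCOPE message above: the offending scope name is carried on SQLExceptionInfo and appended by toString() instead of being concatenated into the error text. A hedged sketch of a call site, assuming the builder is used the same way as the other optional fields (this call site is illustrative, not part of the commit):

    import java.sql.SQLException;
    import org.apache.phoenix.exception.SQLExceptionCode;
    import org.apache.phoenix.exception.SQLExceptionInfo;

    class UnknownCdcScopeExample {
        // Builds the exception for an unrecognized INCLUDE scope; toString()
        // appends ": <scopeName>" after the base message
        // "Unknown change scope for CDC INCLUDE".
        static SQLException unknownScope(String scopeName) {
            return new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE)
                    .setCdcChangeScope(scopeName)
                    .build()
                    .buildException();
        }
    }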
@@ -676,7 +676,7 @@ private Void addDataColInfo(final PTable dataTable, Expression expression) {
this.indexWhere = index.getIndexWhereExpression(connection);
this.indexWhereColumns = index.getIndexWhereColumns(connection);
}
this.isCDCIndex = CDCUtil.isACDCIndex(index);
this.isCDCIndex = CDCUtil.isCDCIndex(index);

initCachedState();
}
@@ -135,7 +135,8 @@ public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
long extraLimit = -1;

{
if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
// for indexes construct the row filter for uncovered columns if it exists
if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)) {
byte[] expBytes = scan.getAttribute(BaseScannerRegionObserver.INDEX_FILTER);
if (expBytes == null) {
// For older clients
@@ -160,48 +161,45 @@ public RegionScanner getWrappedScanner(final RegionCoprocessorEnvironment env,
if (limitBytes != null) {
extraLimit = Bytes.toLong(limitBytes);
}
if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)
&& (tupleProjector != null
|| (indexMaintainer != null && indexMaintainer.isUncovered()))) {
if (ScanUtil.isLocalOrUncoveredGlobalIndex(scan)
&& (tupleProjector != null
|| (indexMaintainer != null && indexMaintainer.isUncovered()))) {

PTable.ImmutableStorageScheme storageScheme =
indexMaintainer.getIndexStorageScheme();
PTable.ImmutableStorageScheme storageScheme =
indexMaintainer.getIndexStorageScheme();
Scan dataTableScan = new Scan();
if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
if (dataColumns != null) {
if (dataColumns != null) {
for (int i = 0; i < dataColumns.length; i++) {
if (storageScheme ==
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
dataTableScan.addFamily(dataColumns[i].getFamily());
} else {
dataTableScan.addColumn(dataColumns[i].getFamily(),
dataColumns[i].getQualifier());
}
if (storageScheme ==
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
dataTableScan.addFamily(dataColumns[i].getFamily());
} else {
dataTableScan.addColumn(dataColumns[i].getFamily(),
dataColumns[i].getQualifier());
}
}
} else if (indexMaintainer.isUncovered()) {
} else if (indexMaintainer.isUncovered()) {
// Indexed columns and the columns in index where clause should also be added
// to the data columns to scan for uncovered global indexes. This is required
// to verify the index row against the data table row.
for (ColumnReference column : indexMaintainer.getAllColumnsForDataTable()) {
dataTableScan.addColumn(column.getFamily(), column.getQualifier());
dataTableScan.addColumn(column.getFamily(), column.getQualifier());
}
}
}
if (ScanUtil.isLocalIndex(scan)) {
s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, offset, actualStartKey, extraLimit);
} else {
if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, extraLimit);
}
else {
s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
s = new UncoveredLocalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, extraLimit);
}
pageSizeMs, offset, actualStartKey, extraLimit);
} else {
if (scan.getAttribute(CDC_DATA_TABLE_NAME) != null) {
s = new CDCGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, extraLimit);
} else {
s = new UncoveredGlobalIndexRegionScanner(regionScanner, dataRegion, scan, env,
dataTableScan, tupleProjector, indexMaintainer, viewConstants, ptr,
pageSizeMs, extraLimit);
}
}
}
}
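The block above now routes CDC scans through CDCGlobalIndexRegionScanner when the CDC_DATA_TABLE_NAME scan attribute is present, while local-index and plain uncovered-global-index scans keep their existing scanners. A standalone sketch of just that decision, using a hypothetical helper and strings rather than Phoenix classes:

    // Mirrors the scanner-selection branches in getWrappedScanner(); the method
    // and return values are illustrative only, not part of Phoenix.
    public class ScannerChoiceSketch {
        static String chooseScanner(boolean isLocalIndexScan, boolean hasCdcDataTableAttribute) {
            if (isLocalIndexScan) {
                return "UncoveredLocalIndexRegionScanner";
            }
            return hasCdcDataTableAttribute
                    ? "CDCGlobalIndexRegionScanner"
                    : "UncoveredGlobalIndexRegionScanner";
        }

        public static void main(String[] args) {
            System.out.println(chooseScanner(false, true)); // CDCGlobalIndexRegionScanner
        }
    }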
@@ -123,7 +123,6 @@
import org.apache.phoenix.log.QueryStatus;
import org.apache.phoenix.monitoring.TableMetricsManager;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.optimize.Cost;
import org.apache.phoenix.parse.AddColumnStatement;
import org.apache.phoenix.parse.AddJarsStatement;
import org.apache.phoenix.parse.AliasedNode;
@@ -146,7 +145,6 @@
import org.apache.phoenix.parse.DeleteJarStatement;
import org.apache.phoenix.parse.DeleteStatement;
import org.apache.phoenix.parse.ExplainType;
import org.apache.phoenix.parse.FunctionParseNode;
import org.apache.phoenix.parse.ShowCreateTableStatement;
import org.apache.phoenix.parse.ShowCreateTable;
import org.apache.phoenix.parse.DropColumnStatement;
@@ -1082,12 +1080,10 @@ public MutationPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqActio
private static class ExecutableCreateCDCStatement extends CreateCDCStatement
implements CompilableStatement {
public ExecutableCreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
ColumnName timeIdxColumn, FunctionParseNode tfunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
super(cdcObjName, dataTable, timeIdxColumn, tfunc, includeScopes, props, ifNotExists,
bindCount);
super(cdcObjName, dataTable, includeScopes, props, ifNotExists, bindCount);
}

@Override
@@ -1594,7 +1590,7 @@ public ExplainPlan getExplainPlan() throws SQLException {
@Override
public MutationState execute() throws SQLException {
String indexName = ExecutableDropIndexStatement.this.getIndexName().getName();
if (CDCUtil.isACDCIndex(indexName)) {
if (CDCUtil.isCDCIndex(indexName)) {
throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX)
.setTableName(indexName)
.build().buildException();
@@ -1940,11 +1936,10 @@ public CreateTableStatement createTable(TableName tableName, ListMultimap<String

@Override
public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
return new ExecutableCreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc,
return new ExecutableCreateCDCStatement(cdcObj, dataTable,
includeScopes, props, ifNotExists, bindCount);
}

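The DROP INDEX path above keeps rejecting direct drops of the internal CDC index, now via the renamed CDCUtil.isCDCIndex(). A hedged sketch of what a client would see, using a purely hypothetical index name (the actual name of the index backing a CDC object is an internal detail):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class DropCdcIndexExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement()) {
                // Assumed to fail with CANNOT_DROP_CDC_INDEX when the index name
                // is recognized by CDCUtil.isCDCIndex().
                stmt.execute("DROP INDEX \"HYPOTHETICAL_CDC_INDEX\" ON MY_TABLE");
            } catch (SQLException e) {
                System.out.println(e.getErrorCode() + ": " + e.getMessage());
            }
        }
    }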
@@ -220,10 +220,9 @@ private List<QueryPlan> getApplicablePlansForSingleFlatQuery(QueryPlan dataPlan,
}

PTable table = dataPlan.getTableRef().getTable();
// TODO: Need to handle CDC hints.
if (table.getType() == PTableType.CDC) {
Set<PTable.CDCChangeScope> cdcIncludeScopes = table.getCDCIncludeScopes();
String cdcHint = select.getHint().getHint(Hint.INCLUDE);
String cdcHint = select.getHint().getHint(Hint.CDC_INCLUDE);
if (cdcHint != null && cdcHint.startsWith(HintNode.PREFIX)) {
cdcIncludeScopes = CDCUtil.makeChangeScopeEnumsFromString(cdcHint.substring(1,
cdcHint.length() - 1));
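With the hint renamed from INCLUDE to CDC_INCLUDE, a per-query override of the CDC object's default change scopes would look roughly like the following; the table/CDC names are hypothetical and the scope keywords are assumed to match PTable.CDCChangeScope values:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class CdcIncludeHintExample {
        public static void main(String[] args) throws Exception {
            try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                         // Overrides the scopes declared at CREATE CDC time for this query only.
                         "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM MY_CDC")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1));
                }
            }
        }
    }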
@@ -28,21 +28,16 @@
public class CreateCDCStatement extends MutableStatement {
private final NamedNode cdcObjName;
private final TableName dataTable;
private final ColumnName timeIdxColumn;
private final FunctionParseNode timeIdxFunc;
private final Set<PTable.CDCChangeScope> includeScopes;
private final ListMultimap<String,Pair<String,Object>> props;
private final boolean ifNotExists;
private final int bindCount;

public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable, ColumnName timeIdxColumn,
FunctionParseNode timeIdxFunc,
public CreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
Set<PTable.CDCChangeScope> includeScopes, ListMultimap<String,
Pair<String, Object>> props, boolean ifNotExists, int bindCount) {
this.cdcObjName = cdcObjName;
this.dataTable = dataTable;
this.timeIdxColumn = timeIdxColumn;
this.timeIdxFunc = timeIdxFunc;
this.includeScopes = includeScopes;
this.props = props == null ? ArrayListMultimap.<String,Pair<String,Object>>create() : props;
this.ifNotExists = ifNotExists;
@@ -57,14 +52,6 @@ public TableName getDataTable() {
return dataTable;
}

public ColumnName getTimeIdxColumn() {
return timeIdxColumn;
}

public FunctionParseNode getTimeIdxFunc() {
return timeIdxFunc;
}

public Set<PTable.CDCChangeScope> getIncludeScopes() {
return includeScopes;
}
@@ -133,7 +133,7 @@ public enum Hint {
/**
* Override the default CDC include scopes.
*/
INCLUDE,
CDC_INCLUDE,
;
};

@@ -377,11 +377,10 @@ public CreateIndexStatement createIndex(NamedNode indexName, NamedTableNode data
}

public CreateCDCStatement createCDC(NamedNode cdcObj, TableName dataTable,
ColumnName timeIdxColumn, FunctionParseNode timeIdxFunc,
Set<PTable.CDCChangeScope> includeScopes,
ListMultimap<String, Pair<String, Object>> props,
boolean ifNotExists, int bindCount) {
return new CreateCDCStatement(cdcObj, dataTable, timeIdxColumn, timeIdxFunc, includeScopes,
return new CreateCDCStatement(cdcObj, dataTable, includeScopes,
props, ifNotExists, bindCount);
}

