Skip to content

Commit

Permalink
PHOENIX-7008 Tweaks, fixes and additional test coverage for CREATE CDC (
Browse files Browse the repository at this point in the history
  • Loading branch information
haridsv committed Oct 7, 2023
1 parent 20e9a66 commit e2ef886
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 19 deletions.
59 changes: 59 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.phoenix.end2end;

import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;

Expand All @@ -31,6 +33,7 @@
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

Expand Down Expand Up @@ -64,6 +67,16 @@ private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeS
PTable table = PhoenixRuntime.getTable(conn, cdcName);
assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
assertNull(table.getIndexState()); // Index state should be null for CDC.
}

private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
assertEquals(nbuckets, cdcTable.getBucketNum());
PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
assertEquals(nbuckets, indexTable.getBucketNum());
}

@Test
Expand Down Expand Up @@ -147,6 +160,52 @@ public void testCreate() throws SQLException {
assertCDCState(conn, cdcName, null, 2);
assertPTable(cdcName, null);

String viewName = generateUniqueName();
conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
tableName);
cdcName = generateUniqueName();
try {
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + viewName +
"(PHOENIX_ROW_TIMESTAMP())");
fail("Expected to fail on VIEW");
}
catch(SQLException e) {
assertEquals(SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getErrorCode(),
e.getErrorCode());
assertTrue(e.getMessage().endsWith(
SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getMessage() + " tableType=VIEW"));
}

cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP()) SALT_BUCKETS = 4");
assertSaltBuckets(cdcName, 4);

conn.close();
}

@Test
public void testCreateCDCMultitenant() throws Exception {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute("CREATE TABLE " + tableName +
" (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
"CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
String cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(PHOENIX_ROW_TIMESTAMP())");

PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
List<PColumn> idxPkColumns = indexTable.getPKColumns();
assertEquals(":TENANTID", idxPkColumns.get(0).getName().getString());
assertEquals(": PHOENIX_ROW_TIMESTAMP()", idxPkColumns.get(1).getName().getString());
assertEquals(":K", idxPkColumns.get(2).getName().getString());

PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
assertEquals(" PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ public SQLException newException(SQLExceptionInfo info) {
"Missing ENCODED_QUALIFIER."),
EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+ "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
INVALID_TABLE_TYPE_FOR_CDC(1152, "XCL52",
"Invalid table type for creating CDC."),


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.sql.SQLException;

import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.util.SchemaUtil;


Expand All @@ -35,6 +37,7 @@ public class SQLExceptionInfo {
*/
public static final String SCHEMA_NAME = "schemaName";
public static final String TABLE_NAME = "tableName";
public static final String TABLE_TYPE = "tableType";
public static final String FAMILY_NAME = "familyName";
public static final String COLUMN_NAME = "columnName";
public static final String FUNCTION_NAME = "functionName";
Expand All @@ -51,6 +54,7 @@ public class SQLExceptionInfo {
private final String message;
private final String schemaName;
private final String tableName;
private final PTableType tableType;
private final String familyName;
private final String columnName;
private final String functionName;
Expand Down Expand Up @@ -78,6 +82,7 @@ public static class Builder {
private int phoenixColumnSizeBytes;
private int maxPhoenixColumnSizeBytes;
private String haGroupInfo;
private PTableType tableType;

public Builder(SQLExceptionCode code) {
this.code = code;
Expand All @@ -103,6 +108,11 @@ public Builder setTableName(String tableName) {
return this;
}

public Builder setTableType(PTableType tableType) {
this.tableType = tableType;
return this;
}

public Builder setFamilyName(String familyName) {
this.familyName = familyName;
return this;
Expand Down Expand Up @@ -169,6 +179,7 @@ private SQLExceptionInfo(Builder builder) {
message = builder.message;
schemaName = builder.schemaName;
tableName = builder.tableName;
tableType = builder.tableType;
familyName = builder.familyName;
columnName = builder.columnName;
functionName = builder.functionName;
Expand Down Expand Up @@ -206,6 +217,9 @@ public String toString() {
} else if (schemaName != null) {
builder.append(" ").append(SCHEMA_NAME).append("=").append(columnDisplayName);
}
if (tableType != null) {
builder.append(" ").append(TABLE_TYPE).append("=").append(tableType);
}
if (maxMutationSize != 0) {
builder.append(" ").append(MAX_MUTATION_SIZE).append("=").append(maxMutationSize);
builder.append(" ").append(MUTATION_SIZE).append("=").append(mutationSize);
Expand Down Expand Up @@ -241,6 +255,10 @@ public String getTableName() {
return tableName;
}

public PTableType getTableType() {
return tableType;
}

public String getFamilyName() {
return familyName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE;
import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY;
import static org.apache.phoenix.exception.SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC;
import static org.apache.phoenix.exception.SQLExceptionCode.TABLE_ALREADY_EXIST;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
Expand Down Expand Up @@ -1711,6 +1712,16 @@ public MutationState createIndex(CreateIndexStatement statement, byte[][] splits
}

public MutationState createCDC(CreateCDCStatement statement) throws SQLException {
// TODO: Do we need to borrow the schema name of the table?
ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection);
TableRef tableRef = resolver.getTables().get(0);
PTable dataTable = tableRef.getTable();
// Check if data table is a view and give a not supported error.
if (dataTable.getType() != TABLE) {
throw new SQLExceptionInfo.Builder(INVALID_TABLE_TYPE_FOR_CDC).setTableType(
dataTable.getType()).build().buildException();
}

Map<String,Object> tableProps = Maps.newHashMapWithExpectedSize(
statement.getProps().size());
Map<String,Object> commonFamilyProps = Maps.newHashMapWithExpectedSize(
Expand All @@ -1728,6 +1739,11 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException
SortOrder.getDefault()) }));
IndexType indexType = (IndexType) TableProperty.INDEX_TYPE.getValue(tableProps);
ListMultimap<String, Pair<String, Object>> indexProps = ArrayListMultimap.create();
if (TableProperty.SALT_BUCKETS.getValue(tableProps) != null) {
indexProps.put(QueryConstants.ALL_FAMILY_PROPERTIES_KEY, new Pair<>(
TableProperty.SALT_BUCKETS.getPropertyName(),
TableProperty.SALT_BUCKETS.getValue(tableProps)));
}
// TODO: Transfer TTL and MaxLookback from statement.getProps() to indexProps.
CreateIndexStatement indexStatement = FACTORY.createIndex(indexName, FACTORY.namedTable(null,
statement.getDataTable(), (Double) null), indexKeyConstraint, null, null,
Expand All @@ -1749,10 +1765,6 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException
throw e;
}

// TODO: Do we need to borrow the schema name of the table?
ColumnResolver resolver = FromCompiler.getResolver(NamedTableNode.create(statement.getDataTable()), connection);
TableRef tableRef = resolver.getTables().get(0);
PTable dataTable = tableRef.getTable();
List<PColumn> pkColumns = dataTable.getPKColumns();
List<ColumnDef> columnDefs = new ArrayList<>();
List<ColumnDefInPkConstraint> pkColumnDefs = new ArrayList<>();
Expand Down Expand Up @@ -3058,7 +3070,9 @@ public boolean isViewReferenced() {
defaultCreateState = PIndexState.BUILDING;
}
}
PIndexState indexState = parent == null || tableType == PTableType.VIEW ? null : defaultCreateState;
PIndexState indexState = parent == null ||
(tableType == PTableType.VIEW || tableType == PTableType.CDC) ?
null : defaultCreateState;
if (indexState == null && tableProps.containsKey(INDEX_STATE)) {
indexState = PIndexState.fromSerializedValue(tableProps.get(INDEX_STATE).toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2018,8 +2018,7 @@ public static PTable createFromProto(PTableProtos.PTable table) {
}
String cdcIncludeScopesStr = null;
if (table.hasCDCIncludeScopes()) {
cdcIncludeScopesStr =
(String) PVarchar.INSTANCE.toObject(table.getCDCIncludeScopes().toByteArray());
cdcIncludeScopesStr = table.getCDCIncludeScopes();
}
try {
return new PTableImpl.Builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,6 @@ public Object getPTableValue(PTable table) {
},

INCLUDE(PhoenixDatabaseMetaData.CDC_INCLUDE_NAME, COLUMN_FAMILY_NOT_ALLOWED_FOR_PROPERTY, true, false, false) {
@Override
public Object getValue(Object value) {
try {
return value == null ? PTable.CDCChangeScope.CHANGE : PTable.CDCChangeScope.valueOf(value.toString().toUpperCase());
} catch (IllegalArgumentException e) {
throw new RuntimeException(new SQLExceptionInfo.Builder(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE)
.setMessage(value.toString())
.build().buildException());
}
}

@Override
public Object getPTableValue(PTable table) {
return table.getCDCIncludeScopes();
Expand Down
2 changes: 1 addition & 1 deletion phoenix-core/src/main/protobuf/PTable.proto
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ message PTable {
optional bytes externalSchemaId=50;
optional PTable transformingNewTable=51;
optional bytes streamingTopicName=52;
optional bytes CDCIncludeScopes=55;
optional string CDCIncludeScopes=55;
}

message EncodedCQCounter {
Expand Down

0 comments on commit e2ef886

Please sign in to comment.