Skip to content

Commit

Permalink
PHOENIX-7008 Implementation for CREATE CDC (apache#1681)
Browse files Browse the repository at this point in the history
  • Loading branch information
haridsv committed Sep 30, 2023
1 parent 3adc4d0 commit 581e613
Show file tree
Hide file tree
Showing 22 changed files with 671 additions and 142 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,6 @@ phoenix-hbase-compat-1.3.0/
phoenix-hbase-compat-1.4.0/
phoenix-hbase-compat-1.5.0/
*/hbase.log

# Vim swap files
.*.sw*
152 changes: 152 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.end2end;

import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@Category(ParallelStatsDisabledTest.class)
public class CDCMiscIT extends ParallelStatsDisabledIT {
    /**
     * Asserts the SYSTEM.CATALOG state recorded for a CDC object: the CDC_INCLUDE value
     * stored on the CDC's header row, and the INDEX_TYPE stored on the backing CDC index row.
     *
     * @param conn       open connection to query SYSTEM.CATALOG with
     * @param cdcName    name of the CDC object under test
     * @param expInclude expected CDC_INCLUDE string (e.g. "PRE,POST"), or null when none was set
     * @param idxType    expected numeric INDEX_TYPE of the CDC's backing index
     */
    private void assertCDCState(Connection conn, String cdcName, String expInclude,
            int idxType) throws SQLException {
        try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
                "system.catalog WHERE table_name = '" + cdcName +
                "' AND column_name IS NULL and column_family IS NULL")) {
            assertTrue(rs.next());
            assertEquals(expInclude, rs.getString(1));
        }
        try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
                "system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
                "' AND column_name IS NULL and column_family IS NULL")) {
            assertTrue(rs.next());
            assertEquals(idxType, rs.getInt(1));
        }
    }

    /**
     * Asserts that the client-side PTable for the CDC object exposes the expected
     * include scopes, both directly and via the INCLUDE table property.
     *
     * @param cdcName          name of the CDC object under test
     * @param expIncludeScopes expected change scopes, or null when INCLUDE was not specified
     */
    private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes)
            throws SQLException {
        Properties props = new Properties();
        // try-with-resources so the connection is released even when an assertion fails
        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
            PTable table = PhoenixRuntime.getTable(conn, cdcName);
            assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
            assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
        }
    }

    /**
     * Exercises CREATE CDC: failure modes (missing table, bad/non-deterministic/
     * non-timestamp expressions, duplicate creation), IF NOT EXISTS, INCLUDE scopes
     * and INDEX_TYPE handling.
     */
    @Test
    public void testCreate() throws SQLException {
        Properties props = new Properties();
        // try-with-resources so the connection is released even when an assertion fails
        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
            String tableName = generateUniqueName();
            conn.createStatement().execute(
                    "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
                            + " v2 DATE)");
            String cdcName = generateUniqueName();

            try {
                conn.createStatement().execute("CREATE CDC " + cdcName
                        + " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())");
                fail("Expected to fail due to non-existent table");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
            }

            try {
                conn.createStatement().execute("CREATE CDC " + cdcName
                        + " ON " + tableName + "(UNKNOWN_FUNCTION())");
                fail("Expected to fail due to invalid function");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), e.getErrorCode());
            }

            try {
                conn.createStatement().execute("CREATE CDC " + cdcName
                        + " ON " + tableName + "(NOW())");
                fail("Expected to fail due to non-deterministic function");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX.
                        getErrorCode(), e.getErrorCode());
            }

            try {
                conn.createStatement().execute("CREATE CDC " + cdcName
                        + " ON " + tableName + "(ROUND(v1))");
                fail("Expected to fail due to non-timestamp expression in the index PK");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
                        e.getErrorCode());
            }

            try {
                conn.createStatement().execute("CREATE CDC " + cdcName
                        + " ON " + tableName + "(v1)");
                fail("Expected to fail due to non-timestamp column in the index PK");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
                        e.getErrorCode());
            }

            String cdc_sql = "CREATE CDC " + cdcName
                    + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
            conn.createStatement().execute(cdc_sql);
            assertCDCState(conn, cdcName, null, 3);

            try {
                conn.createStatement().execute(cdc_sql);
                fail("Expected to fail due to duplicate index");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
                assertTrue(e.getMessage().endsWith(cdcName));
            }
            // IF NOT EXISTS on an existing CDC must be a silent no-op.
            conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
                    "(v2) INCLUDE (pre, post) INDEX_TYPE=g");

            cdcName = generateUniqueName();
            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
                    "(v2) INCLUDE (pre, post) INDEX_TYPE=g");
            assertCDCState(conn, cdcName, "PRE,POST", 3);
            assertPTable(cdcName, new HashSet<>(
                    Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)));

            cdcName = generateUniqueName();
            conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
                    "(v2) INDEX_TYPE=l");
            assertCDCState(conn, cdcName, null, 2);
            assertPTable(cdcName, null);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public MutationPlan compile(final CreateIndexStatement create) throws SQLExcepti
return new BaseMutationPlan(context, operation) {
@Override
public MutationState execute() throws SQLException {
return client.createIndex(create, splits);
return client.createIndex(create, splits, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_INDEX_ID_DATA_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_STATEMENT_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.VIEW_TYPE_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_BYTES;
import static org.apache.phoenix.query.QueryConstants.VIEW_MODIFIED_PROPERTY_TAG_TYPE;
import static org.apache.phoenix.schema.PTableImpl.getColumnsToClone;
import static org.apache.phoenix.schema.PTableType.INDEX;
Expand Down Expand Up @@ -246,6 +247,7 @@
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.transaction.TransactionFactory;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EncodedColumnsUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.MetaDataUtil;
Expand Down Expand Up @@ -365,6 +367,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
private static final Cell STREAMING_TOPIC_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES);
private static final Cell CDC_INCLUDE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, CDC_INCLUDE_BYTES);

private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
EMPTY_KEYVALUE_KV,
Expand Down Expand Up @@ -404,7 +408,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
CHANGE_DETECTION_ENABLED_KV,
SCHEMA_VERSION_KV,
EXTERNAL_SCHEMA_ID_KV,
STREAMING_TOPIC_NAME_KV
STREAMING_TOPIC_NAME_KV,
CDC_INCLUDE_KV
);

static {
Expand Down Expand Up @@ -452,6 +457,7 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
private static final int STREAMING_TOPIC_NAME_INDEX =
TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
private static final int CDC_INCLUDE_INDEX = TABLE_KV_COLUMNS.indexOf(CDC_INCLUDE_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
Expand Down Expand Up @@ -1423,6 +1429,15 @@ private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allC
builder.setStreamingTopicName(streamingTopicName != null ? streamingTopicName :
oldTable != null ? oldTable.getStreamingTopicName() : null);

Cell includeSpecKv = tableKeyValues[CDC_INCLUDE_INDEX];
String includeSpec = includeSpecKv != null ?
(String) PVarchar.INSTANCE.toObject(includeSpecKv.getValueArray(),
includeSpecKv.getValueOffset(), includeSpecKv.getValueLength())
: null;
builder.setCDCIncludeScopes(includeSpec != null ?
CDCUtil.makeChangeScopeEnumsFromString(includeSpec) :
oldTable != null ? oldTable.getCDCIncludeScopes() : null);

// Check the cell tag to see whether the view has modified this property
final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
HConstants.EMPTY_BYTE_ARRAY :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 37;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 38;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment>
Set<byte[]> familySet, Set<TableName> indexes) throws IOException {
if (!accessCheckEnabled) { return; }

if (tableType != PTableType.VIEW) {
if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
TableDescriptorBuilder tableDescBuilder = TableDescriptorBuilder.newBuilder(physicalTableName);
for (byte[] familyName : familySet) {
tableDescBuilder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(familyName).build());
Expand All @@ -184,9 +184,9 @@ public void preCreateTable(ObserverContext<PhoenixMetaDataControllerEnvironment>
}
}

// Index and view require read access on parent physical table.
// Index, view and CDC require read access on parent physical table.
Set<TableName> physicalTablesChecked = new HashSet<TableName>();
if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) {
if (tableType == PTableType.VIEW || tableType == PTableType.INDEX || tableType == PTableType.CDC) {
physicalTablesChecked.add(parentPhysicalTableName);
if (execPermissionsCheckEnabled) {
requireAccess("Create" + tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ public SQLException newException(SQLExceptionInfo info) {
AGGREGATE_EXPRESSION_NOT_ALLOWED_IN_INDEX(520, "42897", "Aggregate expression not allowed in an index."),
NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX(521, "42898", "Non-deterministic expression not allowed in an index."),
STATELESS_EXPRESSION_NOT_ALLOWED_IN_INDEX(522, "42899", "Stateless expression not allowed in an index."),

/**
INCORRECT_DATATYPE_FOR_EXPRESSION(539, "42102", "Expression datatype is incorrect for this index."),

/**
* Transaction exceptions.
*/
TRANSACTION_CONFLICT_EXCEPTION(523, "42900", "Transaction aborted due to conflict with other mutations."),
Expand Down Expand Up @@ -351,6 +352,8 @@ public SQLException newException(SQLExceptionInfo info) {
+ PhoenixDatabaseMetaData.PHOENIX_TTL + " property on an view when parent/child view has PHOENIX_TTL set,"),
CHANGE_DETECTION_SUPPORTED_FOR_TABLES_AND_VIEWS_ONLY(10954, "44A36",
CHANGE_DETECTION_ENABLED + " is only supported on tables and views"),
UNKNOWN_INDEX_TYPE(1098,"44A37", "Unknown INDEX type: "),
UNKNOWN_INCLUDE_CHANGE_SCOPE(1099,"44A38", "Unknown change scope for INCLUDE: "),

/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String SYSTEM_TRANSFORM_TABLE = "TRANSFORM";
public static final String SYSTEM_TRANSFORM_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_TRANSFORM_TABLE);

public static final String CDC_INCLUDE_NAME = "INCLUDE";
public static final String CDC_INCLUDE_TABLE = "CDC_INCLUDE";
public static final byte[] CDC_INCLUDE_BYTES = Bytes.toBytes(CDC_INCLUDE_TABLE);

// This is just a virtual property on CDC that translates to the type of index created.
public static final String CDC_INDEX_TYPE_NAME = "INDEX_TYPE";

//SYSTEM:LOG
public static final String SYSTEM_LOG_TABLE = "LOG";
public static final String SYSTEM_LOG_NAME =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1082,7 +1082,20 @@ public ExecutableCreateCDCStatement(NamedNode cdcObjName, TableName dataTable,
@Override
public MutationPlan compilePlan(PhoenixStatement stmt,
Sequence.ValueOp seqAction) throws SQLException {
// Build a minimal mutation plan for CREATE CDC: no rows are scanned at
// compile time; the actual DDL work is deferred to MetaDataClient at
// execute() time. NOTE(review): seqAction is ignored here — presumably CDC
// DDL involves no sequences; confirm against other DDL compilePlan impls.
final StatementContext context = new StatementContext(stmt);
return new BaseMutationPlan(context, this.getOperation()) {

@Override
public ExplainPlan getExplainPlan() throws SQLException {
// Fixed single-line plan; there is no query plan to explain for DDL.
return new ExplainPlan(Collections.singletonList("CREATE CDC"));
}

@Override
public MutationState execute() throws SQLException {
// Delegate the CDC creation (catalog rows + backing index) to MetaDataClient,
// passing the enclosing statement so it can read the parsed CDC definition.
MetaDataClient client = new MetaDataClient(getContext().getConnection());
return client.createCDC(ExecutableCreateCDCStatement.this);
}
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2106,8 +2106,10 @@ public MetaDataMutationResult createTable(final List<Mutation> tableMetaData, fi
break;
}
}
if ((tableType == PTableType.VIEW && physicalTableName != null) ||
(tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))) {
if ((tableType != PTableType.CDC) && (
(tableType == PTableType.VIEW && physicalTableName != null) ||
(tableType != PTableType.VIEW && (physicalTableName == null || localIndexTable))
)) {
// For views this will ensure that metadata already exists
// For tables and indexes, this will create the metadata if it doesn't already exist
ensureTableCreated(physicalTableNameBytes, null, tableType, tableProps, families, splits, true,
Expand Down Expand Up @@ -4096,19 +4098,22 @@ protected PhoenixConnection upgradeSystemCatalogIfRequired(PhoenixConnection met
}
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) {
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3,
PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 4,
PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
+ " " + PVarchar.INSTANCE.getSqlTypeName());

metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 3,
PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 2,
PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 - 1,
PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE + " " + PVarchar.INSTANCE.getSqlTypeName());
UpgradeUtil.bootstrapLastDDLTimestampForIndexes(metaConnection);
}
return metaConnection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BIND_PARAMETERS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.BUFFER_LENGTH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CACHE_SIZE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CDC_INCLUDE_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHANGE_DETECTION_ENABLED;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CHAR_OCTET_LENGTH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.CLASS_NAME;
Expand Down Expand Up @@ -303,6 +304,9 @@ enum JoinType {INNER, LEFT_OUTER}

// custom TagType
byte VIEW_MODIFIED_PROPERTY_TAG_TYPE = (byte) 70;

String CDC_JSON_COL_NAME = "CDC JSON";

/**
* We mark counter values 0 to 10 as reserved. Value 0 is used by
* {@link #ENCODED_EMPTY_COLUMN_NAME}. Values 1-10
Expand Down Expand Up @@ -342,6 +346,7 @@ enum JoinType {INNER, LEFT_OUTER}
SCHEMA_VERSION + " VARCHAR, \n" +
EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
STREAMING_TOPIC_NAME + " VARCHAR, \n" +
CDC_INCLUDE_TABLE + " VARCHAR, \n" +
// Column metadata (will be null for table row)
DATA_TYPE + " INTEGER," +
COLUMN_SIZE + " INTEGER," +
Expand Down

0 comments on commit 581e613

Please sign in to comment.