Skip to content

Commit

Permalink
PHOENIX-7074 DROP CDC Implementation (apache#1713)
Browse files Browse the repository at this point in the history
  • Loading branch information
TheNamesRai committed Dec 4, 2023
1 parent 3ebf0bb commit 5d3fd40
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 5 deletions.
61 changes: 61 additions & 0 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,4 +208,65 @@ public void testCreateCDCMultitenant() throws Exception {
assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}

@Test
public void testDropCDC () throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ " v2 DATE)");
String cdcName = generateUniqueName();

String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);

String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(drop_cdc_sql);

try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
"system.catalog WHERE table_name = '" + cdcName +
"' AND column_name IS NULL and column_family IS NULL")) {
assertEquals(false, rs.next());
}
try (ResultSet rs = conn.createStatement().executeQuery("SELECT index_type FROM " +
"system.catalog WHERE table_name = '" + CDCUtil.getCDCIndexName(cdcName) +
"' AND column_name IS NULL and column_family IS NULL")) {
assertEquals(false, rs.next());
}

try {
conn.createStatement().execute(drop_cdc_sql);
fail("Expected to fail as cdc table doesn't exist");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
assertTrue(e.getMessage().endsWith(cdcName));
}
}

@Test
public void testDropCDCIndex () throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ " v2 DATE)");
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);
String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
try {
conn.createStatement().execute(drop_cdc_index_sql);
} catch (SQLException e) {
assertEquals(SQLExceptionCode.CANNOT_DROP_CDC_INDEX.getErrorCode(), e.getErrorCode());
assertTrue(e.getMessage().endsWith(CDCUtil.getCDCIndexName(cdcName)));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2999,7 +2999,7 @@ private MetaDataMutationResult doDropTable(byte[] key, byte[] tenantId, byte[] s
// Add to list of HTables to delete, unless it's a view or its a shared index
if (tableType == INDEX && table.getViewIndexId() != null) {
sharedTablesToDelete.add(new SharedTableState(table));
} else if (tableType != PTableType.VIEW) {
} else if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
tableNamesToDelete.add(table.getPhysicalName().getBytes());
}
invalidateList.add(cacheKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> c
if (!accessCheckEnabled) { return; }

for (MasterObserver observer : getAccessControllers()) {
if (tableType != PTableType.VIEW) {
if (tableType != PTableType.VIEW && tableType != PTableType.CDC) {
observer.preDeleteTable(getMasterObsevrverContext(), physicalTableName);
}
if (indexes != null) {
Expand All @@ -377,7 +377,8 @@ public void preDropTable(ObserverContext<PhoenixMetaDataControllerEnvironment> c
}
}
//checking similar permission checked during the create of the view.
if (tableType == PTableType.VIEW || tableType == PTableType.INDEX) {
if (tableType == PTableType.VIEW || tableType == PTableType.INDEX
|| tableType == PTableType.CDC) {
if (execPermissionsCheckEnabled) {
requireAccess("Drop "+tableType, parentPhysicalTableName, Action.READ, Action.EXEC);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,9 @@ public SQLException newException(SQLExceptionInfo info) {

CANNOT_ALTER_IMMUTABLE_ROWS_PROPERTY(1133, "XCL33", "IMMUTABLE_ROWS property can be changed only if the table storage scheme is ONE_CELL_PER_KEYVALUE_COLUMN"),
CANNOT_ALTER_TABLE_PROPERTY_ON_VIEW(1134, "XCL34", "Altering this table property on a view is not allowed"),


CANNOT_DROP_CDC_INDEX(1153, "XCL53",
"Cannot drop the index associated with CDC"),
IMMUTABLE_TABLE_PROPERTY_INVALID(1135, "XCL35", "IMMUTABLE table property cannot be used with CREATE IMMUTABLE TABLE statement "),

MAX_COLUMNS_EXCEEDED(1136, "XCL36", "The number of columns exceed the maximum supported by the table's qualifier encoding scheme"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.phoenix.jdbc;

import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_DROP_CDC_INDEX;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_SQL_COUNTER;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_QUERY_TIME;
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_SELECT_SQL_COUNTER;
Expand Down Expand Up @@ -215,6 +216,7 @@
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.trace.util.Tracing;
import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.CursorUtil;
import org.apache.phoenix.util.PhoenixKeyValueUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
Expand Down Expand Up @@ -1591,6 +1593,12 @@ public ExplainPlan getExplainPlan() throws SQLException {

@Override
public MutationState execute() throws SQLException {
String indexName = ExecutableDropIndexStatement.this.getIndexName().getName();
if (CDCUtil.isACDCIndex(indexName)) {
throw new SQLExceptionInfo.Builder(CANNOT_DROP_CDC_INDEX)
.setTableName(indexName)
.build().buildException();
}
MetaDataClient client = new MetaDataClient(getContext().getConnection());
return client.dropIndex(ExecutableDropIndexStatement.this);
}
Expand All @@ -1607,7 +1615,20 @@ public ExecutableDropCDCStatement(NamedNode cdcObjName, TableName tableName, boo
@SuppressWarnings("unchecked")
@Override
public MutationPlan compilePlan(final PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
return null;
final StatementContext context = new StatementContext(stmt);
return new BaseMutationPlan(context, this.getOperation()) {

@Override
public ExplainPlan getExplainPlan() throws SQLException {
return new ExplainPlan(Collections.singletonList("DROP CDC"));
}

@Override
public MutationState execute() throws SQLException {
MetaDataClient client = new MetaDataClient(getContext().getConnection());
return client.dropCDC(ExecutableDropCDCStatement.this);
}
};
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.phoenix.parse.CreateCDCStatement;
import org.apache.phoenix.parse.DropCDCStatement;
import org.apache.hadoop.hbase.client.Table;
import org.apache.phoenix.coprocessor.TableInfo;
import org.apache.phoenix.query.ConnectionlessQueryServicesImpl;
Expand Down Expand Up @@ -3631,6 +3632,26 @@ public MutationState dropIndex(DropIndexStatement statement) throws SQLException
return dropTable(schemaName, tableName, parentTableName, PTableType.INDEX, statement.ifExists(), false, false);
}

public MutationState dropCDC(DropCDCStatement statement) throws SQLException {
String schemaName = statement.getTableName().getSchemaName();
String cdcTableName = statement.getCdcObjName().getName();
String parentTableName = statement.getTableName().getTableName();
// Dropping the virtual CDC Table
dropTable(schemaName, cdcTableName, parentTableName, PTableType.CDC, statement.ifExists(),
false, false);

String indexName = CDCUtil.getCDCIndexName(statement.getCdcObjName().getName());
// Dropping the uncovered index associated with the CDC Table
try {
return dropTable(schemaName, indexName, parentTableName, PTableType.INDEX,
statement.ifExists(), false, false);
} catch (SQLException e) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.fromErrorCode(e.getErrorCode()))
.setTableName(statement.getCdcObjName().getName()).setRootCause(e.getCause())
.build().buildException();
}
}

private MutationState dropFunction(String functionName,
boolean ifExists) throws SQLException {
connection.rollback();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.phoenix.util;

import org.apache.phoenix.schema.PTable;
Expand Down

0 comments on commit 5d3fd40

Please sign in to comment.