From 521e67efdd7d84fc26495c69c6ad2ddbfc2e902f Mon Sep 17 00:00:00 2001 From: Palash Chauhan Date: Mon, 27 Apr 2026 12:54:54 -0700 Subject: [PATCH] PHOENIX-7811 : Create CDC should not drop user supplied table properties --- .../apache/phoenix/schema/MetaDataClient.java | 1 - .../phoenix/end2end/CDCDefinitionIT.java | 115 ++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 530c5612ac2..4fff6826c6e 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -2021,7 +2021,6 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME), PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null, null, false, SortOrder.getDefault(), "", null, false)); - tableProps = new HashMap<>(); if (dataTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) { // CDC table doesn't need SINGLE_CELL_ARRAY_WITH_OFFSETS encoding, so override it. tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(), diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java index 0aee3c9ab19..92f41c2808f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCDefinitionIT.java @@ -18,13 +18,23 @@ package org.apache.phoenix.end2end; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; +import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Arrays; @@ -33,16 +43,22 @@ import java.util.Properties; import org.apache.phoenix.exception.SQLExceptionCode; import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; +import org.apache.phoenix.query.ConnectionQueryServices; import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PName; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.types.PVarchar; import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.PropertiesUtil; import org.apache.phoenix.util.SchemaUtil; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import org.mockito.verification.VerificationMode; @RunWith(Parameterized.class) @Category(ParallelStatsDisabledTest.class) @@ -462,4 +478,103 @@ public void testSelectCDCBadIncludeSpec() throws Exception { assertTrue(e.getMessage().endsWith("DUMMY")); } } + + /** + * Verifies CDC honors UPDATE_CACHE_FREQUENCY when set on CREATE CDC, and falls back to the + * connection default when not. With UCF set, a warmed-cache SELECT against the CDC object must + * issue zero getTable RPCs for it; without UCF, atleast one per query. + */ + @Test + public void testCreateCDCWithUpdateCacheFrequency() throws Exception { + final long ucfMillis = 300000L; + + // parent has UCF, CREATE CDC does not - CDC keeps the connection default and every SELECT + // against it RPCs. Phoenix's compilation pipeline resolves the FROM table multiple times + // per query + String tableA = generateUniqueName(); + String cdcA = generateUniqueName(); + try (Connection conn = newConnection()) { + createTableAndCDC(conn, tableA, cdcA, "UPDATE_CACHE_FREQUENCY=" + ucfMillis, + /* cdcUcf */ null); + assertCdcUpdateCacheFrequency(conn, cdcA, 0L); + } + assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcA), atLeast(1)); + + // CREATE CDC sets UCF - it must be persisted and suppress getTable RPCs. + String tableB = generateUniqueName(); + String cdcB = generateUniqueName(); + try (Connection conn = newConnection()) { + createTableAndCDC(conn, tableB, cdcB, /* tableProps */ null, + "UPDATE_CACHE_FREQUENCY=" + ucfMillis); + assertCdcUpdateCacheFrequency(conn, cdcB, ucfMillis); + } + assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcB), times(0)); + } + + /** + * Create {@code dataTable}, optionally with a view on it (when {@link #forView} is true), a CDC + * over the resulting parent, and upsert one row. + */ + private void createTableAndCDC(Connection conn, String dataTable, String cdcName, + String tablePropsSuffix, String cdcPropsSuffix) throws Exception { + String tableSql = "CREATE TABLE " + dataTable + " (k INTEGER PRIMARY KEY, v INTEGER)" + + (tablePropsSuffix == null ? "" : " " + tablePropsSuffix); + conn.createStatement().execute(tableSql); + String parentName = dataTable; + if (forView) { + String viewName = generateUniqueName(); + conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + dataTable); + parentName = viewName; + } + String cdcSql = "CREATE CDC " + cdcName + " ON " + parentName + + (cdcPropsSuffix == null ? "" : " " + cdcPropsSuffix); + createCDC(conn, cdcSql); + conn.createStatement().execute("UPSERT INTO " + dataTable + " VALUES (1, 1)"); + conn.commit(); + } + + private void assertCdcUpdateCacheFrequency(Connection conn, String cdcName, long expected) + throws SQLException { + try (ResultSet rs = conn.createStatement() + .executeQuery("SELECT UPDATE_CACHE_FREQUENCY FROM SYSTEM.CATALOG WHERE TABLE_NAME = '" + + cdcName + "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL")) { + assertTrue("CDC row not found in SYSTEM.CATALOG for " + cdcName, rs.next()); + assertEquals("Unexpected UPDATE_CACHE_FREQUENCY on CDC virtual table " + cdcName, expected, + rs.getLong(1)); + } + } + + /** + * Open a fresh spied {@link ConnectionQueryServices}, run a CDC select once to warm the client + * metadata cache, reset the spy, run the same select again, and assert that {@code getTable(...)} + * for the CDC virtual table was invoked according to {@code mode}. + */ + private void assertGetTableRpcModeForCdc(String fullCdcName, VerificationMode mode) + throws SQLException { + ConnectionQueryServices spied = + spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES))); + Properties cprops = new Properties(); + cprops.putAll(PhoenixEmbeddedDriver.DEFAULT_PROPS.asMap()); + String selectSql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + fullCdcName + " LIMIT 1"; + + try (Connection conn = spied.connect(getUrl(), cprops)) { + try (PreparedStatement ps = conn.prepareStatement(selectSql); + ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + } + } + reset(spied); + + try (PreparedStatement ps = conn.prepareStatement(selectSql); + ResultSet rs = ps.executeQuery()) { + while (rs.next()) { + } + } + + String cdcSchema = SchemaUtil.getSchemaNameFromFullName(fullCdcName); + String cdcTableNameOnly = SchemaUtil.getTableNameFromFullName(fullCdcName); + verify(spied, mode).getTable((PName) any(), eq(PVarchar.INSTANCE.toBytes(cdcSchema)), + eq(PVarchar.INSTANCE.toBytes(cdcTableNameOnly)), anyLong(), anyLong()); + } + } }