Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2021,7 +2021,6 @@ public MutationState createCDC(CreateCDCStatement statement) throws SQLException
columnDefs.add(FACTORY.columnDef(FACTORY.columnName(QueryConstants.CDC_JSON_COL_NAME),
PVarchar.INSTANCE.getSqlTypeName(), false, null, true, null, null, false,
SortOrder.getDefault(), "", null, false));
tableProps = new HashMap<>();
if (dataTable.getImmutableStorageScheme() == SINGLE_CELL_ARRAY_WITH_OFFSETS) {
// CDC table doesn't need SINGLE_CELL_ARRAY_WITH_OFFSETS encoding, so override it.
tableProps.put(TableProperty.IMMUTABLE_STORAGE_SCHEME.getPropertyName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@
package org.apache.phoenix.end2end;

import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS;
import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
Expand All @@ -33,16 +43,22 @@
import java.util.Properties;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.query.ConnectionQueryServices;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PVarchar;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.verification.VerificationMode;

@RunWith(Parameterized.class)
@Category(ParallelStatsDisabledTest.class)
Expand Down Expand Up @@ -462,4 +478,103 @@ public void testSelectCDCBadIncludeSpec() throws Exception {
assertTrue(e.getMessage().endsWith("DUMMY"));
}
}

/**
* Verifies CDC honors UPDATE_CACHE_FREQUENCY when set on CREATE CDC, and falls back to the
* connection default when not. With UCF set, a warmed-cache SELECT against the CDC object must
* issue zero getTable RPCs for it; without UCF, atleast one per query.
*/
@Test
public void testCreateCDCWithUpdateCacheFrequency() throws Exception {
final long ucfMillis = 300000L;

// parent has UCF, CREATE CDC does not - CDC keeps the connection default and every SELECT
// against it RPCs. Phoenix's compilation pipeline resolves the FROM table multiple times
// per query
String tableA = generateUniqueName();
String cdcA = generateUniqueName();
try (Connection conn = newConnection()) {
createTableAndCDC(conn, tableA, cdcA, "UPDATE_CACHE_FREQUENCY=" + ucfMillis,
/* cdcUcf */ null);
assertCdcUpdateCacheFrequency(conn, cdcA, 0L);
}
assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcA), atLeast(1));

// CREATE CDC sets UCF - it must be persisted and suppress getTable RPCs.
String tableB = generateUniqueName();
String cdcB = generateUniqueName();
try (Connection conn = newConnection()) {
createTableAndCDC(conn, tableB, cdcB, /* tableProps */ null,
"UPDATE_CACHE_FREQUENCY=" + ucfMillis);
assertCdcUpdateCacheFrequency(conn, cdcB, ucfMillis);
}
assertGetTableRpcModeForCdc(SchemaUtil.getTableName(null, cdcB), times(0));
}

/**
* Create {@code dataTable}, optionally with a view on it (when {@link #forView} is true), a CDC
* over the resulting parent, and upsert one row.
*/
private void createTableAndCDC(Connection conn, String dataTable, String cdcName,
String tablePropsSuffix, String cdcPropsSuffix) throws Exception {
String tableSql = "CREATE TABLE " + dataTable + " (k INTEGER PRIMARY KEY, v INTEGER)"
+ (tablePropsSuffix == null ? "" : " " + tablePropsSuffix);
conn.createStatement().execute(tableSql);
String parentName = dataTable;
if (forView) {
String viewName = generateUniqueName();
conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " + dataTable);
parentName = viewName;
}
String cdcSql = "CREATE CDC " + cdcName + " ON " + parentName
+ (cdcPropsSuffix == null ? "" : " " + cdcPropsSuffix);
createCDC(conn, cdcSql);
conn.createStatement().execute("UPSERT INTO " + dataTable + " VALUES (1, 1)");
conn.commit();
}

private void assertCdcUpdateCacheFrequency(Connection conn, String cdcName, long expected)
throws SQLException {
try (ResultSet rs = conn.createStatement()
.executeQuery("SELECT UPDATE_CACHE_FREQUENCY FROM SYSTEM.CATALOG WHERE TABLE_NAME = '"
+ cdcName + "' AND COLUMN_NAME IS NULL AND COLUMN_FAMILY IS NULL")) {
assertTrue("CDC row not found in SYSTEM.CATALOG for " + cdcName, rs.next());
assertEquals("Unexpected UPDATE_CACHE_FREQUENCY on CDC virtual table " + cdcName, expected,
rs.getLong(1));
}
}

/**
* Open a fresh spied {@link ConnectionQueryServices}, run a CDC select once to warm the client
* metadata cache, reset the spy, run the same select again, and assert that {@code getTable(...)}
* for the CDC virtual table was invoked according to {@code mode}.
*/
private void assertGetTableRpcModeForCdc(String fullCdcName, VerificationMode mode)
throws SQLException {
ConnectionQueryServices spied =
spy(driver.getConnectionQueryServices(getUrl(), PropertiesUtil.deepCopy(TEST_PROPERTIES)));
Properties cprops = new Properties();
cprops.putAll(PhoenixEmbeddedDriver.DEFAULT_PROPS.asMap());
String selectSql = "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + fullCdcName + " LIMIT 1";

try (Connection conn = spied.connect(getUrl(), cprops)) {
try (PreparedStatement ps = conn.prepareStatement(selectSql);
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
}
}
reset(spied);

try (PreparedStatement ps = conn.prepareStatement(selectSql);
ResultSet rs = ps.executeQuery()) {
while (rs.next()) {
}
}

String cdcSchema = SchemaUtil.getSchemaNameFromFullName(fullCdcName);
String cdcTableNameOnly = SchemaUtil.getTableNameFromFullName(fullCdcName);
verify(spied, mode).getTable((PName) any(), eq(PVarchar.INSTANCE.toBytes(cdcSchema)),
eq(PVarchar.INSTANCE.toBytes(cdcTableNameOnly)), anyLong(), anyLong());
}
}
}