From 6cffdf58a5bd576411ba05e5e2bba195ea315df0 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Thu, 12 May 2022 15:28:33 -0400 Subject: [PATCH 1/2] PHOENIX-6685 - Change Data Capture - Populate Table/Topic Mappings --- .../apache/phoenix/end2end/AlterTableIT.java | 11 +++-- .../end2end/AlterTableWithViewsIT.java | 10 +++- .../apache/phoenix/end2end/CreateTableIT.java | 14 +++--- .../org/apache/phoenix/end2end/ViewIT.java | 7 ++- .../phoenix/end2end/index/MutableIndexIT.java | 2 +- .../phoenix/end2end/index/ViewIndexIT.java | 2 +- .../coprocessor/MetaDataEndpointImpl.java | 16 ++++++- .../phoenix/coprocessor/MetaDataProtocol.java | 6 +-- .../phoenix/jdbc/PhoenixDatabaseMetaData.java | 3 ++ .../query/ConnectionQueryServicesImpl.java | 13 ++++-- .../apache/phoenix/query/QueryConstants.java | 2 + .../apache/phoenix/schema/DelegateTable.java | 3 ++ .../apache/phoenix/schema/MetaDataClient.java | 46 +++++++++++++++++-- .../org/apache/phoenix/schema/PTable.java | 6 +++ .../org/apache/phoenix/schema/PTableImpl.java | 25 +++++++++- .../apache/phoenix/schema/TableProperty.java | 14 +++++- .../phoenix/schema/transform/Transform.java | 1 + phoenix-core/src/main/protobuf/PTable.proto | 1 + 18 files changed, 150 insertions(+), 32 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java index 99764b5c85b..e2badfda923 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableIT.java @@ -407,12 +407,17 @@ public void testSetPropertySchemaVersion() throws Exception { final String tableName = generateUniqueName(); final String dataTableFullName = SchemaUtil.getTableName(schemaName, tableName); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, "V1.0"); - String version = "V1.1"; - String alterSql = "ALTER TABLE " + dataTableFullName + " SET SCHEMA_VERSION='" + version + "'"; + CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, "V1.0", null); + final String version = "V1.1"; + final String alterSql = "ALTER TABLE " + dataTableFullName + " SET SCHEMA_VERSION='" + version + "'"; conn.createStatement().execute(alterSql); PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); assertEquals(version, table.getSchemaVersion()); + final String topicName = "MyTopicName"; + final String alterSql2 = "ALTER TABLE " + dataTableFullName + " SET STREAMING_TOPIC_NAME='" + topicName + "'"; + conn.createStatement().execute(alterSql2); + table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); + assertEquals(topicName, table.getStreamingTopicName()); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java index 829aa77c735..f8b47d435da 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java @@ -1477,19 +1477,25 @@ public void testCreateViewSchemaVersion() throws Exception { final String viewFullName = SchemaUtil.getTableName(schemaName, viewName); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String oldVersion = "V1.0"; - CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, oldVersion); + CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, oldVersion, null); String createViewSql = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName + " SCHEMA_VERSION='" + oldVersion + "'"; conn.createStatement().execute(createViewSql); PTable view = PhoenixRuntime.getTableNoCache(conn, viewFullName); assertEquals(oldVersion, view.getSchemaVersion()); + assertNull(view.getStreamingTopicName()); + String newVersion = "V1.1"; - String alterViewSql = "ALTER VIEW " + viewFullName + " SET SCHEMA_VERSION='" + newVersion + "'"; + String topicName = "MyTopicName"; + String alterViewSql = "ALTER VIEW " + viewFullName + " SET SCHEMA_VERSION='" + + newVersion + "', STREAMING_TABLE_NAME='" + topicName + "'"; conn.createStatement().execute(alterViewSql); PTable view2 = PhoenixRuntime.getTableNoCache(conn, viewFullName); assertEquals(newVersion, view2.getSchemaVersion()); PTable baseTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); assertEquals(oldVersion, baseTable.getSchemaVersion()); + assertNull(baseTable.getStreamingTopicName()); + assertEquals(topicName, view2.getStreamingTopicName()); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java index 44f2d3d1c0b..cc33899c1c9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java @@ -74,7 +74,6 @@ import org.junit.Test; import org.junit.experimental.categories.Category; - @Category(ParallelStatsDisabledTest.class) public class CreateTableIT extends ParallelStatsDisabledIT { @@ -1192,28 +1191,31 @@ public void testTableDescriptorPriority() throws SQLException, IOException { } @Test - public void testCreateTableSchemaVersion() throws Exception { + public void testCreateTableSchemaVersionAndTopicName() throws Exception { Properties props = new Properties(); final String schemaName = generateUniqueName(); final String tableName = generateUniqueName(); final String version = "V1.0"; + final String topicName = "MyTopicName"; try (Connection conn = DriverManager.getConnection(getUrl(), props)) { - testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version); + testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, version, topicName); } } - public static void testCreateTableSchemaVersionHelper(Connection conn, String schemaName, String tableName, - String dataTableVersion) + public static void testCreateTableSchemaVersionAndTopicNameHelper(Connection conn, String schemaName, String tableName, + String dataTableVersion, String topicName) throws Exception { final String dataTableFullName = SchemaUtil.getTableName(schemaName, tableName); String ddl = "CREATE TABLE " + dataTableFullName + " (\n" + "ID1 VARCHAR(15) NOT NULL,\n" + "ID2 VARCHAR(15) NOT NULL,\n" + "CREATED_DATE DATE,\n" + "CREATION_TIME BIGINT,\n" + "LAST_USED DATE,\n" - + "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SCHEMA_VERSION='" + dataTableVersion + "'"; + + "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SCHEMA_VERSION='" + dataTableVersion + + "', STREAMING_TOPIC_NAME='" + topicName + "'"; conn.createStatement().execute(ddl); PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); assertEquals(dataTableVersion, table.getSchemaVersion()); + assertEquals(topicName, table.getStreamingTopicName()); } @Test diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java index afff0771490..0d151e6ef11 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ViewIT.java @@ -338,12 +338,15 @@ public void testCreateViewSchemaVersion() throws Exception { final String viewFullName = SchemaUtil.getTableName(schemaName, viewName); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String version = "V1.0"; - CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version); + String topicName = "MyTopicName"; + CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, + tableName, version, topicName); String createViewSql = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName + - " SCHEMA_VERSION='" + version + "'"; + " SCHEMA_VERSION='" + version + "', STREAMING_TOPIC_NAME='" + topicName + "'"; conn.createStatement().execute(createViewSql); PTable view = PhoenixRuntime.getTableNoCache(conn, viewFullName); assertEquals(version, view.getSchemaVersion()); + assertEquals(topicName, view.getStreamingTopicName()); } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java index 7235aaeaf3c..79b4b9375b2 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java @@ -934,7 +934,7 @@ public void testCreateIndexSchemaVersion() throws Exception { final String indexFullName = SchemaUtil.getTableName(schemaName, indexName); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String version = "V1.0"; - CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version); + CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, version, null); String createIndexSql = "CREATE INDEX " + indexName + " ON " + dataTableFullName + " (ID2) INCLUDE (ID1) SCHEMA_VERSION='" + version + "'"; conn.createStatement().execute(createIndexSql); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java index accfa6c9098..686d2aff815 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/ViewIndexIT.java @@ -826,7 +826,7 @@ public void testCreateViewSchemaVersion() throws Exception { final String viewIndexFullName = SchemaUtil.getTableName(schemaName, viewIndexName); try (Connection conn = DriverManager.getConnection(getUrl(), props)) { String version = "V1.0"; - CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version); + CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, version, null); String createViewSql = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName + " SCHEMA_VERSION='" + version + "'"; conn.createStatement().execute(createViewSql); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java index ba57e47de66..bbcb6dcb858 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java @@ -67,6 +67,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES; @@ -363,6 +364,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr TABLE_FAMILY_BYTES, SCHEMA_VERSION_BYTES); private static final Cell EXTERNAL_SCHEMA_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES); + private static final Cell STREAMING_TOPIC_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, + TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES); private static final List TABLE_KV_COLUMNS = Lists.newArrayList( EMPTY_KEYVALUE_KV, @@ -401,7 +404,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr LAST_DDL_TIMESTAMP_KV, CHANGE_DETECTION_ENABLED_KV, SCHEMA_VERSION_KV, - EXTERNAL_SCHEMA_ID_KV + EXTERNAL_SCHEMA_ID_KV, + STREAMING_TOPIC_NAME_KV ); static { @@ -447,6 +451,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr private static final int SCHEMA_VERSION_INDEX = TABLE_KV_COLUMNS.indexOf(SCHEMA_VERSION_KV); private static final int EXTERNAL_SCHEMA_ID_INDEX = TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV); + private static final int STREAMING_TOPIC_NAME_INDEX = + TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV); // KeyValues for Column private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES); private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES); @@ -1398,6 +1404,14 @@ private PTable getTableFromCells(List tableCellList, List> allC builder.setExternalSchemaId(externalSchemaId != null ? externalSchemaId : oldTable != null ? oldTable.getExternalSchemaId() : null); + Cell streamingTopicNameKv = tableKeyValues[STREAMING_TOPIC_NAME_INDEX]; + String streamingTopicName = streamingTopicNameKv != null ? + (String) PVarchar.INSTANCE.toObject(streamingTopicNameKv.getValueArray(), + streamingTopicNameKv.getValueOffset(), streamingTopicNameKv.getValueLength()) + : null; + builder.setStreamingTopicName(streamingTopicName != null ? streamingTopicName : + oldTable != null ? oldTable.getStreamingTopicName() : null); + // Check the cell tag to see whether the view has modified this property final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ? HConstants.EMPTY_BYTE_ARRAY : diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 6243e978b0e..af6c1cf312d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -94,14 +94,12 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28; - // TODO Was there a system table upgrade? - // TODO Need to account for the inevitable 4.14 release too + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_0_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33; - public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 = MIN_TABLE_TIMESTAMP + 36; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0; - public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 37; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index 8c5694857df..81e2639b558 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -413,6 +413,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final String EXTERNAL_SCHEMA_ID = "EXTERNAL_SCHEMA_ID"; public static final byte[] EXTERNAL_SCHEMA_ID_BYTES = Bytes.toBytes(EXTERNAL_SCHEMA_ID); + public static final String STREAMING_TOPIC_NAME = "STREAMING_TOPIC_NAME"; + public static final byte[] STREAMING_TOPIC_NAME_BYTES = Bytes.toBytes(STREAMING_TOPIC_NAME); + public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK"; public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE); public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java index 4a8364ed65f..7a28d198158 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/ConnectionQueryServicesImpl.java @@ -24,7 +24,7 @@ import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0; import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0; -import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; @@ -3939,18 +3939,21 @@ protected PhoenixConnection upgradeSystemCatalogIfRequired(PhoenixConnection met } } } - if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0) { + if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) { metaConnection = addColumnsIfNotExists(metaConnection, - PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -2, + PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3, PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME + " " + PVarchar.INSTANCE.getSqlTypeName()); metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -1, + MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2, PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName()); metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, - MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0, + MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1, PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName()); + metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG, + MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0, + PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + PVarchar.INSTANCE.getSqlTypeName()); } return metaConnection; } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java index f4654123eb1..52326c189d0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryConstants.java @@ -134,6 +134,7 @@ import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE; @@ -340,6 +341,7 @@ enum JoinType {INNER, LEFT_OUTER} CHANGE_DETECTION_ENABLED + " BOOLEAN, \n" + SCHEMA_VERSION + " VARCHAR, \n" + EXTERNAL_SCHEMA_ID + " VARCHAR, \n" + + STREAMING_TOPIC_NAME + " VARCHAR, \n" + // Column metadata (will be null for table row) DATA_TYPE + " INTEGER," + COLUMN_SIZE + " INTEGER," + diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java index 1c9cefd9de8..b79691b3353 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/DelegateTable.java @@ -394,6 +394,9 @@ public String getExternalSchemaId() { return delegate.getExternalSchemaId(); } + @Override + public String getStreamingTopicName() { return delegate.getStreamingTopicName(); } + @Override public Map getPropertyValues() { return delegate.getPropertyValues(); } @Override public Map getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index dc57c70517a..3dbf700f58a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -19,6 +19,7 @@ import static org.apache.phoenix.exception.SQLExceptionCode.CANNOT_TRANSFORM_TRANSACTIONAL_TABLE; import static org.apache.phoenix.exception.SQLExceptionCode.ERROR_WRITING_TO_SCHEMA_REGISTRY; +import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME; import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_TASK_TABLE; import static org.apache.phoenix.query.QueryConstants.SYSTEM_SCHEMA_NAME; import static org.apache.phoenix.query.QueryServices.INDEX_CREATE_DEFAULT_STATE; @@ -347,8 +348,9 @@ public class MetaDataClient { CHANGE_DETECTION_ENABLED + "," + PHYSICAL_TABLE_NAME + "," + SCHEMA_VERSION + + STREAMING_TOPIC_NAME + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + - "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; private static final String CREATE_SCHEMA = "UPSERT INTO " + SYSTEM_CATALOG_SCHEMA + ".\"" + SYSTEM_CATALOG_TABLE + "\"( " + TABLE_SCHEM + "," + TABLE_NAME + ") VALUES (?,?)"; @@ -2029,6 +2031,7 @@ private PTable createTableInternal(CreateTableStatement statement, byte[][] spli verifyChangeDetectionTableType(tableType, isChangeDetectionEnabledProp); String schemaVersion = (String) TableProperty.SCHEMA_VERSION.getValue(tableProps); + String streamingTopicName = (String) TableProperty.STREAMING_TOPIC_NAME.getValue(tableProps); if (parent != null && tableType == PTableType.INDEX) { timestamp = TransactionUtil.getTableTimestamp(connection, transactionProvider != null, transactionProvider); @@ -2991,6 +2994,12 @@ public boolean isViewReferenced() { tableUpsert.setString(34, schemaVersion); } + if (streamingTopicName == null) { + tableUpsert.setNull(35, Types.VARCHAR); + } else { + tableUpsert.setString(35, streamingTopicName); + } + tableUpsert.execute(); if (asyncCreatedDate != null) { @@ -3137,6 +3146,7 @@ public boolean isViewReferenced() { .setSchemaVersion(schemaVersion) .setExternalSchemaId(result.getTable() != null ? result.getTable().getExternalSchemaId() : null) + .setStreamingTopicName(streamingTopicName) .build(); result = new MetaDataMutationResult(code, result.getMutationTime(), table, true); addTableToCache(result); @@ -3594,7 +3604,8 @@ private long incrementTableSeqNum(PTable table, PTableType expectedType, int col metaPropertiesEvaluated.isChangeDetectionEnabled(), metaPropertiesEvaluated.getPhysicalTableName(), metaPropertiesEvaluated.getSchemaVersion(), - metaPropertiesEvaluated.getColumnEncodedBytes()); + metaPropertiesEvaluated.getColumnEncodedBytes(), + metaPropertiesEvaluated.getStreamingTopicName()); } private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, Boolean isTransactional, @@ -3602,7 +3613,7 @@ private long incrementTableSeqNum(PTable table, PTableType expectedType, int co String schemaVersion, QualifierEncodingScheme columnEncodedBytes) throws SQLException { return incrementTableSeqNum(table, expectedType, columnCountDelta, isTransactional, null, updateCacheFrequency, null, null, null, null, -1L, null, null, null,phoenixTTL, false, physicalTableName, - schemaVersion, columnEncodedBytes); + schemaVersion, columnEncodedBytes, null); } private long incrementTableSeqNum(PTable table, PTableType expectedType, int columnCountDelta, @@ -3611,7 +3622,7 @@ private long incrementTableSeqNum(PTable table, PTableType expectedType, int col Boolean isMultiTenant, Boolean storeNulls, Long guidePostWidth, Boolean appendOnlySchema, ImmutableStorageScheme immutableStorageScheme, Boolean useStatsForParallelization, Long phoenixTTL, Boolean isChangeDetectionEnabled, String physicalTableName, String schemaVersion, - QualifierEncodingScheme columnEncodedBytes) + QualifierEncodingScheme columnEncodedBytes, String streamingTopicName) throws SQLException { String schemaName = table.getSchemaName().getString(); String tableName = table.getTableName().getString(); @@ -3679,6 +3690,9 @@ private long incrementTableSeqNum(PTable table, PTableType expectedType, int col if (!Strings.isNullOrEmpty(schemaVersion)) { mutateStringProperty(connection, tenantId, schemaName, tableName, SCHEMA_VERSION, schemaVersion); } + if (!Strings.isNullOrEmpty(streamingTopicName)) { + mutateStringProperty(connection, tenantId, schemaName, tableName, STREAMING_TOPIC_NAME, streamingTopicName); + } return seqNum; } @@ -5177,6 +5191,8 @@ private MetaProperties loadStmtProperties(ListMultimap propertyValues; private String schemaVersion; private String externalSchemaId; + private String streamingTopicName; public static class Builder { private PTableKey key; @@ -274,6 +275,7 @@ public static class Builder { private Map propertyValues = new HashMap<>(); private String schemaVersion; private String externalSchemaId; + private String streamingTopicName; // Used to denote which properties a view has explicitly modified private BitSet viewModifiedPropSet = new BitSet(3); @@ -687,6 +689,13 @@ public Builder setExternalSchemaId(String externalSchemaId) { return this; } + public Builder setStreamingTopicName(String topicName) { + if (topicName != null) { + this.streamingTopicName = topicName; + } + return this; + } + /** * Populate derivable attributes of the PTable * @return PTableImpl.Builder object @@ -959,6 +968,7 @@ private PTableImpl(Builder builder) { this.isChangeDetectionEnabled = builder.isChangeDetectionEnabled; this.schemaVersion = builder.schemaVersion; this.externalSchemaId = builder.externalSchemaId; + this.streamingTopicName = builder.streamingTopicName; } // When cloning table, ignore the salt column as it will be added back in the constructor @@ -1037,7 +1047,8 @@ public static PTableImpl.Builder builderFromExisting(PTable table) { .setLastDDLTimestamp(table.getLastDDLTimestamp()) .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled()) .setSchemaVersion(table.getSchemaVersion()) - .setExternalSchemaId(table.getExternalSchemaId()); + .setExternalSchemaId(table.getExternalSchemaId()) + .setStreamingTopicName(table.getStreamingTopicName()); } @Override @@ -1960,6 +1971,11 @@ public static PTable createFromProto(PTableProtos.PTable table) { externalSchemaId = (String) PVarchar.INSTANCE.toObject(table.getExternalSchemaId().toByteArray()); } + String streamingTopicName = null; + if (table.hasStreamingTopicName()) { + streamingTopicName = + (String) PVarchar.INSTANCE.toObject(table.getStreamingTopicName().toByteArray()); + } try { return new PTableImpl.Builder() .setType(tableType) @@ -2016,6 +2032,7 @@ public static PTable createFromProto(PTableProtos.PTable table) { .setIsChangeDetectionEnabled(isChangeDetectionEnabled) .setSchemaVersion(schemaVersion) .setExternalSchemaId(externalSchemaId) + .setStreamingTopicName(streamingTopicName) .build(); } catch (SQLException e) { throw new RuntimeException(e); // Impossible @@ -2148,6 +2165,7 @@ public static PTableProtos.PTable toProto(PTable table) { builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled()); builder.setSchemaVersion(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getSchemaVersion()))); builder.setExternalSchemaId(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getExternalSchemaId()))); + builder.setStreamingTopicName(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getStreamingTopicName()))); return builder.build(); } @@ -2285,6 +2303,11 @@ public String getExternalSchemaId() { return externalSchemaId; } + @Override + public String getStreamingTopicName() { + return streamingTopicName; + } + private static final class KVColumnFamilyQualifier { @Nonnull private final String colFamilyName; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java index d35ebc093ae..688b6df01a4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/TableProperty.java @@ -325,8 +325,18 @@ public Object getValue(Object value) { @Override public Object getPTableValue(PTable table) { return table.getSchemaVersion(); } - } - ; + }, + + STREAMING_TOPIC_NAME(PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME, COLUMN_FAMILY_NOT_ALLOWED_TABLE_PROPERTY, true, true, true) { + @Override + public Object getValue(Object value) { + return value == null ? null : value.toString(); + } + + @Override public Object getPTableValue(PTable table) { + return table.getStreamingTopicName(); + } + }; private final String propertyName; private final SQLExceptionCode colFamSpecifiedException; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java index 88112ab8ff6..54f1450f277 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/transform/Transform.java @@ -200,6 +200,7 @@ protected static PTable addTransform( .setUseStatsForParallelization(table.useStatsForParallelization()) .setSchemaVersion(table.getSchemaVersion()) .setIsChangeDetectionEnabled(table.isChangeDetectionEnabled()) + .setStreamingTopicName(table.getStreamingTopicName()) // Transformables .setImmutableStorageScheme( (changedProps.getImmutableStorageSchemeProp() != null? changedProps.getImmutableStorageSchemeProp():table.getImmutableStorageScheme())) diff --git a/phoenix-core/src/main/protobuf/PTable.proto b/phoenix-core/src/main/protobuf/PTable.proto index 26b40b48c76..6f6a663a993 100644 --- a/phoenix-core/src/main/protobuf/PTable.proto +++ b/phoenix-core/src/main/protobuf/PTable.proto @@ -116,6 +116,7 @@ message PTable { optional bytes schemaVersion = 49; optional bytes externalSchemaId=50; optional PTable transformingNewTable=51; + optional bytes streamingTopicName=52; } message EncodedCQCounter { From 074ec4cbcab04c30ea96ecca26976c74498dc6c5 Mon Sep 17 00:00:00 2001 From: Geoffrey Jacoby Date: Thu, 12 May 2022 18:34:06 -0400 Subject: [PATCH 2/2] PHOENIX-6685 - Change Data Capture - Populate Table/Topic Mappings --- .../phoenix/end2end/AlterTableWithViewsIT.java | 2 +- .../apache/phoenix/end2end/CreateTableIT.java | 12 ++++++++++-- .../apache/phoenix/schema/MetaDataClient.java | 2 +- .../org/apache/phoenix/schema/PTableImpl.java | 18 ++++++++++++------ 4 files changed, 24 insertions(+), 10 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java index f8b47d435da..a36b331b355 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/AlterTableWithViewsIT.java @@ -1488,7 +1488,7 @@ public void testCreateViewSchemaVersion() throws Exception { String newVersion = "V1.1"; String topicName = "MyTopicName"; String alterViewSql = "ALTER VIEW " + viewFullName + " SET SCHEMA_VERSION='" - + newVersion + "', STREAMING_TABLE_NAME='" + topicName + "'"; + + newVersion + "', STREAMING_TOPIC_NAME='" + topicName + "'"; conn.createStatement().execute(alterViewSql); PTable view2 = PhoenixRuntime.getTableNoCache(conn, viewFullName); assertEquals(newVersion, view2.getSchemaVersion()); diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java index cc33899c1c9..8fa3a5958a3 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CreateTableIT.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1211,11 +1212,18 @@ public static void testCreateTableSchemaVersionAndTopicNameHelper(Connection con + "ID2 VARCHAR(15) NOT NULL,\n" + "CREATED_DATE DATE,\n" + "CREATION_TIME BIGINT,\n" + "LAST_USED DATE,\n" + "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SCHEMA_VERSION='" + dataTableVersion - + "', STREAMING_TOPIC_NAME='" + topicName + "'"; + + "'"; + if (topicName != null) { + ddl += ", STREAMING_TOPIC_NAME='" + topicName + "'"; + } conn.createStatement().execute(ddl); PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName); assertEquals(dataTableVersion, table.getSchemaVersion()); - assertEquals(topicName, table.getStreamingTopicName()); + if (topicName != null) { + assertEquals(topicName, table.getStreamingTopicName()); + } else { + assertNull(table.getStreamingTopicName()); + } } @Test diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java index 3dbf700f58a..d6b7941103f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java @@ -347,7 +347,7 @@ public class MetaDataClient { PHOENIX_TTL_HWM + "," + CHANGE_DETECTION_ENABLED + "," + PHYSICAL_TABLE_NAME + "," + - SCHEMA_VERSION + + SCHEMA_VERSION + "," + STREAMING_TOPIC_NAME + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, " + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java index 6d6a4ca69e3..a40e1b61273 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -689,9 +689,9 @@ public Builder setExternalSchemaId(String externalSchemaId) { return this; } - public Builder setStreamingTopicName(String topicName) { - if (topicName != null) { - this.streamingTopicName = topicName; + public Builder setStreamingTopicName(String streamingTopicName) { + if (streamingTopicName != null) { + this.streamingTopicName = streamingTopicName; } return this; } @@ -2163,9 +2163,15 @@ public static PTableProtos.PTable toProto(PTable table) { builder.setLastDDLTimestamp(table.getLastDDLTimestamp()); } builder.setChangeDetectionEnabled(table.isChangeDetectionEnabled()); - builder.setSchemaVersion(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getSchemaVersion()))); - builder.setExternalSchemaId(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getExternalSchemaId()))); - builder.setStreamingTopicName(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getStreamingTopicName()))); + if (table.getSchemaVersion() != null) { + builder.setSchemaVersion(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getSchemaVersion()))); + } + if (table.getExternalSchemaId() != null) { + builder.setExternalSchemaId(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getExternalSchemaId()))); + } + if (table.getStreamingTopicName() != null) { + builder.setStreamingTopicName(ByteStringer.wrap(PVarchar.INSTANCE.toBytes(table.getStreamingTopicName()))); + } return builder.build(); }