Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-6685 - Change Data Capture - Populate Table/Topic Mappings #1441

Merged
merged 2 commits into from
May 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -407,12 +407,17 @@ public void testSetPropertySchemaVersion() throws Exception {
final String tableName = generateUniqueName();
final String dataTableFullName = SchemaUtil.getTableName(schemaName, tableName);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, "V1.0");
String version = "V1.1";
String alterSql = "ALTER TABLE " + dataTableFullName + " SET SCHEMA_VERSION='" + version + "'";
CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, "V1.0", null);
final String version = "V1.1";
final String alterSql = "ALTER TABLE " + dataTableFullName + " SET SCHEMA_VERSION='" + version + "'";
conn.createStatement().execute(alterSql);
PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
assertEquals(version, table.getSchemaVersion());
final String topicName = "MyTopicName";
final String alterSql2 = "ALTER TABLE " + dataTableFullName + " SET STREAMING_TOPIC_NAME='" + topicName + "'";
conn.createStatement().execute(alterSql2);
table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
assertEquals(topicName, table.getStreamingTopicName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1477,19 +1477,25 @@ public void testCreateViewSchemaVersion() throws Exception {
final String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String oldVersion = "V1.0";
CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, oldVersion);
CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, oldVersion, null);
String createViewSql = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName +
" SCHEMA_VERSION='" + oldVersion + "'";
conn.createStatement().execute(createViewSql);
PTable view = PhoenixRuntime.getTableNoCache(conn, viewFullName);
assertEquals(oldVersion, view.getSchemaVersion());
assertNull(view.getStreamingTopicName());

String newVersion = "V1.1";
String alterViewSql = "ALTER VIEW " + viewFullName + " SET SCHEMA_VERSION='" + newVersion + "'";
String topicName = "MyTopicName";
String alterViewSql = "ALTER VIEW " + viewFullName + " SET SCHEMA_VERSION='"
+ newVersion + "', STREAMING_TOPIC_NAME='" + topicName + "'";
conn.createStatement().execute(alterViewSql);
PTable view2 = PhoenixRuntime.getTableNoCache(conn, viewFullName);
assertEquals(newVersion, view2.getSchemaVersion());
PTable baseTable = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
assertEquals(oldVersion, baseTable.getSchemaVersion());
assertNull(baseTable.getStreamingTopicName());
assertEquals(topicName, view2.getStreamingTopicName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -74,7 +75,6 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;


@Category(ParallelStatsDisabledTest.class)
public class CreateTableIT extends ParallelStatsDisabledIT {

Expand Down Expand Up @@ -1192,28 +1192,38 @@ public void testTableDescriptorPriority() throws SQLException, IOException {
}

@Test
public void testCreateTableSchemaVersion() throws Exception {
public void testCreateTableSchemaVersionAndTopicName() throws Exception {
Properties props = new Properties();
final String schemaName = generateUniqueName();
final String tableName = generateUniqueName();
final String version = "V1.0";
final String topicName = "MyTopicName";
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version);
testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, version, topicName);
}
}

public static void testCreateTableSchemaVersionHelper(Connection conn, String schemaName, String tableName,
String dataTableVersion)
public static void testCreateTableSchemaVersionAndTopicNameHelper(Connection conn, String schemaName, String tableName,
String dataTableVersion, String topicName)
throws Exception {
final String dataTableFullName = SchemaUtil.getTableName(schemaName, tableName);
String ddl =
"CREATE TABLE " + dataTableFullName + " (\n" + "ID1 VARCHAR(15) NOT NULL,\n"
+ "ID2 VARCHAR(15) NOT NULL,\n" + "CREATED_DATE DATE,\n"
+ "CREATION_TIME BIGINT,\n" + "LAST_USED DATE,\n"
+ "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SCHEMA_VERSION='" + dataTableVersion + "'";
+ "CONSTRAINT PK PRIMARY KEY (ID1, ID2)) SCHEMA_VERSION='" + dataTableVersion
+ "'";
if (topicName != null) {
ddl += ", STREAMING_TOPIC_NAME='" + topicName + "'";
}
conn.createStatement().execute(ddl);
PTable table = PhoenixRuntime.getTableNoCache(conn, dataTableFullName);
assertEquals(dataTableVersion, table.getSchemaVersion());
if (topicName != null) {
assertEquals(topicName, table.getStreamingTopicName());
} else {
assertNull(table.getStreamingTopicName());
}
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,12 +338,15 @@ public void testCreateViewSchemaVersion() throws Exception {
final String viewFullName = SchemaUtil.getTableName(schemaName, viewName);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String version = "V1.0";
CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version);
String topicName = "MyTopicName";
CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName,
tableName, version, topicName);
String createViewSql = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName +
" SCHEMA_VERSION='" + version + "'";
" SCHEMA_VERSION='" + version + "', STREAMING_TOPIC_NAME='" + topicName + "'";
conn.createStatement().execute(createViewSql);
PTable view = PhoenixRuntime.getTableNoCache(conn, viewFullName);
assertEquals(version, view.getSchemaVersion());
assertEquals(topicName, view.getStreamingTopicName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,7 @@ public void testCreateIndexSchemaVersion() throws Exception {
final String indexFullName = SchemaUtil.getTableName(schemaName, indexName);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String version = "V1.0";
CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version);
CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, version, null);
String createIndexSql = "CREATE INDEX " + indexName + " ON " + dataTableFullName +
" (ID2) INCLUDE (ID1) SCHEMA_VERSION='" + version + "'";
conn.createStatement().execute(createIndexSql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ public void testCreateViewSchemaVersion() throws Exception {
final String viewIndexFullName = SchemaUtil.getTableName(schemaName, viewIndexName);
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
String version = "V1.0";
CreateTableIT.testCreateTableSchemaVersionHelper(conn, schemaName, tableName, version);
CreateTableIT.testCreateTableSchemaVersionAndTopicNameHelper(conn, schemaName, tableName, version, null);
String createViewSql = "CREATE VIEW " + viewFullName + " AS SELECT * FROM " + dataTableFullName +
" SCHEMA_VERSION='" + version + "'";
conn.createStatement().execute(createViewSql);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SORT_ORDER_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORAGE_SCHEME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_NAME_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.TABLE_SEQ_NUM_BYTES;
Expand Down Expand Up @@ -363,6 +364,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
TABLE_FAMILY_BYTES, SCHEMA_VERSION_BYTES);
private static final Cell EXTERNAL_SCHEMA_ID_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, EXTERNAL_SCHEMA_ID_BYTES);
private static final Cell STREAMING_TOPIC_NAME_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY,
TABLE_FAMILY_BYTES, STREAMING_TOPIC_NAME_BYTES);

private static final List<Cell> TABLE_KV_COLUMNS = Lists.newArrayList(
EMPTY_KEYVALUE_KV,
Expand Down Expand Up @@ -401,7 +404,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
LAST_DDL_TIMESTAMP_KV,
CHANGE_DETECTION_ENABLED_KV,
SCHEMA_VERSION_KV,
EXTERNAL_SCHEMA_ID_KV
EXTERNAL_SCHEMA_ID_KV,
STREAMING_TOPIC_NAME_KV
);

static {
Expand Down Expand Up @@ -447,6 +451,8 @@ public class MetaDataEndpointImpl extends MetaDataProtocol implements RegionCopr
private static final int SCHEMA_VERSION_INDEX = TABLE_KV_COLUMNS.indexOf(SCHEMA_VERSION_KV);
private static final int EXTERNAL_SCHEMA_ID_INDEX =
TABLE_KV_COLUMNS.indexOf(EXTERNAL_SCHEMA_ID_KV);
private static final int STREAMING_TOPIC_NAME_INDEX =
TABLE_KV_COLUMNS.indexOf(STREAMING_TOPIC_NAME_KV);
// KeyValues for Column
private static final KeyValue DECIMAL_DIGITS_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, DECIMAL_DIGITS_BYTES);
private static final KeyValue COLUMN_SIZE_KV = createFirstOnRow(ByteUtil.EMPTY_BYTE_ARRAY, TABLE_FAMILY_BYTES, COLUMN_SIZE_BYTES);
Expand Down Expand Up @@ -1398,6 +1404,14 @@ private PTable getTableFromCells(List<Cell> tableCellList, List<List<Cell>> allC
builder.setExternalSchemaId(externalSchemaId != null ? externalSchemaId :
oldTable != null ? oldTable.getExternalSchemaId() : null);

Cell streamingTopicNameKv = tableKeyValues[STREAMING_TOPIC_NAME_INDEX];
String streamingTopicName = streamingTopicNameKv != null ?
(String) PVarchar.INSTANCE.toObject(streamingTopicNameKv.getValueArray(),
streamingTopicNameKv.getValueOffset(), streamingTopicNameKv.getValueLength())
: null;
builder.setStreamingTopicName(streamingTopicName != null ? streamingTopicName :
oldTable != null ? oldTable.getStreamingTopicName() : null);

// Check the cell tag to see whether the view has modified this property
final byte[] tagUseStatsForParallelization = (useStatsForParallelizationKv == null) ?
HConstants.EMPTY_BYTE_ARRAY :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,12 @@ public abstract class MetaDataProtocol extends MetaDataService {
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_13_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0 = MIN_TABLE_TIMESTAMP + 28;
// TODO Was there a system table upgrade?
// TODO Need to account for the inevitable 4.14 release too

public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_0_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_14_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0 = MIN_TABLE_TIMESTAMP + 29;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0 = MIN_TABLE_TIMESTAMP + 33;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 = MIN_TABLE_TIMESTAMP + 36;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_1_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0;
public static final long MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 = MIN_TABLE_TIMESTAMP + 37;
// MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants
public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,9 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData {
public static final String EXTERNAL_SCHEMA_ID = "EXTERNAL_SCHEMA_ID";
public static final byte[] EXTERNAL_SCHEMA_ID_BYTES = Bytes.toBytes(EXTERNAL_SCHEMA_ID);

public static final String STREAMING_TOPIC_NAME = "STREAMING_TOPIC_NAME";
public static final byte[] STREAMING_TOPIC_NAME_BYTES = Bytes.toBytes(STREAMING_TOPIC_NAME);

public static final String SYSTEM_CHILD_LINK_TABLE = "CHILD_LINK";
public static final String SYSTEM_CHILD_LINK_NAME = SchemaUtil.getTableName(SYSTEM_CATALOG_SCHEMA, SYSTEM_CHILD_LINK_TABLE);
public static final byte[] SYSTEM_CHILD_LINK_NAME_BYTES = Bytes.toBytes(SYSTEM_CHILD_LINK_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_15_0;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_16_0;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION;
import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER;
Expand Down Expand Up @@ -3939,18 +3939,21 @@ protected PhoenixConnection upgradeSystemCatalogIfRequired(PhoenixConnection met
}
}
}
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0) {
if (currentServerSideTableTimeStamp < MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0) {
metaConnection = addColumnsIfNotExists(metaConnection,
PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -2,
PhoenixDatabaseMetaData.SYSTEM_CATALOG, MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -3,
PhoenixDatabaseMetaData.PHYSICAL_TABLE_NAME
+ " " + PVarchar.INSTANCE.getSqlTypeName());

metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0 -1,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -2,
PhoenixDatabaseMetaData.SCHEMA_VERSION + " " + PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_4_17_0,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0 -1,
PhoenixDatabaseMetaData.EXTERNAL_SCHEMA_ID + " " + PVarchar.INSTANCE.getSqlTypeName());
metaConnection = addColumnsIfNotExists(metaConnection, PhoenixDatabaseMetaData.SYSTEM_CATALOG,
MIN_SYSTEM_TABLE_TIMESTAMP_5_2_0,
PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME + " " + PVarchar.INSTANCE.getSqlTypeName());
}
return metaConnection;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_TIME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.START_WITH;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STORE_NULLS;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.STREAMING_TOPIC_NAME;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_SCHEMA;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CATALOG_TABLE;
import static org.apache.phoenix.jdbc.PhoenixDatabaseMetaData.SYSTEM_CHILD_LINK_TABLE;
Expand Down Expand Up @@ -340,6 +341,7 @@ enum JoinType {INNER, LEFT_OUTER}
CHANGE_DETECTION_ENABLED + " BOOLEAN, \n" +
SCHEMA_VERSION + " VARCHAR, \n" +
EXTERNAL_SCHEMA_ID + " VARCHAR, \n" +
STREAMING_TOPIC_NAME + " VARCHAR, \n" +
// Column metadata (will be null for table row)
DATA_TYPE + " INTEGER," +
COLUMN_SIZE + " INTEGER," +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,9 @@ public String getExternalSchemaId() {
return delegate.getExternalSchemaId();
}

@Override
public String getStreamingTopicName() { return delegate.getStreamingTopicName(); }

@Override public Map<String, String> getPropertyValues() { return delegate.getPropertyValues(); }

@Override public Map<String, String> getDefaultPropertyValues() { return delegate.getDefaultPropertyValues(); }
Expand Down
Loading