-
Notifications
You must be signed in to change notification settings - Fork 994
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PHOENIX-7001: Initial implementation of Change Data Capture (CDC) feature #1866
PHOENIX-7001: Initial implementation of Change Data Capture (CDC) feature #1866
Conversation
…ture This commit includes a squash of all the below changes from the PRs apache#1662, apache#1694, down by removing non-functional changes for the ease of review. On top of it, the changes have been reworked for the changed state of master, especially the new submodules. * 4c9827a Hari.. PHOENIX-7008 Shallow grammar support for CREATE CDC (apache#1662) * e5220e0 TheN.. PHOENIX-7054 Shallow grammar support for DROP CDC and ALTER CDC (apache#1694) * 581e613 Hari.. PHOENIX-7008 Implementation for CREATE CDC (apache#1681) * e2ef886 Hari.. PHOENIX-7008 Tweaks, fixes and additional test coverage for CREATE CDC (apache#1703) * 5d3fd40 TheN.. PHOENIX-7074 DROP CDC Implementation (apache#1713) * 7420443 Hari.. PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (apache#1766) * da6ddad Kadi.. Add an extra delete mutation for CDC * 93d586e Kadi.. Add an extra delete mutation during rebuild for CDC index * f07898f Hari.. PHOENIX-7008: Addressing Jira spec and review feedback changes (apache#1802) * 87a2ea1 Hari.. PHOENIX-7008: Fix for parser gap and fix for failing test (apache#1812) * e395780 TheN.. PHOENIX-7015 Implementing CDCGlobalIndexRegionScanner (apache#1813) Co-authored-by: Saurabh Rai <saurabh.rai@salesforce.com>
"\""+CDCUtil.getCDCIndexName(cdcName)+"\""); | ||
String indexFullName = SchemaUtil.getTableName(schemaName, | ||
CDCUtil.getCDCIndexName(cdcName)); | ||
TestUtil.waitForIndexState(conn, indexFullName, PIndexState.ACTIVE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this necessary? IndexTool runs and waits for the job to complete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return CDC_INDEX_PREFIX + SchemaUtil.getTableNameFromFullName(cdcName.toUpperCase()); | ||
} | ||
|
||
public static boolean isCDCIndex(String indexName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems like this API assumes that the indexName doesn't have the schema qualifier ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that is right. There are 2 calls to this right now, 1 is from the overloaded method that works on PTable and it is passing the name without schema qualifier. The other call comes from the "DROP INDEX" where you won't expect a schema qualifier anyway.
return isCDCIndex(indexTable.getTableName().getString()); | ||
} | ||
|
||
public static Scan initForRawScan(Scan scan) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A better name for this API could be initializeScanForCDC or setupScanForCDC because it is doing more than just setting the raw attribute.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! It started purely for setting raw scan attributes so the name was appropriate back then, but your suggestion to rename sounds good.
assertNoResults(conn, cdcName); | ||
|
||
try { | ||
conn.createStatement().execute(cdc_sql); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can I create multiple cdc's on the same table with different cdc name but same change scope ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the CDC objects are completely independent of each other, with their own change scopes, so they can use same or different change scopes.
" INCLUDE (pre, post) INDEX_TYPE=g"); | ||
|
||
cdcName = generateUniqueName(); | ||
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (pre, post)"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of using hard coded strings PRE, POST use the constants defined in the code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, just to confirm, you are referring to the line 107, correct? I would like to leave the usage in 102 and 105 as is.
saltingConfig[1], null); | ||
try { | ||
assertCDCState(conn, cdcName, null, 3); | ||
// Index inherits table salt buckets. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is not right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is meant for the next line, will move it.
I found usage of maps to keep track of mutations and pre and post images are too complex in ITs. The test code should be easy to read and verify. We need a list of mutations and then check the result CDC against this list of mutations. For example, each mutation can be modeled as a timestamp, mutation type, list of column names and list of values represented as strings. This list can be initialized when it is constructed. Creating upsert and delete statements from this mutation list should be straightforward. Comparing the result of CDC against this list also should simple. The pre and post images can be retrived from the data table using SCN connections and verified against the CDC result. |
Gson gson = new GsonBuilder().serializeNulls().create(); | ||
byte[] value = gson.toJson(changeBuilder.buildCDCEvent()).getBytes(StandardCharsets.UTF_8); | ||
CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY); | ||
Result cdcRow = Result.create(Arrays.asList(builder | ||
.setRow(indexRowKey) | ||
.setFamily(ImmutableBytesPtr.cloneCellFamilyIfNecessary(firstCell)) | ||
.setQualifier(cdcDataTableInfo.getCdcJsonColQualBytes()) | ||
.setTimestamp(changeBuilder.getChangeTimestamp()) | ||
.setValue(value) | ||
.setType(Cell.Type.Put) | ||
.build())); | ||
return cdcRow; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use Jackson ObjectMapper to build the Json here? Also, Jackson is more efficient and thread-safe so we can define public static ObjectMapper instance and use the static object here.
This has two advantages:
- No need to create new object of Gson everytime we create Json.
- No new dependency needs to be introduced for Gson.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also use JacksonUtil at some places: https://github.com/apache/phoenix/blob/master/phoenix-core-client/src/main/java/org/apache/phoenix/util/JacksonUtil.java
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I switched from using Gson to Jackson.
pom.xml
Outdated
<dependency> | ||
<groupId>com.google.code.gson</groupId> | ||
<artifactId>gson</artifactId> | ||
<version>${gson.version}</version> | ||
</dependency> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be avoided altogether if we use JacksonUtil
} | ||
} | ||
|
||
public Map buildCDCEvent() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the places where raw Map
is used, let's replace it with Map<String, Object>
createTable(conn, "CREATE TABLE " + tableName + " (" + | ||
(multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + | ||
"k INTEGER NOT NULL, a_binary binary(10), d Date, t TIMESTAMP, " + | ||
"CONSTRAINT PK PRIMARY KEY " + | ||
(multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant, | ||
tableSaltBuckets, false, null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's also add VARBINARY
test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a new test that has VARBINARY
and a few other types.
…p to the previous commit
Now that JSON PR is merged, we can also address it with changes/pre/post image and include some tests for JSON data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, Thanks!
Let us open a separate Jira to support complex data types including Array and JSON. |
Belated +1 |
@@ -1514,7 +1514,6 @@ public void testLastDDLTimestampOnAsyncIndexes() throws Exception { | |||
|
|||
// run the index MR job. | |||
IndexToolIT.runIndexTool(false, TestUtil.DEFAULT_SCHEMA_NAME, tableName, indexName); | |||
TestUtil.waitForIndexState(conn, fullIndexName, PIndexState.ACTIVE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, this change is causing these 3 tests to fail continuously on all PRs on master:
GlobalImmutableTxIndexWithRegionMovesIT.testLastDDLTimestampOnAsyncIndexes
GlobalMutableTxIndexIT.testLastDDLTimestampOnAsyncIndexes
GlobalImmutableTxIndexIT.testLastDDLTimestampOnAsyncIndexes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see it was removed based on Kadir's comment here - #1866 (comment)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tkhurana Should I add it back to BaseIndexIT and BaseIndexWithRegionMovesIT in my PR, although it will be a while before it gets merged to master? Not sure why the behaviour of IndexTool is different in these 3 cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will have to put this back, we are just discussing this internally.
This commit includes a squash of all the below changes from all the CDC feature PRs to address PHOENIX-7001 and further narrows the diff down by removing non-functional changes for the ease of review. On top of it, the changes have been reworked for the changed state of master, especially the new submodules.