From 4ccbc357b8d43c2c41cc96fdb56838b203298eff Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Tue, 9 Jul 2024 19:58:33 +0530 Subject: [PATCH 1/8] @PHOENIX-7239: Save data table salt buckets in IndexMaintainer --- .../org/apache/phoenix/index/IndexMaintainer.java | 12 +++++++++++- .../main/java/org/apache/phoenix/schema/PTable.java | 1 + .../java/org/apache/phoenix/schema/PTableImpl.java | 1 - .../src/main/protobuf/ServerCachingService.proto | 1 + .../java/org/apache/phoenix/end2end/CDCQueryIT.java | 2 +- 5 files changed, 14 insertions(+), 3 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index ec7fc73b676..90ca3880d5c 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -397,6 +397,7 @@ public static IndexMaintainer getIndexMaintainer(List maintaine private RowKeyMetaData rowKeyMetaData; private byte[] indexTableName; private int nIndexSaltBuckets; + private int nDataTableSaltBuckets; private byte[] dataEmptyKeyValueCF; private ImmutableBytesPtr emptyKeyValueCFPtr; private int nDataCFs; @@ -470,6 +471,7 @@ private IndexMaintainer(final PTable dataTable, final PTable cdcTable, final PTa this.immutableStorageScheme = index.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : index.getImmutableStorageScheme(); this.dataEncodingScheme = dataTable.getEncodingScheme() == null ? QualifierEncodingScheme.NON_ENCODED_QUALIFIERS : dataTable.getEncodingScheme(); this.dataImmutableStorageScheme = dataTable.getImmutableStorageScheme() == null ? ImmutableStorageScheme.ONE_CELL_PER_COLUMN : dataTable.getImmutableStorageScheme(); + this.nDataTableSaltBuckets = isDataTableSalted ? 
dataTable.getBucketNum() : PTable.NO_SALTING; byte[] indexTableName = index.getPhysicalName().getBytes(); // Use this for the nDataSaltBuckets as we need this for local indexes @@ -925,7 +927,9 @@ public byte[] buildDataRowKey(ImmutableBytesWritable indexRowKeyPtr, byte[][] vi // there to maintain compatibility between an old client and a new server. if (isDataTableSalted) { // Set salt byte - byte saltByte = SaltingUtil.getSaltingByte(dataRowKey, SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, nIndexSaltBuckets); + byte saltByte = SaltingUtil.getSaltingByte(dataRowKey, + SaltingUtil.NUM_SALTING_BYTES, length-SaltingUtil.NUM_SALTING_BYTES, + nDataTableSaltBuckets); dataRowKey[0] = saltByte; } return dataRowKey.length == length ? dataRowKey : Arrays.copyOf(dataRowKey, length); @@ -1789,6 +1793,9 @@ public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer prot } else { maintainer.isCDCIndex = false; } + if (proto.hasDataTableSaltBuckets()) { + maintainer.nDataTableSaltBuckets = proto.getDataTableSaltBuckets(); + } maintainer.initCachedState(); return maintainer; } @@ -1933,6 +1940,9 @@ public static ServerCachingProtos.IndexMaintainer toProto(IndexMaintainer mainta } } builder.setIsCDCIndex(maintainer.isCDCIndex); + if (maintainer.isDataTableSalted) { + builder.setDataTableSaltBuckets(maintainer.nDataTableSaltBuckets); + } return builder.build(); } diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java index d1d7d5227f4..c51292a8ae8 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTable.java @@ -63,6 +63,7 @@ public interface PTable extends PMetaDataEntity { public static final String IS_IMMUTABLE_ROWS_PROP_NAME = "IMMUTABLE_ROWS"; public static final boolean DEFAULT_DISABLE_WAL = false; public static final boolean 
DEFAULT_IMMUTABLE_ROWS = false; + static final Integer NO_SALTING = -1; public enum ViewType { MAPPED((byte)1), diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java index b056db84f97..a94b7d3d33b 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/schema/PTableImpl.java @@ -142,7 +142,6 @@ * @since 0.1 */ public class PTableImpl implements PTable { - private static final Integer NO_SALTING = -1; private static final int VIEW_MODIFIED_UPDATE_CACHE_FREQUENCY_BIT_SET_POS = 0; private static final int VIEW_MODIFIED_USE_STATS_FOR_PARALLELIZATION_BIT_SET_POS = 1; private static final int VIEW_MODIFIED_PHOENIX_TTL_BIT_SET_POS = 2; diff --git a/phoenix-core-client/src/main/protobuf/ServerCachingService.proto b/phoenix-core-client/src/main/protobuf/ServerCachingService.proto index c28695ff7dd..d8e19455273 100644 --- a/phoenix-core-client/src/main/protobuf/ServerCachingService.proto +++ b/phoenix-core-client/src/main/protobuf/ServerCachingService.proto @@ -72,6 +72,7 @@ message IndexMaintainer { optional bytes indexWhere = 29; repeated ColumnReference indexWhereColumns = 30; optional bool isCDCIndex = 31; + optional int32 dataTableSaltBuckets = 32; } message TransformMaintainer { diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index e36851a742a..5d738b4311d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -102,7 +102,7 @@ public static synchronized Collection data() { { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, Boolean.FALSE }, // Once PHOENIX-7239, change this to have different salt buckets for data and index. 
- { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1, + { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 3, 5, Boolean.TRUE }, { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE }, From 17bd6272f3bc80e94b4aeb8d1e5c088e0aa6e548 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Thu, 22 Aug 2024 11:19:44 +0530 Subject: [PATCH 2/8] Fix the bug that is causing incorrect salt bucket number to be used for mutations generated in CDC index. --- .../apache/phoenix/execute/MutationState.java | 2 +- .../apache/phoenix/index/CDCTableInfo.java | 1 - .../apache/phoenix/index/IndexMaintainer.java | 5 --- .../coprocessor/GlobalIndexRegionScanner.java | 2 +- .../org/apache/phoenix/end2end/CDCBaseIT.java | 32 +++++++++++++++---- .../apache/phoenix/end2end/CDCQueryIT.java | 27 ++++++++++++++-- 6 files changed, 52 insertions(+), 17 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index 529b6cd60ed..3580b666466 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -668,7 +668,7 @@ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { }; ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey( getter, ptr, null, null, mutationTimestamp)); - PRow row = table.newRow( + PRow row = index.newRow( connection.getKeyValueBuilder(), mutationTimestamp, key, false); row.delete(); indexMutations.addAll(row.toRowMutations()); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java index 4d80f3e9d24..02fe008ab39 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java +++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java @@ -164,7 +164,6 @@ public static CDCInfoProtos.CDCTableDef toProto(StatementContext context) if (cdcDataTableRef.getTable().isImmutableRows() && cdcDataTableRef.getTable().getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { - List dataColumns = new ArrayList(); PTable table = cdcDataTableRef.getTable(); for (PColumn column : table.getColumns()) { diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 90ca3880d5c..46113152634 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -920,11 +920,6 @@ public byte[] buildDataRowKey(ImmutableBytesWritable indexRowKeyPtr, byte[][] vi length--; trailingVariableWidthColumnNum--; } - // TODO: need to capture nDataSaltBuckets instead of just a boolean. For now, - // we store this in nIndexSaltBuckets, as we only use this function for local indexes - // in which case nIndexSaltBuckets would never be used. Note that when we do add this - // to be serialized, we have to add it at the end and allow for the value not being - // there to maintain compatibility between an old client and a new server. 
if (isDataTableSalted) { // Set salt byte byte saltByte = SaltingUtil.getSaltingByte(dataRowKey, diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index bad4c08c5a5..4219e54a929 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -1360,7 +1360,7 @@ public static List prepareIndexMutationsForRebuild(IndexMaintainer ind indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts, ByteUtil.EMPTY_BYTE_ARRAY); indexMutations.add(IndexRegionObserver.getDeleteIndexMutation( - currentDataRowState, indexMaintainer, ts, rowKeyPtr)); + cdcDataRowState, indexMaintainer, ts, rowKeyPtr)); } } currentDataRowState = null; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index c554da20792..f3242029cb5 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.hbase.index.IndexRegionObserver; +import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableProperty; @@ -64,6 +65,7 @@ import static java.util.stream.Collectors.*; import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE; +import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE; import static 
org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE; @@ -515,6 +517,14 @@ protected List generateChanges(long startTS, String[] tenantids, Stri } } committer.reset(); + // For debug logging, uncomment this code to see the list of changes. + //for (int i = 0; i < changes.size(); ++i) { + // System.out.println("----- generated change: " + i + + // " tenantId:" + changes.get(i).tenantId + + // " changeTS: " + changes.get(i).changeTS + + // " pks: " + changes.get(i).pks + + // " change: " + changes.get(i).change); + //} return changes; } @@ -524,10 +534,11 @@ protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTab Set> deletedRows = new HashSet<>(); for (int i = 0, changenr = 0; i < changes.size(); ++i) { ChangeRow changeRow = changes.get(i); - if (changeRow.getTenantID() != null && changeRow.getTenantID() != tenantId) { + if (tenantId != null && changeRow.getTenantID() != tenantId) { continue; } if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) { + // FIXME: Check if this can cause a flapper as we are not incrementing changenr. // Consecutive delete operations don't appear as separate events. if (deletedRows.contains(changeRow.pks)) { continue; @@ -537,13 +548,14 @@ protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTab else { deletedRows.remove(changeRow.pks); } - String changeDesc = "Change " + (changenr+1) + ": " + changeRow; + String changeDesc = "Change " + changenr + ": " + changeRow; assertTrue(changeDesc, rs.next()); for (Map.Entry pkCol: changeRow.pks.entrySet()) { assertEquals(changeDesc, pkCol.getValue(), rs.getObject(pkCol.getKey())); } Map cdcObj = mapper.reader(HashMap.class).readValue( rs.getString(changeRow.pks.size()+2)); + assertEquals(changeDesc, changeRow.getChangeType(), cdcObj.get(CDC_EVENT_TYPE)); if (cdcObj.containsKey(CDC_PRE_IMAGE) && ! 
((Map) cdcObj.get(CDC_PRE_IMAGE)).isEmpty() && changeScopes.contains(PTable.CDCChangeScope.PRE)) { @@ -708,6 +720,14 @@ protected List generateChangesImmutableTable(long startTS, String[] t } } committer.reset(); + // For debug logging, uncomment this code to see the list of changes. + //for (int i = 0; i < changes.size(); ++i) { + // System.out.println("----- generated change: " + i + + // " tenantId:" + changes.get(i).tenantId + + // " changeTS: " + changes.get(i).changeTS + + // " pks: " + changes.get(i).pks + + // " change: " + changes.get(i).change); + //} return changes; } @@ -720,13 +740,13 @@ protected List generateChangesImmutableTable(long startTS, String[] t ) protected class ChangeRow implements Comparable { @JsonProperty - private final String tenantId; + protected final String tenantId; @JsonProperty - private final long changeTS; + protected final long changeTS; @JsonProperty - private final Map pks; + protected final Map pks; @JsonProperty - private final Map change; + protected final Map change; public String getTenantID() { return tenantId; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index 5d738b4311d..2fb95395a91 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -92,7 +92,7 @@ public CDCQueryIT(Boolean forView, Boolean dataBeforeCDC, } @Parameterized.Parameters(name = "forView={0} dataBeforeCDC={1}, encodingScheme={2}, " + - "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5} withSchemaName=${6}") + "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5} withSchemaName={6}") public static synchronized Collection data() { return Arrays.asList(new Object[][] { { Boolean.FALSE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, @@ -101,8 +101,7 @@ public static synchronized Collection data() { Boolean.TRUE }, { 
Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, Boolean.FALSE }, - // Once PHOENIX-7239, change this to have different salt buckets for data and index. - { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 3, 5, + { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, Boolean.TRUE }, { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE }, @@ -170,6 +169,17 @@ public void testSelectCDC() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { + // For debug: uncomment to see the exact results logged to console. + //try (Statement stmt = conn.createStatement()) { + // try (ResultSet rs = stmt.executeQuery( + // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + // "\"CDC JSON\" FROM " + cdcFullName)) { + // while (rs.next()) { + // System.out.println("----- " + rs.getString(1) + " " + + // rs.getInt(2) + " " + rs.getString(3)); + // } + // } + //} // Existence of CDC shouldn't cause the regular query path to fail. String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " + @@ -416,16 +426,27 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor put("V1", "INTEGER"); put("V2", "INTEGER"); }}; + + // For debug: uncomment to see the exact HBase cells. + //LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); + //SingleCellIndexIT.dumpTable(datatableName); + //LOGGER.debug("----- DUMP index table: " + CDCUtil.getCDCIndexName(cdcName) + " -----"); + //SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, + // CDCUtil.getCDCIndexName(cdcName))); + //LOGGER.debug("----- -----"); + try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. 
//try (Statement stmt = conn.createStatement()) { // try (ResultSet rs = stmt.executeQuery( // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + // "\"CDC JSON\" FROM " + cdcFullName)) { + // LOGGER.debug("----- DUMP of CDC query -----"); // while (rs.next()) { // System.out.println("----- " + rs.getString(1) + " " + // rs.getInt(2) + " " + rs.getString(3)); // } + // LOGGER.debug("----- -----"); // } //} verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( From b32593ba2cb0f892de8a77a4924a796e4c3f7791 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Tue, 3 Sep 2024 16:44:19 +0530 Subject: [PATCH 3/8] Revamped the time range test and included the coverage for special CDC DF markers. Also refactored some debug code. --- .../coprocessor/GlobalIndexRegionScanner.java | 6 +- .../hbase/index/IndexRegionObserver.java | 3 +- .../org/apache/phoenix/end2end/CDCBaseIT.java | 97 ++++++- .../apache/phoenix/end2end/CDCQueryIT.java | 249 +++++------------- 4 files changed, 153 insertions(+), 202 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index 4219e54a929..ee3bb094745 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -1355,12 +1355,8 @@ public static List prepareIndexMutationsForRebuild(IndexMaintainer ind // CDC Index needs two delete markers one for deleting the index row, // and the other for referencing the data table delete mutation with // the right index row key, that is, the index row key starting with ts - Put cdcDataRowState = new Put(currentDataRowState.getRow()); - cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), - indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts, - 
ByteUtil.EMPTY_BYTE_ARRAY); indexMutations.add(IndexRegionObserver.getDeleteIndexMutation( - cdcDataRowState, indexMaintainer, ts, rowKeyPtr)); + currentDataRowState, indexMaintainer, ts, rowKeyPtr)); } } currentDataRowState = null; diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 317841fb13d..8078d763f75 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ -1029,7 +1029,6 @@ private void prepareIndexMutations(BatchMutateContext context, List(getDeleteIndexMutation(cdcDataRowState, indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get())); - } } } @@ -1840,4 +1839,4 @@ public static Map getAttributeValuesFromWALKey(WALKey key) { public static boolean isAtomicOperationComplete(OperationStatus status) { return status.getOperationStatusCode() == SUCCESS && status.getResult() != null; } -} \ No newline at end of file +} diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index f3242029cb5..e6571f19d35 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -22,10 +22,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectReader; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.phoenix.end2end.index.SingleCellIndexIT; import org.apache.phoenix.hbase.index.IndexRegionObserver; -import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import 
org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.TableProperty; @@ -60,12 +59,14 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.*; import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE; import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; +import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME; import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE; @@ -313,15 +314,24 @@ protected List> generateMutations(long startTS, Map> batches = new ArrayList<>(nBatches); Set> mutatedRows = new HashSet<>(nRows); long batchTS = startTS; + boolean gotDelete = false; for (int i = 0; i < nBatches; ++i) { Set batch = new TreeSet<>(); for (int j = 0; j < nRows; ++j) { if (rand.nextInt(nRows) % 2 == 0) { - boolean isDelete = mutatedRows.contains(rows.get(j)) - && rand.nextInt(5) == 0; + boolean isDelete; + if (i > nBatches/2 && ! gotDelete) { + // Force a delete if there was none so far. 
+ isDelete = true; + } + else { + isDelete = mutatedRows.contains(rows.get(j)) + && rand.nextInt(5) == 0; + } ChangeRow changeRow; if (isDelete) { changeRow = new ChangeRow(null, batchTS, rows.get(j), null); + gotDelete = true; } else { changeRow = new ChangeRow(null, batchTS, rows.get(j), @@ -335,6 +345,19 @@ protected List> generateMutations(long startTS, Map batch: allBatches.get(tid)) { + // for (ChangeRow change : batch) { + // LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + + // " tenantId:" + changes.get(i).tenantId + + // " changeTS: " + changes.get(i).changeTS + + // " pks: " + changes.get(i).pks + + // " change: " + changes.get(i).change); + // } + // ++bnr; + //} + //LOGGER.debug("----------"); return batches; } @@ -364,7 +387,7 @@ private Map generateSampleData(Random rand, Map } protected void applyMutations(CommitAdapter committer, String datatableName, String tid, - List> batches) throws Exception { + List> batches, String cdcName) throws Exception { EnvironmentEdgeManager.injectEdge(injectEdge); try (Connection conn = committer.getConnection(tid)) { for (Set batch: batches) { @@ -375,6 +398,45 @@ protected void applyMutations(CommitAdapter committer, String datatableName, Str } } committer.reset(); + + // For debug: uncomment to see the exact HBase cells. 
+ //dumpCells(datatableName, cdcName); + } + + protected void dumpCells(String datatableName, String cdcName) throws Exception { + LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); + SingleCellIndexIT.dumpTable(datatableName); + LOGGER.debug("----- DUMP index table: " + CDCUtil.getCDCIndexName(cdcName) + " -----"); + SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); + LOGGER.debug("----------"); + } + + protected void dumpCDCResults(Connection conn, String cdcName, Map pkColumns, + String cdcQuery) throws Exception { + try (Statement stmt = conn.createStatement()) { + try (ResultSet rs = stmt.executeQuery(cdcQuery)) { + LOGGER.debug("----- DUMP CDC: " + cdcName + " -----"); + for (int i = 0; rs.next(); ++i) { + LOGGER.debug("CDC row: " + (i+1) + " timestamp=" + + rs.getDate(1).getTime() + " " + + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "=" + + rs.getString(pkColumns.size() + 2)); + } + LOGGER.debug("----------"); + } + } + } + + private static String collectColumns(Map pkColumns, ResultSet rs) { + return pkColumns.keySet().stream().map( + k -> { + try { + return k + "=" + rs.getObject(k); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }).collect( + Collectors.joining(", ")); } protected void createTable(Connection conn, String tableName, Map pkColumns, @@ -528,6 +590,28 @@ protected List generateChanges(long startTS, String[] tenantids, Stri return changes; } + protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName, + Map pkColumns, + String dataTableName, Map dataColumns, + List changes, long startTS, long endTS) + throws Exception { + List filteredChanges = new ArrayList<>(); + for (ChangeRow change: changes) { + if (change.changeTS >= startTS && change.changeTS <= endTS) { + filteredChanges.add(change); + } + } + String cdcSql = "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROm " + cdcFullName + " WHERE " + + " PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(" + startTS + " AS 
BIGINT) AS TIMESTAMP) " + + "AND PHOENIX_ROW_TIMESTAMP() <= CAST(CAST(" + endTS + " AS BIGINT) AS TIMESTAMP)"; + dumpCDCResults(conn, cdcFullName, + new TreeMap() {{ put("K1", "INTEGER"); }}, cdcSql); + try (ResultSet rs = conn.createStatement().executeQuery(cdcSql)) { + verifyChangesViaSCN(tenantId, rs, dataTableName, dataColumns, filteredChanges, + CHANGE_IMG); + } + } + protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTableName, Map dataColumns, List changes, Set changeScopes) throws Exception { @@ -653,7 +737,7 @@ private Map encodeValues(Map image, } protected List generateChangesImmutableTable(long startTS, String[] tenantids, - String tableName, CommitAdapter committer) + String tableName, CommitAdapter committer, String cdcName) throws Exception { List changes = new ArrayList<>(); EnvironmentEdgeManager.injectEdge(injectEdge); @@ -721,6 +805,7 @@ protected List generateChangesImmutableTable(long startTS, String[] t } committer.reset(); // For debug logging, uncomment this code to see the list of changes. 
+ //dumpCells(tableName, cdcName); //for (int i = 0; i < changes.size(); ++i) { // System.out.println("----- generated change: " + i + // " tenantId:" + changes.get(i).tenantId + diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index 2fb95395a91..1c297a3c0d9 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -18,7 +18,6 @@ package org.apache.phoenix.end2end; import org.apache.hadoop.hbase.TableName; -import org.apache.phoenix.end2end.index.SingleCellIndexIT; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -35,8 +34,6 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; @@ -46,12 +43,11 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeMap; -import java.util.stream.Collectors; import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; -import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS; import static org.junit.Assert.assertEquals; @@ -99,7 +95,7 @@ public static synchronized Collection data() { Boolean.FALSE }, { Boolean.FALSE, Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.TRUE }, - { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, + { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 4, Boolean.FALSE }, { 
Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, Boolean.TRUE }, @@ -164,22 +160,13 @@ public void testSelectCDC() throws Exception { CDCUtil.getCDCIndexName(cdcName)))); } - //SingleCellIndexIT.dumpTable(tableName); - //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); - String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - //try (Statement stmt = conn.createStatement()) { - // try (ResultSet rs = stmt.executeQuery( - // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + - // "\"CDC JSON\" FROM " + cdcFullName)) { - // while (rs.next()) { - // System.out.println("----- " + rs.getString(1) + " " + - // rs.getInt(2) + " " + rs.getString(3)); - // } - // } - //} + //dumpCDCResults(conn, cdcName, + // new TreeMap() {{ put("K1", "INTEGER"); }}, + // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + // "\"CDC JSON\" FROM " + cdcFullName); // Existence of CDC shouldn't cause the regular query path to fail. String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " + @@ -295,26 +282,9 @@ public void testSelectGeneric() throws Exception { Map>> allBatches = new HashMap<>(tenantids.length); for (String tid: tenantids) { allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5)); - // For debug: uncomment to see the exact mutations that are being applied. 
- //LOGGER.debug("----- DUMP Mutations -----"); - //int bnr = 1, mnr = 0; - //for (Set batch: allBatches.get(tid)) { - // for (ChangeRow changeRow : batch) { - // LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + changeRow); - // } - // ++bnr; - //} - //LOGGER.debug("----------"); - applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid)); + applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid), cdcName); } - // For debug: uncomment to see the exact HBase cells. - //LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); - //SingleCellIndexIT.dumpTable(datatableName); - //LOGGER.debug("----- DUMP index table: " + CDCUtil.getCDCIndexName(cdcName) + " -----"); - //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); - //LOGGER.debug("----------"); - if (dataBeforeCDC) { try (Connection conn = newConnection()) { createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, @@ -329,19 +299,8 @@ public void testSelectGeneric() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. 
- //try (Statement stmt = conn.createStatement()) { - // try (ResultSet rs = stmt.executeQuery( - // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName)) { - // LOGGER.debug("----- DUMP CDC: " + cdcName + " -----"); - // for (int i = 0; rs.next(); ++i) { - // LOGGER.debug("CDC row: " + (i+1) + " timestamp=" - // + rs.getDate(1).getTime() + " " - // + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "=" - // + rs.getString(pkColumns.size() + 2)); - // } - // LOGGER.debug("----------"); - // } - //} + //dumpCDCResults(conn, cdcName, pkColumns, + // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); List changes = new ArrayList<>(); for (Set batch: allBatches.get(tenantId)) { @@ -362,18 +321,6 @@ public void testSelectGeneric() throws Exception { } } - private static String collectColumns(Map pkColumns, ResultSet rs) { - return pkColumns.keySet().stream().map( - k -> { - try { - return k + "=" + rs.getObject(k); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }).collect( - Collectors.joining(", ")); - } - private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme) throws Exception { String cdcName, cdc_sql; @@ -408,7 +355,7 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor long startTS = System.currentTimeMillis(); List changes = generateChangesImmutableTable(startTS, tenantids, tableName, - COMMIT_SUCCESS); + COMMIT_SUCCESS, cdcName); if (dataBeforeCDC) { try (Connection conn = newConnection()) { @@ -427,28 +374,12 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor put("V2", "INTEGER"); }}; - // For debug: uncomment to see the exact HBase cells. 
- //LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); - //SingleCellIndexIT.dumpTable(datatableName); - //LOGGER.debug("----- DUMP index table: " + CDCUtil.getCDCIndexName(cdcName) + " -----"); - //SingleCellIndexIT.dumpTable(SchemaUtil.getTableName(schemaName, - // CDCUtil.getCDCIndexName(cdcName))); - //LOGGER.debug("----- -----"); - try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - //try (Statement stmt = conn.createStatement()) { - // try (ResultSet rs = stmt.executeQuery( - // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + - // "\"CDC JSON\" FROM " + cdcFullName)) { - // LOGGER.debug("----- DUMP of CDC query -----"); - // while (rs.next()) { - // System.out.println("----- " + rs.getString(1) + " " + - // rs.getInt(2) + " " + rs.getString(3)); - // } - // LOGGER.debug("----- -----"); - // } - //} + //dumpCDCResults(conn, cdcName, + // new TreeMap() {{ put("K1", "INTEGER"); }}, + // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + // "\"CDC JSON\" FROM " + cdcFullName); verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName), datatableName, dataColumns, changes, PRE_POST_IMG); @@ -472,15 +403,19 @@ public void testSelectCDCImmutableSingleCell() throws Exception { } @Test - public void testSelectTimeRangeQueries() throws Exception { + public void testSeletWithTimeRange() throws Exception { String cdcName, cdc_sql; String schemaName = withSchemaName ? generateUniqueName() : null; String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName()); + String datatableName = tableName; + Map pkColumns = new TreeMap() {{ + put("K1", "INTEGER"); + }}; + Map dataColumns = new TreeMap() {{ + put("V1", "INTEGER"); + }}; try (Connection conn = newConnection()) { - createTable(conn, "CREATE TABLE " + tableName + " (" + - (multitenant ? 
"TENANT_ID CHAR(5) NOT NULL, " : "") + - "k INTEGER NOT NULL, v1 INTEGER, CONSTRAINT PK PRIMARY KEY " + - (multitenant ? "(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant, + createTable(conn, tableName, pkColumns, dataColumns, multitenant, encodingScheme, tableSaltBuckets, false, null); if (forView) { String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName()); @@ -489,134 +424,70 @@ public void testSelectTimeRangeQueries() throws Exception { tableName = viewName; } cdcName = generateUniqueName(); - cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; + cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (change)"; if (!dataBeforeCDC) { createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, indexSaltBuckets); } } - EnvironmentEdgeManager.injectEdge(injectEdge); - String tenantId = multitenant ? "1000" : null; String[] tenantids = {tenantId}; if (multitenant) { tenantids = new String[] {tenantId, "2000"}; } - Timestamp ts1 = new Timestamp(System.currentTimeMillis()); - cal.setTimeInMillis(ts1.getTime()); - injectEdge.setValue(ts1.getTime()); - - for (String tid: tenantids) { - try (Connection conn = newConnection(tid)) { - conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)"); - conn.commit(); - } - } - - injectEdge.incrementValue(100); - - for (String tid: tenantids) { - try (Connection conn = newConnection(tid)) { - conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)"); - conn.commit(); - } - } - - injectEdge.incrementValue(100); - cal.add(Calendar.MILLISECOND, 200); - Timestamp ts2 = new Timestamp(cal.getTime().getTime()); - injectEdge.incrementValue(100); - - for (String tid: tenantids) { - try (Connection conn = newConnection(tid)) { - conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); - conn.commit(); - injectEdge.incrementValue(100); - conn.createStatement().execute("UPSERT INTO " + tableName + " 
(k, v1) VALUES (3, 300)"); - conn.commit(); - } - } - - injectEdge.incrementValue(100); - cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length); - Timestamp ts3 = new Timestamp(cal.getTime().getTime()); - injectEdge.incrementValue(100); - + long startTS = System.currentTimeMillis(); + Map>> allBatches = new HashMap<>(tenantids.length); for (String tid: tenantids) { - try (Connection conn = newConnection(tid)) { - conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); - conn.commit(); - injectEdge.incrementValue(100); - conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2"); - conn.commit(); - } + allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5)); + applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid), cdcName); } - injectEdge.incrementValue(100); - cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length); - Timestamp ts4 = new Timestamp(cal.getTime().getTime()); - EnvironmentEdgeManager.reset(); - if (dataBeforeCDC) { try (Connection conn = newConnection()) { createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, indexSaltBuckets); } + // Testing with flushed data adds more coverage. + getUtility().getAdmin().flush(TableName.valueOf(datatableName)); + getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName, + CDCUtil.getCDCIndexName(cdcName)))); } - //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); - String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { - String sel_sql = - "SELECT to_char(phoenix_row_timestamp()), k, \"CDC JSON\" FROM " + cdcFullName + - " WHERE PHOENIX_ROW_TIMESTAMP() >= ? 
AND PHOENIX_ROW_TIMESTAMP() <= ?"; - Object[] testDataSets = new Object[] { - new Object[] {ts1, ts2, new int[] {1, 2}}, - new Object[] {ts2, ts3, new int[] {1, 3}}, - new Object[] {ts3, ts4, new int[] {1, 2}}, - new Object[] {ts1, ts4, new int[] {1, 2, 1, 3, 1, 2}}, - }; - PreparedStatement stmt = conn.prepareStatement(sel_sql); // For debug: uncomment to see the exact results logged to console. - //System.out.println("----- ts1: " + ts1 + " ts2: " + ts2 + " ts3: " + ts3 + " ts4: " + - // ts4); - //for (int i = 0; i < testDataSets.length; ++i) { - // Object[] testData = (Object[]) testDataSets[i]; - // stmt.setTimestamp(1, (Timestamp) testData[0]); - // stmt.setTimestamp(2, (Timestamp) testData[1]); - // try (ResultSet rs = stmt.executeQuery()) { - // System.out.println("----- Test data set: " + i); - // while (rs.next()) { - // System.out.println("----- " + rs.getString(1) + " " + - // rs.getInt(2) + " " + rs.getString(3)); - // } - // } - //} - for (int i = 0; i < testDataSets.length; ++i) { - Object[] testData = (Object[]) testDataSets[i]; - stmt.setTimestamp(1, (Timestamp) testData[0]); - stmt.setTimestamp(2, (Timestamp) testData[1]); - try (ResultSet rs = stmt.executeQuery()) { - for (int j = 0; j < ((int[]) testData[2]).length; ++j) { - int k = ((int[]) testData[2])[j]; - assertEquals(" Index: " + j + " Test data set: " + i, - true, rs.next()); - assertEquals(" Index: " + j + " Test data set: " + i, - k, rs.getInt(2)); - } - assertEquals("Test data set: " + i, false, rs.next()); - } - } + dumpCDCResults(conn, cdcName, pkColumns, + "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); - PreparedStatement pstmt = conn.prepareStatement( - "SELECT * FROM " + cdcFullName + " WHERE PHOENIX_ROW_TIMESTAMP() > ?"); - pstmt.setTimestamp(1, ts4); - try (ResultSet rs = pstmt.executeQuery()) { - assertEquals(false, rs.next()); + List changes = new ArrayList<>(); + for (Set batch: allBatches.get(tenantId)) { + changes.addAll(batch); + } + List uniqueTimestamps 
= new ArrayList<>(); + Integer lastDeletionTSpos = null; + for (ChangeRow change: changes) { + if (uniqueTimestamps.size() == 0 || + uniqueTimestamps.get(uniqueTimestamps.size()-1) != change.changeTS) { + uniqueTimestamps.add(change.changeTS); + } + if (change.change == null) { + lastDeletionTSpos = uniqueTimestamps.size() - 1; + } } + Random rand = new Random(); + int randMinTSpos = rand.nextInt(lastDeletionTSpos - 1); + int randMaxTSpos = randMinTSpos + 1 + rand.nextInt( + uniqueTimestamps.size() - (randMinTSpos + 1)); + verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, + datatableName, dataColumns, changes, 0, System.currentTimeMillis()); + verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, + datatableName, dataColumns, changes, randMinTSpos, randMaxTSpos); + verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, + datatableName, dataColumns, changes, randMinTSpos, lastDeletionTSpos); + verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, + datatableName, dataColumns, changes, lastDeletionTSpos, randMaxTSpos); } } From b66d9e92a8a4039ccf831fb2316ab5fc6fda8d0b Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Fri, 13 Sep 2024 10:48:02 +0530 Subject: [PATCH 4/8] Fix issues with debug logging code --- .../org/apache/phoenix/end2end/CDCBaseIT.java | 49 ++++++++++++------- .../apache/phoenix/end2end/CDCQueryIT.java | 20 ++++---- 2 files changed, 43 insertions(+), 26 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index e6571f19d35..adb329af15f 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; +import 
org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.end2end.index.SingleCellIndexIT; import org.apache.phoenix.hbase.index.IndexRegionObserver; @@ -345,15 +346,16 @@ protected List> generateMutations(long startTS, Map batch: allBatches.get(tid)) { + //for (Set batch: batches) { // for (ChangeRow change : batch) { // LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + - // " tenantId:" + changes.get(i).tenantId + - // " changeTS: " + changes.get(i).changeTS + - // " pks: " + changes.get(i).pks + - // " change: " + changes.get(i).change); + // " tenantId:" + change.tenantId + + // " changeTS: " + change.changeTS + + // " pks: " + change.pks + + // " change: " + change.change); // } // ++bnr; //} @@ -386,13 +388,15 @@ private Map generateSampleData(Random rand, Map return row; } - protected void applyMutations(CommitAdapter committer, String datatableName, String tid, - List> batches, String cdcName) throws Exception { + protected void applyMutations(CommitAdapter committer, String schemaName, String tableName, + String datatableName, String tid, List> batches, + String cdcName) + throws Exception { EnvironmentEdgeManager.injectEdge(injectEdge); try (Connection conn = committer.getConnection(tid)) { for (Set batch: batches) { for (ChangeRow changeRow: batch) { - addChange(conn, datatableName, changeRow); + addChange(conn, tableName, changeRow); } committer.commit(conn); } @@ -400,14 +404,23 @@ protected void applyMutations(CommitAdapter committer, String datatableName, Str committer.reset(); // For debug: uncomment to see the exact HBase cells. 
- //dumpCells(datatableName, cdcName); + //dumpCells(schemaName, tableName, datatableName, cdcName); } - protected void dumpCells(String datatableName, String cdcName) throws Exception { + protected void dumpCells(String schemaName, String tableName, String datatableName, + String cdcName) throws Exception { LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); SingleCellIndexIT.dumpTable(datatableName); - LOGGER.debug("----- DUMP index table: " + CDCUtil.getCDCIndexName(cdcName) + " -----"); - SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); + String indexName = CDCUtil.getCDCIndexName(cdcName); + String indexTableName = SchemaUtil.getTableName(schemaName, tableName == datatableName ? + indexName : getViewIndexPhysicalName(datatableName)); + LOGGER.debug("----- DUMP index table: " + indexTableName + " -----"); + try { + SingleCellIndexIT.dumpTable(indexTableName); + } catch (TableNotFoundException e) { + // Ignore, this would happen if CDC is not yet created. This use case is going to go + // away soon anyway. + } LOGGER.debug("----------"); } @@ -581,7 +594,7 @@ protected List generateChanges(long startTS, String[] tenantids, Stri committer.reset(); // For debug logging, uncomment this code to see the list of changes. //for (int i = 0; i < changes.size(); ++i) { - // System.out.println("----- generated change: " + i + + // LOGGER.debug("----- generated change: " + i + // " tenantId:" + changes.get(i).tenantId + // " changeTS: " + changes.get(i).changeTS + // " pks: " + changes.get(i).pks + @@ -622,9 +635,9 @@ protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTab continue; } if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) { - // FIXME: Check if this can cause a flapper as we are not incrementing changenr. // Consecutive delete operations don't appear as separate events. 
if (deletedRows.contains(changeRow.pks)) { + ++changenr; continue; } deletedRows.add(changeRow.pks); @@ -737,7 +750,9 @@ private Map encodeValues(Map image, } protected List generateChangesImmutableTable(long startTS, String[] tenantids, - String tableName, CommitAdapter committer, String cdcName) + String schemaName, String tableName, + String datatableName, + CommitAdapter committer, String cdcName) throws Exception { List changes = new ArrayList<>(); EnvironmentEdgeManager.injectEdge(injectEdge); @@ -805,9 +820,9 @@ protected List generateChangesImmutableTable(long startTS, String[] t } committer.reset(); // For debug logging, uncomment this code to see the list of changes. - //dumpCells(tableName, cdcName); + //dumpCells(schemaName, tableName, datatableName, cdcName); //for (int i = 0; i < changes.size(); ++i) { - // System.out.println("----- generated change: " + i + + // LOGGER.debug("----- generated change: " + i + // " tenantId:" + changes.get(i).tenantId + // " changeTS: " + changes.get(i).changeTS + // " pks: " + changes.get(i).pks + diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index 1c297a3c0d9..abdd4222c47 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -164,7 +164,7 @@ public void testSelectCDC() throws Exception { try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. 
//dumpCDCResults(conn, cdcName, - // new TreeMap() {{ put("K1", "INTEGER"); }}, + // new TreeMap() {{ put("K", "INTEGER"); }}, // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + // "\"CDC JSON\" FROM " + cdcFullName); @@ -282,7 +282,8 @@ public void testSelectGeneric() throws Exception { Map>> allBatches = new HashMap<>(tenantids.length); for (String tid: tenantids) { allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5)); - applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid), cdcName); + applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid, + allBatches.get(tid), cdcName); } if (dataBeforeCDC) { @@ -354,8 +355,8 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor } long startTS = System.currentTimeMillis(); - List changes = generateChangesImmutableTable(startTS, tenantids, tableName, - COMMIT_SUCCESS, cdcName); + List changes = generateChangesImmutableTable(startTS, tenantids, schemaName, + tableName, datatableName, COMMIT_SUCCESS, cdcName); if (dataBeforeCDC) { try (Connection conn = newConnection()) { @@ -377,7 +378,7 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. //dumpCDCResults(conn, cdcName, - // new TreeMap() {{ put("K1", "INTEGER"); }}, + // new TreeMap() {{ put("K", "INTEGER"); }}, // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + // "\"CDC JSON\" FROM " + cdcFullName); verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( @@ -403,7 +404,7 @@ public void testSelectCDCImmutableSingleCell() throws Exception { } @Test - public void testSeletWithTimeRange() throws Exception { + public void testSelectWithTimeRange() throws Exception { String cdcName, cdc_sql; String schemaName = withSchemaName ? 
generateUniqueName() : null; String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName()); @@ -441,7 +442,8 @@ public void testSeletWithTimeRange() throws Exception { Map>> allBatches = new HashMap<>(tenantids.length); for (String tid: tenantids) { allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5)); - applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid), cdcName); + applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid, + allBatches.get(tid), cdcName); } if (dataBeforeCDC) { @@ -458,8 +460,8 @@ public void testSeletWithTimeRange() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - dumpCDCResults(conn, cdcName, pkColumns, - "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); + //dumpCDCResults(conn, cdcName, pkColumns, + // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); List changes = new ArrayList<>(); for (Set batch: allBatches.get(tenantId)) { From 7f342cd62448c2b701744e9f00dc6205fbd26c5a Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Fri, 13 Sep 2024 12:24:50 +0530 Subject: [PATCH 5/8] Uncomment debug code to help diagnose the occasional flappers we are seeing --- .../org/apache/phoenix/end2end/CDCBaseIT.java | 58 +++++++++---------- .../apache/phoenix/end2end/CDCQueryIT.java | 24 ++++---- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index adb329af15f..6a8df160f5d 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -347,19 +347,19 @@ protected List> generateMutations(long startTS, Map batch: batches) { - // for (ChangeRow change : batch) { - // 
LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + - // " tenantId:" + change.tenantId + - // " changeTS: " + change.changeTS + - // " pks: " + change.pks + - // " change: " + change.change); - // } - // ++bnr; - //} - //LOGGER.debug("----------"); + LOGGER.debug("----- DUMP Mutations -----"); + int bnr = 1, mnr = 0; + for (Set batch: batches) { + for (ChangeRow change : batch) { + LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + + " tenantId:" + change.tenantId + + " changeTS: " + change.changeTS + + " pks: " + change.pks + + " change: " + change.change); + } + ++bnr; + } + LOGGER.debug("----------"); return batches; } @@ -404,7 +404,7 @@ protected void applyMutations(CommitAdapter committer, String schemaName, String committer.reset(); // For debug: uncomment to see the exact HBase cells. - //dumpCells(schemaName, tableName, datatableName, cdcName); + dumpCells(schemaName, tableName, datatableName, cdcName); } protected void dumpCells(String schemaName, String tableName, String datatableName, @@ -593,13 +593,13 @@ protected List generateChanges(long startTS, String[] tenantids, Stri } committer.reset(); // For debug logging, uncomment this code to see the list of changes. - //for (int i = 0; i < changes.size(); ++i) { - // LOGGER.debug("----- generated change: " + i + - // " tenantId:" + changes.get(i).tenantId + - // " changeTS: " + changes.get(i).changeTS + - // " pks: " + changes.get(i).pks + - // " change: " + changes.get(i).change); - //} + for (int i = 0; i < changes.size(); ++i) { + LOGGER.debug("----- generated change: " + i + + " tenantId:" + changes.get(i).tenantId + + " changeTS: " + changes.get(i).changeTS + + " pks: " + changes.get(i).pks + + " change: " + changes.get(i).change); + } return changes; } @@ -820,14 +820,14 @@ protected List generateChangesImmutableTable(long startTS, String[] t } committer.reset(); // For debug logging, uncomment this code to see the list of changes. 
- //dumpCells(schemaName, tableName, datatableName, cdcName); - //for (int i = 0; i < changes.size(); ++i) { - // LOGGER.debug("----- generated change: " + i + - // " tenantId:" + changes.get(i).tenantId + - // " changeTS: " + changes.get(i).changeTS + - // " pks: " + changes.get(i).pks + - // " change: " + changes.get(i).change); - //} + dumpCells(schemaName, tableName, datatableName, cdcName); + for (int i = 0; i < changes.size(); ++i) { + LOGGER.debug("----- generated change: " + i + + " tenantId:" + changes.get(i).tenantId + + " changeTS: " + changes.get(i).changeTS + + " pks: " + changes.get(i).pks + + " change: " + changes.get(i).change); + } return changes; } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index abdd4222c47..c6e9c5a2631 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -163,10 +163,10 @@ public void testSelectCDC() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - //dumpCDCResults(conn, cdcName, - // new TreeMap() {{ put("K", "INTEGER"); }}, - // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + - // "\"CDC JSON\" FROM " + cdcFullName); + /dumpCDCResults(conn, cdcName, + new TreeMap() {{ put("K", "INTEGER"); }}, + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + "\"CDC JSON\" FROM " + cdcFullName); // Existence of CDC shouldn't cause the regular query path to fail. 
String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " + @@ -300,8 +300,8 @@ public void testSelectGeneric() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - //dumpCDCResults(conn, cdcName, pkColumns, - // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); + dumpCDCResults(conn, cdcName, pkColumns, + "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); List changes = new ArrayList<>(); for (Set batch: allBatches.get(tenantId)) { @@ -377,10 +377,10 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. - //dumpCDCResults(conn, cdcName, - // new TreeMap() {{ put("K", "INTEGER"); }}, - // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + - // "\"CDC JSON\" FROM " + cdcFullName); + dumpCDCResults(conn, cdcName, + new TreeMap() {{ put("K", "INTEGER"); }}, + "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + "\"CDC JSON\" FROM " + cdcFullName); verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName), datatableName, dataColumns, changes, PRE_POST_IMG); @@ -460,8 +460,8 @@ public void testSelectWithTimeRange() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. 
- //dumpCDCResults(conn, cdcName, pkColumns, - // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); + dumpCDCResults(conn, cdcName, pkColumns, + "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); List changes = new ArrayList<>(); for (Set batch: allBatches.get(tenantId)) { From 8e940749934ffddd6f9023cc962e386cfc56f83d Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Mon, 23 Sep 2024 10:06:04 +0530 Subject: [PATCH 6/8] Reverting CDC specific changes to be moved to a separate PR --- .../apache/phoenix/execute/MutationState.java | 2 +- .../apache/phoenix/index/CDCTableInfo.java | 1 + .../coprocessor/GlobalIndexRegionScanner.java | 4 + .../hbase/index/IndexRegionObserver.java | 3 +- .../org/apache/phoenix/end2end/CDCBaseIT.java | 146 +---------- .../apache/phoenix/end2end/CDCQueryIT.java | 240 +++++++++++++----- 6 files changed, 194 insertions(+), 202 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java index 3580b666466..529b6cd60ed 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -668,7 +668,7 @@ public ImmutableBytesWritable getLatestValue(ColumnReference ref, long ts) { }; ImmutableBytesPtr key = new ImmutableBytesPtr(maintainer.buildRowKey( getter, ptr, null, null, mutationTimestamp)); - PRow row = index.newRow( + PRow row = table.newRow( connection.getKeyValueBuilder(), mutationTimestamp, key, false); row.delete(); indexMutations.addAll(row.toRowMutations()); diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java index 02fe008ab39..4d80f3e9d24 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java +++ 
b/phoenix-core-client/src/main/java/org/apache/phoenix/index/CDCTableInfo.java @@ -164,6 +164,7 @@ public static CDCInfoProtos.CDCTableDef toProto(StatementContext context) if (cdcDataTableRef.getTable().isImmutableRows() && cdcDataTableRef.getTable().getImmutableStorageScheme() == PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) { + List dataColumns = new ArrayList(); PTable table = cdcDataTableRef.getTable(); for (PColumn column : table.getColumns()) { diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index ee3bb094745..bad4c08c5a5 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -1355,6 +1355,10 @@ public static List prepareIndexMutationsForRebuild(IndexMaintainer ind // CDC Index needs two delete markers one for deleting the index row, // and the other for referencing the data table delete mutation with // the right index row key, that is, the index row key starting with ts + Put cdcDataRowState = new Put(currentDataRowState.getRow()); + cdcDataRowState.addColumn(indexMaintainer.getDataEmptyKeyValueCF(), + indexMaintainer.getEmptyKeyValueQualifierForDataTable(), ts, + ByteUtil.EMPTY_BYTE_ARRAY); indexMutations.add(IndexRegionObserver.getDeleteIndexMutation( currentDataRowState, indexMaintainer, ts, rowKeyPtr)); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java index 8078d763f75..317841fb13d 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java @@ 
-1029,6 +1029,7 @@ private void prepareIndexMutations(BatchMutateContext context, List(getDeleteIndexMutation(cdcDataRowState, indexMaintainer, ts, rowKeyPtr), rowKeyPtr.get())); + } } } @@ -1839,4 +1840,4 @@ public static Map getAttributeValuesFromWALKey(WALKey key) { public static boolean isAtomicOperationComplete(OperationStatus status) { return status.getOperationStatusCode() == SUCCESS && status.getResult() != null; } -} +} \ No newline at end of file diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java index 6a8df160f5d..c554da20792 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCBaseIT.java @@ -22,9 +22,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.hadoop.hbase.TableNotFoundException; +import com.fasterxml.jackson.databind.ObjectReader; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.phoenix.end2end.index.SingleCellIndexIT; import org.apache.phoenix.hbase.index.IndexRegionObserver; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; @@ -60,14 +59,11 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; -import java.util.stream.Collectors; import java.util.stream.Stream; import static java.util.stream.Collectors.*; import static org.apache.phoenix.query.QueryConstants.CDC_CHANGE_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_DELETE_EVENT_TYPE; -import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; -import static org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME; import static org.apache.phoenix.query.QueryConstants.CDC_POST_IMAGE; import static org.apache.phoenix.query.QueryConstants.CDC_PRE_IMAGE; import 
static org.apache.phoenix.query.QueryConstants.CDC_UPSERT_EVENT_TYPE; @@ -315,24 +311,15 @@ protected List> generateMutations(long startTS, Map> batches = new ArrayList<>(nBatches); Set> mutatedRows = new HashSet<>(nRows); long batchTS = startTS; - boolean gotDelete = false; for (int i = 0; i < nBatches; ++i) { Set batch = new TreeSet<>(); for (int j = 0; j < nRows; ++j) { if (rand.nextInt(nRows) % 2 == 0) { - boolean isDelete; - if (i > nBatches/2 && ! gotDelete) { - // Force a delete if there was none so far. - isDelete = true; - } - else { - isDelete = mutatedRows.contains(rows.get(j)) - && rand.nextInt(5) == 0; - } + boolean isDelete = mutatedRows.contains(rows.get(j)) + && rand.nextInt(5) == 0; ChangeRow changeRow; if (isDelete) { changeRow = new ChangeRow(null, batchTS, rows.get(j), null); - gotDelete = true; } else { changeRow = new ChangeRow(null, batchTS, rows.get(j), @@ -346,20 +333,6 @@ protected List> generateMutations(long startTS, Map batch: batches) { - for (ChangeRow change : batch) { - LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + - " tenantId:" + change.tenantId + - " changeTS: " + change.changeTS + - " pks: " + change.pks + - " change: " + change.change); - } - ++bnr; - } - LOGGER.debug("----------"); return batches; } @@ -388,68 +361,18 @@ private Map generateSampleData(Random rand, Map return row; } - protected void applyMutations(CommitAdapter committer, String schemaName, String tableName, - String datatableName, String tid, List> batches, - String cdcName) - throws Exception { + protected void applyMutations(CommitAdapter committer, String datatableName, String tid, + List> batches) throws Exception { EnvironmentEdgeManager.injectEdge(injectEdge); try (Connection conn = committer.getConnection(tid)) { for (Set batch: batches) { for (ChangeRow changeRow: batch) { - addChange(conn, tableName, changeRow); + addChange(conn, datatableName, changeRow); } committer.commit(conn); } } committer.reset(); - - // For debug: uncomment 
to see the exact HBase cells. - dumpCells(schemaName, tableName, datatableName, cdcName); - } - - protected void dumpCells(String schemaName, String tableName, String datatableName, - String cdcName) throws Exception { - LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); - SingleCellIndexIT.dumpTable(datatableName); - String indexName = CDCUtil.getCDCIndexName(cdcName); - String indexTableName = SchemaUtil.getTableName(schemaName, tableName == datatableName ? - indexName : getViewIndexPhysicalName(datatableName)); - LOGGER.debug("----- DUMP index table: " + indexTableName + " -----"); - try { - SingleCellIndexIT.dumpTable(indexTableName); - } catch (TableNotFoundException e) { - // Ignore, this would happen if CDC is not yet created. This use case is going to go - // away soon anyway. - } - LOGGER.debug("----------"); - } - - protected void dumpCDCResults(Connection conn, String cdcName, Map pkColumns, - String cdcQuery) throws Exception { - try (Statement stmt = conn.createStatement()) { - try (ResultSet rs = stmt.executeQuery(cdcQuery)) { - LOGGER.debug("----- DUMP CDC: " + cdcName + " -----"); - for (int i = 0; rs.next(); ++i) { - LOGGER.debug("CDC row: " + (i+1) + " timestamp=" - + rs.getDate(1).getTime() + " " - + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "=" - + rs.getString(pkColumns.size() + 2)); - } - LOGGER.debug("----------"); - } - } - } - - private static String collectColumns(Map pkColumns, ResultSet rs) { - return pkColumns.keySet().stream().map( - k -> { - try { - return k + "=" + rs.getObject(k); - } catch (SQLException e) { - throw new RuntimeException(e); - } - }).collect( - Collectors.joining(", ")); } protected void createTable(Connection conn, String tableName, Map pkColumns, @@ -592,52 +515,21 @@ protected List generateChanges(long startTS, String[] tenantids, Stri } } committer.reset(); - // For debug logging, uncomment this code to see the list of changes. 
- for (int i = 0; i < changes.size(); ++i) { - LOGGER.debug("----- generated change: " + i + - " tenantId:" + changes.get(i).tenantId + - " changeTS: " + changes.get(i).changeTS + - " pks: " + changes.get(i).pks + - " change: " + changes.get(i).change); - } return changes; } - protected void verifyChangesViaSCN(String tenantId, Connection conn, String cdcFullName, - Map pkColumns, - String dataTableName, Map dataColumns, - List changes, long startTS, long endTS) - throws Exception { - List filteredChanges = new ArrayList<>(); - for (ChangeRow change: changes) { - if (change.changeTS >= startTS && change.changeTS <= endTS) { - filteredChanges.add(change); - } - } - String cdcSql = "SELECT /*+ CDC_INCLUDE(CHANGE) */ * FROm " + cdcFullName + " WHERE " + - " PHOENIX_ROW_TIMESTAMP() >= CAST(CAST(" + startTS + " AS BIGINT) AS TIMESTAMP) " + - "AND PHOENIX_ROW_TIMESTAMP() <= CAST(CAST(" + endTS + " AS BIGINT) AS TIMESTAMP)"; - dumpCDCResults(conn, cdcFullName, - new TreeMap() {{ put("K1", "INTEGER"); }}, cdcSql); - try (ResultSet rs = conn.createStatement().executeQuery(cdcSql)) { - verifyChangesViaSCN(tenantId, rs, dataTableName, dataColumns, filteredChanges, - CHANGE_IMG); - } - } - protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTableName, Map dataColumns, List changes, Set changeScopes) throws Exception { Set> deletedRows = new HashSet<>(); for (int i = 0, changenr = 0; i < changes.size(); ++i) { ChangeRow changeRow = changes.get(i); - if (tenantId != null && changeRow.getTenantID() != tenantId) { + if (changeRow.getTenantID() != null && changeRow.getTenantID() != tenantId) { continue; } if (changeRow.getChangeType() == CDC_DELETE_EVENT_TYPE) { // Consecutive delete operations don't appear as separate events. 
if (deletedRows.contains(changeRow.pks)) { - ++changenr; continue; } deletedRows.add(changeRow.pks); @@ -645,14 +537,13 @@ protected void verifyChangesViaSCN(String tenantId, ResultSet rs, String dataTab else { deletedRows.remove(changeRow.pks); } - String changeDesc = "Change " + changenr + ": " + changeRow; + String changeDesc = "Change " + (changenr+1) + ": " + changeRow; assertTrue(changeDesc, rs.next()); for (Map.Entry pkCol: changeRow.pks.entrySet()) { assertEquals(changeDesc, pkCol.getValue(), rs.getObject(pkCol.getKey())); } Map cdcObj = mapper.reader(HashMap.class).readValue( rs.getString(changeRow.pks.size()+2)); - assertEquals(changeDesc, changeRow.getChangeType(), cdcObj.get(CDC_EVENT_TYPE)); if (cdcObj.containsKey(CDC_PRE_IMAGE) && ! ((Map) cdcObj.get(CDC_PRE_IMAGE)).isEmpty() && changeScopes.contains(PTable.CDCChangeScope.PRE)) { @@ -750,9 +641,7 @@ private Map encodeValues(Map image, } protected List generateChangesImmutableTable(long startTS, String[] tenantids, - String schemaName, String tableName, - String datatableName, - CommitAdapter committer, String cdcName) + String tableName, CommitAdapter committer) throws Exception { List changes = new ArrayList<>(); EnvironmentEdgeManager.injectEdge(injectEdge); @@ -819,15 +708,6 @@ protected List generateChangesImmutableTable(long startTS, String[] t } } committer.reset(); - // For debug logging, uncomment this code to see the list of changes. 
- dumpCells(schemaName, tableName, datatableName, cdcName); - for (int i = 0; i < changes.size(); ++i) { - LOGGER.debug("----- generated change: " + i + - " tenantId:" + changes.get(i).tenantId + - " changeTS: " + changes.get(i).changeTS + - " pks: " + changes.get(i).pks + - " change: " + changes.get(i).change); - } return changes; } @@ -840,13 +720,13 @@ protected List generateChangesImmutableTable(long startTS, String[] t ) protected class ChangeRow implements Comparable { @JsonProperty - protected final String tenantId; + private final String tenantId; @JsonProperty - protected final long changeTS; + private final long changeTS; @JsonProperty - protected final Map pks; + private final Map pks; @JsonProperty - protected final Map change; + private final Map change; public String getTenantID() { return tenantId; diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java index c6e9c5a2631..e36851a742a 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCQueryIT.java @@ -18,6 +18,7 @@ package org.apache.phoenix.end2end; import org.apache.hadoop.hbase.TableName; +import org.apache.phoenix.end2end.index.SingleCellIndexIT; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.CDCUtil; import org.apache.phoenix.util.EnvironmentEdgeManager; @@ -34,6 +35,8 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Arrays; @@ -43,11 +46,12 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.Set; import java.util.TreeMap; +import java.util.stream.Collectors; import static org.apache.phoenix.query.QueryConstants.CDC_EVENT_TYPE; +import static 
org.apache.phoenix.query.QueryConstants.CDC_JSON_COL_NAME; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.NON_ENCODED_QUALIFIERS; import static org.apache.phoenix.schema.PTable.QualifierEncodingScheme.TWO_BYTE_QUALIFIERS; import static org.junit.Assert.assertEquals; @@ -88,16 +92,17 @@ public CDCQueryIT(Boolean forView, Boolean dataBeforeCDC, } @Parameterized.Parameters(name = "forView={0} dataBeforeCDC={1}, encodingScheme={2}, " + - "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5} withSchemaName={6}") + "multitenant={3}, indexSaltBuckets={4}, tableSaltBuckets={5} withSchemaName=${6}") public static synchronized Collection data() { return Arrays.asList(new Object[][] { { Boolean.FALSE, Boolean.FALSE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.FALSE }, { Boolean.FALSE, Boolean.TRUE, TWO_BYTE_QUALIFIERS, Boolean.FALSE, null, null, Boolean.TRUE }, - { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, null, 4, + { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 1, 1, Boolean.FALSE }, - { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 2, + // Once PHOENIX-7239, change this to have different salt buckets for data and index. + { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.TRUE, 1, 1, Boolean.TRUE }, { Boolean.FALSE, Boolean.FALSE, NON_ENCODED_QUALIFIERS, Boolean.FALSE, 4, null, Boolean.FALSE }, @@ -160,13 +165,11 @@ public void testSelectCDC() throws Exception { CDCUtil.getCDCIndexName(cdcName)))); } + //SingleCellIndexIT.dumpTable(tableName); + //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); + String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { - // For debug: uncomment to see the exact results logged to console. 
- /dumpCDCResults(conn, cdcName, - new TreeMap() {{ put("K", "INTEGER"); }}, - "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + - "\"CDC JSON\" FROM " + cdcFullName); // Existence of CDC shouldn't cause the regular query path to fail. String uncovered_sql = "SELECT " + " /*+ INDEX(" + tableName + " " + @@ -282,10 +285,26 @@ public void testSelectGeneric() throws Exception { Map>> allBatches = new HashMap<>(tenantids.length); for (String tid: tenantids) { allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5)); - applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid, - allBatches.get(tid), cdcName); + // For debug: uncomment to see the exact mutations that are being applied. + //LOGGER.debug("----- DUMP Mutations -----"); + //int bnr = 1, mnr = 0; + //for (Set batch: allBatches.get(tid)) { + // for (ChangeRow changeRow : batch) { + // LOGGER.debug("Mutation: " + (++mnr) + " in batch: " + bnr + " " + changeRow); + // } + // ++bnr; + //} + //LOGGER.debug("----------"); + applyMutations(COMMIT_SUCCESS, tableName, tid, allBatches.get(tid)); } + // For debug: uncomment to see the exact HBase cells. + //LOGGER.debug("----- DUMP data table: " + datatableName + " -----"); + //SingleCellIndexIT.dumpTable(datatableName); + //LOGGER.debug("----- DUMP index table: " + CDCUtil.getCDCIndexName(cdcName) + " -----"); + //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); + //LOGGER.debug("----------"); + if (dataBeforeCDC) { try (Connection conn = newConnection()) { createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, @@ -300,8 +319,19 @@ public void testSelectGeneric() throws Exception { String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. 
- dumpCDCResults(conn, cdcName, pkColumns, - "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); + //try (Statement stmt = conn.createStatement()) { + // try (ResultSet rs = stmt.executeQuery( + // "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName)) { + // LOGGER.debug("----- DUMP CDC: " + cdcName + " -----"); + // for (int i = 0; rs.next(); ++i) { + // LOGGER.debug("CDC row: " + (i+1) + " timestamp=" + // + rs.getDate(1).getTime() + " " + // + collectColumns(pkColumns, rs) + ", " + CDC_JSON_COL_NAME + "=" + // + rs.getString(pkColumns.size() + 2)); + // } + // LOGGER.debug("----------"); + // } + //} List changes = new ArrayList<>(); for (Set batch: allBatches.get(tenantId)) { @@ -322,6 +352,18 @@ public void testSelectGeneric() throws Exception { } } + private static String collectColumns(Map pkColumns, ResultSet rs) { + return pkColumns.keySet().stream().map( + k -> { + try { + return k + "=" + rs.getObject(k); + } catch (SQLException e) { + throw new RuntimeException(e); + } + }).collect( + Collectors.joining(", ")); + } + private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStorageScheme) throws Exception { String cdcName, cdc_sql; @@ -355,8 +397,8 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor } long startTS = System.currentTimeMillis(); - List changes = generateChangesImmutableTable(startTS, tenantids, schemaName, - tableName, datatableName, COMMIT_SUCCESS, cdcName); + List changes = generateChangesImmutableTable(startTS, tenantids, tableName, + COMMIT_SUCCESS); if (dataBeforeCDC) { try (Connection conn = newConnection()) { @@ -374,13 +416,18 @@ private void _testSelectCDCImmutable(PTable.ImmutableStorageScheme immutableStor put("V1", "INTEGER"); put("V2", "INTEGER"); }}; - try (Connection conn = newConnection(tenantId)) { // For debug: uncomment to see the exact results logged to console. 
- dumpCDCResults(conn, cdcName, - new TreeMap() {{ put("K", "INTEGER"); }}, - "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + - "\"CDC JSON\" FROM " + cdcFullName); + //try (Statement stmt = conn.createStatement()) { + // try (ResultSet rs = stmt.executeQuery( + // "SELECT /*+ CDC_INCLUDE(PRE, POST) */ PHOENIX_ROW_TIMESTAMP(), K," + + // "\"CDC JSON\" FROM " + cdcFullName)) { + // while (rs.next()) { + // System.out.println("----- " + rs.getString(1) + " " + + // rs.getInt(2) + " " + rs.getString(3)); + // } + // } + //} verifyChangesViaSCN(tenantId, conn.createStatement().executeQuery( "SELECT /*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcFullName), datatableName, dataColumns, changes, PRE_POST_IMG); @@ -404,19 +451,15 @@ public void testSelectCDCImmutableSingleCell() throws Exception { } @Test - public void testSelectWithTimeRange() throws Exception { + public void testSelectTimeRangeQueries() throws Exception { String cdcName, cdc_sql; String schemaName = withSchemaName ? generateUniqueName() : null; String tableName = SchemaUtil.getTableName(schemaName, generateUniqueName()); - String datatableName = tableName; - Map pkColumns = new TreeMap() {{ - put("K1", "INTEGER"); - }}; - Map dataColumns = new TreeMap() {{ - put("V1", "INTEGER"); - }}; try (Connection conn = newConnection()) { - createTable(conn, tableName, pkColumns, dataColumns, multitenant, encodingScheme, + createTable(conn, "CREATE TABLE " + tableName + " (" + + (multitenant ? "TENANT_ID CHAR(5) NOT NULL, " : "") + + "k INTEGER NOT NULL, v1 INTEGER, CONSTRAINT PK PRIMARY KEY " + + (multitenant ? 
"(TENANT_ID, k) " : "(k)") + ")", encodingScheme, multitenant, tableSaltBuckets, false, null); if (forView) { String viewName = SchemaUtil.getTableName(schemaName, generateUniqueName()); @@ -425,71 +468,134 @@ public void testSelectWithTimeRange() throws Exception { tableName = viewName; } cdcName = generateUniqueName(); - cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INCLUDE (change)"; + cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName; if (!dataBeforeCDC) { createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, indexSaltBuckets); } } + EnvironmentEdgeManager.injectEdge(injectEdge); + String tenantId = multitenant ? "1000" : null; String[] tenantids = {tenantId}; if (multitenant) { tenantids = new String[] {tenantId, "2000"}; } - long startTS = System.currentTimeMillis(); - Map>> allBatches = new HashMap<>(tenantids.length); + Timestamp ts1 = new Timestamp(System.currentTimeMillis()); + cal.setTimeInMillis(ts1.getTime()); + injectEdge.setValue(ts1.getTime()); + for (String tid: tenantids) { - allBatches.put(tid, generateMutations(startTS, pkColumns, dataColumns, 20, 5)); - applyMutations(COMMIT_SUCCESS, schemaName, tableName, datatableName, tid, - allBatches.get(tid), cdcName); + try (Connection conn = newConnection(tid)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)"); + conn.commit(); + } } + injectEdge.incrementValue(100); + + for (String tid: tenantids) { + try (Connection conn = newConnection(tid)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)"); + conn.commit(); + } + } + + injectEdge.incrementValue(100); + cal.add(Calendar.MILLISECOND, 200); + Timestamp ts2 = new Timestamp(cal.getTime().getTime()); + injectEdge.incrementValue(100); + + for (String tid: tenantids) { + try (Connection conn = newConnection(tid)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); + conn.commit(); + 
injectEdge.incrementValue(100); + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (3, 300)"); + conn.commit(); + } + } + + injectEdge.incrementValue(100); + cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length); + Timestamp ts3 = new Timestamp(cal.getTime().getTime()); + injectEdge.incrementValue(100); + + for (String tid: tenantids) { + try (Connection conn = newConnection(tid)) { + conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)"); + conn.commit(); + injectEdge.incrementValue(100); + conn.createStatement().execute("DELETE FROM " + tableName + " WHERE k = 2"); + conn.commit(); + } + } + + injectEdge.incrementValue(100); + cal.add(Calendar.MILLISECOND, 200 + 100 * tenantids.length); + Timestamp ts4 = new Timestamp(cal.getTime().getTime()); + EnvironmentEdgeManager.reset(); + if (dataBeforeCDC) { try (Connection conn = newConnection()) { createCDCAndWait(conn, tableName, cdcName, cdc_sql, encodingScheme, indexSaltBuckets); } - // Testing with flushed data adds more coverage. - getUtility().getAdmin().flush(TableName.valueOf(datatableName)); - getUtility().getAdmin().flush(TableName.valueOf(SchemaUtil.getTableName(schemaName, - CDCUtil.getCDCIndexName(cdcName)))); } + //SingleCellIndexIT.dumpTable(CDCUtil.getCDCIndexName(cdcName)); + String cdcFullName = SchemaUtil.getTableName(schemaName, cdcName); try (Connection conn = newConnection(tenantId)) { + String sel_sql = + "SELECT to_char(phoenix_row_timestamp()), k, \"CDC JSON\" FROM " + cdcFullName + + " WHERE PHOENIX_ROW_TIMESTAMP() >= ? AND PHOENIX_ROW_TIMESTAMP() <= ?"; + Object[] testDataSets = new Object[] { + new Object[] {ts1, ts2, new int[] {1, 2}}, + new Object[] {ts2, ts3, new int[] {1, 3}}, + new Object[] {ts3, ts4, new int[] {1, 2}}, + new Object[] {ts1, ts4, new int[] {1, 2, 1, 3, 1, 2}}, + }; + PreparedStatement stmt = conn.prepareStatement(sel_sql); // For debug: uncomment to see the exact results logged to console. 
- dumpCDCResults(conn, cdcName, pkColumns, - "SELECT /*+ CDC_INCLUDE(PRE, CHANGE) */ * FROM " + cdcFullName); - - List changes = new ArrayList<>(); - for (Set batch: allBatches.get(tenantId)) { - changes.addAll(batch); - } - List uniqueTimestamps = new ArrayList<>(); - Integer lastDeletionTSpos = null; - for (ChangeRow change: changes) { - if (uniqueTimestamps.size() == 0 || - uniqueTimestamps.get(uniqueTimestamps.size()-1) != change.changeTS) { - uniqueTimestamps.add(change.changeTS); - } - if (change.change == null) { - lastDeletionTSpos = uniqueTimestamps.size() - 1; + //System.out.println("----- ts1: " + ts1 + " ts2: " + ts2 + " ts3: " + ts3 + " ts4: " + + // ts4); + //for (int i = 0; i < testDataSets.length; ++i) { + // Object[] testData = (Object[]) testDataSets[i]; + // stmt.setTimestamp(1, (Timestamp) testData[0]); + // stmt.setTimestamp(2, (Timestamp) testData[1]); + // try (ResultSet rs = stmt.executeQuery()) { + // System.out.println("----- Test data set: " + i); + // while (rs.next()) { + // System.out.println("----- " + rs.getString(1) + " " + + // rs.getInt(2) + " " + rs.getString(3)); + // } + // } + //} + for (int i = 0; i < testDataSets.length; ++i) { + Object[] testData = (Object[]) testDataSets[i]; + stmt.setTimestamp(1, (Timestamp) testData[0]); + stmt.setTimestamp(2, (Timestamp) testData[1]); + try (ResultSet rs = stmt.executeQuery()) { + for (int j = 0; j < ((int[]) testData[2]).length; ++j) { + int k = ((int[]) testData[2])[j]; + assertEquals(" Index: " + j + " Test data set: " + i, + true, rs.next()); + assertEquals(" Index: " + j + " Test data set: " + i, + k, rs.getInt(2)); + } + assertEquals("Test data set: " + i, false, rs.next()); } } - Random rand = new Random(); - int randMinTSpos = rand.nextInt(lastDeletionTSpos - 1); - int randMaxTSpos = randMinTSpos + 1 + rand.nextInt( - uniqueTimestamps.size() - (randMinTSpos + 1)); - verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, - datatableName, dataColumns, changes, 0, 
System.currentTimeMillis()); - verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, - datatableName, dataColumns, changes, randMinTSpos, randMaxTSpos); - verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, - datatableName, dataColumns, changes, randMinTSpos, lastDeletionTSpos); - verifyChangesViaSCN(tenantId, conn, cdcFullName, pkColumns, - datatableName, dataColumns, changes, lastDeletionTSpos, randMaxTSpos); + + PreparedStatement pstmt = conn.prepareStatement( + "SELECT * FROM " + cdcFullName + " WHERE PHOENIX_ROW_TIMESTAMP() > ?"); + pstmt.setTimestamp(1, ts4); + try (ResultSet rs = pstmt.executeQuery()) { + assertEquals(false, rs.next()); + } } } From a6ea44e0f2b51491951bba59a9cf41e297b78d9b Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Mon, 23 Sep 2024 13:52:53 +0530 Subject: [PATCH 7/8] Make default salt bucket value as -1 for consistency --- .../java/org/apache/phoenix/index/IndexMaintainer.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index 46113152634..b3ea9856e97 100644 --- a/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core-client/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -540,7 +540,7 @@ private IndexMaintainer(final PTable dataTable, final PTable cdcTable, final PTa this.indexedColumnTypes = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.indexedExpressions = Lists.newArrayListWithExpectedSize(nIndexPKColumns-nDataPKColumns); this.coveredColumnsMap = Maps.newHashMapWithExpectedSize(nIndexColumns - nIndexPKColumns); - this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 0 : nIndexSaltBuckets; + this.nIndexSaltBuckets = nIndexSaltBuckets == null ? 
PTable.NO_SALTING : nIndexSaltBuckets; this.dataEmptyKeyValueCF = SchemaUtil.getEmptyColumnFamily(dataTable); this.emptyKeyValueCFPtr = SchemaUtil.getEmptyColumnFamilyPtr(index); this.nDataCFs = dataTable.getColumnFamilies().size(); @@ -1788,9 +1788,8 @@ public static IndexMaintainer fromProto(ServerCachingProtos.IndexMaintainer prot } else { maintainer.isCDCIndex = false; } - if (proto.hasDataTableSaltBuckets()) { - maintainer.nDataTableSaltBuckets = proto.getDataTableSaltBuckets(); - } + maintainer.nDataTableSaltBuckets = proto.hasDataTableSaltBuckets() ? + proto.getDataTableSaltBuckets() : -1; maintainer.initCachedState(); return maintainer; } From 78e51bcbfd5cd696a0a81931d1dd54bc81436c38 Mon Sep 17 00:00:00 2001 From: Hari Dara Date: Wed, 25 Sep 2024 22:07:19 +0530 Subject: [PATCH 8/8]