PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes #1766

Merged: 29 commits into PHOENIX-7001-feature on Dec 23, 2023

Commits
c57f9b4
Simplified hint parsing logic
haridsv Oct 10, 2023
74c0c9e
Set parent table info for CDC PTable
haridsv Oct 10, 2023
9bab359
Experimental client side compiler and optimizer changes with a lot of…
haridsv Nov 13, 2023
884db9e
Update PHOENIX_ROW_TIMESTAMP to DATE type to be consistent with that …
haridsv Nov 13, 2023
79b5090
Remove debug code
haridsv Nov 14, 2023
5291bbf
Add a new assert for unimplemented functionality and also implement a…
haridsv Nov 14, 2023
a9e5e33
Fixes for failing hint test scenarios
haridsv Nov 15, 2023
7ab3b95
Go through the uncovered index via hint
haridsv Nov 15, 2023
fce7ea6
A small deduping attempt
haridsv Nov 15, 2023
66f99d2
Fix typo
haridsv Nov 15, 2023
c0f2910
Add test coverage to catch my earlier typo in BaseQueryPlan
haridsv Nov 15, 2023
6ec934f
Update test case after previous typo fix
haridsv Nov 16, 2023
1e4ade2
Parking ad-hoc changes to get projection type with correct expression…
haridsv Nov 21, 2023
291049e
Removed previous CDC specific optimizer code and bypassed optimizer c…
haridsv Nov 23, 2023
3fc774b
Revert whitespace diff and remove some dead code.
haridsv Nov 23, 2023
e97ff2b
Narrow down some more diff
haridsv Nov 24, 2023
2a6b4f9
Got the mock data to come through to the client side as expected
haridsv Nov 27, 2023
1f6b6fc
Address a FIXME and remove some debug code
haridsv Nov 27, 2023
002ddb6
Recognize CDC hints and serialize into a scan attribute
haridsv Nov 27, 2023
1a9d19e
Remove debug code
haridsv Nov 27, 2023
d810886
Experimental CDC scanner with raw scan
haridsv Dec 7, 2023
a3a09b3
Snapshot of experimental changes
haridsv Dec 11, 2023
733420f
Snapshot of experimental changes with timeline
haridsv Dec 12, 2023
8fae9d6
Merge remote-tracking branch 'upstream/PHOENIX-7001-feature' into PHO…
haridsv Dec 13, 2023
26f7356
Switch from TreeMap to HashMap where possible
haridsv Dec 14, 2023
a9244c5
Reuse scanDataRows from base class
haridsv Dec 15, 2023
d9dce19
Address spotbugs
haridsv Dec 15, 2023
72faab4
Fix an NPE and add more test cases
haridsv Dec 19, 2023
ea2d8fa
Clone byte arrays for additional scenarios
haridsv Dec 21, 2023
5 changes: 5 additions & 0 deletions phoenix-core/pom.xml
@@ -249,6 +249,11 @@
</build>

<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<!-- shaded thirdparty dependencies -->
<dependency>
<groupId>org.apache.phoenix.thirdparty</groupId>
148 changes: 137 additions & 11 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
@@ -17,7 +17,10 @@
*/
package org.apache.phoenix.end2end;

import com.google.gson.Gson;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
@@ -27,13 +30,16 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

@@ -60,14 +66,18 @@ private void assertCDCState(Connection conn, String cdcName, String expInclude,
}
}

private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes)
private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
String datatableName)
throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
PTable table = PhoenixRuntime.getTable(conn, cdcName);
assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
assertNull(table.getIndexState()); // Index state should be null for CDC.
assertNull(table.getIndexType()); // This is not an index.
assertEquals(datatableName, table.getParentName().getString());
assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString());
}

private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
@@ -117,7 +127,7 @@ public void testCreate() throws SQLException {
try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(ROUND(v1))");
fail("Expected to fail due to non-timestamp expression in the index PK");
fail("Expected to fail due to non-date expression in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
@@ -126,7 +136,7 @@
try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(v1)");
fail("Expected to fail due to non-timestamp column in the index PK");
fail("Expected to fail due to non-date column in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
@@ -152,13 +162,13 @@
"(v2) INCLUDE (pre, post) INDEX_TYPE=g");
assertCDCState(conn, cdcName, "PRE,POST", 3);
assertPTable(cdcName, new HashSet<>(
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)));
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);

cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(v2) INDEX_TYPE=l");
assertCDCState(conn, cdcName, null, 2);
assertPTable(cdcName, null);
assertPTable(cdcName, null, tableName);

String viewName = generateUniqueName();
conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
@@ -209,7 +219,6 @@ public void testCreateCDCMultitenant() throws Exception {
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}

@Test
public void testDropCDC () throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
@@ -219,11 +228,6 @@
+ " v2 DATE)");
String cdcName = generateUniqueName();

String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);

String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(drop_cdc_sql);

@@ -269,4 +273,126 @@ public void testDropCDCIndex () throws SQLException {
}
}

private void assertResultSet(ResultSet rs) throws Exception {
Gson gson = new Gson();
assertEquals(true, rs.next());
assertEquals(1, rs.getInt(2));
assertEquals(new HashMap(){{put("V1", 100d);}}, gson.fromJson(rs.getString(3),
HashMap.class));
assertEquals(true, rs.next());
assertEquals(2, rs.getInt(2));
assertEquals(new HashMap(){{put("V1", 200d);}}, gson.fromJson(rs.getString(3),
HashMap.class));
assertEquals(true, rs.next());
assertEquals(1, rs.getInt(2));
assertEquals(new HashMap(){{put("V1", 101d);}}, gson.fromJson(rs.getString(3),
HashMap.class));
assertEquals(false, rs.next());
rs.close();
}

@Test
public void testSelectCDC() throws Exception {
Properties props = new Properties();
props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
props.put("hbase.client.scanner.timeout.period", "6000000");
props.put("phoenix.query.timeoutMs", "6000000");
props.put("zookeeper.session.timeout", "6000000");
props.put("hbase.rpc.timeout", "6000000");
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
conn.commit();
Thread.sleep(1000);
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
conn.commit();
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName
Contributor:

Every time we run CREATE CDC, do we need to pass PHOENIX_ROW_TIMESTAMP? If that is something mandatory, we shouldn't make it part of the interface; the system should do it implicitly under the hood.

Contributor:

Also, it seems the uncovered index being created as part of CREATE CDC is created synchronously. That will only work for small tables. For bigger tables, the index needs to be created async and then built explicitly using IndexTool.
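For reference, a minimal sketch of the suggested flow, assuming the standard Phoenix IndexTool command-line flags; the table, index, and path names are placeholders and not part of this PR:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.phoenix.mapreduce.index.IndexTool;

    public class AsyncIndexBuildSketch {
        public static void main(String[] args) throws Exception {
            // Step 1 (via JDBC, not shown): create the index in a pending state:
            //   CREATE UNCOVERED INDEX MY_IDX ON MY_TABLE (PHOENIX_ROW_TIMESTAMP()) ASYNC
            // Step 2: build it explicitly with the MapReduce-based IndexTool.
            Configuration conf = HBaseConfiguration.create();
            int rc = ToolRunner.run(conf, new IndexTool(), new String[] {
                    "--data-table", "MY_TABLE",
                    "--index-table", "MY_IDX",
                    "--output-path", "/tmp/MY_IDX_HFILES"});
            System.exit(rc);
        }
    }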

Contributor Author:

> Every time we run CREATE CDC, do we need to pass PHOENIX_ROW_TIMESTAMP?

The feature allows any timestamp-like column to be used instead of PHOENIX_ROW_TIMESTAMP; that is why this flexibility is allowed. However, I am not 100% sure about the use cases, so I will have an offline discussion on this. Thank you!
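To illustrate that flexibility, a hedged sketch of pointing CDC at a user-supplied DATE column, mirroring the (v2) variants in testCreate() above; the table and CDC names are hypothetical, and later comments note this option was eventually dropped:

    try (Connection conn = DriverManager.getConnection(getUrl())) {
        // v2 is a DATE column, so it passes the date-type check that testCreate()
        // exercises; a non-date column such as v1 fails with
        // INCORRECT_DATATYPE_FOR_EXPRESSION.
        conn.createStatement().execute(
                "CREATE TABLE T (k INTEGER PRIMARY KEY, v1 INTEGER, v2 DATE)");
        conn.createStatement().execute("CREATE CDC T_CDC ON T (v2)");
    }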

Contributor Author:

> For bigger tables, the index needs to be created async and then built explicitly using IndexTool.

If we do this, won't we also need to project the index status via the CDC object, since we intend to keep the index hidden as an implementation detail? Do you think we should have an offline mode for CREATE CDC itself?

Contributor:

> If we do this, won't we also need to project the index status via the CDC object?

I think that can be a later improvement, but supporting async index creation is necessary because creating a sync index for a large table will be too expensive, if possible at all.

Contributor:

> If we do this, won't we also need to project the index status via the CDC object?

@haridsv I didn't quite understand what you mean by index status here?

Contributor Author:

> I didn't quite understand what you mean by index status here?

I was referring to the async index creation lifecycle, which should be attributed to the CDC object so that the user can query it and know when the build is done.
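A hedged illustration of what "query and know when it is done" could look like with today's metadata, polling the hidden index's state in SYSTEM.CATALOG; the __CDC__ prefix follows CDCUtil.getCDCIndexName() as asserted in assertPTable() above, and surfacing this through the CDC object itself is not part of this PR:

    try (Connection conn = DriverManager.getConnection(getUrl());
         ResultSet rs = conn.createStatement().executeQuery(
                 "SELECT INDEX_STATE FROM SYSTEM.CATALOG" +
                 " WHERE TABLE_NAME = '__CDC__MY_CDC' AND INDEX_STATE IS NOT NULL")) {
        // Single-letter state codes, e.g. 'b' while building, 'a' once active.
        System.out.println(rs.next() ? "index state: " + rs.getString(1) : "no state row");
    }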

Contributor Author:

@tkhurana In any case, I am not planning to address these as part of this PR, so are you OK to merge this as is?

Contributor:

OK, we can tackle that in a separate PR.

Contributor Author (haridsv, Jan 24, 2024):

@tkhurana Async index creation is addressed as part of #1802.

Also removed the need to explicitly specify PHOENIX_ROW_TIMESTAMP(), as we dropped support for a user-specified timestamp column.
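To make the resolution concrete, a sketch of the two DDL shapes: the explicit form this PR's tests use, and the implicit form described above (the latter lands with #1802, so it is an assumption relative to this diff):

    try (Connection conn = DriverManager.getConnection(getUrl())) {
        // Form exercised in this PR: the timestamp expression is spelled out.
        conn.createStatement().execute(
                "CREATE CDC MY_CDC ON MY_TABLE (PHOENIX_ROW_TIMESTAMP())");
        // Simplified form after #1802, with PHOENIX_ROW_TIMESTAMP() implicit:
        // conn.createStatement().execute("CREATE CDC MY_CDC ON MY_TABLE");
    }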

+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);
// NOTE: To debug the query execution, add the below condition where you need a breakpoint.
// if (<table>.getTableName().getString().equals("N000002") ||
// <table>.getTableName().getString().equals("__CDC__N000002")) {
// "".isEmpty();
// }
assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
" WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
"FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT " +
"PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));

HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1});
put("SELECT * FROM " + cdcName +
" ORDER BY k ASC", new int [] {1, 1, 2});
put("SELECT * FROM " + cdcName +
" ORDER BY k DESC", new int [] {2, 1, 1});
put("SELECT * FROM " + cdcName +
" ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1});
}};
for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) {
try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
for (int k: testQuery.getValue()) {
assertEquals(true, rs.next());
assertEquals(k, rs.getInt(2));
}
assertEquals(false, rs.next());
}
}

try (ResultSet rs = conn.createStatement().executeQuery(
"SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) {
assertEquals(false, rs.next());
}
try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) {
assertEquals(true, rs.next());
assertEquals("abc", rs.getString(1));
assertEquals(true, rs.next());
assertEquals("abc", rs.getString(1));
assertEquals(false, rs.next());
}
}

// Temporary test case used as a reference for debugging and comparing against the CDC query.
@Test
public void testSelectUncoveredIndex() throws Exception {
Properties props = new Properties();
props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
props.put("hbase.client.scanner.timeout.period", "6000000");
props.put("phoenix.query.timeoutMs", "6000000");
props.put("zookeeper.session.timeout", "6000000");
props.put("hbase.rpc.timeout", "6000000");
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
" (1, 100)");
conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
" (2, 200)");
conn.commit();
String indexName = generateUniqueName();
String index_sql = "CREATE UNCOVERED INDEX " + indexName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(index_sql);
//ResultSet rs =
// conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
// " " + indexName + ") */ * FROM " + tableName);
ResultSet rs =
conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
" " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName);
assertEquals(true, rs.next());
assertEquals(1, rs.getInt(1));
assertEquals(100, rs.getInt(2));
assertEquals(true, rs.next());
assertEquals(2, rs.getInt(1));
assertEquals(200, rs.getInt(2));
assertEquals(false, rs.next());
}
}

@@ -175,7 +175,7 @@ public void testUncoveredQueryWithPhoenixRowTimestamp() throws Exception {
String timeZoneID = Calendar.getInstance().getTimeZone().getID();
// Write a query to get the val2 = 'bc' with a time range query
String query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
+ "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+ "val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName
+ " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('"
+ before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '"
+ timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after
@@ -186,8 +186,10 @@
assertTrue(rs.next());
assertEquals("bc", rs.getString(1));
assertEquals("bcd", rs.getString(2));
assertEquals("bcd", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(before));
assertTrue(rs.getTimestamp(3).before(after));
assertEquals("bcde", rs.getString(4));
assertFalse(rs.next());
// Count the number of index rows
rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName);
@@ -206,10 +208,11 @@
assertEquals("bcd", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(before));
assertTrue(rs.getTimestamp(3).before(after));
assertEquals("bcde", rs.getString(4));
assertFalse(rs.next());
// Write a time range query to get the last row with val2 ='bc'
query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
+"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
+"val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName +
" WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
// Verify that we will read from the index table
@@ -219,6 +222,7 @@
assertEquals("bc", rs.getString(1));
assertEquals("ccc", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(after));
assertEquals("cccc", rs.getString(4));
assertFalse(rs.next());
// Verify that we can execute the same query without using the index
String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
Expand Down
@@ -237,7 +237,7 @@ private static void projectAllIndexColumns(StatementContext context, TableRef ta
ColumnRef ref = null;
try {
indexColumn = index.getColumnForColumnName(indexColName);
//TODO could should we do this more efficiently than catching the expcetion ?
// TODO: Should we do this more efficiently than catching the exception ?
} catch (ColumnNotFoundException e) {
if (IndexUtil.shouldIndexBeUsedForUncoveredQuery(tableRef)) {
//Projected columns have the same name as in the data table
Expand Down
@@ -28,6 +28,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
@@ -82,6 +85,7 @@
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ParseNodeUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
@@ -698,6 +702,14 @@ protected QueryPlan compileSingleFlatQuery(
if (projectedTable != null) {
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
}

if (context.getCurrentTable().getTable().getType() == PTableType.CDC) {
// This gets the data column added to the context so that the projection can be
// serialized.
context.getDataColumnPosition(
context.getCurrentTable().getTable().getColumnForColumnName(
QueryConstants.CDC_JSON_COL_NAME));
}
}

ColumnResolver resolver = context.getResolver();

@@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -46,6 +45,7 @@
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
import org.apache.phoenix.util.ReadOnlyProps;
@@ -84,6 +84,7 @@ public class StatementContext {
private QueryLogger queryLogger;
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;
private String cdcIncludeScopes;

public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
@@ -378,4 +379,11 @@ public boolean getRetryingPersistentCache(long cacheId) {
return retrying;
}
}
public String getEncodedCdcIncludeScopes() {
return cdcIncludeScopes;
}

public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
}
}