Skip to content

Commit

Permalink
PHOENIX-7014: Query compiler/optimizer changes along with some PHOENIX-7015 changes (apache#1766)
Browse files Browse the repository at this point in the history
  • Loading branch information
haridsv committed Dec 23, 2023
1 parent 21bf5a1 commit 7420443
Show file tree
Hide file tree
Showing 24 changed files with 783 additions and 165 deletions.
5 changes: 5 additions & 0 deletions phoenix-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,11 @@
</build>

<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>

<!-- shaded thirdparty dependencies -->
<dependency>
<groupId>org.apache.phoenix.thirdparty</groupId>
Expand Down
148 changes: 137 additions & 11 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
*/
package org.apache.phoenix.end2end;

import com.google.gson.Gson;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
Expand All @@ -27,13 +30,16 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import javax.xml.transform.Result;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

Expand All @@ -60,14 +66,18 @@ private void assertCDCState(Connection conn, String cdcName, String expInclude,
}
}

private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes)
private void assertPTable(String cdcName, Set<PTable.CDCChangeScope> expIncludeScopes,
String datatableName)
throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
PTable table = PhoenixRuntime.getTable(conn, cdcName);
assertEquals(expIncludeScopes, table.getCDCIncludeScopes());
assertEquals(expIncludeScopes, TableProperty.INCLUDE.getPTableValue(table));
assertNull(table.getIndexState()); // Index state should be null for CDC.
assertNull(table.getIndexType()); // This is not an index.
assertEquals(datatableName, table.getParentName().getString());
assertEquals(CDCUtil.getCDCIndexName(cdcName), table.getPhysicalName().getString());
}

private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLException {
Expand Down Expand Up @@ -117,7 +127,7 @@ public void testCreate() throws SQLException {
try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(ROUND(v1))");
fail("Expected to fail due to non-timestamp expression in the index PK");
fail("Expected to fail due to non-date expression in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
Expand All @@ -126,7 +136,7 @@ public void testCreate() throws SQLException {
try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(v1)");
fail("Expected to fail due to non-timestamp column in the index PK");
fail("Expected to fail due to non-date column in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
Expand All @@ -152,13 +162,13 @@ public void testCreate() throws SQLException {
"(v2) INCLUDE (pre, post) INDEX_TYPE=g");
assertCDCState(conn, cdcName, "PRE,POST", 3);
assertPTable(cdcName, new HashSet<>(
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)));
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);

cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(v2) INDEX_TYPE=l");
assertCDCState(conn, cdcName, null, 2);
assertPTable(cdcName, null);
assertPTable(cdcName, null, tableName);

String viewName = generateUniqueName();
conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
Expand Down Expand Up @@ -209,7 +219,6 @@ public void testCreateCDCMultitenant() throws Exception {
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}

@Test
public void testDropCDC () throws SQLException {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
Expand All @@ -219,11 +228,6 @@ public void testDropCDC () throws SQLException {
+ " v2 DATE)");
String cdcName = generateUniqueName();

String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);

String drop_cdc_sql = "DROP CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(drop_cdc_sql);

Expand Down Expand Up @@ -269,4 +273,126 @@ public void testDropCDCIndex () throws SQLException {
}
}

/**
 * Asserts that the CDC result set contains exactly the three expected change rows, in order:
 * (k=1, V1=100), (k=2, V1=200), (k=1, V1=101). Column 2 is the row key and column 3 is the
 * CDC JSON payload, which is parsed into a map for comparison. Gson maps JSON numbers to
 * Double, hence the double-valued expectations.
 *
 * @param rs result set positioned before the first row; closed by this method
 */
private void assertResultSet(ResultSet rs) throws Exception {
    Gson gson = new Gson();
    // Expected (row key, V1 value) pairs, in expected row order.
    int[] expectedKeys = {1, 2, 1};
    double[] expectedV1s = {100d, 200d, 101d};
    for (int i = 0; i < expectedKeys.length; ++i) {
        assertEquals(true, rs.next());
        assertEquals(expectedKeys[i], rs.getInt(2));
        // Plain HashMap instead of the double-brace anonymous-subclass idiom, which creates
        // a class per call site and pins the enclosing instance.
        Map<String, Double> expectedChange = new HashMap<>();
        expectedChange.put("V1", expectedV1s[i]);
        assertEquals(expectedChange, gson.fromJson(rs.getString(3), HashMap.class));
    }
    assertEquals(false, rs.next());
    rs.close();
}

/**
 * End-to-end check of SELECT queries against a CDC object: creates a data table, applies two
 * batches of upserts (separated by a sleep so they land on distinct row timestamps), creates
 * the CDC, then verifies star/column projections, time-range filters, the INCLUDE hint,
 * ORDER BY variants, and constant projections.
 */
@Test
public void testSelectCDC() throws Exception {
    Properties props = new Properties();
    // Disable background task handling and stretch client/server timeouts so the test can
    // be paused in a debugger without the query being timed out.
    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
    props.put("hbase.client.scanner.timeout.period", "6000000");
    props.put("phoenix.query.timeoutMs", "6000000");
    props.put("zookeeper.session.timeout", "6000000");
    props.put("hbase.rpc.timeout", "6000000");
    // try-with-resources so the connection is released even when an assertion fails.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String tableName = generateUniqueName();
        conn.createStatement().execute(
                "CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER)");
        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 100)");
        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (2, 200)");
        conn.commit();
        // Ensure the next mutation gets a strictly later row timestamp than the first batch.
        Thread.sleep(1000);
        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES (1, 101)");
        conn.commit();
        String cdcName = generateUniqueName();
        String cdc_sql = "CREATE CDC " + cdcName
                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
        conn.createStatement().execute(cdc_sql);
        assertCDCState(conn, cdcName, null, 3);
        // NOTE: To debug the query execution, add the below condition where you need a breakpoint.
        // if (<table>.getTableName().getString().equals("N000002") ||
        //         <table>.getTableName().getString().equals("__CDC__N000002")) {
        //     "".isEmpty();
        // }
        assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
        assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
                " WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
        assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
                "FROM " + cdcName));
        assertResultSet(conn.createStatement().executeQuery("SELECT " +
                "PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));

        // Each query is mapped to the sequence of row keys (column 2) it is expected to return.
        HashMap<String, int[]> testQueries = new HashMap<String, int[]>() {{
            put("SELECT 'dummy', k FROM " + cdcName, new int [] {2, 1});
            put("SELECT * FROM " + cdcName +
                    " ORDER BY k ASC", new int [] {1, 1, 2});
            put("SELECT * FROM " + cdcName +
                    " ORDER BY k DESC", new int [] {2, 1, 1});
            put("SELECT * FROM " + cdcName +
                    " ORDER BY PHOENIX_ROW_TIMESTAMP() ASC", new int [] {1, 2, 1});
        }};
        for (Map.Entry<String, int[]> testQuery: testQueries.entrySet()) {
            try (ResultSet rs = conn.createStatement().executeQuery(testQuery.getKey())) {
                for (int k: testQuery.getValue()) {
                    assertEquals(true, rs.next());
                    assertEquals(k, rs.getInt(2));
                }
                assertEquals(false, rs.next());
            }
        }

        // A future-only time range must yield no changes.
        try (ResultSet rs = conn.createStatement().executeQuery(
                "SELECT * FROM " + cdcName + " WHERE PHOENIX_ROW_TIMESTAMP() > NOW()")) {
            assertEquals(false, rs.next());
        }
        // Constant projection over the CDC.
        // NOTE(review): two rows are expected here although three changes exist — confirm
        // whether this count is intentional.
        try (ResultSet rs = conn.createStatement().executeQuery("SELECT 'abc' FROM " + cdcName)) {
            assertEquals(true, rs.next());
            assertEquals("abc", rs.getString(1));
            assertEquals(true, rs.next());
            assertEquals("abc", rs.getString(1));
            assertEquals(false, rs.next());
        }
    }
}

/**
 * Temporary test case used as a reference for debugging and comparing against the CDC query.
 * Verifies that an uncovered index on PHOENIX_ROW_TIMESTAMP() can serve a hinted query that
 * projects data-table columns not covered by the index.
 */
@Test
public void testSelectUncoveredIndex() throws Exception {
    Properties props = new Properties();
    // Disable background task handling and stretch timeouts for debuggability
    // (same settings as testSelectCDC).
    props.put(QueryServices.TASK_HANDLING_INTERVAL_MS_ATTRIB, Long.toString(Long.MAX_VALUE));
    props.put("hbase.client.scanner.timeout.period", "6000000");
    props.put("phoenix.query.timeoutMs", "6000000");
    props.put("zookeeper.session.timeout", "6000000");
    props.put("hbase.rpc.timeout", "6000000");
    // try-with-resources so connection and result set are released even on assertion failure.
    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
        String tableName = generateUniqueName();
        conn.createStatement().execute(
                "CREATE TABLE " + tableName + " (k INTEGER PRIMARY KEY, v1 INTEGER)");
        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
                " (1, 100)");
        conn.createStatement().execute("UPSERT INTO " + tableName + " (k, v1) VALUES" +
                " (2, 200)");
        conn.commit();
        String indexName = generateUniqueName();
        String index_sql = "CREATE UNCOVERED INDEX " + indexName
                + " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
        conn.createStatement().execute(index_sql);
        try (ResultSet rs = conn.createStatement().executeQuery("SELECT /*+ INDEX(" + tableName +
                " " + indexName + ") */ K, V1, PHOENIX_ROW_TIMESTAMP() FROM " + tableName)) {
            assertEquals(true, rs.next());
            assertEquals(1, rs.getInt(1));
            assertEquals(100, rs.getInt(2));
            assertEquals(true, rs.next());
            assertEquals(2, rs.getInt(1));
            assertEquals(200, rs.getInt(2));
            assertEquals(false, rs.next());
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void testUncoveredQueryWithPhoenixRowTimestamp() throws Exception {
String timeZoneID = Calendar.getInstance().getTimeZone().getID();
// Write a query to get the val2 = 'bc' with a time range query
String query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
+ "val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName
+ "val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName
+ " WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('"
+ before.toString() + "','yyyy-MM-dd HH:mm:ss.SSS', '"
+ timeZoneID + "') AND " + "PHOENIX_ROW_TIMESTAMP() < TO_DATE('" + after
Expand All @@ -186,8 +186,10 @@ public void testUncoveredQueryWithPhoenixRowTimestamp() throws Exception {
assertTrue(rs.next());
assertEquals("bc", rs.getString(1));
assertEquals("bcd", rs.getString(2));
assertEquals("bcd", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(before));
assertTrue(rs.getTimestamp(3).before(after));
assertEquals("bcde", rs.getString(4));
assertFalse(rs.next());
// Count the number of index rows
rs = conn.createStatement().executeQuery("SELECT COUNT(*) from " + indexTableName);
Expand All @@ -206,10 +208,11 @@ public void testUncoveredQueryWithPhoenixRowTimestamp() throws Exception {
assertEquals("bcd", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(before));
assertTrue(rs.getTimestamp(3).before(after));
assertEquals("bcde", rs.getString(4));
assertFalse(rs.next());
// Write a time range query to get the last row with val2 ='bc'
query = "SELECT"+ (uncovered ? " " : "/*+ INDEX(" + dataTableName + " " + indexTableName + ")*/ ")
+"val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName +
+"val1, val2, PHOENIX_ROW_TIMESTAMP(), val3 from " + dataTableName +
" WHERE val1 = 'bc' AND " + "PHOENIX_ROW_TIMESTAMP() > TO_DATE('" + after
+ "','yyyy-MM-dd HH:mm:ss.SSS', '" + timeZoneID + "')";
// Verify that we will read from the index table
Expand All @@ -219,6 +222,7 @@ public void testUncoveredQueryWithPhoenixRowTimestamp() throws Exception {
assertEquals("bc", rs.getString(1));
assertEquals("ccc", rs.getString(2));
assertTrue(rs.getTimestamp(3).after(after));
assertEquals("cccc", rs.getString(4));
assertFalse(rs.next());
// Verify that we can execute the same query without using the index
String noIndexQuery = "SELECT /*+ NO_INDEX */ val1, val2, PHOENIX_ROW_TIMESTAMP() from " + dataTableName + " WHERE val1 = 'bc' AND " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ private static void projectAllIndexColumns(StatementContext context, TableRef ta
ColumnRef ref = null;
try {
indexColumn = index.getColumnForColumnName(indexColName);
//TODO could should we do this more efficiently than catching the expcetion ?
// TODO: Should we do this more efficiently than catching the exception ?
} catch (ColumnNotFoundException e) {
if (IndexUtil.shouldIndexBeUsedForUncoveredQuery(tableRef)) {
//Projected columns have the same name as in the data table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.Map;
import java.util.Set;

import org.apache.phoenix.parse.HintNode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.thirdparty.com.google.common.base.Optional;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -82,6 +85,7 @@
import org.apache.phoenix.schema.RowValueConstructorOffsetNotCoercibleException;
import org.apache.phoenix.schema.TableNotFoundException;
import org.apache.phoenix.schema.TableRef;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.ParseNodeUtil;
import org.apache.phoenix.util.ParseNodeUtil.RewriteResult;
Expand Down Expand Up @@ -698,6 +702,14 @@ protected QueryPlan compileSingleFlatQuery(
if (projectedTable != null) {
context.setResolver(FromCompiler.getResolverForProjectedTable(projectedTable, context.getConnection(), select.getUdfParseNodes()));
}

if (context.getCurrentTable().getTable().getType() == PTableType.CDC) {
// This will get the data column added to the context so that projection can get
// serialized..
context.getDataColumnPosition(
context.getCurrentTable().getTable().getColumnForColumnName(
QueryConstants.CDC_JSON_COL_NAME));
}
}

ColumnResolver resolver = context.getResolver();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
Expand All @@ -46,6 +45,7 @@
import org.apache.phoenix.schema.types.PTime;
import org.apache.phoenix.schema.types.PTimestamp;
import org.apache.phoenix.thirdparty.com.google.common.collect.Maps;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.NumberUtil;
import org.apache.phoenix.util.ReadOnlyProps;
Expand Down Expand Up @@ -84,6 +84,7 @@ public class StatementContext {
private QueryLogger queryLogger;
private boolean isClientSideUpsertSelect;
private boolean isUncoveredIndex;
private String cdcIncludeScopes;

public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
Expand Down Expand Up @@ -378,4 +379,11 @@ public boolean getRetryingPersistentCache(long cacheId) {
return retrying;
}
}
/**
 * Returns the CDC include scopes for this statement in their encoded string form, or null
 * if no scopes have been set via {@link #setCDCIncludeScopes}.
 */
public String getEncodedCdcIncludeScopes() {
return cdcIncludeScopes;
}

/**
 * Records the CDC change scopes requested for this statement. The enum set is immediately
 * encoded to its string form via CDCUtil so only the encoded value is retained; retrieve it
 * with {@link #getEncodedCdcIncludeScopes()}.
 *
 * @param cdcIncludeScopes set of change scopes to encode; null/empty handling is delegated
 *                         to CDCUtil.makeChangeScopeStringFromEnums — TODO confirm behavior
 */
public void setCDCIncludeScopes(Set<PTable.CDCChangeScope> cdcIncludeScopes) {
this.cdcIncludeScopes = CDCUtil.makeChangeScopeStringFromEnums(cdcIncludeScopes);
}
}

0 comments on commit 7420443

Please sign in to comment.