Skip to content

Commit

Permalink
PHOENIX-7008: Addressing Jira spec and review feedback changes
Browse files Browse the repository at this point in the history
Address the changes in the CREATE CDC spec in PHOENIX-7001
Also includes some review feedback changes across the changes in PRs apache#1681, apache#1703 and apache#1766
  • Loading branch information
haridsv committed Jan 23, 2024
1 parent 7420443 commit 77bb242
Show file tree
Hide file tree
Showing 16 changed files with 155 additions and 203 deletions.
131 changes: 61 additions & 70 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/CDCMiscIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,25 @@

import com.google.gson.Gson;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.TableProperty;
import org.apache.phoenix.util.CDCUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import javax.xml.transform.Result;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -48,8 +50,22 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

@RunWith(Parameterized.class)
@Category(ParallelStatsDisabledTest.class)
public class CDCMiscIT extends ParallelStatsDisabledIT {
private final boolean forView;

public CDCMiscIT(boolean forView) {
this.forView = forView;
}

@Parameterized.Parameters(name = "forVieiw={0}")
public static synchronized Collection<Boolean[]> data() {
return Arrays.asList(new Boolean[][] {
{ false}, { true }
});
}

private void assertCDCState(Connection conn, String cdcName, String expInclude,
int idxType) throws SQLException {
try (ResultSet rs = conn.createStatement().executeQuery("SELECT cdc_include FROM " +
Expand Down Expand Up @@ -89,62 +105,50 @@ private void assertSaltBuckets(String cdcName, Integer nbuckets) throws SQLExcep
assertEquals(nbuckets, indexTable.getBucketNum());
}

private void createAndWait(Connection conn, String tableName, String cdcName, String cdc_sql)
throws Exception {
conn.createStatement().execute(cdc_sql);
IndexToolIT.runIndexTool(false, null, tableName,
"\""+CDCUtil.getCDCIndexName(cdcName)+"\"");
TestUtil.waitForIndexState(conn, CDCUtil.getCDCIndexName(cdcName), PIndexState.ACTIVE);
}

@Test
public void testCreate() throws SQLException {
public void testCreate() throws Exception {
Properties props = new Properties();
Connection conn = DriverManager.getConnection(getUrl(), props);
String tableName = generateUniqueName();
conn.createStatement().execute(
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ " v2 DATE)");
if (forView) {
String viewName = generateUniqueName();
conn.createStatement().execute(
"CREATE VIEW " + viewName + " AS SELECT * FROM " + tableName);
tableName = viewName;
}
String cdcName = generateUniqueName();

try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON NON_EXISTENT_TABLE (PHOENIX_ROW_TIMESTAMP())");
+ " ON NON_EXISTENT_TABLE");
fail("Expected to fail due to non-existent table");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.TABLE_UNDEFINED.getErrorCode(), e.getErrorCode());
}

try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(UNKNOWN_FUNCTION())");
fail("Expected to fail due to invalid function");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.FUNCTION_UNDEFINED.getErrorCode(), e.getErrorCode());
}

try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(NOW())");
fail("Expected to fail due to non-deterministic function");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.NON_DETERMINISTIC_EXPRESSION_NOT_ALLOWED_IN_INDEX.
getErrorCode(), e.getErrorCode());
}

try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(ROUND(v1))");
fail("Expected to fail due to non-date expression in the index PK");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
e.getErrorCode());
}

try {
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName +"(v1)");
fail("Expected to fail due to non-date column in the index PK");
+ " ON " + tableName + " INCLUDE (abc)");
fail("Expected to fail due to invalid INCLUDE");
} catch (SQLException e) {
assertEquals(SQLExceptionCode.INCORRECT_DATATYPE_FOR_EXPRESSION.getErrorCode(),
assertEquals(SQLExceptionCode.UNKNOWN_INCLUDE_CHANGE_SCOPE.getErrorCode(),
e.getErrorCode());
assertTrue(e.getMessage().endsWith("abc"));
}

String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, null, 3);

try {
Expand All @@ -154,43 +158,32 @@ public void testCreate() throws SQLException {
assertEquals(SQLExceptionCode.TABLE_ALREADY_EXIST.getErrorCode(), e.getErrorCode());
assertTrue(e.getMessage().endsWith(cdcName));
}

conn.createStatement().execute("CREATE CDC IF NOT EXISTS " + cdcName + " ON " + tableName +
"(v2) INCLUDE (pre, post) INDEX_TYPE=g");
" INCLUDE (pre, post) INDEX_TYPE=g");

cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(v2) INCLUDE (pre, post) INDEX_TYPE=g");
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName +
" INCLUDE (pre, post) INDEX_TYPE=g";
createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, "PRE,POST", 3);
assertPTable(cdcName, new HashSet<>(
Arrays.asList(PTable.CDCChangeScope.PRE, PTable.CDCChangeScope.POST)), tableName);

cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(v2) INDEX_TYPE=l");
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " INDEX_TYPE=l";
createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, null, 2);
assertPTable(cdcName, null, tableName);

String viewName = generateUniqueName();
conn.createStatement().execute("CREATE VIEW " + viewName + " AS SELECT * FROM " +
tableName);
cdcName = generateUniqueName();
try {
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + viewName +
"(PHOENIX_ROW_TIMESTAMP())");
fail("Expected to fail on VIEW");
}
catch(SQLException e) {
assertEquals(SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getErrorCode(),
e.getErrorCode());
assertTrue(e.getMessage().endsWith(
SQLExceptionCode.INVALID_TABLE_TYPE_FOR_CDC.getMessage() + " tableType=VIEW"));
// Indexes on views don't support salt buckets and is currently silently ignored.
if (! forView) {
cdcName = generateUniqueName();
cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName + " SALT_BUCKETS = 4";
createAndWait(conn, tableName, cdcName, cdc_sql);
assertSaltBuckets(cdcName, 4);
}

cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP()) SALT_BUCKETS = 4");
assertSaltBuckets(cdcName, 4);

conn.close();
}

Expand All @@ -203,8 +196,7 @@ public void testCreateCDCMultitenant() throws Exception {
" (tenantId INTEGER NOT NULL, k INTEGER NOT NULL," + " v1 INTEGER, v2 DATE, " +
"CONSTRAINT pk PRIMARY KEY (tenantId, k)) MULTI_TENANT=true");
String cdcName = generateUniqueName();
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName +
"(PHOENIX_ROW_TIMESTAMP())");
conn.createStatement().execute("CREATE CDC " + cdcName + " ON " + tableName);

PTable indexTable = PhoenixRuntime.getTable(conn, CDCUtil.getCDCIndexName(cdcName));
List<PColumn> idxPkColumns = indexTable.getPKColumns();
Expand All @@ -214,7 +206,7 @@ public void testCreateCDCMultitenant() throws Exception {

PTable cdcTable = PhoenixRuntime.getTable(conn, cdcName);
List<PColumn> cdcPkColumns = cdcTable.getPKColumns();
assertEquals(" PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
assertEquals("PHOENIX_ROW_TIMESTAMP()", cdcPkColumns.get(0).getName().getString());
assertEquals("TENANTID", cdcPkColumns.get(1).getName().getString());
assertEquals("K", cdcPkColumns.get(2).getName().getString());
}
Expand Down Expand Up @@ -260,8 +252,7 @@ public void testDropCDCIndex () throws SQLException {
"CREATE TABLE " + tableName + " ( k INTEGER PRIMARY KEY," + " v1 INTEGER,"
+ " v2 DATE)");
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
String cdc_sql = "CREATE CDC " + cdcName + " ON " + tableName;
conn.createStatement().execute(cdc_sql);
assertCDCState(conn, cdcName, null, 3);
String drop_cdc_index_sql = "DROP INDEX \"" + CDCUtil.getCDCIndexName(cdcName) + "\" ON " + tableName;
Expand Down Expand Up @@ -311,8 +302,8 @@ public void testSelectCDC() throws Exception {
conn.commit();
String cdcName = generateUniqueName();
String cdc_sql = "CREATE CDC " + cdcName
+ " ON " + tableName + "(PHOENIX_ROW_TIMESTAMP())";
conn.createStatement().execute(cdc_sql);
+ " ON " + tableName;
createAndWait(conn, tableName, cdcName, cdc_sql);
assertCDCState(conn, cdcName, null, 3);
// NOTE: To debug the query execution, add the below condition where you need a breakpoint.
// if (<table>.getTableName().getString().equals("N000002") ||
Expand All @@ -322,8 +313,8 @@ public void testSelectCDC() throws Exception {
assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT * FROM " + cdcName +
" WHERE PHOENIX_ROW_TIMESTAMP() < NOW()"));
assertResultSet(conn.createStatement().executeQuery("SELECT /*+ INCLUDE(PRE, POST) */ * " +
"FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT " +
"/*+ CDC_INCLUDE(PRE, POST) */ * FROM " + cdcName));
assertResultSet(conn.createStatement().executeQuery("SELECT " +
"PHOENIX_ROW_TIMESTAMP(), K, \"CDC JSON\" FROM " + cdcName));

Expand Down
10 changes: 1 addition & 9 deletions phoenix-core/src/main/antlr3/PhoenixSQL.g
Original file line number Diff line number Diff line change
Expand Up @@ -566,25 +566,17 @@ create_index_node returns [CreateIndexStatement ret]

create_cdc_node returns [CreateCDCStatement ret]
: CREATE CDC (IF NOT ex=EXISTS)? o=cdc_name ON t=from_table_name
LPAREN (tcol=column_name | tfunc=cdc_time_func) RPAREN
(INCLUDE LPAREN v=cdc_change_scopes RPAREN)?
(p=fam_properties)?
{
ret = factory.createCDC(o, t, tcol, tfunc, v, p, ex != null, getBindCount());
ret = factory.createCDC(o, t, v, p, ex != null, getBindCount());
}
;

cdc_name returns [NamedNode ret]
: name=identifier {$ret = factory.cdcName(name); }
;

cdc_time_func returns [FunctionParseNode ret]
: field=identifier LPAREN l=zero_or_more_expressions RPAREN
{
ret = factory.function(field, l);
}
;

cdc_change_scopes returns [Set<CDCChangeScope> ret]
@init { ret = new HashSet<>(); }
: v=cdc_change_scope { $ret.add(v); } ( COMMA v=cdc_change_scope { $ret.add(v); } )*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ public SQLException newException(SQLExceptionInfo info) {
VIEW_CANNOT_EXTEND_PK_WITH_PARENT_INDEXES(10956, "44A38", "View can extend parent primary key"
+ " only if none of the parents have indexes in the parent hierarchy"),
UNKNOWN_INDEX_TYPE(1098, "44A39", "Unknown INDEX type: "),
UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for INCLUDE: "),
UNKNOWN_INCLUDE_CHANGE_SCOPE(1099, "44A40", "Unknown change scope for CDC INCLUDE"),

/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
Expand Down Expand Up @@ -469,8 +469,6 @@ public SQLException newException(SQLExceptionInfo info) {
"Missing ENCODED_QUALIFIER."),
EXECUTE_BATCH_FOR_STMT_WITH_RESULT_SET(1151, "XCL51", "A batch operation can't include a "
+ "statement that produces result sets.", Factory.BATCH_UPDATE_ERROR),
INVALID_TABLE_TYPE_FOR_CDC(1152, "XCL52",
"Invalid table type for creating CDC."),


/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public class SQLExceptionInfo {
private final int phoenixColumnSizeBytes;
private final int maxPhoenixColumnSizeBytes;
private final String haGroupInfo;
private final String cdcChangeScope;

public static class Builder {
private Throwable rootCause;
Expand All @@ -83,6 +84,7 @@ public static class Builder {
private int maxPhoenixColumnSizeBytes;
private String haGroupInfo;
private PTableType tableType;
private String cdcChangeScope;

public Builder(SQLExceptionCode code) {
this.code = code;
Expand Down Expand Up @@ -163,6 +165,11 @@ public Builder setHaGroupInfo(String haGroupInfo) {
return this;
}

public Builder setCdcChangeScope(String cdcChangeScope) {
this.cdcChangeScope = cdcChangeScope;
return this;
}

public SQLExceptionInfo build() {
return new SQLExceptionInfo(this);
}
Expand Down Expand Up @@ -190,6 +197,7 @@ private SQLExceptionInfo(Builder builder) {
maxPhoenixColumnSizeBytes = builder.maxPhoenixColumnSizeBytes;
phoenixColumnSizeBytes = builder.phoenixColumnSizeBytes;
haGroupInfo = builder.haGroupInfo;
cdcChangeScope = builder.cdcChangeScope;
}

@Override
Expand Down Expand Up @@ -235,6 +243,9 @@ public String toString() {
if (haGroupInfo != null) {
builder.append(" ").append(HA_GROUP_INFO).append("=").append(haGroupInfo);
}
if (cdcChangeScope != null) {
builder.append(": ").append(cdcChangeScope);
}

return builder.toString();
}
Expand Down Expand Up @@ -306,4 +317,8 @@ public int getPhoenixColumnSizeBytes() {
public String getHaGroupInfo() {
return haGroupInfo;
}

public String getCdcChangeScope() {
return cdcChangeScope;
}
}

0 comments on commit 77bb242

Please sign in to comment.