Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PHOENIX-5211 Consistent Immutable Global Indexes for Non-Transactiona… #517

Closed
wants to merge 1 commit into from

Conversation

gokceni
Copy link
Contributor

@gokceni gokceni commented Jun 11, 2019

…l Tables

long mutationSizeBytes = 0;
long mutationCommitTime = 0;
long numFailedMutations = 0;
;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary ;

}
}

private void sendMutations(Iterator<Entry<TableInfo, List<Mutation>>> mutationsIterator, Span span, ImmutableBytesWritable indexMetaDataPtr)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice refactoring!

try {
desc = connection.getQueryServices().getTableDescriptor(index.getBytes());
} catch (SQLException ex) {
//logger.warn("Error during getTableDescriptor ", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: commented code.

}

}
return verifiedMutationsMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understood this function correctly, I find the variable name little misleading. verifiedMutationMap --> mutations in the map are verified, which doesn't seem to be the case. Rather, the mutations are from the first phase where verified column is set to false. Please correct me if I am wrong.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see why it is confusing, I will do some more refactoring here

PTable indexTable = null;
PTable dataTable = tableInfo.origTableRef.getTable();

List<PTable> indexes = dataTable.getIndexes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function will return all the indexes, including local indexes. Is that the expectation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine. It will not match the GlobalIndexChecker index name


List<PTable> indexes = dataTable.getIndexes();
for (PTable index : indexes) {
if (index.getName().equals(tableInfo.hTableName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although private member hTableName is accessible in the Outer class, better to use the public method to access it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ack

upsertRows(conn, tableName, numOfRowsToInsert);
conn.commit();

Thread.sleep(15000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please avoid adding this sleep? I worry about increased test run-time every time we add such sleeps. Perhaps we can check readiness of the index/table - and/or query syscat, or other alternatives.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

+ " ON " + tableName + " (long_pk, varchar_pk)"
+ " INCLUDE (long_col1, long_col2) ";

Statement stmt = conn.createStatement();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a difference between the two createStatement() calls? If not, consistent usage would be nice.

private void createTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName, int numOfRowsToInsert)
throws Exception {
String ddl = "CREATE TABLE " + TABLE_NAME + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
INDEX_DDL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any risks associated with changing the static member? If there are no real benefits to a static member, perhaps we should make this a function local variable to avoid surprises.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is actually used as part of a coprocessor test class in this file. But the tests that use that coprocessor is disabled. So I think it is ok to make them local

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok .. We can perhaps do this work in another PR. Using statics makes it impossible to run these tests in parallel without worrying about updates happening concurrently.

@@ -107,15 +121,17 @@ public static void doSetup() throws Exception {
Map<String, String> clientProps = Maps.newHashMapWithExpectedSize(2);
clientProps.put(QueryServices.TRANSACTIONS_ENABLED, "true");
clientProps.put(QueryServices.INDEX_POPULATION_SLEEP_TIME, "15000");
clientProps.put(QueryServices.INDEX_REGION_OBSERVER_ENABLED_ATTRIB,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Consistent inlining of this put with others right above.

conn.setAutoCommit(true);
int num = 1;
createTableAndIndexForConsistentIndex(conn, fullTableName, fullIndexName, num);
// TestUtil.waitForIndexRebuild(conn, indexName, PIndexState.ACTIVE);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove commented out code.

TABLE_NAME = fullTableName;
try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
conn.setAutoCommit(true);
int num = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/num/numRows

String indexName = "IND_" + generateUniqueName();
String fullTableName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, tableName);
String fullIndexName = SchemaUtil.getTableName(TestUtil.DEFAULT_SCHEMA_NAME, indexName);
TABLE_NAME = fullTableName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same concern here with changing static member field

assertTrue(rs.next());
assertEquals(num-1, rs.getInt(1));

// Disable data table so that delete fails on data table but index table row remains as unverified
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice test! simulating the unverified index row

@@ -24,14 +24,19 @@
import static org.apache.phoenix.monitoring.GlobalClientMetrics.GLOBAL_MUTATION_COMMIT_TIME;
import static org.apache.phoenix.query.QueryServices.WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static org.apache.phoenix.query.QueryServicesOptions.DEFAULT_WILDCARD_QUERY_DYNAMIC_COLS_ATTRIB;
import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know much about this class yet. Bit with 492 lines of code change, I see no corresponding test changes. Are there no existing unit-tests that need changes? If not, should we start writing them with this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a MutationStateIT class, it might count as unit/it tests.
My changes are mostly refactors. There was a huge send() method, I refactored and moved some part of send code into sendMutations function. This is the place where client creates and sends mutations for index and data table.

try {
desc = connection.getQueryServices().getTableDescriptor(index.getBytes());
} catch (SQLException ex) {
//logger.warn("Error during getTableDescriptor ", ex);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we throw this exception instead of silently ignoring it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -17,10 +17,15 @@
*/
package org.apache.phoenix.end2end.index;

import static org.apache.phoenix.end2end.IndexToolIT.assertExplainPlan;
import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using TRUE_BYTES and FALSE_BYTES, please use IndexRegionObserver.UNVERIFIED_BYTES and IndexRegionObserver.VERIFIED_BYTES. I have changed this recently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do

rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
assertTrue(rs.next());
assertEquals(numRows, rs.getInt(1));
verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The return value of verifyRowsForEmptyColValue() is not checked here and in the other places. Is it intentional? If so, what is the purpose of calling verifyRowsForEmptyColValue().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does anyone know if there is a compile time check for such return-value usages (unless there is explicit ignores)?

"EXPLAIN SELECT long_pk, varchar_pk, long_col1 FROM " + TABLE_NAME + " WHERE varchar_pk='varchar2' AND long_pk=2");
String actualExplainPlan = QueryUtil.getExplainPlan(rs);
assertExplainPlan(false, actualExplainPlan, fullTableName, fullIndexName);
assertFalse(rs.next());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test has failed to delete the rows WHERE varchar_pk='varchar2'. What is the purpose of this select with WHERE varchar_pk='varchar2' AND long_pk=2? and why to expect assertFalse(rs.next())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixing it. The tests makes sure we use index for that select. rs.next should actually test the select and see it is not returning data.

// build map from physical table to mutation list
boolean isDataTable = true;
while (mutationsIterator.hasNext()) {
Pair<PName, List<Mutation>> pair = mutationsIterator.next();
PName hTableName = pair.getFirst();
List<Mutation> mutationList = pair.getSecond();
if (clonedDataMutations.size() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clonedDataMutations are not used anymore. We can remove it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

filterIndexCheckerMutations(physicalTableMutationMap, unverifiedIndexMutations,
verifiedOrDeletedIndexMutations);

// Phase 1: Send verified indexes with VERIFIED=false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest rephrasing the comment, for example:
// Phase 1: Send index mutations with the empty column value = "unverified"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

physicalTableMutationMap.entrySet().iterator();
sendMutations(mutationsIterator, span, indexMetaDataPtr);

// Phase 3: Send verfied indexes with VERIFIED = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest rephrasing the comment, for example:
// Phase 3: Send put index mutations with the empty column value = "verified" and/or delete index mutations

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -957,7 +955,8 @@ public static void waitForIndexState(Connection conn, String fullIndexName, PInd
}
}
} while (++nTries < maxTries);
fail("Ran out of time waiting for index state to become " + expectedIndexState);
fail("Ran out of time waiting for index state to become " + expectedIndexState + " last seen actual state is " +
(actualIndexState == null ? "" : actualIndexState.toString()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Perhaps "Unknown" instead of ""?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

rs = conn.createStatement().executeQuery("SELECT COUNT(*) FROM " + fullIndexName);
assertTrue(rs.next());
assertEquals(numRows, rs.getInt(1));
verifyRowsForEmptyColValue(conn, fullIndexName, IndexRegionObserver.VERIFIED_BYTES);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does anyone know if there is a compile time check for such return-value usages (unless there is explicit ignores)?

private void createTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName, int numOfRowsToInsert)
throws Exception {
String ddl = "CREATE TABLE " + TABLE_NAME + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
INDEX_DDL =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok .. We can perhaps do this work in another PR. Using statics makes it impossible to run these tests in parallel without worrying about updates happening concurrently.

verifiedOrDeletedIndexMutations);

// Phase 1: Send verified indexes with VERIFIED=false
// addRowMutations generates the mutations with VERIFIED=false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the comment with "addRowMutations" relevant here? If it is, I see that getting called from send() - so is it modifying the unverifiedIndexMutations iterator?

Iterator<Entry<TableInfo, List<Mutation>>>
mutationsIterator =
physicalTableMutationMap.entrySet().iterator();
sendMutations(mutationsIterator, span, indexMetaDataPtr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: For readability, we can simply make this sendMutations(physicalTableMutationMap.entrySet().iterator(), span, indexMetaDataPtr) similar to Phase1 & Phase3 send calls.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

sendMutations(verifiedOrDeletedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr);
} catch (SQLException ex) {
LOGGER.warn(
"Ignoring exception that happened during setting index verified value to verified=TRUE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this log-line need table/index names or row-key?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni - I agree with @priyankporwal , we should log the table and index names

} else {
// These mutations already have Unverified_Bytes set.
addToMap(unverifiedIndexMutations, tableInfo, m);
// We will need to have verified byte set version for the Phase 3.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: "We will need .." sounds like that is being done somewhere else. Perhaps rephrase as "set verified bytes for phase3"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok


List<Mutation> mutations = pair.getValue();

if (mutations.size() == 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove this size check? Won't the for loop be sufficient?

@gokceni gokceni force-pushed the PHOENIX-5211 branch 3 times, most recently from 1c6679f to 5b3c9fc Compare June 18, 2019 18:33
Copy link
Contributor

@kadirozde kadirozde left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work!

Copy link
Contributor

@gjacoby126 gjacoby126 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni - As others have said, good job on the MutationState cleanup. I found mostly small stuff, but please see my comment about the perf of frequent checking for GlobalIndexChecker coprocessor.


private void createAndPopulateTableAndIndexForConsistentIndex(Connection conn, String tableName, String indexName, int numOfRowsToInsert)
throws Exception {
String ddl = "CREATE TABLE " + TABLE_NAME + TestUtil.TEST_TABLE_SCHEMA + tableDDLOptions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni - passing in a tableName parameter but not using it here, since the create statement is using the static value. Even if we need the static value to be set for some other reason, (and I agree with @priyankporwal that we ideally shouldn't need that), within this function we should be consistent.

sendMutations(verifiedOrDeletedIndexMutations.entrySet().iterator(), span, indexMetaDataPtr);
} catch (SQLException ex) {
LOGGER.warn(
"Ignoring exception that happened during setting index verified value to verified=TRUE",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni - I agree with @priyankporwal , we should log the table and index names

@@ -911,7 +911,8 @@ private void addCoprocessors(byte[] tableName, TableDescriptorBuilder builder, P
QueryServicesOptions.DEFAULT_INDEX_REGION_OBSERVER_ENABLED);

if (tableType == PTableType.INDEX && !isTransactional) {
if (globalIndexerEnabled && !newDesc.hasCoprocessor(GlobalIndexChecker.class.getName())) {
if (globalIndexerEnabled && !newDesc.hasCoprocessor(GlobalIndexChecker.class.getName()) &&
!isLocalIndexTable(builder.build().getColumnFamilyNames())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can use newDesc for the TableDescriptor rather than using builder.build() to reconstruct it.


public static boolean isGlobalIndexCheckerEnabled(PhoenixConnection connection, PName index)
throws SQLException {
TableDescriptor desc = desc = connection.getQueryServices().getTableDescriptor(index.getBytes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is called from generateIndexData which will be high-traffic. What are the perf implications of grabbing the table descriptor very frequently, and do we need caching?

Looks like this creates an HTable each time which will in turn make a remote call to the Master each time
@gokceni @kadirozde

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding to a static map.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I am not sure about caching this. If upgrade tool adds the coprocessor, map will go stale.

return getRawRowCount(table, true);
}

public static int getRawRowCount(Table table, boolean isRaw) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Seems a little odd to have a function named getRawRowCount that doesn't do a raw scan if you pass in false. (As opposed to having a getRowCount funciton with an isRaw parameter.)

@gokceni
Copy link
Contributor Author

gokceni commented Jun 24, 2019

@gjacoby126 for your logging comment, it will complicate the code to get index name and table name at that part of the code. I can print the hashmap though

@gokceni gokceni force-pushed the PHOENIX-5211 branch 9 times, most recently from 218e2fd to 29fd078 Compare June 26, 2019 17:48
@gokceni gokceni force-pushed the PHOENIX-5211 branch 2 times, most recently from b52a78e to 2c50607 Compare June 27, 2019 20:27
@@ -1539,6 +1539,7 @@ public IndexType getIndexType() {
public static PTable createFromProto(PTableProtos.PTable table) {
if (table==null)
return null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTableImpl.java should be removed from this patch as it is included by mistake

Copy link
Contributor

@kadirozde kadirozde left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

@gjacoby126 gjacoby126 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni - Some nits + a bug when the filter logic encounters an Append mutation, which should be a simple fix.

@@ -137,7 +143,12 @@
public class IndexUtil {
public static final String INDEX_COLUMN_NAME_SEP = ":";
public static final byte[] INDEX_COLUMN_NAME_SEP_BYTES = Bytes.toBytes(INDEX_COLUMN_NAME_SEP);


private static Cache<String, Boolean> IndexNameGlobalIndexCheckerEnabledMap = CacheBuilder.newBuilder()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super tiny nit: variables in Java should start with a lower case letter. Only bother with this if you need to make a change for other reasons. :-)

}
}
IndexNameGlobalIndexCheckerEnabledMap.put(indexName, result);
} catch (TableNotFoundException ex) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can pass in the PTable rather than the PName, you could interrogate the PTable to see whether it's a view index or not and save yourself the RPC and the exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The thing is I don't have PTable, I have just the name. It is in that TableRef object. There is a ptable but it is the base table

// The Delete gets marked as unverified in Phase 1 and gets deleted on Phase 3.
addToMap(unverifiedIndexMutations, tableInfo, put);
addToMap(verifiedOrDeletedIndexMutations, tableInfo, m);
} else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gokceni - This will fail with a ClassCastException if it encounters an Append, a third kind of Mutation that acts somewhat like a Put, but reads the existing value and increments / concatenates to it.

Since Appends also have an addColumn method, you probably just need another else if for them and similar logic to the Put.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, and Increment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to @kadirozde , "Increment and append should not be supported by Immutable tables by definition
We need to support them for mutable tables". I guess he will have a jira for that.

hTable.delete(new Delete(res.getRow()));
}
}
hTable.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create hTable with try (i.e. try with resources)

Copy link
Contributor

@gjacoby126 gjacoby126 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@kadirozde kadirozde closed this Jul 3, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants