Skip to content

Commit

Permalink
PHOENIX-4187 Use server timestamp for ROW_TIMESTAMP column when value is not specified
Browse files Browse the repository at this point in the history
  • Loading branch information
twdsilva committed Sep 28, 2017
1 parent 033a2fc commit 45079c4
Show file tree
Hide file tree
Showing 9 changed files with 626 additions and 667 deletions.
Expand Up @@ -2199,7 +2199,7 @@ public void testDeclaringColumnAsRowTimestamp() throws Exception {

String dataTableName2 = BaseTest.generateUniqueName();
String dataTableFullName2 = SchemaUtil.getTableName(schemaName, dataTableName2);
conn.createStatement().execute("CREATE TABLE " + dataTableFullName2 + " (PK1 VARCHAR, PK2 DATE PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR, KV2 INTEGER)");
conn.createStatement().execute("CREATE IMMUTABLE TABLE " + dataTableFullName2 + " (PK1 VARCHAR, PK2 DATE PRIMARY KEY ROW_TIMESTAMP, KV1 VARCHAR, KV2 INTEGER)");
table = phxConn.getTable(new PTableKey(phxConn.getTenantId(), dataTableFullName2));
// Assert that the column shows up as row time stamp in the cache.
assertFalse(table.getColumnForColumnName("PK1").isRowTimestamp());
Expand Down
Expand Up @@ -117,7 +117,7 @@ public void testSecondaryIndex() throws Exception {
conn.createStatement().execute("CREATE SCHEMA IF NOT EXISTS " + schemaName);
}
stmt.execute(
String.format("CREATE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
String.format("CREATE IMMUTABLE TABLE %s (ID BIGINT NOT NULL, NAME VARCHAR, ZIP INTEGER CONSTRAINT PK PRIMARY KEY(ID ROW_TIMESTAMP)) %s",
fullTableName, tableDDLOptions));
String upsertQuery = String.format("UPSERT INTO %s VALUES(?, ?, ?)", fullTableName);
PreparedStatement stmt1 = conn.prepareStatement(upsertQuery);
Expand Down Expand Up @@ -162,8 +162,8 @@ public void testSecondaryIndex() throws Exception {
PTable pindexTable = PhoenixRuntime.getTable(conn, SchemaUtil.getTableName(schemaName, indxTable));
assertEquals(PIndexState.BUILDING, pindexTable.getIndexState());
assertEquals(rs.getLong(1), pindexTable.getTimeStamp());
//assert disabled timestamp
assertEquals(rs.getLong(2), 3000);
//assert disabled timestamp is set correctly when index mutations are processed on the server
assertEquals(0, rs.getLong(2));

String selectSql = String.format("SELECT LPAD(UPPER(NAME),11,'x')||'_xyz',ID FROM %s", fullTableName);
rs = conn.createStatement().executeQuery("EXPLAIN " + selectSql);
Expand Down Expand Up @@ -195,7 +195,7 @@ public void testSecondaryIndex() throws Exception {
conf.set(QueryServices.IS_NAMESPACE_MAPPING_ENABLED, Boolean.toString(isNamespaceEnabled));
indexingTool.setConf(conf);

final String[] cmdArgs = getArgValues(schemaName, dataTableName);
final String[] cmdArgs = getArgValues(schemaName, dataTableName, indxTable);
int status = indexingTool.run(cmdArgs);
assertEquals(0, status);

Expand Down Expand Up @@ -237,15 +237,17 @@ public static void assertExplainPlan(final String actualExplainPlan, String sche
assertTrue(actualExplainPlan.contains(expectedExplainPlan));
}

public String[] getArgValues(String schemaName, String dataTable) {
public String[] getArgValues(String schemaName, String dataTable, String indexName) {
final List<String> args = Lists.newArrayList();
if (schemaName!=null) {
args.add("-s");
args.add(schemaName);
}
args.add("-dt");
args.add(dataTable);
args.add("-pr");
// complete index rebuild
args.add("-it");
args.add(indexName);
args.add("-op");
args.add("/tmp/output/partialTable_");
return args.toArray(new String[0]);
Expand Down

Large diffs are not rendered by default.

361 changes: 135 additions & 226 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertSelectIT.java

Large diffs are not rendered by default.

492 changes: 77 additions & 415 deletions phoenix-core/src/it/java/org/apache/phoenix/end2end/UpsertValuesIT.java

Large diffs are not rendered by default.

Expand Up @@ -221,7 +221,7 @@ public void testCreatingIndexOnGlobalView() throws Exception {
String globalView = generateUniqueName();
String globalViewIdx = generateUniqueName();
try (Connection conn = DriverManager.getConnection(getUrl())) {
conn.createStatement().execute("CREATE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 CHAR(15) CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT=true");
conn.createStatement().execute("CREATE IMMUTABLE TABLE " + baseTable + " (TENANT_ID CHAR(15) NOT NULL, PK2 DATE NOT NULL, PK3 INTEGER NOT NULL, KV1 VARCHAR, KV2 VARCHAR, KV3 CHAR(15) CONSTRAINT PK PRIMARY KEY(TENANT_ID, PK2 ROW_TIMESTAMP, PK3)) MULTI_TENANT=true");
conn.createStatement().execute("CREATE VIEW " + globalView + " AS SELECT * FROM " + baseTable);
conn.createStatement().execute("CREATE INDEX " + globalViewIdx + " ON " + globalView + " (PK3 DESC, KV3) INCLUDE (KV1)");
PreparedStatement stmt = conn.prepareStatement("UPSERT INTO " + globalView + " (TENANT_ID, PK2, PK3, KV1, KV3) VALUES (?, ?, ?, ?, ?)");
Expand Down
Expand Up @@ -293,6 +293,7 @@ public SQLException newException(SQLExceptionInfo info) {

SEQUENCE_NOT_CASTABLE_TO_AUTO_PARTITION_ID_COLUMN(1086, "44A17", "Sequence Value not castable to auto-partition id column"),
CANNOT_COERCE_AUTO_PARTITION_ID(1087, "44A18", "Auto-partition id cannot be coerced"),
CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP(1088, "44A19", "Cannot create an index on a mutable table that has a ROW_TIMESTAMP column."),

/** Sequence related */
SEQUENCE_ALREADY_EXIST(1200, "42Z00", "Sequence already exists.", new Factory() {
Expand Down
Expand Up @@ -470,7 +470,7 @@ private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr
}

private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
final long timestamp, boolean includeAllIndexes, final boolean sendAll) {
final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
final PTable table = tableRef.getTable();
final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
includeAllIndexes ?
Expand All @@ -480,7 +480,7 @@ private Iterator<Pair<PName,List<Mutation>>> addRowMutations(final TableRef tabl
Iterators.<PTable>emptyIterator();
final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size());
final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null;
generateMutations(tableRef, timestamp, values, mutationList, mutationsPertainingToIndex);
generateMutations(tableRef, mutationTimestamp, serverTimestamp, values, mutationList, mutationsPertainingToIndex);
return new Iterator<Pair<PName,List<Mutation>>>() {
boolean isFirst = true;

Expand All @@ -507,7 +507,7 @@ public Pair<PName, List<Mutation>> next() {
Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
if (rowToColumnMap!=null) {
final List<Mutation> deleteMutations = Lists.newArrayList();
generateMutations(tableRef, timestamp, rowToColumnMap, deleteMutations, null);
generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null);
indexMutations.addAll(deleteMutations);
}
}
Expand All @@ -525,14 +525,14 @@ public void remove() {
};
}

private void generateMutations(final TableRef tableRef, long timestamp,
final Map<ImmutableBytesPtr, RowMutationState> values,
private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values,
final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
final PTable table = tableRef.getTable();
boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
values.entrySet().iterator();
long timestampToUse = timestamp;
long timestampToUse = mutationTimestamp;
Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap();
while (iterator.hasNext()) {
Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
Expand All @@ -543,12 +543,13 @@ private void generateMutations(final TableRef tableRef, long timestamp,
if (tableWithRowTimestampCol) {
RowTimestampColInfo rowTsColInfo = state.getRowTimestampColInfo();
if (rowTsColInfo.useServerTimestamp()) {
// regenerate the key with this timestamp.
key = getNewRowKeyWithRowTimestamp(key, serverTimestamp, table);
// since we are about to modify the byte[] stored in key (which changes its hashcode)
// we need to remove the entry from the values map and add a new entry with the modified byte[]
modifiedValues.put(key, state);
iterator.remove();
// regenerate the key with this timestamp.
key = getNewRowKeyWithRowTimestamp(key, timestampToUse, table);
timestampToUse = serverTimestamp;
} else {
if (rowTsColInfo.getTimestamp() != null) {
timestampToUse = rowTsColInfo.getTimestamp();
Expand Down Expand Up @@ -620,13 +621,14 @@ public Iterator<Pair<byte[],List<Mutation>>> toMutations(final boolean includeMu
return Iterators.emptyIterator();
}
Long scn = connection.getSCN();
final long timestamp = getMutationTimestamp(tableTimestamp, scn);
final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
final long mutationTimestamp = getMutationTimestamp(scn);
return new Iterator<Pair<byte[],List<Mutation>>>() {
private Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> current = iterator.next();
private Iterator<Pair<byte[],List<Mutation>>> innerIterator = init();

private Iterator<Pair<byte[],List<Mutation>>> init() {
final Iterator<Pair<PName, List<Mutation>>> mutationIterator = addRowMutations(current.getKey(), current.getValue(), timestamp, includeMutableIndexes, true);
final Iterator<Pair<PName, List<Mutation>>> mutationIterator = addRowMutations(current.getKey(), current.getValue(), mutationTimestamp, serverTimestamp, includeMutableIndexes, true);
return new Iterator<Pair<byte[],List<Mutation>>>() {
@Override
public boolean hasNext() {
Expand Down Expand Up @@ -668,9 +670,13 @@ public void remove() {
};
}

public static long getMutationTimestamp(final Long tableTimestamp, Long scn) {
/**
 * Returns the timestamp used to resolve table metadata: the table's own timestamp when it has
 * been set, otherwise the connection SCN, otherwise HConstants.LATEST_TIMESTAMP.
 *
 * @param tableTimestamp last-known table timestamp; ignored when null or UNSET_TIMESTAMP
 * @param scn            the connection's SCN, or null when no SCN is set
 * @return the effective table timestamp
 */
public static long getTableTimestamp(final Long tableTimestamp, Long scn) {
return (tableTimestamp!=null && tableTimestamp!=QueryConstants.UNSET_TIMESTAMP) ? tableTimestamp : (scn == null ? HConstants.LATEST_TIMESTAMP : scn);
}

/**
 * Returns the timestamp to stamp on client-generated mutations: the connection SCN when one is
 * set, otherwise HConstants.LATEST_TIMESTAMP so the server assigns the timestamp.
 *
 * @param scn the connection's SCN, or null when no SCN is set
 * @return the effective mutation timestamp
 */
public static long getMutationTimestamp(final Long scn) {
return scn == null ? HConstants.LATEST_TIMESTAMP : scn;
}

/**
* Validates that the meta data is valid against the server meta data if we haven't yet done so.
Expand All @@ -684,14 +690,15 @@ private long[] validateAll() throws SQLException {
long[] timeStamps = new long[this.mutations.size()];
for (Map.Entry<TableRef, Map<ImmutableBytesPtr,RowMutationState>> entry : mutations.entrySet()) {
TableRef tableRef = entry.getKey();
timeStamps[i++] = validate(tableRef, entry.getValue());
timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue());
}
return timeStamps;
}

private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
Long scn = connection.getSCN();
MetaDataClient client = new MetaDataClient(connection);
long serverTimeStamp = tableRef.getTimeStamp();
// If we're auto committing, we've already validated the schema when we got the ColumnResolver,
// so no need to do it again here.
PTable table = tableRef.getTable();
Expand All @@ -715,6 +722,7 @@ private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState
}
long timestamp = result.getMutationTime();
if (timestamp != QueryConstants.UNSET_TIMESTAMP) {
serverTimeStamp = timestamp;
if (result.wasUpdated()) {
List<PColumn> columns = Lists.newArrayListWithExpectedSize(table.getColumns().size());
for (Map.Entry<ImmutableBytesPtr,RowMutationState> rowEntry : rowKeyToColumnMap.entrySet()) {
Expand All @@ -736,7 +744,7 @@ private long validate(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState
}
}
}
return scn == null ? HConstants.LATEST_TIMESTAMP : scn;
return serverTimeStamp == QueryConstants.UNSET_TIMESTAMP ? HConstants.LATEST_TIMESTAMP : serverTimeStamp;
}

private static long calculateMutationSize(List<Mutation> mutations) {
Expand Down Expand Up @@ -913,9 +921,11 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
continue;
}
// Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
long serverTimestamp = serverTimeStamps == null ? validate(tableRef, valuesMap) : serverTimeStamps[i++];
long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
Long scn = connection.getSCN();
long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
final PTable table = tableRef.getTable();
Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, serverTimestamp, false, sendAll);
Iterator<Pair<PName,List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll);
// build map from physical table to mutation list
boolean isDataTable = true;
while (mutationsIterator.hasNext()) {
Expand Down
Expand Up @@ -581,8 +581,10 @@ private MetaDataMutationResult updateCache(PName origTenantId, String schemaName
// Do not make rpc to getTable if
// 1. table is a system table
// 2. table was already resolved as of that timestamp
// 3. table does not have a ROW_TIMESTAMP column and age is less then UPDATE_CACHE_FREQUENCY
if (table != null && !alwaysHitServer
&& (systemTable || resolvedTimestamp == tableResolvedTimestamp || connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency())) {
&& (systemTable || resolvedTimestamp == tableResolvedTimestamp ||
(table.getRowTimestampColPos() == -1 && connection.getMetaDataCache().getAge(tableRef) < table.getUpdateCacheFrequency() ))) {
return new MetaDataMutationResult(MutationCode.TABLE_ALREADY_EXISTS, QueryConstants.UNSET_TIMESTAMP, table);
}

Expand Down Expand Up @@ -1425,6 +1427,10 @@ public MutationState createIndex(CreateIndexStatement statement, byte[][] splits
if (!connection.getQueryServices().hasIndexWALCodec() && !dataTable.isTransactional()) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.INVALID_MUTABLE_INDEX_CONFIG).setTableName(indexTableName.getTableName()).build().buildException();
}
boolean tableWithRowTimestampCol = dataTable.getRowTimestampColPos() != -1;
if (tableWithRowTimestampCol) {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.CANNOT_CREATE_INDEX_ON_MUTABLE_TABLE_WITH_ROWTIMESTAMP).setTableName(indexTableName.getTableName()).build().buildException();
}
}
int posOffset = 0;
List<PColumn> pkColumns = dataTable.getPKColumns();
Expand Down Expand Up @@ -3746,7 +3752,7 @@ else if (table.isAppendOnlySchema()) {
if(!indexColumnsToDrop.isEmpty()) {
long indexTableSeqNum = incrementTableSeqNum(index, index.getType(), -indexColumnsToDrop.size(), null, null);
dropColumnMutations(index, indexColumnsToDrop);
long clientTimestamp = MutationState.getMutationTimestamp(timeStamp, connection.getSCN());
long clientTimestamp = MutationState.getTableTimestamp(timeStamp, connection.getSCN());
connection.removeColumn(tenantId, index.getName().getString(),
indexColumnsToDrop, clientTimestamp, indexTableSeqNum,
TransactionUtil.getResolvedTimestamp(connection, index.isTransactional(), clientTimestamp));
Expand Down

0 comments on commit 45079c4

Please sign in to comment.