Skip to content

Commit

Permalink
PHOENIX-3254 IndexId Sequence is incremented even if index exists alr…
Browse files Browse the repository at this point in the history
…eady.
  • Loading branch information
ankitsinghal committed Sep 29, 2016
1 parent 15a2f9a commit 655fe2b
Show file tree
Hide file tree
Showing 15 changed files with 413 additions and 169 deletions.
Expand Up @@ -114,7 +114,7 @@ private void testTableWithSameSchema(boolean notExists, boolean sameClient) thro
// verify no create table rpcs
verify(connectionQueryServices, never()).createTable(anyListOf(Mutation.class),
any(byte[].class), any(PTableType.class), anyMap(), anyList(), any(byte[][].class),
eq(false));
eq(false), eq(false));
reset(connectionQueryServices);

// execute alter table ddl that adds the same column
Expand Down
Expand Up @@ -28,12 +28,9 @@
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PNameFactory;
Expand All @@ -42,7 +39,6 @@
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.junit.Ignore;
import org.junit.Test;


Expand Down Expand Up @@ -126,8 +122,8 @@ private void testMultiCFViewIndex(boolean localIndex, boolean isNamespaceEnabled
String sequenceNameA = getViewIndexSequenceName(PNameFactory.newName(tableName), PNameFactory.newName("a"), isNamespaceEnabled);
String sequenceNameB = getViewIndexSequenceName(PNameFactory.newName(tableName), PNameFactory.newName("b"), isNamespaceEnabled);
String sequenceSchemaName = getViewIndexSequenceSchemaName(PNameFactory.newName(tableName), isNamespaceEnabled);
verifySequence(isNamespaceEnabled? "a" : null, sequenceNameA, sequenceSchemaName, true);
verifySequence(isNamespaceEnabled? "b" : null, sequenceNameB, sequenceSchemaName, true);
verifySequenceValue(isNamespaceEnabled? "a" : null, sequenceNameA, sequenceSchemaName, -32767);
verifySequenceValue(isNamespaceEnabled? "b" : null, sequenceNameB, sequenceSchemaName, -32767);

Properties props = new Properties();
props.setProperty(PhoenixRuntime.TENANT_ID_ATTRIB, "a");
Expand All @@ -140,8 +136,8 @@ private void testMultiCFViewIndex(boolean localIndex, boolean isNamespaceEnabled
}
DriverManager.getConnection(getUrl()).createStatement().execute("DROP TABLE " + tableName + " CASCADE");

verifySequence(isNamespaceEnabled? "a" : null, sequenceNameA, sequenceSchemaName, false);
verifySequence(isNamespaceEnabled? "b" : null, sequenceNameB, sequenceSchemaName, false);
verifySequenceNotExists(isNamespaceEnabled? "a" : null, sequenceNameA, sequenceSchemaName);
verifySequenceNotExists(isNamespaceEnabled? "b" : null, sequenceNameB, sequenceSchemaName);
}

private void createViewAndIndexesWithTenantId(String tableName,String baseViewName, boolean localIndex, String tenantId,
Expand Down
Expand Up @@ -210,11 +210,10 @@ public void testDropLocalIndexTable() throws Exception {
Connection conn1 = getConnection();
Connection conn2 = getConnection();
conn1.createStatement().execute("CREATE LOCAL INDEX " + indexName + " ON " + tableName + "(v1)");
verifySequence(null, sequenceName, sequenceSchemaName, true);
verifySequenceValue(null, sequenceName, sequenceSchemaName,-32767);
conn2.createStatement().executeQuery("SELECT * FROM " + tableName).next();
conn1.createStatement().execute("DROP TABLE "+ tableName);

verifySequence(null, sequenceName, sequenceSchemaName, false);
verifySequenceNotExists(null, sequenceName, sequenceSchemaName);
}

@Test
Expand Down
Expand Up @@ -108,14 +108,17 @@ public void testDeleteViewIndexSequences() throws Exception {
String sequenceSchemaName = getViewIndexSequenceSchemaName(PNameFactory.newName(tableName), isNamespaceMapped);
String seqName = getViewIndexSequenceName(PNameFactory.newName(tableName), null, !isNamespaceMapped);
String seqSchemaName = getViewIndexSequenceSchemaName(PNameFactory.newName(tableName), !isNamespaceMapped);
verifySequence(null, sequenceName, sequenceSchemaName, true);
verifySequenceValue(null, sequenceName, sequenceSchemaName, -32767);
verifySequenceValue(null, sequenceName, sequenceSchemaName, -32767);
conn1.createStatement().execute("CREATE INDEX " + indexName + "_2 ON " + viewName + " (v1)");
verifySequenceValue(null, sequenceName, sequenceSchemaName, -32766);
// Check other format of sequence is not there as Sequences format is different for views/indexes created on
// table which are namespace mapped and which are not.
verifySequence(null, seqName, seqSchemaName, false);
verifySequenceNotExists(null, seqName, seqSchemaName);
conn1.createStatement().execute("DROP VIEW " + viewName);
conn1.createStatement().execute("DROP TABLE "+ tableName);

verifySequence(null, sequenceName, sequenceSchemaName, false);
verifySequenceNotExists(null, sequenceName, sequenceSchemaName);
}

@Test
Expand Down
Expand Up @@ -195,6 +195,9 @@
import org.apache.phoenix.schema.PTable.ViewType;
import org.apache.phoenix.schema.PTableImpl;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.SequenceAllocation;
import org.apache.phoenix.schema.SequenceAlreadyExistsException;
import org.apache.phoenix.schema.SequenceKey;
import org.apache.phoenix.schema.SequenceNotFoundException;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.TableNotFoundException;
Expand Down Expand Up @@ -1499,6 +1502,53 @@ public void createTable(RpcController controller, CreateTableRequest request,
cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
cells.add(viewConstantCell);
}
Short indexId = null;
if (request.hasAllocateIndexId() && request.getAllocateIndexId()) {
String tenantIdStr = tenantIdBytes.length == 0 ? null : Bytes.toString(tenantIdBytes);
final Properties props = new Properties();
UpgradeUtil.doNotUpgradeOnFirstConnection(props);
try (PhoenixConnection connection = DriverManager.getConnection(MetaDataUtil.getJdbcUrl(env), props).unwrap(PhoenixConnection.class)){
PName physicalName = parentTable.getPhysicalName();
int nSequenceSaltBuckets = connection.getQueryServices().getSequenceSaltBuckets();
SequenceKey key = MetaDataUtil.getViewIndexSequenceKey(tenantIdStr, physicalName,
nSequenceSaltBuckets, parentTable.isNamespaceMapped() );
// TODO Review Earlier sequence was created at (SCN-1/LATEST_TIMESTAMP) and incremented at the client max(SCN,dataTable.getTimestamp), but it seems we should
// use always LATEST_TIMESTAMP to avoid seeing wrong sequence values by different connection having SCN
// or not.
long sequenceTimestamp = HConstants.LATEST_TIMESTAMP;
try {
connection.getQueryServices().createSequence(key.getTenantId(), key.getSchemaName(), key.getSequenceName(),
Short.MIN_VALUE, 1, 1, Long.MIN_VALUE, Long.MAX_VALUE, false, sequenceTimestamp);
} catch (SequenceAlreadyExistsException e) {
}
long[] seqValues = new long[1];
SQLException[] sqlExceptions = new SQLException[1];
connection.getQueryServices().incrementSequences(Collections.singletonList(new SequenceAllocation(key, 1)),
HConstants.LATEST_TIMESTAMP, seqValues, sqlExceptions);
if (sqlExceptions[0] != null) {
throw sqlExceptions[0];
}
long seqValue = seqValues[0];
if (seqValue > Short.MAX_VALUE) {
builder.setReturnCode(MetaDataProtos.MutationCode.TOO_MANY_INDEXES);
builder.setMutationTime(EnvironmentEdgeManager.currentTimeMillis());
done.run(builder.build());
return;
}
Put tableHeaderPut = MetaDataUtil.getPutOnlyTableHeaderRow(tableMetadata);
NavigableMap<byte[], List<Cell>> familyCellMap = tableHeaderPut.getFamilyCellMap();
List<Cell> cells = familyCellMap.get(TABLE_FAMILY_BYTES);
Cell cell = cells.get(0);
PDataType dataType = MetaDataUtil.getViewIndexIdDataType();
Object val = dataType.toObject(seqValue, PLong.INSTANCE);
byte[] bytes = new byte [dataType.getByteSize() + 1];
dataType.toBytes(val, bytes, 0);
Cell indexIdCell = new KeyValue(cell.getRow(), cell.getFamily(), VIEW_INDEX_ID_BYTES,
cell.getTimestamp(), Type.codeToType(cell.getTypeByte()), bytes);
cells.add(indexIdCell);
indexId = (short) seqValue;
}
}

// TODO: Switch this to HRegion#batchMutate when we want to support indexes on the
// system table. Basically, we get all the locks that we don't already hold for all the
Expand All @@ -1518,6 +1568,9 @@ public void createTable(RpcController controller, CreateTableRequest request,
// Get timeStamp from mutations - the above method sets it if it's unset
long currentTimeStamp = MetaDataUtil.getClientTimeStamp(tableMetadata);
builder.setReturnCode(MetaDataProtos.MutationCode.TABLE_NOT_FOUND);
if (indexId != null) {
builder.setViewIndexId(indexId);
}
builder.setMutationTime(currentTimeStamp);
done.run(builder.build());
return;
Expand Down
Expand Up @@ -130,6 +130,7 @@ public enum MutationCode {
UNALLOWED_SCHEMA_MUTATION,
AUTO_PARTITION_SEQUENCE_NOT_FOUND,
CANNOT_COERCE_AUTO_PARTITION_ID,
TOO_MANY_INDEXES,
NO_OP
};

Expand Down Expand Up @@ -208,6 +209,7 @@ public static class MetaDataMutationResult {
private byte[] familyName;
private boolean wasUpdated;
private PSchema schema;
private Short viewIndexId;

private List<PFunction> functions = new ArrayList<PFunction>(1);
private long autoPartitionNum;
Expand Down Expand Up @@ -253,6 +255,11 @@ public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable
this.tableNamesToDelete = tableNamesToDelete;
}

public MetaDataMutationResult(MutationCode returnCode, int currentTime, PTable table, int viewIndexId) {
this(returnCode, currentTime, table, Collections.<byte[]> emptyList());
this.viewIndexId = (short)viewIndexId;
}

public MetaDataMutationResult(MutationCode returnCode, long currentTime, PTable table, List<byte[]> tableNamesToDelete, List<SharedTableState> sharedTablesToDelete) {
this(returnCode, currentTime, table, tableNamesToDelete);
this.sharedTablesToDelete = sharedTablesToDelete;
Expand Down Expand Up @@ -305,6 +312,10 @@ public List<SharedTableState> getSharedTablesToDelete() {
public long getAutoPartitionNum() {
return autoPartitionNum;
}

public Short getViewIndexId() {
return viewIndexId;
}

public static MetaDataMutationResult constructFromProto(MetaDataResponse proto) {
MetaDataMutationResult result = new MetaDataMutationResult();
Expand Down Expand Up @@ -347,6 +358,9 @@ public static MetaDataMutationResult constructFromProto(MetaDataResponse proto)
if (proto.hasAutoPartitionNum()) {
result.autoPartitionNum = proto.getAutoPartitionNum();
}
if (proto.hasViewIndexId()) {
result.viewIndexId = (short)proto.getViewIndexId();
}
return result;
}

Expand Down Expand Up @@ -394,6 +408,9 @@ public static MetaDataResponse toProto(MetaDataMutationResult result) {
builder.setSchema(PSchema.toProto(result.schema));
}
builder.setAutoPartitionNum(result.getAutoPartitionNum());
if (result.getViewIndexId() != null) {
builder.setViewIndexId(result.getViewIndexId());
}
}
return builder.build();
}
Expand Down

0 comments on commit 655fe2b

Please sign in to comment.