Skip to content

Commit

Permalink
PHOENIX-900 Partial results for mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
elilevine committed Apr 22, 2015
1 parent ed7d0e9 commit 67c4c45
Show file tree
Hide file tree
Showing 10 changed files with 542 additions and 88 deletions.

Large diffs are not rendered by default.

Expand Up @@ -39,6 +39,7 @@
import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.BaseQueryPlan; import org.apache.phoenix.execute.BaseQueryPlan;
import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter; import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient; import org.apache.phoenix.index.IndexMetaDataCacheClient;
Expand Down Expand Up @@ -106,8 +107,8 @@ private static MutationState deleteRows(PhoenixStatement statement, TableRef tar
ConnectionQueryServices services = connection.getQueryServices(); ConnectionQueryServices services = connection.getQueryServices();
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE); final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutations = Maps.newHashMapWithExpectedSize(batchSize); Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> indexMutations = null; Map<ImmutableBytesPtr,RowMutationState> indexMutations = null;
// If indexTableRef is set, we're deleting the rows from both the index table and // If indexTableRef is set, we're deleting the rows from both the index table and
// the data table through a single query to save executing an additional one. // the data table through a single query to save executing an additional one.
if (indexTableRef != null) { if (indexTableRef != null) {
Expand Down Expand Up @@ -147,11 +148,11 @@ private static MutationState deleteRows(PhoenixStatement statement, TableRef tar
} }
table.newKey(ptr, values); table.newKey(ptr, values);
} }
mutations.put(ptr, PRow.DELETE_MARKER); mutations.put(ptr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
if (indexTableRef != null) { if (indexTableRef != null) {
ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map ImmutableBytesPtr indexPtr = new ImmutableBytesPtr(); // allocate new as this is a key in a Map
rs.getCurrentRow().getKey(indexPtr); rs.getCurrentRow().getKey(indexPtr);
indexMutations.put(indexPtr, PRow.DELETE_MARKER); indexMutations.put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
} }
if (mutations.size() > maxSize) { if (mutations.size() > maxSize) {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize); throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
Expand Down Expand Up @@ -429,9 +430,9 @@ public MutationState execute() {
// keys for our ranges // keys for our ranges
ScanRanges ranges = context.getScanRanges(); ScanRanges ranges = context.getScanRanges();
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator(); Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount()); Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
while (iterator.hasNext()) { while (iterator.hasNext()) {
mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), PRow.DELETE_MARKER); mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()), new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter()));
} }
return new MutationState(tableRef, mutation, 0, maxSize, connection); return new MutationState(tableRef, mutation, 0, maxSize, connection);
} }
Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.apache.phoenix.exception.SQLExceptionInfo; import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan; import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.expression.Determinism; import org.apache.phoenix.expression.Determinism;
import org.apache.phoenix.expression.Expression; import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression; import org.apache.phoenix.expression.LiteralExpression;
Expand Down Expand Up @@ -95,7 +96,7 @@
import com.google.common.collect.Sets; import com.google.common.collect.Sets;


public class UpsertCompiler { public class UpsertCompiler {
private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation) { private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes, PTable table, Map<ImmutableBytesPtr,RowMutationState> mutation, PhoenixStatement statement) {
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length); Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][]; byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array // If the table uses salting, the first byte is the salting byte, set to an empty array
Expand All @@ -114,7 +115,7 @@ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIn
} }
ImmutableBytesPtr ptr = new ImmutableBytesPtr(); ImmutableBytesPtr ptr = new ImmutableBytesPtr();
table.newKey(ptr, pkValues); table.newKey(ptr, pkValues);
mutation.put(ptr, columnValues); mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter()));
} }


private static MutationState upsertSelect(PhoenixStatement statement, private static MutationState upsertSelect(PhoenixStatement statement,
Expand All @@ -128,7 +129,7 @@ private static MutationState upsertSelect(PhoenixStatement statement,
boolean isAutoCommit = connection.getAutoCommit(); boolean isAutoCommit = connection.getAutoCommit();
byte[][] values = new byte[columnIndexes.length][]; byte[][] values = new byte[columnIndexes.length][];
int rowCount = 0; int rowCount = 0;
Map<ImmutableBytesPtr,Map<PColumn,byte[]>> mutation = Maps.newHashMapWithExpectedSize(batchSize); Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
PTable table = tableRef.getTable(); PTable table = tableRef.getTable();
ResultSet rs = new PhoenixResultSet(iterator, projector, statement); ResultSet rs = new PhoenixResultSet(iterator, projector, statement);
ImmutableBytesWritable ptr = new ImmutableBytesWritable(); ImmutableBytesWritable ptr = new ImmutableBytesWritable();
Expand Down Expand Up @@ -156,7 +157,7 @@ private static MutationState upsertSelect(PhoenixStatement statement,
column.getMaxLength(), column.getScale(), column.getSortOrder()); column.getMaxLength(), column.getScale(), column.getSortOrder());
values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr); values[i] = ByteUtil.copyKeyBytesIfNecessary(ptr);
} }
setValues(values, pkSlotIndexes, columnIndexes, table, mutation); setValues(values, pkSlotIndexes, columnIndexes, table, mutation, statement);
rowCount++; rowCount++;
// Commit a batch if auto commit is true and we're at our batch size // Commit a batch if auto commit is true and we're at our batch size
if (isAutoCommit && rowCount % batchSize == 0) { if (isAutoCommit && rowCount % batchSize == 0) {
Expand Down Expand Up @@ -802,8 +803,8 @@ public MutationState execute() throws SQLException {
throw new IllegalStateException(); throw new IllegalStateException();
} }
} }
Map<ImmutableBytesPtr, Map<PColumn, byte[]>> mutation = Maps.newHashMapWithExpectedSize(1); Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation); setValues(values, pkSlotIndexes, columnIndexes, tableRef.getTable(), mutation, statement);
return new MutationState(tableRef, mutation, 0, maxSize, connection); return new MutationState(tableRef, mutation, 0, maxSize, connection);
} }


Expand Down
Expand Up @@ -19,23 +19,32 @@


import java.sql.SQLException; import java.sql.SQLException;


import org.apache.phoenix.jdbc.PhoenixConnection;

/**
 * Thrown when committing the pending mutations on a Phoenix connection fails.
 * Wraps the underlying cause and records which statements' mutations were left
 * uncommitted, so callers can determine partial-success after a failed commit.
 */
public class CommitException extends SQLException {
    // Bumped from 1L: this commit changed the serialized form (MutationState
    // fields replaced by an int[] of statement indexes).
    private static final long serialVersionUID = 2L;
    private final int[] uncommittedStatementIndexes;

    /**
     * @param e the underlying cause of the commit failure
     * @param uncommittedStatementIndexes indexes of the statements whose
     *        mutations were not committed; see {@link #getUncommittedStatementIndexes()}
     */
    public CommitException(Exception e, int[] uncommittedStatementIndexes) {
        super(e);
        this.uncommittedStatementIndexes = uncommittedStatementIndexes;
    }

    /**
     * Returns indexes of UPSERT and DELETE statements that have failed. Indexes returned
     * correspond to each failed statement's order of creation within a {@link PhoenixConnection} up to
     * commit/rollback.
     * <p>
     * Statements whose index is returned in this set correspond to one or more HBase mutations that have failed.
     * <p>
     * Statement indexes are maintained correctly for connections that mutate and query
     * <b>data</b> (DELETE, UPSERT and SELECT) only. Statement (and their subsequent failure) order
     * is undefined for connections that execute metadata operations due to the fact that Phoenix rolls
     * back connections after metadata mutations.
     *
     * @see PhoenixConnection#getStatementExecutionCounter()
     */
    public int[] getUncommittedStatementIndexes() {
        return uncommittedStatementIndexes;
    }
}

0 comments on commit 67c4c45

Please sign in to comment.