Skip to content

Commit

Permalink
PHOENIX-4381 Calculate the estimatedSize of MutationState incrementally
Browse files Browse the repository at this point in the history
  • Loading branch information
twdsilva committed Nov 16, 2017
1 parent 693fa65 commit 6a45c40
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 37 deletions.
Expand Up @@ -128,6 +128,7 @@ public class MutationState implements SQLCloseable {


private long sizeOffset; private long sizeOffset;
private int numRows = 0; private int numRows = 0;
private long estimatedSize = 0;
private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY; private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
private boolean isExternalTxContext = false; private boolean isExternalTxContext = false;
private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap(); private Map<TableRef, Map<ImmutableBytesPtr,RowMutationState>> txMutations = Collections.emptyMap();
Expand Down Expand Up @@ -194,6 +195,7 @@ public MutationState(TableRef table, Map<ImmutableBytesPtr,RowMutationState> mut
this.mutations.put(table, mutations); this.mutations.put(table, mutations);
} }
this.numRows = mutations.size(); this.numRows = mutations.size();
this.estimatedSize = KeyValueUtil.getEstimatedRowSize(table, mutations);
throwIfTooBig(); throwIfTooBig();
} }


Expand Down Expand Up @@ -355,7 +357,6 @@ private void throwIfTooBig() throws SQLException {
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build() throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED).build()
.buildException(); .buildException();
} }
long estimatedSize = PhoenixKeyValueUtil.getEstimatedRowSize(mutations);
if (estimatedSize > maxSizeBytes) { if (estimatedSize > maxSizeBytes) {
resetState(); resetState();
throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED) throw new SQLExceptionInfo.Builder(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED)
Expand Down Expand Up @@ -434,7 +435,12 @@ public void join(MutationState newMutationState) throws SQLException {
phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext()); phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());


this.sizeOffset += newMutationState.sizeOffset; this.sizeOffset += newMutationState.sizeOffset;
int oldNumRows = this.numRows;
joinMutationState(newMutationState.mutations, this.mutations); joinMutationState(newMutationState.mutations, this.mutations);
// here we increment the estimated size by the fraction of new rows we added from the newMutationState
if (newMutationState.numRows>0) {
this.estimatedSize += ((double)(this.numRows-oldNumRows)/newMutationState.numRows) * newMutationState.estimatedSize;
}
if (!newMutationState.txMutations.isEmpty()) { if (!newMutationState.txMutations.isEmpty()) {
if (txMutations.isEmpty()) { if (txMutations.isEmpty()) {
txMutations = Maps.newHashMapWithExpectedSize(mutations.size()); txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
Expand Down Expand Up @@ -968,6 +974,8 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
long mutationCommitTime = 0; long mutationCommitTime = 0;
long numFailedMutations = 0;; long numFailedMutations = 0;;
long startTime = 0; long startTime = 0;
long startNumRows = numRows;
long startEstimatedSize = estimatedSize;
do { do {
TableRef origTableRef = tableInfo.getOrigTableRef(); TableRef origTableRef = tableInfo.getOrigTableRef();
PTable table = origTableRef.getTable(); PTable table = origTableRef.getTable();
Expand Down Expand Up @@ -1005,8 +1013,8 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
// TODO need to get the the results of batch and fail if any exceptions. // TODO need to get the the results of batch and fail if any exceptions.
hTable.batch(mutationBatch, null); hTable.batch(mutationBatch, null);
batchCount++; batchCount++;
if (logger.isDebugEnabled()) logger.debug("Sent batch of " + mutationBatch.size() + " for " + Bytes.toString(htableName));
} }
if (logger.isDebugEnabled()) logger.debug("Sent batch of " + numMutations + " for " + Bytes.toString(htableName));
child.stop(); child.stop();
child.stop(); child.stop();
shouldRetry = false; shouldRetry = false;
Expand All @@ -1016,6 +1024,8 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {


if (tableInfo.isDataTable()) { if (tableInfo.isDataTable()) {
numRows -= numMutations; numRows -= numMutations;
// decrement estimated size by the fraction of rows we sent to hbase
estimatedSize -= ((double)numMutations/startNumRows)*startEstimatedSize;
} }
// Remove batches as we process them // Remove batches as we process them
mutations.remove(origTableRef); mutations.remove(origTableRef);
Expand Down Expand Up @@ -1181,6 +1191,7 @@ public void close() throws SQLException {


private void resetState() { private void resetState() {
numRows = 0; numRows = 0;
estimatedSize = 0;
this.mutations.clear(); this.mutations.clear();
resetTransactionalState(); resetTransactionalState();
} }
Expand Down
Expand Up @@ -189,46 +189,41 @@ public static long calculateMutationDiskSize(Mutation m) {
* @return estimated row size * @return estimated row size
*/ */
public static long public static long
getEstimatedRowSize(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations) { getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) {
long size = 0; long size = 0;
// iterate over tables PTable table = tableRef.getTable();
for (Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> tableEntry : mutations // iterate over rows
.entrySet()) { for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) {
PTable table = tableEntry.getKey().getTable(); int rowLength = rowEntry.getKey().getLength();
// iterate over rows Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue() switch (table.getImmutableStorageScheme()) {
.entrySet()) { case ONE_CELL_PER_COLUMN:
int rowLength = rowEntry.getKey().getLength(); // iterate over columns
Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues(); for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
switch (table.getImmutableStorageScheme()) { PColumn pColumn = colValueEntry.getKey();
case ONE_CELL_PER_COLUMN:
// iterate over columns
for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
PColumn pColumn = colValueEntry.getKey();
size +=
KeyValue.getKeyValueDataStructureSize(rowLength,
pColumn.getFamilyName().getBytes().length,
pColumn.getColumnQualifierBytes().length,
colValueEntry.getValue().length);
}
break;
case SINGLE_CELL_ARRAY_WITH_OFFSETS:
// we store all the column values in a single key value that contains all the
// column values followed by an offset array
size += size +=
PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength, KeyValue.getKeyValueDataStructureSize(rowLength,
colValueMap); pColumn.getFamilyName().getBytes().length,
break; pColumn.getColumnQualifierBytes().length,
colValueEntry.getValue().length);
} }
// count the empty key value break;
Pair<byte[], byte[]> emptyKeyValueInfo = case SINGLE_CELL_ARRAY_WITH_OFFSETS:
EncodedColumnsUtil.getEmptyKeyValueInfo(table); // we store all the column values in a single key value that contains all the
// column values followed by an offset array
size += size +=
KeyValue.getKeyValueDataStructureSize(rowLength, PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(), colValueMap);
emptyKeyValueInfo.getFirst().length, break;
emptyKeyValueInfo.getSecond().length);
} }
// count the empty key value
Pair<byte[], byte[]> emptyKeyValueInfo =
EncodedColumnsUtil.getEmptyKeyValueInfo(table);
size +=
KeyValue.getKeyValueDataStructureSize(rowLength,
SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
emptyKeyValueInfo.getFirst().length,
emptyKeyValueInfo.getSecond().length);
} }
return size; return size;
} }
Expand Down

0 comments on commit 6a45c40

Please sign in to comment.