diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
new file mode 100644
index 00000000000..2d5f360a7e0
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/MutationStateIT.java
@@ -0,0 +1,144 @@
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+
+import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.query.QueryServices;
+import org.junit.Test;
+
+public class MutationStateIT extends ParallelStatsDisabledIT {
+
+    private static final String DDL =
+            " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, "
+            + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
+            + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";
+
+    private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values (?,?,?)");
+        for (int i = 0; i < 10000; i++) {
+            stmt.setString(1, "AAAA" + i);
+            stmt.setString(2, "BBBB" + i);
+            stmt.setInt(3, 1);
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testMaxMutationSize() throws Exception {
+        Properties connectionProperties = new Properties();
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
+        PhoenixConnection connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = connection.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+
+        // set the max mutation size (bytes) to a low value
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
+        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
+        connection =
+                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
+        try {
+            upsertRows(connection, fullTableName);
+            fail();
+        } catch (SQLException e) {
+            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(),
+                e.getErrorCode());
+        }
+    }
+
+    @Test
+    public void testMutationEstimatedSize() throws Exception {
+        PhoenixConnection conn = (PhoenixConnection) DriverManager.getConnection(getUrl());
+        conn.setAutoCommit(false);
+        String fullTableName = generateUniqueName();
+        try (Statement stmt = conn.createStatement()) {
+            stmt.execute(
+                "CREATE TABLE " + fullTableName + DDL);
+        }
+
+        // upserting rows should increase the mutation state size
+        MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
+        long prevEstimatedSize = state.getEstimatedSize();
+        upsertRows(conn, fullTableName);
+        assertTrue("Mutation state size should have increased",
+            state.getEstimatedSize() > prevEstimatedSize);
+
+        // after a commit or rollback the size should be zero
+        conn.commit();
+        assertEquals("Mutation state size should be zero after commit", 0,
+            state.getEstimatedSize());
+        upsertRows(conn, fullTableName);
+        conn.rollback();
+        assertEquals("Mutation state size should be zero after rollback", 0,
+            state.getEstimatedSize());
+
+        // upsert one row
+        PreparedStatement stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score) values (?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertTrue("Mutation state size should be greater than zero",
+            state.getEstimatedSize() > 0);
+
+        prevEstimatedSize = state.getEstimatedSize();
+        // upserting the same row again should only add the size of the new statement index
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.execute();
+        assertEquals(
+            "Mutation state size should only increase by 4 bytes (the size of the new statement index)",
+            prevEstimatedSize + 4, state.getEstimatedSize());
+
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing the value of one column of a row to a larger value should increase the estimated size
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "random text string random text string random text string");
+        stmt.execute();
+        assertTrue("Mutation state size should increase",
+            prevEstimatedSize + 4 < state.getEstimatedSize());
+
+        prevEstimatedSize = state.getEstimatedSize();
+        // changing the value of one column of a row to a smaller value should decrease the estimated size
+        stmt =
+                conn.prepareStatement("upsert into " + fullTableName
+                        + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
+        stmt.setString(1, "ZZZZ");
+        stmt.setString(2, "YYYY");
+        stmt.setInt(3, 1);
+        stmt.setString(4, "");
+        stmt.execute();
+        assertTrue("Mutation state size should decrease",
+            prevEstimatedSize + 4 > state.getEstimatedSize());
+    }
+
+}
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
index 77cb19f3f9e..9109c123e08 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/QueryMoreIT.java
@@ -22,7 +22,6 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 import java.sql.Connection;
 import java.sql.Date;
@@ -39,7 +38,6 @@
 import org.apache.hadoop.hbase.util.Base64;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.util.PhoenixRuntime;
@@ -510,46 +508,6 @@ public void testMutationBatch() throws Exception {
         assertEquals(4L, connection.getMutationState().getBatchCount());
     }
 
-    @Test
-    public void testMaxMutationSize() throws Exception {
-        Properties connectionProperties = new Properties();
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
-        PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
-        String fullTableName = generateUniqueName();
-        try (Statement stmt = connection.createStatement()) {
-            stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
-                "    ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
-                "    SCORE DOUBLE NOT NULL,\n" +
-                "    ENTITY_ID CHAR(15) NOT NULL\n" +
-                "    CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
-                "        ORGANIZATION_ID,\n" +
-                "        SCORE DESC,\n" +
-                "        ENTITY_ID DESC\n" +
-                "    )\n" +
-                ") MULTI_TENANT=TRUE");
-        }
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode());
-        }
-
-        // set the max mutation size (bytes) to a low value
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
-        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
-        connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
-        try {
-            upsertRows(connection, fullTableName);
-            fail();
-        }
-        catch(SQLException e) {
-            assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode());
-        }
-    }
-
     private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
         PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName +
             " (organization_id, entity_id, score) values (?,?,?)");
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
index 2ceac5528c5..58dccebf29d 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/execute/PartialCommitIT.java
@@ -51,6 +51,7 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.phoenix.end2end.BaseOwnClusterIT;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -284,7 +285,7 @@ private void testPartialCommit(List<String> statements, int[] expectedUncommitte
     private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
         Connection con = driver.connect(url, new Properties());
         PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
-        final Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
+        final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
         // passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
         return new PhoenixConnection(phxCon, null) {
             @Override
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 53fc39870b1..a635c69cfa2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -44,6 +44,7 @@
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.filter.SkipScanFilter;
 import org.apache.phoenix.hbase.index.ValueGetter;
@@ -92,7 +93,6 @@
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.sun.istack.NotNull;
 
 public class DeleteCompiler {
@@ -122,14 +122,14 @@ private static MutationState deleteRows(StatementContext context, ResultIterator
         final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
         final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
         final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
-        Map<ImmutableBytesPtr, RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
-        List<Map<ImmutableBytesPtr, RowMutationState>> indexMutations = null;
+        MultiRowMutationState mutations = new MultiRowMutationState(batchSize);
+        List<MultiRowMutationState> indexMutations = null;
         // If indexTableRef is set, we're deleting the rows from both the index table and
         // the data table through a single query to save executing an additional one.
         if (!otherTableRefs.isEmpty()) {
             indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size());
             for (int i = 0; i < otherTableRefs.size(); i++) {
-                indexMutations.add(Maps.<ImmutableBytesPtr, RowMutationState>newHashMapWithExpectedSize(batchSize));
+                indexMutations.add(new MultiRowMutationState(batchSize));
             }
         }
         List<PColumn> pkColumns = table.getPKColumns();
@@ -208,7 +208,7 @@ public byte[] getRowKey() {
             // row key will already have its value.
             // Check for otherTableRefs being empty required when deleting directly from the index
             if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) {
-                mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
             }
             for (int i = 0; i < otherTableRefs.size(); i++) {
                 PTable otherTable = otherTableRefs.get(i).getTable();
@@ -222,7 +222,7 @@ public byte[] getRowKey() {
                 } else {
                     indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
                 }
-                indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
+                indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
             }
             if (mutations.size() > maxSize) {
                 throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
@@ -239,7 +239,7 @@ public byte[] getRowKey() {
                 connection.getMutationState().send();
                 mutations.clear();
                 if (indexMutations != null) {
-                    for (Map<ImmutableBytesPtr, RowMutationState> multiRowMutationState : indexMutations) {
+                    for (MultiRowMutationState multiRowMutationState : indexMutations) {
                         multiRowMutationState.clear();
                     }
                 }
@@ -651,10 +651,10 @@ public MutationState execute() throws SQLException {
                     // keys for our ranges
                     ScanRanges ranges = context.getScanRanges();
                     Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
-                    Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
+                    MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount());
                     while (iterator.hasNext()) {
                         mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
-                            new RowMutationState(PRow.DELETE_MARKER,
+                            new RowMutationState(PRow.DELETE_MARKER, 0,
                                 statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
                     }
                     return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 7e83ad5566f..d827cbe5f01 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -47,6 +47,7 @@
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
 import org.apache.phoenix.expression.Determinism;
@@ -116,9 +117,10 @@ public class UpsertCompiler {
 
     private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
-            PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
+            PTable table, MultiRowMutationState mutation,
             PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
             byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
+        long columnValueSize = 0;
         Map<PColumn, byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
         byte[][] pkValues = new byte[table.getPKColumns().size()][];
         // If the table uses salting, the first byte is the salting byte, set to an empty array
@@ -148,6 +150,7 @@
             }
         } else {
             columnValues.put(column, value);
+            columnValueSize += (column.getEstimatedSize() + value.length);
         }
     }
     ImmutableBytesPtr ptr = new ImmutableBytesPtr();
@@ -166,7 +169,7 @@
                     regionPrefix.length));
             }
         }
-        mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
+        mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
     }
 
     public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
@@ -195,7 +198,7 @@ public static MutationState upsertSelect(StatementContext childContext, TableRef
             }
         }
         int rowCount = 0;
-        Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
+        MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
         PTable table = tableRef.getTable();
         IndexMaintainer indexMaintainer = null;
         byte[][] viewConstants = null;
@@ -1180,7 +1183,7 @@ public MutationState execute() throws SQLException {
                     throw new IllegalStateException();
                 }
             }
-            Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
+            MultiRowMutationState mutation = new MultiRowMutationState(1);
             IndexMaintainer indexMaintainer = null;
             byte[][] viewConstants = null;
             if (table.getIndexType() == IndexType.LOCAL) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index e9547f2137a..510e6092888 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -97,6 +97,7 @@
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ScanUtil;
 import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.SizedUtil;
 import org.apache.phoenix.util.TransactionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -123,7 +124,7 @@ public class MutationState implements SQLCloseable {
     private final long batchSize;
     private final long batchSizeBytes;
     private long batchCount = 0L;
-    private final Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations;
+    private final Map<TableRef, MultiRowMutationState> mutations;
     private final Set<String> uncommittedPhysicalNames = Sets.newHashSetWithExpectedSize(10);
 
     private long sizeOffset;
@@ -131,7 +132,7 @@ public class MutationState implements SQLCloseable {
     private long estimatedSize = 0;
     private int[] uncommittedStatementIndexes = EMPTY_STATEMENT_INDEX_ARRAY;
     private boolean isExternalTxContext = false;
-    private Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> txMutations = Collections.emptyMap();
+    private Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
 
     final PhoenixTransactionContext phoenixTransactionContext;
 
@@ -159,12 +160,12 @@ private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connect
     }
 
     private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection, boolean subTask, PhoenixTransactionContext txContext, long sizeOffset) {
-        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, Map<ImmutableBytesPtr, RowMutationState>>newHashMapWithExpectedSize(5), subTask, txContext);
+        this(maxSize, maxSizeBytes, connection, Maps.<TableRef, MultiRowMutationState>newHashMapWithExpectedSize(5), subTask, txContext);
         this.sizeOffset = sizeOffset;
     }
 
     MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations,
+            Map<TableRef, MultiRowMutationState> mutations,
             boolean subTask, PhoenixTransactionContext txContext) {
         this.maxSize = maxSize;
         this.maxSizeBytes = maxSizeBytes;
@@ -189,15 +190,19 @@ private MutationState(long maxSize, long maxSizeBytes, PhoenixConnection connect
         }
     }
 
-    public MutationState(TableRef table, Map<ImmutableBytesPtr, RowMutationState> mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
+    public MutationState(TableRef table, MultiRowMutationState mutations, long sizeOffset, long maxSize, long maxSizeBytes, PhoenixConnection connection) throws SQLException {
         this(maxSize, maxSizeBytes, connection, false, null, sizeOffset);
         if (!mutations.isEmpty()) {
             this.mutations.put(table, mutations);
         }
         this.numRows = mutations.size();
-        this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowSize(table, mutations);
+        this.estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(this.mutations);
         throwIfTooBig();
     }
+
+    public long getEstimatedSize() {
+        return estimatedSize;
+    }
 
     public long getMaxSize() {
         return maxSize;
@@ -346,7 +351,7 @@ public boolean startTransaction() throws SQLException {
     }
 
     public static MutationState emptyMutationState(long maxSize, long maxSizeBytes, PhoenixConnection connection) {
-        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, Map<ImmutableBytesPtr, RowMutationState>>emptyMap(), false, null);
+        MutationState state = new MutationState(maxSize, maxSizeBytes, connection, Collections.<TableRef, MultiRowMutationState>emptyMap(), false, null);
         state.sizeOffset = 0;
         return state;
     }
@@ -368,12 +373,12 @@ public long getUpdateCount() {
         return sizeOffset + numRows;
     }
 
-    private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> srcRows,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+    private void joinMutationState(TableRef tableRef, MultiRowMutationState srcRows,
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         PTable table = tableRef.getTable();
         boolean isIndex = table.getType() == PTableType.INDEX;
         boolean incrementRowCount = dstMutations == this.mutations;
-        Map<ImmutableBytesPtr, RowMutationState> existingRows = dstMutations.put(tableRef, srcRows);
+        MultiRowMutationState existingRows = dstMutations.put(tableRef, srcRows);
         if (existingRows != null) { // Rows for that table already exist
             // Loop through new rows and replace existing with new
             for (Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry : srcRows.entrySet()) {
@@ -385,8 +390,12 @@ private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr, RowMutat
                         Map<PColumn, byte[]> newRow = rowEntry.getValue().getColumnValues();
                         // if new row is PRow.DELETE_MARKER, it means delete, and we don't need to merge it with existing row.
                         if (newRow != PRow.DELETE_MARKER) {
+                            // decrement estimated size by the size of the old row
+                            estimatedSize -= existingRowMutationState.calculateEstimatedSize();
                             // Merge existing column values with new column values
                            existingRowMutationState.join(rowEntry.getValue());
+                            // increment estimated size by the size of the new row
+                            estimatedSize += existingRowMutationState.calculateEstimatedSize();
                             // Now that the existing row has been merged with the new row, replace it back
                             // again (since it was merged with the new one above).
                             existingRows.put(rowEntry.getKey(), existingRowMutationState);
@@ -395,6 +404,8 @@ private void joinMutationState(TableRef tableRef, Map<ImmutableBytesPtr, RowMutat
         } else {
             // Size new map at batch size as that's what it'll likely grow to.
-            Map<ImmutableBytesPtr, RowMutationState> newRows = Maps.newHashMapWithExpectedSize(connection.getMutateBatchSize());
+            MultiRowMutationState newRows = new MultiRowMutationState(connection.getMutateBatchSize());
             newRows.putAll(srcRows);
             dstMutations.put(tableRef, newRows);
             if (incrementRowCount && !isIndex) {
                 numRows += srcRows.size();
+                // if we added all the rows from newMutationState we can just increment the
+                // estimatedSize by newMutationState.estimatedSize
+                estimatedSize += srcRows.estimatedSize;
             }
         }
     }
 
-    private void joinMutationState(Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> srcMutations,
-            Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> dstMutations) {
+    private void joinMutationState(Map<TableRef, MultiRowMutationState> srcMutations,
+            Map<TableRef, MultiRowMutationState> dstMutations) {
         // Merge newMutation with this one, keeping state from newMutation for any overlaps
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> entry : srcMutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : srcMutations.entrySet()) {
             // Replace existing entries for the table with new entries
             TableRef tableRef = entry.getKey();
-            Map<ImmutableBytesPtr, RowMutationState> srcRows = entry.getValue();
+            MultiRowMutationState srcRows = entry.getValue();
             joinMutationState(tableRef, srcRows, dstMutations);
         }
     }
@@ -435,12 +449,7 @@ public void join(MutationState newMutationState) throws SQLException {
         phoenixTransactionContext.join(newMutationState.getPhoenixTransactionContext());
 
         this.sizeOffset += newMutationState.sizeOffset;
-        int oldNumRows = this.numRows;
         joinMutationState(newMutationState.mutations, this.mutations);
-        // here we increment the estimated size by the fraction of new rows we added from the newMutationState
-        if (newMutationState.numRows > 0) {
-            this.estimatedSize += ((double)(this.numRows - oldNumRows) / newMutationState.numRows) * newMutationState.estimatedSize;
-        }
         if (!newMutationState.txMutations.isEmpty()) {
             if (txMutations.isEmpty()) {
                 txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
@@ -478,7 +487,7 @@ private static ImmutableBytesPtr getNewRowKeyWithRowTimestamp(ImmutableBytesPtr
         return ptr;
     }
 
-    private Iterator<Pair<PTable, List<Mutation>>> addRowMutations(final TableRef tableRef, final Map<ImmutableBytesPtr, RowMutationState> values,
+    private Iterator<Pair<PTable, List<Mutation>>> addRowMutations(final TableRef tableRef, final MultiRowMutationState values,
             final long mutationTimestamp, final long serverTimestamp, boolean includeAllIndexes, final boolean sendAll) {
         final PTable table = tableRef.getTable();
         final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism
@@ -513,10 +522,10 @@ public Pair<PTable, List<Mutation>> next() {
                     // we may also have to include delete mutations for immutable tables if we are not processing all the tables in the mutations map
                     if (!sendAll) {
                         TableRef key = new TableRef(index);
-                        Map<ImmutableBytesPtr, RowMutationState> rowToColumnMap = mutations.remove(key);
-                        if (rowToColumnMap!=null) {
+                        MultiRowMutationState multiRowMutationState = mutations.remove(key);
+                        if (multiRowMutationState!=null) {
                             final List<Mutation> deleteMutations = Lists.newArrayList();
-                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, rowToColumnMap, deleteMutations, null);
+                            generateMutations(tableRef, mutationTimestamp, serverTimestamp, multiRowMutationState, deleteMutations, null);
                             indexMutations.addAll(deleteMutations);
                         }
                     }
@@ -535,14 +544,14 @@ public void remove() {
     }
 
     private void generateMutations(final TableRef tableRef, final long mutationTimestamp,
-            final long serverTimestamp, final Map<ImmutableBytesPtr, RowMutationState> values,
+            final long serverTimestamp, final MultiRowMutationState values,
             final List<Mutation> mutationList, final List<Mutation> mutationsPertainingToIndex) {
         final PTable table = tableRef.getTable();
         boolean tableWithRowTimestampCol = table.getRowTimestampColPos() != -1;
         Iterator<Map.Entry<ImmutableBytesPtr, RowMutationState>> iterator =
                 values.entrySet().iterator();
         long timestampToUse = mutationTimestamp;
-        Map<ImmutableBytesPtr, RowMutationState> modifiedValues = Maps.newHashMap();
+        MultiRowMutationState modifiedValues = new MultiRowMutationState(16);
         while (iterator.hasNext()) {
             Map.Entry<ImmutableBytesPtr, RowMutationState> rowEntry = iterator.next();
             byte[] onDupKeyBytes = rowEntry.getValue().getOnDupKeyBytes();
@@ -617,7 +626,7 @@ public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMu
     }
 
     public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMutableIndexes, final Long tableTimestamp) {
-        final Iterator<Map.Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>>> iterator = this.mutations.entrySet().iterator();
+        final Iterator<Map.Entry<TableRef, MultiRowMutationState>> iterator = this.mutations.entrySet().iterator();
         if (!iterator.hasNext()) {
             return Collections.emptyIterator();
         }
@@ -625,7 +634,7 @@ public Iterator<Pair<byte[], List<Mutation>>> toMutations(final boolean includeMu
         final long serverTimestamp = getTableTimestamp(tableTimestamp, scn);
         final long mutationTimestamp = getMutationTimestamp(scn);
         return new Iterator<Pair<byte[], List<Mutation>>>() {
-            private Map.Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> current = iterator.next();
+            private Map.Entry<TableRef, MultiRowMutationState> current = iterator.next();
             private Iterator<Pair<byte[], List<Mutation>>> innerIterator = init();
 
             private Iterator<Pair<byte[], List<Mutation>>> init() {
@@ -689,14 +698,14 @@ public static long getMutationTimestamp(final Long scn) {
     private long[] validateAll() throws SQLException {
         int i = 0;
         long[] timeStamps = new long[this.mutations.size()];
-        for (Map.Entry<TableRef, Map<ImmutableBytesPtr, RowMutationState>> entry : mutations.entrySet()) {
+        for (Map.Entry<TableRef, MultiRowMutationState> entry : mutations.entrySet()) {
             TableRef tableRef = entry.getKey();
             timeStamps[i++] = validateAndGetServerTimestamp(tableRef, entry.getValue());
         }
         return timeStamps;
     }
 
-    private long validateAndGetServerTimestamp(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> rowKeyToColumnMap) throws SQLException {
+    private long validateAndGetServerTimestamp(TableRef tableRef, MultiRowMutationState rowKeyToColumnMap) throws SQLException {
         Long scn = connection.getSCN();
         MetaDataClient client = new MetaDataClient(connection);
         long serverTimeStamp = tableRef.getTimeStamp();
@@ -907,7 +916,7 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
             sendAll = true;
         }
 
-        Map<ImmutableBytesPtr, RowMutationState> valuesMap;
+        MultiRowMutationState multiRowMutationState;
         Map<TableInfo, List<Mutation>> physicalTableMutationMap = Maps.newLinkedHashMap();
         // add tracing for this operation
         try (TraceScope trace = Tracing.startNewSpan(connection, "Committing mutations to tables")) {
@@ -916,16 +925,16 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
             while (tableRefIterator.hasNext()) {
                 // at this point we are going through mutations for each table
                 final TableRef tableRef = tableRefIterator.next();
-                valuesMap = mutations.get(tableRef);
-                if (valuesMap == null || valuesMap.isEmpty()) {
+                multiRowMutationState = mutations.get(tableRef);
+                if (multiRowMutationState == null || multiRowMutationState.isEmpty()) {
                     continue;
                 }
                 // Validate as we go if transactional since we can undo if a problem occurs (which is unlikely)
-                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, valuesMap) : serverTimeStamps[i++];
+                long serverTimestamp = serverTimeStamps == null ? validateAndGetServerTimestamp(tableRef, multiRowMutationState) : serverTimeStamps[i++];
                 Long scn = connection.getSCN();
                 long mutationTimestamp = scn == null ? HConstants.LATEST_TIMESTAMP : scn;
                 final PTable table = tableRef.getTable();
-                Iterator<Pair<PTable, List<Mutation>>> mutationsIterator = addRowMutations(tableRef, valuesMap, mutationTimestamp, serverTimestamp, false, sendAll);
+                Iterator<Pair<PTable, List<Mutation>>> mutationsIterator = addRowMutations(tableRef, multiRowMutationState, mutationTimestamp, serverTimestamp, false, sendAll);
                 // build map from physical table to mutation list
                 boolean isDataTable = true;
                 while (mutationsIterator.hasNext()) {
@@ -943,7 +952,7 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
                 // involved in the transaction since none of them would have been
                 // committed in the event of a failure.
                 if (table.isTransactional()) {
-                    addUncommittedStatementIndexes(valuesMap.values());
+                    addUncommittedStatementIndexes(multiRowMutationState.values());
                     if (txMutations.isEmpty()) {
                         txMutations = Maps.newHashMapWithExpectedSize(mutations.size());
                     }
@@ -952,7 +961,7 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
                     // in the event that we need to replay the commit.
                     // Copy TableRef so we have the original PTable and know when the
                     // indexes have changed.
-                    joinMutationState(new TableRef(tableRef), valuesMap, txMutations);
+                    joinMutationState(new TableRef(tableRef), multiRowMutationState, txMutations);
                 }
             }
             long serverTimestamp = HConstants.LATEST_TIMESTAMP;
@@ -974,8 +983,6 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
                 long mutationCommitTime = 0;
                 long numFailedMutations = 0;
                 long startTime = 0;
-                long startNumRows = numRows;
-                long startEstimatedSize = estimatedSize;
                 do {
                     TableRef origTableRef = tableInfo.getOrigTableRef();
                     PTable table = origTableRef.getTable();
@@ -1022,13 +1029,13 @@ private void send(Iterator<TableRef> tableRefIterator) throws SQLException {
                         GLOBAL_MUTATION_COMMIT_TIME.update(mutationCommitTime);
                         numFailedMutations = 0;
 
+                        // Remove batches as we process them
+                        mutations.remove(origTableRef);
                         if (tableInfo.isDataTable()) {
                             numRows -= numMutations;
-                            // decrement estimated size by the fraction of rows we sent to hbase
-                            estimatedSize -= ((double)numMutations / startNumRows) * startEstimatedSize;
+                            // recalculate the estimated size
+                            estimatedSize = PhoenixKeyValueUtil.getEstimatedRowMutationSize(mutations);
                         }
-                        // Remove batches as we process them
-                        mutations.remove(origTableRef);
                     } catch (Exception e) {
                         mutationCommitTime = System.currentTimeMillis() - startTime;
                         serverTimestamp = ServerUtil.parseServerTimestamp(e);
@@ -1179,7 +1186,7 @@ private void addUncommittedStatementIndexes(Collection<RowMutationState> rowMuta
     }
 
     private int[] getUncommittedStatementIndexes() {
-        for (Map<ImmutableBytesPtr, RowMutationState> rowMutationMap : mutations.values()) {
+        for (MultiRowMutationState rowMutationMap : mutations.values()) {
             addUncommittedStatementIndexes(rowMutationMap.values());
         }
         return uncommittedStatementIndexes;
@@ -1212,7 +1219,7 @@ public void rollback() throws SQLException {
     }
 
     public void commit() throws SQLException {
-        Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> txMutations = Collections.emptyMap();
+        Map<TableRef, MultiRowMutationState> txMutations = Collections.emptyMap();
         int retryCount = 0;
         do {
             boolean sendSuccessful=false;
@@ -1422,13 +1429,54 @@ public Long getTimestamp() {
         }
     }
 
+    public static class MultiRowMutationState {
+        private Map<ImmutableBytesPtr, RowMutationState> rowKeyToRowMutationState;
+        private long estimatedSize;
+
+        public MultiRowMutationState(int size) {
+            this.rowKeyToRowMutationState = Maps.newHashMapWithExpectedSize(size);
+            this.estimatedSize = 0;
+        }
+
+        public RowMutationState put(ImmutableBytesPtr ptr, RowMutationState rowMutationState) {
+            estimatedSize += rowMutationState.calculateEstimatedSize();
+            return rowKeyToRowMutationState.put(ptr, rowMutationState);
+        }
+
+        public void putAll(MultiRowMutationState other) {
+            estimatedSize += other.estimatedSize;
+            rowKeyToRowMutationState.putAll(other.rowKeyToRowMutationState);
+        }
+
+        public boolean isEmpty() {
+            return rowKeyToRowMutationState.isEmpty();
+        }
+
+        public int size() {
+            return rowKeyToRowMutationState.size();
+        }
+
+        public Set<Map.Entry<ImmutableBytesPtr, RowMutationState>> entrySet() {
+            return rowKeyToRowMutationState.entrySet();
+        }
+
+        public void clear(){
+            rowKeyToRowMutationState.clear();
+        }
+
+        public Collection<RowMutationState> values() {
+            return rowKeyToRowMutationState.values();
+        }
+    }
+
     public static class RowMutationState {
         @Nonnull private Map<PColumn, byte[]> columnValues;
         private int[] statementIndexes;
         @Nonnull private final RowTimestampColInfo rowTsColInfo;
         private byte[] onDupKeyBytes;
+        private long colValuesSize;
 
-        public RowMutationState(@Nonnull Map<PColumn, byte[]> columnValues, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
+        public RowMutationState(@Nonnull Map<PColumn, byte[]> columnValues, long colValuesSize, int statementIndex, @Nonnull RowTimestampColInfo rowTsColInfo,
                 byte[] onDupKeyBytes) {
             checkNotNull(columnValues);
             checkNotNull(rowTsColInfo);
@@ -1436,6 +1484,12 @@ public RowMutationState(@Nonnull Map<PColumn, byte[]> columnValues, int statement
             this.statementIndexes = new int[] {statementIndex};
             this.rowTsColInfo = rowTsColInfo;
             this.onDupKeyBytes = onDupKeyBytes;
+            this.colValuesSize = colValuesSize;
+        }
+
+        public long calculateEstimatedSize() {
+            return colValuesSize + statementIndexes.length * SizedUtil.INT_SIZE + SizedUtil.LONG_SIZE
+                    + (onDupKeyBytes != null ? onDupKeyBytes.length : 0);
         }
 
         byte[] getOnDupKeyBytes() {
@@ -1454,7 +1508,16 @@ void join(RowMutationState newRow) {
             // If we already have a row and the new row has an ON DUPLICATE KEY clause
             // ignore the new values (as that's what the server will do).
             if (newRow.onDupKeyBytes == null) {
-                getColumnValues().putAll(newRow.getColumnValues());
+                // increment the column value size by the new row's column value size
+                colValuesSize += newRow.colValuesSize;
+                for (Map.Entry<PColumn, byte[]> entry : newRow.columnValues.entrySet()) {
+                    PColumn col = entry.getKey();
+                    byte[] oldValue = columnValues.put(col, entry.getValue());
+                    if (oldValue != null) {
+                        // decrement the column value size by the size of any column values that were replaced
+                        colValuesSize -= (col.getEstimatedSize() + oldValue.length);
+                    }
+                }
             }
             // Concatenate ON DUPLICATE KEY bytes to allow multiple
             // increments of the same row in the same commit batch.
@@ -1466,7 +1529,7 @@ void join(RowMutationState newRow) {
         RowTimestampColInfo getRowTimestampColInfo() {
             return rowTsColInfo;
         }
-        
+
     }
 
     public ReadMetricQueue getReadMetricQueue() {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
index 5e97ce626e3..c6cbe3ecb38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java
@@ -73,7 +73,7 @@
 import org.apache.phoenix.coprocessor.generated.MetaDataProtos.UpdateIndexStateRequest;
 import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
-import org.apache.phoenix.execute.MutationState.RowMutationState;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.TupleProjector;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.KeyValueColumnExpression;
@@ -296,7 +296,7 @@ public static List<Mutation> generateDeleteIndexData(final PTable table, PTable in
     }
 
     public static List<Mutation> generateIndexData(final PTable table, PTable index,
-            final Map<ImmutableBytesPtr, RowMutationState> valuesMap, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
+            final MultiRowMutationState multiRowMutationState, List<Mutation> dataMutations, final KeyValueBuilder kvBuilder, PhoenixConnection connection)
             throws SQLException {
         try {
             final ImmutableBytesPtr ptr = new ImmutableBytesPtr();
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
index 84525fd2a25..ce5cb557506 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixKeyValueUtil.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
 import org.apache.phoenix.execute.MutationState.RowMutationState;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
@@ -188,42 +189,15 @@ public static long calculateMutationDiskSize(Mutation m) {
      * @param mutations map from table to row to RowMutationState
      * @return estimated row size
      */
-    public static long
-            getEstimatedRowSize(TableRef tableRef, Map<ImmutableBytesPtr, RowMutationState> mutations) {
+    public static long getEstimatedRowMutationSize(
+            Map<TableRef, MultiRowMutationState> tableMutationMap) {
         long size = 0;
-        PTable table = tableRef.getTable();
-        // iterate over rows
-        for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : mutations.entrySet()) {
-            int rowLength = rowEntry.getKey().getLength();
-            Map<PColumn, byte[]> colValueMap = rowEntry.getValue().getColumnValues();
-            switch (table.getImmutableStorageScheme()) {
-            case ONE_CELL_PER_COLUMN:
-                // iterate over columns
-                for (Entry<PColumn, byte[]> colValueEntry : colValueMap.entrySet()) {
-                    PColumn pColumn = colValueEntry.getKey();
-                    size +=
-                            KeyValue.getKeyValueDataStructureSize(rowLength,
-                                pColumn.getFamilyName().getBytes().length,
-                                pColumn.getColumnQualifierBytes().length,
-                                colValueEntry.getValue().length);
-                }
-                break;
-            case SINGLE_CELL_ARRAY_WITH_OFFSETS:
-                // we store all the column values in a single key value that contains all the
-                // column values followed by an offset array
-                size +=
-                        PArrayDataTypeEncoder.getEstimatedByteSize(table, rowLength,
-                            colValueMap);
-                break;
+        // iterate over table
+        for (Entry<TableRef, MultiRowMutationState> tableEntry : tableMutationMap.entrySet()) {
+            // iterate over rows
+            for (Entry<ImmutableBytesPtr, RowMutationState> rowEntry : tableEntry.getValue().entrySet()) {
+                size += calculateRowMutationSize(rowEntry);
             }
-            // count the empty key value
-            Pair<byte[], byte[]> emptyKeyValueInfo =
-                    EncodedColumnsUtil.getEmptyKeyValueInfo(table);
-            size +=
-                    KeyValue.getKeyValueDataStructureSize(rowLength,
-                        SchemaUtil.getEmptyColumnFamilyPtr(table).getLength(),
-                        emptyKeyValueInfo.getFirst().length,
-                        emptyKeyValueInfo.getSecond().length);
         }
         return size;
     }
@@ -237,4 +211,10 @@ public static KeyValue maybeCopyCell(Cell c) {
         }
         return KeyValueUtil.copyToNewKeyValue(c);
    }
+
+    private static long calculateRowMutationSize(Entry<ImmutableBytesPtr, RowMutationState> rowEntry) {
+        int rowLength = rowEntry.getKey().getLength();
+        long colValuesLength = rowEntry.getValue().calculateEstimatedSize();
+        return (rowLength + colValuesLength);
+    }
 }
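---
The following is an illustrative, self-contained sketch -- not part of the patch -- of the
incremental accounting that MultiRowMutationState and RowMutationState.join() introduce.
The class name and the simplified per-column size (key length + value length, standing in
for PColumn.getEstimatedSize()) are assumptions made for the example.

import java.util.HashMap;
import java.util.Map;

public class SizeAccountingSketch {
    private long colValuesSize = 0;
    private final Map<String, byte[]> columnValues = new HashMap<>();

    // Mirrors the bookkeeping in RowMutationState.join(): add the incoming row's size,
    // then subtract the size of each value the merge replaced, so re-upserting the same
    // column costs only the delta between the old and new value sizes.
    void join(Map<String, byte[]> newRow, long newRowSize) {
        colValuesSize += newRowSize;
        for (Map.Entry<String, byte[]> entry : newRow.entrySet()) {
            byte[] oldValue = columnValues.put(entry.getKey(), entry.getValue());
            if (oldValue != null) {
                // a replaced value no longer contributes to the estimate
                colValuesSize -= (entry.getKey().length() + oldValue.length);
            }
        }
    }

    public static void main(String[] args) {
        SizeAccountingSketch row = new SizeAccountingSketch();
        row.join(Map.of("TAGS", "short".getBytes()), 4 + 5);
        System.out.println(row.colValuesSize); // 9
        row.join(Map.of("TAGS", "a much longer value".getBytes()), 4 + 19);
        System.out.println(row.colValuesSize); // 23: grew by the delta, not by another 23
    }
}

This is the behavior MutationStateIT#testMutationEstimatedSize exercises: upserting the
same row again moves the estimate only by the size delta (plus the 4-byte statement
index), instead of the previous approach of apportioning a fraction of the whole
MutationState's estimate.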