Skip to content

Commit

Permalink
PHOENIX-4386 Calculate the estimatedSize of MutationState using Map<TableRef, Map<ImmutableBytesPtr, RowMutationState>> mutations (Thomas D'Silva)
Browse files Browse the repository at this point in the history
  • Loading branch information
chrajeshbabu committed Feb 20, 2018
1 parent 6cadbab commit aeb33b9
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 140 deletions.
@@ -0,0 +1,144 @@
package org.apache.phoenix.end2end;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.junit.Test;

public class MutationStateIT extends ParallelStatsDisabledIT {

    /**
     * Column and constraint definition shared by both tests; the caller prepends
     * "CREATE TABLE <name>" to form the full DDL statement.
     */
    private static final String DDL =
            " (ORGANIZATION_ID CHAR(15) NOT NULL, SCORE DOUBLE, "
                    + "ENTITY_ID CHAR(15) NOT NULL, TAGS VARCHAR, CONSTRAINT PAGE_SNAPSHOT_PK "
                    + "PRIMARY KEY (ORGANIZATION_ID, ENTITY_ID DESC)) MULTI_TENANT=TRUE";

    /**
     * Upserts 10000 rows without committing, so the connection's MutationState
     * accumulates enough pending mutations to exceed the configured limits.
     *
     * @param conn connection whose mutation state is being filled (auto-commit assumed off)
     * @param fullTableName table created from {@link #DDL}
     * @throws SQLException when a mutation-size limit is exceeded (expected by callers)
     */
    private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
        // try-with-resources: the original leaked the PreparedStatement
        try (PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName
                + " (organization_id, entity_id, score) values (?,?,?)")) {
            for (int i = 0; i < 10000; i++) {
                stmt.setString(1, "AAAA" + i);
                stmt.setString(2, "BBBB" + i);
                stmt.setInt(3, 1);
                stmt.execute();
            }
        }
    }

    /**
     * Verifies that exceeding either mutation-size limit (row count or byte size)
     * raises the corresponding SQLExceptionCode.
     */
    @Test
    public void testMaxMutationSize() throws Exception {
        Properties connectionProperties = new Properties();
        // Row-count limit low (3), byte limit high: expect MAX_MUTATION_SIZE_EXCEEDED.
        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
        String fullTableName = generateUniqueName();
        // try-with-resources: the original leaked both connections (the first was
        // overwritten without being closed)
        try (PhoenixConnection connection =
                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) {
            try (Statement stmt = connection.createStatement()) {
                stmt.execute("CREATE TABLE " + fullTableName + DDL);
            }
            try {
                upsertRows(connection, fullTableName);
                fail("Upsert should have failed: max mutation size (row count) exceeded");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(),
                    e.getErrorCode());
            }
        }

        // Now byte limit low (4), row-count limit high: expect MAX_MUTATION_SIZE_BYTES_EXCEEDED.
        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
        connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
        try (PhoenixConnection connection =
                (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties)) {
            try {
                upsertRows(connection, fullTableName);
                fail("Upsert should have failed: max mutation size (bytes) exceeded");
            } catch (SQLException e) {
                assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(),
                    e.getErrorCode());
            }
        }
    }

    /**
     * Verifies that MutationState.getEstimatedSize() tracks pending mutations:
     * grows on upsert, resets to zero on commit/rollback, grows by only the
     * statement-index overhead on a duplicate row, and follows column-value
     * size changes up and down.
     */
    @Test
    public void testMutationEstimatedSize() throws Exception {
        // try-with-resources: the original leaked this connection
        try (PhoenixConnection conn =
                (PhoenixConnection) DriverManager.getConnection(getUrl())) {
            conn.setAutoCommit(false);
            String fullTableName = generateUniqueName();
            try (Statement stmt = conn.createStatement()) {
                stmt.execute("CREATE TABLE " + fullTableName + DDL);
            }

            // upserting rows should increase the mutation state size
            MutationState state = conn.unwrap(PhoenixConnection.class).getMutationState();
            long prevEstimatedSize = state.getEstimatedSize();
            upsertRows(conn, fullTableName);
            assertTrue("Mutation state size should have increased",
                state.getEstimatedSize() > prevEstimatedSize);

            // after commit or rollback the size should be zero
            conn.commit();
            assertEquals("Mutation state size should be zero after commit", 0,
                state.getEstimatedSize());
            upsertRows(conn, fullTableName);
            conn.rollback();
            assertEquals("Mutation state size should be zero after rollback", 0,
                state.getEstimatedSize());

            // upsert one row
            PreparedStatement stmt =
                    conn.prepareStatement("upsert into " + fullTableName
                            + " (organization_id, entity_id, score) values (?,?,?)");
            stmt.setString(1, "ZZZZ");
            stmt.setString(2, "YYYY");
            stmt.setInt(3, 1);
            stmt.execute();
            assertTrue("Mutation state size should be greater than zero ",
                state.getEstimatedSize() > 0);

            prevEstimatedSize = state.getEstimatedSize();
            // upserting the same row twice should not increase the size beyond
            // the 4 bytes used to record the new statement index
            stmt.setString(1, "ZZZZ");
            stmt.setString(2, "YYYY");
            stmt.setInt(3, 1);
            stmt.execute();
            assertEquals(
                "Mutation state size should only increase 4 bytes (size of the new statement index)",
                prevEstimatedSize + 4, state.getEstimatedSize());

            prevEstimatedSize = state.getEstimatedSize();
            // changing the value of one column of a row to a larger value should
            // increase the estimated size
            stmt =
                    conn.prepareStatement("upsert into " + fullTableName
                            + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
            stmt.setString(1, "ZZZZ");
            stmt.setString(2, "YYYY");
            stmt.setInt(3, 1);
            stmt.setString(4, "random text string random text string random text string");
            stmt.execute();
            assertTrue("Mutation state size should increase",
                prevEstimatedSize + 4 < state.getEstimatedSize());

            prevEstimatedSize = state.getEstimatedSize();
            // changing the value of one column of a row to a smaller value should
            // decrease the estimated size
            stmt =
                    conn.prepareStatement("upsert into " + fullTableName
                            + " (organization_id, entity_id, score, tags) values (?,?,?,?)");
            stmt.setString(1, "ZZZZ");
            stmt.setString(2, "YYYY");
            stmt.setInt(3, 1);
            stmt.setString(4, "");
            stmt.execute();
            assertTrue("Mutation state size should decrease",
                prevEstimatedSize + 4 > state.getEstimatedSize());
        }
    }

}
Expand Up @@ -22,7 +22,6 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.sql.Connection;
import java.sql.Date;
Expand All @@ -39,7 +38,6 @@

import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.util.PhoenixRuntime;
Expand Down Expand Up @@ -510,46 +508,6 @@ public void testMutationBatch() throws Exception {
assertEquals(4L, connection.getMutationState().getBatchCount());
}

@Test
public void testMaxMutationSize() throws Exception {
Properties connectionProperties = new Properties();
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "3");
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "1000000");
PhoenixConnection connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
String fullTableName = generateUniqueName();
try (Statement stmt = connection.createStatement()) {
stmt.execute("CREATE TABLE " + fullTableName + "(\n" +
" ORGANIZATION_ID CHAR(15) NOT NULL,\n" +
" SCORE DOUBLE NOT NULL,\n" +
" ENTITY_ID CHAR(15) NOT NULL\n" +
" CONSTRAINT PAGE_SNAPSHOT_PK PRIMARY KEY (\n" +
" ORGANIZATION_ID,\n" +
" SCORE DESC,\n" +
" ENTITY_ID DESC\n" +
" )\n" +
") MULTI_TENANT=TRUE");
}
try {
upsertRows(connection, fullTableName);
fail();
}
catch(SQLException e) {
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_EXCEEDED.getErrorCode(), e.getErrorCode());
}

// set the max mutation size (bytes) to a low value
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_ATTRIB, "1000");
connectionProperties.setProperty(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB, "4");
connection = (PhoenixConnection) DriverManager.getConnection(getUrl(), connectionProperties);
try {
upsertRows(connection, fullTableName);
fail();
}
catch(SQLException e) {
assertEquals(SQLExceptionCode.MAX_MUTATION_SIZE_BYTES_EXCEEDED.getErrorCode(), e.getErrorCode());
}
}

private void upsertRows(PhoenixConnection conn, String fullTableName) throws SQLException {
PreparedStatement stmt = conn.prepareStatement("upsert into " + fullTableName +
" (organization_id, entity_id, score) values (?,?,?)");
Expand Down
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.phoenix.end2end.BaseOwnClusterIT;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.hbase.index.Indexer;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
Expand Down Expand Up @@ -284,7 +285,7 @@ private void testPartialCommit(List<String> statements, int[] expectedUncommitte
private PhoenixConnection getConnectionWithTableOrderPreservingMutationState() throws SQLException {
Connection con = driver.connect(url, new Properties());
PhoenixConnection phxCon = new PhoenixConnection(con.unwrap(PhoenixConnection.class));
final Map<TableRef,Map<ImmutableBytesPtr,MutationState.RowMutationState>> mutations = Maps.newTreeMap(new TableRefComparator());
final Map<TableRef, MultiRowMutationState> mutations = Maps.newTreeMap(new TableRefComparator());
// passing a null mutation state forces the connection.newMutationState() to be used to create the MutationState
return new PhoenixConnection(phxCon, null) {
@Override
Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.filter.SkipScanFilter;
import org.apache.phoenix.hbase.index.ValueGetter;
Expand Down Expand Up @@ -92,7 +93,6 @@

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.sun.istack.NotNull;

public class DeleteCompiler {
Expand Down Expand Up @@ -122,14 +122,14 @@ private static MutationState deleteRows(StatementContext context, ResultIterator
final int maxSize = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE);
final int maxSizeBytes = services.getProps().getInt(QueryServices.MAX_MUTATION_SIZE_BYTES_ATTRIB,QueryServicesOptions.DEFAULT_MAX_MUTATION_SIZE_BYTES);
final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize);
Map<ImmutableBytesPtr,RowMutationState> mutations = Maps.newHashMapWithExpectedSize(batchSize);
List<Map<ImmutableBytesPtr,RowMutationState>> indexMutations = null;
MultiRowMutationState mutations = new MultiRowMutationState(batchSize);
List<MultiRowMutationState> indexMutations = null;
// If indexTableRef is set, we're deleting the rows from both the index table and
// the data table through a single query to save executing an additional one.
if (!otherTableRefs.isEmpty()) {
indexMutations = Lists.newArrayListWithExpectedSize(otherTableRefs.size());
for (int i = 0; i < otherTableRefs.size(); i++) {
indexMutations.add(Maps.<ImmutableBytesPtr,RowMutationState>newHashMapWithExpectedSize(batchSize));
indexMutations.add(new MultiRowMutationState(batchSize));
}
}
List<PColumn> pkColumns = table.getPKColumns();
Expand Down Expand Up @@ -208,7 +208,7 @@ public byte[] getRowKey() {
// row key will already have its value.
// Check for otherTableRefs being empty required when deleting directly from the index
if (otherTableRefs.isEmpty() || table.getIndexType() != IndexType.LOCAL) {
mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
mutations.put(rowKeyPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
for (int i = 0; i < otherTableRefs.size(); i++) {
PTable otherTable = otherTableRefs.get(i).getTable();
Expand All @@ -222,7 +222,7 @@ public byte[] getRowKey() {
} else {
indexPtr.set(maintainers[i].buildRowKey(getter, rowKeyPtr, null, null, HConstants.LATEST_TIMESTAMP));
}
indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
indexMutations.get(i).put(indexPtr, new RowMutationState(PRow.DELETE_MARKER, 0, statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
if (mutations.size() > maxSize) {
throw new IllegalArgumentException("MutationState size of " + mutations.size() + " is bigger than max allowed size of " + maxSize);
Expand All @@ -239,7 +239,7 @@ public byte[] getRowKey() {
connection.getMutationState().send();
mutations.clear();
if (indexMutations != null) {
for (Map<ImmutableBytesPtr, RowMutationState> multiRowMutationState : indexMutations) {
for (MultiRowMutationState multiRowMutationState : indexMutations) {
multiRowMutationState.clear();
}
}
Expand Down Expand Up @@ -651,10 +651,10 @@ public MutationState execute() throws SQLException {
// keys for our ranges
ScanRanges ranges = context.getScanRanges();
Iterator<KeyRange> iterator = ranges.getPointLookupKeyIterator();
Map<ImmutableBytesPtr,RowMutationState> mutation = Maps.newHashMapWithExpectedSize(ranges.getPointLookupCount());
MultiRowMutationState mutation = new MultiRowMutationState(ranges.getPointLookupCount());
while (iterator.hasNext()) {
mutation.put(new ImmutableBytesPtr(iterator.next().getLowerRange()),
new RowMutationState(PRow.DELETE_MARKER,
new RowMutationState(PRow.DELETE_MARKER, 0,
statement.getConnection().getStatementExecutionCounter(), NULL_ROWTIMESTAMP_INFO, null));
}
return new MutationState(dataPlan.getTableRef(), mutation, 0, maxSize, maxSizeBytes, connection);
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.execute.MutationState.MultiRowMutationState;
import org.apache.phoenix.execute.MutationState.RowMutationState;
import org.apache.phoenix.execute.MutationState.RowTimestampColInfo;
import org.apache.phoenix.expression.Determinism;
Expand Down Expand Up @@ -116,9 +117,10 @@
public class UpsertCompiler {

private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIndexes,
PTable table, Map<ImmutableBytesPtr, RowMutationState> mutation,
PTable table, MultiRowMutationState mutation,
PhoenixStatement statement, boolean useServerTimestamp, IndexMaintainer maintainer,
byte[][] viewConstants, byte[] onDupKeyBytes, int numSplColumns) throws SQLException {
long columnValueSize = 0;
Map<PColumn,byte[]> columnValues = Maps.newHashMapWithExpectedSize(columnIndexes.length);
byte[][] pkValues = new byte[table.getPKColumns().size()][];
// If the table uses salting, the first byte is the salting byte, set to an empty array
Expand Down Expand Up @@ -148,6 +150,7 @@ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIn
}
} else {
columnValues.put(column, value);
columnValueSize += (column.getEstimatedSize() + value.length);
}
}
ImmutableBytesPtr ptr = new ImmutableBytesPtr();
Expand All @@ -166,7 +169,7 @@ private static void setValues(byte[][] values, int[] pkSlotIndex, int[] columnIn
regionPrefix.length));
}
}
mutation.put(ptr, new RowMutationState(columnValues, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
mutation.put(ptr, new RowMutationState(columnValues, columnValueSize, statement.getConnection().getStatementExecutionCounter(), rowTsColInfo, onDupKeyBytes));
}

public static MutationState upsertSelect(StatementContext childContext, TableRef tableRef, RowProjector projector,
Expand Down Expand Up @@ -195,7 +198,7 @@ public static MutationState upsertSelect(StatementContext childContext, TableRef
}
}
int rowCount = 0;
Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(batchSize);
MultiRowMutationState mutation = new MultiRowMutationState(batchSize);
PTable table = tableRef.getTable();
IndexMaintainer indexMaintainer = null;
byte[][] viewConstants = null;
Expand Down Expand Up @@ -1180,7 +1183,7 @@ public MutationState execute() throws SQLException {
throw new IllegalStateException();
}
}
Map<ImmutableBytesPtr, RowMutationState> mutation = Maps.newHashMapWithExpectedSize(1);
MultiRowMutationState mutation = new MultiRowMutationState(1);
IndexMaintainer indexMaintainer = null;
byte[][] viewConstants = null;
if (table.getIndexType() == IndexType.LOCAL) {
Expand Down

0 comments on commit aeb33b9

Please sign in to comment.