Permalink
Browse files

ACCUMULO-706 Added timeout to batch writer

git-svn-id: https://svn.apache.org/repos/asf/accumulo/trunk@1387182 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
1 parent d73ebdc commit 5bd33fa1e9625e3c088ee39f626c256aca75b0da @keith-turner keith-turner committed Sep 18, 2012
Showing with 619 additions and 164 deletions.
  1. +108 −0 core/src/main/java/org/apache/accumulo/core/client/BatchWriterConfig.java
  2. +55 −0 core/src/main/java/org/apache/accumulo/core/client/Connector.java
  3. +3 −2 core/src/main/java/org/apache/accumulo/core/client/impl/BatchWriterImpl.java
  4. +21 −3 core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java
  5. +3 −2 core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
  6. +6 −9 core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchDeleter.java
  7. +73 −8 core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchWriter.java
  8. +30 −1 core/src/main/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormat.java
  9. +20 −0 core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java
  10. +3 −1 core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteCommand.java
  11. +2 −1 core/src/main/java/org/apache/accumulo/core/util/shell/commands/DeleteManyCommand.java
  12. +3 −1 core/src/main/java/org/apache/accumulo/core/util/shell/commands/InsertCommand.java
  13. +2 −2 core/src/test/java/org/apache/accumulo/core/client/ClientSideIteratorTest.java
  14. +2 −1 core/src/test/java/org/apache/accumulo/core/client/admin/FindMaxTest.java
  15. +3 −2 core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloInputFormatTest.java
  16. +2 −1 core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloOutputFormatTest.java
  17. +2 −1 core/src/test/java/org/apache/accumulo/core/client/mapreduce/AccumuloRowInputFormatTest.java
  18. +10 −9 core/src/test/java/org/apache/accumulo/core/client/mock/MockConnectorTest.java
  19. +2 −1 core/src/test/java/org/apache/accumulo/core/iterators/user/IntersectingIteratorTest.java
  20. +2 −1 core/src/test/java/org/apache/accumulo/core/iterators/user/RowFilterTest.java
  21. +4 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RandomBatchWriter.java
  22. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/ReadWriteExample.java
  23. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/RowOperations.java
  24. +5 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/client/SequentialBatchWriter.java
  25. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/FileCount.java
  26. +4 −3 examples/simple/src/main/java/org/apache/accumulo/examples/simple/dirlist/Ingest.java
  27. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/filedata/FileDataIngest.java
  28. +2 −1 ...es/simple/src/main/java/org/apache/accumulo/examples/simple/helloworld/InsertWithBatchWriter.java
  29. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/isolation/InterferenceTest.java
  30. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Index.java
  31. +2 −1 examples/simple/src/main/java/org/apache/accumulo/examples/simple/shard/Reverse.java
  32. +2 −4 examples/simple/src/test/java/org/apache/accumulo/examples/simple/dirlist/CountTest.java
  33. +4 −3 examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputFormatTest.java
  34. +2 −1 examples/simple/src/test/java/org/apache/accumulo/examples/simple/filedata/ChunkInputStreamTest.java
  35. +2 −1 server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
  36. +7 −6 server/src/main/java/org/apache/accumulo/server/master/Master.java
  37. +4 −1 server/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
  38. +2 −1 server/src/main/java/org/apache/accumulo/server/master/tableOps/ImportTable.java
  39. +2 −3 server/src/main/java/org/apache/accumulo/server/master/tableOps/TableRangeOp.java
  40. +2 −1 server/src/main/java/org/apache/accumulo/server/test/CreateTestTable.java
  41. +2 −1 server/src/main/java/org/apache/accumulo/server/test/GCLotsOfCandidatesTest.java
  42. +2 −1 server/src/main/java/org/apache/accumulo/server/test/TestBinaryRows.java
  43. +2 −1 server/src/main/java/org/apache/accumulo/server/test/TestIngest.java
  44. +2 −1 server/src/main/java/org/apache/accumulo/server/test/TestMultiTableIngest.java
  45. +2 −1 server/src/main/java/org/apache/accumulo/server/test/TestRandomDeletes.java
  46. +4 −1 server/src/main/java/org/apache/accumulo/server/test/continuous/ContinuousIngest.java
  47. +3 −2 server/src/main/java/org/apache/accumulo/server/test/functional/AddSplitTest.java
  48. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/BadIteratorMincTest.java
  49. +3 −2 server/src/main/java/org/apache/accumulo/server/test/functional/BatchScanSplitTest.java
  50. +4 −3 server/src/main/java/org/apache/accumulo/server/test/functional/BatchWriterFlushTest.java
  51. +3 −2 server/src/main/java/org/apache/accumulo/server/test/functional/BloomFilterTest.java
  52. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/ConcurrencyTest.java
  53. +8 −7 server/src/main/java/org/apache/accumulo/server/test/functional/ConstraintTest.java
  54. +3 −2 server/src/main/java/org/apache/accumulo/server/test/functional/CreateAndUseTest.java
  55. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/DeleteEverythingTest.java
  56. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/DeleteRowsSplitTest.java
  57. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/DeleteRowsTest.java
  58. +3 −2 server/src/main/java/org/apache/accumulo/server/test/functional/LargeRowTest.java
  59. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/LogicalTimeTest.java
  60. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/MergeTest.java
  61. +4 −3 server/src/main/java/org/apache/accumulo/server/test/functional/PermissionsTest.java
  62. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/RowDeleteTest.java
  63. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/ScanIteratorTest.java
  64. +3 −2 server/src/main/java/org/apache/accumulo/server/test/functional/ScanRangeTest.java
  65. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/ScanSessionTimeOutTest.java
  66. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/ServerSideErrorTest.java
  67. +41 −0 server/src/main/java/org/apache/accumulo/server/test/functional/SlowConstraint.java
  68. +2 −1 server/src/main/java/org/apache/accumulo/server/test/functional/SparseColumnFamilyTest.java
  69. +36 −2 server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
  70. +5 −4 server/src/main/java/org/apache/accumulo/server/test/functional/VisibilityTest.java
  71. +2 −1 server/src/main/java/org/apache/accumulo/server/test/performance/metadata/MetadataBatchScanTest.java
  72. +5 −2 server/src/main/java/org/apache/accumulo/server/test/randomwalk/State.java
  73. +2 −1 server/src/main/java/org/apache/accumulo/server/test/randomwalk/concurrent/BatchWrite.java
  74. +2 −1 server/src/main/java/org/apache/accumulo/server/test/randomwalk/security/TableOp.java
  75. +2 −1 server/src/main/java/org/apache/accumulo/server/test/randomwalk/shard/Reindex.java
  76. +4 −1 server/src/main/java/org/apache/accumulo/server/test/scalability/Ingest.java
  77. +5 −3 server/src/main/java/org/apache/accumulo/server/trace/TraceServer.java
  78. +2 −1 server/src/main/java/org/apache/accumulo/server/util/AddFilesWithMissingEntries.java
  79. +6 −3 server/src/main/java/org/apache/accumulo/server/util/MetadataTable.java
  80. +2 −1 server/src/main/java/org/apache/accumulo/server/util/RandomWriter.java
  81. +2 −1 server/src/test/java/org/apache/accumulo/server/gc/TestConfirmDeletes.java
  82. +3 −2 server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java
  83. +2 −1 server/src/test/java/org/apache/accumulo/server/test/iterator/RegExTest.java
  84. +15 −14 server/src/test/java/org/apache/accumulo/server/util/CloneTest.java
  85. +3 −2 server/src/test/java/org/apache/accumulo/server/util/TabletIteratorTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.client;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This object holds configuration settings used to instantiate a {@link BatchWriter}
+ */
+public class BatchWriterConfig {
+ private long maxMemory = 50 * 1024 * 1024;
+ private long maxLatency = Long.MAX_VALUE;
+ private long timeout = Long.MAX_VALUE;
+ private int maxWriteThreads = 3;
+
+ /**
+ *
+ * @param maxMemory
+ * size in bytes of the maximum memory to batch before writing. Defaults to 50M.
+ * @return
+ */
+
+ public BatchWriterConfig setMaxMemory(long maxMemory) {
+ this.maxMemory = maxMemory;
+ return this;
+ }
+
+ /**
+ * @param maxLatency
+ * The maximum amount of time to hold data in memory before flushing it to servers. For no max set to zero or Long.MAX_VALUE with TimeUnit.MILLIS.
+ * Defaults to no max.
+ * @param timeUnit
+ * Determines how maxLatency will be interpreted.
+ * @return this to allow chaining of set methods
+ */
+
+ public BatchWriterConfig setMaxLatency(long maxLatency, TimeUnit timeUnit) {
+ if (maxLatency < 0)
+ throw new IllegalArgumentException("Negative max latency not allowed " + maxLatency);
+
+ this.maxLatency = timeUnit.toMillis(maxLatency);
+ return this;
+ }
+
+ /**
+ *
+ * @param timeout
+ * The maximum amount of time an unresponsive server will be retried. When this timeout is exceeded, the BatchWriter should throw an exception. For
+ * no timeout set to zero or Long.MAX_VALUE with TimeUnit.MILLIS. Defaults to no timeout.
+ * @param timeUnit
+ * @return this to allow chaining of set methods
+ */
+
+ public BatchWriterConfig setTimeout(long timeout, TimeUnit timeUnit) {
+ if (timeout < 0)
+ throw new IllegalArgumentException("Negative timeout not allowed " + timeout);
+
+ if (timeout == 0)
+ timeout = Long.MAX_VALUE;
+ else
+ this.timeout = timeUnit.toMillis(timeout);
+ return this;
+ }
+
+ /**
+ * @param maxWriteThreads
+ * the maximum number of threads to use for writing data to the tablet servers. Defaults to 3.
+ * @return this to allow chaining of set methods
+ */
+
+ public BatchWriterConfig setMaxWriteThreads(int maxWriteThreads) {
+ if (maxWriteThreads <= 0)
+ throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads);
+
+ this.maxWriteThreads = maxWriteThreads;
+ return this;
+ }
+
+ public long getMaxMemory() {
+ return maxMemory;
+ }
+
+ public long getMaxLatency(TimeUnit timeUnit) {
+ return timeUnit.convert(maxLatency, TimeUnit.MILLISECONDS);
+ }
+
+ public long getTimeout(TimeUnit timeUnit) {
+ return timeUnit.convert(timeout, TimeUnit.MILLISECONDS);
+ }
+
+ public int getMaxWriteThreads() {
+ return maxWriteThreads;
+ }
+}
@@ -99,13 +99,36 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz
* @return BatchDeleter object for configuring and deleting
* @throws TableNotFoundException
* when the specified table doesn't exist
+ * @deprecated As of 1.5, replaced by {@link #createBatchDeleter(String, Authorizations, int, BatchWriterConfig)}
*/
+ @Deprecated
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency,
int maxWriteThreads) throws TableNotFoundException {
return impl.createBatchDeleter(tableName, authorizations, numQueryThreads, maxMemory, maxLatency, maxWriteThreads);
}
/**
+ *
+ * @param tableName
+ * the name of the table to query and delete from
+ * @param authorizations
+ * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in
+ * must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are
+ * passed, then an exception will be thrown.
+ * @param numQueryThreads
+ * the number of concurrent threads to spawn for querying
+ * @param config
+ * configuration used to create batch writer
+ * @return BatchDeleter object for configuring and deleting
+ * @throws TableNotFoundException
+ */
+
+ public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config)
+ throws TableNotFoundException {
+ return impl.createBatchDeleter(tableName, authorizations, numQueryThreads, config);
+ }
+
+ /**
* Factory method to create a BatchWriter connected to Accumulo.
*
* @param tableName
@@ -120,12 +143,29 @@ public BatchDeleter createBatchDeleter(String tableName, Authorizations authoriz
* @return BatchWriter object for configuring and writing data to
* @throws TableNotFoundException
* when the specified table doesn't exist
+ * @deprecated As of 1.5, replaced by {@link #createBatchWriter(String, BatchWriterConfig)}
*/
+ @Deprecated
public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException {
return impl.createBatchWriter(tableName, maxMemory, maxLatency, maxWriteThreads);
}
/**
+ * Factory method to create a BatchWriter connected to Accumulo.
+ *
+ * @param tableName
+ * the name of the table to insert data into
+ * @param config
+ * configuration used to create batch writer
+ * @return BatchWriter object for configuring and writing data to
+ * @throws TableNotFoundException
+ */
+
+ public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException {
+ return impl.createBatchWriter(tableName, config);
+ }
+
+ /**
* Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables, which is good for
* ingesting data into multiple tables from the same source
*
@@ -137,12 +177,27 @@ public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxL
* the maximum number of threads to use for writing data to the tablet servers
*
* @return MultiTableBatchWriter object for configuring and writing data to
+ * @deprecated As of 1.5, replaced by {@link #createMultiTableBatchWriter(BatchWriterConfig)}
*/
+ @Deprecated
public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) {
return impl.createMultiTableBatchWriter(maxMemory, maxLatency, maxWriteThreads);
}
/**
+ * Factory method to create a Multi-Table BatchWriter connected to Accumulo. Multi-table batch writers can queue data for multiple tables. Also data for
+ * multiple tables can be sent to a server in a single batch. Its an efficient way to ingest data into multiple tables from a single process.
+ *
+ * @param config
+ * configuration used to create multi-table batch writer
+ * @return MultiTableBatchWriter object for configuring and writing data to
+ */
+
+ public MultiTableBatchWriter createMultiTableBatchWriter(BatchWriterConfig config) {
+ return impl.createMultiTableBatchWriter(config);
+ }
+
+ /**
* Factory method to create a Scanner connected to Accumulo.
*
* @param tableName
@@ -17,6 +17,7 @@
package org.apache.accumulo.core.client.impl;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.data.Mutation;
@@ -28,10 +29,10 @@
private String table;
private TabletServerBatchWriter bw;
- public BatchWriterImpl(Instance instance, AuthInfo credentials, String table, long maxMemory, long maxLatency, int maxWriteThreads) {
+ public BatchWriterImpl(Instance instance, AuthInfo credentials, String table, BatchWriterConfig config) {
ArgumentChecker.notNull(instance, credentials, table);
this.table = table;
- this.bw = new TabletServerBatchWriter(instance, credentials, maxMemory, maxLatency, maxWriteThreads);
+ this.bw = new TabletServerBatchWriter(instance, credentials, config);
}
@Override
@@ -17,13 +17,15 @@
package org.apache.accumulo.core.client.impl;
import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
@@ -106,18 +108,34 @@ public BatchScanner createBatchScanner(String tableName, Authorizations authoriz
public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, long maxMemory, long maxLatency,
int maxWriteThreads) throws TableNotFoundException {
ArgumentChecker.notNull(tableName, authorizations);
- return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, maxMemory, maxLatency, maxWriteThreads);
+ return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, new BatchWriterConfig()
+ .setMaxMemory(maxMemory).setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+ }
+
+ @Override
+ public BatchDeleter createBatchDeleter(String tableName, Authorizations authorizations, int numQueryThreads, BatchWriterConfig config)
+ throws TableNotFoundException {
+ ArgumentChecker.notNull(tableName, authorizations);
+ return new TabletServerBatchDeleter(instance, credentials, getTableId(tableName), authorizations, numQueryThreads, config);
}
@Override
public BatchWriter createBatchWriter(String tableName, long maxMemory, long maxLatency, int maxWriteThreads) throws TableNotFoundException {
ArgumentChecker.notNull(tableName);
- return new BatchWriterImpl(instance, credentials, getTableId(tableName), maxMemory, maxLatency, maxWriteThreads);
+ return new BatchWriterImpl(instance, credentials, getTableId(tableName), new BatchWriterConfig().setMaxMemory(maxMemory)
+ .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
+ }
+
+ @Override
+ public BatchWriter createBatchWriter(String tableName, BatchWriterConfig config) throws TableNotFoundException {
+ ArgumentChecker.notNull(tableName);
+ return new BatchWriterImpl(instance, credentials, getTableId(tableName), config);
}
@Override
public MultiTableBatchWriter createMultiTableBatchWriter(long maxMemory, long maxLatency, int maxWriteThreads) {
- return new MultiTableBatchWriterImpl(instance, credentials, maxMemory, maxLatency, maxWriteThreads);
+ return new MultiTableBatchWriterImpl(instance, credentials, new BatchWriterConfig().setMaxMemory(maxMemory)
+ .setMaxLatency(maxLatency, TimeUnit.MILLISECONDS).setMaxWriteThreads(maxWriteThreads));
}
@Override
@@ -21,6 +21,7 @@
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.MultiTableBatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -71,10 +72,10 @@ public void flush() {
private HashMap<String,BatchWriter> tableWriters;
private Instance instance;
- public MultiTableBatchWriterImpl(Instance instance, AuthInfo credentials, long maxMemory, long maxLatency, int maxWriteThreads) {
+ public MultiTableBatchWriterImpl(Instance instance, AuthInfo credentials, BatchWriterConfig config) {
ArgumentChecker.notNull(instance, credentials);
this.instance = instance;
- this.bw = new TabletServerBatchWriter(instance, credentials, maxMemory, maxLatency, maxWriteThreads);
+ this.bw = new TabletServerBatchWriter(instance, credentials, config);
tableWriters = new HashMap<String,BatchWriter>();
this.closed = false;
}
@@ -21,6 +21,7 @@
import org.apache.accumulo.core.client.BatchDeleter;
import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -38,27 +39,23 @@
private Instance instance;
private AuthInfo credentials;
private String tableId;
- private long maxMemory;
- private long maxLatency;
- private int maxWriteThreads;
+ private BatchWriterConfig bwConfig;
- public TabletServerBatchDeleter(Instance instance, AuthInfo credentials, String tableId, Authorizations authorizations, int numQueryThreads, long maxMemory,
- long maxLatency, int maxWriteThreads) throws TableNotFoundException {
+ public TabletServerBatchDeleter(Instance instance, AuthInfo credentials, String tableId, Authorizations authorizations, int numQueryThreads,
+ BatchWriterConfig bwConfig) throws TableNotFoundException {
super(instance, credentials, tableId, authorizations, numQueryThreads);
this.instance = instance;
this.credentials = credentials;
this.tableId = tableId;
- this.maxMemory = maxMemory;
- this.maxLatency = maxLatency;
- this.maxWriteThreads = maxWriteThreads;
+ this.bwConfig = bwConfig;
super.addScanIterator(new IteratorSetting(Integer.MAX_VALUE, BatchDeleter.class.getName() + ".NOVALUE", SortedKeyIterator.class));
}
@Override
public void delete() throws MutationsRejectedException, TableNotFoundException {
BatchWriter bw = null;
try {
- bw = new BatchWriterImpl(instance, credentials, tableId, maxMemory, maxLatency, maxWriteThreads);
+ bw = new BatchWriterImpl(instance, credentials, tableId, bwConfig);
Iterator<Entry<Key,Value>> iter = super.iterator();
while (iter.hasNext()) {
Entry<Key,Value> next = iter.next();
Oops, something went wrong.

0 comments on commit 5bd33fa

Please sign in to comment.