From cee2cd8d4b554ad18554f9caac3c18c5d0791552 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Fri, 27 May 2016 17:02:49 -0400 Subject: [PATCH 1/2] ACCUMULO-4318 Made writers and scanners auto closeable --- .../accumulo/core/client/BatchWriter.java | 3 +- .../core/client/ConditionalWriter.java | 3 +- .../accumulo/core/client/ScannerBase.java | 3 +- .../core/client/impl/ScannerImplTest.java | 4 + .../impl/TabletServerBatchReaderTest.java | 6 +- .../examples/simple/reservations/ARS.java | 45 +- .../server/util/MasterMetadataUtil.java | 57 +- .../server/util/MetadataTableUtil.java | 61 +- .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../accumulo/master/tableOps/CopyFailed.java | 2 + .../apache/accumulo/tserver/TabletServer.java | 6 +- .../accumulo/test/ConditionalWriterIT.java | 1144 ++++++++--------- .../test/functional/BatchWriterFlushIT.java | 33 +- .../accumulo/test/functional/ReadWriteIT.java | 62 +- .../test/functional/SplitRecoveryIT.java | 72 +- .../randomwalk/bulk/ConsistencyCheck.java | 24 +- .../test/randomwalk/conditional/Transfer.java | 94 +- .../test/randomwalk/conditional/Verify.java | 2 + 18 files changed, 817 insertions(+), 808 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java index b4d81aab4f4..95d87c5337a 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/BatchWriter.java @@ -29,7 +29,7 @@ * In the event that an MutationsRejectedException exception is thrown by one of the methods on a BatchWriter instance, the user should close the current * instance and create a new instance. This is a known limitation which will be addressed by ACCUMULO-2990 in the future. */ -public interface BatchWriter { +public interface BatchWriter extends AutoCloseable { /** * Queues one mutation to write. @@ -66,6 +66,7 @@ public interface BatchWriter { * @throws MutationsRejectedException * this could be thrown because current or previous mutations failed */ + @Override void close() throws MutationsRejectedException; } diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java index 62244e6d053..d13dc0960e8 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java @@ -28,7 +28,7 @@ * * @since 1.6.0 */ -public interface ConditionalWriter { +public interface ConditionalWriter extends AutoCloseable { class Result { private Status status; @@ -131,5 +131,6 @@ public static enum Status { /** * release any resources (like threads pools) used by conditional writer */ + @Override void close(); } diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java index 354f6f43ee0..6835849bba2 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java @@ -31,7 +31,7 @@ * This class hosts configuration methods that are shared between different types of scanners. * */ -public interface ScannerBase extends Iterable> { +public interface ScannerBase extends Iterable>, AutoCloseable { /** * Add a server-side scan iterator. @@ -164,6 +164,7 @@ public interface ScannerBase extends Iterable> { * * @since 1.5.0 */ + @Override void close(); /** diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java index eedc61d4ad2..38e3c075d40 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/impl/ScannerImplTest.java @@ -45,12 +45,14 @@ public void testValidReadaheadValues() { s.setReadaheadThreshold(Long.MAX_VALUE); Assert.assertEquals(Long.MAX_VALUE, s.getReadaheadThreshold()); + s.close(); } @Test(expected = IllegalArgumentException.class) public void testInValidReadaheadValues() { Scanner s = new ScannerImpl(context, "foo", Authorizations.EMPTY); s.setReadaheadThreshold(-1); + s.close(); } @Test @@ -58,8 +60,10 @@ public void testGetAuthorizations() { Authorizations expected = new Authorizations("a,b"); Scanner s = new ScannerImpl(context, "foo", expected); assertEquals(expected, s.getAuthorizations()); + s.close(); } + @SuppressWarnings("resource") @Test(expected = IllegalArgumentException.class) public void testNullAuthorizationsFails() { new ScannerImpl(context, "foo", null); diff --git a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java index b31050a85d4..af4a4746b50 100644 --- a/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java +++ b/core/src/test/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderTest.java @@ -36,10 +36,12 @@ public void setup() { @Test public void testGetAuthorizations() { Authorizations expected = new Authorizations("a,b"); - BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1); - assertEquals(expected, s.getAuthorizations()); + try (BatchScanner s = new TabletServerBatchReader(context, "foo", expected, 1)) { + assertEquals(expected, s.getAuthorizations()); + } } + @SuppressWarnings("resource") @Test(expected = IllegalArgumentException.class) public void testNullAuthorizationsFails() { new TabletServerBatchReader(context, "foo", null, 1); diff --git a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java index b9e1a83736c..d99f7af25bb 100644 --- a/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java +++ b/examples/simple/src/main/java/org/apache/accumulo/examples/simple/reservations/ARS.java @@ -20,8 +20,6 @@ import java.util.List; import java.util.Map.Entry; -import jline.console.ConsoleReader; - import org.apache.accumulo.core.client.ClientConfiguration; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Status; @@ -41,6 +39,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import jline.console.ConsoleReader; + /** * Accumulo Reservation System : An example reservation system using Accumulo. Supports atomic reservations of a resource at a date. Wait list are also * supported. In order to keep the example simple, no checking is done of the date. Also the code is inefficient, if interested in improving it take a look at @@ -88,9 +88,9 @@ public ReservationResult reserve(String what, String when, String who) throws Ex ReservationResult result = ReservationResult.RESERVED; - ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig()); - - try { + // it is important to use an isolated scanner so that only whole mutations are seen + try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig()); + Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) { while (true) { Status status = cwriter.write(update).getStatus(); switch (status) { @@ -109,8 +109,6 @@ public ReservationResult reserve(String what, String when, String who) throws Ex // that attempted to make a reservation by putting them later in the list. A more complex solution could involve having independent sub-queues within // the row that approximately maintain arrival order and use exponential back off to fairly merge the sub-queues into the main queue. - // it is important to use an isolated scanner so that only whole mutations are seen - Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY)); scanner.setRange(new Range(row)); int seq = -1; @@ -152,10 +150,7 @@ public ReservationResult reserve(String what, String when, String who) throws Ex else result = ReservationResult.WAIT_LISTED; } - } finally { - cwriter.close(); } - } public void cancel(String what, String when, String who) throws Exception { @@ -166,13 +161,10 @@ public void cancel(String what, String when, String who) throws Exception { // will cause any concurrent reservations to retry. If this delete were done using a batch writer, then a concurrent reservation could report WAIT_LISTED // when it actually got the reservation. - ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig()); - - try { + // its important to use an isolated scanner so that only whole mutations are seen + try (ConditionalWriter cwriter = conn.createConditionalWriter(rTable, new ConditionalWriterConfig()); + Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) { while (true) { - - // its important to use an isolated scanner so that only whole mutations are seen - Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY)); scanner.setRange(new Range(row)); int seq = -1; @@ -217,8 +209,6 @@ public void cancel(String what, String when, String who) throws Exception { } } - } finally { - cwriter.close(); } } @@ -226,18 +216,19 @@ public List list(String what, String when) throws Exception { String row = what + ":" + when; // its important to use an isolated scanner so that only whole mutations are seen - Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY)); - scanner.setRange(new Range(row)); - scanner.fetchColumnFamily(new Text("res")); + try (Scanner scanner = new IsolatedScanner(conn.createScanner(rTable, Authorizations.EMPTY))) { + scanner.setRange(new Range(row)); + scanner.fetchColumnFamily(new Text("res")); - List reservations = new ArrayList(); + List reservations = new ArrayList(); - for (Entry entry : scanner) { - String val = entry.getValue().toString(); - reservations.add(val); - } + for (Entry entry : scanner) { + String val = entry.getValue().toString(); + reservations.add(val); + } - return reservations; + return reservations; + } } public static void main(String[] args) throws Exception { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java index b9e52e3388b..01b4f043335 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java @@ -16,6 +16,7 @@ */ package org.apache.accumulo.server.util; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; @@ -61,8 +62,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; - /** * */ @@ -151,42 +150,44 @@ private static KeyExtent fixSplit(ClientContext context, String table, Text meta // check to see if prev tablet exist in metadata tablet Key prevRowKey = new Key(new Text(KeyExtent.getMetadataEntry(table, metadataPrevEndRow))); - ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW))); + try (ScannerImpl scanner2 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) { + scanner2.setRange(new Range(prevRowKey, prevRowKey.followingKey(PartialKey.ROW))); - VolumeManager fs = VolumeManagerImpl.get(); - if (!scanner2.iterator().hasNext()) { - log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow); - MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock); - return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper)); - } else { - log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow); + VolumeManager fs = VolumeManagerImpl.get(); + if (!scanner2.iterator().hasNext()) { + log.info("Rolling back incomplete split " + metadataEntry + " " + metadataPrevEndRow); + MetadataTableUtil.rollBackSplit(metadataEntry, KeyExtent.decodePrevEndRow(oper), context, lock); + return new KeyExtent(metadataEntry, KeyExtent.decodePrevEndRow(oper)); + } else { + log.info("Finishing incomplete split " + metadataEntry + " " + metadataPrevEndRow); - List highDatafilesToRemove = new ArrayList(); + List highDatafilesToRemove = new ArrayList(); - Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - Key rowKey = new Key(metadataEntry); + Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); + Key rowKey = new Key(metadataEntry); - SortedMap origDatafileSizes = new TreeMap(); - SortedMap highDatafileSizes = new TreeMap(); - SortedMap lowDatafileSizes = new TreeMap(); - scanner3.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW))); + SortedMap origDatafileSizes = new TreeMap(); + SortedMap highDatafileSizes = new TreeMap(); + SortedMap lowDatafileSizes = new TreeMap(); + scanner3.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW))); - for (Entry entry : scanner3) { - if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) { - origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get())); + for (Entry entry : scanner3) { + if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get())); + } } - } - MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap(), origDatafileSizes, lowDatafileSizes, - highDatafileSizes, highDatafilesToRemove); + scanner3.close(); - MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock); + MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap(), origDatafileSizes, lowDatafileSizes, + highDatafileSizes, highDatafilesToRemove); - return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow)); - } + MetadataTableUtil.finishSplit(metadataEntry, highDatafileSizes, highDatafilesToRemove, context, lock); + return new KeyExtent(metadataEntry, KeyExtent.encodePrevEndRow(metadataPrevEndRow)); + } + } } private static TServerInstance getTServerInstance(String address, ZooLock zooLock) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index 5081a9ce7bd..c04d43e5c44 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -297,6 +297,8 @@ public static SortedMap getDataFileSizes(KeyExtent extent sizes.put(new FileRef(fs, entry.getKey()), dfv); } + mdScanner.close(); + return sizes; } @@ -521,23 +523,24 @@ public static Pair,SortedMap> getFileAndLo } else { String systemTableToCheck = extent.isMeta() ? RootTable.ID : MetadataTable.ID; - Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY); - scanner.fetchColumnFamily(LogColumnFamily.NAME); - scanner.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner.setRange(extent.toMetadataRange()); + try (Scanner scanner = new ScannerImpl(context, systemTableToCheck, Authorizations.EMPTY)) { + scanner.fetchColumnFamily(LogColumnFamily.NAME); + scanner.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner.setRange(extent.toMetadataRange()); - for (Entry entry : scanner) { - if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) { - throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry()); - } + for (Entry entry : scanner) { + if (!entry.getKey().getRow().equals(extent.getMetadataEntry())) { + throw new RuntimeException("Unexpected row " + entry.getKey().getRow() + " expected " + extent.getMetadataEntry()); + } - if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) { - result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue())); - } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { - DataFileValue dfv = new DataFileValue(entry.getValue().get()); - sizes.put(new FileRef(fs, entry.getKey()), dfv); - } else { - throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily()); + if (entry.getKey().getColumnFamily().equals(LogColumnFamily.NAME)) { + result.add(LogEntry.fromKeyValue(entry.getKey(), entry.getValue())); + } else if (entry.getKey().getColumnFamily().equals(DataFileColumnFamily.NAME)) { + DataFileValue dfv = new DataFileValue(entry.getValue().get()); + sizes.put(new FileRef(fs, entry.getKey()), dfv); + } else { + throw new RuntimeException("Unexpected col fam " + entry.getKey().getColumnFamily()); + } } } } @@ -889,20 +892,20 @@ public static void chopped(AccumuloServerContext context, KeyExtent extent, ZooL } public static void removeBulkLoadEntries(Connector conn, String tableId, long tid) throws Exception { - Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); - mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); - for (Entry entry : mscanner) { - log.debug("Looking at entry " + entry + " with tid " + tid); - if (Long.parseLong(entry.getValue().toString()) == tid) { - log.debug("deleting entry " + entry); - Mutation m = new Mutation(entry.getKey().getRow()); - m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); - bw.addMutation(m); + try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); + BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) { + mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); + mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); + for (Entry entry : mscanner) { + log.debug("Looking at entry " + entry + " with tid " + tid); + if (Long.parseLong(entry.getValue().toString()) == tid) { + log.debug("deleting entry " + entry); + Mutation m = new Mutation(entry.getKey().getRow()); + m.putDelete(entry.getKey().getColumnFamily(), entry.getKey().getColumnQualifier()); + bw.addMutation(m); + } } } - bw.close(); } public static List getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException { @@ -917,6 +920,7 @@ public static List getBulkFilesLoaded(Connector conn, KeyExtent extent, result.add(new FileRef(fs, entry.getKey())); } } + mscanner.close(); return result; } catch (TableNotFoundException ex) { // unlikely @@ -940,6 +944,7 @@ public static Map> getBulkFilesLoaded(ClientC } lst.add(new FileRef(fs, entry.getKey())); } + scanner.close(); return result; } @@ -995,6 +1000,7 @@ public static void moveMetaDeleteMarkers(ClientContext context) { break; } } + scanner.close(); } public static void moveMetaDeleteMarkersFrom14(ClientContext context) { @@ -1012,6 +1018,7 @@ public static void moveMetaDeleteMarkersFrom14(ClientContext context) { break; } } + scanner.close(); } private static void moveDeleteEntry(ClientContext context, KeyExtent oldExtent, Entry entry, String rowID, String prefix) { diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 5e8c038ceae..cc438026ec2 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -16,6 +16,8 @@ */ package org.apache.accumulo.gc; +import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; + import java.io.FileNotFoundException; import java.io.IOException; import java.net.UnknownHostException; @@ -110,7 +112,6 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; -import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import com.google.protobuf.InvalidProtocolBufferException; public class SimpleGarbageCollector extends AccumuloServerContext implements Iface { @@ -269,6 +270,7 @@ public boolean getCandidates(String continuePoint, List result) throws T @Override public Iterator getBlipIterator() throws TableNotFoundException, AccumuloException, AccumuloSecurityException { + @SuppressWarnings("resource") IsolatedScanner scanner = new IsolatedScanner(getConnector().createScanner(tableName, Authorizations.EMPTY)); scanner.setRange(MetadataSchema.BlipSection.getRange()); diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java index 068aa818467..298a0ce54bb 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java @@ -125,6 +125,8 @@ public Repo call(long tid, Master master) throws Exception { } } + mscanner.close(); + // move failed files that were not loaded for (String failure : failures.values()) { Path orig = new Path(failure); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1523c5589d0..b1b4202efa0 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -142,6 +142,8 @@ import org.apache.accumulo.core.util.ServerServices; import org.apache.accumulo.core.util.ServerServices.Service; import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.core.util.ratelimit.RateLimiter; +import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.util.LoggingRunnable; import org.apache.accumulo.fate.zookeeper.IZooReaderWriter; @@ -256,8 +258,6 @@ import org.slf4j.LoggerFactory; import com.google.common.net.HostAndPort; -import org.apache.accumulo.core.util.ratelimit.RateLimiter; -import org.apache.accumulo.core.util.ratelimit.SharedRateLimiterFactory; public class TabletServer extends AccumuloServerContext implements Runnable { @@ -2602,6 +2602,8 @@ public static Pair verifyTabletInformation(AccumuloServerContext for (Entry entry : scanner) tkv.put(entry.getKey(), entry.getValue()); + scanner.close(); + // only populate map after success if (tabletsKeyValues == null) { tabletsKeyValues = tkv; diff --git a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java index 4018d782365..a3eed9f67b6 100644 --- a/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java +++ b/test/src/main/java/org/apache/accumulo/test/ConditionalWriterIT.java @@ -141,77 +141,78 @@ public void testBasic() throws Exception { conn.tableOperations().create(tableName); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); - - // mutation conditional on column tx:seq not existing - ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq")); - cm0.put("name", "last", "doe"); - cm0.put("name", "first", "john"); - cm0.put("tx", "seq", "1"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); - Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); - - // mutation conditional on column tx:seq being 1 - ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1")); - cm1.put("name", "last", "Doe"); - cm1.put("tx", "seq", "2"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); - - // test condition where value differs - ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1")); - cm2.put("name", "last", "DOE"); - cm2.put("tx", "seq", "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus()); - - // test condition where column does not exists - ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1")); - cm3.put("name", "last", "deo"); - cm3.put("tx", "seq", "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus()); - - // test two conditions, where one should fail - ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("doe")); - cm4.put("name", "last", "deo"); - cm4.put("tx", "seq", "3"); - Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus()); - - // test two conditions, where one should fail - ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"), new Condition("name", "last").setValue("Doe")); - cm5.put("name", "last", "deo"); - cm5.put("tx", "seq", "3"); - Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus()); - - // ensure rejected mutations did not write - Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); - scanner.fetchColumn(new Text("name"), new Text("last")); - scanner.setRange(new Range("99006")); - Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("Doe", entry.getValue().toString()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { + + // mutation conditional on column tx:seq not existing + ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq")); + cm0.put("name", "last", "doe"); + cm0.put("name", "first", "john"); + cm0.put("tx", "seq", "1"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); + Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); + + // mutation conditional on column tx:seq being 1 + ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1")); + cm1.put("name", "last", "Doe"); + cm1.put("tx", "seq", "2"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); + + // test condition where value differs + ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1")); + cm2.put("name", "last", "DOE"); + cm2.put("tx", "seq", "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus()); + + // test condition where column does not exists + ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("txtypo", "seq").setValue("1")); + cm3.put("name", "last", "deo"); + cm3.put("tx", "seq", "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus()); + + // test two conditions, where one should fail + ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("doe")); + cm4.put("name", "last", "deo"); + cm4.put("tx", "seq", "3"); + Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus()); + + // test two conditions, where one should fail + ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("1"), new Condition("name", "last").setValue("Doe")); + cm5.put("name", "last", "deo"); + cm5.put("tx", "seq", "3"); + Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus()); + + // ensure rejected mutations did not write + Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); + scanner.fetchColumn(new Text("name"), new Text("last")); + scanner.setRange(new Range("99006")); + Entry entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("Doe", entry.getValue().toString()); - // test w/ two conditions that are met - ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("Doe")); - cm6.put("name", "last", "DOE"); - cm6.put("tx", "seq", "3"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); + // test w/ two conditions that are met + ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("2"), new Condition("name", "last").setValue("Doe")); + cm6.put("name", "last", "DOE"); + cm6.put("tx", "seq", "3"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("DOE", entry.getValue().toString()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("DOE", entry.getValue().toString()); - // test a conditional mutation that deletes - ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3")); - cm7.putDelete("name", "last"); - cm7.putDelete("name", "first"); - cm7.putDelete("tx", "seq"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus()); + // test a conditional mutation that deletes + ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setValue("3")); + cm7.putDelete("name", "last"); + cm7.putDelete("name", "first"); + cm7.putDelete("tx", "seq"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus()); - Assert.assertFalse("Did not expect to find any results", scanner.iterator().hasNext()); + Assert.assertFalse("Did not expect to find any results", scanner.iterator().hasNext()); - // add the row back - Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); - Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); + // add the row back + Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); + Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("doe", entry.getValue().toString()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("doe", entry.getValue().toString()); + } } @Test @@ -242,74 +243,74 @@ public void testFields() throws Exception { conn.tableOperations().create(tableName); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(auths)); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(auths))) { - ColumnVisibility cva = new ColumnVisibility("A"); - ColumnVisibility cvb = new ColumnVisibility("B"); + ColumnVisibility cva = new ColumnVisibility("A"); + ColumnVisibility cvb = new ColumnVisibility("B"); - ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva)); - cm0.put("name", "last", cva, "doe"); - cm0.put("name", "first", cva, "john"); - cm0.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); + ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva)); + cm0.put("name", "last", cva, "doe"); + cm0.put("name", "first", cva, "john"); + cm0.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); - Scanner scanner = conn.createScanner(tableName, auths); - scanner.setRange(new Range("99006")); - // TODO verify all columns - scanner.fetchColumn(new Text("tx"), new Text("seq")); - Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("1", entry.getValue().toString()); - long ts = entry.getKey().getTimestamp(); - - // test wrong colf - ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("txA", "seq").setVisibility(cva).setValue("1")); - cm1.put("name", "last", cva, "Doe"); - cm1.put("name", "first", cva, "John"); - cm1.put("tx", "seq", cva, "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus()); - - // test wrong colq - ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seqA").setVisibility(cva).setValue("1")); - cm2.put("name", "last", cva, "Doe"); - cm2.put("name", "first", cva, "John"); - cm2.put("tx", "seq", cva, "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus()); - - // test wrong colv - ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1")); - cm3.put("name", "last", cva, "Doe"); - cm3.put("name", "first", cva, "John"); - cm3.put("tx", "seq", cva, "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus()); - - // test wrong timestamp - ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1")); - cm4.put("name", "last", cva, "Doe"); - cm4.put("name", "first", cva, "John"); - cm4.put("tx", "seq", cva, "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus()); - - // test wrong timestamp - ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1")); - cm5.put("name", "last", cva, "Doe"); - cm5.put("name", "first", cva, "John"); - cm5.put("tx", "seq", cva, "2"); - Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus()); - - // ensure no updates were made - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("1", entry.getValue().toString()); - - // set all columns correctly - ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1")); - cm6.put("name", "last", cva, "Doe"); - cm6.put("name", "first", cva, "John"); - cm6.put("tx", "seq", cva, "2"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); - - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("2", entry.getValue().toString()); + Scanner scanner = conn.createScanner(tableName, auths); + scanner.setRange(new Range("99006")); + // TODO verify all columns + scanner.fetchColumn(new Text("tx"), new Text("seq")); + Entry entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("1", entry.getValue().toString()); + long ts = entry.getKey().getTimestamp(); + + // test wrong colf + ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("txA", "seq").setVisibility(cva).setValue("1")); + cm1.put("name", "last", cva, "Doe"); + cm1.put("name", "first", cva, "John"); + cm1.put("tx", "seq", cva, "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus()); + + // test wrong colq + ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seqA").setVisibility(cva).setValue("1")); + cm2.put("name", "last", cva, "Doe"); + cm2.put("name", "first", cva, "John"); + cm2.put("tx", "seq", cva, "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm2).getStatus()); + + // test wrong colv + ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1")); + cm3.put("name", "last", cva, "Doe"); + cm3.put("name", "first", cva, "John"); + cm3.put("tx", "seq", cva, "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm3).getStatus()); + + // test wrong timestamp + ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts + 1).setValue("1")); + cm4.put("name", "last", cva, "Doe"); + cm4.put("name", "first", cva, "John"); + cm4.put("tx", "seq", cva, "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm4).getStatus()); + + // test wrong timestamp + ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts - 1).setValue("1")); + cm5.put("name", "last", cva, "Doe"); + cm5.put("name", "first", cva, "John"); + cm5.put("tx", "seq", cva, "2"); + Assert.assertEquals(Status.REJECTED, cw.write(cm5).getStatus()); + + // ensure no updates were made + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("1", entry.getValue().toString()); + + // set all columns correctly + ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cva).setTimestamp(ts).setValue("1")); + cm6.put("name", "last", cva, "Doe"); + cm6.put("name", "first", cva, "John"); + cm6.put("tx", "seq", cva, "2"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("2", entry.getValue().toString()); + } } @Test @@ -327,87 +328,86 @@ public void testBadColVis() throws Exception { Authorizations filteredAuths = new Authorizations("A"); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(filteredAuths)); - ColumnVisibility cva = new ColumnVisibility("A"); ColumnVisibility cvb = new ColumnVisibility("B"); ColumnVisibility cvc = new ColumnVisibility("C"); - // User has authorization, but didn't include it in the writer - ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb)); - cm0.put("name", "last", cva, "doe"); - cm0.put("name", "first", cva, "john"); - cm0.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus()); - - ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1")); - cm1.put("name", "last", cva, "doe"); - cm1.put("name", "first", cva, "john"); - cm1.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus()); - - // User does not have the authorization - ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc)); - cm2.put("name", "last", cva, "doe"); - cm2.put("name", "first", cva, "john"); - cm2.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus()); - - ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc).setValue("1")); - cm3.put("name", "last", cva, "doe"); - cm3.put("name", "first", cva, "john"); - cm3.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus()); - - // if any visibility is bad, good visibilities don't override - ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)); - - cm4.put("name", "last", cva, "doe"); - cm4.put("name", "first", cva, "john"); - cm4.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus()); - - ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", "seq") - .setVisibility(cva).setValue("1")); - cm5.put("name", "last", cva, "doe"); - cm5.put("name", "first", cva, "john"); - cm5.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus()); - - ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), - new Condition("tx", "seq").setVisibility(cva)); - cm6.put("name", "last", cva, "doe"); - cm6.put("name", "first", cva, "john"); - cm6.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus()); - - ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva) - .setValue("1")); - cm7.put("name", "last", cva, "doe"); - cm7.put("name", "first", cva, "john"); - cm7.put("tx", "seq", cva, "1"); - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus()); - - cw.close(); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(filteredAuths))) { + + // User has authorization, but didn't include it in the writer + ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb)); + cm0.put("name", "last", cva, "doe"); + cm0.put("name", "first", cva, "john"); + cm0.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm0).getStatus()); + + ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1")); + cm1.put("name", "last", cva, "doe"); + cm1.put("name", "first", cva, "john"); + cm1.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm1).getStatus()); + + // User does not have the authorization + ConditionalMutation cm2 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc)); + cm2.put("name", "last", cva, "doe"); + cm2.put("name", "first", cva, "john"); + cm2.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm2).getStatus()); + + ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvc).setValue("1")); + cm3.put("name", "last", cva, "doe"); + cm3.put("name", "first", cva, "john"); + cm3.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm3).getStatus()); + + // if any visibility is bad, good visibilities don't override + ConditionalMutation cm4 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva)); + + cm4.put("name", "last", cva, "doe"); + cm4.put("name", "first", cva, "john"); + cm4.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm4).getStatus()); + + ConditionalMutation cm5 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), new Condition("tx", "seq") + .setVisibility(cva).setValue("1")); + cm5.put("name", "last", cva, "doe"); + cm5.put("name", "first", cva, "john"); + cm5.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm5).getStatus()); + + ConditionalMutation cm6 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb).setValue("1"), + new Condition("tx", "seq").setVisibility(cva)); + cm6.put("name", "last", cva, "doe"); + cm6.put("name", "first", cva, "john"); + cm6.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm6).getStatus()); + + ConditionalMutation cm7 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva) + .setValue("1")); + cm7.put("name", "last", cva, "doe"); + cm7.put("name", "first", cva, "john"); + cm7.put("tx", "seq", cva, "1"); + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus()); + + } // test passing auths that exceed users configured auths Authorizations exceedingAuths = new Authorizations("A", "B", "D"); - ConditionalWriter cw2 = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(exceedingAuths)); + try (ConditionalWriter cw2 = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(exceedingAuths))) { - ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva) - .setValue("1")); - cm8.put("name", "last", cva, "doe"); - cm8.put("name", "first", cva, "john"); - cm8.put("tx", "seq", cva, "1"); + ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva) + .setValue("1")); + cm8.put("name", "last", cva, "doe"); + cm8.put("name", "first", cva, "john"); + cm8.put("tx", "seq", cva, "1"); - try { - Status status = cw2.write(cm8).getStatus(); - Assert.fail("Writing mutation with Authorizations the user doesn't have should fail. Got status: " + status); - } catch (AccumuloSecurityException ase) { - // expected, check specific failure? - } finally { - cw2.close(); + try { + Status status = cw2.write(cm8).getStatus(); + Assert.fail("Writing mutation with Authorizations the user doesn't have should fail. Got status: " + status); + } catch (AccumuloSecurityException ase) { + // expected, check specific failure? + } } } @@ -424,21 +424,20 @@ public void testConstraints() throws Exception { Scanner scanner = conn.createScanner(tableName + "_clone", new Authorizations()); - ConditionalWriter cw = conn.createConditionalWriter(tableName + "_clone", new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName + "_clone", new ConditionalWriterConfig())) { - ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq")); - cm0.put("tx", "seq", "1"); + ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq")); + cm0.put("tx", "seq", "1"); - Assert.assertEquals(Status.VIOLATED, cw.write(cm0).getStatus()); - Assert.assertFalse("Should find no results in the table is mutation result was violated", scanner.iterator().hasNext()); - - ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq")); - cm1.put("tx", "seq", "1"); + Assert.assertEquals(Status.VIOLATED, cw.write(cm0).getStatus()); + Assert.assertFalse("Should find no results in the table is mutation result was violated", scanner.iterator().hasNext()); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); - Assert.assertTrue("Accepted result should be returned when reading table", scanner.iterator().hasNext()); + ConditionalMutation cm1 = new ConditionalMutation("99006", new Condition("tx", "seq")); + cm1.put("tx", "seq", "1"); - cw.close(); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); + Assert.assertTrue("Accepted result should be returned when reading table", scanner.iterator().hasNext()); + } } @Test @@ -488,55 +487,55 @@ public void testIterators() throws Exception { Entry entry = Iterables.getOnlyElement(scanner); Assert.assertEquals("3", entry.getValue().toString()); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { - ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3")); - cm0.put("count", "comments", "1"); - Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("3", entry.getValue().toString()); + ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3")); + cm0.put("count", "comments", "1"); + Assert.assertEquals(Status.REJECTED, cw.write(cm0).getStatus()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("3", entry.getValue().toString()); - ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("3")); - cm1.put("count", "comments", "1"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("4", entry.getValue().toString()); + ConditionalMutation cm1 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("3")); + cm1.put("count", "comments", "1"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("4", entry.getValue().toString()); - ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("4")); - cm2.put("count", "comments", "1"); - Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus()); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("4", entry.getValue().toString()); + ConditionalMutation cm2 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("4")); + cm2.put("count", "comments", "1"); + Assert.assertEquals(Status.REJECTED, cw.write(cm1).getStatus()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("4", entry.getValue().toString()); - // run test with multiple iterators passed in same batch and condition with two iterators + // run test with multiple iterators passed in same batch and condition with two iterators - ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("4")); - cm3.put("count", "comments", "1"); + ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(iterConfig).setValue("4")); + cm3.put("count", "comments", "1"); - ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count2", "comments").setIterators(iterConfig2).setValue("2")); - cm4.put("count2", "comments", "1"); + ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count2", "comments").setIterators(iterConfig2).setValue("2")); + cm4.put("count2", "comments", "1"); - ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count2", "comments").setIterators(iterConfig2, iterConfig3).setValue("2")); - cm5.put("count2", "comments", "1"); + ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count2", "comments").setIterators(iterConfig2, iterConfig3).setValue( + "2")); + cm5.put("count2", "comments", "1"); - Iterator results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator()); - Map actual = new HashMap(); + Iterator results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator()); + Map actual = new HashMap(); - while (results.hasNext()) { - Result result = results.next(); - String k = new String(result.getMutation().getRow()); - Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k)); - actual.put(k, result.getStatus()); - } - - Map expected = new HashMap(); - expected.put("ACCUMULO-1000", Status.ACCEPTED); - expected.put("ACCUMULO-1001", Status.ACCEPTED); - expected.put("ACCUMULO-1002", Status.REJECTED); + while (results.hasNext()) { + Result result = results.next(); + String k = new String(result.getMutation().getRow()); + Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k)); + actual.put(k, result.getStatus()); + } - Assert.assertEquals(expected, actual); + Map expected = new HashMap(); + expected.put("ACCUMULO-1000", Status.ACCEPTED); + expected.put("ACCUMULO-1001", Status.ACCEPTED); + expected.put("ACCUMULO-1002", Status.REJECTED); - cw.close(); + Assert.assertEquals(expected, actual); + } } public static class AddingIterator extends WrappingIterator { @@ -611,62 +610,59 @@ public void testTableAndConditionIterators() throws Exception { conn.tableOperations().offline(tableName, true); conn.tableOperations().online(tableName, true); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); - - ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("8")); - cm6.put("count", "comments", "7"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); - - Scanner scanner = conn.createScanner(tableName, new Authorizations()); - scanner.setRange(new Range("ACCUMULO-1000")); - scanner.fetchColumn(new Text("count"), new Text("comments")); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { - Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("9", entry.getValue().toString()); + ConditionalMutation cm6 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("8")); + cm6.put("count", "comments", "7"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm6).getStatus()); - ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("27")); - cm7.put("count", "comments", "8"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus()); + Scanner scanner = conn.createScanner(tableName, new Authorizations()); + scanner.setRange(new Range("ACCUMULO-1000")); + scanner.fetchColumn(new Text("count"), new Text("comments")); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("10", entry.getValue().toString()); + Entry entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("9", entry.getValue().toString()); - ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2, aiConfig3).setValue("35")); - cm8.put("count", "comments", "9"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus()); + ConditionalMutation cm7 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("27")); + cm7.put("count", "comments", "8"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm7).getStatus()); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("11", entry.getValue().toString()); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("10", entry.getValue().toString()); - ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("33")); - cm3.put("count", "comments", "3"); + ConditionalMutation cm8 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2, aiConfig3).setValue("35")); + cm8.put("count", "comments", "9"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm8).getStatus()); - ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count", "comments").setIterators(aiConfig3).setValue("14")); - cm4.put("count", "comments", "3"); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("11", entry.getValue().toString()); - ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count", "comments").setIterators(aiConfig3).setValue("10")); - cm5.put("count", "comments", "3"); + ConditionalMutation cm3 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setIterators(aiConfig2).setValue("33")); + cm3.put("count", "comments", "3"); - Iterator results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator()); - Map actual = new HashMap(); + ConditionalMutation cm4 = new ConditionalMutation("ACCUMULO-1001", new Condition("count", "comments").setIterators(aiConfig3).setValue("14")); + cm4.put("count", "comments", "3"); - while (results.hasNext()) { - Result result = results.next(); - String k = new String(result.getMutation().getRow()); - Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k)); - actual.put(k, result.getStatus()); - } + ConditionalMutation cm5 = new ConditionalMutation("ACCUMULO-1002", new Condition("count", "comments").setIterators(aiConfig3).setValue("10")); + cm5.put("count", "comments", "3"); - cw.close(); + Iterator results = cw.write(Arrays.asList(cm3, cm4, cm5).iterator()); + Map actual = new HashMap(); - Map expected = new HashMap(); - expected.put("ACCUMULO-1000", Status.ACCEPTED); - expected.put("ACCUMULO-1001", Status.ACCEPTED); - expected.put("ACCUMULO-1002", Status.REJECTED); + while (results.hasNext()) { + Result result = results.next(); + String k = new String(result.getMutation().getRow()); + Assert.assertFalse("Did not expect to see multiple resultus for the row: " + k, actual.containsKey(k)); + actual.put(k, result.getStatus()); + } - Assert.assertEquals(expected, actual); + Map expected = new HashMap(); + expected.put("ACCUMULO-1000", Status.ACCEPTED); + expected.put("ACCUMULO-1001", Status.ACCEPTED); + expected.put("ACCUMULO-1002", Status.REJECTED); - cw.close(); + Assert.assertEquals(expected, actual); + } } @Test @@ -701,81 +697,80 @@ public void testBatch() throws Exception { cm2.put("tx", "seq", cvab, "1"); mutations.add(cm2); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A"))); - Iterator results = cw.write(mutations.iterator()); - int count = 0; - while (results.hasNext()) { - Result result = results.next(); - Assert.assertEquals(Status.ACCEPTED, result.getStatus()); - count++; - } - - Assert.assertEquals(3, count); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")))) { + Iterator results = cw.write(mutations.iterator()); + int count = 0; + while (results.hasNext()) { + Result result = results.next(); + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + count++; + } - Scanner scanner = conn.createScanner(tableName, new Authorizations("A")); - scanner.fetchColumn(new Text("tx"), new Text("seq")); + Assert.assertEquals(3, count); - for (String row : new String[] {"99006", "59056", "19059"}) { - scanner.setRange(new Range(row)); - Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("1", entry.getValue().toString()); - } + Scanner scanner = conn.createScanner(tableName, new Authorizations("A")); + scanner.fetchColumn(new Text("tx"), new Text("seq")); - TreeSet splits = new TreeSet(); - splits.add(new Text("7")); - splits.add(new Text("3")); - conn.tableOperations().addSplits(tableName, splits); + for (String row : new String[] {"99006", "59056", "19059"}) { + scanner.setRange(new Range(row)); + Entry entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("1", entry.getValue().toString()); + } - mutations.clear(); + TreeSet splits = new TreeSet(); + splits.add(new Text("7")); + splits.add(new Text("3")); + conn.tableOperations().addSplits(tableName, splits); + + mutations.clear(); + + ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab).setValue("1")); + cm3.put("name", "last", cvab, "Doe"); + cm3.put("tx", "seq", cvab, "2"); + mutations.add(cm3); + + ConditionalMutation cm4 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab)); + cm4.put("name", "last", cvab, "Doe"); + cm4.put("tx", "seq", cvab, "1"); + mutations.add(cm4); + + ConditionalMutation cm5 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab).setValue("2")); + cm5.put("name", "last", cvab, "Doe"); + cm5.put("tx", "seq", cvab, "3"); + mutations.add(cm5); + + results = cw.write(mutations.iterator()); + int accepted = 0; + int rejected = 0; + while (results.hasNext()) { + Result result = results.next(); + if (new String(result.getMutation().getRow()).equals("99006")) { + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + accepted++; + } else { + Assert.assertEquals(Status.REJECTED, result.getStatus()); + rejected++; + } + } - ConditionalMutation cm3 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvab).setValue("1")); - cm3.put("name", "last", cvab, "Doe"); - cm3.put("tx", "seq", cvab, "2"); - mutations.add(cm3); + Assert.assertEquals("Expected only one accepted conditional mutation", 1, accepted); + Assert.assertEquals("Expected two rejected conditional mutations", 2, rejected); - ConditionalMutation cm4 = new ConditionalMutation("59056", new Condition("tx", "seq").setVisibility(cvab)); - cm4.put("name", "last", cvab, "Doe"); - cm4.put("tx", "seq", cvab, "1"); - mutations.add(cm4); - - ConditionalMutation cm5 = new ConditionalMutation("19059", new Condition("tx", "seq").setVisibility(cvab).setValue("2")); - cm5.put("name", "last", cvab, "Doe"); - cm5.put("tx", "seq", cvab, "3"); - mutations.add(cm5); - - results = cw.write(mutations.iterator()); - int accepted = 0; - int rejected = 0; - while (results.hasNext()) { - Result result = results.next(); - if (new String(result.getMutation().getRow()).equals("99006")) { - Assert.assertEquals(Status.ACCEPTED, result.getStatus()); - accepted++; - } else { - Assert.assertEquals(Status.REJECTED, result.getStatus()); - rejected++; + for (String row : new String[] {"59056", "19059"}) { + scanner.setRange(new Range(row)); + Entry entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("1", entry.getValue().toString()); } - } - Assert.assertEquals("Expected only one accepted conditional mutation", 1, accepted); - Assert.assertEquals("Expected two rejected conditional mutations", 2, rejected); - - for (String row : new String[] {"59056", "19059"}) { - scanner.setRange(new Range(row)); + scanner.setRange(new Range("99006")); Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("1", entry.getValue().toString()); - } - - scanner.setRange(new Range("99006")); - Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("2", entry.getValue().toString()); - - scanner.clearColumns(); - scanner.fetchColumn(new Text("name"), new Text("last")); - entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("Doe", entry.getValue().toString()); + Assert.assertEquals("2", entry.getValue().toString()); - cw.close(); + scanner.clearColumns(); + scanner.fetchColumn(new Text("name"), new Text("last")); + entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("Doe", entry.getValue().toString()); + } } @Test @@ -810,45 +805,44 @@ public void testBigBatch() throws Exception { cml.add(cm); } - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { - Iterator results = cw.write(cml.iterator()); + Iterator results = cw.write(cml.iterator()); - int count = 0; + int count = 0; - // TODO check got each row back - while (results.hasNext()) { - Result result = results.next(); - Assert.assertEquals(Status.ACCEPTED, result.getStatus()); - count++; - } + // TODO check got each row back + while (results.hasNext()) { + Result result = results.next(); + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + count++; + } - Assert.assertEquals("Did not receive the expected number of results", num, count); + Assert.assertEquals("Did not receive the expected number of results", num, count); - ArrayList cml2 = new ArrayList(num); + ArrayList cml2 = new ArrayList(num); - for (int i = 0; i < num; i++) { - ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq").setValue("1")); + for (int i = 0; i < num; i++) { + ConditionalMutation cm = new ConditionalMutation(rows.get(i), new Condition("meta", "seq").setValue("1")); - cm.put("meta", "seq", "2"); - cm.put("meta", "tx", UUID.randomUUID().toString()); - - cml2.add(cm); - } + cm.put("meta", "seq", "2"); + cm.put("meta", "tx", UUID.randomUUID().toString()); - count = 0; + cml2.add(cm); + } - results = cw.write(cml2.iterator()); + count = 0; - while (results.hasNext()) { - Result result = results.next(); - Assert.assertEquals(Status.ACCEPTED, result.getStatus()); - count++; - } + results = cw.write(cml2.iterator()); - Assert.assertEquals("Did not receive the expected number of results", num, count); + while (results.hasNext()) { + Result result = results.next(); + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + count++; + } - cw.close(); + Assert.assertEquals("Did not receive the expected number of results", num, count); + } } @Test @@ -901,33 +895,32 @@ public void testBatchErrors() throws Exception { cm3.put("tx", "seq", cvaob, "2"); mutations.add(cm3); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A"))); - Iterator results = cw.write(mutations.iterator()); - HashSet rows = new HashSet(); - while (results.hasNext()) { - Result result = results.next(); - String row = new String(result.getMutation().getRow()); - if (row.equals("19059")) { - Assert.assertEquals(Status.ACCEPTED, result.getStatus()); - } else if (row.equals("59056")) { - Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus()); - } else if (row.equals("99006")) { - Assert.assertEquals(Status.VIOLATED, result.getStatus()); - } else if (row.equals("90909")) { - Assert.assertEquals(Status.REJECTED, result.getStatus()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A")))) { + Iterator results = cw.write(mutations.iterator()); + HashSet rows = new HashSet(); + while (results.hasNext()) { + Result result = results.next(); + String row = new String(result.getMutation().getRow()); + if (row.equals("19059")) { + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + } else if (row.equals("59056")) { + Assert.assertEquals(Status.INVISIBLE_VISIBILITY, result.getStatus()); + } else if (row.equals("99006")) { + Assert.assertEquals(Status.VIOLATED, result.getStatus()); + } else if (row.equals("90909")) { + Assert.assertEquals(Status.REJECTED, result.getStatus()); + } + rows.add(row); } - rows.add(row); - } - Assert.assertEquals(4, rows.size()); + Assert.assertEquals(4, rows.size()); - Scanner scanner = conn.createScanner(tableName, new Authorizations("A")); - scanner.fetchColumn(new Text("tx"), new Text("seq")); - - Entry entry = Iterables.getOnlyElement(scanner); - Assert.assertEquals("1", entry.getValue().toString()); + Scanner scanner = conn.createScanner(tableName, new Authorizations("A")); + scanner.fetchColumn(new Text("tx"), new Text("seq")); - cw.close(); + Entry entry = Iterables.getOnlyElement(scanner); + Assert.assertEquals("1", entry.getValue().toString()); + } } @Test @@ -939,46 +932,45 @@ public void testSameRow() throws Exception { conn.tableOperations().create(tableName); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { - ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); - cm1.put("tx", "seq", "1"); - cm1.put("data", "x", "a"); - - Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); + ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); - ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); - cm2.put("tx", "seq", "2"); - cm2.put("data", "x", "b"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm1).getStatus()); - ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); - cm3.put("tx", "seq", "2"); - cm3.put("data", "x", "c"); + ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); + cm2.put("tx", "seq", "2"); + cm2.put("data", "x", "b"); - ConditionalMutation cm4 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); - cm4.put("tx", "seq", "2"); - cm4.put("data", "x", "d"); + ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); + cm3.put("tx", "seq", "2"); + cm3.put("data", "x", "c"); - Iterator results = cw.write(Arrays.asList(cm2, cm3, cm4).iterator()); + ConditionalMutation cm4 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); + cm4.put("tx", "seq", "2"); + cm4.put("data", "x", "d"); - int accepted = 0; - int rejected = 0; - int total = 0; + Iterator results = cw.write(Arrays.asList(cm2, cm3, cm4).iterator()); - while (results.hasNext()) { - Status status = results.next().getStatus(); - if (status == Status.ACCEPTED) - accepted++; - if (status == Status.REJECTED) - rejected++; - total++; - } + int accepted = 0; + int rejected = 0; + int total = 0; - Assert.assertEquals("Expected one accepted result", 1, accepted); - Assert.assertEquals("Expected two rejected results", 2, rejected); - Assert.assertEquals("Expected three total results", 3, total); + while (results.hasNext()) { + Status status = results.next().getStatus(); + if (status == Status.ACCEPTED) + accepted++; + if (status == Status.REJECTED) + rejected++; + total++; + } - cw.close(); + Assert.assertEquals("Expected one accepted result", 1, accepted); + Assert.assertEquals("Expected two rejected results", 2, rejected); + Assert.assertEquals("Expected three total results", 3, total); + } } private static class Stats { @@ -1073,11 +1065,9 @@ public MutatorTask(String tableName, Connector conn, ArrayList row @Override public void run() { - try { + try (Scanner scanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY))) { Random rand = new Random(); - Scanner scanner = new IsolatedScanner(conn.createScanner(tableName, Authorizations.EMPTY)); - for (int i = 0; i < 20; i++) { int numRows = rand.nextInt(10) + 1; @@ -1105,9 +1095,7 @@ public void run() { Collections.sort(changed); Assert.assertEquals(changes, changed); - } - } catch (Exception e) { log.error("{}", e.getMessage(), e); failed.set(true); @@ -1135,54 +1123,56 @@ public void testThreads() throws Exception { break; } - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { - ArrayList rows = new ArrayList(); + ArrayList rows = new ArrayList(); - for (int i = 0; i < 1000; i++) { - rows.add(new ArrayByteSequence(FastFormat.toZeroPaddedString(abs(rand.nextLong()), 16, 16, new byte[0]))); - } + for (int i = 0; i < 1000; i++) { + rows.add(new ArrayByteSequence(FastFormat.toZeroPaddedString(abs(rand.nextLong()), 16, 16, new byte[0]))); + } - ArrayList mutations = new ArrayList(); + ArrayList mutations = new ArrayList(); - for (ByteSequence row : rows) - mutations.add(new Stats(row).toMutation()); + for (ByteSequence row : rows) + mutations.add(new Stats(row).toMutation()); - ArrayList rows2 = new ArrayList(); - Iterator results = cw.write(mutations.iterator()); - while (results.hasNext()) { - Result result = results.next(); - Assert.assertEquals(Status.ACCEPTED, result.getStatus()); - rows2.add(new ArrayByteSequence(result.getMutation().getRow())); - } + ArrayList rows2 = new ArrayList(); + Iterator results = cw.write(mutations.iterator()); + while (results.hasNext()) { + Result result = results.next(); + Assert.assertEquals(Status.ACCEPTED, result.getStatus()); + rows2.add(new ArrayByteSequence(result.getMutation().getRow())); + } - Collections.sort(rows); - Collections.sort(rows2); + Collections.sort(rows); + Collections.sort(rows2); - Assert.assertEquals(rows, rows2); + Assert.assertEquals(rows, rows2); - AtomicBoolean failed = new AtomicBoolean(false); + AtomicBoolean failed = new AtomicBoolean(false); - ExecutorService tp = Executors.newFixedThreadPool(5); - for (int i = 0; i < 5; i++) { - tp.submit(new MutatorTask(tableName, conn, rows, cw, failed)); - } + ExecutorService tp = Executors.newFixedThreadPool(5); + for (int i = 0; i < 5; i++) { + tp.submit(new MutatorTask(tableName, conn, rows, cw, failed)); + } - tp.shutdown(); + tp.shutdown(); - while (!tp.isTerminated()) { - tp.awaitTermination(1, TimeUnit.MINUTES); - } + while (!tp.isTerminated()) { + tp.awaitTermination(1, TimeUnit.MINUTES); + } - Assert.assertFalse("A MutatorTask failed with an exception", failed.get()); + Assert.assertFalse("A MutatorTask failed with an exception", failed.get()); + } - Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY); + try (Scanner scanner = conn.createScanner(tableName, Authorizations.EMPTY)) { - RowIterator rowIter = new RowIterator(scanner); + RowIterator rowIter = new RowIterator(scanner); - while (rowIter.hasNext()) { - Iterator> row = rowIter.next(); - new Stats(row); + while (rowIter.hasNext()) { + Iterator> row = rowIter.next(); + new Stats(row); + } } } @@ -1232,27 +1222,28 @@ public void testSecurity() throws Exception { cm1.put("tx", "seq", "1"); cm1.put("data", "x", "a"); - ConditionalWriter cw1 = conn2.createConditionalWriter(table1, new ConditionalWriterConfig()); - ConditionalWriter cw2 = conn2.createConditionalWriter(table2, new ConditionalWriterConfig()); - ConditionalWriter cw3 = conn2.createConditionalWriter(table3, new ConditionalWriterConfig()); + try (ConditionalWriter cw1 = conn2.createConditionalWriter(table1, new ConditionalWriterConfig()); + ConditionalWriter cw2 = conn2.createConditionalWriter(table2, new ConditionalWriterConfig()); + ConditionalWriter cw3 = conn2.createConditionalWriter(table3, new ConditionalWriterConfig())) { - // Should be able to conditional-update a table we have R/W on - Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus()); + // Should be able to conditional-update a table we have R/W on + Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus()); - // Conditional-update to a table we only have read on should fail - try { - Status status = cw1.write(cm1).getStatus(); - Assert.fail("Expected exception writing conditional mutation to table the user doesn't have write access to, Got status: " + status); - } catch (AccumuloSecurityException ase) { + // Conditional-update to a table we only have read on should fail + try { + Status status = cw1.write(cm1).getStatus(); + Assert.fail("Expected exception writing conditional mutation to table the user doesn't have write access to, Got status: " + status); + } catch (AccumuloSecurityException ase) { - } + } - // Conditional-update to a table we only have writer on should fail - try { - Status status = cw2.write(cm1).getStatus(); - Assert.fail("Expected exception writing conditional mutation to table the user doesn't have read access to. Got status: " + status); - } catch (AccumuloSecurityException ase) { + // Conditional-update to a table we only have writer on should fail + try { + Status status = cw2.write(cm1).getStatus(); + Assert.fail("Expected exception writing conditional mutation to table the user doesn't have read access to. Got status: " + status); + } catch (AccumuloSecurityException ase) { + } } } @@ -1264,45 +1255,44 @@ public void testTimeout() throws Exception { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(3, TimeUnit.SECONDS)); - - ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); - cm1.put("tx", "seq", "1"); - cm1.put("data", "x", "a"); - - Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED); + try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(3, TimeUnit.SECONDS))) { - IteratorSetting is = new IteratorSetting(5, SlowIterator.class); - SlowIterator.setSeekSleepTime(is, 5000); + ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); - ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is)); - cm2.put("tx", "seq", "2"); - cm2.put("data", "x", "b"); + Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED); - Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN); + IteratorSetting is = new IteratorSetting(5, SlowIterator.class); + SlowIterator.setSeekSleepTime(is, 5000); - Scanner scanner = conn.createScanner(table, Authorizations.EMPTY); + ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is)); + cm2.put("tx", "seq", "2"); + cm2.put("data", "x", "b"); - for (Entry entry : scanner) { - String cf = entry.getKey().getColumnFamilyData().toString(); - String cq = entry.getKey().getColumnQualifierData().toString(); - String val = entry.getValue().toString(); + Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN); - if (cf.equals("tx") && cq.equals("seq")) - Assert.assertEquals("Unexpected value in tx:seq", "1", val); - else if (cf.equals("data") && cq.equals("x")) - Assert.assertEquals("Unexpected value in data:x", "a", val); - else - Assert.fail("Saw unexpected column family and qualifier: " + entry); - } + Scanner scanner = conn.createScanner(table, Authorizations.EMPTY); - ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); - cm3.put("tx", "seq", "2"); - cm3.put("data", "x", "b"); + for (Entry entry : scanner) { + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + String val = entry.getValue().toString(); + + if (cf.equals("tx") && cq.equals("seq")) + Assert.assertEquals("Unexpected value in tx:seq", "1", val); + else if (cf.equals("data") && cq.equals("x")) + Assert.assertEquals("Unexpected value in data:x", "a", val); + else + Assert.fail("Saw unexpected column family and qualifier: " + entry); + } - Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED); + ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); + cm3.put("tx", "seq", "2"); + cm3.put("data", "x", "b"); - cw.close(); + Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED); + } } @Test @@ -1317,21 +1307,22 @@ public void testDeleteTable() throws Exception { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) { - conn.tableOperations().delete(table); + conn.tableOperations().delete(table); - ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); - cm1.put("tx", "seq", "1"); - cm1.put("data", "x", "a"); + ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); - Result result = cw.write(cm1); + Result result = cw.write(cm1); - try { - Status status = result.getStatus(); - Assert.fail("Expected exception writing conditional mutation to deleted table. Got status: " + status); - } catch (AccumuloException ae) { - Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass()); + try { + Status status = result.getStatus(); + Assert.fail("Expected exception writing conditional mutation to deleted table. Got status: " + status); + } catch (AccumuloException ae) { + Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass()); + } } } @@ -1342,29 +1333,28 @@ public void testOffline() throws Exception { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) { - conn.tableOperations().offline(table, true); + conn.tableOperations().offline(table, true); - ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); - cm1.put("tx", "seq", "1"); - cm1.put("data", "x", "a"); + ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); - Result result = cw.write(cm1); - - try { - Status status = result.getStatus(); - Assert.fail("Expected exception writing conditional mutation to offline table. Got status: " + status); - } catch (AccumuloException ae) { - Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass()); - } + Result result = cw.write(cm1); - cw.close(); + try { + Status status = result.getStatus(); + Assert.fail("Expected exception writing conditional mutation to offline table. Got status: " + status); + } catch (AccumuloException ae) { + Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass()); + } - try { - conn.createConditionalWriter(table, new ConditionalWriterConfig()); - Assert.fail("Expected exception creating conditional writer to offline table"); - } catch (TableOfflineException e) {} + try { + conn.createConditionalWriter(table, new ConditionalWriterConfig()); + Assert.fail("Expected exception creating conditional writer to offline table"); + } catch (TableOfflineException e) {} + } } @Test @@ -1374,24 +1364,24 @@ public void testError() throws Exception { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) { - IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class); + IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class); - ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq").setIterators(iterSetting)); - cm1.put("tx", "seq", "1"); - cm1.put("data", "x", "a"); + ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq").setIterators(iterSetting)); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); - Result result = cw.write(cm1); + Result result = cw.write(cm1); - try { - Status status = result.getStatus(); - Assert.fail("Expected exception using iterator which throws an error, Got status: " + status); - } catch (AccumuloException ae) { + try { + Status status = result.getStatus(); + Assert.fail("Expected exception using iterator which throws an error, Got status: " + status); + } catch (AccumuloException ae) { - } + } - cw.close(); + } } @Test(expected = IllegalArgumentException.class) @@ -1401,13 +1391,14 @@ public void testNoConditions() throws AccumuloException, AccumuloSecurityExcepti conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); + try (ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig())) { - ConditionalMutation cm1 = new ConditionalMutation("r1"); - cm1.put("tx", "seq", "1"); - cm1.put("data", "x", "a"); + ConditionalMutation cm1 = new ConditionalMutation("r1"); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); - cw.write(cm1); + cw.write(cm1); + } } @Test @@ -1431,15 +1422,16 @@ public void testTrace() throws Exception { DistributedTrace.enable("localhost", "testTrace", mac.getClientConfig()); sleepUninterruptibly(1, TimeUnit.SECONDS); Span root = Trace.on("traceTest"); - ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig()); - - // mutation conditional on column tx:seq not exiting - ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq")); - cm0.put("name", "last", "doe"); - cm0.put("name", "first", "john"); - cm0.put("tx", "seq", "1"); - Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); - root.stop(); + try (ConditionalWriter cw = conn.createConditionalWriter(tableName, new ConditionalWriterConfig())) { + + // mutation conditional on column tx:seq not exiting + ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq")); + cm0.put("name", "last", "doe"); + cm0.put("name", "first", "john"); + cm0.put("tx", "seq", "1"); + Assert.assertEquals(Status.ACCEPTED, cw.write(cm0).getStatus()); + root.stop(); + } final Scanner scanner = conn.createScanner("trace", Authorizations.EMPTY); scanner.setRange(new Range(new Text(Long.toHexString(root.traceId())))); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java index 5aa0c84ef72..967ac240a46 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java @@ -44,8 +44,8 @@ import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.security.Authorizations; -import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.core.util.SimpleThreadPool; +import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.hadoop.io.Text; import org.junit.Assert; import org.junit.Test; @@ -76,30 +76,29 @@ public void run() throws Exception { private void runLatencyTest(String tableName) throws Exception { // should automatically flush after 2 seconds - BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS)); - Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); + try (BatchWriter bw = getConnector().createBatchWriter(tableName, new BatchWriterConfig().setMaxLatency(1000, TimeUnit.MILLISECONDS))) { + Scanner scanner = getConnector().createScanner(tableName, Authorizations.EMPTY); - Mutation m = new Mutation(new Text(String.format("r_%10d", 1))); - m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8))); - bw.addMutation(m); + Mutation m = new Mutation(new Text(String.format("r_%10d", 1))); + m.put(new Text("cf"), new Text("cq"), new Value("1".getBytes(UTF_8))); + bw.addMutation(m); - sleepUninterruptibly(500, TimeUnit.MILLISECONDS); + sleepUninterruptibly(500, TimeUnit.MILLISECONDS); - int count = Iterators.size(scanner.iterator()); + int count = Iterators.size(scanner.iterator()); - if (count != 0) { - throw new Exception("Flushed too soon"); - } + if (count != 0) { + throw new Exception("Flushed too soon"); + } - sleepUninterruptibly(1500, TimeUnit.MILLISECONDS); + sleepUninterruptibly(1500, TimeUnit.MILLISECONDS); - count = Iterators.size(scanner.iterator()); + count = Iterators.size(scanner.iterator()); - if (count != 1) { - throw new Exception("Did not flush"); + if (count != 1) { + throw new Exception("Did not flush"); + } } - - bw.close(); } private void runFlushTest(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, MutationsRejectedException, diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java index edf73eb3e8b..8b091ca43a5 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ReadWriteIT.java @@ -405,40 +405,40 @@ public void sunnyLG() throws Exception { ingest(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); verify(connector, getCluster().getClientConfig(), getAdminPrincipal(), 2000, 1, 50, 0, tableName); connector.tableOperations().flush(tableName, null, null, true); - BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1); - String tableId = connector.tableOperations().tableIdMap().get(tableName); - bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<")))); - bscanner.fetchColumnFamily(DataFileColumnFamily.NAME); - boolean foundFile = false; - for (Entry entry : bscanner) { - foundFile = true; - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - PrintStream newOut = new PrintStream(baos); - PrintStream oldOut = System.out; - try { - System.setOut(newOut); - List args = new ArrayList<>(); - args.add(entry.getKey().getColumnQualifier().toString()); - if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { - args.add("--config"); - StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster; - String hadoopConfDir = sac.getHadoopConfDir(); - args.add(new Path(hadoopConfDir, "core-site.xml").toString()); - args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString()); + try (BatchScanner bscanner = connector.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 1)) { + String tableId = connector.tableOperations().tableIdMap().get(tableName); + bscanner.setRanges(Collections.singletonList(new Range(new Text(tableId + ";"), new Text(tableId + "<")))); + bscanner.fetchColumnFamily(DataFileColumnFamily.NAME); + boolean foundFile = false; + for (Entry entry : bscanner) { + foundFile = true; + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + PrintStream newOut = new PrintStream(baos); + PrintStream oldOut = System.out; + try { + System.setOut(newOut); + List args = new ArrayList<>(); + args.add(entry.getKey().getColumnQualifier().toString()); + if (ClusterType.STANDALONE == getClusterType() && cluster.getClientConfig().getBoolean(ClientProperty.INSTANCE_RPC_SASL_ENABLED.getKey(), false)) { + args.add("--config"); + StandaloneAccumuloCluster sac = (StandaloneAccumuloCluster) cluster; + String hadoopConfDir = sac.getHadoopConfDir(); + args.add(new Path(hadoopConfDir, "core-site.xml").toString()); + args.add(new Path(hadoopConfDir, "hdfs-site.xml").toString()); + } + log.info("Invoking PrintInfo with " + args); + PrintInfo.main(args.toArray(new String[args.size()])); + newOut.flush(); + String stdout = baos.toString(); + assertTrue(stdout.contains("Locality group : g1")); + assertTrue(stdout.contains("families : [colf]")); + } finally { + newOut.close(); + System.setOut(oldOut); } - log.info("Invoking PrintInfo with " + args); - PrintInfo.main(args.toArray(new String[args.size()])); - newOut.flush(); - String stdout = baos.toString(); - assertTrue(stdout.contains("Locality group : g1")); - assertTrue(stdout.contains("families : [colf]")); - } finally { - newOut.close(); - System.setOut(oldOut); } + assertTrue(foundFile); } - bscanner.close(); - assertTrue(foundFile); } @Test diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java index c48b1ed9e6b..80e53744eb9 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java @@ -208,47 +208,49 @@ private void splitPartiallyAndRecover(AccumuloServerContext context, KeyExtent e private void ensureTabletHasNoUnexpectedMetadataEntries(AccumuloServerContext context, KeyExtent extent, SortedMap expectedMapFiles) throws Exception { - Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - scanner.setRange(extent.toMetadataRange()); - - HashSet expectedColumns = new HashSet(); - expectedColumns.add(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN); - expectedColumns.add(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN); - expectedColumns.add(TabletsSection.ServerColumnFamily.TIME_COLUMN); - expectedColumns.add(TabletsSection.ServerColumnFamily.LOCK_COLUMN); - - HashSet expectedColumnFamilies = new HashSet(); - expectedColumnFamilies.add(DataFileColumnFamily.NAME); - expectedColumnFamilies.add(TabletsSection.FutureLocationColumnFamily.NAME); - expectedColumnFamilies.add(TabletsSection.CurrentLocationColumnFamily.NAME); - expectedColumnFamilies.add(TabletsSection.LastLocationColumnFamily.NAME); - expectedColumnFamilies.add(TabletsSection.BulkFileColumnFamily.NAME); - - Iterator> iter = scanner.iterator(); - while (iter.hasNext()) { - Key key = iter.next().getKey(); - - if (!key.getRow().equals(extent.getMetadataEntry())) { - throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key); - } + try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) { + scanner.setRange(extent.toMetadataRange()); + + HashSet expectedColumns = new HashSet(); + expectedColumns.add(TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN); + expectedColumns.add(TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN); + expectedColumns.add(TabletsSection.ServerColumnFamily.TIME_COLUMN); + expectedColumns.add(TabletsSection.ServerColumnFamily.LOCK_COLUMN); + + HashSet expectedColumnFamilies = new HashSet(); + expectedColumnFamilies.add(DataFileColumnFamily.NAME); + expectedColumnFamilies.add(TabletsSection.FutureLocationColumnFamily.NAME); + expectedColumnFamilies.add(TabletsSection.CurrentLocationColumnFamily.NAME); + expectedColumnFamilies.add(TabletsSection.LastLocationColumnFamily.NAME); + expectedColumnFamilies.add(TabletsSection.BulkFileColumnFamily.NAME); + + Iterator> iter = scanner.iterator(); + while (iter.hasNext()) { + Key key = iter.next().getKey(); + + if (!key.getRow().equals(extent.getMetadataEntry())) { + throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key); + } + + if (expectedColumnFamilies.contains(key.getColumnFamily())) { + continue; + } + + if (expectedColumns.remove(new ColumnFQ(key))) { + continue; + } - if (expectedColumnFamilies.contains(key.getColumnFamily())) { - continue; + throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key); } - if (expectedColumns.remove(new ColumnFQ(key))) { - continue; + System.out.println("expectedColumns " + expectedColumns); + if (expectedColumns.size() > 1 || (expectedColumns.size() == 1)) { + throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns); } - throw new Exception("Tablet " + extent + " contained unexpected " + MetadataTable.NAME + " entry " + key); + SortedMap fixedMapFiles = MetadataTableUtil.getDataFileSizes(extent, context); + verifySame(expectedMapFiles, fixedMapFiles); } - System.out.println("expectedColumns " + expectedColumns); - if (expectedColumns.size() > 1 || (expectedColumns.size() == 1)) { - throw new Exception("Not all expected columns seen " + extent + " " + expectedColumns); - } - - SortedMap fixedMapFiles = MetadataTableUtil.getDataFileSizes(extent, context); - verifySame(expectedMapFiles, fixedMapFiles); } private void verifySame(SortedMap datafileSizes, SortedMap fixedDatafileSizes) throws Exception { diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java index 39ef3d8260a..0c7cfb6f1ac 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/bulk/ConsistencyCheck.java @@ -38,19 +38,19 @@ protected void runLater(State state, Environment env) throws Exception { log.info("Checking " + row); String user = env.getConnector().whoami(); Authorizations auths = env.getConnector().securityOperations().getUserAuthorizations(user); - Scanner scanner = env.getConnector().createScanner(Setup.getTableName(), auths); - scanner = new IsolatedScanner(scanner); - scanner.setRange(new Range(row)); - scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); - Value v = null; - Key first = null; - for (Entry entry : scanner) { - if (v == null) { - v = entry.getValue(); - first = entry.getKey(); + try (Scanner scanner = new IsolatedScanner(env.getConnector().createScanner(Setup.getTableName(), auths))) { + scanner.setRange(new Range(row)); + scanner.fetchColumnFamily(BulkPlusOne.CHECK_COLUMN_FAMILY); + Value v = null; + Key first = null; + for (Entry entry : scanner) { + if (v == null) { + v = entry.getValue(); + first = entry.getKey(); + } + if (!v.equals(entry.getValue())) + throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first); } - if (!v.equals(entry.getValue())) - throw new RuntimeException("Inconsistent value at " + entry.getKey() + " was " + entry.getValue() + " should be " + v + " first read at " + first); } } diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java index 35636e4e729..2f3aacd800d 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Transfer.java @@ -80,56 +80,56 @@ public void visit(State state, Environment env, Properties props) throws Excepti } // TODO document how data should be read when using ConditionalWriter - Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY)); - - scanner.setRange(new Range(bank)); - scanner.fetchColumnFamily(new Text(acct1)); - scanner.fetchColumnFamily(new Text(acct2)); - - Account a1 = new Account(); - Account a2 = new Account(); - Account a; - - for (Entry entry : scanner) { - String cf = entry.getKey().getColumnFamilyData().toString(); - String cq = entry.getKey().getColumnQualifierData().toString(); - - if (cf.equals(acct1)) - a = a1; - else if (cf.equals(acct2)) - a = a2; - else - throw new Exception("Unexpected column fam: " + cf); - - if (cq.equals("bal")) - a.setBal(entry.getValue().toString()); - else if (cq.equals("seq")) - a.setSeq(entry.getValue().toString()); - else - throw new Exception("Unexpected column qual: " + cq); - } - - int amt = rand.nextInt(50); - - log.debug("transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2); - - if (a1.bal >= amt) { - ConditionalMutation cm = new ConditionalMutation(bank, new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)), - new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq))); - cm.put(acct1, "bal", (a1.bal - amt) + ""); - cm.put(acct2, "bal", (a2.bal + amt) + ""); - cm.put(acct1, "seq", Utils.getSeq(a1.seq + 1)); - cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1)); + try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) { + + scanner.setRange(new Range(bank)); + scanner.fetchColumnFamily(new Text(acct1)); + scanner.fetchColumnFamily(new Text(acct2)); + + Account a1 = new Account(); + Account a2 = new Account(); + Account a; + + for (Entry entry : scanner) { + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + + if (cf.equals(acct1)) + a = a1; + else if (cf.equals(acct2)) + a = a2; + else + throw new Exception("Unexpected column fam: " + cf); + + if (cq.equals("bal")) + a.setBal(entry.getValue().toString()); + else if (cq.equals("seq")) + a.setSeq(entry.getValue().toString()); + else + throw new Exception("Unexpected column qual: " + cq); + } - ConditionalWriter cw = (ConditionalWriter) state.get("cw"); - Status status = cw.write(cm).getStatus(); - while (status == Status.UNKNOWN) { - log.debug("retrying transfer " + status); - status = cw.write(cm).getStatus(); + int amt = rand.nextInt(50); + + log.debug("transfer req " + bank + " " + amt + " " + acct1 + " " + a1 + " " + acct2 + " " + a2); + + if (a1.bal >= amt) { + ConditionalMutation cm = new ConditionalMutation(bank, new Condition(acct1, "seq").setValue(Utils.getSeq(a1.seq)), + new Condition(acct2, "seq").setValue(Utils.getSeq(a2.seq))); + cm.put(acct1, "bal", (a1.bal - amt) + ""); + cm.put(acct2, "bal", (a2.bal + amt) + ""); + cm.put(acct1, "seq", Utils.getSeq(a1.seq + 1)); + cm.put(acct2, "seq", Utils.getSeq(a2.seq + 1)); + + ConditionalWriter cw = (ConditionalWriter) state.get("cw"); + Status status = cw.write(cm).getStatus(); + while (status == Status.UNKNOWN) { + log.debug("retrying transfer " + status); + status = cw.write(cm).getStatus(); + } + log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2); } - log.debug("transfer result " + bank + " " + status + " " + a1 + " " + a2); } - } } diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java index 2690ffccbe3..c0c81ddad9c 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java @@ -76,6 +76,8 @@ private void verifyBank(String table, Connector conn, String row, int numAccts) count++; } + scanner.close(); + if (count > 0 && sum != numAccts * 100) { throw new Exception("Sum is off " + sum); } From ced384ba3f2eb7aba5818ff7c050e84c3bb3b6ef Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Tue, 31 May 2016 18:39:59 -0400 Subject: [PATCH 2/2] code review updates --- .../accumulo/core/client/ScannerBase.java | 2 +- .../server/util/MasterMetadataUtil.java | 20 +- .../server/util/MetadataTableUtil.java | 239 +++++++++--------- .../accumulo/master/tableOps/CopyFailed.java | 23 +- .../apache/accumulo/tserver/TabletServer.java | 12 +- .../test/randomwalk/conditional/Verify.java | 39 +-- 6 files changed, 164 insertions(+), 171 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java index 6835849bba2..211005014f3 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java @@ -160,7 +160,7 @@ public interface ScannerBase extends Iterable>, AutoCloseable { long getTimeout(TimeUnit timeUnit); /** - * Closes any underlying connections on the scanner + * Closes any underlying connections on the scanner. This may invalidate any iterators derived from the Scanner, causing them to throw exceptions. * * @since 1.5.0 */ diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java index 01b4f043335..5aa61bc5875 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MasterMetadataUtil.java @@ -163,23 +163,23 @@ private static KeyExtent fixSplit(ClientContext context, String table, Text meta List highDatafilesToRemove = new ArrayList(); - Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - Key rowKey = new Key(metadataEntry); - SortedMap origDatafileSizes = new TreeMap(); SortedMap highDatafileSizes = new TreeMap(); SortedMap lowDatafileSizes = new TreeMap(); - scanner3.fetchColumnFamily(DataFileColumnFamily.NAME); - scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW))); - for (Entry entry : scanner3) { - if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) { - origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get())); + try (Scanner scanner3 = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) { + Key rowKey = new Key(metadataEntry); + + scanner3.fetchColumnFamily(DataFileColumnFamily.NAME); + scanner3.setRange(new Range(rowKey, rowKey.followingKey(PartialKey.ROW))); + + for (Entry entry : scanner3) { + if (entry.getKey().compareColumnFamily(DataFileColumnFamily.NAME) == 0) { + origDatafileSizes.put(new FileRef(fs, entry.getKey()), new DataFileValue(entry.getValue().get())); + } } } - scanner3.close(); - MetadataTableUtil.splitDatafiles(table, metadataPrevEndRow, splitRatio, new HashMap(), origDatafileSizes, lowDatafileSizes, highDatafileSizes, highDatafilesToRemove); diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java index c04d43e5c44..416a296c078 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java @@ -280,26 +280,25 @@ public void run(IZooReaderWriter rw) throws KeeperException, InterruptedExceptio public static SortedMap getDataFileSizes(KeyExtent extent, ClientContext context) throws IOException { TreeMap sizes = new TreeMap(); - Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME); - Text row = extent.getMetadataEntry(); - VolumeManager fs = VolumeManagerImpl.get(); - - Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text("")); - endKey = endKey.followingKey(PartialKey.ROW_COLFAM); + try (Scanner mdScanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) { + mdScanner.fetchColumnFamily(DataFileColumnFamily.NAME); + Text row = extent.getMetadataEntry(); + VolumeManager fs = VolumeManagerImpl.get(); - mdScanner.setRange(new Range(new Key(row), endKey)); - for (Entry entry : mdScanner) { + Key endKey = new Key(row, DataFileColumnFamily.NAME, new Text("")); + endKey = endKey.followingKey(PartialKey.ROW_COLFAM); - if (!entry.getKey().getRow().equals(row)) - break; - DataFileValue dfv = new DataFileValue(entry.getValue().get()); - sizes.put(new FileRef(fs, entry.getKey()), dfv); - } + mdScanner.setRange(new Range(new Key(row), endKey)); + for (Entry entry : mdScanner) { - mdScanner.close(); + if (!entry.getKey().getRow().equals(row)) + break; + DataFileValue dfv = new DataFileValue(entry.getValue().get()); + sizes.put(new FileRef(fs, entry.getKey()), dfv); + } - return sizes; + return sizes; + } } public static void rollBackSplit(Text metadataEntry, Text oldPrevEndRow, ClientContext context, ZooLock zooLock) { @@ -417,60 +416,59 @@ public static void splitDatafiles(String tableId, Text midRow, double splitRatio } public static void deleteTable(String tableId, boolean insertDeletes, ClientContext context, ZooLock lock) throws AccumuloException, IOException { - Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000).setMaxLatency(120000l, TimeUnit.MILLISECONDS) - .setMaxWriteThreads(2)); + try (Scanner ms = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); + BatchWriter bw = new BatchWriterImpl(context, MetadataTable.ID, new BatchWriterConfig().setMaxMemory(1000000) + .setMaxLatency(120000l, TimeUnit.MILLISECONDS).setMaxWriteThreads(2))) { - // scan metadata for our table and delete everything we find - Mutation m = null; - ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); + // scan metadata for our table and delete everything we find + Mutation m = null; + ms.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - // insert deletes before deleting data from metadata... this makes the code fault tolerant - if (insertDeletes) { + // insert deletes before deleting data from metadata... this makes the code fault tolerant + if (insertDeletes) { - ms.fetchColumnFamily(DataFileColumnFamily.NAME); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms); + ms.fetchColumnFamily(DataFileColumnFamily.NAME); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.fetch(ms); - for (Entry cell : ms) { - Key key = cell.getKey(); + for (Entry cell : ms) { + Key key = cell.getKey(); - if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - FileRef ref = new FileRef(VolumeManagerImpl.get(), key); - bw.addMutation(createDeleteMutation(tableId, ref.meta().toString())); - } + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + FileRef ref = new FileRef(VolumeManagerImpl.get(), key); + bw.addMutation(createDeleteMutation(tableId, ref.meta().toString())); + } - if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { - bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString())); + if (TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.hasColumns(key)) { + bw.addMutation(createDeleteMutation(tableId, cell.getValue().toString())); + } } - } - bw.flush(); + bw.flush(); - ms.clearColumns(); - } + ms.clearColumns(); + } + + for (Entry cell : ms) { + Key key = cell.getKey(); - for (Entry cell : ms) { - Key key = cell.getKey(); + if (m == null) { + m = new Mutation(key.getRow()); + if (lock != null) + putLockID(lock, m); + } - if (m == null) { - m = new Mutation(key.getRow()); - if (lock != null) - putLockID(lock, m); + if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) { + bw.addMutation(m); + m = new Mutation(key.getRow()); + if (lock != null) + putLockID(lock, m); + } + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); } - if (key.getRow().compareTo(m.getRow(), 0, m.getRow().length) != 0) { + if (m != null) bw.addMutation(m); - m = new Mutation(key.getRow()); - if (lock != null) - putLockID(lock, m); - } - m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); } - - if (m != null) - bw.addMutation(m); - - bw.close(); } static String getZookeeperLogLocation() { @@ -831,58 +829,56 @@ public static int checkClone(String tableName, String srcTableId, String tableId public static void cloneTable(ClientContext context, String srcTableId, String tableId, VolumeManager volumeManager) throws Exception { Connector conn = context.getConnector(); - BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig()); + try (BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig())) { - while (true) { + while (true) { - try { - initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw); + try { + initializeClone(MetadataTable.NAME, srcTableId, tableId, conn, bw); - // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed + // the following loop looks changes in the file that occurred during the copy.. if files were dereferenced then they could have been GCed - while (true) { - int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw); + while (true) { + int rewrites = checkClone(MetadataTable.NAME, srcTableId, tableId, conn, bw); - if (rewrites == 0) - break; - } + if (rewrites == 0) + break; + } - bw.flush(); - break; + bw.flush(); + break; - } catch (TabletIterator.TabletDeletedException tde) { - // tablets were merged in the src table - bw.flush(); + } catch (TabletIterator.TabletDeletedException tde) { + // tablets were merged in the src table + bw.flush(); - // delete what we have cloned and try again - deleteTable(tableId, false, context, null); + // delete what we have cloned and try again + deleteTable(tableId, false, context, null); - log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again"); + log.debug("Tablets merged in table " + srcTableId + " while attempting to clone, trying again"); - sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + } } - } - // delete the clone markers and create directory entries - Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); - mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - mscanner.fetchColumnFamily(ClonedColumnFamily.NAME); + // delete the clone markers and create directory entries + Scanner mscanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY); + mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); + mscanner.fetchColumnFamily(ClonedColumnFamily.NAME); - int dirCount = 0; + int dirCount = 0; - for (Entry entry : mscanner) { - Key k = entry.getKey(); - Mutation m = new Mutation(k.getRow()); - m.putDelete(k.getColumnFamily(), k.getColumnQualifier()); - String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId - + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES)); - TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8))); + for (Entry entry : mscanner) { + Key k = entry.getKey(); + Mutation m = new Mutation(k.getRow()); + m.putDelete(k.getColumnFamily(), k.getColumnQualifier()); + String dir = volumeManager.choose(Optional.of(tableId), ServerConstants.getBaseUris()) + Constants.HDFS_TABLES_DIR + Path.SEPARATOR + tableId + + Path.SEPARATOR + new String(FastFormat.toZeroPaddedString(dirCount++, 8, 16, Constants.CLONE_PREFIX_BYTES)); + TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN.put(m, new Value(dir.getBytes(UTF_8))); - bw.addMutation(m); + bw.addMutation(m); + } } - - bw.close(); - } public static void chopped(AccumuloServerContext context, KeyExtent extent, ZooLock zooLock) { @@ -910,9 +906,8 @@ public static void removeBulkLoadEntries(Connector conn, String tableId, long ti public static List getBulkFilesLoaded(Connector conn, KeyExtent extent, long tid) throws IOException { List result = new ArrayList(); - try { + try (Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY))) { VolumeManager fs = VolumeManagerImpl.get(); - Scanner mscanner = new IsolatedScanner(conn.createScanner(extent.isMeta() ? RootTable.NAME : MetadataTable.NAME, Authorizations.EMPTY)); mscanner.setRange(extent.toMetadataRange()); mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); for (Entry entry : mscanner) { @@ -920,7 +915,7 @@ public static List getBulkFilesLoaded(Connector conn, KeyExtent extent, result.add(new FileRef(fs, entry.getKey())); } } - mscanner.close(); + return result; } catch (TableNotFoundException ex) { // unlikely @@ -933,18 +928,18 @@ public static Map> getBulkFilesLoaded(ClientC Map> result = new HashMap<>(); VolumeManager fs = VolumeManagerImpl.get(); - Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY); - scanner.setRange(new Range(metadataRow)); - scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - for (Entry entry : scanner) { - Long tid = Long.parseLong(entry.getValue().toString()); - List lst = result.get(tid); - if (lst == null) { - result.put(tid, lst = new ArrayList<>()); + try (Scanner scanner = new ScannerImpl(context, extent.isMeta() ? RootTable.ID : MetadataTable.ID, Authorizations.EMPTY)) { + scanner.setRange(new Range(metadataRow)); + scanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); + for (Entry entry : scanner) { + Long tid = Long.parseLong(entry.getValue().toString()); + List lst = result.get(tid); + if (lst == null) { + result.put(tid, lst = new ArrayList<>()); + } + lst.add(new FileRef(fs, entry.getKey())); } - lst.add(new FileRef(fs, entry.getKey())); } - scanner.close(); return result; } @@ -990,17 +985,17 @@ public static void moveMetaDeleteMarkers(ClientContext context) { Range oldDeletesRange = new Range(oldDeletesPrefix, true, "!!~dem", false); // move old delete markers to new location, to standardize table schema between all metadata tables - Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY); - scanner.setRange(oldDeletesRange); - for (Entry entry : scanner) { - String row = entry.getKey().getRow().toString(); - if (row.startsWith(oldDeletesPrefix)) { - moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix); - } else { - break; + try (Scanner scanner = new ScannerImpl(context, RootTable.ID, Authorizations.EMPTY)) { + scanner.setRange(oldDeletesRange); + for (Entry entry : scanner) { + String row = entry.getKey().getRow().toString(); + if (row.startsWith(oldDeletesPrefix)) { + moveDeleteEntry(context, RootTable.OLD_EXTENT, entry, row, oldDeletesPrefix); + } else { + break; + } } } - scanner.close(); } public static void moveMetaDeleteMarkersFrom14(ClientContext context) { @@ -1008,17 +1003,17 @@ public static void moveMetaDeleteMarkersFrom14(ClientContext context) { KeyExtent notMetadata = new KeyExtent("anythingNotMetadata", null, null); // move delete markers from the normal delete keyspace to the root tablet delete keyspace if the files are for the !METADATA table - Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY); - scanner.setRange(MetadataSchema.DeletesSection.getRange()); - for (Entry entry : scanner) { - String row = entry.getKey().getRow().toString(); - if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) { - moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix()); - } else { - break; + try (Scanner scanner = new ScannerImpl(context, MetadataTable.ID, Authorizations.EMPTY)) { + scanner.setRange(MetadataSchema.DeletesSection.getRange()); + for (Entry entry : scanner) { + String row = entry.getKey().getRow().toString(); + if (row.startsWith(MetadataSchema.DeletesSection.getRowPrefix() + "/" + MetadataTable.ID)) { + moveDeleteEntry(context, notMetadata, entry, row, MetadataSchema.DeletesSection.getRowPrefix()); + } else { + break; + } } } - scanner.close(); } private static void moveDeleteEntry(ClientContext context, KeyExtent oldExtent, Entry entry, String rowID, String prefix) { diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java index 298a0ce54bb..5fbf3a06563 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CopyFailed.java @@ -111,22 +111,21 @@ public Repo call(long tid, Master master) throws Exception { // determine which failed files were loaded Connector conn = master.getConnector(); - Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)); - mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); - mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); - - for (Entry entry : mscanner) { - if (Long.parseLong(entry.getValue().toString()) == tid) { - FileRef loadedFile = new FileRef(fs, entry.getKey()); - String absPath = failures.remove(loadedFile); - if (absPath != null) { - loadedFailures.put(loadedFile, absPath); + try (Scanner mscanner = new IsolatedScanner(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY))) { + mscanner.setRange(new KeyExtent(tableId, null, null).toMetadataRange()); + mscanner.fetchColumnFamily(TabletsSection.BulkFileColumnFamily.NAME); + + for (Entry entry : mscanner) { + if (Long.parseLong(entry.getValue().toString()) == tid) { + FileRef loadedFile = new FileRef(fs, entry.getKey()); + String absPath = failures.remove(loadedFile); + if (absPath != null) { + loadedFailures.put(loadedFile, absPath); + } } } } - mscanner.close(); - // move failed files that were not loaded for (String failure : failures.values()) { Path orig = new Path(failure); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index b1b4202efa0..6427b2988f3 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -2595,14 +2595,12 @@ public static Pair verifyTabletInformation(AccumuloServerContext TabletsSection.TabletColumnFamily.PREV_ROW_COLUMN, TabletsSection.TabletColumnFamily.SPLIT_RATIO_COLUMN, TabletsSection.TabletColumnFamily.OLD_PREV_ROW_COLUMN, TabletsSection.ServerColumnFamily.TIME_COLUMN}); - ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY); - scanner.setRange(extent.toMetadataRange()); - TreeMap tkv = new TreeMap(); - for (Entry entry : scanner) - tkv.put(entry.getKey(), entry.getValue()); - - scanner.close(); + try (ScannerImpl scanner = new ScannerImpl(context, tableToVerify, Authorizations.EMPTY)) { + scanner.setRange(extent.toMetadataRange()); + for (Entry entry : scanner) + tkv.put(entry.getKey(), entry.getValue()); + } // only populate map after success if (tabletsKeyValues == null) { diff --git a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java index c0c81ddad9c..6c46f734216 100644 --- a/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java +++ b/test/src/main/java/org/apache/accumulo/test/randomwalk/conditional/Verify.java @@ -53,30 +53,31 @@ public void visit(State state, Environment env, Properties props) throws Excepti private void verifyBank(String table, Connector conn, String row, int numAccts) throws TableNotFoundException, Exception { log.debug("Verifying bank " + row); - // TODO do not use IsolatedScanner, just enable isolation on scanner - Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY)); - - scanner.setRange(new Range(row)); - IteratorSetting iterConf = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class); - ColumnSliceFilter.setSlice(iterConf, "bal", true, "bal", true); - scanner.clearScanIterators(); - scanner.addScanIterator(iterConf); - int count = 0; int sum = 0; int min = Integer.MAX_VALUE; int max = Integer.MIN_VALUE; - for (Entry entry : scanner) { - int bal = Integer.parseInt(entry.getValue().toString()); - sum += bal; - if (bal > max) - max = bal; - if (bal < min) - min = bal; - count++; - } - scanner.close(); + // TODO do not use IsolatedScanner, just enable isolation on scanner + try (Scanner scanner = new IsolatedScanner(conn.createScanner(table, Authorizations.EMPTY))) { + + scanner.setRange(new Range(row)); + IteratorSetting iterConf = new IteratorSetting(100, "cqsl", ColumnSliceFilter.class); + ColumnSliceFilter.setSlice(iterConf, "bal", true, "bal", true); + scanner.clearScanIterators(); + scanner.addScanIterator(iterConf); + + for (Entry entry : scanner) { + int bal = Integer.parseInt(entry.getValue().toString()); + sum += bal; + if (bal > max) + max = bal; + if (bal < min) + min = bal; + count++; + } + + } if (count > 0 && sum != numAccts * 100) { throw new Exception("Sum is off " + sum);