diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java index 4f09689246c..dcea41c913e 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Bulk.java @@ -16,7 +16,6 @@ */ package org.apache.accumulo.core.client.impl; -import java.io.Serializable; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -97,7 +96,7 @@ public String toString() { /** * WARNING : do not change this class, its used for serialization to Json */ - public static class FileInfo implements Serializable { + public static class FileInfo { final String name; final long estSize; final long estEntries; @@ -151,7 +150,7 @@ public int hashCode() { } } - public static class Files implements Iterable, Serializable { + public static class Files implements Iterable { Map files = new HashMap<>(); public Files(Collection files) { diff --git a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java index 61f10b91c38..a6473a9f40f 100644 --- a/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java +++ b/proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java @@ -61,6 +61,7 @@ import org.apache.accumulo.core.client.admin.ActiveScan; import org.apache.accumulo.core.client.admin.CompactionConfig; import org.apache.accumulo.core.client.admin.NewTableConfiguration; +import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions; import org.apache.accumulo.core.client.admin.TimeType; import org.apache.accumulo.core.client.impl.ClientConfConverter; import org.apache.accumulo.core.client.impl.Credentials; @@ -1767,8 +1768,13 @@ public void importDirectory(ByteBuffer login, String tableName, String importDir org.apache.accumulo.proxy.thrift.AccumuloException, org.apache.accumulo.proxy.thrift.AccumuloSecurityException, TException { try { - getConnector(login).tableOperations().importDirectory(tableName, importDir, failureDir, - setTime); + ImportExecutorOptions loader = getConnector(login).tableOperations().addFilesTo(tableName) + .from(importDir); + if (setTime) { + loader.settingLogicalTime().load(); + } else { + loader.load(); + } } catch (Exception e) { handleExceptionTNF(e); } diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/deprecated/BulkImporter.java b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java similarity index 99% rename from server/base/src/main/java/org/apache/accumulo/server/client/deprecated/BulkImporter.java rename to server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java index b5d2a06f41e..80df178b999 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/deprecated/BulkImporter.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/BulkImporter.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.server.client.deprecated; +package org.apache.accumulo.server.client; import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly; @@ -75,12 +75,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Deprecated public class BulkImporter { private static final Logger log = LoggerFactory.getLogger(BulkImporter.class); - @Deprecated public static List bulkLoad(ClientContext context, long tid, String tableId, List files, String errorDir, boolean setTime) throws IOException, AccumuloException, AccumuloSecurityException, ThriftTableOperationException { diff --git a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java index 55ae999de97..d63e5fc2f34 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java +++ b/server/base/src/main/java/org/apache/accumulo/server/client/ClientServiceHandler.java @@ -60,7 +60,6 @@ import org.apache.accumulo.core.security.thrift.TCredentials; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.server.AccumuloServerContext; -import org.apache.accumulo.server.client.deprecated.BulkImporter; import org.apache.accumulo.server.conf.ServerConfigurationFactory; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.security.AuditedSecurityOperation; diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java index 15c2b96ee0d..44ab47eddf1 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java @@ -419,7 +419,7 @@ public boolean canDeleteRange(TCredentials c, Table.ID tableId, String tableName public boolean canBulkImport(TCredentials c, Table.ID tableId, String tableName, String dir, String failDir, Namespace.ID namespaceId) throws ThriftSecurityException { try { - boolean result = super.canBulkImport(c, tableId, namespaceId); + boolean result = super.canBulkImport(c, tableId, tableName, dir, failDir, namespaceId); audit(c, result, CAN_BULK_IMPORT_AUDIT_TEMPLATE, tableName, dir, failDir); return result; } catch (ThriftSecurityException ex) { diff --git a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java index b1692943cd1..ae9d0cf9f58 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java +++ b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java @@ -578,11 +578,6 @@ public boolean canDeleteRange(TCredentials c, Table.ID tableId, String tableName public boolean canBulkImport(TCredentials c, Table.ID tableId, String tableName, String dir, String failDir, Namespace.ID namespaceId) throws ThriftSecurityException { - return canBulkImport(c, tableId, namespaceId); - } - - public boolean canBulkImport(TCredentials c, Table.ID tableId, Namespace.ID namespaceId) - throws ThriftSecurityException { authenticate(c); return hasTablePermission(c, tableId, namespaceId, TablePermission.BULK_IMPORT, false); } diff --git a/server/base/src/test/java/org/apache/accumulo/server/client/deprecated/BulkImporterTest.java b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java similarity index 98% rename from server/base/src/test/java/org/apache/accumulo/server/client/deprecated/BulkImporterTest.java rename to server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java index 713b883d8b0..8aad354b630 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/client/deprecated/BulkImporterTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/client/BulkImporterTest.java @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.accumulo.server.client.deprecated; +package org.apache.accumulo.server.client; import java.util.Collection; import java.util.Collections; @@ -40,7 +40,7 @@ import org.apache.accumulo.core.file.FileOperations; import org.apache.accumulo.core.file.FileSKVWriter; import org.apache.accumulo.core.util.CachedConfiguration; -import org.apache.accumulo.server.client.deprecated.BulkImporter; +import org.apache.accumulo.server.client.BulkImporter; import org.apache.accumulo.server.fs.VolumeManager; import org.apache.accumulo.server.fs.VolumeManagerImpl; import org.apache.commons.lang.NotImplementedException; diff --git a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java index d4f04b2678f..0baec1fcebd 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java +++ b/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java @@ -525,10 +525,16 @@ public String invalidMessage(String argument) { final boolean canBulkImport; try { - canBulkImport = master.security.canBulkImport(c, tableId, namespaceId); + String tableName = Tables.getTableName(master.getInstance(), tableId); + canBulkImport = master.security.canBulkImport(c, tableId, tableName, dir, null, + namespaceId); } catch (ThriftSecurityException e) { throwIfTableMissingSecurityException(e, tableId, "", TableOperation.BULK_IMPORT); throw e; + } catch (TableNotFoundException e) { + throw new ThriftTableOperationException(tableId.canonicalID(), null, + TableOperation.BULK_IMPORT, TableOperationExceptionType.NOTFOUND, + "Table no longer exists"); } if (!canBulkImport) diff --git a/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java b/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java index 575d55cc6bc..b98dda0e820 100644 --- a/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java +++ b/test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java @@ -306,8 +306,10 @@ public void testImportExportOperationsAudits() // Prepare to export the table File exportDir = new File(getCluster().getConfig().getDir() + "/export"); + File exportDirBulk = new File(getCluster().getConfig().getDir() + "/export_bulk"); + assertTrue(exportDirBulk.mkdir() || exportDirBulk.isDirectory()); - auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME); + auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME, true); auditConnector.tableOperations().exportTable(OLD_TEST_TABLE_NAME, exportDir.toString()); // We've exported the table metadata to the MiniAccumuloCluster root dir. Grab the .rf file path @@ -327,14 +329,13 @@ public void testImportExportOperationsAudits() } } FileUtils.copyFileToDirectory(importFile, exportDir); + FileUtils.copyFileToDirectory(importFile, exportDirBulk); auditConnector.tableOperations().importTable(NEW_TEST_TABLE_NAME, exportDir.toString()); // Now do a Directory (bulk) import of the same data. auditConnector.tableOperations().create(THIRD_TEST_TABLE_NAME); - File failDir = new File(exportDir + "/tmp"); - assertTrue(failDir.mkdirs() || failDir.isDirectory()); - auditConnector.tableOperations().importDirectory(THIRD_TEST_TABLE_NAME, exportDir.toString(), - failDir.toString(), false); + auditConnector.tableOperations().addFilesTo(THIRD_TEST_TABLE_NAME) + .from(exportDirBulk.toString()).load(); auditConnector.tableOperations().online(OLD_TEST_TABLE_NAME); // Stop testing activities here @@ -360,7 +361,7 @@ public void testImportExportOperationsAudits() assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_BULK_IMPORT_AUDIT_TEMPLATE, - THIRD_TEST_TABLE_NAME, filePrefix + exportDir, filePrefix + failDir))); + THIRD_TEST_TABLE_NAME, filePrefix + exportDirBulk, null))); assertEquals(1, findAuditMessage(auditMessages, String.format(AuditedSecurityOperation.CAN_ONLINE_OFFLINE_TABLE_AUDIT_TEMPLATE, diff --git a/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java index 1e651843617..0ad03363996 100644 --- a/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BulkImportSequentialRowsIT.java @@ -74,11 +74,8 @@ public void testBulkImportFailure() throws Exception { Path bulk = new Path(rootPath, "bulk"); log.info("bulk: {}", bulk); assertTrue(fs.mkdirs(bulk)); - Path err = new Path(rootPath, "err"); - log.info("err: {}", err); assertTrue(fs.mkdirs(bulk)); - assertTrue(fs.mkdirs(err)); Path rfile = new Path(bulk, "file.rf"); @@ -99,7 +96,7 @@ public void testBulkImportFailure() throws Exception { // Then import a single rfile to all the tablets, hoping that we get a failure to import because // of the balancer moving tablets around // and then we get to verify that the bug is actually fixed. - to.importDirectory(tableName, bulk.toString(), err.toString(), false); + to.addFilesTo(tableName).from(bulk.toString()).load(); // The bug is that some tablets don't get imported into. assertEquals(NR * NV, diff --git a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java index db27cebecce..2f037b4be2a 100644 --- a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java +++ b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java @@ -1245,7 +1245,7 @@ public void verifyTableOperationsExceptions() throws Exception { fail(); break; case 18: - ops.importDirectory(tableName, "", "", false); + ops.addFilesTo(tableName).from("").load(); fail(); break; case 19: diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java index 4a33aaea189..1e27ac02cec 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkFileIT.java @@ -93,7 +93,7 @@ public void testBulkFile() throws Exception { writeData(writer3, 1000, 1999); writer3.close(); - FunctionalTestUtils.bulkImport(c, fs, tableName, dir); + c.tableOperations().addFilesTo(tableName).from(dir).load(); FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java index f2852faae85..b17d7a8b541 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkSplitOptimizationIT.java @@ -22,7 +22,6 @@ import org.apache.accumulo.core.cli.ScannerOpts; import org.apache.accumulo.core.client.Connector; -import org.apache.accumulo.core.client.security.tokens.AuthenticationToken; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.harness.AccumuloClusterHarness; import org.apache.accumulo.minicluster.ServerType; @@ -93,7 +92,7 @@ public void testBulkSplitOptimization() throws Exception { FileStatus[] stats = fs.listStatus(testDir); System.out.println("Number of generated files: " + stats.length); - FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString()); + c.tableOperations().addFilesTo(tableName).from(testDir.toString()).load(); FunctionalTestUtils.checkSplits(c, tableName, 0, 0); FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100); @@ -118,7 +117,6 @@ public void testBulkSplitOptimization() throws Exception { opts.cols = 1; opts.setTableName(tableName); - AuthenticationToken adminToken = getAdminToken(); opts.setClientInfo(getClientInfo()); VerifyIngest.verifyIngest(c, opts, new ScannerOpts()); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java index fa9dc2f412b..498997670aa 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java @@ -114,7 +114,7 @@ public void test() throws Exception { Path testrf = new Path(root, "testrf"); FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4); - FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString()); + c.tableOperations().addFilesTo(tableName).from(testrf.toString()).load(); int beforeCount = countFiles(c); final AtomicBoolean fail = new AtomicBoolean(false); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java index eca5e0bd5fe..ddeee7d49ed 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java @@ -114,22 +114,6 @@ static void checkRFiles(Connector c, String tableName, int minTablets, int maxTa } } - static public void bulkImport(Connector c, FileSystem fs, String table, String dir) - throws Exception { - String failDir = dir + "_failures"; - Path failPath = new Path(failDir); - fs.delete(failPath, true); - fs.mkdirs(failPath); - - // Ensure server can read/modify files - c.tableOperations().importDirectory(table, dir, failDir, false); - - if (fs.listStatus(failPath).length > 0) { - throw new Exception("Some files failed to bulk import"); - } - - } - static public void checkSplits(Connector c, String table, int min, int max) throws Exception { Collection splits = c.tableOperations().listSplits(table); if (splits.size() < min || splits.size() > max) { diff --git a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java index 873312a4982..af75b902a47 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapred/TokenFileIT.java @@ -91,7 +91,6 @@ public void close() throws IOException { } - @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { @@ -100,8 +99,7 @@ public int run(String[] args) throws Exception { + " "); } - String user = getAdminPrincipal(); - String tokenFile = args[0]; + // String tokenFile = args[0]; String table1 = args[1]; String table2 = args[2]; @@ -120,10 +118,9 @@ public int run(String[] args) throws Exception { job.setOutputKeyClass(Text.class); job.setOutputValueClass(Mutation.class); - AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile); + AccumuloOutputFormat.setClientInfo(job, getCluster().getClientInfo()); AccumuloOutputFormat.setCreateTables(job, false); AccumuloOutputFormat.setDefaultTableName(job, table2); - AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig()); job.setNumReduceTasks(0); diff --git a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java index 5acc3ace1b7..a69810b80a2 100644 --- a/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java +++ b/test/src/main/java/org/apache/accumulo/test/mapreduce/TokenFileIT.java @@ -89,8 +89,7 @@ public int run(String[] args) throws Exception { + " "); } - String user = getAdminPrincipal(); - String tokenFile = args[0]; + // String tokenFile = args[0]; String table1 = args[1]; String table2 = args[2];