Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.accumulo.core.client.impl;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
Expand Down Expand Up @@ -97,7 +96,7 @@ public String toString() {
/**
* WARNING : do not change this class, its used for serialization to Json
*/
public static class FileInfo implements Serializable {
public static class FileInfo {
final String name;
final long estSize;
final long estEntries;
Expand Down Expand Up @@ -151,7 +150,7 @@ public int hashCode() {
}
}

public static class Files implements Iterable<FileInfo>, Serializable {
public static class Files implements Iterable<FileInfo> {
Map<String,FileInfo> files = new HashMap<>();

public Files(Collection<FileInfo> files) {
Expand Down
10 changes: 8 additions & 2 deletions proxy/src/main/java/org/apache/accumulo/proxy/ProxyServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.accumulo.core.client.admin.ActiveScan;
import org.apache.accumulo.core.client.admin.CompactionConfig;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TableOperations.ImportExecutorOptions;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.client.impl.ClientConfConverter;
import org.apache.accumulo.core.client.impl.Credentials;
Expand Down Expand Up @@ -1767,8 +1768,13 @@ public void importDirectory(ByteBuffer login, String tableName, String importDir
org.apache.accumulo.proxy.thrift.AccumuloException,
org.apache.accumulo.proxy.thrift.AccumuloSecurityException, TException {
try {
getConnector(login).tableOperations().importDirectory(tableName, importDir, failureDir,
setTime);
ImportExecutorOptions loader = getConnector(login).tableOperations().addFilesTo(tableName)
.from(importDir);
if (setTime) {
loader.settingLogicalTime().load();
} else {
loader.load();
}
} catch (Exception e) {
handleExceptionTNF(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.client.deprecated;
package org.apache.accumulo.server.client;

import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;

Expand Down Expand Up @@ -75,12 +75,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
public class BulkImporter {

private static final Logger log = LoggerFactory.getLogger(BulkImporter.class);

@Deprecated
public static List<String> bulkLoad(ClientContext context, long tid, String tableId,
List<String> files, String errorDir, boolean setTime) throws IOException, AccumuloException,
AccumuloSecurityException, ThriftTableOperationException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.accumulo.core.security.thrift.TCredentials;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.client.deprecated.BulkImporter;
import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.AuditedSecurityOperation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ public boolean canDeleteRange(TCredentials c, Table.ID tableId, String tableName
public boolean canBulkImport(TCredentials c, Table.ID tableId, String tableName, String dir,
String failDir, Namespace.ID namespaceId) throws ThriftSecurityException {
try {
boolean result = super.canBulkImport(c, tableId, namespaceId);
boolean result = super.canBulkImport(c, tableId, tableName, dir, failDir, namespaceId);
audit(c, result, CAN_BULK_IMPORT_AUDIT_TEMPLATE, tableName, dir, failDir);
return result;
} catch (ThriftSecurityException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,6 @@ public boolean canDeleteRange(TCredentials c, Table.ID tableId, String tableName

public boolean canBulkImport(TCredentials c, Table.ID tableId, String tableName, String dir,
String failDir, Namespace.ID namespaceId) throws ThriftSecurityException {
return canBulkImport(c, tableId, namespaceId);
}

public boolean canBulkImport(TCredentials c, Table.ID tableId, Namespace.ID namespaceId)
throws ThriftSecurityException {
authenticate(c);
return hasTablePermission(c, tableId, namespaceId, TablePermission.BULK_IMPORT, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.server.client.deprecated;
package org.apache.accumulo.server.client;

import java.util.Collection;
import java.util.Collections;
Expand All @@ -40,7 +40,7 @@
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.util.CachedConfiguration;
import org.apache.accumulo.server.client.deprecated.BulkImporter;
import org.apache.accumulo.server.client.BulkImporter;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.fs.VolumeManagerImpl;
import org.apache.commons.lang.NotImplementedException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,10 +525,16 @@ public String invalidMessage(String argument) {

final boolean canBulkImport;
try {
canBulkImport = master.security.canBulkImport(c, tableId, namespaceId);
String tableName = Tables.getTableName(master.getInstance(), tableId);
canBulkImport = master.security.canBulkImport(c, tableId, tableName, dir, null,
namespaceId);
} catch (ThriftSecurityException e) {
throwIfTableMissingSecurityException(e, tableId, "", TableOperation.BULK_IMPORT);
throw e;
} catch (TableNotFoundException e) {
throw new ThriftTableOperationException(tableId.canonicalID(), null,
TableOperation.BULK_IMPORT, TableOperationExceptionType.NOTFOUND,
"Table no longer exists");
}

if (!canBulkImport)
Expand Down
13 changes: 7 additions & 6 deletions test/src/main/java/org/apache/accumulo/test/AuditMessageIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,10 @@ public void testImportExportOperationsAudits()

// Prepare to export the table
File exportDir = new File(getCluster().getConfig().getDir() + "/export");
File exportDirBulk = new File(getCluster().getConfig().getDir() + "/export_bulk");
assertTrue(exportDirBulk.mkdir() || exportDirBulk.isDirectory());

auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME);
auditConnector.tableOperations().offline(OLD_TEST_TABLE_NAME, true);
auditConnector.tableOperations().exportTable(OLD_TEST_TABLE_NAME, exportDir.toString());

// We've exported the table metadata to the MiniAccumuloCluster root dir. Grab the .rf file path
Expand All @@ -327,14 +329,13 @@ public void testImportExportOperationsAudits()
}
}
FileUtils.copyFileToDirectory(importFile, exportDir);
FileUtils.copyFileToDirectory(importFile, exportDirBulk);
auditConnector.tableOperations().importTable(NEW_TEST_TABLE_NAME, exportDir.toString());

// Now do a Directory (bulk) import of the same data.
auditConnector.tableOperations().create(THIRD_TEST_TABLE_NAME);
File failDir = new File(exportDir + "/tmp");
assertTrue(failDir.mkdirs() || failDir.isDirectory());
auditConnector.tableOperations().importDirectory(THIRD_TEST_TABLE_NAME, exportDir.toString(),
failDir.toString(), false);
auditConnector.tableOperations().addFilesTo(THIRD_TEST_TABLE_NAME)
.from(exportDirBulk.toString()).load();
auditConnector.tableOperations().online(OLD_TEST_TABLE_NAME);

// Stop testing activities here
Expand All @@ -360,7 +361,7 @@ public void testImportExportOperationsAudits()
assertEquals(1,
findAuditMessage(auditMessages,
String.format(AuditedSecurityOperation.CAN_BULK_IMPORT_AUDIT_TEMPLATE,
THIRD_TEST_TABLE_NAME, filePrefix + exportDir, filePrefix + failDir)));
THIRD_TEST_TABLE_NAME, filePrefix + exportDirBulk, null)));
assertEquals(1,
findAuditMessage(auditMessages,
String.format(AuditedSecurityOperation.CAN_ONLINE_OFFLINE_TABLE_AUDIT_TEMPLATE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,8 @@ public void testBulkImportFailure() throws Exception {
Path bulk = new Path(rootPath, "bulk");
log.info("bulk: {}", bulk);
assertTrue(fs.mkdirs(bulk));
Path err = new Path(rootPath, "err");
log.info("err: {}", err);

assertTrue(fs.mkdirs(bulk));
assertTrue(fs.mkdirs(err));

Path rfile = new Path(bulk, "file.rf");

Expand All @@ -99,7 +96,7 @@ public void testBulkImportFailure() throws Exception {
// Then import a single rfile to all the tablets, hoping that we get a failure to import because
// of the balancer moving tablets around
// and then we get to verify that the bug is actually fixed.
to.importDirectory(tableName, bulk.toString(), err.toString(), false);
to.addFilesTo(tableName).from(bulk.toString()).load();

// The bug is that some tablets don't get imported into.
assertEquals(NR * NV,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1245,7 +1245,7 @@ public void verifyTableOperationsExceptions() throws Exception {
fail();
break;
case 18:
ops.importDirectory(tableName, "", "", false);
ops.addFilesTo(tableName).from("").load();
fail();
break;
case 19:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void testBulkFile() throws Exception {
writeData(writer3, 1000, 1999);
writer3.close();

FunctionalTestUtils.bulkImport(c, fs, tableName, dir);
c.tableOperations().addFilesTo(tableName).from(dir).load();

FunctionalTestUtils.checkRFiles(c, tableName, 6, 6, 1, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.apache.accumulo.core.cli.ScannerOpts;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void testBulkSplitOptimization() throws Exception {
FileStatus[] stats = fs.listStatus(testDir);

System.out.println("Number of generated files: " + stats.length);
FunctionalTestUtils.bulkImport(c, fs, tableName, testDir.toString());
c.tableOperations().addFilesTo(tableName).from(testDir.toString()).load();
FunctionalTestUtils.checkSplits(c, tableName, 0, 0);
FunctionalTestUtils.checkRFiles(c, tableName, 1, 1, 100, 100);

Expand All @@ -118,7 +117,6 @@ public void testBulkSplitOptimization() throws Exception {
opts.cols = 1;
opts.setTableName(tableName);

AuthenticationToken adminToken = getAdminToken();
opts.setClientInfo(getClientInfo());
VerifyIngest.verifyIngest(c, opts, new ScannerOpts());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public void test() throws Exception {
Path testrf = new Path(root, "testrf");
FunctionalTestUtils.createRFiles(c, fs, testrf.toString(), 500000, 59, 4);

FunctionalTestUtils.bulkImport(c, fs, tableName, testrf.toString());
c.tableOperations().addFilesTo(tableName).from(testrf.toString()).load();
int beforeCount = countFiles(c);

final AtomicBoolean fail = new AtomicBoolean(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,6 @@ static void checkRFiles(Connector c, String tableName, int minTablets, int maxTa
}
}

static public void bulkImport(Connector c, FileSystem fs, String table, String dir)
throws Exception {
String failDir = dir + "_failures";
Path failPath = new Path(failDir);
fs.delete(failPath, true);
fs.mkdirs(failPath);

// Ensure server can read/modify files
c.tableOperations().importDirectory(table, dir, failDir, false);

if (fs.listStatus(failPath).length > 0) {
throw new Exception("Some files failed to bulk import");
}

}

static public void checkSplits(Connector c, String table, int min, int max) throws Exception {
Collection<Text> splits = c.tableOperations().listSplits(table);
if (splits.size() < min || splits.size() > max) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public void close() throws IOException {

}

@SuppressWarnings("deprecation")
@Override
public int run(String[] args) throws Exception {

Expand All @@ -100,8 +99,7 @@ public int run(String[] args) throws Exception {
+ " <token file> <inputtable> <outputtable>");
}

String user = getAdminPrincipal();
String tokenFile = args[0];
// String tokenFile = args[0];
String table1 = args[1];
String table2 = args[2];

Expand All @@ -120,10 +118,9 @@ public int run(String[] args) throws Exception {
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Mutation.class);

AccumuloOutputFormat.setConnectorInfo(job, user, tokenFile);
AccumuloOutputFormat.setClientInfo(job, getCluster().getClientInfo());
AccumuloOutputFormat.setCreateTables(job, false);
AccumuloOutputFormat.setDefaultTableName(job, table2);
AccumuloOutputFormat.setZooKeeperInstance(job, getCluster().getClientConfig());

job.setNumReduceTasks(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,7 @@ public int run(String[] args) throws Exception {
+ " <token file> <inputtable> <outputtable>");
}

String user = getAdminPrincipal();
String tokenFile = args[0];
// String tokenFile = args[0];
String table1 = args[1];
String table2 = args[2];

Expand Down