Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ACCUMULO-4813 New bulk import process and API #436

Merged
merged 6 commits into from May 5, 2018
@@ -39,6 +39,10 @@
<artifactId>auto-service</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -95,6 +95,8 @@
public static final String ZTABLE_LOCKS = "/table_locks";

public static final String BULK_PREFIX = "b-";
public static final String BULK_RENAME_FILE = "renames.json";
public static final String BULK_LOAD_MAPPING = "loadmap.json";

public static final String CLONE_PREFIX = "c-";
public static final byte[] CLONE_PREFIX_BYTES = CLONE_PREFIX.getBytes(UTF_8);
@@ -24,6 +24,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.Executor;
import java.util.function.Predicate;

import org.apache.accumulo.core.client.AccumuloException;
@@ -619,6 +620,63 @@ void setLocalityGroups(String tableName, Map<String,Set<Text>> groups)
void importDirectory(String tableName, String dir, String failureDir, boolean setTime)
throws TableNotFoundException, IOException, AccumuloException, AccumuloSecurityException;

/**
* @since 2.0.0
*/
public static interface ImportSourceOptions {
ImportSourceOptions settingLogicalTime();

void load()
throws TableNotFoundException, IOException, AccumuloException, AccumuloSecurityException;
}

/**
* @since 2.0.0
*/
public static interface ImportExecutorOptions extends ImportSourceOptions {
/**
* Files need must be inspected to determine what tablets they go to. This inspection is done in
* the current process. If this property is not set, then the client property
* {@code bulk.threads} is used to create a thread pool.
*
* @param service
* Use this executor to run file inspection task
* @return ImportSourceOptions
*/
ImportSourceOptions usingExecutor(Executor service);

/**
* Files need must be inspected to determine what tablets they go to. This inspection is done in
* the current process. If this property is not set, then the client property
* {@code bulk.threads} is used to create a thread pool.
*
* @param numThreads
* Create a thread pool with this many thread to run file inspection task.
* @return ImportSourceOptions
*/
ImportSourceOptions usingThreads(int numThreads);
}

/**
* @since 2.0.0
*/
public static interface ImportSourceArguments {
/**
*
* @param directory
* Load files from this directory
* @return ImportSourceOptions
*/
ImportSourceOptions from(String directory);

This comment has been minimized.

Copy link
@keith-turner

keith-turner Apr 20, 2018

Author Contributor

I think this should return ImportExecutorOptions

}

/**
* @since 2.0.0
*/
default ImportSourceArguments addFilesTo(String tableName) {
throw new UnsupportedOperationException();
}

/**
* Initiates taking a table offline, but does not wait for action to complete
*
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.accumulo.core.client.impl;

import java.io.Serializable;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;

import org.apache.accumulo.core.data.impl.KeyExtent;
import org.apache.accumulo.core.util.TextUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

import com.google.common.base.Preconditions;

public class Bulk {

/**
* WARNING : do not change this class, its used for serialization to Json
*/
public static class Mapping {
private Tablet tablet;
private Collection<FileInfo> files;

public Mapping(KeyExtent tablet, Files files) {
this.tablet = toTablet(tablet);
this.files = files.files.values();
}

public Tablet getTablet() {
return tablet;
}

public KeyExtent getKeyExtent(Table.ID tableId) {
return tablet.toKeyExtent(tableId);
}

public Files getFiles() {
return new Files(files);
}
}

/**
* WARNING : do not change this class, its used for serialization to Json
*/
public static class Tablet {

private byte[] endRow;
private byte[] prevEndRow;

public Tablet(Text endRow, Text prevEndRow) {
this.endRow = endRow == null ? null : TextUtil.getBytes(endRow);
this.prevEndRow = prevEndRow == null ? null : TextUtil.getBytes(prevEndRow);
}

public KeyExtent toKeyExtent(Table.ID tableId) {
return Bulk.toKeyExtent(tableId, this);
}

public Text getEndRow() {
if (endRow == null) {
return null;
}
return new Text(endRow);
}

public Text getPrevEndRow() {
if (prevEndRow == null) {
return null;
}
return new Text(prevEndRow);
}

@Override
public String toString() {
return getEndRow().toString() + ";" + getPrevEndRow().toString();
}
}

/**
* WARNING : do not change this class, its used for serialization to Json
*/
public static class FileInfo implements Serializable {
final String name;
final long estSize;
final long estEntries;

public FileInfo(String fileName, long estFileSize, long estNumEntries) {
this.name = fileName;
this.estSize = estFileSize;
this.estEntries = estNumEntries;
}

public FileInfo(Path path, long estSize) {
this(path.getName(), estSize, 0);
}

static FileInfo merge(FileInfo fi1, FileInfo fi2) {
Preconditions.checkArgument(fi1.name.equals(fi2.name));
return new FileInfo(fi1.name, fi1.estSize + fi2.estSize, fi1.estEntries + fi2.estEntries);
}

public String getFileName() {
return name;
}

public long getEstFileSize() {
return estSize;
}

public long getEstNumEntries() {
return estEntries;
}

@Override
public String toString() {
return String.format("file:%s estSize:%d estEntries:%s", name, estSize, estEntries);
}

@Override
public boolean equals(Object o) {
if (o == this)
return true;
if (!(o instanceof FileInfo))
return false;
FileInfo other = (FileInfo) o;
return this.name.equals(other.name) && this.estSize == other.estSize
&& this.estEntries == other.estEntries;
}

@Override
public int hashCode() {
return Objects.hash(name, estSize, estEntries);
}
}

public static class Files implements Iterable<FileInfo>, Serializable {
Map<String,FileInfo> files = new HashMap<>();

public Files(Collection<FileInfo> files) {
files.forEach(fi -> add(fi));
}

public Files() {}

public void add(FileInfo fi) {
if (files.putIfAbsent(fi.name, fi) != null) {
throw new IllegalArgumentException("File already present " + fi.name);
}
}

public FileInfo get(String fileName) {
return files.get(fileName);
}

public Files mapNames(Map<String,String> renames) {
Files renamed = new Files();

files.forEach((k, v) -> {
String newName = renames.get(k);
FileInfo nfi = new FileInfo(newName, v.estSize, v.estEntries);
renamed.files.put(newName, nfi);
});

return renamed;
}

void merge(Files other) {
other.files.forEach((k, v) -> {
files.merge(k, v, FileInfo::merge);
});
}

public int getSize() {
return this.files.size();
}

@Override
public Iterator<FileInfo> iterator() {
return files.values().iterator();
}

@Override
public boolean equals(Object o) {
if (o == this)
return true;
if (!(o instanceof Files))
return false;
Files other = (Files) o;
return this.files.equals(other.files);
}

@Override
public int hashCode() {
return files.hashCode();
}

@Override
public String toString() {
return files.toString();
}
}

public static Tablet toTablet(KeyExtent keyExtent) {
return new Tablet(keyExtent.getEndRow(), keyExtent.getPrevEndRow());
}

public static KeyExtent toKeyExtent(Table.ID tableId, Tablet tablet) {
return new KeyExtent(tableId, tablet.getEndRow(), tablet.getPrevEndRow());
}
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.