Add compatibility with ES 2.0
Remove compatibility with ES 0.90

relates elastic#524
relates elastic#525
costin committed Aug 18, 2015
1 parent ee8f660 commit 4c111f4
Showing 7 changed files with 65 additions and 62 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -1,9 +1,9 @@
# Hadoop HDFS Snapshot/Restore plugin

`elasticsearch-repository-hdfs` plugin allows Elasticsearch 1.4 to use `hdfs` file-system as a repository for [snapshot/restore](http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html). See [this blog](http://www.elasticsearch.org/blog/introducing-snapshot-restore/) entry for a quick introduction to snapshot/restore.
`elasticsearch-repository-hdfs` plugin allows Elasticsearch 2.0 to use `hdfs` file-system as a repository for [snapshot/restore](http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html). See [this blog](http://www.elasticsearch.org/blog/introducing-snapshot-restore/) entry for a quick introduction to snapshot/restore.

## Requirements
- Elasticsearch (version *1.4* or higher). For Elasticsearch *1.0*-*1.3* use the 2.0.x version of the plugin.
- Elasticsearch (version *2.0* or higher).
- HDFS accessible file-system (from the Elasticsearch classpath)

## Flavors
@@ -17,7 +17,7 @@ The `hadoop2` version contains the plugin jar plus the Hadoop 2.x (Yarn) dependencies
The `light` version contains just the plugin jar, without any Hadoop dependencies.

### What version to use?
It depends on whether you have Hadoop installed on your nodes or not. If you do, then we recommend exposing Hadoop to the Elasticsearch classpath (typically through an environment variable such as +ES_CLASSPATH+ - see the Elasticsearch [reference](http://www.elasticsearch.org/guide/en/elasticsearch/reference/1.x/setup-configuration.html) for more info) and using the `light` version.
It depends on whether you have Hadoop installed on your nodes or not. If you do, then we recommend exposing Hadoop to the Elasticsearch classpath (typically through an environment variable such as +ES_CLASSPATH+ - see the Elasticsearch [reference](https://www.elastic.co/guide/en/elasticsearch/reference/2.0/setup-configuration.html) for more info) and using the `light` version.
This guarantees the existing libraries and configuration are being picked up by the plugin.
If you do not have Hadoop installed, then select either the default version (for Hadoop stable/1.x) or, if you are using Hadoop 2, the `hadoop2` version.

@@ -72,7 +72,7 @@ repositories
conf_location: "extra-cfg.xml" # optional - Hadoop configuration XML to be loaded (use commas for multi values)
conf.<key> : "<value>" # optional - 'inlined' key=value added to the Hadoop configuration
concurrent_streams: 5 # optional - the number of concurrent streams (defaults to 5)
compress: "false" # optional - whether to compress the data or not (default)
compress: "false" # optional - whether to compress the metadata or not (default)
chunk_size: "10mb" # optional - chunk size (disabled by default)
```
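
For reference, registering such a repository from Java looks much like the test included in this commit. The snippet below is a sketch only: the repository name, URI, path, index name, and the inlined `conf.dfs.replication` key are placeholder values, not taken from this change.

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class HdfsRepositoryExample {

    // Register an "hdfs" repository and take a snapshot of one index.
    public static void registerAndSnapshot(Client client) {
        client.admin().cluster().preparePutRepository("my-hdfs-repo")
                .setType("hdfs")
                .setSettings(Settings.settingsBuilder()
                        .put("uri", "hdfs://namenode:8020/")     // placeholder namenode URI
                        .put("path", "elasticsearch/snapshots")  // placeholder path inside the file-system
                        .put("conf.dfs.replication", "1")        // illustrative 'inlined' Hadoop key
                        .put("compress", true)
                        .put("chunk_size", "10mb"))
                .get();

        client.admin().cluster().prepareCreateSnapshot("my-hdfs-repo", "snapshot-1")
                .setIndices("my-index")
                .setWaitForCompletion(true)
                .get();
    }
}
```

The same settings can of course be supplied over the REST API; the Java form is shown here because it mirrors `HdfsSnapshotRestoreTest` further down in this commit.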

@@ -85,7 +85,7 @@ Any HDFS-compatible file-systems (like Amazon `s3://` or Google `gs://`) can be
Otherwise, the plugin will only read the _default_, vanilla configuration of Hadoop and will not be able to recognize the plugged-in file-system.
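
A minimal sketch of that point, assuming a custom Hadoop XML on the classpath plus a couple of inlined keys (the `s3a://` URI, the file name, and the property names below are placeholders chosen for illustration, not values from this commit):

```java
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public class CustomFileSystemRepositoryExample {

    // Point the plugin at a non-default, HDFS-compatible file-system by handing it
    // the Hadoop configuration that knows about that scheme.
    public static void register(Client client) {
        client.admin().cluster().preparePutRepository("custom-fs-repo")
                .setType("hdfs")
                .setSettings(Settings.settingsBuilder()
                        .put("uri", "s3a://my-bucket/")                  // placeholder URI for the target file-system
                        .put("path", "snapshots")
                        .put("conf_location", "extra-cfg.xml")           // extra Hadoop XML, resolved as a classpath resource or file
                        .put("conf.fs.s3a.access.key", "<access-key>")   // illustrative inlined credentials
                        .put("conf.fs.s3a.secret.key", "<secret-key>"))
                .get();
    }
}
```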

## Feedback / Q&A
We're interested in your feedback! You can find us on the User [mailing list](https://groups.google.com/forum/?fromgroups#!forum/elasticsearch) - please append `[Hadoop]` to the post subject to filter it out. For more details, see the [community](http://www.elasticsearch.org/community/) page.
We're interested in your feedback! You can find us on the [forum](https://discuss.elastic.co) - please use the Hadoop channel. For more details, see the [community](http://www.elasticsearch.org/community/) page.

## License
This project is released under version 2.0 of the [Apache License](http://www.apache.org/licenses/LICENSE-2.0)
@@ -29,15 +29,15 @@
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ElasticsearchIntegrationTest;
import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
import org.elasticsearch.test.store.MockDirectoryHelper;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.store.MockFSDirectoryService;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@@ -46,15 +46,17 @@
import static org.hamcrest.Matchers.*;

@ClusterScope(scope = Scope.TEST, numDataNodes = 2)
public class HdfsSnapshotRestoreTest extends ElasticsearchIntegrationTest {
public class HdfsSnapshotRestoreTest extends ESIntegTestCase {


@Override
public Settings indexSettings() {
protected Settings nodeSettings(int ordinal) {
// During restore we frequently restore index to exactly the same state it was before, that might cause the same
// checksum file to be written twice during restore operation
return ImmutableSettings.builder().put(super.indexSettings())
.put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)
return Settings.builder().put(super.indexSettings())
.put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false)
.put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false)
.put("plugin.types", HdfsPlugin.class.getName())
.build();
}

@@ -70,7 +72,7 @@ public final void wipeBefore() throws Exception {
@After
public final void wipeAfter() throws Exception {
wipeRepositories();
//cleanRepositoryFiles(path);
cleanRepositoryFiles(path);
}

@Test
@@ -80,13 +82,13 @@ public void testSimpleWorkflow() {

PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(ImmutableSettings.settingsBuilder()
.put("uri", "file://./")
.put("path", path)
.put("conf", "additional-cfg.xml, conf-2.xml")
.put("chunk_size", randomIntBetween(100, 1000))
.put("compress", randomBoolean())
).get();
.setSettings(Settings.settingsBuilder()
.put("uri", "file://./")
.put("path", path)
.put("conf", "additional-cfg.xml, conf-2.xml")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean())
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@@ -162,12 +164,12 @@ public void testWrongPath() {

PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
.setType("hdfs")
.setSettings(ImmutableSettings.settingsBuilder()
.put("uri", "file://./")
.put("path", path + "a@b$c#11:22")
.put("chunk_size", randomIntBetween(100, 1000))
.put("compress", randomBoolean())
).get();
.setSettings(Settings.settingsBuilder()
.put("uri", "file://./")
.put("path", path + "a@b$c#11:22")
.put("chunk_size", randomIntBetween(100, 1000) + "k")
.put("compress", randomBoolean())
).get();
assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));

createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Locale;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -30,7 +31,8 @@
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;

import com.google.common.collect.ImmutableMap;

public class HdfsBlobContainer extends AbstractBlobContainer {

@@ -53,10 +55,16 @@ public boolean blobExists(String blobName) {
}

@Override
public boolean deleteBlob(String blobName) throws IOException {
return blobStore.fileSystemFactory().getFileSystem().delete(new Path(path, blobName), true);
public void deleteBlob(String blobName) throws IOException {
blobStore.fileSystemFactory().getFileSystem().delete(new Path(path, blobName), true);
}

@Override
public void move(String sourceBlobName, String targetBlobName) throws IOException {
if (!blobStore.fileSystemFactory().getFileSystem().rename(new Path(path, sourceBlobName), new Path(path, targetBlobName))) {
throw new IOException(String.format(Locale.ROOT, "can not move blob from [%s] to [%s]", sourceBlobName, targetBlobName));
}
}

@Override
public InputStream openInput(String blobName) throws IOException {
@@ -31,19 +31,20 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.repositories.hdfs.FileSystemFactory;
import org.elasticsearch.threadpool.ThreadPool;

public class HdfsBlobStore extends AbstractComponent implements BlobStore {

private final FileSystemFactory ffs;
private final Path rootHdfsPath;
private final Executor executor;
private final ThreadPool threadPool;
private final int bufferSizeInBytes;

public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, Executor executor) throws IOException {
public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, ThreadPool threadPool) throws IOException {
super(settings);
this.ffs = ffs;
this.rootHdfsPath = path;
this.executor = executor;
this.threadPool = threadPool;

this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();

@@ -68,7 +69,7 @@ public Path path() {
}

public Executor executor() {
return executor;
return threadPool.executor(ThreadPool.Names.SNAPSHOT);
}

public int bufferSizeInBytes() {
@@ -25,8 +25,6 @@
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -35,18 +33,17 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.hadoop.hdfs.blobstore.HdfsBlobStore;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.threadpool.ThreadPool;

public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory {

@@ -56,34 +53,29 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory {
private final BlobPath basePath;
private final ByteSizeValue chunkSize;
private final boolean compress;
private final ExecutorService concurrentStreamPool;
private final RepositorySettings repositorySettings;
private FileSystem fs;

@Inject
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ThreadPool threadPool) throws IOException {
super(name.getName(), repositorySettings, indexShardRepository);

this.repositorySettings = repositorySettings;

String path = repositorySettings.settings().get("path", componentSettings.get("path"));
String path = repositorySettings.settings().get("path", settings.get("path"));
if (path == null) {
throw new ElasticsearchIllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
throw new IllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
}

// get configuration
fs = getFileSystem();
Path hdfsPath = fs.makeQualified(new Path(path));
this.basePath = BlobPath.cleanPath();

int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS,
EsExecutors.daemonThreadFactory(settings, "[hdfs_stream]"));

logger.debug("Using file-system [{}] for URI [{}], path [{}], concurrent_streams [{}]", fs, fs.getUri(), hdfsPath, concurrentStreams);
blobStore = new HdfsBlobStore(settings, this, hdfsPath, concurrentStreamPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
logger.debug("Using file-system [{}] for URI [{}], path [{}]", fs, fs.getUri(), hdfsPath);
blobStore = new HdfsBlobStore(settings, this, hdfsPath, threadPool);
this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("chunk_size", null));
this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("compress", false));
}

// as the FileSystem is long-lived and might go away, make sure to check it before it's being used.
@@ -110,25 +102,25 @@ public FileSystem getFileSystem() throws IOException {
}

private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException {
Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", componentSettings.getAsBoolean("load_defaults", true)));
Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", settings.getAsBoolean("load_defaults", true)));

String confLocation = repositorySettings.settings().get("conf_location", componentSettings.get("conf_location"));
String confLocation = repositorySettings.settings().get("conf_location", settings.get("conf_location"));
if (Strings.hasText(confLocation)) {
for (String entry : Strings.commaDelimitedListToStringArray(confLocation)) {
addConfigLocation(cfg, entry.trim());
}
}

Map<String, String> map = componentSettings.getByPrefix("conf.").getAsMap();
Map<String, String> map = settings.getByPrefix("conf.").getAsMap();
for (Entry<String, String> entry : map.entrySet()) {
cfg.set(entry.getKey(), entry.getValue());
}

UserGroupInformation.setConfiguration(cfg);

String uri = repositorySettings.settings().get("uri", componentSettings.get("uri"));
String uri = repositorySettings.settings().get("uri", settings.get("uri"));
URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg));
String user = repositorySettings.settings().get("user", componentSettings.get("user"));
String user = repositorySettings.settings().get("user", settings.get("user"));

try {
// disable FS cache
@@ -150,7 +142,7 @@ private void addConfigLocation(Configuration cfg, String confLocation) {
if (cfgURL == null) {
File file = new File(confLocation);
if (!file.canRead()) {
throw new ElasticsearchIllegalArgumentException(
throw new IllegalArgumentException(
String.format(
"Cannot find classpath resource or file 'conf_location' [%s] defined for hdfs snapshot/restore",
confLocation));
@@ -171,7 +163,7 @@ private void addConfigLocation(Configuration cfg, String confLocation) {
try {
cfgURL = new URL(confLocation);
} catch (MalformedURLException ex) {
throw new ElasticsearchIllegalArgumentException(String.format(
throw new IllegalArgumentException(String.format(
"Invalid 'conf_location' URL [%s] defined for hdfs snapshot/restore", confLocation), ex);
}
}
@@ -205,6 +197,5 @@ protected void doClose() throws ElasticsearchException {

IOUtils.closeStream(fs);
fs = null;
concurrentStreamPool.shutdown();
}
}
2 changes: 0 additions & 2 deletions src/main/resources/es-plugin.properties

This file was deleted.

3 changes: 3 additions & 0 deletions src/main/resources/plugin-descriptor.properties
@@ -0,0 +1,3 @@
description=Hdfs Plugin
version=2.1.0
classname=org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin
