diff --git a/README.md b/README.md
index 26aad077c9cea..b15d90fed2caa 100644
--- a/README.md
+++ b/README.md
@@ -1,9 +1,9 @@
 # Hadoop HDFS Snapshot/Restore plugin
 
-`elasticsearch-repository-hdfs` plugin allows Elasticsearch 1.4 to use `hdfs` file-system as a repository for [snapshot/restore](http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html). See [this blog](http://www.elasticsearch.org/blog/introducing-snapshot-restore/) entry for a quick introduction to snapshot/restore.
+`elasticsearch-repository-hdfs` plugin allows Elasticsearch 2.0 to use `hdfs` file-system as a repository for [snapshot/restore](http://www.elasticsearch.org/guide/en/elasticsearch/reference/master/modules-snapshots.html). See [this blog](http://www.elasticsearch.org/blog/introducing-snapshot-restore/) entry for a quick introduction to snapshot/restore.
 
 ## Requirements
-- Elasticsearch (version *1.4* or higher). For Elasticsearch *1.0*-*1.3* use the 2.0.x version of the plugin.
+- Elasticsearch (version *2.0* or higher).
 - HDFS accessible file-system (from the Elasticsearch classpath)
 
 ## Flavors
@@ -17,7 +17,7 @@ The `hadoop2` version contains the plugin jar plus the Hadoop 2.x (Yarn) depende
 The `light` version contains just the plugin jar, without any Hadoop dependencies.
 
 ### What version to use?
-It depends on whether you have Hadoop installed on your nodes or not. If you do, then we recommend exposing Hadoop to the Elasticsearch classpath (typically through an environment variable such as +ES_CLASSPATH+ - see the Elasticsearch [reference](http://www.elasticsearch.org/guide/en/elasticsearch/reference/1.x/setup-configuration.html) for more info) and using the `light` version.
+It depends on whether you have Hadoop installed on your nodes or not. If you do, then we recommend exposing Hadoop to the Elasticsearch classpath (typically through an environment variable such as +ES_CLASSPATH+ - see the Elasticsearch [reference](https://www.elastic.co/guide/en/elasticsearch/reference/2.0/setup-configuration.html) for more info) and using the `light` version.
 This guarantees the existing libraries and configuration are being picked up by the plugin.
 If you do not have Hadoop installed, then select either the default version (for Hadoop stable/1.x) or, if you are using Hadoop 2, the `hadoop2` version.
 
@@ -72,7 +72,7 @@ repositories
     conf_location: "extra-cfg.xml"    # optional - Hadoop configuration XML to be loaded (use commas for multi values)
     conf.<key> : "<value>"            # optional - 'inlined' key=value added to the Hadoop configuration
     concurrent_streams: 5             # optional - the number of concurrent streams (defaults to 5)
-    compress: "false"                 # optional - whether to compress the data or not (default)
+    compress: "false"                 # optional - whether to compress the metadata or not (default)
     chunk_size: "10mb"                # optional - chunk size (disabled by default)
 ```
 
@@ -85,7 +85,7 @@ Any HDFS-compatible file-systems (like Amazon `s3://` or Google `gs://`) can be
 Otherwise, the plugin will only read the _default_, vanilla configuration of Hadoop and will not be able to recognized the plugged in file-system.
 
 ## Feedback / Q&A
-We're interested in your feedback! You can find us on the User [mailing list](https://groups.google.com/forum/?fromgroups#!forum/elasticsearch) - please append `[Hadoop]` to the post subject to filter it out. For more details, see the [community](http://www.elasticsearch.org/community/) page.
+We're interested in your feedback! You can find us on the [forum](https://discuss.elastic.co) - please use the Hadoop channel. For more details, see the [community](http://www.elasticsearch.org/community/) page.
 
 ## License
 This project is released under version 2.0 of the [Apache License](http://www.apache.org/licenses/LICENSE-2.0)
diff --git a/src/itest/java/org/elasticsearch/repositories/hdfs/HdfsSnapshotRestoreTest.java b/src/itest/java/org/elasticsearch/repositories/hdfs/HdfsSnapshotRestoreTest.java
index 9b6240d819c2c..072ffd73fb3dd 100644
--- a/src/itest/java/org/elasticsearch/repositories/hdfs/HdfsSnapshotRestoreTest.java
+++ b/src/itest/java/org/elasticsearch/repositories/hdfs/HdfsSnapshotRestoreTest.java
@@ -29,15 +29,15 @@
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin;
 import org.elasticsearch.repositories.RepositoryMissingException;
 import org.elasticsearch.repositories.RepositoryVerificationException;
 import org.elasticsearch.snapshots.SnapshotState;
-import org.elasticsearch.test.ElasticsearchIntegrationTest;
-import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope;
-import org.elasticsearch.test.ElasticsearchIntegrationTest.Scope;
-import org.elasticsearch.test.store.MockDirectoryHelper;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
+import org.elasticsearch.test.ESIntegTestCase.Scope;
+import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -46,15 +46,17 @@
 import static org.hamcrest.Matchers.*;
 
 @ClusterScope(scope = Scope.TEST, numDataNodes = 2)
-public class HdfsSnapshotRestoreTest extends ElasticsearchIntegrationTest {
+public class HdfsSnapshotRestoreTest extends ESIntegTestCase {
+
     @Override
-    public Settings indexSettings() {
+    protected Settings nodeSettings(int ordinal) {
         // During restore we frequently restore index to exactly the same state it was before, that might cause the same
         // checksum file to be written twice during restore operation
-        return ImmutableSettings.builder().put(super.indexSettings())
-                .put(MockDirectoryHelper.RANDOM_PREVENT_DOUBLE_WRITE, false)
-                .put(MockDirectoryHelper.RANDOM_NO_DELETE_OPEN_FILE, false)
+        return Settings.builder().put(super.indexSettings())
+                .put(MockFSDirectoryService.RANDOM_PREVENT_DOUBLE_WRITE, false)
+                .put(MockFSDirectoryService.RANDOM_NO_DELETE_OPEN_FILE, false)
+                .put("plugin.types", HdfsPlugin.class.getName())
                 .build();
     }
 
@@ -70,7 +72,7 @@ public final void wipeBefore() throws Exception {
     @After
     public final void wipeAfter() throws Exception {
         wipeRepositories();
-        //cleanRepositoryFiles(path);
+        cleanRepositoryFiles(path);
     }
 
     @Test
@@ -80,13 +82,13 @@ public void testSimpleWorkflow() {
 
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("hdfs")
-                .setSettings(ImmutableSettings.settingsBuilder()
-                        .put("uri", "file://./")
-                        .put("path", path)
-                        .put("conf", "additional-cfg.xml, conf-2.xml")
-                        .put("chunk_size", randomIntBetween(100, 1000))
-                        .put("compress", randomBoolean())
-                        ).get();
+                .setSettings(Settings.settingsBuilder()
+                        .put("uri", "file://./")
+                        .put("path", path)
+                        .put("conf", "additional-cfg.xml, conf-2.xml")
+                        .put("chunk_size", randomIntBetween(100, 1000) + "k")
+                        .put("compress", randomBoolean())
+                        ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
 
         createIndex("test-idx-1", "test-idx-2", "test-idx-3");
@@ -162,12 +164,12 @@ public void testWrongPath() {
 
         PutRepositoryResponse putRepositoryResponse = client.admin().cluster().preparePutRepository("test-repo")
                 .setType("hdfs")
-                .setSettings(ImmutableSettings.settingsBuilder()
-                        .put("uri", "file://./")
-                        .put("path", path + "a@b$c#11:22")
-                        .put("chunk_size", randomIntBetween(100, 1000))
-                        .put("compress", randomBoolean())
-                        ).get();
+                .setSettings(Settings.settingsBuilder()
+                        .put("uri", "file://./")
+                        .put("path", path + "a@b$c#11:22")
+                        .put("chunk_size", randomIntBetween(100, 1000) + "k")
+                        .put("compress", randomBoolean())
+                        ).get();
         assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true));
 
         createIndex("test-idx-1", "test-idx-2", "test-idx-3");
diff --git a/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobContainer.java b/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobContainer.java
index e546bd7c92c65..785b2f9a76930 100644
--- a/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobContainer.java
+++ b/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobContainer.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Locale;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -30,7 +31,8 @@
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
 import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
-import org.elasticsearch.common.collect.ImmutableMap;
+
+import com.google.common.collect.ImmutableMap;
 
 public class HdfsBlobContainer extends AbstractBlobContainer {
 
@@ -53,10 +55,16 @@ public boolean blobExists(String blobName) {
     }
 
     @Override
-    public boolean deleteBlob(String blobName) throws IOException {
-        return blobStore.fileSystemFactory().getFileSystem().delete(new Path(path, blobName), true);
+    public void deleteBlob(String blobName) throws IOException {
+        blobStore.fileSystemFactory().getFileSystem().delete(new Path(path, blobName), true);
     }
 
+    @Override
+    public void move(String sourceBlobName, String targetBlobName) throws IOException {
+        if (!blobStore.fileSystemFactory().getFileSystem().rename(new Path(path, sourceBlobName), new Path(path, targetBlobName))) {
+            throw new IOException(String.format(Locale.ROOT, "can not move blob from [%s] to [%s]", sourceBlobName, targetBlobName));
+        }
+    }
 
     @Override
     public InputStream openInput(String blobName) throws IOException {
diff --git a/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobStore.java b/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobStore.java
index 43d7df2129492..edfd842f9ecb4 100644
--- a/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobStore.java
+++ b/src/main/java/org/elasticsearch/hadoop/hdfs/blobstore/HdfsBlobStore.java
@@ -31,19 +31,20 @@
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.repositories.hdfs.FileSystemFactory;
+import org.elasticsearch.threadpool.ThreadPool;
 
 public class HdfsBlobStore extends AbstractComponent implements BlobStore {
 
     private final FileSystemFactory ffs;
     private final Path rootHdfsPath;
-    private final Executor executor;
+    private final ThreadPool threadPool;
     private final int bufferSizeInBytes;
 
-    public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, Executor executor) throws IOException {
+    public HdfsBlobStore(Settings settings, FileSystemFactory ffs, Path path, ThreadPool threadPool) throws IOException {
         super(settings);
         this.ffs = ffs;
         this.rootHdfsPath = path;
-        this.executor = executor;
+        this.threadPool = threadPool;
 
         this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();
 
@@ -68,7 +69,7 @@ public Path path() {
     }
 
     public Executor executor() {
-        return executor;
+        return threadPool.executor(ThreadPool.Names.SNAPSHOT);
     }
 
     public int bufferSizeInBytes() {
diff --git a/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java b/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
index bc6b74c538c22..b43370f18c264 100644
--- a/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
+++ b/src/main/java/org/elasticsearch/repositories/hdfs/HdfsRepository.java
@@ -25,8 +25,6 @@
 import java.net.URL;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,18 +33,17 @@
 import org.apache.hadoop.security.UserGroupInformation;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ElasticsearchGenerationException;
-import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.blobstore.BlobPath;
 import org.elasticsearch.common.blobstore.BlobStore;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.hadoop.hdfs.blobstore.HdfsBlobStore;
 import org.elasticsearch.index.snapshots.IndexShardRepository;
 import org.elasticsearch.repositories.RepositoryName;
 import org.elasticsearch.repositories.RepositorySettings;
 import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
+import org.elasticsearch.threadpool.ThreadPool;
 
 public class HdfsRepository extends BlobStoreRepository implements FileSystemFactory {
 
@@ -56,19 +53,18 @@ public class HdfsRepository extends BlobStoreRepository implements FileSystemFac
     private final BlobPath basePath;
     private final ByteSizeValue chunkSize;
     private final boolean compress;
-    private final ExecutorService concurrentStreamPool;
     private final RepositorySettings repositorySettings;
     private FileSystem fs;
 
     @Inject
-    public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
+    public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository, ThreadPool threadPool) throws IOException {
         super(name.getName(), repositorySettings, indexShardRepository);
 
         this.repositorySettings = repositorySettings;
 
-        String path = repositorySettings.settings().get("path", componentSettings.get("path"));
+        String path = repositorySettings.settings().get("path", settings.get("path"));
         if (path == null) {
-            throw new ElasticsearchIllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
+            throw new IllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
         }
 
         // get configuration
@@ -76,14 +72,10 @@ public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings
         Path hdfsPath = fs.makeQualified(new Path(path));
         this.basePath = BlobPath.cleanPath();
 
-        int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
-        concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS,
-                EsExecutors.daemonThreadFactory(settings, "[hdfs_stream]"));
-
-        logger.debug("Using file-system [{}] for URI [{}], path [{}], concurrent_streams [{}]", fs, fs.getUri(), hdfsPath, concurrentStreams);
-        blobStore = new HdfsBlobStore(settings, this, hdfsPath, concurrentStreamPool);
-        this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
-        this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));
+        logger.debug("Using file-system [{}] for URI [{}], path [{}]", fs, fs.getUri(), hdfsPath);
+        blobStore = new HdfsBlobStore(settings, this, hdfsPath, threadPool);
+        this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", settings.getAsBytesSize("chunk_size", null));
+        this.compress = repositorySettings.settings().getAsBoolean("compress", settings.getAsBoolean("compress", false));
     }
 
     // as the FileSystem is long-lived and might go away, make sure to check it before it's being used.
@@ -110,25 +102,25 @@ public FileSystem getFileSystem() throws IOException {
     }
 
     private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException {
-        Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", componentSettings.getAsBoolean("load_defaults", true)));
+        Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", settings.getAsBoolean("load_defaults", true)));
 
-        String confLocation = repositorySettings.settings().get("conf_location", componentSettings.get("conf_location"));
+        String confLocation = repositorySettings.settings().get("conf_location", settings.get("conf_location"));
         if (Strings.hasText(confLocation)) {
             for (String entry : Strings.commaDelimitedListToStringArray(confLocation)) {
                 addConfigLocation(cfg, entry.trim());
             }
         }
 
-        Map<String, String> map = componentSettings.getByPrefix("conf.").getAsMap();
+        Map<String, String> map = settings.getByPrefix("conf.").getAsMap();
         for (Entry<String, String> entry : map.entrySet()) {
             cfg.set(entry.getKey(), entry.getValue());
         }
 
         UserGroupInformation.setConfiguration(cfg);
 
-        String uri = repositorySettings.settings().get("uri", componentSettings.get("uri"));
+        String uri = repositorySettings.settings().get("uri", settings.get("uri"));
         URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg));
-        String user = repositorySettings.settings().get("user", componentSettings.get("user"));
+        String user = repositorySettings.settings().get("user", settings.get("user"));
 
         try {
             // disable FS cache
@@ -150,7 +142,7 @@ private void addConfigLocation(Configuration cfg, String confLocation) {
         if (cfgURL == null) {
             File file = new File(confLocation);
             if (!file.canRead()) {
-                throw new ElasticsearchIllegalArgumentException(
+                throw new IllegalArgumentException(
                         String.format(
                                 "Cannot find classpath resource or file 'conf_location' [%s] defined for hdfs snapshot/restore",
                                 confLocation));
@@ -171,7 +163,7 @@ private void addConfigLocation(Configuration cfg, String confLocation) {
             try {
                 cfgURL = new URL(confLocation);
             } catch (MalformedURLException ex) {
-                throw new ElasticsearchIllegalArgumentException(String.format(
+                throw new IllegalArgumentException(String.format(
                         "Invalid 'conf_location' URL [%s] defined for hdfs snapshot/restore", confLocation), ex);
             }
         }
@@ -205,6 +197,5 @@ protected void doClose() throws ElasticsearchException {
 
         IOUtils.closeStream(fs);
         fs = null;
-        concurrentStreamPool.shutdown();
     }
 }
\ No newline at end of file
diff --git a/src/main/resources/es-plugin.properties b/src/main/resources/es-plugin.properties
deleted file mode 100644
index be600b0f38325..0000000000000
--- a/src/main/resources/es-plugin.properties
+++ /dev/null
@@ -1,2 +0,0 @@
-plugin=org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin
-
diff --git a/src/main/resources/plugin-descriptor.properties b/src/main/resources/plugin-descriptor.properties
new file mode 100644
index 0000000000000..7924d0a3121eb
--- /dev/null
+++ b/src/main/resources/plugin-descriptor.properties
@@ -0,0 +1,3 @@
+description=Hdfs Plugin
+version=2.1.0
+classname=org.elasticsearch.plugin.hadoop.hdfs.HdfsPlugin
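For reference, registering the repository once the plugin is installed still goes through the standard snapshot/restore API. Below is a minimal sketch only: the repository name, namenode address, and paths are hypothetical, and the setting names (`uri`, `path`, `conf_location`, `compress`, `chunk_size`) are the ones documented in the README hunk above.

```
# Register an HDFS snapshot repository (illustrative values only)
curl -XPUT 'http://localhost:9200/_snapshot/my_hdfs_repo' -d '{
  "type": "hdfs",
  "settings": {
    "uri": "hdfs://namenode:8020/",
    "path": "elasticsearch/snapshots",
    "conf_location": "extra-cfg.xml",
    "compress": false,
    "chunk_size": "10mb"
  }
}'
```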