Skip to content

Commit

Permalink
hdfs repository
Browse files Browse the repository at this point in the history
relates to #72
  • Loading branch information
costin committed Jan 6, 2014
1 parent 165a251 commit afacdc4
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 0 deletions.
@@ -0,0 +1,151 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.hdfs;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchGenerationException;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.hadoop.hdfs.blobstore.HdfsBlobStore;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryName;
import org.elasticsearch.repositories.RepositorySettings;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;

/**
 * Snapshot/restore {@link Repository} backed by an HDFS file-system.
 * <p>
 * Recognized settings (repository-level, falling back to component-level):
 * <ul>
 *   <li>{@code path}               - mandatory; HDFS directory holding the snapshots</li>
 *   <li>{@code uri}                - HDFS file-system URI (defaults to the Hadoop default FS)</li>
 *   <li>{@code conf_uri}           - optional URL of an extra Hadoop configuration resource</li>
 *   <li>{@code conf.*}             - individual Hadoop configuration overrides</li>
 *   <li>{@code user}               - optional user to access HDFS as</li>
 *   <li>{@code base_path}          - optional blob-store base path</li>
 *   <li>{@code concurrent_streams} - stream pool size (default 5)</li>
 *   <li>{@code chunk_size}, {@code compress}, {@code load_defaults}</li>
 * </ul>
 */
public class HdfsRepository extends BlobStoreRepository implements Repository {

    public final static String TYPE = "hdfs";

    private final HdfsBlobStore blobStore;
    private final BlobPath basePath;
    private ByteSizeValue chunkSize;
    private boolean compress;
    private final ExecutorService concurrentStreamPool;
    private final FileSystem fs;

    /**
     * Builds the repository: resolves the HDFS file-system, creates the blob store
     * and the bounded pool used for concurrent snapshot streams.
     *
     * @throws IOException if the HDFS file-system cannot be contacted
     * @throws ElasticSearchIllegalArgumentException if the mandatory {@code path} setting is missing
     */
    @Inject
    public HdfsRepository(RepositoryName name, RepositorySettings repositorySettings, IndexShardRepository indexShardRepository) throws IOException {
        super(name.getName(), repositorySettings, indexShardRepository);

        String path = repositorySettings.settings().get("path", componentSettings.get("path"));
        if (path == null) {
            throw new ElasticSearchIllegalArgumentException("no 'path' defined for hdfs snapshot/restore");
        }

        // get configuration
        fs = initFileSystem(repositorySettings);
        Path hdfsPath = fs.makeQualified(new Path(path));

        int concurrentStreams = repositorySettings.settings().getAsInt("concurrent_streams", componentSettings.getAsInt("concurrent_streams", 5));
        concurrentStreamPool = EsExecutors.newScaling(1, concurrentStreams, 5, TimeUnit.SECONDS,
                EsExecutors.daemonThreadFactory(settings, "[hdfs_stream]"));

        logger.debug("Using URI [{}], path [{}], concurrent_streams [{}]", fs, hdfsPath, concurrentStreams);
        blobStore = new HdfsBlobStore(settings, fs, hdfsPath, concurrentStreamPool);
        this.chunkSize = repositorySettings.settings().getAsBytesSize("chunk_size", componentSettings.getAsBytesSize("chunk_size", null));
        this.compress = repositorySettings.settings().getAsBoolean("compress", componentSettings.getAsBoolean("compress", false));

        // BUGFIX: previously this read the 'uri' setting (copy/paste from initFileSystem),
        // which would split a real URI such as hdfs://host:8020/x into nonsense blob-path
        // components ("hdfs:", "host:8020", ...). A dedicated 'base_path' setting is used instead.
        String basePathSetting = repositorySettings.settings().get("base_path", null);
        if (Strings.hasText(basePathSetting)) {
            BlobPath p = new BlobPath();
            for (String elem : Strings.splitStringToArray(basePathSetting, '/')) {
                p = p.add(elem);
            }
            this.basePath = p;
        } else {
            this.basePath = BlobPath.cleanPath();
        }
    }

    /**
     * Resolves the HDFS {@link FileSystem} from the repository settings: builds a Hadoop
     * {@link Configuration} (optionally seeded from {@code conf_uri} and {@code conf.*}
     * overrides) and connects to the {@code uri} file-system, as {@code user} if given.
     *
     * @throws ElasticSearchIllegalArgumentException if {@code conf_uri} is not a valid URL
     * @throws ElasticSearchGenerationException if the file-system cannot be created
     */
    private FileSystem initFileSystem(RepositorySettings repositorySettings) throws IOException {
        String uri = repositorySettings.settings().get("uri", componentSettings.get("uri"));
        String confUri = repositorySettings.settings().get("conf_uri", componentSettings.get("conf_uri"));

        Configuration cfg = new Configuration(repositorySettings.settings().getAsBoolean("load_defaults", componentSettings.getAsBoolean("load_defaults", true)));
        if (confUri != null) {
            try {
                // BUGFIX: was 'new URL(uri)' - the extra configuration resource must come
                // from 'conf_uri', not from the file-system 'uri'.
                cfg.addResource(new URL(confUri));
            } catch (MalformedURLException ex) {
                // BUGFIX: report the offending 'conf_uri' value, not 'uri'
                throw new ElasticSearchIllegalArgumentException(String.format("invalid 'conf_uri' [%s] defined for hdfs snapshot/restore", confUri), ex);
            }
        }

        // apply individual 'conf.*' overrides on top of any loaded resources
        Map<String, String> map = componentSettings.getByPrefix("conf.").getAsMap();
        for (Entry<String, String> entry : map.entrySet()) {
            cfg.set(entry.getKey(), entry.getValue());
        }

        URI actualUri = (uri != null ? URI.create(uri) : FileSystem.getDefaultUri(cfg));
        String user = repositorySettings.settings().get("user", componentSettings.get("user"));

        try {
            return (user != null ? FileSystem.get(actualUri, cfg, user) : FileSystem.get(actualUri, cfg));
        } catch (Exception ex) {
            throw new ElasticSearchGenerationException(String.format("cannot create Hdfs file-system for uri [%s]", actualUri), ex);
        }
    }

    @Override
    protected BlobStore blobStore() {
        return blobStore;
    }

    @Override
    protected BlobPath basePath() {
        return basePath;
    }

    @Override
    protected boolean isCompress() {
        return compress;
    }

    @Override
    protected ByteSizeValue chunkSize() {
        return chunkSize;
    }

    @Override
    protected void doClose() throws ElasticSearchException {
        super.doClose();

        // Stop accepting new stream tasks before closing the file-system they operate on.
        concurrentStreamPool.shutdown();
        IOUtils.closeStream(fs);
    }
}
@@ -0,0 +1,39 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.repositories.hdfs;

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.index.snapshots.IndexShardRepository;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardRepository;
import org.elasticsearch.repositories.Repository;

/**
 * Guice module that wires the HDFS snapshot/restore repository: the repository
 * contract is served by {@link HdfsRepository}, while per-shard snapshot work is
 * handled by the generic blob-store shard repository.
 */
public class HdfsRepositoryModule extends AbstractModule {

    @Override
    protected void configure() {
        // Top-level repository operations go through the HDFS-backed implementation.
        bind(Repository.class).to(HdfsRepository.class).asEagerSingleton();
        // Shard-level snapshotting reuses the standard blob-store implementation.
        bind(IndexShardRepository.class).to(BlobStoreIndexShardRepository.class).asEagerSingleton();
    }
}

0 comments on commit afacdc4

Please sign in to comment.