Skip to content

Commit

Permalink
BlobContainer interface changed in elasticsearch 1.4.0
Browse files Browse the repository at this point in the history
AWS plugin needs an update because of this change elastic/elasticsearch#7551

Closes #37.

(cherry picked from commit 6d5ac76)
  • Loading branch information
dadoonet committed Oct 30, 2014
1 parent e8a0fe8 commit e3028c1
Show file tree
Hide file tree
Showing 11 changed files with 107 additions and 172 deletions.
2 changes: 0 additions & 2 deletions README.md
Expand Up @@ -358,7 +358,6 @@ The Azure repository supports following settings:

* `container`: Container name. Defaults to `elasticsearch-snapshots`
* `base_path`: Specifies the path within container to repository data. Defaults to empty (root directory).
* `concurrent_streams`: Throttles the number of streams (per node) preforming snapshot operation. Defaults to `5`.
* `chunk_size`: Big files can be broken down into chunks during snapshotting if needed. The chunk size can be specified
in bytes or by using size value notation, i.e. `1g`, `10m`, `5k`. Defaults to `64m` (64m max)
* `compress`: When set to `true` metadata files are stored in compressed format. This setting doesn't affect index
Expand All @@ -378,7 +377,6 @@ $ curl -XPUT 'http://localhost:9200/_snapshot/my_backup2' -d '{
"settings": {
"container": "backup_container",
"base_path": "backups",
"concurrent_streams": 2,
"chunk_size": "32m",
"compress": true
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Expand Up @@ -44,7 +44,7 @@ governing permissions and limitations under the License. -->

<properties>
<elasticsearch.version>1.4.0-SNAPSHOT</elasticsearch.version>
<lucene.version>4.9.0</lucene.version>
<lucene.version>4.10.1</lucene.version>
<tests.output>onerror</tests.output>
<tests.shuffle>true</tests.shuffle>
<tests.output>onerror</tests.output>
Expand Down
Expand Up @@ -24,8 +24,8 @@
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.collect.ImmutableMap;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;

/**
Expand All @@ -38,7 +38,6 @@ static public final class Fields {
public static final String KEY = "storage_key";
public static final String CONTAINER = "container";
public static final String BASE_PATH = "base_path";
public static final String CONCURRENT_STREAMS = "concurrent_streams";
public static final String CHUNK_SIZE = "chunk_size";
public static final String COMPRESS = "compress";
}
Expand All @@ -57,9 +56,7 @@ static public final class Fields {

InputStream getInputStream(String container, String blob) throws ServiceException;

ImmutableMap<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException;

void putObject(String container, String blob, InputStream is, long length) throws URISyntaxException, StorageException, IOException;

OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException;

ImmutableMap<String,BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException;
}
Expand Up @@ -43,8 +43,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
Expand Down Expand Up @@ -197,6 +197,11 @@ public InputStream getInputStream(String container, String blob) throws ServiceE
return blobResult.getContentStream();
}

@Override
public OutputStream getOutputStream(String container, String blob) throws URISyntaxException, StorageException {
return client.getContainerReference(container).getBlockBlobReference(blob).openOutputStream();
}

@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, String keyPath, String prefix) throws URISyntaxException, StorageException, ServiceException {
logger.debug("listBlobsByPrefix container [{}], keyPath [{}], prefix [{}]", container, keyPath, prefix);
Expand All @@ -223,16 +228,6 @@ public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(String container, St
return blobsBuilder.build();
}

@Override
public void putObject(String container, String blobname, InputStream is, long length) throws URISyntaxException, StorageException, IOException {
if (logger.isTraceEnabled()) {
logger.trace("creating blob in container [{}], blob [{}], length [{}]",
container, blobname, length);
}
CloudBlockBlob blob = client.getContainerReference(container).getBlockBlobReference(blobname);
blob.upload(is, length);
}

@Override
protected void doStart() throws ElasticsearchException {
logger.debug("starting azure storage client instance");
Expand Down
Expand Up @@ -21,7 +21,6 @@

import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
Expand All @@ -33,20 +32,21 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URISyntaxException;

/**
*
*/
public class AbstractAzureBlobContainer extends AbstractBlobContainer {
public class AzureBlobContainer extends AbstractBlobContainer {

protected final ESLogger logger = ESLoggerFactory.getLogger(AbstractAzureBlobContainer.class.getName());
protected final ESLogger logger = ESLoggerFactory.getLogger(AzureBlobContainer.class.getName());
protected final AzureBlobStore blobStore;

protected final String keyPath;

public AbstractAzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
public AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path);
this.blobStore = blobStore;
String keyPath = path.buildAsString("/");
Expand All @@ -68,6 +68,32 @@ public boolean blobExists(String blobName) {
return false;
}

@Override
public InputStream openInput(String blobName) throws IOException {
try {
return blobStore.client().getInputStream(blobStore.container(), buildKey(blobName));
} catch (ServiceException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage());
}
throw new IOException(e);
}
}

@Override
public OutputStream createOutput(String blobName) throws IOException {
try {
return new AzureOutputStream(blobStore.client().getOutputStream(blobStore.container(), buildKey(blobName)));
} catch (StorageException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
throw new FileNotFoundException(e.getMessage());
}
throw new IOException(e);
} catch (URISyntaxException e) {
throw new IOException(e);
}
}

@Override
public boolean deleteBlob(String blobName) throws IOException {
try {
Expand All @@ -82,35 +108,6 @@ public boolean deleteBlob(String blobName) throws IOException {
}
}

@Override
public void readBlob(final String blobName, final ReadBlobListener listener) {
blobStore.executor().execute(new Runnable() {
@Override
public void run() {
InputStream is = null;
try {
is = blobStore.client().getInputStream(blobStore.container(), buildKey(blobName));
byte[] buffer = new byte[blobStore.bufferSizeInBytes()];
int bytesRead;
while ((bytesRead = is.read(buffer)) != -1) {
listener.onPartial(buffer, 0, bytesRead);
}
is.close();
listener.onCompleted();
} catch (ServiceException e) {
if (e.getHttpStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) {
listener.onFailure(new FileNotFoundException(e.getMessage()));
} else {
listener.onFailure(e);
}
} catch (Throwable e) {
IOUtils.closeWhileHandlingException(is);
listener.onFailure(e);
}
}
});
}

@Override
public ImmutableMap<String, BlobMetaData> listBlobsByPrefix(@Nullable String prefix) throws IOException {

Expand Down
Expand Up @@ -22,16 +22,13 @@
import com.microsoft.windowsazure.services.core.ServiceException;
import com.microsoft.windowsazure.services.core.storage.StorageException;
import org.elasticsearch.cloud.azure.AzureStorageService;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.ImmutableBlobContainer;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;

import java.net.URISyntaxException;
import java.util.concurrent.Executor;

/**
*
Expand All @@ -42,17 +39,10 @@ public class AzureBlobStore extends AbstractComponent implements BlobStore {

private final String container;

private final Executor executor;

private final int bufferSizeInBytes;

public AzureBlobStore(Settings settings, AzureStorageService client, String container, Executor executor) throws URISyntaxException, StorageException {
public AzureBlobStore(Settings settings, AzureStorageService client, String container) throws URISyntaxException, StorageException {
super(settings);
this.client = client;
this.container = container;
this.executor = executor;

this.bufferSizeInBytes = (int) settings.getAsBytesSize("buffer_size", new ByteSizeValue(100, ByteSizeUnit.KB)).bytes();

if (!client.doesContainerExist(container)) {
client.createContainer(container);
Expand All @@ -72,17 +62,9 @@ public String container() {
return container;
}

public Executor executor() {
return executor;
}

public int bufferSizeInBytes() {
return bufferSizeInBytes;
}

@Override
public ImmutableBlobContainer immutableBlobContainer(BlobPath path) {
return new AzureImmutableBlobContainer(path, this);
public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this);
}

@Override
Expand Down

This file was deleted.

@@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cloud.azure.blobstore;

import java.io.IOException;
import java.io.OutputStream;

public class AzureOutputStream extends OutputStream {

private final OutputStream blobOutputStream;

public AzureOutputStream(OutputStream blobOutputStream) {
this.blobOutputStream = blobOutputStream;
}

@Override
public void write(int b) throws IOException {
blobOutputStream.write(b);
}

@Override
public void close() throws IOException {
try {
blobOutputStream.close();
} catch (IOException e) {
// Azure is sending a "java.io.IOException: Stream is already closed."
}
}
}
Expand Up @@ -19,7 +19,6 @@

package org.elasticsearch.discovery.azure;

import org.elasticsearch.Version;
import org.elasticsearch.cloud.azure.AzureComputeService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
Expand Down Expand Up @@ -51,7 +50,7 @@ public AzureDiscovery(Settings settings, ClusterName clusterName, ThreadPool thr
DiscoveryNodeService discoveryNodeService, AzureComputeService azureService, NetworkService networkService,
DiscoverySettings discoverySettings, ElectMasterService electMasterService) {
super(settings, clusterName, threadPool, transportService, clusterService, nodeSettingsService,
discoveryNodeService, pingService, electMasterService, Version.CURRENT, discoverySettings);
discoveryNodeService, pingService, electMasterService, discoverySettings);
if (settings.getAsBoolean("cloud.enabled", true)) {
ImmutableList<? extends ZenPing> zenPings = pingService.zenPings();
UnicastZenPing unicastZenPing = null;
Expand Down

0 comments on commit e3028c1

Please sign in to comment.