Skip to content

Commit

Permalink
Pavel pubdev 5688 gcs env check master (#2609)
Browse files Browse the repository at this point in the history
* PUBDEV-5688- GCS started in unauthorized regime when there is no connection available.

(cherry picked from commit b481e14)

* GCS Storage instance lazily initialized in PersistGcs class

(cherry picked from commit df771bc)

* GCS Storage lazy initialization moved to a provider class

(cherry picked from commit eea73ba)
  • Loading branch information
Pavel Pscheidl committed Jul 7, 2018
1 parent cca6c1a commit 7c1124c
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 20 deletions.
@@ -0,0 +1,28 @@
package water.persist;

import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

/**
* A class wrapping {@link Storage}, enabling safe lazy initialization by only providing getStorage method, not risking for
* developers to access storage field directly.
*/
final class GcsStorageProvider {

private Storage storage;

/**
* Returns an existing instance of {@link Storage} or creates a new one, if not initialized.
* Lazy-initialization of storage does not slow down startup of H2O (attempts are made to connect to GCS).
* The connection status and {@link com.google.auth.Credentials} are checked at actual request-time.
*
* @return An instance of {@link Storage}, if initialized
*/
protected Storage getStorage() {
if (storage == null) {
storage = StorageOptions.getDefaultInstance().getService();
}

return storage;
}
}
43 changes: 23 additions & 20 deletions h2o-persist-gcs/src/main/java/water/persist/PersistGcs.java
Expand Up @@ -29,9 +29,11 @@
/**
* Persistence backend for GCS
*/
@SuppressWarnings("unused")
public final class PersistGcs extends Persist {

private final Storage storage = StorageOptions.getDefaultInstance().getService();
private GcsStorageProvider storageProvider = new GcsStorageProvider();


@Override
public byte[] load(final Value v) throws IOException {
Expand All @@ -46,7 +48,7 @@ public byte[] load(final Value v) throws IOException {
offset = FileVec.chunkOffset(k); // The offset
}

final ReadChannel reader = storage.reader(blobId);
final ReadChannel reader = storageProvider.getStorage().reader(blobId);
reader.seek(offset);
reader.read(wrappingBuffer);

Expand All @@ -56,7 +58,7 @@ public byte[] load(final Value v) throws IOException {
@Override
public Key uriToKey(URI uri) throws IOException {
final GcsBlob blob = GcsBlob.of(uri);
final Long contentSize = storage.get(blob.getBlobId()).getSize();
final Long contentSize = storageProvider.getStorage().get(blob.getBlobId()).getSize();
return GcsFileVec.make(blob.getCanonical(), contentSize);
}

Expand All @@ -67,14 +69,14 @@ public void store(Value v) throws IOException {
final GcsBlob blob = GcsBlob.of(v._key);
Log.debug("Storing: " + blob.toString());
final ByteBuffer buffer = ByteBuffer.wrap(payload);
storage.create(blob.getBlobInfo()).writer().write(buffer);
storageProvider.getStorage().create(blob.getBlobInfo()).writer().write(buffer);
}

@Override
public void delete(Value v) {
final BlobId blobId = GcsBlob.of(v._key).getBlobId();
Log.debug("Deleting: " + blobId.toString());
storage.get(blobId).delete();
storageProvider.getStorage().get(blobId).delete();
}

@Override
Expand All @@ -89,7 +91,7 @@ public void cleanUp() {
@Override
public List<String> load(String key) {
final List<String> blobs = new ArrayList<>();
for (Blob b : storage.get(key).list().iterateAll()) {
for (Blob b : storageProvider.getStorage().get(key).list().iterateAll()) {
blobs.add(b.getName());
}
return blobs;
Expand All @@ -103,7 +105,7 @@ public List<String> load(String key) {
@Override
public List<String> load(Object key) {
final List<String> fileNames = new ArrayList<>();
for (Bucket b : storage.list().iterateAll()) {
for (Bucket b : storageProvider.getStorage().list().iterateAll()) {
fileNames.add(b.getName());
}
return fileNames;
Expand Down Expand Up @@ -155,7 +157,7 @@ public void importFiles(String path,
parseBucket(bk[0], files, keys, fails);
} else {
try {
Blob blob = storage.get(bk[0], bk[1]);
Blob blob = storageProvider.getStorage().get(bk[0], bk[1]);
final GcsBlob gcsBlob = GcsBlob.of(blob.getBlobId());
final Key k = GcsFileVec.make(path, blob.getSize());
keys.add(k.toString());
Expand All @@ -172,7 +174,7 @@ private void parseBucket(String bucketId,
ArrayList<String> files,
ArrayList<String> keys,
ArrayList<String> fails) {
final Bucket bucket = storage.get(bucketId);
final Bucket bucket = storageProvider.getStorage().get(bucketId);
for (Blob blob : bucket.list().iterateAll()) {
final GcsBlob gcsBlob = GcsBlob.of(blob.getBlobId());
Log.debug("Importing: " + gcsBlob.toString());
Expand All @@ -191,7 +193,7 @@ private void parseBucket(String bucketId,
public InputStream open(final String path) {
final GcsBlob gcsBlob = GcsBlob.of(path);
Log.debug("Opening: " + gcsBlob.toString());
final Blob blob = storage.get(gcsBlob.getBlobId());
final Blob blob = storageProvider.getStorage().get(gcsBlob.getBlobId());
return new InputStream() {
final ReadChannel reader = blob.reader();

Expand Down Expand Up @@ -238,7 +240,7 @@ public void close() throws IOException {
public OutputStream create(String path, boolean overwrite) {
final GcsBlob gcsBlob = GcsBlob.of(path);
Log.debug("Creating: " + gcsBlob.getCanonical());
final WriteChannel writer = storage.create(gcsBlob.getBlobInfo()).writer();
final WriteChannel writer = storageProvider.getStorage().create(gcsBlob.getBlobInfo()).writer();
return new OutputStream() {
@Override
public void write(int b) throws IOException {
Expand Down Expand Up @@ -270,19 +272,19 @@ public boolean rename(String fromPath, String toPath) {
final BlobId fromBlob = GcsBlob.of(fromPath).getBlobId();
final BlobId toBlob = GcsBlob.of(toPath).getBlobId();

storage.get(fromBlob).copyTo(toBlob);
storageProvider.getStorage().get(fromBlob).copyTo(toBlob);
keyCache.invalidate(fromBlob.getBucket());
keyCache.invalidate(toBlob.getBucket());
return storage.delete(fromBlob);
return storageProvider.getStorage().delete(fromBlob);
}

@Override
public boolean exists(String path) {
final String bk[] = GcsBlob.removePrefix(path).split("/", 2);
if (bk.length == 1) {
return storage.get(bk[0]).exists();
return storageProvider.getStorage().get(bk[0]).exists();
} else if (bk.length == 2) {
Blob blob = storage.get(bk[0], bk[1]);
Blob blob = storageProvider.getStorage().get(bk[0], bk[1]);
return blob != null && blob.exists();
} else {
return false;
Expand All @@ -293,13 +295,13 @@ public boolean exists(String path) {
public boolean delete(String path) {
final BlobId blob = GcsBlob.of(path).getBlobId();
keyCache.invalidate(blob.getBucket());
return storage.get(blob).delete();
return storageProvider.getStorage().get(blob).delete();
}

@Override
public long length(String path) {
final BlobId blob = GcsBlob.of(path).getBlobId();
return storage.get(blob).getSize();
return storageProvider.getStorage().get(blob).getSize();
}

/**
Expand All @@ -316,7 +318,7 @@ public PersistEntry[] list(String path) {
int substrLen = bk.length == 2 ? bk[1].length() : 0;
List<PersistEntry> results = new ArrayList<>();
try {
for (Blob b : storage.list(bk[0]).iterateAll()) {
for (Blob b : storageProvider.getStorage().list(bk[0]).iterateAll()) {
if (bk.length == 1 || (bk.length == 2 && b.getName().startsWith(bk[1]))) {
String relativeName = b.getName().substring(substrLen);
if (relativeName.startsWith("/")) {
Expand All @@ -337,9 +339,9 @@ public boolean mkdirs(String path) {
final String input = GcsBlob.removePrefix(path);
final String[] bk = input.split("/", 2);
if (bk.length > 0) {
Bucket b = storage.get(bk[0]);
Bucket b = storageProvider.getStorage().get(bk[0]);
if (b == null || !b.exists()) {
storage.create(BucketInfo.of(bk[0]));
storageProvider.getStorage().create(BucketInfo.of(bk[0]));
}
return true;
} else {
Expand All @@ -350,4 +352,5 @@ public boolean mkdirs(String path) {
return false;
}
}

}

0 comments on commit 7c1124c

Please sign in to comment.