Skip to content

Commit

Permalink
GCS Storage instance lazily initialized in PersistGcs class
Browse files Browse the repository at this point in the history
  • Loading branch information
Pavel Pscheidl committed Jul 6, 2018
1 parent b481e14 commit df771bc
Showing 1 changed file with 31 additions and 28 deletions.
59 changes: 31 additions & 28 deletions h2o-persist-gcs/src/main/java/water/persist/PersistGcs.java
Expand Up @@ -32,18 +32,21 @@
@SuppressWarnings("unused")
public final class PersistGcs extends Persist {

// Value in DefaultCredentialsProvider.NO_GCE_CHECK_ENV_VAR is not publicly accessible
private static final String NO_GCE_CHECK_ENV_VAR = "NO_GCE_CHECK";
private Storage storage;

private final Storage storage;

@SuppressWarnings("unused")
public PersistGcs() {
if (System.getenv(NO_GCE_CHECK_ENV_VAR) == null){
storage = StorageOptions.getUnauthenticatedInstance().getService();
} else{
/**
* Returns an existing instance of {@link Storage} or creates a new one, if not initialized.
* Lazy-initialization of storage does not slow down startup of H2O (attempts are made to connect to GCS).
* The connection status and {@link com.google.auth.Credentials} are checked at actual request-time.
*
* @return An instance of {@link Storage}, if initialized
*/
private Storage getStorage() {
if (storage == null) {
storage = StorageOptions.getDefaultInstance().getService();
}

return storage;
}

@Override
Expand All @@ -59,7 +62,7 @@ public byte[] load(final Value v) throws IOException {
offset = FileVec.chunkOffset(k); // The offset
}

final ReadChannel reader = storage.reader(blobId);
final ReadChannel reader = getStorage().reader(blobId);
reader.seek(offset);
reader.read(wrappingBuffer);

Expand All @@ -69,7 +72,7 @@ public byte[] load(final Value v) throws IOException {
@Override
public Key uriToKey(URI uri) throws IOException {
final GcsBlob blob = GcsBlob.of(uri);
final Long contentSize = storage.get(blob.getBlobId()).getSize();
final Long contentSize = getStorage().get(blob.getBlobId()).getSize();
return GcsFileVec.make(blob.getCanonical(), contentSize);
}

Expand All @@ -80,14 +83,14 @@ public void store(Value v) throws IOException {
final GcsBlob blob = GcsBlob.of(v._key);
Log.debug("Storing: " + blob.toString());
final ByteBuffer buffer = ByteBuffer.wrap(payload);
storage.create(blob.getBlobInfo()).writer().write(buffer);
getStorage().create(blob.getBlobInfo()).writer().write(buffer);
}

@Override
public void delete(Value v) {
final BlobId blobId = GcsBlob.of(v._key).getBlobId();
Log.debug("Deleting: " + blobId.toString());
storage.get(blobId).delete();
getStorage().get(blobId).delete();
}

@Override
Expand All @@ -102,7 +105,7 @@ public void cleanUp() {
@Override
public List<String> load(String key) {
final List<String> blobs = new ArrayList<>();
for (Blob b : storage.get(key).list().iterateAll()) {
for (Blob b : getStorage().get(key).list().iterateAll()) {
blobs.add(b.getName());
}
return blobs;
Expand All @@ -116,7 +119,7 @@ public List<String> load(String key) {
@Override
public List<String> load(Object key) {
final List<String> fileNames = new ArrayList<>();
for (Bucket b : storage.list().iterateAll()) {
for (Bucket b : getStorage().list().iterateAll()) {
fileNames.add(b.getName());
}
return fileNames;
Expand Down Expand Up @@ -168,7 +171,7 @@ public void importFiles(String path,
parseBucket(bk[0], files, keys, fails);
} else {
try {
Blob blob = storage.get(bk[0], bk[1]);
Blob blob = getStorage().get(bk[0], bk[1]);
final GcsBlob gcsBlob = GcsBlob.of(blob.getBlobId());
final Key k = GcsFileVec.make(path, blob.getSize());
keys.add(k.toString());
Expand All @@ -185,7 +188,7 @@ private void parseBucket(String bucketId,
ArrayList<String> files,
ArrayList<String> keys,
ArrayList<String> fails) {
final Bucket bucket = storage.get(bucketId);
final Bucket bucket = getStorage().get(bucketId);
for (Blob blob : bucket.list().iterateAll()) {
final GcsBlob gcsBlob = GcsBlob.of(blob.getBlobId());
Log.debug("Importing: " + gcsBlob.toString());
Expand All @@ -204,7 +207,7 @@ private void parseBucket(String bucketId,
public InputStream open(final String path) {
final GcsBlob gcsBlob = GcsBlob.of(path);
Log.debug("Opening: " + gcsBlob.toString());
final Blob blob = storage.get(gcsBlob.getBlobId());
final Blob blob = getStorage().get(gcsBlob.getBlobId());
return new InputStream() {
final ReadChannel reader = blob.reader();

Expand Down Expand Up @@ -251,7 +254,7 @@ public void close() throws IOException {
public OutputStream create(String path, boolean overwrite) {
final GcsBlob gcsBlob = GcsBlob.of(path);
Log.debug("Creating: " + gcsBlob.getCanonical());
final WriteChannel writer = storage.create(gcsBlob.getBlobInfo()).writer();
final WriteChannel writer = getStorage().create(gcsBlob.getBlobInfo()).writer();
return new OutputStream() {
@Override
public void write(int b) throws IOException {
Expand Down Expand Up @@ -283,19 +286,19 @@ public boolean rename(String fromPath, String toPath) {
final BlobId fromBlob = GcsBlob.of(fromPath).getBlobId();
final BlobId toBlob = GcsBlob.of(toPath).getBlobId();

storage.get(fromBlob).copyTo(toBlob);
getStorage().get(fromBlob).copyTo(toBlob);
keyCache.invalidate(fromBlob.getBucket());
keyCache.invalidate(toBlob.getBucket());
return storage.delete(fromBlob);
return getStorage().delete(fromBlob);
}

@Override
public boolean exists(String path) {
final String bk[] = GcsBlob.removePrefix(path).split("/", 2);
if (bk.length == 1) {
return storage.get(bk[0]).exists();
return getStorage().get(bk[0]).exists();
} else if (bk.length == 2) {
Blob blob = storage.get(bk[0], bk[1]);
Blob blob = getStorage().get(bk[0], bk[1]);
return blob != null && blob.exists();
} else {
return false;
Expand All @@ -306,13 +309,13 @@ public boolean exists(String path) {
public boolean delete(String path) {
final BlobId blob = GcsBlob.of(path).getBlobId();
keyCache.invalidate(blob.getBucket());
return storage.get(blob).delete();
return getStorage().get(blob).delete();
}

@Override
public long length(String path) {
final BlobId blob = GcsBlob.of(path).getBlobId();
return storage.get(blob).getSize();
return getStorage().get(blob).getSize();
}

/**
Expand All @@ -329,7 +332,7 @@ public PersistEntry[] list(String path) {
int substrLen = bk.length == 2 ? bk[1].length() : 0;
List<PersistEntry> results = new ArrayList<>();
try {
for (Blob b : storage.list(bk[0]).iterateAll()) {
for (Blob b : getStorage().list(bk[0]).iterateAll()) {
if (bk.length == 1 || (bk.length == 2 && b.getName().startsWith(bk[1]))) {
String relativeName = b.getName().substring(substrLen);
if (relativeName.startsWith("/")) {
Expand All @@ -350,9 +353,9 @@ public boolean mkdirs(String path) {
final String input = GcsBlob.removePrefix(path);
final String[] bk = input.split("/", 2);
if (bk.length > 0) {
Bucket b = storage.get(bk[0]);
Bucket b = getStorage().get(bk[0]);
if (b == null || !b.exists()) {
storage.create(BucketInfo.of(bk[0]));
getStorage().create(BucketInfo.of(bk[0]));
}
return true;
} else {
Expand Down

0 comments on commit df771bc

Please sign in to comment.