Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.storage.StorageOptions;
import com.google.common.base.Strings;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
Expand All @@ -42,6 +43,7 @@
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator;
Expand Down Expand Up @@ -128,9 +130,8 @@ public final boolean mkdir(URI uri)
if (existsDirectoryOrBucket(gcsUri)) {
return true;
}
Blob blob =
_storage.create(BlobInfo.newBuilder(BlobId.of(gcsUri.getBucketName(), directoryPath)).build(), new byte[0]);
return blob.exists();
_storage.create(BlobInfo.newBuilder(BlobId.of(gcsUri.getBucketName(), directoryPath)).build(), new byte[0]);
return true;
} catch (Exception e) {
throw new IOException(e);
}
Expand Down Expand Up @@ -247,9 +248,10 @@ public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean recursive)
new FileMetadata.Builder().setFilePath(GcsUri.createGcsUri(bucketName, blob.getName()).toString())
.setLength(blob.getSize()).setIsDirectory(isDirectory);
if (!isDirectory) {
// Note: if it's a directory, updateTime is set to null, and calling this getter leads to NPE.
// public Long getUpdateTime() { return updateTime; }. So skip this for directory.
fileBuilder.setLastModifiedTime(blob.getUpdateTime());
OffsetDateTime blobUpdateTime = blob.getUpdateTimeOffsetDateTime();
if (blobUpdateTime != null) {
fileBuilder.setLastModifiedTime(blobUpdateTime.toInstant().toEpochMilli());
}
}
listBuilder.add(fileBuilder.build());
}
Expand Down Expand Up @@ -294,8 +296,9 @@ public List<FileMetadata> listFilesWithMetadata(final URI fileUri, final boolean
.setFilePath(filePath)
.setLength(blob.getSize())
.setIsDirectory(false);
if (blob.getUpdateTime() != null) {
fileBuilder.setLastModifiedTime(blob.getUpdateTime());
OffsetDateTime blobUpdateTime = blob.getUpdateTimeOffsetDateTime();
if (blobUpdateTime != null) {
fileBuilder.setLastModifiedTime(blobUpdateTime.toInstant().toEpochMilli());
}
result.add(fileBuilder.build());
if (result.size() >= maxResults) {
Expand Down Expand Up @@ -343,7 +346,12 @@ public boolean isDirectory(URI uri)
@Override
public long lastModified(URI uri)
throws IOException {
return getBlob(new GcsUri(uri)).getUpdateTime();
Blob blob = getBlob(new GcsUri(uri));
if (blob == null) {
return 0L;
}
OffsetDateTime updateTime = blob.getUpdateTimeOffsetDateTime();
return updateTime != null ? updateTime.toInstant().toEpochMilli() : 0L;
}

@Override
Expand All @@ -353,10 +361,16 @@ public boolean touch(URI uri)
LOGGER.info("touch {}", uri);
GcsUri gcsUri = new GcsUri(uri);
Blob blob = getBlob(gcsUri);
long updateTime = blob.getUpdateTime();
if (blob == null) {
// PinotFS contract: if the file does not exist, create an empty file
BlobInfo blobInfo = BlobInfo.newBuilder(BlobId.of(gcsUri.getBucketName(), gcsUri.getPath())).build();
_storage.create(blobInfo, new byte[0]);
return true;
}
// Any successful GCS objects.patch call advances the blob's updateTime server-side,
// so returning true on success (and propagating StorageException on failure) is correct.
_storage.update(blob.toBuilder().setMetadata(blob.getMetadata()).build());
long newUpdateTime = getBlob(gcsUri).getUpdateTime();
return newUpdateTime > updateTime;
return true;
} catch (StorageException e) {
throw new IOException(e);
}
Expand All @@ -367,6 +381,9 @@ public InputStream open(URI uri)
throws IOException {
try {
Blob blob = getBlob(new GcsUri(uri));
if (blob == null) {
throw new FileNotFoundException("File '" + uri + "' does not exist");
}
return Channels.newInputStream(blob.reader());
} catch (StorageException e) {
throw new IOException(e);
Expand Down Expand Up @@ -533,17 +550,19 @@ private boolean batchDelete(Page<Blob> page) {
private boolean copyFile(GcsUri srcUri, GcsUri dstUri)
throws IOException {
Blob blob = getBlob(srcUri);
Blob newBlob =
_storage.create(BlobInfo.newBuilder(BlobId.of(dstUri.getBucketName(), dstUri.getPath())).build(), new byte[0]);
CopyWriter copyWriter = blob.copyTo(newBlob.getBlobId());
if (blob == null) {
throw new FileNotFoundException("Source file '" + srcUri + "' does not exist");
}
BlobId dstBlobId = BlobId.of(dstUri.getBucketName(), dstUri.getPath());
CopyWriter copyWriter = blob.copyTo(dstBlobId);
copyWriter.getResult();
return copyWriter.isDone() && blob.exists();
return copyWriter.isDone();
}

private boolean copy(GcsUri srcUri, GcsUri dstUri)
throws IOException {
if (!exists(srcUri)) {
throw new IOException(String.format("Source URI '%s' does not exist", srcUri));
throw new IOException("Source URI '" + srcUri + "' does not exist");
}
if (srcUri.equals(dstUri)) {
return true;
Expand All @@ -554,7 +573,7 @@ private boolean copy(GcsUri srcUri, GcsUri dstUri)
}
// copy directory
if (srcUri.hasSubpath(dstUri) || dstUri.hasSubpath(srcUri)) {
throw new IOException(String.format("Cannot copy from or to a subdirectory: '%s' -> '%s'", srcUri, dstUri));
throw new IOException("Cannot copy from or to a subdirectory: '" + srcUri + "' -> '" + dstUri + "'");
}
/**
* If an non-empty blob exists and does not end with "/"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.plugin.filesystem;

import com.google.api.gax.paging.Page;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.util.Collections;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertThrows;
import static org.testng.Assert.fail;


/**
* Unit tests verifying null-safety fixes in GcsPinotFS:
* - open() throws FileNotFoundException (not NPE) when the blob does not exist
* - copy() throws FileNotFoundException (not NPE) when the source blob does not exist
*/
public class GcsPinotFSNullSafetyTest {

private Storage _mockStorage;
private GcsPinotFS _gcsPinotFS;

@BeforeMethod
public void setUp()
throws Exception {
_mockStorage = mock(Storage.class);
_gcsPinotFS = new GcsPinotFS();
Field storageField = GcsPinotFS.class.getDeclaredField("_storage");
storageField.setAccessible(true);
storageField.set(_gcsPinotFS, _mockStorage);
}

@Test
public void testOpenThrowsFileNotFoundExceptionWhenBlobDoesNotExist()
throws IOException {
URI uri = URI.create("gs://test-bucket/missing-file");
when(_mockStorage.get(any(BlobId.class))).thenReturn(null);

try {
_gcsPinotFS.open(uri);
fail("Expected FileNotFoundException");
} catch (FileNotFoundException ex) {
assertEquals(ex.getMessage(), "File '" + uri + "' does not exist");
}
}

@Test
public void testOpenDoesNotThrowNullPointerException()
throws IOException {
URI uri = URI.create("gs://test-bucket/missing-file");
when(_mockStorage.get(any(BlobId.class))).thenReturn(null);

// Must throw FileNotFoundException, not NullPointerException
assertThrows(FileNotFoundException.class, () -> _gcsPinotFS.open(uri));
}

@SuppressWarnings("unchecked")
@Test
public void testCopyFileThrowsFileNotFoundExceptionWhenSourceBlobDoesNotExist()
throws IOException {
URI srcUri = URI.create("gs://test-bucket/src-file");
URI dstUri = URI.create("gs://test-bucket/dst-file");

// existsDirectoryOrBucket calls _storage.get(BlobId with prefix ending in '/') -> null
// Then calls _storage.list(...) -> empty page -> returns false
Page<Blob> emptyPage = mock(Page.class);
when(emptyPage.iterateAll()).thenReturn(Collections.emptyList());
when(_mockStorage.list(anyString(), (Storage.BlobListOption[]) any())).thenReturn(emptyPage);

// _storage.get for exact path (no trailing slash):
// First call -> existsFile check: return a blob so exists() returns true
// Second call -> copyFile's getBlob: return null to trigger FileNotFoundException
Blob mockBlob = mock(Blob.class);
when(mockBlob.exists()).thenReturn(true);
when(_mockStorage.get(any(BlobId.class)))
.thenReturn(null) // existsDirectoryOrBucket: prefix-path blob (with trailing /)
.thenReturn(mockBlob) // existsFile: file-path blob (no trailing /)
.thenReturn(null); // copyFile's getBlob: returns null -> FileNotFoundException

try {
_gcsPinotFS.copyDir(srcUri, dstUri);
fail("Expected FileNotFoundException");
} catch (FileNotFoundException ex) {
assertEquals(ex.getMessage(), "Source file 'gs://test-bucket/src-file' does not exist");
}
}

@SuppressWarnings("unchecked")
@Test
public void testCopyFileDoesNotThrowNullPointerExceptionWhenSourceBlobDoesNotExist()
throws IOException {
URI srcUri = URI.create("gs://test-bucket/src-file");
URI dstUri = URI.create("gs://test-bucket/dst-file");

Page<Blob> emptyPage = mock(Page.class);
when(emptyPage.iterateAll()).thenReturn(Collections.emptyList());
when(_mockStorage.list(anyString(), (Storage.BlobListOption[]) any())).thenReturn(emptyPage);

Blob mockBlob = mock(Blob.class);
when(mockBlob.exists()).thenReturn(true);
when(_mockStorage.get(any(BlobId.class)))
.thenReturn(null)
.thenReturn(mockBlob)
.thenReturn(null);

// Must throw FileNotFoundException, not NullPointerException
assertThrows(FileNotFoundException.class, () -> _gcsPinotFS.copyDir(srcUri, dstUri));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -76,6 +79,9 @@ private static Blob mockBlob(final String name, final long size, final Long upda
when(blob.getName()).thenReturn(name);
when(blob.getSize()).thenReturn(size);
when(blob.getUpdateTime()).thenReturn(updateTime);
OffsetDateTime offsetDateTime = updateTime != null
? OffsetDateTime.ofInstant(Instant.ofEpochMilli(updateTime), ZoneOffset.UTC) : null;
when(blob.getUpdateTimeOffsetDateTime()).thenReturn(offsetDateTime);
return blob;
}

Expand Down
Loading