Skip to content

Commit

Permalink
Make use of Hadoop 3.3 capability constants (#825)
Browse files Browse the repository at this point in the history
  • Loading branch information
medb committed Jul 20, 2022
1 parent 2624896 commit a37b26c
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
Expand Down Expand Up @@ -366,12 +367,6 @@ private GoogleCredentials getCredentials(Configuration config) throws IOExceptio
.orElse(credentials);
}

private static String validatePathCapabilityArgs(Path path, String capability) {
checkNotNull(path);
checkArgument(!isNullOrEmpty(capability), "capability parameter is empty string");
return Ascii.toLowerCase(capability);
}

private static boolean isImplicitDirectory(FileStatus curr) {
// Modification time of 0 indicates implicit directory.
return curr.isDirectory() && curr.getModificationTime() == 0;
Expand Down Expand Up @@ -1008,11 +1003,12 @@ protected int getDefaultPort() {

@Override
public boolean hasPathCapability(Path path, String capability) {
switch (validatePathCapabilityArgs(path, capability)) {
// TODO: remove string literals in favor of Constants in CommonPathCapabilities.java
// from Hadoop 3 when Hadoop 2 is no longer supported
case "fs.capability.paths.append":
case "fs.capability.paths.concat":
checkNotNull(path, "path must not be null");
checkArgument(
!isNullOrEmpty(capability), "capability must not be null or empty string for %s", path);
switch (Ascii.toLowerCase(capability)) {
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
return true;
default:
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.google.cloud.hadoop.fs.gcs;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;

import com.google.cloud.hadoop.gcsio.CreateFileOptions;
import com.google.cloud.hadoop.gcsio.CreateObjectOptions;
Expand All @@ -25,6 +27,7 @@
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystemImpl;
import com.google.cloud.hadoop.gcsio.GoogleCloudStorageItemInfo;
import com.google.cloud.hadoop.gcsio.StorageResourceId;
import com.google.common.base.Ascii;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.RateLimiter;
Expand All @@ -49,11 +52,13 @@
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

class GoogleHadoopOutputStream extends OutputStream implements IOStatisticsSource, Syncable {
class GoogleHadoopOutputStream extends OutputStream
implements IOStatisticsSource, StreamCapabilities, Syncable {

private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

Expand Down Expand Up @@ -367,4 +372,18 @@ private void throwIfNotOpen() throws IOException {
public IOStatistics getIOStatistics() {
return streamStatistics.getIOStatistics();
}

@Override
public boolean hasCapability(String capability) {
checkArgument(!isNullOrEmpty(capability), "capability must not be null or empty string");
switch (Ascii.toLowerCase(capability)) {
case StreamCapabilities.HFLUSH:
case StreamCapabilities.HSYNC:
return syncRateLimiter != null;
case StreamCapabilities.IOSTATISTICS:
return true;
default:
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,12 @@ public void hflush_throwsExceptionAfterClose() throws IOException {

@Test
public void hsync_largeNumberOfComposeComponents() throws Exception {
// Set an extremely low min sync interval as we need to perform many syncs in this test
ghfs.getConf().setInt(GCS_OUTPUT_STREAM_SYNC_MIN_INTERVAL_MS.getKey(), 1);

Path objectPath = new Path(ghfs.getUri().resolve("/hsync_largeNumberOfComposeComponents.bin"));

// number of compose components should be greater than 1024 (previous limit for GCS compose API)
// Number of compose components should be greater than 1024 (previous limit for GCS compose API)
byte[] expected = new byte[1536];
new Random().nextBytes(expected);

Expand Down

0 comments on commit a37b26c

Please sign in to comment.