Skip to content

Commit

Permalink
HADOOP-18869: [ABFS] Fix behavior of a File System APIs on root path (a…
Browse files Browse the repository at this point in the history
…pache#6003)

Contributed by Anuj Modi
  • Loading branch information
anujmodi2021 committed Nov 15, 2023
1 parent bdae7c7 commit 5d465f9
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;

import static java.net.HttpURLConnection.HTTP_CONFLICT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
Expand All @@ -120,6 +121,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_CREATE_ON_ROOT;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -324,6 +326,12 @@ public FSDataOutputStream create(final Path f,

statIncrement(CALL_CREATE);
trailingPeriodCheck(f);
if (f.isRoot()) {
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
ERR_CREATE_ON_ROOT,
null);
}

Path qualifiedPath = makeQualified(f);

Expand All @@ -348,6 +356,12 @@ public FSDataOutputStream createNonRecursive(final Path f, final FsPermission pe
final Progressable progress) throws IOException {

statIncrement(CALL_CREATE_NON_RECURSIVE);
if (f.isRoot()) {
throw new AbfsRestOperationException(HTTP_CONFLICT,
AzureServiceErrorCode.PATH_CONFLICT.getErrorCode(),
ERR_CREATE_ON_ROOT,
null);
}
final Path parent = f.getParent();
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.CREATE_NON_RECURSIVE, tracingHeaderFormat,
Expand Down Expand Up @@ -965,15 +979,25 @@ public void setXAttr(final Path path,
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.SET_ATTR, true, tracingHeaderFormat,
listener);
Hashtable<String, String> properties = abfsStore
.getPathStatus(qualifiedPath, tracingContext);
Hashtable<String, String> properties;
String xAttrName = ensureValidAttributeName(name);

if (path.isRoot()) {
properties = abfsStore.getFilesystemProperties(tracingContext);
} else {
properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
}

boolean xAttrExists = properties.containsKey(xAttrName);
XAttrSetFlag.validate(name, xAttrExists, flag);

String xAttrValue = abfsStore.decodeAttribute(value);
properties.put(xAttrName, xAttrValue);
abfsStore.setPathProperties(qualifiedPath, properties, tracingContext);
if (path.isRoot()) {
abfsStore.setFilesystemProperties(properties, tracingContext);
} else {
abfsStore.setPathProperties(qualifiedPath, properties, tracingContext);
}
} catch (AzureBlobFileSystemException ex) {
checkException(path, ex);
}
Expand Down Expand Up @@ -1005,9 +1029,15 @@ public byte[] getXAttr(final Path path, final String name)
TracingContext tracingContext = new TracingContext(clientCorrelationId,
fileSystemId, FSOperationType.GET_ATTR, true, tracingHeaderFormat,
listener);
Hashtable<String, String> properties = abfsStore
.getPathStatus(qualifiedPath, tracingContext);
Hashtable<String, String> properties;
String xAttrName = ensureValidAttributeName(name);

if (path.isRoot()) {
properties = abfsStore.getFilesystemProperties(tracingContext);
} else {
properties = abfsStore.getPathStatus(qualifiedPath, tracingContext);
}

if (properties.containsKey(xAttrName)) {
String xAttrValue = properties.get(xAttrName);
value = abfsStore.encodeAttribute(xAttrValue);
Expand Down Expand Up @@ -1488,7 +1518,7 @@ public static void checkException(final Path path,
case HttpURLConnection.HTTP_NOT_FOUND:
throw (IOException) new FileNotFoundException(message)
.initCause(exception);
case HttpURLConnection.HTTP_CONFLICT:
case HTTP_CONFLICT:
throw (IOException) new FileAlreadyExistsException(message)
.initCause(exception);
case HttpURLConnection.HTTP_FORBIDDEN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ public void setPathProperties(final Path path,
final Hashtable<String, String> properties, TracingContext tracingContext)
throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("setPathProperties", "setPathProperties")){
LOG.debug("setFilesystemProperties for filesystem: {} path: {} with properties: {}",
LOG.debug("setPathProperties for filesystem: {} path: {} with properties: {}",
client.getFileSystem(),
path,
properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,6 @@ public final class AbfsErrors {
+ "operation";
public static final String ERR_NO_LEASE_THREADS = "Lease desired but no lease threads "
+ "configured, set " + FS_AZURE_LEASE_THREADS;

public static final String ERR_CREATE_ON_ROOT = "Cannot create file over root path";
private AbfsErrors() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.io.IOException;
import java.util.EnumSet;

import org.junit.Assume;
import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.fs.Path;
Expand All @@ -46,45 +46,24 @@ public ITestAzureBlobFileSystemAttributes() throws Exception {
public void testSetGetXAttr() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
AbfsConfiguration conf = fs.getAbfsStore().getAbfsConfiguration();
Assume.assumeTrue(getIsNamespaceEnabled(fs));

byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("hi");
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("你好");
String attributeName1 = "user.asciiAttribute";
String attributeName2 = "user.unicodeAttribute";
Path testFile = path("setGetXAttr");

// after creating a file, the xAttr should not be present
touch(testFile);
assertNull(fs.getXAttr(testFile, attributeName1));

// after setting the xAttr on the file, the value should be retrievable
fs.registerListener(
new TracingHeaderValidator(conf.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.SET_ATTR, true, 0));
fs.setXAttr(testFile, attributeName1, attributeValue1);
fs.setListenerOperation(FSOperationType.GET_ATTR);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
fs.registerListener(null);

// after setting a second xAttr on the file, the first xAttr values should not be overwritten
fs.setXAttr(testFile, attributeName2, attributeValue2);
assertArrayEquals(attributeValue1, fs.getXAttr(testFile, attributeName1));
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName2));
final Path testPath = path("setGetXAttr");
fs.create(testPath);
testGetSetXAttrHelper(fs, testPath);
}

@Test
public void testSetGetXAttrCreateReplace() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(getIsNamespaceEnabled(fs));
byte[] attributeValue = fs.getAbfsStore().encodeAttribute("one");
String attributeName = "user.someAttribute";
Path testFile = path("createReplaceXAttr");

// after creating a file, it must be possible to create a new xAttr
touch(testFile);
fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG);
assertArrayEquals(attributeValue, fs.getXAttr(testFile, attributeName));
Assertions.assertThat(fs.getXAttr(testFile, attributeName))
.describedAs("Retrieved Attribute Value is Not as Expected")
.containsExactly(attributeValue);

// however after the xAttr is created, creating it again must fail
intercept(IOException.class, () -> fs.setXAttr(testFile, attributeName, attributeValue, CREATE_FLAG));
Expand All @@ -93,7 +72,6 @@ public void testSetGetXAttrCreateReplace() throws Exception {
@Test
public void testSetGetXAttrReplace() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
Assume.assumeTrue(getIsNamespaceEnabled(fs));
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute("one");
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute("two");
String attributeName = "user.someAttribute";
Expand All @@ -108,6 +86,72 @@ public void testSetGetXAttrReplace() throws Exception {
// however after the xAttr is created, replacing it must succeed
fs.setXAttr(testFile, attributeName, attributeValue1, CREATE_FLAG);
fs.setXAttr(testFile, attributeName, attributeValue2, REPLACE_FLAG);
assertArrayEquals(attributeValue2, fs.getXAttr(testFile, attributeName));
Assertions.assertThat(fs.getXAttr(testFile, attributeName))
.describedAs("Retrieved Attribute Value is Not as Expected")
.containsExactly(attributeValue2);
}

@Test
public void testGetSetXAttrOnRoot() throws Exception {
AzureBlobFileSystem fs = getFileSystem();
final Path testPath = new Path("/");
testGetSetXAttrHelper(fs, testPath);
}

private void testGetSetXAttrHelper(final AzureBlobFileSystem fs,
final Path testPath) throws Exception {

String attributeName1 = "user.attribute1";
String attributeName2 = "user.attribute2";
String decodedAttributeValue1 = "hi";
String decodedAttributeValue2 = "hello";
byte[] attributeValue1 = fs.getAbfsStore().encodeAttribute(decodedAttributeValue1);
byte[] attributeValue2 = fs.getAbfsStore().encodeAttribute(decodedAttributeValue2);

// Attribute not present initially
Assertions.assertThat(fs.getXAttr(testPath, attributeName1))
.describedAs("Cannot get attribute before setting it")
.isNull();
Assertions.assertThat(fs.getXAttr(testPath, attributeName2))
.describedAs("Cannot get attribute before setting it")
.isNull();

// Set the Attributes
fs.registerListener(
new TracingHeaderValidator(fs.getAbfsStore().getAbfsConfiguration()
.getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.SET_ATTR, true, 0));
fs.setXAttr(testPath, attributeName1, attributeValue1);

// Check if the attribute is retrievable
fs.setListenerOperation(FSOperationType.GET_ATTR);
byte[] rv = fs.getXAttr(testPath, attributeName1);
Assertions.assertThat(rv)
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
.containsExactly(attributeValue1);
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
.isEqualTo(decodedAttributeValue1);
fs.registerListener(null);

// Set the second Attribute
fs.setXAttr(testPath, attributeName2, attributeValue2);

// Check all the attributes present and previous Attribute not overridden
rv = fs.getXAttr(testPath, attributeName1);
Assertions.assertThat(rv)
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
.containsExactly(attributeValue1);
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
.isEqualTo(decodedAttributeValue1);

rv = fs.getXAttr(testPath, attributeName2);
Assertions.assertThat(rv)
.describedAs("Retrieved Attribute Does not Matches in Encoded Form")
.containsExactly(attributeValue2);
Assertions.assertThat(fs.getAbfsStore().decodeAttribute(rv))
.describedAs("Retrieved Attribute Does not Matches in Decoded Form")
.isEqualTo(decodedAttributeValue2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.EnumSet;
import java.util.UUID;

import org.assertj.core.api.Assertions;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.test.GenericTestUtils;
Expand Down Expand Up @@ -146,6 +148,26 @@ public void testCreateNonRecursive2() throws Exception {
assertIsFile(fs, testFile);
}

@Test
public void testCreateOnRoot() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
Path testFile = path(AbfsHttpConstants.ROOT_PATH);
AbfsRestOperationException ex = intercept(AbfsRestOperationException.class, () ->
fs.create(testFile, true));
if (ex.getStatusCode() != HTTP_CONFLICT) {
// Request should fail with 409.
throw ex;
}

ex = intercept(AbfsRestOperationException.class, () ->
fs.createNonRecursive(testFile, FsPermission.getDefault(),
false, 1024, (short) 1, 1024, null));
if (ex.getStatusCode() != HTTP_CONFLICT) {
// Request should fail with 409.
throw ex;
}
}

/**
* Attempts to use to the ABFS stream after it is closed.
*/
Expand Down Expand Up @@ -190,7 +212,8 @@ public void testTryWithResources() throws Throwable {
// the exception raised in close() must be in the caught exception's
// suppressed list
Throwable[] suppressed = fnfe.getSuppressed();
assertEquals("suppressed count", 1, suppressed.length);
Assertions.assertThat(suppressed.length)
.describedAs("suppressed count should be 1").isEqualTo(1);
Throwable inner = suppressed[0];
if (!(inner instanceof IOException)) {
throw inner;
Expand Down
2 changes: 1 addition & 1 deletion hadoop-tools/hadoop-azure/src/test/resources/abfs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<configuration xmlns:xi="http://www.w3.org/2001/XInclude">
<property>
<name>fs.contract.test.root-tests-enabled</name>
<value>false</value>
<value>true</value>
</property>

<property>
Expand Down

0 comments on commit 5d465f9

Please sign in to comment.