diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
index d0bdd9818db24..4e93e3e55df25 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
@@ -118,6 +118,7 @@
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
 import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
 import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -158,6 +159,18 @@ public class AzureBlobFileSystem extends FileSystem
   public void initialize(URI uri, Configuration configuration)
       throws IOException {
     uri = ensureAuthority(uri, configuration);
+    /*
+     * If a client correlation ID is configured, disable the FileSystem cache
+     * for this scheme so that a new instance is created and an incorrect
+     * correlation ID is not recorded. Note that this sets the flag on the
+     * caller-supplied configuration object itself.
+     */
+    String correlationId = configuration.get(FS_AZURE_CLIENT_CORRELATIONID);
+    if (correlationId != null && !correlationId.isEmpty()) {
+      String disableCacheKey =
+          String.format("fs.%s.impl.disable.cache", uri.getScheme());
+      configuration.setBoolean(disableCacheKey, true);
+    }
     super.initialize(uri, configuration);
     setConf(configuration);
 
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
index 56016a39470e4..fcdd89162eebb 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemE2E.java
@@ -33,6 +33,7 @@
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -104,8 +105,13 @@ public void testOOBWritesAndReadSucceed() throws Exception {
     Configuration conf = this.getRawConfiguration();
     conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true);
     final AzureBlobFileSystem fs = getFileSystem(conf);
+    // fs was created before the correlation ID was set, so the cache is
+    // still enabled and the second lookup is expected to return fs itself.
+    String clientCorrelationId = "valid-corr-id-123";
+    conf.set(FS_AZURE_CLIENT_CORRELATIONID, clientCorrelationId);
+    final AzureBlobFileSystem fs2 = getFileSystem(conf);
+    assertEquals(fs, fs2);
     int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
 
     byte[] bytesToRead = new byte[readBufferSize];
     final byte[] b = new byte[2 * readBufferSize];
     new Random().nextBytes(b);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAccountConfiguration.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAccountConfiguration.java
index 86bb2adbe56ed..7bbd75ca922d7 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAccountConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestAccountConfiguration.java
@@ -382,6 +382,7 @@ public void testConfigPropNotFound() throws Throwable {
 
     for (String key : CONFIG_KEYS) {
       setAuthConfig(abfsConf, true, AuthType.OAuth);
+      abfsConf.unset(key);
       abfsConf.unset(key + "." + accountName);
       testMissingConfigKey(abfsConf, key);
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
index cf1a89dd1eaba..381bbf41fc130 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java
@@ -66,6 +66,85 @@ public void testClientCorrelationId() throws Exception {
     checkCorrelationConfigValidation(CLIENT_CORRELATIONID_LIST[2], false);
   }
 
+  /**
+   * Case 1: filesystem 1 is created without a clientCorrelationId, so the
+   * cache is not disabled. The id is set before filesystem 2 is requested,
+   * but the instance is returned from the cache, so both instances and
+   * their correlation ids are expected to be equal.
+   */
+  @Test
+  public void testClientCorrelationIdWithoutConf() throws Exception {
+    Configuration conf = getRawConfiguration();
+    AzureBlobFileSystem fs1 = getFileSystem(conf);
+    conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
+    AzureBlobFileSystem fs2 = getFileSystem(conf);
+    assertEquals(fs1, fs2);
+    assertEquals(fs1.getClientCorrelationId(), fs2.getClientCorrelationId());
+  }
+
+  /**
+   * Case 2: filesystem 1 is created with a clientCorrelationId, so the cache
+   * is disabled. Filesystem 2 is requested after the id has been unset, but
+   * since the cache is disabled a new instance is created. The instances
+   * and their correlation ids are expected to differ.
+   */
+  @Test
+  public void testClientCorrelationIdWithConf() throws Exception {
+    Configuration conf = getRawConfiguration();
+    conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
+    AzureBlobFileSystem fs1 = getFileSystem(conf);
+    Configuration conf1 = getRawConfiguration();
+    conf1.unset(FS_AZURE_CLIENT_CORRELATIONID);
+    AzureBlobFileSystem fs2 = getFileSystem(conf1);
+    assertNotEquals(fs1, fs2);
+    assertNotEquals(fs1.getClientCorrelationId(),
+        fs2.getClientCorrelationId());
+  }
+
+  /**
+   * Case 3: both filesystems are created with a clientCorrelationId set, each
+   * with a different value. The cache is disabled, so a new instance is
+   * created each time; instances and correlation ids are expected to differ.
+   */
+  @Test
+  public void testClientCorrelationIdBothWithConf() throws Exception {
+    Configuration conf = getRawConfiguration();
+    conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
+    AzureBlobFileSystem fs1 = getFileSystem(conf);
+    conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[1]);
+    AzureBlobFileSystem fs2 = getFileSystem(conf);
+    assertNotEquals(fs1, fs2);
+    assertNotEquals(fs1.getClientCorrelationId(),
+        fs2.getClientCorrelationId());
+  }
+
+  /**
+   * Case 4: filesystems 1 and 2 are both created with a clientCorrelationId,
+   * so the cache is disabled and they are distinct instances. The cache is
+   * then re-enabled manually, so filesystem 3 is expected to come from the
+   * cache and be equal to filesystem 1, the first instance created.
+   */
+  @Test
+  public void testClientCorrelationIdNewConf() throws Exception {
+    Configuration conf = getRawConfiguration();
+    conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
+    AzureBlobFileSystem fs1 = getFileSystem(conf);
+    String disableCacheName = String.format("fs.%s.impl.disable.cache",
+        fs1.getUri().getScheme());
+    Configuration conf2 = getRawConfiguration();
+    conf2.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[1]);
+    AzureBlobFileSystem fs2 = getFileSystem(conf2);
+    // Re-enable the cache regardless of the flag's current value.
+    conf2.setBoolean(disableCacheName, false);
+    conf2.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[2]);
+    AzureBlobFileSystem fs3 = getFileSystem(conf2);
+    assertNotEquals(fs1, fs2);
+    assertEquals(fs1, fs3);
+    String firstCorrelationId = fs1.getClientCorrelationId();
+    assertNotEquals(firstCorrelationId, fs2.getClientCorrelationId());
+    assertEquals(firstCorrelationId, fs3.getClientCorrelationId());
+  }
+
   private String getOctalNotation(FsPermission fsPermission) {
     Preconditions.checkNotNull(fsPermission, "fsPermission");
     return String