Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BLOCK_UPLOAD_ACTIVE_BLOCKS_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DATA_BLOCKS_BUFFER_DEFAULT;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
Expand Down Expand Up @@ -158,6 +159,17 @@ public class AzureBlobFileSystem extends FileSystem
public void initialize(URI uri, Configuration configuration)
throws IOException {
uri = ensureAuthority(uri, configuration);
/*
If the clientCorrelationId is set, disable the cache so that
a new instance is created and hence incorrect correlation is not
recorded
*/
String correlationId = configuration.get(FS_AZURE_CLIENT_CORRELATIONID);
if(correlationId != null && !correlationId.equals("")) {
String scheme = uri.getScheme();
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
configuration.setBoolean(disableCacheName, true);
}
super.initialize(uri, configuration);
setConf(configuration);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Arrays;
import java.util.Random;

import com.sun.tools.javac.util.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
Expand All @@ -33,6 +34,7 @@
import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;

import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_TOLERATE_CONCURRENT_APPEND;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_CLIENT_CORRELATIONID;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
Expand Down Expand Up @@ -104,8 +106,11 @@ public void testOOBWritesAndReadSucceed() throws Exception {
Configuration conf = this.getRawConfiguration();
conf.setBoolean(AZURE_TOLERATE_CONCURRENT_APPEND, true);
final AzureBlobFileSystem fs = getFileSystem(conf);
String clientCorrelationId = "valid-corr-id-123";
conf.set(FS_AZURE_CLIENT_CORRELATIONID, clientCorrelationId);
AzureBlobFileSystem fs2 = getFileSystem(conf);
assertEquals(fs, fs2);
int readBufferSize = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();

byte[] bytesToRead = new byte[readBufferSize];
final byte[] b = new byte[2 * readBufferSize];
new Random().nextBytes(b);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ public void testConfigPropNotFound() throws Throwable {

for (String key : CONFIG_KEYS) {
setAuthConfig(abfsConf, true, AuthType.OAuth);
abfsConf.unset(key);
abfsConf.unset(key + "." + accountName);
testMissingConfigKey(abfsConf, key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,94 @@ public void testClientCorrelationId() throws Exception {
checkCorrelationConfigValidation(CLIENT_CORRELATIONID_LIST[2], false);
}

/*
Case 1 :- Filesystem 1 doesn't have clientCorrelationID set, so cache is not disabled.
Filesystem 2 has clientCorrelationId set but since cache is not disabled,
we return the instance from cache and hence both should be equal
*/
@Test
public void testClientCorrelationIdWithoutConf() throws Exception{
Configuration conf = getRawConfiguration();
AzureBlobFileSystem fs1 = getFileSystem(conf);
conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
AzureBlobFileSystem fs2 = getFileSystem(conf);
assertEquals(fs1, fs2);
String correlationID = fs1.getClientCorrelationId();
String correlationID1 = fs2.getClientCorrelationId();
assertEquals(correlationID, correlationID1);
}

/*
Case 2 :- Filesystem 1 has clientCorrelationID set, so cache is disabled.
Filesystem 2 doesn't have any clientCorrelationId set but since cache is disabled,
a new instance will be created for the same.
Hence, both the instances should be different, and clientCorrelationId for the second instance is null.
*/
@Test
public void testClientCorrelationIdWithConf() throws Exception {
Configuration conf = getRawConfiguration();
conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
AzureBlobFileSystem fs1 = getFileSystem(conf);
Configuration conf1 = getRawConfiguration();
conf1.unset(FS_AZURE_CLIENT_CORRELATIONID);
AzureBlobFileSystem fs2 = getFileSystem(conf1);
assertNotEquals(fs1, fs2);
String correlationID = fs1.getClientCorrelationId();
String correlationID1 = fs2.getClientCorrelationId();
assertNotEquals(correlationID, correlationID1);
}

/*
Case 3 :- Filesystem 1 has clientCorrelationID set, so cache is disabled.
Filesystem 2 also has clientCorrelationId set but since cache is disabled,
a new instance will be created for the same.
Hence, both the instances should be different, and clientCorrelationId for both should be different as well.
*/
@Test
public void testClientCorrelationIdBothWithConf() throws Exception {
Configuration conf = getRawConfiguration();
conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
AzureBlobFileSystem fs1 = getFileSystem(conf);
conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[1]);
AzureBlobFileSystem fs2 = getFileSystem(conf);
assertNotEquals(fs1, fs2);
String correlationID = fs1.getClientCorrelationId();
String correlationID1 = fs2.getClientCorrelationId();
assertNotEquals(correlationID, correlationID1);
}

/*
Case 4 :- Filesystem 1 has clientCorrelationID set, so cache is disabled.
Filesystem 2 also has clientCorrelationId set but since cache is disabled,
a new instance will be created for the same.
Now we manually enable cache, so the filesystem instance 3 created with the same configuration
will be returned from the cache itself and will be equal to the first instance that was created with
the configuration.
*/
@Test
public void testClientCorrelationIdNewConf() throws Exception {
Configuration conf = getRawConfiguration();
conf.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[0]);
AzureBlobFileSystem fs1 = getFileSystem(conf);
String scheme = fs1.getUri().getScheme();
String disableCacheName = String.format("fs.%s.impl.disable.cache", scheme);
Configuration conf2 = getRawConfiguration();
conf2.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[1]);
AzureBlobFileSystem fs2 = getFileSystem(conf2);
if(conf2.getBoolean(disableCacheName, true)){
conf2.setBoolean(disableCacheName, false);
}
conf2.set(FS_AZURE_CLIENT_CORRELATIONID, CLIENT_CORRELATIONID_LIST[2]);
AzureBlobFileSystem fs3 = getFileSystem(conf2);
assertNotEquals(fs1, fs2);
assertEquals(fs1, fs3);
String correlationID = fs1.getClientCorrelationId();
String correlationID1 = fs2.getClientCorrelationId();
String correlationID2 = fs3.getClientCorrelationId();
assertNotEquals(correlationID, correlationID1);
assertEquals(correlationID, correlationID2);
}

private String getOctalNotation(FsPermission fsPermission) {
Preconditions.checkNotNull(fsPermission, "fsPermission");
return String
Expand Down