Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ private FileCacheValue loadFiles(FileCacheKey key) {
URI uri = finalLocation.getPath().toUri();
if (uri.getScheme() != null) {
String scheme = uri.getScheme();
updateJobConf("fs." + scheme + ".impl.disable.cache", "true");
if (jobConf.get("fs." + scheme + ".impl") == null) {
if (!scheme.equals("hdfs") && !scheme.equals("viewfs")) {
updateJobConf("fs." + scheme + ".impl", PropertyConverter.getHadoopFSImplByScheme(scheme));
Expand Down Expand Up @@ -451,8 +450,6 @@ private synchronized void setJobConf() {
// https://blog.actorsfit.com/a?ID=00550-ce56ec63-1bff-4b0c-a6f7-447b93efaa31
jobConf.set("mapreduce.input.fileinputformat.input.dir.recursive", "true");
// disable FileSystem's cache
jobConf.set("fs.hdfs.impl.disable.cache", "true");
jobConf.set("fs.file.impl.disable.cache", "true");
}

private synchronized void updateJobConf(String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,6 @@ private static Map<String, String> convertToOBSProperties(Map<String, String> pr
CloudCredential credential) {
Map<String, String> obsProperties = Maps.newHashMap();
obsProperties.put(OBSConstants.ENDPOINT, props.get(ObsProperties.ENDPOINT));
obsProperties.put(ObsProperties.FS.IMPL_DISABLE_CACHE, "true");
obsProperties.put("fs.obs.impl", getHadoopFSImplByScheme("obs"));
if (credential.isWhole()) {
obsProperties.put(OBSConstants.ACCESS_KEY, credential.getAccessKey());
Expand Down Expand Up @@ -275,7 +274,6 @@ public static String checkRegion(String endpoint, String region, String regionKe
private static void setS3FsAccess(Map<String, String> s3Properties, Map<String, String> properties,
CloudCredential credential) {
s3Properties.put(Constants.MAX_ERROR_RETRIES, "2");
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.putIfAbsent("fs.s3.impl", S3AFileSystem.class.getName());
String credentialsProviders = getAWSCredentialsProviders(properties);
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, credentialsProviders);
Expand All @@ -286,8 +284,6 @@ private static void setS3FsAccess(Map<String, String> s3Properties, Map<String,
if (credential.isTemporary()) {
s3Properties.put(Constants.SESSION_TOKEN, credential.getSessionToken());
s3Properties.put(Constants.AWS_CREDENTIALS_PROVIDER, TemporaryAWSCredentialsProvider.class.getName());
s3Properties.put("fs.s3.impl.disable.cache", "true");
s3Properties.put("fs.s3a.impl.disable.cache", "true");
}
s3Properties.put(Constants.PATH_STYLE_ACCESS, properties.getOrDefault(USE_PATH_STYLE, "false"));
for (Map.Entry<String, String> entry : properties.entrySet()) {
Expand Down Expand Up @@ -362,7 +358,6 @@ private static void rewriteHdfsOnOssProperties(Map<String, String> ossProperties
private static Map<String, String> convertToCOSProperties(Map<String, String> props, CloudCredential credential) {
Map<String, String> cosProperties = Maps.newHashMap();
cosProperties.put(CosNConfigKeys.COSN_ENDPOINT_SUFFIX_KEY, props.get(CosProperties.ENDPOINT));
cosProperties.put("fs.cosn.impl.disable.cache", "true");
cosProperties.put("fs.cosn.impl", getHadoopFSImplByScheme("cosn"));
cosProperties.put("fs.lakefs.impl", getHadoopFSImplByScheme("lakefs"));
if (credential.isWhole()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public void testAWSOldCatalogPropertiesConverter() throws Exception {
Assertions.assertEquals(13, properties.size());

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(22, hdProps.size());
Assertions.assertEquals(21, hdProps.size());
}

@Test
Expand All @@ -291,7 +291,8 @@ public void testS3CatalogPropertiesConverter() throws Exception {
Assertions.assertEquals(12, properties.size());

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(21, hdProps.size());
Assertions.assertNull(hdProps.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals(20, hdProps.size());
}

@Test
Expand Down Expand Up @@ -442,7 +443,8 @@ public void testGlueCatalogPropertiesConverter() throws Exception {
Assertions.assertEquals("s3.us-east-1.amazonaws.com", properties.get(S3Properties.ENDPOINT));

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(30, hdProps.size());
Assertions.assertEquals(29, hdProps.size());
Assertions.assertNull(hdProps.get("fs.s3.impl.disable.cache"));

String query = "create catalog hms_glue properties (\n"
+ " 'type'='hms',\n"
Expand All @@ -460,7 +462,8 @@ public void testGlueCatalogPropertiesConverter() throws Exception {
Assertions.assertEquals("s3.us-east-1.amazonaws.com.cn", propertiesNew.get(S3Properties.ENDPOINT));

Map<String, String> hdPropsNew = catalogNew.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(30, hdPropsNew.size());
Assertions.assertNull(hdPropsNew.get("fs.s3.impl.disable.cache"));
Assertions.assertEquals(29, hdPropsNew.size());
}

@Test
Expand All @@ -474,7 +477,7 @@ public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
+ " 'cos.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName0, CosProperties.COS_PREFIX,
"cos.ap-beijing.myqcloud.com", query0, 12, 18);
"cos.ap-beijing.myqcloud.com", query0, 12, 17);

String catalogName1 = "hms_oss";
String query1 = "create catalog " + catalogName1 + " properties (\n"
Expand All @@ -496,7 +499,7 @@ public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
+ " 'minio.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName2, MinioProperties.MINIO_PREFIX,
"http://127.0.0.1", query2, 12, 21);
"http://127.0.0.1", query2, 12, 20);

String catalogName3 = "hms_obs";
String query3 = "create catalog hms_obs properties (\n"
Expand All @@ -507,7 +510,7 @@ public void testS3CompatibleCatalogPropertiesConverter() throws Exception {
+ " 'obs.secret_key' = 'skk'\n"
+ ");";
testS3CompatibleCatalogProperties(catalogName3, ObsProperties.OBS_PREFIX,
"obs.cn-north-4.myhuaweicloud.com", query3, 12, 17);
"obs.cn-north-4.myhuaweicloud.com", query3, 12, 16);
}

private void testS3CompatibleCatalogProperties(String catalogName, String prefix,
Expand All @@ -521,6 +524,7 @@ private void testS3CompatibleCatalogProperties(String catalogName, String prefix

Map<String, String> hdProps = catalog.getCatalogProperty().getHadoopProperties();
Assertions.assertEquals(bePropsSize, hdProps.size());
Assertions.assertNull(hdProps.get(String.format("fs.%s.impl.disable.cache", prefix)));

Map<String, String> expectedMetaProperties = new HashMap<>();
expectedMetaProperties.put("endpoint", endpoint);
Expand Down