diff --git a/auron-core/pom.xml b/auron-core/pom.xml
index 414553e31..8565aeb7e 100644
--- a/auron-core/pom.xml
+++ b/auron-core/pom.xml
@@ -77,6 +77,12 @@
       <scope>provided</scope>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-runtime</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.junit.jupiter</groupId>
       <artifactId>junit-jupiter-api</artifactId>
diff --git a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
index 48f10ab01..df2c8dd6a 100644
--- a/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
+++ b/auron-core/src/main/java/org/apache/auron/jni/JniBridge.java
@@ -20,6 +20,7 @@
 import java.lang.management.BufferPoolMXBean;
 import java.lang.management.ManagementFactory;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -71,12 +72,24 @@ public static void putResource(String key, Object value) {
     }
 
     public static FSDataInputWrapper openFileAsDataInputWrapper(FileSystem fs, String path) throws Exception {
         // the path is a URI string, so we need to convert it to a URI object
-        return FSDataInputWrapper.wrap(fs.open(new Path(new URI(path))));
+        return FSDataInputWrapper.wrap(fs.open(toInputPath(path)));
     }
 
     public static FSDataOutputWrapper createFileAsDataOutputWrapper(FileSystem fs, String path) throws Exception {
-        return FSDataOutputWrapper.wrap(fs.create(new Path(new URI(path))));
+        // the path is passed to Path as-is, without URI decoding
+        return FSDataOutputWrapper.wrap(fs.create(new Path(path)));
+    }
+
+    /**
+     * Builds the Hadoop {@link Path} for a URI-string input path.
+     *
+     * <p>Every {@code #} is percent-encoded first; otherwise {@link URI} would parse the text after
+     * it as a fragment instead of as part of the path.
+     */
+    private static Path toInputPath(String path) throws URISyntaxException {
+        // String.replace is a no-op when the path contains no '#', so no pre-check is needed.
+        return new Path(new URI(path.replace("#", "%23")));
     }
 
     public static long getDirectMemoryUsed() {
diff --git a/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java
new file mode 100644
index 000000000..5f8128393
--- /dev/null
+++ b/auron-core/src/test/java/org/apache/auron/jni/JniBridgeTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.auron.jni;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.junit.jupiter.api.Test;
+
+/** Verifies how {@link JniBridge} turns path strings into Hadoop paths for reads and writes. */
+public class JniBridgeTest {
+
+    @Test
+    public void testFileWrappersPreserveLiteralHashInHdfsPath() throws Exception {
+        String path = "hdfs://mycluster/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json";
+        CapturingFileSystem cfs = new CapturingFileSystem();
+
+        JniBridge.openFileAsDataInputWrapper(cfs, path).close();
+        JniBridge.createFileAsDataOutputWrapper(cfs, path).close();
+
+        assertPathPreservesHash(cfs.openedPath);
+        assertPathPreservesHash(cfs.createdPath);
+    }
+
+    @Test
+    public void testFileWrappersHandleReadUriAndWriteRawPercentPaths() throws Exception {
+        // The read path is URI-decoded once (%2520 becomes %20); the write path is kept verbatim.
+        String readPath = "file:/tmp/t1/part=test%2520test/part-00000.parquet";
+        String writePath = "file:/tmp/t1/part=test%20test/part-00001.parquet";
+        CapturingFileSystem cfs = new CapturingFileSystem();
+
+        JniBridge.openFileAsDataInputWrapper(cfs, readPath).close();
+        JniBridge.createFileAsDataOutputWrapper(cfs, writePath).close();
+
+        assertEquals(
+                "/tmp/t1/part=test%20test/part-00000.parquet",
+                cfs.openedPath.toUri().getPath());
+        assertEquals(
+                "/tmp/t1/part=test%20test/part-00001.parquet",
+                cfs.createdPath.toUri().getPath());
+    }
+
+    private static void assertPathPreservesHash(Path path) {
+        assertEquals(
+                "/tmp/channel=wx_repro#mini/spark-submit-123/part-00000.json",
+                path.toUri().getPath());
+        assertNull(path.toUri().getFragment());
+    }
+
+    /** Records the path passed to {@code open}/{@code create} and returns in-memory stub streams. */
+    private static class CapturingFileSystem extends RawLocalFileSystem {
+        private final Statistics statistics = new Statistics("hdfs");
+        private Path openedPath;
+        private Path createdPath;
+
+        @Override
+        public FSDataInputStream open(Path path) throws IOException {
+            openedPath = path;
+            return new FSDataInputStream(new EmptyFSInputStream());
+        }
+
+        @Override
+        public FSDataOutputStream create(Path path) throws IOException {
+            createdPath = path;
+            return new FSDataOutputStream(new ByteArrayOutputStream(), statistics);
+        }
+    }
+
+    /** Input stream stub that always reports position 0 and end-of-stream on every read. */
+    private static class EmptyFSInputStream extends FSInputStream {
+        @Override
+        public void seek(long pos) {}
+
+        @Override
+        public long getPos() {
+            return 0;
+        }
+
+        @Override
+        public boolean seekToNewSource(long targetPos) {
+            return false;
+        }
+
+        @Override
+        public int read() {
+            return -1;
+        }
+    }
+}