From d88f24a443a6ca6fec53b79d679fe96d32c4f07c Mon Sep 17 00:00:00 2001
From: Daniel Warneke
Date: Wed, 16 Jul 2014 21:55:51 +0200
Subject: [PATCH] [FLINK-1006] Added support for the MapR file system

---
 .../flink/configuration/ConfigConstants.java  |   2 +-
 .../org/apache/flink/core/fs/FileSystem.java  |   7 +-
 .../fs/hdfs/DistributedFileSystem.java        |   2 +-
 .../runtime/fs/maprfs/MapRFileSystem.java     | 386 ++++++++++++++++++
 4 files changed, 393 insertions(+), 4 deletions(-)
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java

diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0c0800a150693..68d96ca03a1f6 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -401,7 +401,7 @@ public final class ConfigConstants {
 
 	public static final int DEFAULT_FS_STREAM_OPENING_TIMEOUT = 0;
 
-	// ------------------------ File System Bahavior ------------------------
+	// ------------------------ File System Behavior ------------------------
 
 	/**
 	 * The default behavior with respect to overwriting existing files (= not overwrite)

diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index eeebc44614d35..f1355b34edea6 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -45,8 +45,10 @@ public abstract class FileSystem {
 
 	private static final String LOCAL_FILESYSTEM_CLASS = "org.apache.flink.core.fs.local.LocalFileSystem";
 
-	private static final String DISTRIBUTED_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem";
+	private static final String HADOOP_DISTRIBUTED_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.hdfs.DistributedFileSystem";
 
+	private static final String MAPR_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.maprfs.MapRFileSystem";
+
 	private static final String S3_FILESYSTEM_CLASS = "org.apache.flink.runtime.fs.s3.S3FileSystem";
 
@@ -156,7 +158,8 @@ public int hashCode() {
 	private static final Map<String, String> FSDIRECTORY = new HashMap<String, String>();
 
 	static {
-		FSDIRECTORY.put("hdfs", DISTRIBUTED_FILESYSTEM_CLASS);
+		FSDIRECTORY.put("hdfs", HADOOP_DISTRIBUTED_FILESYSTEM_CLASS);
+		FSDIRECTORY.put("maprfs", MAPR_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("file", LOCAL_FILESYSTEM_CLASS);
 		FSDIRECTORY.put("s3", S3_FILESYSTEM_CLASS);
 	}

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
index 70f66b044346e..6280fed710766 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.conf.Configuration;
 
 /**
- * Concrete implementation of the {@Link FileSystem} base class for the Hadoop Distribution File System. The
+ * Concrete implementation of the {@link FileSystem} base class for the Hadoop Distributed File System. The
  * class is essentially a wrapper class which encapsulates the original Hadoop HDFS API.
  */
 public final class DistributedFileSystem extends FileSystem {

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
new file mode 100644
index 0000000000000..e85ebc5b0eecd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -0,0 +1,386 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.maprfs;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.fs.hdfs.DistributedBlockLocation;
+import org.apache.flink.runtime.fs.hdfs.DistributedDataInputStream;
+import org.apache.flink.runtime.fs.hdfs.DistributedDataOutputStream;
+import org.apache.flink.runtime.fs.hdfs.DistributedFileStatus;
+
+/**
+ * Concrete implementation of the {@link FileSystem} base class for the MapR
+ * file system. The class contains MapR-specific code to initialize the
+ * connection to the file system. Apart from that, the code mainly reuses the
+ * existing HDFS wrapper code.
+ */
+public final class MapRFileSystem extends FileSystem {
+
+	/**
+	 * The log object used for debugging.
+	 */
+	private static final Log LOG = LogFactory.getLog(MapRFileSystem.class);
+
+	/**
+	 * The name of MapR's class containing the implementation of the Hadoop
+	 * HDFS interface.
+	 */
+	private static final String MAPR_FS_IMPL_CLASS = "com.mapr.fs.MapRFileSystem";
+
+	/**
+	 * Name of the environment variable to determine the location of the MapR
+	 * installation.
+	 */
+	private static final String MAPR_HOME_ENV = "MAPR_HOME";
+
+	/**
+	 * The default location of the MapR installation.
+	 */
+	private static final String DEFAULT_MAPR_HOME = "/opt/mapr/";
+
+	/**
+	 * The path, relative to MAPR_HOME, of the file in which MapR stores how
+	 * to access the configured clusters.
+	 */
+	private static final String MAPR_CLUSTER_CONF_FILE = "/conf/mapr-clusters.conf";
+
+	/**
+	 * A Hadoop configuration object used during the file system
+	 * initialization.
+	 */
+	private final org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+
+	/**
+	 * The MapR class containing the implementation of the Hadoop HDFS
+	 * interface.
+	 */
+	private final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
+
+	/**
+	 * The MapR implementation of the Hadoop HDFS interface.
+	 */
+	private org.apache.hadoop.fs.FileSystem fs;
+
+	/**
+	 * Creates a new MapRFileSystem object to access the MapR file system.
+	 *
+	 * @throws IOException
+	 *         thrown if the required MapR classes cannot be found
+	 */
+	@SuppressWarnings("unchecked")
+	public MapRFileSystem() throws IOException {
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format(
+					"Trying to load class %s to access the MapR file system",
+					MAPR_FS_IMPL_CLASS));
+		}
+
+		try {
+			this.fsClass = (Class<? extends org.apache.hadoop.fs.FileSystem>) Class
+					.forName(MAPR_FS_IMPL_CLASS);
+		} catch (Exception e) {
+			throw new IOException(
+					String.format(
+							"Cannot find class %s, probably the runtime was not compiled against the MapR Hadoop libraries",
+							MAPR_FS_IMPL_CLASS), e);
+		}
+	}
+
+	@Override
+	public Path getWorkingDirectory() {
+
+		return new Path(this.fs.getWorkingDirectory().toUri());
+	}
+
+	@Override
+	public URI getUri() {
+
+		return this.fs.getUri();
+	}
+
+	@Override
+	public void initialize(final URI path) throws IOException {
+
+		if (LOG.isInfoEnabled()) {
+			LOG.info(String.format("Initializing MapR file system for path %s",
+					path.toString()));
+		}
+
+		final String authority = path.getAuthority();
+		if (authority == null || authority.isEmpty()) {
+
+			// Use the default constructor to instantiate the MapR file system
+			// object
+
+			try {
+				this.fs = this.fsClass.newInstance();
+			} catch (Exception e) {
+				throw new IOException(e);
+			}
+		} else {
+
+			// We have an authority, check the MapR cluster configuration to
+			// find the CLDB locations.
+			final String[] cldbLocations = getCLDBLocations(authority);
+
+			// Find the appropriate constructor
+			final Constructor<? extends org.apache.hadoop.fs.FileSystem> constructor;
+			try {
+				constructor = this.fsClass.getConstructor(String.class,
+						String[].class);
+			} catch (NoSuchMethodException e) {
+				throw new IOException(e);
+			}
+
+			// Instantiate the file system object
+			try {
+				this.fs = constructor.newInstance(authority, cldbLocations);
+			} catch (Exception e) {
+				throw new IOException(e);
+			}
+		}
+
+		this.fs.initialize(path, this.conf);
+	}
+
+	/**
+	 * Retrieves the CLDB locations for the given MapR cluster name.
+	 *
+	 * @param authority
+	 *        the name of the MapR cluster
+	 * @return an array of CLDB locations
+	 * @throws IOException
+	 *         thrown if the CLDB locations for the given MapR cluster name
+	 *         cannot be determined
+	 */
+	private static String[] getCLDBLocations(final String authority)
+			throws IOException {
+
+		// Determine the MapR home
+		String maprHome = System.getenv(MAPR_HOME_ENV);
+		if (maprHome == null) {
+			maprHome = DEFAULT_MAPR_HOME;
+		}
+
+		final File maprClusterConf = new File(maprHome, MAPR_CLUSTER_CONF_FILE);
+
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(String.format(
+					"Trying to retrieve MapR cluster configuration from %s",
+					maprClusterConf));
+		}
+
+		// Read the cluster configuration file; the format is specified at
+		// http://doc.mapr.com/display/MapR/mapr-clusters.conf
+		BufferedReader br = null;
+		try {
+			br = new BufferedReader(new FileReader(maprClusterConf));
+
+			String line;
+			while ((line = br.readLine()) != null) {
+
+				// Normalize the string
+				line = line.trim();
+				line = line.replace('\t', ' ');
+
+				// String.split never returns null, so checking the length
+				// suffices here
+				final String[] fields = line.split(" ");
+				if (fields.length < 1) {
+					continue;
+				}
+
+				final String clusterName = fields[0];
+
+				if (!clusterName.equals(authority)) {
+					continue;
+				}
+
+				final List<String> cldbLocations = new ArrayList<String>();
+
+				for (int i = 1; i < fields.length; ++i) {
+
+					// Skip the key-value pairs MapR recently introduced in
+					// the file format along with their security features.
+					if (!fields[i].isEmpty() && !fields[i].contains("=")) {
+						cldbLocations.add(fields[i]);
+					}
+				}
+
+				if (cldbLocations.isEmpty()) {
+					throw new IOException(
+							String.format(
+									"%s contains an entry for cluster %s but no CLDB locations.",
+									maprClusterConf, authority));
+				}
+
+				return cldbLocations.toArray(new String[0]);
+			}
+
+		} finally {
+			if (br != null) {
+				br.close();
+			}
+		}
+
+		throw new IOException(String.format(
+				"Unable to find CLDB locations for cluster %s", authority));
+	}
+
+	@Override
+	public FileStatus getFileStatus(final Path f) throws IOException {
+
+		final org.apache.hadoop.fs.FileStatus status = this.fs
+				.getFileStatus(new org.apache.hadoop.fs.Path(f.toString()));
+
+		return new DistributedFileStatus(status);
+	}
+
+	@Override
+	public BlockLocation[] getFileBlockLocations(final FileStatus file,
+			final long start, final long len) throws IOException {
+
+		if (!(file instanceof DistributedFileStatus)) {
+			throw new IOException(
+					"file is not an instance of DistributedFileStatus");
+		}
+
+		final DistributedFileStatus f = (DistributedFileStatus) file;
+
+		final org.apache.hadoop.fs.BlockLocation[] blkLocations = this.fs
+				.getFileBlockLocations(f.getInternalFileStatus(), start, len);
+
+		// Wrap up HDFS specific block location objects
+		final DistributedBlockLocation[] distBlkLocations = new DistributedBlockLocation[blkLocations.length];
+		for (int i = 0; i < distBlkLocations.length; i++) {
+			distBlkLocations[i] = new DistributedBlockLocation(blkLocations[i]);
+		}
+
+		return distBlkLocations;
+	}
+
+	@Override
+	public FSDataInputStream open(final Path f, final int bufferSize)
+			throws IOException {
+
+		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs.open(
+				new org.apache.hadoop.fs.Path(f.toString()), bufferSize);
+
+		return new DistributedDataInputStream(fdis);
+	}
+
+	@Override
+	public FSDataInputStream open(final Path f) throws IOException {
+
+		final org.apache.hadoop.fs.FSDataInputStream fdis = this.fs
+				.open(new org.apache.hadoop.fs.Path(f.toString()));
+
+		return new DistributedDataInputStream(fdis);
+	}
+
+	@Override
+	public FSDataOutputStream create(final Path f, final boolean overwrite,
+			final int bufferSize, final short replication, final long blockSize)
+			throws IOException {
+
+		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
+				new org.apache.hadoop.fs.Path(f.toString()), overwrite,
+				bufferSize, replication, blockSize);
+
+		return new DistributedDataOutputStream(fdos);
+	}
+
+	@Override
+	public FSDataOutputStream create(final Path f, final boolean overwrite)
+			throws IOException {
+
+		final org.apache.hadoop.fs.FSDataOutputStream fdos = this.fs.create(
+				new org.apache.hadoop.fs.Path(f.toString()), overwrite);
+
+		return new DistributedDataOutputStream(fdos);
+	}
+
+	@Override
+	public boolean delete(final Path f, final boolean recursive)
+			throws IOException {
+
+		return this.fs.delete(new org.apache.hadoop.fs.Path(f.toString()),
+				recursive);
+	}
+
+	@Override
+	public FileStatus[] listStatus(final Path f) throws IOException {
+
+		final org.apache.hadoop.fs.FileStatus[] hadoopFiles = this.fs
+				.listStatus(new org.apache.hadoop.fs.Path(f.toString()));
+		final FileStatus[] files = new FileStatus[hadoopFiles.length];
+
+		// Convert types
+		for (int i = 0; i < files.length; i++) {
+			files[i] = new DistributedFileStatus(hadoopFiles[i]);
+		}
+
+		return files;
+	}
+
+	@Override
+	public boolean mkdirs(final Path f) throws IOException {
+
+		return this.fs.mkdirs(new org.apache.hadoop.fs.Path(f.toString()));
+	}
+
+	@Override
+	public boolean rename(final Path src, final Path dst) throws IOException {
+
+		return this.fs.rename(new org.apache.hadoop.fs.Path(src.toString()),
+				new org.apache.hadoop.fs.Path(dst.toString()));
+	}
+
+	@SuppressWarnings("deprecation")
+	@Override
+	public long getDefaultBlockSize() {
+
+		return this.fs.getDefaultBlockSize();
+	}
+
+	@Override
+	public boolean isDistributedFS() {
+
+		return true;
+	}
+}
\ No newline at end of file
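
Usage sketch (not part of the patch): with this change, a path using the "maprfs" scheme is
resolved through the FSDIRECTORY map in FileSystem to the new MapRFileSystem class. The cluster
name given as the URI authority must match an entry in ${MAPR_HOME}/conf/mapr-clusters.conf,
for example a hypothetical line such as

    my.cluster.com node1.example.com:7222 node2.example.com:7222 secure=false

where the whitespace-separated host:port tokens after the cluster name are taken as CLDB
locations and key=value tokens (e.g. secure=false) are skipped by getCLDBLocations(). The Java
snippet below is a minimal sketch of client-side access; the cluster name and file path are made
up, and it assumes the static FileSystem.get(URI) lookup declared in
org.apache.flink.core.fs.FileSystem.

    import java.net.URI;

    import org.apache.flink.core.fs.FSDataInputStream;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    public class MapRAccessExample {

        public static void main(String[] args) throws Exception {

            // Hypothetical URI: the authority ("my.cluster.com") names the MapR
            // cluster and must match an entry in mapr-clusters.conf.
            final URI uri = new URI("maprfs://my.cluster.com/user/flink/input.txt");

            // Resolves to MapRFileSystem via the "maprfs" entry in FSDIRECTORY
            // and triggers MapRFileSystem#initialize(URI).
            final FileSystem fs = FileSystem.get(uri);

            // From here on, the regular Flink FileSystem API applies.
            final FSDataInputStream in = fs.open(new Path(uri));
            try {
                final byte[] buffer = new byte[1024];
                final int bytesRead = in.read(buffer);
                System.out.println("Read " + bytesRead + " bytes");
            } finally {
                in.close();
            }
        }
    }

If no authority is given (e.g. "maprfs:///user/flink/input.txt"), MapRFileSystem falls back to
the default constructor of com.mapr.fs.MapRFileSystem, which presumably connects to the locally
configured default cluster.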