From c201ab35e1db2e7630abbf5ab530c600f50e9dca Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Wed, 12 Aug 2015 13:30:24 -0700 Subject: [PATCH 1/9] Implement S3 bulk listing --- core/pom.xml | 5 + .../org/apache/spark/deploy/SparkS3Util.scala | 336 ++++++++++++++++++ .../org/apache/spark/rdd/HadoopRDD.scala | 13 +- .../scala/org/apache/spark/rdd/UnionRDD.scala | 15 + .../spark/deploy/SparkS3UtilSuite.scala | 90 +++++ 5 files changed, 455 insertions(+), 4 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala create mode 100644 core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala diff --git a/core/pom.xml b/core/pom.xml index 61744bb5c7bf5..a0fabc9b49ccf 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -39,6 +39,11 @@ avro-mapred ${avro.mapred.classifier} + + com.amazonaws + aws-java-sdk + ${aws.java.sdk.version} + com.google.guava guava diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala new file mode 100644 index 0000000000000..a6b44a21f5c03 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -0,0 +1,336 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import java.net.URI +import java.util + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import com.amazonaws.{AmazonClientException, AmazonServiceException, ClientConfiguration, Protocol} +import com.amazonaws.auth.{AWSCredentialsProvider, BasicAWSCredentials, InstanceProfileCredentialsProvider, STSAssumeRoleSessionCredentialsProvider} +import com.amazonaws.internal.StaticCredentialsProvider +import com.amazonaws.services.s3.AmazonS3Client +import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary} + +import com.google.common.annotations.VisibleForTesting +import com.google.common.base.{Preconditions, Strings} +import com.google.common.cache.{Cache, CacheBuilder} +import com.google.common.collect.AbstractSequentialIterator + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path, PathFilter} +import org.apache.hadoop.fs.s3.S3Credentials +import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} +import org.apache.hadoop.mapred.{FileInputFormat, FileSplit, InputSplit, JobConf} + +import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.util.Utils + +/** + * :: DeveloperApi :: + * Contains util methods to interact with S3 from Spark. 
+ */ +@DeveloperApi +object SparkS3Util extends Logging { + val sparkConf = new SparkConf() + val conf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf) + + private val s3ClientCache: Cache[String, AmazonS3Client] = CacheBuilder + .newBuilder + .concurrencyLevel(Runtime.getRuntime.availableProcessors) + .build[String, AmazonS3Client] + + // Flag to enable S3 bulk listing. It is true by default. + private val S3_BULK_LISTING_ENABLED: String = "spark.s3.bulk.listing.enabled" + + // Properties for AmazonS3Client. Default values should just work most of time. + private val S3_CONNECT_TIMEOUT: String = "spark.s3.connect.timeout" + private val S3_MAX_CONNECTIONS: String = "spark.s3.max.connections" + private val S3_MAX_ERROR_RETRIES: String = "spark.s3.max.error.retries" + private val S3_SOCKET_TIMEOUT: String = "spark.s3.socket.timeout" + private val S3_SSL_ENABLED: String = "spark.s3.ssl.enabled" + private val S3_USE_INSTANCE_CREDENTIALS: String = "spark.s3.use.instance.credentials" + + // Ignore hidden files whose name starts with "_" and ".", or ends with "$folder$". + private val hiddenFileFilter = new PathFilter() { + override def accept(p: Path): Boolean = { + val name: String = p.getName() + !name.startsWith("_") && !name.startsWith(".") && !name.endsWith("$folder$") + } + } + + /** + * Initialize AmazonS3Client per bucket. Since access permissions might be different from bucket + * to bucket, it is necessary to initialize AmazonS3Client on a per bucket basis. + */ + private def getS3Client(bucket: String): AmazonS3Client = { + val sslEnabled: Boolean = sparkConf.getBoolean(S3_SSL_ENABLED, true) + val maxErrorRetries: Int = sparkConf.getInt(S3_MAX_ERROR_RETRIES, 10) + val connectTimeout: Int = sparkConf.getInt(S3_CONNECT_TIMEOUT, 5000) + val socketTimeout: Int = sparkConf.getInt(S3_SOCKET_TIMEOUT, 5000) + val maxConnections: Int = sparkConf.getInt(S3_MAX_CONNECTIONS, 500) + val useInstanceCredentials: Boolean = sparkConf.getBoolean(S3_USE_INSTANCE_CREDENTIALS, false) + + val clientConf: ClientConfiguration = new ClientConfiguration + clientConf.setMaxErrorRetry(maxErrorRetries) + clientConf.setProtocol(if (sslEnabled) Protocol.HTTPS else Protocol.HTTP) + clientConf.setConnectionTimeout(connectTimeout) + clientConf.setSocketTimeout(socketTimeout) + clientConf.setMaxConnections(maxConnections) + + // There are different ways of obtaining S3 bucket access. Try them in the following order: + // 1) Check if user specified an IAM role. If so, use it. + // 2) Check if instance is associated with an IAM role. If so, use it. + // 3) Check if default IAM role is set in Hadoop conf. If so, use it. + // 4) If no IAM role is found, search for an AWS key pair. + // If no credentials are found, throw an exception. 
+ val s3RoleArn = Option(conf.get("aws.iam.role.arn")) + val s3RoleArnDefault = Option(conf.get("aws.iam.role.arn.default")) + val credentialsProvider: AWSCredentialsProvider = + s3RoleArn match { + case Some(role) => + logDebug("Use user-specified IAM role: " + role) + new STSAssumeRoleSessionCredentialsProvider( + role, "RoleSessionName-" + Utils.random.nextInt) + case None if useInstanceCredentials => + logDebug("Use IAM role associated with the instance") + new InstanceProfileCredentialsProvider + case _ => + s3RoleArnDefault match { + case Some(role) => + logDebug("Use default IAM role configured in Hadoop config: " + role) + new STSAssumeRoleSessionCredentialsProvider( + role, "RoleSessionName-" + Utils.random.nextInt) + case _ => + try { + logDebug("Use AWS key pair") + val credentials: S3Credentials = new S3Credentials + credentials.initialize(URI.create(bucket), conf) + new StaticCredentialsProvider( + new BasicAWSCredentials(credentials.getAccessKey, credentials.getSecretAccessKey)) + } catch { + case e: Exception => throw new RuntimeException("S3 credentials not configured", e) + } + } + } + + val providerName: String = credentialsProvider.getClass.getSimpleName + val s3ClientKey: String = + if (s3RoleArn == null) providerName + else providerName + "_" + s3RoleArn + + Option(s3ClientCache.getIfPresent(s3ClientKey)).getOrElse { + val newClient = new AmazonS3Client(credentialsProvider, clientConf) + s3ClientCache.put(s3ClientKey, newClient) + newClient + } + } + + /** + * Helper function to extract S3 object key name from the given path. + */ + private def keyFromPath(path: Path): String = { + Preconditions.checkArgument(path.isAbsolute, "Path is not absolute: %s", path) + var key: String = Strings.nullToEmpty(path.toUri.getPath) + if (key.startsWith("/")) { + key = key.substring(1) + } + if (key.endsWith("/")) { + key = key.substring(0, key.length - 1) + } + key + } + + /** + * Helper function to convert S3ObjectSummary into FileStatus. + */ + private def statusFromObjects( + bucket: String, + objects: util.List[S3ObjectSummary]): Iterator[FileStatus] = { + val blockSize: Long = getS3BlockSize() + val list: ArrayBuffer[FileStatus] = ArrayBuffer() + for (obj: S3ObjectSummary <- objects.asScala) { + if (!obj.getKey.endsWith("/")) { + val path = new Path(bucket + "/" + obj.getKey) + if (hiddenFileFilter.accept(path)) { + val status: FileStatus = new FileStatus( + obj.getSize, + false, + 1, + blockSize, + obj.getLastModified.getTime, + path) + list += status + } + } + } + list.iterator + } + + /** + * For the given path, list S3 objects and return an iterator of returned FileStatuses. 
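Note on the listing code that follows: listPrefix drives the same three SDK calls that a hand-written pagination loop would use (listObjects, isTruncated, listNextBatchOfObjects), only wrapped in Guava's AbstractSequentialIterator so the listing stays lazy. As a reference point, a standalone eager sketch of the same pagination contract looks like the snippet below; listAllSummaries is a hypothetical helper name, not part of the patch.

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer
    import com.amazonaws.services.s3.AmazonS3Client
    import com.amazonaws.services.s3.model.{ListObjectsRequest, S3ObjectSummary}

    // Eagerly collect every object summary under a prefix. The patch performs the
    // same calls lazily via AbstractSequentialIterator inside listPrefix.
    def listAllSummaries(s3: AmazonS3Client, bucket: String, prefix: String): Seq[S3ObjectSummary] = {
      val request = new ListObjectsRequest().withBucketName(bucket).withPrefix(prefix)
      val summaries = ArrayBuffer[S3ObjectSummary]()
      var listing = s3.listObjects(request)
      summaries ++= listing.getObjectSummaries.asScala
      while (listing.isTruncated) {
        listing = s3.listNextBatchOfObjects(listing)
        summaries ++= listing.getObjectSummaries.asScala
      }
      summaries
    }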
+ */ + @throws(classOf[AmazonClientException]) + @throws(classOf[AmazonServiceException]) + private def listPrefix(s3: AmazonS3Client, path: Path): Iterator[FileStatus] = { + val uri: URI = path.toUri + val key: String = keyFromPath(path) + val request: ListObjectsRequest = new ListObjectsRequest() + .withBucketName(uri.getAuthority) + .withPrefix(key) + + val listings = new AbstractSequentialIterator[ObjectListing](s3.listObjects(request)) { + protected def computeNext(previous: ObjectListing): ObjectListing = { + if (!previous.isTruncated) { + return null + } + s3.listNextBatchOfObjects(previous) + } + }.asScala + + val bucket: String = uri.getScheme + "://" + uri.getAuthority + listings + .map(listing => statusFromObjects(bucket, listing.getObjectSummaries)) + .reduceLeft(_ ++ _) + } + + /** + * For the given list of paths, sequentially list S3 objects per path and combine returned + * FileStatuses into a single array. + */ + private def listStatus(s3: AmazonS3Client, paths: List[Path]): Array[FileStatus] = { + val list: ArrayBuffer[FileStatus] = ArrayBuffer() + paths.foreach { path => + val iterator = listPrefix(s3, path) + while (iterator.hasNext) { + list += iterator.next + } + } + list.toArray + } + + /** + * Find S3 block size from Hadoop conf. Try both s3 and s3n names. + */ + private def getS3BlockSize(): Long = { + val value = Option(conf.get("fs.s3.block.size")) + .getOrElse(conf.get("fs.s3n.block.size", "67108864")) + value.toLong + } + + /** + * Find min split size from Hadoop conf. Try both Hadoop 1 and 2 names. + */ + private def getMinSplitSize(): Long = { + val value = Option(conf.get("mapred.min.split.size")) + .getOrElse(conf.get("mapreduce.input.fileinputformat.split.minsize", "1")) + value.toLong + } + + /** + * Return whether the given path is an S3 path or not. + */ + private def isS3Path(path: Path): Boolean = { + Option(path.toUri.getScheme).exists(_.toUpperCase.startsWith("S3")) + } + + /** + * Return whether S3 bulk listing can be enabled or not. + */ + def s3BulkListingEnabled(jobConf: JobConf): Boolean = { + val enabledByUser = sparkConf.getBoolean(S3_BULK_LISTING_ENABLED, true) + val inputPaths = FileInputFormat.getInputPaths(jobConf) + val noWildcard = inputPaths.forall(path => !new GlobPattern(path.toString).hasWildcard) + val s3Paths = inputPaths.forall(SparkS3Util.isS3Path(_)) + enabledByUser && inputPaths.nonEmpty && s3Paths && noWildcard + } + + /** + * Return whether the given file is splittable or not. + */ + @VisibleForTesting + def isSplitable(jobConf: JobConf, file: Path): Boolean = { + val compressionCodecs = new CompressionCodecFactory(jobConf) + val codec = compressionCodecs.getCodec(file) + if (codec == null) { + true + } else { + codec.isInstanceOf[SplittableCompressionCodec] + } + } + + /** + * Compute input splits for the given files. Burrowed code from `FileInputFormat.getSplits`. + */ + @VisibleForTesting + def computeSplits( + jobConf: JobConf, + files: Array[FileStatus], + minSplits: Int): Array[InputSplit] = { + val totalSize: Long = files.map(_.getLen).reduceLeft(_ + _) + val goalSize: Long = totalSize / (if (minSplits == 0) 1 else minSplits) + val minSize: Long = getMinSplitSize() + val splits: ArrayBuffer[InputSplit] = ArrayBuffer[InputSplit]() + // Since S3 objects are remote, use a zero-length array to fake data local hosts. 
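Before the splitting loop below, it helps to see the split arithmetic with concrete numbers. This standalone sketch uses the same figures exercised by the computeSplits test added later in this patch (ten 256mb files, 64mb block size, minSplits of 1); the values are illustrative only.

    // splitSize = max(minSize, min(goalSize, blockSize)), as in the loop that follows.
    val minSize   = 1L                       // patch 1 default for mapred.min.split.size
    val blockSize = 64L * 1024 * 1024        // fs.s3.block.size
    val totalSize = 10 * 256L * 1024 * 1024  // ten 256mb files
    val goalSize  = totalSize / 1            // totalSize / minSplits, with minSplits = 1
    val splitSize = math.max(minSize, math.min(goalSize, blockSize))
    // splitSize works out to 64mb, so each 256mb file yields 4 FileSplits and the
    // ten files yield 40 in total, which is what SparkS3UtilSuite asserts.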
+ val fakeHosts: Array[String] = Array() + + for (file <- files) { + val path: Path = file.getPath + val length: Long = file.getLen + if (length > 0 && isSplitable(jobConf, path)) { + val blockSize: Long = file.getBlockSize + val splitSize: Long = Math.max(minSize, Math.min(goalSize, blockSize)) + var bytesRemaining: Long = length + while (bytesRemaining.toDouble / splitSize > 1.1) { + splits += new FileSplit(path, length - bytesRemaining, splitSize, fakeHosts) + bytesRemaining -= splitSize + } + if (bytesRemaining != 0) { + splits += new FileSplit(path, length - bytesRemaining, bytesRemaining, fakeHosts) + } + } else { + splits += new FileSplit(path, 0, length, fakeHosts) + } + } + + logDebug("Total size of input splits is " + totalSize) + logDebug("Num of input splits is " + splits.size) + + splits.toArray + } + + /** + * This is based on `FileInputFormat.getSplits` method. Two key differences are: + * 1) Use `AmazonS3Client.listObjects` instead of `FileSystem.listStatus`. + * 2) Bypass data locality hints since they're irrelevant to S3 objects. + */ + def getSplits(jobConf: JobConf, minSplits: Int): Array[InputSplit] = { + val inputPaths: Array[Path] = FileInputFormat.getInputPaths(jobConf) + val files: Array[FileStatus] = inputPaths.toList + .groupBy[String] { path => + val uri = path.toUri + uri.getScheme + "://" + uri.getAuthority + } + .map { case (bucket, paths) => (bucket, listStatus(getS3Client(bucket), paths)) } + .values.reduceLeft(_ ++ _) + computeSplits(jobConf, files, minSplits) + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index f37c95bedc0a5..5c1ba0b2daeee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -41,7 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.{SparkS3Util, SparkHadoopUtil} import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, NextIterator, Utils} @@ -193,10 +193,15 @@ class HadoopRDD[K, V]( override def getPartitions: Array[Partition] = { val jobConf = getJobConf() - // add the credentials here as this can be called before SparkContext initialized + // Add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) - val inputFormat = getInputFormat(jobConf) - val inputSplits = inputFormat.getSplits(jobConf, minPartitions) + val inputSplits = + if (SparkS3Util.s3BulkListingEnabled(jobConf)) { + SparkS3Util.getSplits(jobConf, minPartitions) + } else { + val inputFormat = getInputFormat(jobConf) + inputFormat.getSplits(jobConf, minPartitions) + } val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 66cf4369da2ef..eff9b16884886 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -20,6 +20,8 @@ package org.apache.spark.rdd import java.io.{IOException, ObjectOutputStream} import 
scala.collection.mutable.ArrayBuffer +import scala.collection.parallel.ForkJoinTaskSupport +import scala.concurrent.forkjoin.ForkJoinPool import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} @@ -62,7 +64,20 @@ class UnionRDD[T: ClassTag]( var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies + // Evaluate partitions in parallel (will be cached in each rdd) + private lazy val evaluatePartitions: Unit = { + val threshold = conf.getInt("spark.rdd.parallelListingThreshold", 10) + if (rdds.length > threshold) { + val parArray = rdds.toParArray + parArray.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(threshold)) + parArray.foreach(_.partitions) + } else { + rdds.foreach(_.partitions) + } + } + override def getPartitions: Array[Partition] = { + evaluatePartitions val array = new Array[Partition](rdds.map(_.partitions.length).sum) var pos = 0 for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) { diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala new file mode 100644 index 0000000000000..4a556c33f51b2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy + +import java.util.Date + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.fs.{FileStatus, Path} +import org.apache.hadoop.mapred.{InputSplit, FileInputFormat, JobConf} + +import org.apache.spark.SparkFunSuite + +class SparkS3UtilSuite extends SparkFunSuite { + test("s3ListingEnabled function") { + val jobConf = new JobConf() + + // Disabled by user + SparkS3Util.sparkConf.set("spark.s3.bulk.listing.enabled", "false") + FileInputFormat.setInputPaths(jobConf, "s3://bucket/dir/file") + assert(SparkS3Util.s3BulkListingEnabled(jobConf) == false) + + // Input paths contain wildcards + SparkS3Util.sparkConf.set("spark.s3.bulk.listing.enabled", "true") + FileInputFormat.setInputPaths(jobConf, "s3://bucket/dir/*") + assert(SparkS3Util.s3BulkListingEnabled(jobConf) == false) + + // Input paths copntain non-S3 files + SparkS3Util.sparkConf.set("spark.s3.bulk.listing.enabled", "true") + FileInputFormat.setInputPaths(jobConf, "file://bucket/dir/file") + assert(SparkS3Util.s3BulkListingEnabled(jobConf) == false) + } + + test("isSplitable function") { + val jobConf: JobConf = new JobConf() + + // Splitable files + val parquetFile: Path = new Path("file.parquet") + assert(SparkS3Util.isSplitable(jobConf, parquetFile) == true) + val textFile: Path = new Path("file.txt") + assert(SparkS3Util.isSplitable(jobConf, textFile) == true) + + // Non-splitable files + val gzipFile: Path = new Path("file.gz") + assert(SparkS3Util.isSplitable(jobConf, gzipFile) == false) + } + + test("computeSplits function") { + val jobConf: JobConf = new JobConf() + // Set S3 block size to 64mb + jobConf.set("fs.s3.block.size", "67108864") + + val files: ArrayBuffer[FileStatus] = ArrayBuffer() + for (i <- 1 to 10) { + val status: FileStatus = new FileStatus( + 256 * 1024 * 1024, + false, + 1, + 64 * 1024 * 1024, + new Date().getTime, + new Path(s"s3://bucket/dir/file${i}")) + files += status + } + + val splits: Array[InputSplit] = SparkS3Util.computeSplits(jobConf, files.toArray, 1) + + // 40 splits are expected: 256mb * 10 files / 64mb + assert(splits.length == 40) + + // Zero-length array is expected for data local hosts + for (s <- splits) { + assert(s.getLocations.length == 0) + } + } +} From 0e2478c986dfef266865682c076959fe21105f59 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Fri, 23 Oct 2015 18:32:36 -0700 Subject: [PATCH 2/9] Replace reduceLeft with foldLeft because calling reducerLeft on empty list throws an error --- core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala | 4 ++-- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala index a6b44a21f5c03..a045bb904d56c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -278,14 +278,14 @@ object SparkS3Util extends Logging { } /** - * Compute input splits for the given files. Burrowed code from `FileInputFormat.getSplits`. + * Compute input splits for the given files. Borrowed code from `FileInputFormat.getSplits`. 
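The hunk just below replaces files.map(_.getLen).reduceLeft(_ + _) with foldLeft(0L), per the commit message. The failure mode it avoids is easy to reproduce in plain Scala, with no Spark dependencies:

    import scala.util.Try
    // reduceLeft has no identity element, so an empty input throws
    // java.lang.UnsupportedOperationException: empty.reduceLeft
    val reduced = Try(Seq.empty[Long].reduceLeft(_ + _))   // Failure(UnsupportedOperationException)
    // foldLeft starts from an explicit zero, so an empty file list simply sums to 0
    val folded  = Seq.empty[Long].foldLeft(0L)(_ + _)      // 0L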
*/ @VisibleForTesting def computeSplits( jobConf: JobConf, files: Array[FileStatus], minSplits: Int): Array[InputSplit] = { - val totalSize: Long = files.map(_.getLen).reduceLeft(_ + _) + val totalSize: Long = files.map(_.getLen).foldLeft(0L)((sum, len) => sum + len) val goalSize: Long = totalSize / (if (minSplits == 0) 1 else minSplits) val minSize: Long = getMinSplitSize() val splits: ArrayBuffer[InputSplit] = ArrayBuffer[InputSplit]() diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 5c1ba0b2daeee..8a3ca6e8f3957 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -41,7 +41,7 @@ import org.apache.hadoop.util.ReflectionUtils import org.apache.spark._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.broadcast.Broadcast -import org.apache.spark.deploy.{SparkS3Util, SparkHadoopUtil} +import org.apache.spark.deploy.{SparkHadoopUtil, SparkS3Util} import org.apache.spark.executor.DataReadMethod import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, NextIterator, Utils} From daf80f3c837b6cc0bc99dadb1d5d251f41e8c00b Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Tue, 27 Oct 2015 16:17:11 -0700 Subject: [PATCH 3/9] Lower the max concurrent S3 connections in SparkS3Util. Too many connections can overload cpus. --- core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala index a045bb904d56c..1da95c2e8e82f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -86,7 +86,7 @@ object SparkS3Util extends Logging { val maxErrorRetries: Int = sparkConf.getInt(S3_MAX_ERROR_RETRIES, 10) val connectTimeout: Int = sparkConf.getInt(S3_CONNECT_TIMEOUT, 5000) val socketTimeout: Int = sparkConf.getInt(S3_SOCKET_TIMEOUT, 5000) - val maxConnections: Int = sparkConf.getInt(S3_MAX_CONNECTIONS, 500) + val maxConnections: Int = sparkConf.getInt(S3_MAX_CONNECTIONS, 5) val useInstanceCredentials: Boolean = sparkConf.getBoolean(S3_USE_INSTANCE_CREDENTIALS, false) val clientConf: ClientConfiguration = new ClientConfiguration From 6dcf90898325a2d3bf7f5e96cc44bb785061cc07 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Thu, 29 Oct 2015 09:44:03 -0700 Subject: [PATCH 4/9] Set default min input split size to 128mb --- core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala | 2 +- .../test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala index 1da95c2e8e82f..3434e8255385e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -241,7 +241,7 @@ object SparkS3Util extends Logging { */ private def getMinSplitSize(): Long = { val value = Option(conf.get("mapred.min.split.size")) - .getOrElse(conf.get("mapreduce.input.fileinputformat.split.minsize", "1")) + .getOrElse(conf.get("mapreduce.input.fileinputformat.split.minsize", "134217728")) value.toLong } diff --git 
a/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala index 4a556c33f51b2..c683d1ed4e47f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkS3UtilSuite.scala @@ -42,7 +42,7 @@ class SparkS3UtilSuite extends SparkFunSuite { // Input paths copntain non-S3 files SparkS3Util.sparkConf.set("spark.s3.bulk.listing.enabled", "true") - FileInputFormat.setInputPaths(jobConf, "file://bucket/dir/file") + FileInputFormat.setInputPaths(jobConf, "file://dir/file") assert(SparkS3Util.s3BulkListingEnabled(jobConf) == false) } From b79c17d19badefff0d0f0975d3a9801783352cc8 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Fri, 30 Oct 2015 09:53:02 -0700 Subject: [PATCH 5/9] Incorporate review comments --- .../main/scala/org/apache/spark/deploy/SparkS3Util.scala | 6 +++--- core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 7 ++++++- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala index 3434e8255385e..1e384a7dbd724 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -40,7 +40,7 @@ import org.apache.hadoop.fs.s3.S3Credentials import org.apache.hadoop.io.compress.{CompressionCodecFactory, SplittableCompressionCodec} import org.apache.hadoop.mapred.{FileInputFormat, FileSplit, InputSplit, JobConf} -import org.apache.spark.{SparkConf, Logging} +import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils @@ -50,7 +50,7 @@ import org.apache.spark.util.Utils */ @DeveloperApi object SparkS3Util extends Logging { - val sparkConf = new SparkConf() + val sparkConf = SparkEnv.get.conf val conf: Configuration = SparkHadoopUtil.get.newConfiguration(sparkConf) private val s3ClientCache: Cache[String, AmazonS3Client] = CacheBuilder @@ -256,7 +256,7 @@ object SparkS3Util extends Logging { * Return whether S3 bulk listing can be enabled or not. */ def s3BulkListingEnabled(jobConf: JobConf): Boolean = { - val enabledByUser = sparkConf.getBoolean(S3_BULK_LISTING_ENABLED, true) + val enabledByUser = sparkConf.getBoolean(S3_BULK_LISTING_ENABLED, false) val inputPaths = FileInputFormat.getInputPaths(jobConf) val noWildcard = inputPaths.forall(path => !new GlobPattern(path.toString).hasWildcard) val s3Paths = inputPaths.forall(SparkS3Util.isS3Path(_)) diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index eff9b16884886..95035c051e622 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -28,6 +28,8 @@ import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, T import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils +import com.google.common.base.Preconditions + /** * Partition for UnionRDD. * @@ -64,9 +66,12 @@ class UnionRDD[T: ClassTag]( var rdds: Seq[RDD[T]]) extends RDD[T](sc, Nil) { // Nil since we implement getDependencies - // Evaluate partitions in parallel (will be cached in each rdd) + // Evaluate partitions in parallel. Partitions of each rdd will be cached by the `partitions` + // val in `RDD`. 
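The idiom used by evaluatePartitions below, a Scala parallel collection backed by a bounded ForkJoinPool, can be exercised on its own as in this sketch; the helper name and the work function are made up for illustration, only the thread-pool wiring mirrors the patch.

    import scala.collection.parallel.ForkJoinTaskSupport
    import scala.concurrent.forkjoin.ForkJoinPool

    // Run `work` on each element using at most `parallelism` threads. UnionRDD
    // does the same with rdd.partitions, whose result is cached inside each RDD.
    def forceInParallel[A](items: Seq[A], parallelism: Int)(work: A => Unit): Unit = {
      val par = items.par
      par.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(parallelism))
      par.foreach(work)
    }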
private lazy val evaluatePartitions: Unit = { val threshold = conf.getInt("spark.rdd.parallelListingThreshold", 10) + Preconditions.checkArgument(threshold > 0, + "spark.rdd.parallelListingThreshold must be positive: %s", threshold.toString) if (rdds.length > threshold) { val parArray = rdds.toParArray parArray.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(threshold)) From 8d17a736f37b1474f00d411c987381868fdd9938 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Sun, 8 Nov 2015 20:07:01 -0800 Subject: [PATCH 6/9] Use sdk-s3 and sdk-sts jars instead of general sdk jar --- core/pom.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index a0fabc9b49ccf..b3d0e08402778 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -41,7 +41,12 @@ com.amazonaws - aws-java-sdk + aws-java-sdk-s3 + ${aws.java.sdk.version} + + + com.amazonaws + aws-java-sdk-sts ${aws.java.sdk.version} From 35559a9ae6f44995dc315da417bc0d3a462a1583 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Sun, 8 Nov 2015 20:13:24 -0800 Subject: [PATCH 7/9] Remove precondition in UnionRDD --- core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 95035c051e622..b38a9f1aae0bd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -28,8 +28,6 @@ import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, T import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.Utils -import com.google.common.base.Preconditions - /** * Partition for UnionRDD. * @@ -70,9 +68,7 @@ class UnionRDD[T: ClassTag]( // val in `RDD`. private lazy val evaluatePartitions: Unit = { val threshold = conf.getInt("spark.rdd.parallelListingThreshold", 10) - Preconditions.checkArgument(threshold > 0, - "spark.rdd.parallelListingThreshold must be positive: %s", threshold.toString) - if (rdds.length > threshold) { + if (threshold > 0 && rdds.length > threshold) { val parArray = rdds.toParArray parArray.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(threshold)) parArray.foreach(_.partitions) From dcc5badc401b95f2ef27a5359fdf326f2ea63217 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Tue, 8 Dec 2015 11:07:19 -0800 Subject: [PATCH 8/9] Ensure s3 block size is greater than 10mb --- .../scala/org/apache/spark/deploy/SparkS3Util.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala index 1e384a7dbd724..6fa761be6b729 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -231,9 +231,16 @@ object SparkS3Util extends Logging { * Find S3 block size from Hadoop conf. Try both s3 and s3n names. */ private def getS3BlockSize(): Long = { + val minS3BlockSize = 10485760; // 10mb + val defaultS3BlockSize = 67108864; // 64mb val value = Option(conf.get("fs.s3.block.size")) - .getOrElse(conf.get("fs.s3n.block.size", "67108864")) - value.toLong + .getOrElse(conf.get("fs.s3n.block.size", defaultS3BlockSize.toString)).toLong + if (value < minS3BlockSize) { + logWarning("S3 block size is set too small: " + value + ". 
Overriding it to 10mb."); + minS3BlockSize + } else { + value + } } /** From 5bdd92443c33fc2ec72fabb141c878cf3488d1f4 Mon Sep 17 00:00:00 2001 From: Cheolsoo Park Date: Tue, 8 Dec 2015 11:59:54 -0800 Subject: [PATCH 9/9] Remove VisibleForTesting --- .../src/main/scala/org/apache/spark/deploy/SparkS3Util.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala index 6fa761be6b729..191cdbb98d4ce 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkS3Util.scala @@ -29,7 +29,6 @@ import com.amazonaws.internal.StaticCredentialsProvider import com.amazonaws.services.s3.AmazonS3Client import com.amazonaws.services.s3.model.{ListObjectsRequest, ObjectListing, S3ObjectSummary} -import com.google.common.annotations.VisibleForTesting import com.google.common.base.{Preconditions, Strings} import com.google.common.cache.{Cache, CacheBuilder} import com.google.common.collect.AbstractSequentialIterator @@ -272,8 +271,8 @@ object SparkS3Util extends Logging { /** * Return whether the given file is splittable or not. + * Exposed for testing. */ - @VisibleForTesting def isSplitable(jobConf: JobConf, file: Path): Boolean = { val compressionCodecs = new CompressionCodecFactory(jobConf) val codec = compressionCodecs.getCodec(file) @@ -286,8 +285,8 @@ object SparkS3Util extends Logging { /** * Compute input splits for the given files. Borrowed code from `FileInputFormat.getSplits`. + * Exposed for testing. */ - @VisibleForTesting def computeSplits( jobConf: JobConf, files: Array[FileStatus],