From 7949222219ffdba242595bf3ad14dd9c53f68b82 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Thu, 12 Nov 2015 16:19:35 +0900
Subject: [PATCH] [SPARK-11687][SQL] Mixed usage of fold and foldLeft, reduce and reduceLeft and reduceOption and reduceLeftOption
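
Use the *Left variants consistently at call sites that are written with a
left-to-right combine in mind (string/array concatenation, schema merging,
building And/Or predicate trees). For the sequential collections touched in
this patch the change is behaviourally a no-op, since reduce, fold and
reduceOption delegate to reduceLeft, foldLeft and reduceLeftOption, but the
*Left spelling makes the intended evaluation order explicit.

A rough REPL-style sketch of the distinction (illustration only, not part of
this patch; `parts` is a made-up value):

    val parts = Seq("a", "b", "c")
    parts.reduce(_ + _)            // "abc"
    parts.reduceLeft(_ + _)        // "abc", left-to-right order is explicit
    parts.foldLeft("")(_ + _)      // "abc"; foldLeft also allows a result type
                                   // different from the element type
    parts.reduceLeftOption(_ + _)  // Some("abc"); None for an empty collection

    // fold's signature, fold[A1 >: A](z: A1)(op: (A1, A1) => A1), is designed
    // so the combine may be applied to partial results (e.g. by parallel
    // collections), which is why foldLeft is the clearer spelling when a
    // strict left-to-right pass over the elements is intended.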
---
 .../apache/spark/deploy/RPackageUtils.scala   |  3 ++-
 .../main/scala/org/apache/spark/rdd/RDD.scala |  2 +-
 .../spark/rdd/ZippedPartitionsRDD.scala       |  2 +-
 .../org/apache/spark/ml/tree/treeModels.scala |  2 +-
 .../mllib/tree/model/treeEnsembleModels.scala |  2 +-
 .../spark/ml/feature/Word2VecSuite.scala      |  2 +-
 .../spark/mllib/clustering/LDASuite.scala     |  2 +-
 .../clustering/StreamingKMeansSuite.scala     |  2 +-
 .../spark/mllib/util/MLUtilsSuite.scala       |  2 +-
 .../catalyst/analysis/HiveTypeCoercion.scala  |  4 ++--
 .../sql/catalyst/optimizer/Optimizer.scala    | 24 +++++++++----------
 .../sql/catalyst/planning/patterns.scala      |  2 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala |  2 +-
 .../columnar/InMemoryColumnarTableScan.scala  |  6 ++---
 .../datasources/DataSourceStrategy.scala      |  4 ++--
 .../datasources/json/InferSchema.scala        |  2 +-
 .../datasources/parquet/ParquetFilters.scala  |  2 +-
 .../datasources/parquet/ParquetRelation.scala |  8 +++----
 .../spark/sql/jdbc/AggregatedDialect.scala    |  2 +-
 .../apache/spark/sql/DataFrameStatSuite.scala |  2 +-
 .../org/apache/spark/sql/UnsafeRowSuite.scala |  2 +-
 .../spark/sql/sources/FilteredScanSuite.scala |  2 +-
 .../spark/sql/sources/PrunedScanSuite.scala   |  2 +-
 .../spark/sql/hive/orc/OrcFilters.scala       |  2 +-
 .../sql/sources/SimpleTextRelation.scala      |  2 +-
 .../dstream/ReducedWindowedDStream.scala      |  6 ++---
 .../streaming/BasicOperationsSuite.scala      |  2 +-
 27 files changed, 48 insertions(+), 47 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
index 7d160b6790eaa..4b4aa4fd9c050 100644
--- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala
@@ -198,7 +198,8 @@ private[deploy] object RPackageUtils extends Logging {
     if (dir.isDirectory) {
       val subDir = dir.listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {
-          !excludePatterns.map(name.contains).reduce(_ || _) // exclude files with given pattern
+          // exclude files with given pattern
+          !excludePatterns.map(name.contains).reduceLeft(_ || _)
         }
       })
       subDir.flatMap(listFilesRecursively(_, excludePatterns)).toSet
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 800ef53cbef07..ded8887893eea 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1052,7 +1052,7 @@ abstract class RDD[T: ClassTag](
     // Clone the zero value since we will also be serializing it as part of tasks
     var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
     val cleanOp = sc.clean(op)
-    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val foldPartition = (iter: Iterator[T]) => iter.foldLeft(zeroValue)(cleanOp)
     val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
     sc.runJob(this, foldPartition, mergeResult)
     jobResult
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 4333a679c8aae..f229093cb38fe 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -59,7 +59,7 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     Array.tabulate[Partition](numParts) { i =>
       val prefs = rdds.map(rdd => rdd.preferredLocations(rdd.partitions(i)))
       // Check whether there are any hosts that match all RDDs; otherwise return the union
-      val exactMatchLocations = prefs.reduce((x, y) => x.intersect(y))
+      val exactMatchLocations = prefs.reduceLeft((x, y) => x.intersect(y))
       val locs = if (!exactMatchLocations.isEmpty) exactMatchLocations else prefs.flatten.distinct
       new ZippedPartitionsPartition(i, rdds, locs)
     }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala
index b77191156f68f..b3ade28d2744b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/treeModels.scala
@@ -92,7 +92,7 @@ private[ml] trait TreeEnsembleModel {
     val header = toString + "\n"
     header + trees.zip(treeWeights).zipWithIndex.map { case ((tree, weight), treeIndex) =>
       s" Tree $treeIndex (weight $weight):\n" + tree.rootNode.subtreeToString(4)
-    }.fold("")(_ + _)
+    }.foldLeft("")(_ + _)
   }
 
   /** Number of trees in ensemble */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
index 90e032e3d9842..2594600e69655 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/treeEnsembleModels.scala
@@ -370,7 +370,7 @@ private[tree] sealed class TreeEnsembleModel(
     val header = toString + "\n"
     header + trees.zipWithIndex.map { case (tree, treeIndex) =>
       s" Tree $treeIndex:\n" + tree.topNode.subtreeToString(4)
-    }.fold("")(_ + _)
+    }.foldLeft("")(_ + _)
   }
 
   /**
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
index 23dfdaa9f8fc6..45d2500692887 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
@@ -49,7 +49,7 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext {
     )
 
     val expected = doc.map { sentence =>
-      Vectors.dense(sentence.map(codes.apply).reduce((word1, word2) =>
+      Vectors.dense(sentence.map(codes.apply).reduceLeft((word1, word2) =>
        word1.zip(word2).map { case (v1, v2) => v1 + v2 }
       ).map(_ / numOfWords))
     }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index 37fb69d68f6be..f5ef685d74fe1 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -565,7 +565,7 @@ private[clustering] object LDASuite {
     Array[Double](0.2, 0.2, 0.05, 0.05, 0.5) // topic 2
   )
   def tinyTopics: Matrix = new DenseMatrix(numRows = tinyVocabSize, numCols = tinyK,
-    values = tinyTopicsAsArray.fold(Array.empty[Double])(_ ++ _))
+    values = tinyTopicsAsArray.foldLeft(Array.empty[Double])(_ ++ _))
   def tinyTopicDescription: Array[(Array[Int], Array[Double])] = tinyTopicsAsArray.map { topic =>
     val (termWeights, terms) = topic.zipWithIndex.sortBy(-_._1).unzip
     (terms.toArray, termWeights.toArray)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala
index 65e37c64d404e..9a2d75285fa4e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/StreamingKMeansSuite.scala
@@ -67,7 +67,7 @@ class StreamingKMeansSuite extends SparkFunSuite with TestSuiteBase {
     // estimated center from streaming should exactly match the arithmetic mean of all data points
     // because the decay factor is set to 1.0
     val grandMean =
-      input.flatten.map(x => x.toBreeze).reduce(_ + _) / (numBatches * numPoints).toDouble
+      input.flatten.map(x => x.toBreeze).reduceLeft(_ + _) / (numBatches * numPoints).toDouble
     assert(model.latestModel().clusterCenters(0) ~== Vectors.dense(grandMean.toArray) absTol 1E-5)
   }
 
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 70219e9ad9d3e..13233c0c7d0da 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -202,7 +202,7 @@ class MLUtilsSuite extends SparkFunSuite with MLlibTestSparkContext {
         "Each training+validation set combined should contain all of the data.")
     }
     // K fold cross validation should only have each element in the validation set exactly once
-    assert(foldedRdds.map(_._2).reduce((x, y) => x.union(y)).collect().sorted ===
+    assert(foldedRdds.map(_._2).reduceLeft((x, y) => x.union(y)).collect().sorted ===
       data.collect().sorted)
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
index bf2bff0243fa3..0aaea3be577f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala
@@ -626,7 +626,7 @@ object HiveTypeCoercion {
             case Seq(elseVal) if elseVal.dataType != commonType => Seq(Cast(elseVal, commonType))
             case other => other
-          }.reduce(_ ++ _)
+          }.reduceLeft(_ ++ _)
           c match {
             case _: CaseWhen => CaseWhen(castedBranches)
             case CaseKeyWhen(key, _) => CaseKeyWhen(key, castedBranches)
@@ -641,7 +641,7 @@ object HiveTypeCoercion {
             case Seq(whenExpr, thenExpr) if whenExpr.dataType != commonType =>
               Seq(Cast(whenExpr, commonType), thenExpr)
             case other => other
-          }.reduce(_ ++ _)
+          }.reduceLeft(_ ++ _)
           CaseKeyWhen(Cast(c.key, commonType), castedBranches)
         }.getOrElse(c)
       }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f4dba67f13b54..2acda41041e97 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -137,8 +137,8 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper {
     val andConditions = splitConjunctivePredicates(condition)
     andConditions.partition(_.deterministic) match {
       case (deterministic, nondeterministic) =>
-        deterministic.reduceOption(And).getOrElse(Literal(true)) ->
-          nondeterministic.reduceOption(And).getOrElse(Literal(true))
+        deterministic.reduceLeftOption(And).getOrElse(Literal(true)) ->
+          nondeterministic.reduceLeftOption(And).getOrElse(Literal(true))
     }
   }
 
@@ -506,11 +506,11 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
           val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals(_)))
           if (ldiff.isEmpty || rdiff.isEmpty) {
             // (a || b || c || ...) && (a || b) => (a || b)
-            common.reduce(Or)
+            common.reduceLeft(Or)
           } else {
             // (a || b || c || ...) && (a || b || d || ...) =>
             // ((c || ...) && (d || ...)) || a || b
-            (common :+ And(ldiff.reduce(Or), rdiff.reduce(Or))).reduce(Or)
+            (common :+ And(ldiff.reduceLeft(Or), rdiff.reduceLeft(Or))).reduceLeft(Or)
           }
         }
       } // end of And(left, right)
@@ -544,11 +544,11 @@ object BooleanSimplification extends Rule[LogicalPlan] with PredicateHelper {
           val rdiff = rhs.filterNot(e => common.exists(e.semanticEquals(_)))
           if (ldiff.isEmpty || rdiff.isEmpty) {
             // (a && b) || (a && b && c && ...) => a && b
-            common.reduce(And)
+            common.reduceLeft(And)
           } else {
             // (a && b && c && ...) || (a && b && d && ...) =>
             // ((c && ...) || (d && ...)) && a && b
-            (common :+ Or(ldiff.reduce(And), rdiff.reduce(And))).reduce(And)
+            (common :+ Or(ldiff.reduceLeft(And), rdiff.reduceLeft(And))).reduceLeft(And)
           }
         }
       } // end of Or(left, right)
@@ -640,8 +640,8 @@ object PushPredicateThroughProject extends Rule[LogicalPlan] with PredicateHelpe
         filter
       } else {
         // Push down the small conditions without nondeterministic expressions.
-        val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduce(And)
-        Filter(nondeterministic.reduce(And),
+        val pushedCondition = deterministic.map(replaceAlias(_, aliasMap)).reduceLeft(And)
+        Filter(nondeterministic.reduceLeft(And),
           project.copy(child = Filter(pushedCondition, grandChild)))
       }
   }
@@ -670,10 +670,10 @@ object PushPredicateThroughGenerate extends Rule[LogicalPlan] with PredicateHelp
        conjunct => conjunct.references subsetOf g.child.outputSet
      }
      if (pushDown.nonEmpty) {
-       val pushDownPredicate = pushDown.reduce(And)
+       val pushDownPredicate = pushDown.reduceLeft(And)
        val withPushdown = Generate(g.generator, join = g.join, outer = g.outer,
          g.qualifier, g.generatorOutput, Filter(pushDownPredicate, g.child))
-       stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
+       stayUp.reduceLeftOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
      } else {
        filter
      }
@@ -694,9 +694,9 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
        conjunct => conjunct.references subsetOf AttributeSet(groupingExpressions)
      }
      if (pushDown.nonEmpty) {
-       val pushDownPredicate = pushDown.reduce(And)
+       val pushDownPredicate = pushDown.reduceLeft(And)
        val withPushdown = aggregate.copy(child = Filter(pushDownPredicate, grandChild))
-       stayUp.reduceOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
+       stayUp.reduceLeftOption(And).map(Filter(_, withPushdown)).getOrElse(withPushdown)
      } else {
        filter
      }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 6f4f11406d7c4..9c098869e99f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -124,7 +124,7 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
       if (joinKeys.nonEmpty) {
         val (leftKeys, rightKeys) = joinKeys.unzip
         logDebug(s"leftKeys:$leftKeys | rightKeys:$rightKeys")
-        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceOption(And), left, right))
+        Some((joinType, leftKeys, rightKeys, otherPredicates.reduceLeftOption(And), left, right))
       } else {
         None
       }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 65f09b46afae1..e8aff1b97c347 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -30,7 +30,7 @@ class AnalysisSuite extends AnalysisTest {
   test("union project *") {
     val plan = (1 to 100)
       .map(_ => testRelation)
-      .fold[LogicalPlan](testRelation) { (a, b) =>
+      .foldLeft[LogicalPlan](testRelation) { (a, b) =>
         a.select(UnresolvedStar(None)).select('a).unionAll(b.select(UnresolvedStar(None)))
       }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 7eb1ad7cd8198..c21bdf035c3fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -76,7 +76,7 @@ private[sql] case class InMemoryRelation(
   private def computeSizeInBytes = {
     val sizeOfRow: Expression =
       BindReferences.bindReference(
-        output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduce(Add),
+        output.map(a => partitionStatistics.forAttribute(a).sizeInBytes).reduceLeft(Add),
         partitionStatistics.schema)
 
     batchStats.value.map(row => sizeOfRow.eval(row).asInstanceOf[Long]).sum
@@ -225,7 +225,7 @@ private[sql] case class InMemoryColumnarTableScan(
   @transient val buildFilter: PartialFunction[Expression, Expression] = {
     case And(lhs: Expression, rhs: Expression)
       if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
-      (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
+      (buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduceLeft(_ && _)
 
     case Or(lhs: Expression, rhs: Expression)
       if buildFilter.isDefinedAt(lhs) && buildFilter.isDefinedAt(rhs) =>
@@ -294,7 +294,7 @@ private[sql] case class InMemoryColumnarTableScan(
 
     buffers.mapPartitions { cachedBatchIterator =>
       val partitionFilter = newPredicate(
-        partitionFilters.reduceOption(And).getOrElse(Literal(true)),
+        partitionFilters.reduceLeftOption(And).getOrElse(Literal(true)),
         schema)
 
       // Find the ordinals and data types of the requested columns.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 7265d6a4de2e6..71083f3f8ffa3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -242,7 +242,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
 
     if (partitionPruningPredicates.nonEmpty) {
       val predicate = partitionPruningPredicates
-        .reduceOption(expressions.And)
+        .reduceLeftOption(expressions.And)
         .getOrElse(Literal(true))
 
       val boundPredicate = InterpretedPredicate.create(predicate.transform {
@@ -423,7 +423,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       Some(sources.IsNotNull(a.name))
 
     case expressions.And(left, right) =>
-      (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
+      (translateFilter(left) ++ translateFilter(right)).reduceLeftOption(sources.And)
 
     case expressions.Or(left, right) =>
       for {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
index b9914c581a657..70b293205b4bf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala
@@ -201,7 +201,7 @@ private[sql] object InferSchema {
       case (StructType(fields1), StructType(fields2)) =>
         val newFields = (fields1 ++ fields2).groupBy(field => field.name).map {
           case (name, fieldTypes) =>
-            val dataType = fieldTypes.view.map(_.dataType).reduce(compatibleType)
+            val dataType = fieldTypes.view.map(_.dataType).reduceLeft(compatibleType)
             StructField(name, dataType, nullable = true)
         }
         StructType(newFields.toSeq.sortBy(_.name))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 07714329370a5..1f2e1e93bbb3a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -257,7 +257,7 @@ private[sql] object ParquetFilters {
       makeGtEq.lift(dataTypeOf(name)).map(_(name, value))
 
     case sources.And(lhs, rhs) =>
-      (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceOption(FilterApi.and)
+      (createFilter(schema, lhs) ++ createFilter(schema, rhs)).reduceLeftOption(FilterApi.and)
 
     case sources.Or(lhs, rhs) =>
       for {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5a7c6b95b565f..c485084c83a8a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -547,7 +547,7 @@ private[sql] object ParquetRelation extends Logging {
         // converted (`ParquetFilters.createFilter` returns an `Option`). That's why a `flatMap`
         // is used here.
         .flatMap(ParquetFilters.createFilter(dataSchema, _))
-        .reduceOption(FilterApi.and)
+        .reduceLeftOption(FilterApi.and)
         .foreach(ParquetInputFormat.setFilterPredicate(conf, _))
     }
 
@@ -634,7 +634,7 @@ private[sql] object ParquetRelation extends Logging {
       }
     }
 
-    finalSchemas.reduceOption { (left, right) =>
+    finalSchemas.reduceLeftOption { (left, right) =>
      try left.merge(right) catch { case e: Throwable =>
        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
      }
@@ -768,10 +768,10 @@ private[sql] object ParquetRelation extends Logging {
 
        footers.map { footer =>
          ParquetRelation.readSchemaFromFooter(footer, converter)
-        }.reduceOption(_ merge _).iterator
+        }.reduceLeftOption(_ merge _).iterator
      }.collect()
 
-    partiallyMergedSchemas.reduceOption(_ merge _)
+    partiallyMergedSchemas.reduceLeftOption(_ merge _)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
index 467d8d62d1b7f..b52deda0dc587 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/AggregatedDialect.scala
@@ -31,7 +31,7 @@ private class AggregatedDialect(dialects: List[JdbcDialect]) extends JdbcDialect
   require(dialects.nonEmpty)
 
   override def canHandle(url : String): Boolean =
-    dialects.map(_.canHandle(url)).reduce(_ && _)
+    dialects.map(_.canHandle(url)).reduceLeft(_ && _)
 
   override def getCatalystType(
       sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index b15af42caa3ab..fd76a7adee562 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -52,7 +52,7 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext {
       val splits = data.randomSplit(Array[Double](1, 2, 3), seed)
       assert(splits.length == 3, "wrong number of splits")
 
-      assert(splits.reduce((a, b) => a.unionAll(b)).sort("id").collect().toList ==
+      assert(splits.reduceLeft((a, b) => a.unionAll(b)).sort("id").collect().toList ==
         data.collect().toList, "incomplete or wrong split")
 
       val s = splits.map(_.count())
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
index 00f1526576cc5..d739b887b2b94 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UnsafeRowSuite.scala
@@ -141,7 +141,7 @@ class UnsafeRowSuite extends SparkFunSuite {
     unsafeRow.setInt(0, 2)
     assert(emptyRow.getInt(0) === 1)
 
-    val longString = UTF8String.fromString((1 to 100).map(_ => "abc").reduce(_ + _))
+    val longString = UTF8String.fromString((1 to 100).map(_ => "abc").reduceLeft(_ + _))
     val row2 = InternalRow(3, longString)
     val unsafeRow2 = converter.apply(row2)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
index 2cad964e55b2b..4a1df4e7b05f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/FilteredScanSuite.scala
@@ -115,7 +115,7 @@ case class SimpleFilteredScan(from: Int, to: Int)(@transient val sqlContext: SQL
     }
 
     sqlContext.sparkContext.parallelize(from to to).filter(eval).map(i =>
-      Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty)))
+      Row.fromSeq(rowBuilders.map(_(i)).reduceLeftOption(_ ++ _).getOrElse(Seq.empty)))
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
index a89c5f8007e78..56a2543bdc1cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PrunedScanSuite.scala
@@ -48,7 +48,7 @@ case class SimplePrunedScan(from: Int, to: Int)(@transient val sqlContext: SQLCo
     }
 
     sqlContext.sparkContext.parallelize(from to to).map(i =>
-      Row.fromSeq(rowBuilders.map(_(i)).reduceOption(_ ++ _).getOrElse(Seq.empty)))
+      Row.fromSeq(rowBuilders.map(_(i)).reduceLeftOption(_ ++ _).getOrElse(Seq.empty)))
   }
 }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
index 27193f54d3a91..2ed3bd78d44da 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
@@ -34,7 +34,7 @@ private[orc] object OrcFilters extends Logging {
   def createFilter(filters: Array[Filter]): Option[SearchArgument] = {
     for {
       // Combines all filters with `And`s to produce a single conjunction predicate
-      conjunction <- filters.reduceOption(And)
+      conjunction <- filters.reduceLeftOption(And)
       // Then tries to build a single ORC `SearchArgument` for the conjunction predicate
       builder <- buildSearchArgument(conjunction, SearchArgumentFactory.newBuilder())
     } yield builder.build()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index bdc48a383bbbf..720cf50a88e82 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -147,7 +147,7 @@ class SimpleTextRelation(
         val literal = Literal.create(value, dataType)
         val attribute = inputAttributes.find(_.name == column).get
         expressions.GreaterThan(attribute, literal)
-    }.reduceOption(expressions.And).getOrElse(Literal(true))
+    }.reduceLeftOption(expressions.And).getOrElse(Literal(true))
 
     InterpretedPredicate.create(filterCondition, inputAttributes)
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 6a583bf2a3626..7116898a00ca2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -148,17 +148,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
            "Are you sure your key class hashes consistently?")
        }
        // Reduce the new values
-        newValues.reduce(reduceF) // return
+        newValues.reduceLeft(reduceF) // return
      } else {
        // Get the previous window's reduced value
        var tempValue = arrayOfValues(0).head
        // If old values exists, then inverse reduce then from previous value
        if (!oldValues.isEmpty) {
-          tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
+          tempValue = invReduceF(tempValue, oldValues.reduceLeft(reduceF))
        }
        // If new values exists, then reduce them with previous value
        if (!newValues.isEmpty) {
-          tempValue = reduceF(tempValue, newValues.reduce(reduceF))
+          tempValue = reduceF(tempValue, newValues.reduceLeft(reduceF))
        }
        tempValue // return
      }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 9d296c6d3ef8b..a1afe6b859ff8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -74,7 +74,7 @@ class BasicOperationsSuite extends TestSuiteBase {
     assert(numInputPartitions === 2, "Number of input partitions has been changed from 2")
     val input = Seq(1 to 4, 5 to 8, 9 to 12)
     val output = Seq(Seq(3, 7), Seq(11, 15), Seq(19, 23))
-    val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduce(_ + _)))
+    val operation = (r: DStream[Int]) => r.mapPartitions(x => Iterator(x.reduceLeft(_ + _)))
     testOperation(input, operation, output, true)
   }
 