From df587f38de919ca6cdb7b383f96522415c417c9e Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sat, 19 Jul 2014 13:51:27 +0100
Subject: [PATCH] Downgrade to Guava 11 from 14. Remove 12+ functionality, or
 replace with equivalents from 11

---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  2 +-
 .../spark/util/collection/AppendOnlyMap.scala |  2 +-
 .../collection/ExternalAppendOnlyMap.scala    |  4 +-
 .../spark/util/collection/OpenHashSet.scala   |  2 +-
 .../apache/spark/util/collection/Utils.scala  | 39 -------------------
 pom.xml                                       |  8 ++--
 .../spark/sql/parquet/ParquetFilters.scala    |  7 ++--
 7 files changed, 12 insertions(+), 52 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/collection/Utils.scala

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index a6abc49c5359e..120918d83dd02 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1130,7 +1130,7 @@ abstract class RDD[T: ClassTag](
     mapPartitions { items =>
       // Priority keeps the largest elements, so let's reverse the ordering.
       val queue = new BoundedPriorityQueue[T](num)(ord.reverse)
-      queue ++= util.collection.Utils.takeOrdered(items, num)(ord)
+      queue ++= items
       Iterator.single(queue)
     }.reduce { (queue1, queue2) =>
       queue1 ++= queue2
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index 290282c9c2e28..0359244ea28ca 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -202,7 +202,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64)
   /**
    * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
    */
-  private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+  private def rehash(h: Int): Int = Hashing.murmur3_32().hashLong(h).asInt()
 
   /** Double the table's size and re-hash everything */
   protected def growTable() {
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 6f263c39d1435..790d5941a360e 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -24,7 +24,7 @@ import scala.collection.BufferedIterator
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import com.google.common.io.ByteStreams
+import com.google.common.io.LimitInputStream
 
 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
@@ -396,7 +396,7 @@ class ExternalAppendOnlyMap[K, V, C](
      */
    private def nextBatchStream(): InputStream = {
      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+        new LimitInputStream(bufferedStream, batchSizes.remove(0))
      } else {
        // No more batches left
        bufferedStream
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 4e363b74f4bef..033455363e1a4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -258,7 +258,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag](
   /**
    * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
    */
-  private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()
+  private def hashcode(h: Int): Int = Hashing.murmur3_32().hashLong(h).asInt()
 
   private def nextPowerOf2(n: Int): Int = {
     val highBit = Integer.highestOneBit(n)
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
deleted file mode 100644
index c5268c0fae0ef..0000000000000
--- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.collection
-
-import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator}
-
-import com.google.common.collect.{Ordering => GuavaOrdering}
-
-/**
- * Utility functions for collections.
- */
-private[spark] object Utils {
-
-  /**
-   * Returns the first K elements from the input as defined by the specified implicit Ordering[T]
-   * and maintains the ordering.
-   */
-  def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
-    val ordering = new GuavaOrdering[T] {
-      override def compare(l: T, r: T) = ord.compare(l, r)
-    }
-    collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator
-  }
-}
diff --git a/pom.xml b/pom.xml
index 4e2d64a833640..55dd27b91b277 100644
--- a/pom.xml
+++ b/pom.xml
@@ -244,7 +244,7 @@
     <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
-      <version>14.0.1</version>
+      <version>11.0.2</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
@@ -252,9 +252,9 @@
       <artifactId>commons-lang3</artifactId>
       <version>3.3.2</version>
     </dependency>
     <dependency>
-    <groupId>commons-codec</groupId>
-    <artifactId>commons-codec</artifactId>
-    <version>1.5</version>
+      <groupId>commons-codec</groupId>
+      <artifactId>commons-codec</artifactId>
+      <version>1.5</version>
     </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
index cc575bedd8fcb..b519c8e67eca7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetFilters.scala
@@ -17,14 +17,13 @@
 
 package org.apache.spark.sql.parquet
 
+import org.apache.commons.codec.binary.Base64
 import org.apache.hadoop.conf.Configuration
 
 import parquet.filter._
 import parquet.filter.ColumnPredicates._
 import parquet.column.ColumnReader
 
-import com.google.common.io.BaseEncoding
-
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.expressions.{Predicate => CatalystPredicate}
 import org.apache.spark.sql.catalyst.expressions._
@@ -237,7 +236,7 @@ object ParquetFilters {
   def serializeFilterExpressions(filters: Seq[Expression], conf: Configuration): Unit = {
     if (filters.length > 0) {
       val serialized: Array[Byte] = SparkSqlSerializer.serialize(filters)
-      val encoded: String = BaseEncoding.base64().encode(serialized)
+      val encoded: String = new Base64().encodeAsString(serialized)
       conf.set(PARQUET_FILTER_DATA, encoded)
     }
   }
@@ -250,7 +249,7 @@ object ParquetFilters {
   def deserializeFilterExpressions(conf: Configuration): Seq[Expression] = {
     val data = conf.get(PARQUET_FILTER_DATA)
     if (data != null) {
-      val decoded: Array[Byte] = BaseEncoding.base64().decode(data)
+      val decoded: Array[Byte] = new Base64().decode(data)
       SparkSqlSerializer.deserialize(decoded)
     } else {
       Seq()
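
Reviewer note, not part of the patch: a minimal Scala sketch collecting in one runnable place the Guava 11 / commons-codec calls this change switches to. The object name, helper names, and main method are illustrative only; it assumes Guava 11.0.2 and commons-codec are on the classpath, matching the pom.xml above.

import java.io.{ByteArrayInputStream, InputStream}

import com.google.common.hash.Hashing
import com.google.common.io.LimitInputStream
import org.apache.commons.codec.binary.Base64

object Guava11Equivalents {
  // Guava 12+ added hashInt; on Guava 11 the value is widened and hashLong is used instead.
  def rehash(h: Int): Int = Hashing.murmur3_32().hashLong(h).asInt()

  // Guava 14+ has ByteStreams.limit; LimitInputStream is the Guava 11 equivalent.
  def limited(in: InputStream, n: Long): InputStream = new LimitInputStream(in, n)

  // Guava 14+ has BaseEncoding.base64(); commons-codec Base64 replaces it in ParquetFilters.
  def encode(bytes: Array[Byte]): String = new Base64().encodeAsString(bytes)
  def decode(s: String): Array[Byte] = new Base64().decode(s)

  def main(args: Array[String]): Unit = {
    val data = "spark".getBytes("UTF-8")
    println(decode(encode(data)).sameElements(data))          // true: base64 round trip
    println(rehash(42))                                        // murmur3-based rehash
    println(limited(new ByteArrayInputStream(data), 2L).read()) // reads from the bounded stream
  }
}

The RDD.takeOrdered change needs no replacement helper at all: BoundedPriorityQueue already retains at most num elements under the given ordering, so feeding it the raw iterator yields the same result the deleted Guava-based Utils.takeOrdered produced, just without the per-partition pre-selection.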