From fa40db011e9dd9e67482d5a659103196d5a6c8a6 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 19 Dec 2014 10:27:42 +0800 Subject: [PATCH 1/5] Add an Ordering for NullWritable to make the compiler generate same byte codes for RDD --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 214f22bc5b603..4bdb13f9ad3d0 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1174,6 +1174,14 @@ abstract class RDD[T: ClassTag]( * Save this RDD as a text file, using string representations of elements. */ def saveAsTextFile(path: String) { + // https://issues.apache.org/jira/browse/SPARK-2075 + // NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+, + // so the compiler cannot find an implicit Ordering for it. It will generate different + // anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we + // provide an Ordering for NullWritable so that the compiler will generate same codes. + implicit val nullWritableOrdering = new Ordering[NullWritable] { + override def compare(x: NullWritable, y: NullWritable): Int = 0 + } this.map(x => (NullWritable.get(), new Text(x.toString))) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } @@ -1182,6 +1190,10 @@ abstract class RDD[T: ClassTag]( * Save this RDD as a compressed text file, using string representations of elements. */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) { + // https://issues.apache.org/jira/browse/SPARK-2075 + implicit val nullWritableOrdering = new Ordering[NullWritable] { + override def compare(x: NullWritable, y: NullWritable): Int = 0 + } this.map(x => (NullWritable.get(), new Text(x.toString))) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) } From ca03559acf8e01afe4f0fa5d6c15a9283e9ee975 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 19 Dec 2014 12:17:16 +0800 Subject: [PATCH 2/5] Use reflection to access JobContext/TaskAttemptContext.getConfiguration --- .../org/apache/spark/deploy/SparkHadoopUtil.scala | 12 ++++++++++++ .../spark/input/FixedLengthBinaryInputFormat.scala | 4 +++- .../spark/input/FixedLengthBinaryRecordReader.scala | 3 ++- .../org/apache/spark/input/PortableDataStream.scala | 5 ++++- .../spark/input/WholeTextFileRecordReader.scala | 4 +++- 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 60ee115e393ce..57f9faf5ddd1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.fs.FileSystem.Statistics import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext} import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation @@ -183,6 +184,17 @@ class SparkHadoopUtil extends Logging { Class.forName("org.apache.hadoop.fs.FileSystem$Statistics$StatisticsData") statisticsDataClass.getDeclaredMethod(methodName) } + + /** + * Using reflection to get the Configuration from JobContext/TaskAttemptContext. 
If we directly + * call `JobContext/TaskAttemptContext.getConfiguration`, it will generate different byte codes + * for Hadoop 1.+ and Hadoop 2.+ because JobContext/TaskAttemptContext is class in Hadoop 1.+ + * while it's interface in Hadoop 2.+. + */ + def getConfigurationFromJobContext(context: JobContext): Configuration = { + val method = context.getClass.getMethod("getConfiguration") + method.invoke(context).asInstanceOf[Configuration] + } } object SparkHadoopUtil { diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala index 89b29af2000c8..4293493850472 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala @@ -21,6 +21,7 @@ import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{BytesWritable, LongWritable} import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext} +import org.apache.spark.deploy.SparkHadoopUtil /** * Custom Input Format for reading and splitting flat binary files that contain records, @@ -33,7 +34,8 @@ private[spark] object FixedLengthBinaryInputFormat { /** Retrieves the record length property from a Hadoop configuration */ def getRecordLength(context: JobContext): Int = { - context.getConfiguration.get(RECORD_LENGTH_PROPERTY).toInt + SparkHadoopUtil.get.getConfigurationFromJobContext(context). + get(RECORD_LENGTH_PROPERTY).toInt } } diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala index 36a1e5d475f46..67a96925da019 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryRecordReader.scala @@ -24,6 +24,7 @@ import org.apache.hadoop.io.compress.CompressionCodecFactory import org.apache.hadoop.io.{BytesWritable, LongWritable} import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext} import org.apache.hadoop.mapreduce.lib.input.FileSplit +import org.apache.spark.deploy.SparkHadoopUtil /** * FixedLengthBinaryRecordReader is returned by FixedLengthBinaryInputFormat. 
@@ -82,7 +83,7 @@ private[spark] class FixedLengthBinaryRecordReader // the actual file we will be reading from val file = fileSplit.getPath // job configuration - val job = context.getConfiguration + val job = SparkHadoopUtil.get.getConfigurationFromJobContext(context) // check compression val codec = new CompressionCodecFactory(job).getCodec(file) if (codec != null) { diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 457472547fcbb..6dc93e2f3b2fb 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -19,6 +19,8 @@ package org.apache.spark.input import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} +import org.apache.spark.deploy.SparkHadoopUtil + import scala.collection.JavaConversions._ import com.google.common.io.ByteStreams @@ -145,7 +147,8 @@ class PortableDataStream( private val confBytes = { val baos = new ByteArrayOutputStream() - context.getConfiguration.write(new DataOutputStream(baos)) + SparkHadoopUtil.get.getConfigurationFromJobContext(context). + write(new DataOutputStream(baos)) baos.toByteArray } diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala index 1b1131b9b8831..31bde8a78f3c6 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.InputSplit import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, CombineFileRecordReader} import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext +import org.apache.spark.deploy.SparkHadoopUtil /** @@ -51,7 +52,8 @@ private[spark] class WholeTextFileRecordReader( extends RecordReader[String, String] with Configurable { private[this] val path = split.getPath(index) - private[this] val fs = path.getFileSystem(context.getConfiguration) + private[this] val fs = path.getFileSystem( + SparkHadoopUtil.get.getConfigurationFromJobContext(context)) // True means the current file has been processed, then skip it. private[this] var processed = false From 734bac96340f1d82584aa0bcba67d3f60e09c39d Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 19 Dec 2014 15:56:05 +0800 Subject: [PATCH 3/5] Explicitly set the implicit parameters --- .../main/scala/org/apache/spark/rdd/RDD.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4bdb13f9ad3d0..bd17dae14d3a8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1176,13 +1176,17 @@ abstract class RDD[T: ClassTag]( def saveAsTextFile(path: String) { // https://issues.apache.org/jira/browse/SPARK-2075 // NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+, - // so the compiler cannot find an implicit Ordering for it. It will generate different - // anonymous classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. Therefore, here we - // provide an Ordering for NullWritable so that the compiler will generate same codes. 
- implicit val nullWritableOrdering = new Ordering[NullWritable] { + // so the compiler cannot find an implicit Ordering for it and will use the default `null`. + // It will generate different anonymous classes for `saveAsTextFile` in Hadoop 1.+ and + // Hadoop 2.+. Therefore, here we provide an Ordering for NullWritable so that the compiler + // will generate same bytecode. + val nullWritableOrdering = new Ordering[NullWritable] { override def compare(x: NullWritable, y: NullWritable): Int = 0 } - this.map(x => (NullWritable.get(), new Text(x.toString))) + val nullWritableClassTag = implicitly[ClassTag[NullWritable]] + val textClassTag = implicitly[ClassTag[Text]] + val r = this.map(x => (NullWritable.get(), new Text(x.toString))) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } @@ -1191,10 +1195,13 @@ abstract class RDD[T: ClassTag]( */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) { // https://issues.apache.org/jira/browse/SPARK-2075 - implicit val nullWritableOrdering = new Ordering[NullWritable] { + val nullWritableOrdering = new Ordering[NullWritable] { override def compare(x: NullWritable, y: NullWritable): Int = 0 } - this.map(x => (NullWritable.get(), new Text(x.toString))) + val nullWritableClassTag = implicitly[ClassTag[NullWritable]] + val textClassTag = implicitly[ClassTag[Text]] + val r = this.map(x => (NullWritable.get(), new Text(x.toString))) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) } From e4ad8b5230c64d9c0224231454d4fbc739c16279 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 20 Dec 2014 21:26:36 +0800 Subject: [PATCH 4/5] Use null for the implicit Ordering --- .../main/scala/org/apache/spark/rdd/RDD.scala | 24 +++++++++---------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bd17dae14d3a8..a94206963b52f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1175,18 +1175,19 @@ abstract class RDD[T: ClassTag]( */ def saveAsTextFile(path: String) { // https://issues.apache.org/jira/browse/SPARK-2075 - // NullWritable is a Comparable rather than Comparable[NullWritable] in Hadoop 1.+, - // so the compiler cannot find an implicit Ordering for it and will use the default `null`. - // It will generate different anonymous classes for `saveAsTextFile` in Hadoop 1.+ and - // Hadoop 2.+. Therefore, here we provide an Ordering for NullWritable so that the compiler - // will generate same bytecode. - val nullWritableOrdering = new Ordering[NullWritable] { - override def compare(x: NullWritable, y: NullWritable): Int = 0 - } + // + // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit + // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]` + // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an + // Ordering for `NullWritable`. That's why the compiler will generate different anonymous + // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+. + // + // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate + // same bytecodes for `saveAsTextFile`. 
val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.map(x => (NullWritable.get(), new Text(x.toString))) - RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path) } @@ -1195,13 +1196,10 @@ abstract class RDD[T: ClassTag]( */ def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) { // https://issues.apache.org/jira/browse/SPARK-2075 - val nullWritableOrdering = new Ordering[NullWritable] { - override def compare(x: NullWritable, y: NullWritable): Int = 0 - } val nullWritableClassTag = implicitly[ClassTag[NullWritable]] val textClassTag = implicitly[ClassTag[Text]] val r = this.map(x => (NullWritable.get(), new Text(x.toString))) - RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, nullWritableOrdering) + RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null) .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec) } From 39d9df2e3af8b6a5da21787f0a59a7f2006cb0be Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 20 Dec 2014 21:31:04 +0800 Subject: [PATCH 5/5] Fix the code style --- .../org/apache/spark/input/FixedLengthBinaryInputFormat.scala | 3 +-- .../main/scala/org/apache/spark/input/PortableDataStream.scala | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala index 4293493850472..c219d21fbefa9 100644 --- a/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/FixedLengthBinaryInputFormat.scala @@ -34,8 +34,7 @@ private[spark] object FixedLengthBinaryInputFormat { /** Retrieves the record length property from a Hadoop configuration */ def getRecordLength(context: JobContext): Int = { - SparkHadoopUtil.get.getConfigurationFromJobContext(context). - get(RECORD_LENGTH_PROPERTY).toInt + SparkHadoopUtil.get.getConfigurationFromJobContext(context).get(RECORD_LENGTH_PROPERTY).toInt } } diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 6dc93e2f3b2fb..593a62b3e3b32 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -19,8 +19,6 @@ package org.apache.spark.input import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} -import org.apache.spark.deploy.SparkHadoopUtil - import scala.collection.JavaConversions._ import com.google.common.io.ByteStreams @@ -30,6 +28,7 @@ import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAt import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat, CombineFileRecordReader, CombineFileSplit} import org.apache.spark.annotation.Experimental +import org.apache.spark.deploy.SparkHadoopUtil /** * A general format for reading whole files in as streams, byte arrays,
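
Taken together, the five patches apply two techniques so that the code above compiles to identical bytecode whether Hadoop 1.x or Hadoop 2.x is on the build classpath: reflective access to getConfiguration (patch 2), and explicitly supplied implicit arguments with a null Ordering in saveAsTextFile (patches 1, 3 and 4). The sketch below restates both outside the Spark tree as a minimal standalone example; it is not part of the patch series, the object and method names (HadoopCompatSketch, getConfiguration, saveAsText) are illustrative only, and it assumes a Spark version in which rddToPairRDDFunctions lives on object RDD, as it does in the tree these patches target.

import scala.reflect.ClassTag

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.mapreduce.JobContext

import org.apache.spark.rdd.RDD

object HadoopCompatSketch {

  // Technique 1 (patch 2): fetch the Configuration reflectively. A direct
  // `context.getConfiguration` call compiles to an invokevirtual against the
  // Hadoop 1.x class but an invokeinterface against the Hadoop 2.x interface,
  // so the emitted bytecode depends on which Hadoop version was on the
  // compile-time classpath. Going through reflection keeps the call site
  // identical for both.
  def getConfiguration(context: JobContext): Configuration = {
    val method = context.getClass.getMethod("getConfiguration")
    method.invoke(context).asInstanceOf[Configuration]
  }

  // Technique 2 (patches 1, 3 and 4): supply the implicit arguments of
  // rddToPairRDDFunctions by hand. Against Hadoop 1.x, NullWritable is a raw
  // Comparable, so no implicit Ordering is found and the default `null` is
  // used; against Hadoop 2.x it is Comparable[NullWritable], so
  // `Ordering.ordered` would synthesize an extra anonymous class. Passing the
  // ClassTags and a `null` Ordering explicitly yields the same bytecode either
  // way, and the save path never consults the ordering, so `null` is safe here.
  def saveAsText(rdd: RDD[String], path: String): Unit = {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val pairs = rdd.map(x => (NullWritable.get(), new Text(x)))
    RDD.rddToPairRDDFunctions(pairs)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }
}

Patch 3 first passed a dummy Ordering[NullWritable] explicitly; patch 4 then replaced it with null, which is what the Hadoop 1.x compile would have inferred anyway, so both builds end up generating the same anonymous classes for saveAsTextFile.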