[SPARK-2759][CORE] Generic Binary File Support in Spark #1658

Closed · wants to merge 30 commits

8 participants

@kmader
Contributor
kmader commented Jul 30, 2014

The additions add the abstract BinaryFileInputFormat and BinaryRecordReader classes for reading in data as a byte stream and converting it to another format using the def parseByteArray(inArray: Array[Byte]): T function.
As a trivial example, ByteInputFormat and ByteRecordReader are included, which simply return the Array[Byte] for a given file.
Finally, an RDD for BinaryFileInputFormat (to allow for easier partitioning changes, as was done for WholeFileInput) was added, along with the corresponding byteFiles method on the SparkContext so the functions can be easily used by others.
A common use case might be to read in a folder

sc.byteFiles("s3://mydrive/tif/*.tif").map(rawData => ReadTiffFromByteArray(rawData))
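
For illustration, a less trivial (hypothetical) reader built on the proposed abstract class might look like the sketch below; the constructor arguments mirror the reader classes in the patch, and the ImageIO decode is only a placeholder (TIFF support may require a TIFF-capable ImageIO plugin):

import java.awt.image.BufferedImage
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.spark.input.BinaryRecordReader

// Hypothetical example reader: decodes each whole file as a TIFF image
class TiffRecordReader(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
  extends BinaryRecordReader[BufferedImage](split, context, index) {

  // Called once per file with its full contents
  def parseByteArray(inArray: Array[Byte]): BufferedImage =
    ImageIO.read(new ByteArrayInputStream(inArray)) // placeholder decode, assumes a TIFF plugin
}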
@mateiz
Contributor
mateiz commented Jul 30, 2014

Do you mind opening a JIRA issue on https://issues.apache.org/jira/browse/SPARK to track this?

Also, I wonder if we should make the API just return an RDD of InputStreams. That way users can read directly from a stream and don't need to load the whole file into memory as a byte array. The only awkward thing is that calling cache() on an RDD of InputStreams wouldn't work, but hopefully this is obvious (and will be documented). Or, if that doesn't sound good, we could return some objects that let you open a stream repeatedly (some kind of BinaryFile object with a stream method).

@mateiz
Contributor
mateiz commented Jul 30, 2014

Jenkins, this is ok to test

@mateiz mateiz commented on an outdated diff Jul 30, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -511,6 +511,26 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
+ * Get an RDD for a Hadoop-readable dataset as byte-streams for each file (useful for binary data)
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ *
+ * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+ */
+ def byteFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, Array[Byte])] = {
@mateiz
mateiz Jul 30, 2014 Contributor

I'd call this binaryFiles.

Also, please add it to JavaSparkContext, and ideally we'd have a way to add it to Python as well. That one will be trickier -- we probably need to read the file in chunks and pass them to Python. But I think it's important to design the API as part of this change.

@SparkQA
SparkQA commented Jul 30, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17505/consoleFull

@SparkQA
SparkQA commented Jul 30, 2014

QA results for PR 1658:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
@serializable abstract class BinaryRecordReader[T](
@serializable class ByteInputFormat extends BinaryFileInputFormat[Array[Byte]] {
@serializable class ByteRecordReader(

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17505/consoleFull

@kmader kmader added apache headers, added datainputstream directly as an output option for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api
1cfa38a
@SparkQA
SparkQA commented Jul 31, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17517/consoleFull

@SparkQA
SparkQA commented Jul 31, 2014

QA results for PR 1658:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class StreamBasedRecordReader[T](
class StreamRecordReader(
* A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
abstract class BinaryRecordReader[T](
class ByteRecordReader(
* A class for extracting the information from the file using the BinaryRecordReader (as Byte array)
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17517/consoleFull

@SparkQA
SparkQA commented Jul 31, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17523/consoleFull

@kmader
Contributor
kmader commented Jul 31, 2014

Thanks for the feedback. I have made the requested changes, created an issue (https://issues.apache.org/jira/browse/SPARK-2759), and added a dataStreamFiles method to both SparkContext and JavaSparkContext which returns the DataInputStream itself (I have a feeling this might create a few new issues with serialization, properly closing streams, or rerunning tasks, but I guess we'll see).

My recommendation (as I have done in my own code) would be to extend the abstract class StreamBasedRecordReader for custom file types by implementing def parseStream(inStream: DataInputStream): T.
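
For example (a hedged sketch only; the HeaderAndBody format is made up, and the constructor arguments follow the reader classes in this patch):

import java.io.DataInputStream
import com.google.common.io.ByteStreams
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit
import org.apache.spark.input.StreamBasedRecordReader

// Made-up record type for illustration: a 4-byte magic number plus the remaining payload
case class HeaderAndBody(magic: Int, body: Array[Byte])

class HeaderRecordReader(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
  extends StreamBasedRecordReader[HeaderAndBody](split, context, index) {

  def parseStream(inStream: DataInputStream): HeaderAndBody = {
    val magic = inStream.readInt()                            // read the fixed header
    HeaderAndBody(magic, ByteStreams.toByteArray(inStream))   // then the rest of the stream
  }
}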

As for PySpark, my guess is that it would be easiest to create a library of StreamBasedRecordReader classes for common file types, since it is much less expensive to do IO at the Scala/Java level. Alternatively, a Spark function could copy the file into a local directory on demand and provide the local filename to Python.

@kmader kmader changed the title from Generic Binary File Support in Spark to [SPARK-2759][CORE] Generic Binary File Support in Spark Jul 31, 2014
@SparkQA
SparkQA commented Jul 31, 2014

QA results for PR 1658:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class StreamBasedRecordReader[T](
class StreamRecordReader(
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
abstract class BinaryRecordReader[T](
class ByteRecordReader(
* A class for reading the file using the BinaryRecordReader (as Byte array)
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17523/consoleFull

@freeman-lab
Contributor

@kmader @mateiz this looks really useful! I was about to submit a related PR for an InputFormat that reads and splits large flat binary files into records (of a specified length), rather than reading one file per record as is done here. We find this is the easiest way for users to bring large numerical data from existing NumPy / Matlab pipelines into Spark. Here's a gist. Would these be compatible? Perhaps, analogous to the text case, we could have both byteFile and wholeByteFiles?
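
For reference, a hedged sketch of what consuming such fixed-length records could look like on the Spark side, assuming an API that takes a record length and returns byte arrays (as this thread later converges on with binaryRecords; the 100-double little-endian layout is just an example):

import java.nio.{ByteBuffer, ByteOrder}

// Assumed layout: each record is 100 little-endian doubles written out flat
// (e.g. from NumPy's tofile()), so recordLength is 100 * 8 bytes.
val recordLength = 100 * 8
val records = sc.binaryRecords("hdfs://path/to/data.bin", recordLength)
val vectors = records.map { bytes =>
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).asDoubleBuffer()
  val arr = new Array[Double](buf.remaining())
  buf.get(arr)
  arr
}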

@kmader
Contributor
kmader commented Aug 13, 2014

@freeman-lab looks good, I will add it to this pull request if that's ok with you. I think my personal preference would be to keep binaryFiles for standard operations and fixedLengthBinaryFiles for other files, since many standard binary formats are not so easily partitionable, and trying to read tif, jpg, or even hdf5 and raw under such conditions would be rather difficult to do correctly, whereas for text files line-by-line is a common partitioning. Perhaps there are other use cases that I am not familiar with that speak against this, though.

@kmader kmader Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
eacfaa6
@SparkQA
SparkQA commented Aug 13, 2014

QA tests have started for PR 1658. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18439/consoleFull

@SparkQA
SparkQA commented Aug 13, 2014

QA results for PR 1658:
- This patch FAILED unit tests.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18439/consoleFull

@kmader kmader commented on an outdated diff Aug 13, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
+ classOf[StreamInputFormat],
+ classOf[String],
+ classOf[DataInputStream],
+ updateConf,
+ minPartitions).setName(path)
+ }
+
+ /**
+ * Load data from a flat binary file, assuming each record is a set of numbers
+ * with the specified numerical format (see ByteBuffer), and the number of
+ * bytes per record is constant (see FixedLengthBinaryInputFormat)
+ *
+ * @param path Directory to the input data files
+ * @return An RDD of data with values, RDD[(Array[Byte])]
+ */
+ def fixedLengthBinaryFiles(path: String): RDD[Array[Byte]] = {
@kmader
kmader Aug 13, 2014 Contributor

This has been taken almost directly from
https://github.com/freeman-lab/thunder/blob/master/scala/src/main/scala/thunder/util/Load.scala without the extra formatting to load it as a list of doubles.

@SparkQA
SparkQA commented Aug 13, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18441/consoleFull

@SparkQA
SparkQA commented Aug 13, 2014

QA results for PR 1658:
- This patch PASSES unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
class FixedLengthBinaryInputFormat extends FileInputFormat[LongWritable, BytesWritable] {
class FixedLengthBinaryRecordReader extends RecordReader[LongWritable, BytesWritable] {
abstract class StreamBasedRecordReader[T](
class StreamRecordReader(
class StreamInputFormat extends StreamFileInputFormat[DataInputStream] {
abstract class BinaryRecordReader[T](
class ByteRecordReader(
* A class for reading the file using the BinaryRecordReader (as Byte array)
class ByteInputFormat extends StreamFileInputFormat[Array[Byte]] {

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18441/consoleFull

@mateiz
Contributor
mateiz commented Aug 13, 2014

Hey, sorry for taking a bit of time to get back to this (I've been looking at 1.1 stuff), but I have a few comments on the API:

  • Do we need both a stream API and a byte array one? I'd personally offer only the stream one because it's less likely to cause crashes (with the other one there's a risk of OutOfMemoryError).
  • For the files vs fixed-length records, maybe we can call the methods binaryFiles and binaryRecords.
  • Are you planning to create saveAsBinaryFiles / saveAsBinaryRecords too? We don't have to have it in this PR but it would be useful.
@mateiz mateiz commented on an outdated diff Aug 13, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
+ this,
+ classOf[ByteInputFormat],
+ classOf[String],
+ classOf[Array[Byte]],
+ updateConf,
+ minPartitions).setName(path)
+ }
+
+ /**
+ * Get an RDD for a Hadoop-readable dataset as DataInputStreams for each file
+ * (useful for binary data)
+ *
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ *
+ * @note Care must be taken to close the files afterwards
@mateiz
mateiz Aug 13, 2014 Contributor

It is a bit unfortunate that users have to close the streams by hand. If you want to get around this, you can create a custom RDD wrapping around the HadoopRDD, whose compute() method can add a cleanup hook to its TaskContext to close the stream. Take a look at TaskContext.addOnCompleteCallback().

@mateiz
mateiz Oct 8, 2014 Contributor

Hey Kevin, is this @note still relevant? Using addOnCompleteCallback you might be able to avoid this.
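
A rough sketch of the suggested pattern (the wrapper class and its name are illustrative; addOnCompleteCallback is the existing TaskContext hook):

import java.io.DataInputStream
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Illustrative wrapper: closes every stream handed out by a partition when the task
// finishes, so users do not have to remember to close the files themselves.
class AutoClosingStreamRDD(prev: RDD[(String, DataInputStream)])
  extends RDD[(String, DataInputStream)](prev) {

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[(String, DataInputStream)] =
    prev.iterator(split, context).map { case (path, stream) =>
      context.addOnCompleteCallback(() => stream.close()) // cleanup hook runs at task completion
      (path, stream)
    }
}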

@mateiz mateiz commented on an outdated diff Aug 13, 2014
...cala/org/apache/spark/api/java/JavaSparkContext.scala
+ * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+ *
+ * <p> then `rdd` contains
+ * {{{
+ * (a-hdfs-path/part-00000, its content)
+ * (a-hdfs-path/part-00001, its content)
+ * ...
+ * (a-hdfs-path/part-nnnnn, its content)
+ * }}}
+ *
+ * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ */
+ def dataStreamFiles(path: String, minPartitions: Int = defaultMinPartitions):
+ JavaPairRDD[String,DataInputStream] = new JavaPairRDD(sc.dataStreamFiles(path,minPartitions))
@mateiz
mateiz Aug 13, 2014 Contributor

Put spaces after the commas and make sure the indentation matches our style guide (here it should be indented by two spaces after the "def").

@mateiz mateiz commented on an outdated diff Aug 13, 2014
...apache/spark/input/FixedLengthBinaryInputFormat.scala
+ */
+
+package org.apache.spark.input
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{BytesWritable, LongWritable}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
+import org.apache.hadoop.mapreduce.{InputSplit, JobContext, RecordReader, TaskAttemptContext}
+
+/**
+ * Custom Input Format for reading and splitting flat binary files that contain records,
+ * each of which are a fixed size in bytes. The fixed record size is specified through
+ * a parameter recordLength in the Hadoop configuration.
+ */
+
+object FixedLengthBinaryInputFormat {
@mateiz
mateiz Aug 13, 2014 Contributor

All of these objects should be marked private[spark] so that they don't show up in the public API.

@mateiz mateiz commented on an outdated diff Aug 13, 2014
...apache/spark/input/FixedLengthBinaryInputFormat.scala
+ * only contains full records. Each InputSplit passed to FixedLengthBinaryRecordReader
+ * will start at the first byte of a record, and the last byte will the last byte of a record.
+ */
+ override def computeSplitSize(blockSize: Long, minSize: Long, maxSize: Long): Long = {
+
+ val defaultSize = super.computeSplitSize(blockSize, minSize, maxSize)
+
+ // If the default size is less than the length of a record, make it equal to it
+ // Otherwise, make sure the split size is as close to possible as the default size,
+ // but still contains a complete set of records, with the first record
+ // starting at the first byte in the split and the last record ending with the last byte
+
+ defaultSize match {
+ case x if x < recordLength => recordLength.toLong
+ case _ => (Math.floor(defaultSize / recordLength) * recordLength).toLong
+ }
@mateiz
mateiz Aug 13, 2014 Contributor

Probably clearer to write this as

if (defaultSize < recordLength) {
  recordLength.toLong
} else {
  (Math.floor(defaultSize / recordLength) * recordLength).toLong
}
@mateiz mateiz commented on an outdated diff Aug 13, 2014
...apache/spark/input/FixedLengthBinaryInputFormat.scala
+ // but still contains a complete set of records, with the first record
+ // starting at the first byte in the split and the last record ending with the last byte
+
+ defaultSize match {
+ case x if x < recordLength => recordLength.toLong
+ case _ => (Math.floor(defaultSize / recordLength) * recordLength).toLong
+ }
+ }
+
+ /**
+ * Create a FixedLengthBinaryRecordReader
+ */
+ override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
+ RecordReader[LongWritable, BytesWritable] = {
+ new FixedLengthBinaryRecordReader
+ }
@mateiz
mateiz Aug 13, 2014 Contributor

Nit: the indent is slightly wrong here, should look like

override def createRecordReader(split: InputSplit, context: TaskAttemptContext):
    RecordReader[LongWritable, BytesWritable] = {
  new FixedLengthBinaryRecordReader
}
@mateiz
Contributor
mateiz commented Aug 13, 2014

BTW apart from that, a few other comments:

  • Make sure that the classes / objects you add are private[spark] so they don't show up in the API docs and such.
  • You should also design some test suites for this to put in core/src/test. Take a look at other test suites to see how ScalaTest works and how we create temp files. Basically you want to create some files programmatically in the test, then try to read them with this API.

Thanks for the continued work on this BTW, it's looking great.

@freeman-lab
Contributor

@kmader great to incorporate the FixedRecord stuff into the PR, thanks! I like @mateiz's suggestion for naming the two methods. I was starting to work on saveAsBinaryRecords so could put that together in a separate PR, maybe @kmader already has some save methods for the binaryFiles case?

@kmader
Contributor
kmader commented Aug 14, 2014

Addressing the major issues brought up

Do we need both a stream API and a byte array one? The byte array one might be more problematic with out-of-memory errors, but the stream one might have issues if the streams are serialized and shuffled before being read. For reading tiff/jpg images, a byte array is a sufficient input; I also imagine this could get used for many types of proprietary but small files (spectra, special hardware devices, and the like) which could not easily be parsed by binaryRecords but would nonetheless be useful to read.

Finally, for the file closing, I believe I can avoid rewriting too much code by just extending the close method of my StreamBasedRecordReader object to also try to close the stream; NewHadoopRDD already calls this method via your executeOnCompleteCallbacks.

I would like to leave the abstract classes BinaryRecordReader and StreamFileInputFormat public, since otherwise all of the implementations have to reside in org.apache.spark, which is inconvenient for external packages, but I will make the others private[spark].

As for saving, I have code from my tools already but I would prefer to finish this pull request for input and then make a separate PR for saving since it is a different beast.

I have created simple test cases and added them to FileSuite and JavaAPISuite respectively

@SparkQA
SparkQA commented Aug 14, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18551/consoleFull

@SparkQA
SparkQA commented Aug 14, 2014

QA results for PR 1658:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class StreamBasedRecordReader[T](
abstract class BinaryRecordReader[T](
* A class for reading the file using the BinaryRecordReader (as Byte array)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18551/consoleFull

@SparkQA
SparkQA commented Aug 14, 2014

QA tests have started for PR 1658. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18556/consoleFull

@SparkQA
SparkQA commented Aug 14, 2014

QA results for PR 1658:
- This patch FAILED unit tests.
- This patch merges cleanly
- This patch adds the following public classes (experimental):
abstract class StreamBasedRecordReader[T](
abstract class BinaryRecordReader[T](
* A class for reading the file using the BinaryRecordReader (as Byte array)

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18556/consoleFull

@mateiz
Contributor
mateiz commented Aug 15, 2014

@kmader to allow streams to be shuffle-able, how about the following? We create a class called BinaryData with a method open() that returns an InputStream. The BinaryData object has information about an InputSplit inside and can thus be shuffled across the network, cached, etc. And whenever users want to read it, they can get a stream. Would this solve the problem?
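
A minimal sketch of that idea, just to make the shape concrete (the name, and holding a plain path rather than a full InputSplit, are simplifications; the PR ultimately implements this as PortableDataStream):

import java.io.DataInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Illustrative only: carries enough information to locate the file, so the object itself
// can be cached or shuffled, and opens a fresh stream each time open() is called.
class BinaryData(val path: String) extends Serializable {
  def open(): DataInputStream = {
    val p = new Path(path)
    p.getFileSystem(new Configuration()).open(p) // FSDataInputStream is a DataInputStream
  }
}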

@loveconan1988

It should be solvable.


@SparkQA
SparkQA commented Sep 5, 2014

Can one of the admins verify this patch?

@mateiz
Contributor
mateiz commented Sep 6, 2014

Hey @kmader, just curious, did you think about this BinaryData object idea? If you don't have time to do it, someone else can also pick up from where you left off.

@kmader
Contributor
kmader commented Sep 7, 2014

Hey @mateiz, sorry, I had other projects to work on. I have made the changes and called the new class PortableDataStream.

@jrabary
Contributor
jrabary commented Sep 16, 2014

Hi all,
I'm trying to use this patch to load a set of jpeg images but the path (key) of the output is empty

val image = sc.binaryFiles("data/*.jpg")
image.take(1) foreach println
(,[B@15435ac)

How can I correct this?

kmader added some commits Sep 16, 2014
@kmader kmader Update RawFileInput.scala
trying to fix bug where name appears blank
a01c9cf
@kmader kmader Update RawFileInput.scala
typo in path value
932a206
@kmader
Contributor
kmader commented Sep 16, 2014

Thanks @jrabary for this find; it had to do with the new method for handling PortableDataStreams, which didn't calculate the name correctly. I think I have it fixed now:

val images = sc.binaryFiles("figure/*.png")
println(images.first)

Output

(figure/unnamed-chunk-2.png,[B@2f1a30ca)
@mateiz mateiz commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ extends Serializable {
+
+ private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
+ private var isOpen = false
+ /**
+ * Calculate the path name independently of opening the file
+ */
+ private lazy val path = {
+ val pathp = split.getPath(index)
+ pathp.toString
+ }
+
+ /**
+ * create a new DataInputStream from the split and context
+ */
+ def open(): FSDataInputStream = {
@mateiz
mateiz Sep 18, 2014 Contributor

Instead of returning this Hadoop data type, can we return a java.io.DataInputStream? It's easier to maintain in the future.

@mateiz mateiz commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ } catch {
+ case ioe: java.io.IOException => // do nothing
+ }
+ }
+ }
+ def getPath(): String = path
+}
+
+/**
+ * An abstract class of [[org.apache.hadoop.mapreduce.RecordReader RecordReader]]
+ * to reading files out as streams
+ */
+abstract class StreamBasedRecordReader[T](
+ split: CombineFileSplit,
+ context: TaskAttemptContext,
+ index: Integer)
@mateiz
mateiz Sep 18, 2014 Contributor

Formatting here is kind of wrong, the arguments should be indented only 4 spaces

@mateiz mateiz commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ super.setMaxSplitSize(maxSplitSize)
+ }
+
+ def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+ RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
+ extends Serializable {
+
+ private var fileIn: FSDataInputStream = null.asInstanceOf[FSDataInputStream]
@mateiz
mateiz Sep 18, 2014 Contributor

I think you can just write = null

@mateiz mateiz commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ }.sum
+
+ val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
+ super.setMaxSplitSize(maxSplitSize)
+ }
+
+ def createRecordReader(split: InputSplit, taContext: TaskAttemptContext):
+ RecordReader[String,T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ */
+class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
@mateiz
mateiz Sep 18, 2014 Contributor

IMO we should call this BinaryData or DataStream instead of PortableDataStream, because the user doesn't really care that it's portable. I prefer BinaryData slightly more but I'd also be okay with DataStream.

@mateiz mateiz commented on an outdated diff Sep 18, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -511,6 +511,67 @@ class SparkContext(config: SparkConf) extends Logging {
}
/**
+ * Get an RDD for a Hadoop-readable dataset as byte-streams for each file
+ * (useful for binary data)
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ *
+ * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+ */
+ def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
+ RDD[(String, Array[Byte])] = {
@mateiz
mateiz Sep 18, 2014 Contributor

What if we just added a toArray to PortableDataStream, and had only one method for reading these? Then you could do sc.binaryFiles(...).map(_.toArray) if you want to get byte arrays. Or would this cause a regression?

Basically my suggestion is to have binaryFiles, which will return an RDD of PortableDataStream, and binaryRecords, which will return an RDD of byte arrays of the same length (since I imagine there's no point streaming a record).

@mateiz
Contributor
mateiz commented Sep 18, 2014

Jenkins, add to whitelist and test this please

@mateiz
Contributor
mateiz commented Sep 18, 2014

@kmader sorry for the delay on this, but the new version looks pretty good. Made a few more comments on the API. The other thing I see is that the code style is not quite matching the rest of the project in some places -- you can do sbt scalastyle to check it, or Jenkins will also check it. Will point out the bigger ones.

@mateiz mateiz commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ * A general format for reading whole files in as streams, byte arrays,
+ * or other functions to be added
+ */
+abstract class StreamFileInputFormat[T]
+ extends CombineFileInputFormat[String,T] {
+ override protected def isSplitable(context: JobContext, file: Path): Boolean = false
+ /**
+ * Allow minPartitions set by end-user in order to keep compatibility with old Hadoop API.
+ */
+ def setMaxSplitSize(context: JobContext, minPartitions: Int) {
+ val files = listStatus(context)
+ val totalLen = files.map { file =>
+ if (file.isDir) 0L else file.getLen
+ }.sum
+
+ val maxSplitSize = Math.ceil(totalLen*1.0/files.length).toLong
@mateiz
mateiz Sep 18, 2014 Contributor

Put spaces around binary operators like * and /

@mateiz mateiz commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ context: TaskAttemptContext,
+ index: Integer)
+ extends StreamBasedRecordReader[PortableDataStream](split,context,index) {
+
+ def parseStream(inStream: PortableDataStream): PortableDataStream = inStream
+}
+
+/**
+ * A class for extracting the information from the file using the
+ * BinaryRecordReader (as Byte array)
+ */
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
+ override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
+ {
+ new CombineFileRecordReader[String,PortableDataStream](
+ split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
@mateiz
mateiz Sep 18, 2014 Contributor

Put spaces after all commas

@SparkQA
SparkQA commented Sep 18, 2014

QA tests have started for PR 1658 at commit 932a206.

  • This patch merges cleanly.
@mateiz mateiz and 2 others commented on an outdated diff Sep 18, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ */
+private[spark] class StreamInputFormat extends StreamFileInputFormat[PortableDataStream] {
+ override def createRecordReader(split: InputSplit, taContext: TaskAttemptContext)=
+ {
+ new CombineFileRecordReader[String,PortableDataStream](
+ split.asInstanceOf[CombineFileSplit],taContext,classOf[StreamRecordReader]
+ )
+ }
+}
+
+/**
+ * A [[org.apache.hadoop.mapreduce.RecordReader RecordReader]] for reading a single binary file
+ * out in a key-value pair, where the key is the file path and the value is the entire content of
+ * the file as a byte array
+ */
+abstract class BinaryRecordReader[T](
@mateiz
mateiz Sep 18, 2014 Contributor

This should probably be private[spark]

@kmader
kmader Oct 1, 2014 Contributor

I would prefer this as public so my other tools can extend it without being in org.apache.spark, as I imagine there will be a number of more specific types like TIFFReader, MP3Reader, AVIReader, etc

@mateiz
mateiz Oct 1, 2014 Contributor

In that case, add @DeveloperApi to it to make clear that this is an internal API. In general Spark provides very strong API compatibility guarantees (https://cwiki.apache.org/confluence/display/SPARK/Spark+Versioning+Policy) so we don't try to expose stuff unless we have to. Specifically, unless you make this DeveloperApi, you wouldn't be able to break the API or the semantics of this class in future Spark releases. For DeveloperApi we allow that, though we still discourage it, because we warn users that it might change.

@pwendell
pwendell Oct 1, 2014 Contributor

I'm not sure I see re-use in other tools as a sufficiently strong reason for making this bytecode public. This is a fairly small class, can you just copy this class into your project? Making this public will tie our hands in terms of changing this in the future.

@kmader
kmader Oct 2, 2014 Contributor

I have thought about the issue some more, and given the restructuring of the PR to use these PortableDataStreams, I now think that rather than implementing this abstract class, a better solution would be to take advantage of implicit class conversion and create, for example, an implicit class PortableTiffReader(pds: PortableDataStream) with the method readAsTiff (a rough sketch follows below). This approach:

  • keeps the implementations of readers independent from the underlying filesystem and serialization issues.
  • makes it easier to add new file formats, which can be brought in with a simple import and used from binaryFiles directly.
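
Something along these lines (the TIFF decode itself is just a placeholder and not part of this PR):

import java.awt.image.BufferedImage
import javax.imageio.ImageIO
import org.apache.spark.input.PortableDataStream

// Hypothetical extension living outside Spark: adds readAsTiff to any PortableDataStream
// via an implicit conversion, so it can be used directly on the output of sc.binaryFiles.
object TiffImplicits {
  implicit class PortableTiffReader(pds: PortableDataStream) {
    def readAsTiff(): BufferedImage = {
      val stream = pds.open()
      try ImageIO.read(stream) finally stream.close() // placeholder decode, assumes a TIFF plugin
    }
  }
}

// Usage sketch:
//   import TiffImplicits._
//   sc.binaryFiles("s3://mydrive/tif/*.tif").map { case (name, pds) => (name, pds.readAsTiff()) }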
@mateiz mateiz commented on an outdated diff Sep 18, 2014
core/src/test/java/org/apache/spark/JavaAPISuite.java
+ String tempDirName = tempDir.getAbsolutePath();
+ File file1 = new File(tempDirName + "/part-00000");
+
+ FileOutputStream fos1 = new FileOutputStream(file1);
+
+ FileChannel channel1 = fos1.getChannel();
+ ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ channel1.write(bbuf);
+
+
+ JavaPairRDD<String, byte[]> readRDD = sc.binaryFiles(tempDirName,3);
+ List<Tuple2<String, byte[]>> result = readRDD.collect();
+ for (Tuple2<String, byte[]> res : result) {
+ Assert.assertArrayEquals(content1, res._2());
+ }
+ }
@mateiz
mateiz Sep 18, 2014 Contributor

Make sure to add tests for the new methods too (binaryRecords and data streams). For data streams, add a test where we cache them and go over some RDD multiple times, to make sure they can be re-read.
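
A rough sketch of such a test (the file setup follows the style of the existing FileSuite tests; names are illustrative):

test("portable data streams can be re-read after caching") {
  sc = new SparkContext("local", "test")
  val outFile = java.io.File.createTempFile("record-bytestream", ".bin")
  val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)
  val channel = new java.io.FileOutputStream(outFile).getChannel
  channel.write(java.nio.ByteBuffer.wrap(testOutput))
  channel.close()

  val inRdd = sc.binaryFiles(outFile.getAbsolutePath).cache()
  inRdd.foreach { case (_, stream) => stream.toArray() }                      // first pass fills the cache
  val second = inRdd.map { case (_, stream) => stream.toArray() }.collect()   // second pass must re-read
  assert(second.forall(_.toSeq == testOutput.toSeq))
}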

@mateiz mateiz and 2 others commented on an outdated diff Sep 18, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
+ classOf[StreamInputFormat],
+ classOf[String],
+ classOf[PortableDataStream],
+ updateConf,
+ minPartitions).setName(path)
+ }
+
+ /**
+ * Load data from a flat binary file, assuming each record is a set of numbers
+ * with the specified numerical format (see ByteBuffer), and the number of
+ * bytes per record is constant (see FixedLengthBinaryInputFormat)
+ *
+ * @param path Directory to the input data files
+ * @return An RDD of data with values, RDD[(Array[Byte])]
+ */
+ def binaryRecords(path: String): RDD[Array[Byte]] = {
@mateiz
mateiz Sep 18, 2014 Contributor

IMO we should pass the record length here rather than waiting for users to set it in hadoopConfiguration. You can make a new Configuration in this method and set it. I don't see any use case where the user would prefer to set it in a global configuration object.

@freeman-lab
freeman-lab Sep 18, 2014 Contributor

@mateiz I agree that makes sense. Originally I used the global configuration file so I could call it from PySpark, as in sc.newAPIHadoopRDD('FixedLengthBinaryInputFormat',..., conf={'recordLength': '100'}). But we can just add the new binaryRecords to PySpark directly (probably in a separate PR), and have record length an argument as you suggest.

@kmader
kmader Oct 1, 2014 Contributor

Yes this makes much more sense, I had just copied the code from @freeman-lab, but I made it into a parameter now.

@SparkQA
SparkQA commented Sep 18, 2014

QA tests have finished for PR 1658 at commit 932a206.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
    • abstract class StreamBasedRecordReader[T](
    • abstract class BinaryRecordReader[T](
@kmader kmader fixed several scala-style issues, changed structure of binaryFiles, removed excessive classes, added new tests. The caching tests still have a serialization issue, but that should be easily fixed as well.
238c83c
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have started for PR 1658 at commit 238c83c.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have finished for PR 1658 at commit 238c83c.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(split: CombineFileSplit, context: TaskAttemptContext, index: Integer)
    • abstract class StreamBasedRecordReader[T](
    • abstract class BinaryRecordReader[T](
@kmader kmader Fixed the serialization issue with PortableDataStream since neither CombineFileSplit nor TaskAttemptContext implement the Serializable interface, by using ByteArrays for storing both and then recreating the objects from these bytearrays as needed.
19812a8
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have started for PR 1658 at commit 19812a8.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have finished for PR 1658 at commit 19812a8.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit, @transient context: TaskAttemptContext, index: Integer)
    • abstract class StreamBasedRecordReader[T](
    • abstract class BinaryRecordReader[T](
@kmader kmader fixing line length and output from FSDataInputStream to DataInputStream to minimize sensitivity to Hadoop API changes
4163e38
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have started for PR 1658 at commit 4163e38.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have finished for PR 1658 at commit 4163e38.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
    • abstract class StreamBasedRecordReader[T](
    • abstract class BinaryRecordReader[T](
@kmader kmader filename check in "binary file input as byte array" test now ignores prefixes and suffixes which might get added by Hadoop
0588737
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have started for PR 1658 at commit 0588737.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have finished for PR 1658 at commit 0588737.

  • This patch fails unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
    • abstract class StreamBasedRecordReader[T](
    • abstract class BinaryRecordReader[T](
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have started for PR 1658 at commit c27a8f1.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 1, 2014

QA tests have finished for PR 1658 at commit c27a8f1.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
    • abstract class StreamBasedRecordReader[T](
    • abstract class BinaryRecordReader[T](
@SparkQA
SparkQA commented Oct 2, 2014

QA tests have started for PR 1658 at commit a32fef7.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 2, 2014

QA tests have finished for PR 1658 at commit a32fef7.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
@mateiz mateiz commented on an outdated diff Oct 8, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -510,6 +510,53 @@ class SparkContext(config: SparkConf) extends Logging {
minPartitions).setName(path)
}
+
+ /**
+ * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file
+ * (useful for binary data)
+ *
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ *
+ * @note Care must be taken to close the files afterwards
+ * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+ */
+ @DeveloperApi
@mateiz
mateiz Oct 8, 2014 Contributor

There's no need to have @DeveloperApi on this, as PortableDataStream is something we want to support.

@mateiz mateiz commented on an outdated diff Oct 8, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+
+ val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong
+ super.setMaxSplitSize(maxSplitSize)
+ }
+
+ def createRecordReader(split: InputSplit, taContext: TaskAttemptContext): RecordReader[String, T]
+
+}
+
+/**
+ * A class that allows DataStreams to be serialized and moved around by not creating them
+ * until they need to be read
+ * @note TaskAttemptContext is not serializable resulting in the confBytes construct
+ * @note CombineFileSplit is not serializable resulting in the splitBytes construct
+ */
+@DeveloperApi
@mateiz
mateiz Oct 8, 2014 Contributor

No need for @DeveloperApi on this, we want to support it. It was only needed on the various input formats.

@mateiz
mateiz Oct 8, 2014 Contributor

Actually, one thing that would help is to make this a trait (i.e. interface in Java), so users can't instantiate it, and then have a private[spark] class PortableDataStreamImpl for the implementation. We don't want to expose the constructor if we can.
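
A minimal sketch of that split (method names taken from the current class; bodies elided):

import java.io.DataInputStream
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit

// Public surface: users only see the trait, so the constructor never becomes API.
trait PortableDataStream extends Serializable {
  def open(): DataInputStream
  def toArray(): Array[Byte]
  def getPath(): String
}

// Implementation stays internal and is free to change between releases.
private[spark] class PortableDataStreamImpl(
    split: CombineFileSplit,
    context: TaskAttemptContext,
    index: Integer)
  extends PortableDataStream {
  def open(): DataInputStream = ???   // same body as the current class
  def toArray(): Array[Byte] = ???
  def getPath(): String = ???
}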

@mateiz mateiz commented on an outdated diff Oct 8, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
+ open()
+ val innerBuffer = ByteStreams.toByteArray(fileIn)
+ close()
+ innerBuffer
+ }
+
+ /**
+ * close the file (if it is already open)
+ */
+ def close() = {
+ if (isOpen) {
+ try {
+ fileIn.close()
+ isOpen = false
+ } catch {
+ case ioe: java.io.IOException => // do nothing
@mateiz
mateiz Oct 8, 2014 Contributor

set isOpen = false here too, if closing failed. I think you can just set it to false above the try.
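
For example (the body is the same as in the diff above, with the flag moved ahead of the try):

def close() = {
  if (isOpen) {
    isOpen = false          // mark it closed even if the close() call below fails
    try {
      fileIn.close()
    } catch {
      case ioe: java.io.IOException => // do nothing
    }
  }
}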

@mateiz mateiz commented on an outdated diff Oct 8, 2014
.../main/scala/org/apache/spark/input/RawFileInput.scala
@@ -0,0 +1,219 @@
+/*
@mateiz
mateiz Oct 8, 2014 Contributor

This file contains several different classes none of which is named RawFileInput -- it would be better to move them to separate files, each named after the class. That makes them easier to find later.

@mateiz mateiz commented on an outdated diff Oct 8, 2014
...c/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+/** Allows better control of the partitioning
+ *
+ */
+import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapreduce._
+import org.apache.spark.{InterruptibleIterator, TaskContext, Partition, SparkContext}
+import org.apache.spark.input.StreamFileInputFormat
+
+private[spark] class BinaryFileRDD[T](
+ sc : SparkContext,
+ inputFormatClass: Class[_ <: StreamFileInputFormat[T]],
+ keyClass: Class[String],
+ valueClass: Class[T],
+ @transient conf: Configuration,
+ minPartitions: Int)
@mateiz
mateiz Oct 8, 2014 Contributor

Format is slightly wrong here, the constructor args should only be indented with 4 spaces

@mateiz mateiz and 1 other commented on an outdated diff Oct 8, 2014
...c/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+ keyClass: Class[String],
+ valueClass: Class[T],
+ @transient conf: Configuration,
+ minPartitions: Int)
+ extends NewHadoopRDD[String, T](sc, inputFormatClass, keyClass, valueClass, conf) {
+
+
+ override def getPartitions: Array[Partition] = {
+ val inputFormat = inputFormatClass.newInstance
+ inputFormat match {
+ case configurable: Configurable =>
+ configurable.setConf(conf)
+ case _ =>
+ }
+ val jobContext = newJobContext(conf, jobId)
+ inputFormat.setMaxSplitSize(jobContext, minPartitions)
@mateiz
mateiz Oct 8, 2014 Contributor

Is this actually a max split size? It seems you're passing an int that means something else, but I might've misunderstood

@mateiz
mateiz Oct 28, 2014 Contributor

BTW this comment was important too, what is the meaning of this parameter?

@kmader
kmader Oct 30, 2014 Contributor

Sorry, this function was named incorrectly. It ultimately calls setMaxSplitSize after calculating the maximum split size based on the number of partitions; I have now renamed it accordingly.

@mateiz mateiz commented on the diff Oct 8, 2014
core/src/test/java/org/apache/spark/JavaAPISuite.java
+ byte[] content1 = "spark isn't always easy to use.\n".getBytes("utf-8");
+ int numOfCopies = 10;
+ String tempDirName = tempDir.getAbsolutePath();
+ File file1 = new File(tempDirName + "/part-00000");
+
+ FileOutputStream fos1 = new FileOutputStream(file1);
+
+ FileChannel channel1 = fos1.getChannel();
+
+ for (int i=0;i<numOfCopies;i++) {
+ ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ channel1.write(bbuf);
+ }
+ channel1.close();
+
+ JavaRDD<byte[]> readRDD = sc.binaryRecords(tempDirName,content1.length);
@mateiz
mateiz Oct 8, 2014 Contributor

Add spaces after commas (affects several places in here)

@mateiz mateiz commented on an outdated diff Oct 8, 2014
core/src/test/scala/org/apache/spark/FileSuite.scala
+ val file = new java.io.FileOutputStream(outFile)
+ val channel = file.getChannel
+ channel.write(bbuf)
+ channel.close()
+ file.close()
+
+ val inRdd = sc.binaryFiles(outFileName).cache()
+ inRdd.foreach{
+ curData: (String, PortableDataStream) =>
+ curData._2.toArray() // force the file to read
+ }
+ val mappedRdd = inRdd.map{
+ curData: (String, PortableDataStream) =>
+ (curData._2.getPath(),curData._2)
+ }
+ val (infile: String, indata: PortableDataStream) = mappedRdd.first
@mateiz
mateiz Oct 8, 2014 Contributor

Use collect instead of first because first might go through a different code path (computing the RDD locally instead of reusing cached data). You can do .collect.head

@mateiz mateiz commented on the diff Oct 8, 2014
core/src/test/scala/org/apache/spark/FileSuite.scala
+ // create file
+ val testOutput = Array[Byte](1,2,3,4,5,6)
+ val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+ // write data to file
+ val file = new java.io.FileOutputStream(outFile)
+ val channel = file.getChannel
+ channel.write(bbuf)
+ channel.close()
+ file.close()
+
+ val inRdd = sc.binaryFiles(outFileName).cache()
+ inRdd.foreach{
+ curData: (String, PortableDataStream) =>
+ curData._2.toArray() // force the file to read
+ }
+ val mappedRdd = inRdd.map{
@mateiz
mateiz Oct 8, 2014 Contributor

Small style issue, always have space before {

@mateiz mateiz and 1 other commented on an outdated diff Oct 8, 2014
...cala/org/apache/spark/api/java/JavaSparkContext.scala
+ * Do
+ * `JavaPairRDD<String,DataInputStream> rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,
+ *
+ * <p> then `rdd` contains
+ * {{{
+ * (a-hdfs-path/part-00000, its content)
+ * (a-hdfs-path/part-00001, its content)
+ * ...
+ * (a-hdfs-path/part-nnnnn, its content)
+ * }}}
+ *
+ * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ */
+ def binaryArrays(path: String, minPartitions: Int = defaultMinPartitions):
@mateiz
mateiz Oct 8, 2014 Contributor

Don't add a default value here, it won't be usable from Java

@mateiz
mateiz Oct 8, 2014 Contributor

Also it seems this method is gone in Scala, maybe it needs to be removed in Java?

@kmader
kmader Oct 21, 2014 Contributor

In Scala it is easy to add separately; in Java you need to create an anonymous class, which is more of a hassle.

@mateiz
mateiz Oct 28, 2014 Contributor

I'd still remove this. It's confusing to see the API in just one language, and with Java 8, the extra class will be a one-liner.

@mateiz mateiz commented on an outdated diff Oct 8, 2014
...cala/org/apache/spark/api/java/JavaSparkContext.scala
+ * Do
+ * `JavaPairRDD<String, byte[]> rdd = sparkContext.dataStreamFiles("hdfs://a-hdfs-path")`,
+ *
+ * <p> then `rdd` contains
+ * {{{
+ * (a-hdfs-path/part-00000, its content)
+ * (a-hdfs-path/part-00001, its content)
+ * ...
+ * (a-hdfs-path/part-nnnnn, its content)
+ * }}}
+ *
+ * @note Small files are preferred, large file is also allowable, but may cause bad performance.
+ *
+ * @param minPartitions A suggestion value of the minimal splitting number for input data.
+ */
+ def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
@mateiz
mateiz Oct 8, 2014 Contributor

Same here, no default value. Instead add a version of the method with only one argument, and one with two.
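
For example, something like the following on the JavaSparkContext side (a sketch only, mirroring the Scala wrapper shown above; two explicit overloads instead of a default argument so both are callable from Java):

def binaryFiles(path: String): JavaPairRDD[String, PortableDataStream] =
  new JavaPairRDD(sc.binaryFiles(path, defaultMinPartitions))

def binaryFiles(path: String, minPartitions: Int): JavaPairRDD[String, PortableDataStream] =
  new JavaPairRDD(sc.binaryFiles(path, minPartitions))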

@mateiz mateiz commented on the diff Oct 8, 2014
...pache/spark/input/FixedLengthBinaryRecordReader.scala
+ currentPosition = currentPosition + recordLength
+
+ // return true
+ return true
+ }
+
+ false
+ }
+
+ var splitStart: Long = 0L
+ var splitEnd: Long = 0L
+ var currentPosition: Long = 0L
+ var recordLength: Int = 0
+ var fileInputStream: FSDataInputStream = null
+ var recordKey: LongWritable = null
+ var recordValue: BytesWritable = null
@mateiz
mateiz Oct 8, 2014 Contributor

Put these at the top of the class to make it a bit easier to read. Also they can be private.

@mateiz mateiz commented on the diff Oct 8, 2014
core/src/test/scala/org/apache/spark/FileSuite.scala
+
+ val inRdd = sc.binaryFiles(outFileName).cache()
+ inRdd.foreach{
+ curData: (String, PortableDataStream) =>
+ curData._2.toArray() // force the file to read
+ }
+ val mappedRdd = inRdd.map{
+ curData: (String, PortableDataStream) =>
+ (curData._2.getPath(),curData._2)
+ }
+ val (infile: String, indata: PortableDataStream) = mappedRdd.first
+
+ // Try reading the output back as an object file
+
+ assert(indata.toArray === testOutput)
+ }
@mateiz
mateiz Oct 8, 2014 Contributor

Apart from the cache() test, try adding one where we call persist(StorageLevel.DISK_ONLY) to check that these are also stored correctly to disk. And add one where we use Kryo serialization too.

@mateiz
Contributor
mateiz commented Oct 8, 2014

Hey Kevin, sorry for the delay in getting back to this. I just made a few more comments. I think this is getting pretty close, hopefully we can put it in 1.2.

@mateiz mateiz commented on an outdated diff Oct 8, 2014
core/src/test/scala/org/apache/spark/FileSuite.scala
+ val channel = file.getChannel
+ for(i <- 1 to testOutputCopies) {
+ val bbuf = java.nio.ByteBuffer.wrap(testOutput)
+ channel.write(bbuf)
+ }
+ channel.close()
+ file.close()
+
+ val inRdd = sc.binaryRecords(outFileName, testOutput.length)
+ // make sure there are enough elements
+ assert(inRdd.count == testOutputCopies)
+
+ // now just compare the first one
+ val indata: Array[Byte] = inRdd.first
+ assert(indata === testOutput)
+ }
@mateiz
mateiz Oct 8, 2014 Contributor

Add a test where you try to read records with 0 or negative size too, which should raise an exception in the driver program
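
For instance (a sketch reusing the outFileName setup from the test above; the exact exception type is an assumption):

test("binary records with an invalid record length") {
  // A record length of 0 (or negative) should fail fast in the driver
  // rather than silently producing garbage.
  intercept[Exception] {
    sc.binaryRecords(outFileName, 0).count()
  }
}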

@mateiz mateiz commented on an outdated diff Oct 8, 2014
core/src/test/java/org/apache/spark/JavaAPISuite.java
+ ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ channel1.write(bbuf);
+ channel1.close();
+ JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+ List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+ for (Tuple2<String, PortableDataStream> res : result) {
+ Assert.assertArrayEquals(content1, res._2().toArray());
+ }
+ }
+
+ @Test
+ public void binaryFilesCaching() throws Exception {
+ // Reusing the wholeText files example
+ byte[] content1 = "spark is easy to use.\n".getBytes("utf-8");
+
+
@mateiz
mateiz Oct 8, 2014 Contributor

Try not to have 2 blank lines in a row, except between classes if you have multiple classes in one file

@mateiz mateiz commented on an outdated diff Oct 8, 2014
...pache/spark/input/FixedLengthBinaryRecordReader.scala
+
+ // get the record length
+ recordLength = FixedLengthBinaryInputFormat.getRecordLength(context)
+
+ // get the filesystem
+ val fs = file.getFileSystem(job)
+
+ // open the File
+ fileInputStream = fs.open(file)
+
+ // seek to the splitStart position
+ fileInputStream.seek(splitStart)
+
+ // set our current position
+ currentPosition = splitStart
+
@mateiz
mateiz Oct 8, 2014 Contributor

Avoid blank lines at end of methods

@mateiz mateiz commented on the diff Oct 8, 2014
core/src/test/java/org/apache/spark/JavaAPISuite.java
+
+
+ String tempDirName = tempDir.getAbsolutePath();
+ File file1 = new File(tempDirName + "/part-00000");
+
+ FileOutputStream fos1 = new FileOutputStream(file1);
+
+ FileChannel channel1 = fos1.getChannel();
+ ByteBuffer bbuf = java.nio.ByteBuffer.wrap(content1);
+ channel1.write(bbuf);
+ channel1.close();
+ JavaPairRDD<String, PortableDataStream> readRDD = sc.binaryFiles(tempDirName,3);
+ List<Tuple2<String, PortableDataStream>> result = readRDD.collect();
+ for (Tuple2<String, PortableDataStream> res : result) {
+ Assert.assertArrayEquals(content1, res._2().toArray());
+ }
@mateiz
mateiz Oct 8, 2014 Contributor

Test the versions of the methods that take only one argument too (the main purpose of the Java API suite is to make sure all these methods are callable from Java, this is why it's useful to add calls to them even if they're not doing much)

@mateiz mateiz commented on an outdated diff Oct 8, 2014
...c/main/scala/org/apache/spark/rdd/BinaryFileRDD.scala
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+/** Allows better control of the partitioning
+ *
@mateiz
mateiz Oct 8, 2014 Contributor

This comment seems unrelated, why is it up here?

@SparkQA
SparkQA commented Oct 20, 2014

QA tests have started for PR 1658 at commit 92bda0d.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 20, 2014

QA tests have finished for PR 1658 at commit 92bda0d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@kmader
Contributor
kmader commented Oct 20, 2014

So I made the requested changes and added a few more tests, but the tests appear not to have run, for a strange reason: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21922/console. The build runs out of memory with Maven but works fine in IntelliJ, and I do not get any feedback on the style. Is there any single Maven phase I can run to check that?

@mateiz
Contributor
mateiz commented Oct 21, 2014

There might've been some Jenkins issues recently; going to restart it.

@mateiz
Contributor
mateiz commented Oct 21, 2014

BTW for the style, you can do "sbt/sbt scalastyle" locally if you want. Not sure there's a command in Maven.

@mateiz
Contributor
mateiz commented Oct 21, 2014

Jenkins, retest this please

@SparkQA
SparkQA commented Oct 21, 2014

QA tests have started for PR 1658 at commit 92bda0d.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 21, 2014

QA tests have finished for PR 1658 at commit 92bda0d.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA
SparkQA commented Oct 21, 2014

QA tests have started for PR 1658 at commit 8ac288b.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 21, 2014

QA tests have finished for PR 1658 at commit 8ac288b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
@SparkQA
SparkQA commented Oct 21, 2014

QA tests have started for PR 1658 at commit 6379be4.

  • This patch merges cleanly.
@SparkQA
SparkQA commented Oct 21, 2014

QA tests have finished for PR 1658 at commit 6379be4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
@mateiz
Contributor
mateiz commented Oct 28, 2014

Thanks for the update, Kevin. Note that there are still a few comments from me on https://github.com/apache/spark/pull/1658/files, do you mind dealing with those?

@mateiz
Contributor
mateiz commented Oct 29, 2014

@kmader btw if you don't have time to deal with these comments, let me know; I might be able to take the patch from where it is and implement them.

@mateiz
Contributor
mateiz commented Oct 30, 2014

Thanks for the update, Kevin. Looks like Jenkins had some issues with git, will retry it.

@mateiz
Contributor
mateiz commented Oct 30, 2014

Jenkins, retest this please

@SparkQA
SparkQA commented Oct 30, 2014

Test build #22503 has finished for PR 1658 at commit 3c49a30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
@SparkQA
SparkQA commented Oct 30, 2014

Test build #22505 has finished for PR 1658 at commit 3c49a30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PortableDataStream(@transient isplit: CombineFileSplit,
@asfgit asfgit pushed a commit that closed this pull request Nov 1, 2014
@kmader @mateiz kmader + mateiz [SPARK-2759][CORE] Generic Binary File Support in Spark
The additions add the abstract BinaryFileInputFormat and BinaryRecordReader classes for reading in data as a byte stream and converting it to another format using the ```def parseByteArray(inArray: Array[Byte]): T``` function.
As a trivial example ```ByteInputFormat``` and ```ByteRecordReader``` are included which just return the Array[Byte] from a given file.
Finally a RDD for ```BinaryFileInputFormat``` (to allow for easier partitioning changes as was done for WholeFileInput) was added and the appropriate byteFiles to the ```SparkContext``` so the functions can be easily used by others.
A common use case might be to read in a folder
```
sc.byteFiles("s3://mydrive/tif/*.tif").map(rawData => ReadTiffFromByteArray(rawData))
```

Author: Kevin Mader <kevinmader@gmail.com>
Author: Kevin Mader <kmader@users.noreply.github.com>

Closes #1658 from kmader/master and squashes the following commits:

3c49a30 [Kevin Mader] fixing wholetextfileinput to it has the same setMinPartitions function as in BinaryData files
359a096 [Kevin Mader] making the final corrections suggested by @mateiz and renaming a few functions to make their usage clearer
6379be4 [Kevin Mader] reorganizing code
7b9d181 [Kevin Mader] removing developer API, cleaning up imports
8ac288b [Kevin Mader] fixed a single slightly over 100 character line
92bda0d [Kevin Mader] added new tests, renamed files, fixed several of the javaapi functions, formatted code more nicely
a32fef7 [Kevin Mader] removed unneeded classes added DeveloperApi note to portabledatastreams since the implementation might change
49174d9 [Kevin Mader] removed unneeded classes added DeveloperApi note to portabledatastreams since the implementation might change
c27a8f1 [Kevin Mader] jenkins crashed before running anything last time, so making minor change
b348ce1 [Kevin Mader] fixed order in check (prefix only appears on jenkins not when I run unit tests locally)
0588737 [Kevin Mader] filename check in "binary file input as byte array" test now ignores prefixes and suffixes which might get added by Hadoop
4163e38 [Kevin Mader] fixing line length and output from FSDataInputStream to DataInputStream to minimize sensitivity to Hadoop API changes
19812a8 [Kevin Mader] Fixed the serialization issue with PortableDataStream since neither CombineFileSplit nor TaskAttemptContext implement the Serializable interface, by using ByteArrays for storing both and then recreating the objects from these bytearrays as needed.
238c83c [Kevin Mader] fixed several scala-style issues, changed structure of binaryFiles, removed excessive classes added new tests. The caching tests still have a serialization issue, but that should be easily fixed as well.
932a206 [Kevin Mader] Update RawFileInput.scala
a01c9cf [Kevin Mader] Update RawFileInput.scala
441f79a [Kevin Mader] fixed a few small comments and dependency
12e7be1 [Kevin Mader] removing imglib from maven (definitely not ready yet)
5deb79e [Kevin Mader] added new portabledatastream to code so that it can be serialized correctly
f032bc0 [Kevin Mader] fixed bug in path name, renamed tests
bc5c0b9 [Kevin Mader] made minor stylistic adjustments from mateiz
df8e528 [Kevin Mader] fixed line lengths and changed java test
9a313d5 [Kevin Mader] making classes that needn't be public private, adding automatic file closure, adding new tests
edf5829 [Kevin Mader] fixing line lengths, adding new lines
f4841dc [Kevin Mader] un-optimizing imports, silly intellij
eacfaa6 [Kevin Mader] Added FixedLengthBinaryInputFormat and RecordReader from freeman-lab and added them to both the JavaSparkContext and the SparkContext as fixedLengthBinaryFile
1622935 [Kevin Mader] changing the line lengths to make jenkins happy
1cfa38a [Kevin Mader] added apache headers, added datainputstream directly as an output option for more complicated readers (HDF5 perhaps), and renamed several of the functions and files to be more consistent. Also added parallel functions to the java api
84035f1 [Kevin Mader] adding binary and byte file support to Spark
81c5f12 [Kevin Mader] Merge pull request #1 from apache/master
7136719
@asfgit asfgit closed this in 7136719 Nov 1, 2014
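For readers following along, here is a minimal sketch of how the merged API ends up being called; the `sc.byteFiles` name in the commit message above was the earlier name, `readTiffFromByteArray` is a hypothetical stand-in for a user-supplied decoder, and the S3 path is just the one from the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.input.PortableDataStream

object BinaryFilesExample {
  // Hypothetical decoder standing in for a user-supplied TIFF parser.
  def readTiffFromByteArray(bytes: Array[Byte]): Int = bytes.length

  def main(args: Array[String]): Unit = {
    // Master/deploy settings are assumed to come from spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("binary-files-example"))

    // binaryFiles yields (path, PortableDataStream) pairs; the stream is only
    // opened on the executors, so whole files are not materialized eagerly.
    val decoded = sc.binaryFiles("s3://mydrive/tif/*.tif")
      .map { case (path, stream: PortableDataStream) =>
        (path, readTiffFromByteArray(stream.toArray()))
      }

    decoded.collect().foreach(println)
    sc.stop()
  }
}
```

Returning a `PortableDataStream` rather than a raw byte array keeps file contents off the driver and lets callers choose between streaming each file (`open()`) or materializing it (`toArray()`).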
@mateiz
Contributor
mateiz commented Nov 1, 2014

Thanks @kmader, I merged this now. I manually amended the patch a bit to fix style issues (there were still a bunch of commas without spaces, etc), and I also changed the name of the recordLength property in Hadoop JobConfs to start with org.apache.spark so that it's less likely to clash with other Hadoop properties. Finally I marked this API as @Experimental for now since it's new in this release, though we can probably make it non-experimental in 1.3.
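As a quick illustration of the fixed-record-length path mentioned here, a hedged sketch of `binaryRecords` usage follows; the 512-byte record length and the input path are made up for the example, and the renamed `recordLength` JobConf key is set inside Spark, so callers only pass the integer.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BinaryRecordsExample {
  def main(args: Array[String]): Unit = {
    // Master/deploy settings are assumed to come from spark-submit.
    val sc = new SparkContext(new SparkConf().setAppName("binary-records-example"))

    // Each RDD element is one fixed-length record (here 512 bytes), read via
    // FixedLengthBinaryInputFormat under the hood.
    val records = sc.binaryRecords("/data/sensor-dump.bin", recordLength = 512)

    println(s"record count: ${records.count()}")
    sc.stop()
  }
}
```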

@dgshep dgshep pushed a commit to dgshep/spark that referenced this pull request Dec 8, 2014
@kmader kmader + Davis Shepherd [SPARK-2759][CORE] Generic Binary File Support in Spark
6895ecf
@JoshRosen JoshRosen commented on the diff Dec 26, 2014
core/src/main/scala/org/apache/spark/SparkContext.scala
+ * with the specified numerical format (see ByteBuffer), and the number of
+ * bytes per record is constant (see FixedLengthBinaryInputFormat)
+ *
+ * @param path Directory to the input data files
+ * @param recordLength The length at which to split the records
+ * @return An RDD of data with values, RDD[(Array[Byte])]
+ */
+ def binaryRecords(path: String, recordLength: Int,
+ conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = {
+ conf.setInt("recordLength",recordLength)
+ val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,
+ classOf[FixedLengthBinaryInputFormat],
+ classOf[LongWritable],
+ classOf[BytesWritable],
+ conf=conf)
+ val data = br.map{ case (k, v) => v.getBytes}
@JoshRosen
JoshRosen Dec 26, 2014 Member

It turns out that getBytes returns a padded byte array, so I think you may need to copy / slice out the subarray with the data using v.getLength; see HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes" for more details.

Using getBytes without getLength has caused bugs in Spark in the past: #2712.

Is the use of getBytes in this patch a bug? Or is it somehow safe due to our use of FixedLengthBinaryInputFormat? If it is somehow safe, we should have a comment which explains this so that readers who know about the getBytes issue aren't confused (or better yet, an assert that getBytes returns an array of the expected length).
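For readers unfamiliar with the pitfall, here is a minimal sketch of the defensive copy being suggested; it assumes the same `BytesWritable` values as in the diff above and is illustrative rather than the code that was merged.

```scala
import java.util.Arrays
import org.apache.hadoop.io.BytesWritable

object SafeBytes {
  // getBytes may return a backing array padded beyond the valid data, so copy
  // only the first getLength bytes instead of using the raw array directly.
  def apply(v: BytesWritable): Array[Byte] =
    Arrays.copyOfRange(v.getBytes, 0, v.getLength)
}

// In the pipeline from the diff this would replace `v.getBytes`, for example:
//   val data = br.map { case (_, v) => SafeBytes(v) }
```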
