Commit

Merge pull request #55 from astrolabsoftware/dag-scheduler-event-loop-Fix

Issues/54: on the multifile problem in spark-fits
JulienPeloton committed Oct 19, 2018
2 parents dff168a + 6789c37 commit 2726caa
Showing 1 changed file with 34 additions and 66 deletions.
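For context, Issues/54 concerns reading many FITS files at once (a directory or a glob) through the spark-fits data source. A minimal usage sketch of that multifile path; the format name and the "hdu" option follow the spark-fits package, while the session setup, paths, and port are placeholders:

    // Hypothetical session: read HDU 1 from every FITS file matching a glob.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("spark-fits-demo").getOrCreate()
    val df = spark.read
      .format("com.astrolabsoftware.sparkfits") // fully qualified source name
      .option("hdu", 1)                         // index of the HDU to load
      .load("hdfs://localhost:8020//data/*.fits") // directory glob (placeholder)
    df.show(5)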
src/main/scala/com/astrolabsoftware/sparkfits/FitsSourceRelation.scala: 100 changes (34 additions, 66 deletions)
@@ -96,12 +96,6 @@ import com.astrolabsoftware.sparkfits.FitsFileInputFormat._
class FitsRelation(parameters: Map[String, String], userSchema: Option[StructType])(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan {

// override def toString: String = "FITS"
//
// override def hashCode(): Int = getClass.hashCode()
//
// override def equals(other: Any): Boolean = other.isInstanceOf[FitsFileInputFormat]

// Level of verbosity
var verbosity : Boolean = false

@@ -203,6 +197,9 @@ class FitsRelation(parameters: Map[String, String], userSchema: Option[StructTyp
* the same. Throw an AssertionError otherwise.
* The check is performed only for BINTABLE.
*
* NOTE: This operation is very slow for many files! Do not use it
* on hundreds of files!
*
* @param listOfFitsFiles : (List[String])
* List of files as a list of String.
* @return (String) the type of HDU: BINTABLE, IMAGE, EMPTY, or
@@ -253,23 +250,23 @@ class FitsRelation(parameters: Map[String, String], userSchema: Option[StructTyp
/**
* Create an RDD[Row] from the data of one HDU.
* The input can be either the path to one FITS file (path + filename),
* or the path to a directory containing FITS files. In the latter,
* the code will load all FITS files listed inside this directory
* and make the union of the HDU data. Needless to say, the FITS
* files must have the same structure, otherwise the union will be impossible.
* or the path to a directory containing FITS files, or a glob on a
* directory (*.fits). Needless to say, the FITS files must
* have the same structure, otherwise the union will be impossible.
* The format of the input must be a String with Hadoop format
* - (local) file://path/to/data
* - (HDFS) hdfs://<IP>:<PORT>//path/to/data
*
*
* If the HDU type is not "implemented", return an empty RDD[Row].
*
* NOTE: Schema check needs to be fixed!
*
* @param fn : (String)
* Filename of the fits file to be read, or a directory containing FITS files
* with the same HDU structure.
* @return (RDD[Row]) always one single RDD made from the HDU of
* one FITS file, or from the same kind of HDU from several FITS files.
* Empty if the HDU type is not a BINTABLE.
* Empty if the HDU type is not a BINTABLE or IMAGE.
*
*/
def load(fn : String): RDD[Row] = {
@@ -278,7 +275,14 @@ class FitsRelation(parameters: Map[String, String], userSchema: Option[StructTyp

// Check that all the files have the same Schema
// in order to perform the union. Return the HDU type.
val implemented = checkSchemaAndReturnType(listOfFitsFiles)
// NOTE: This operation is very slow for hundreds of files!
// NOTE: Limit the check to the first 10 files.
// NOTE: Needs to be fixed!
val implemented = if (listOfFitsFiles.size < 10) {
  checkSchemaAndReturnType(listOfFitsFiles)
} else {
  checkSchemaAndReturnType(listOfFitsFiles.slice(0, 10))
}

// Load one or all the FITS files found
load(listOfFitsFiles, implemented)
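A side note on the schema guard above: on a Scala List, slice(0, 10) is take(10), and take already returns the whole list when fewer than 10 elements exist, so the two branches could collapse into one call. A sketch of that equivalent form, not part of the commit:

    // Check the HDU schema on at most the first 10 files; assume the rest match.
    val implemented = checkSchemaAndReturnType(listOfFitsFiles.take(10))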
@@ -287,77 +291,41 @@ class FitsRelation(parameters: Map[String, String], userSchema: Option[StructTyp
/**
* Load the HDU data from several FITS files into a single RDD[Row].
* The structure of the HDU must be the same, that is, contain the
* same number of columns with the same name and element types.
* same number of columns with the same name and element types. Note that
* we pass the full list of files to newAPIHadoopFile directly, and
* Spark (Hadoop) performs the union on its own.
*
* If the HDU type is not "implemented", return an empty RDD[Row].
*
* @param fns : (List[String])
* List of filenames with the same structure.
* @return (RDD[Row]) always one single RDD[Row] made from the HDU of
* one FITS file, or from the same kind of HDU from several FITS files.
* Empty if the HDU type is not a BINTABLE.
* Empty if the HDU type is not a BINTABLE or IMAGE.
*
*/
def load(fns : List[String], implemented: Boolean): RDD[Row] = {

// Number of files
val nFiles = fns.size
if (verbosity) {
  // Check number of files
  val nFiles = fns.size
  println(s"NFILES: $nFiles")
}

// Initialise
var rdd = if (implemented) {
loadOneHDU(fns(0))
val rdd = if (implemented) {
// Distribute the table data
sqlContext.sparkContext.newAPIHadoopFile(fns.mkString(","),
classOf[FitsFileInputFormat],
classOf[LongWritable],
classOf[Seq[Row]],
conf).flatMap(x => x._2)
} else {
// If HDU not implemented, return an empty RDD
loadOneEmpty
}

// Union if more than one file
for ((file, index) <- fns.slice(1, nFiles).zipWithIndex) {
rdd = if (implemented) {
rdd.union(loadOneHDU(file))
} else {
rdd.union(loadOneEmpty)
}
}
rdd
}
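The rewrite above is the heart of the fix: the removed loop built a chain of one union per file, producing a lineage that grows with the number of files, which is presumably what the branch name (dag-scheduler-event-loop-Fix) refers to. Passing every path in a single newAPIHadoopFile call yields one RDD with one partition per Hadoop split. A sketch of the call shape, with placeholder paths:

    // One call, one RDD: Hadoop's FileInputFormat accepts a comma-separated
    // list of input paths, so no explicit union chain is needed.
    val fns = List("file:///data/cat-0.fits", "file:///data/cat-1.fits")
    val rows = sqlContext.sparkContext.newAPIHadoopFile(
      fns.mkString(","),
      classOf[FitsFileInputFormat],
      classOf[LongWritable],
      classOf[Seq[Row]],
      conf).flatMap(_._2)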

/** Load the data of one HDU from a FITS file as an RDD[Row].
*
* @param fn : (String)
* Path + filename of the fits file to be read.
* @return : RDD[Row] made from one single HDU.
*/
def loadOneHDU(fn : String): RDD[Row] = {

// Open one file
val path = new Path(fn)
val indexHDU = conf.get("hdu").toInt
val fits = new Fits(path, conf, indexHDU)

// Register header and block boundaries in the Hadoop configuration
fits.registerHeader
fits.blockBoundaries.register(path, conf)

// Check the header if needed
if (verbosity) {
println(s"+------ FILE $fn ------+")
println(s"+------ HEADER (HDU=$indexHDU) ------+")
fits.blockHeader.foreach(println)
println("+----------------------------+")
}

// We do not need the data on the driver at this point.
// The executors will re-open it later on.
fits.data.close()

// Distribute the table data
sqlContext.sparkContext.newAPIHadoopFile(fn,
classOf[FitsFileInputFormat],
classOf[LongWritable],
classOf[Seq[Row]],
conf).flatMap(x => x._2)
}
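A note on the deleted helper: loadOneHDU opened each file on the driver to register its header and block boundaries in the Hadoop configuration before distributing the reads, so the driver-side cost grew linearly with the number of files. A hypothetical micro-timing of that pass, built only from calls visible in this diff:

    // Hypothetical measurement of the removed per-file driver work.
    val t0 = System.nanoTime()
    for (fn <- listOfFitsFiles) {
      val path = new Path(fn)
      val fits = new Fits(path, conf, conf.get("hdu").toInt)
      fits.registerHeader                       // header into the Hadoop conf
      fits.blockBoundaries.register(path, conf) // block boundaries likewise
      fits.data.close()                         // executors re-open files later
    }
    println(s"Driver metadata pass: ${(System.nanoTime() - t0) / 1e9} s")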

/**
* Return an empty RDD of Row.
*
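The diff is truncated here. For orientation, a plausible body for loadOneEmpty, consistent with the doc comment above but an assumption rather than the committed code:

    // Assumed shape: an empty RDD[Row], returned when the HDU type
    // is not implemented by the connector.
    def loadOneEmpty : RDD[Row] = {
      sqlContext.sparkContext.emptyRDD[Row]
    }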
