Expose parser errors
    CIMInputFormat is created on each worker node via classForName() or similar,
    so the ch.ninecode.cim.debug flag cannot simply be passed to each worker; instead a duplicate CIMInputFormatDebug with debug fixed to true is required.
    Error messages about unknown CIM contents (coverage) are now emitted when ch.ninecode.cim.debug=true.
    Not setting ch.ninecode.cim.debug, or setting it to false, now skips the CIMContext coverage step entirely.
    Unfortunately, these coverage messages are only available separately on each worker, in its stderr log.
    A suitably configured log4j.properties on the driver and each worker is still required.
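
A minimal usage sketch of the new behaviour, assuming a running SparkSession named spark and a hypothetical input path (the option name ch.ninecode.cim.debug and the data source name ch.ninecode.cim come from this commit and the README; everything else is illustrative). Setting the option makes CIMRelation build the Elements RDD with CIMInputFormatDebug, so CIMContext coverage errors end up in each worker's stderr log, subject to that worker's log4j.properties:

```scala
// Sketch only: enable parser-error (coverage) reporting for a CIM read.
// Assumes a SparkSession named `spark`; the HDFS path is hypothetical.
val opts = Map ("ch.ninecode.cim.debug" -> "true") // selects CIMInputFormatDebug on the workers
val elements = spark.read
    .format ("ch.ninecode.cim")
    .options (opts)
    .load ("hdfs://sandbox:8020/data/sample.rdf")
```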
Derrick Oswald committed Sep 22, 2020
1 parent 408a3d8 commit dad4edb
Showing 5 changed files with 81 additions and 24 deletions.
2 changes: 2 additions & 0 deletions CIMReader/README.md
@@ -233,6 +233,8 @@ The expression "spark.read.cim" in the above example is shorthand for the full D

```scala
val element = spark.read.format ("ch.ninecode.cim").options (opts).load (file1, file2, ...)
// or
val elements = spark.read.options (opts).cim (file1, file2, ...)
```

where:
14 changes: 2 additions & 12 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMInputFormat.scala
@@ -1,17 +1,15 @@
package ch.ninecode.cim

import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

import ch.ninecode.model.Element

class CIMInputFormat extends FileInputFormat[String, Element]
{
val log: Log = LogFactory.getLog (classOf [CIMInputFormat])

// /**
// * A factory that makes the split for this class. It can be overridden
// * by sub-classes to make sub-types
@@ -35,15 +33,10 @@ class CIMInputFormat extends FileInputFormat[String, Element]
*/
override def getSplits (context: JobContext): java.util.List[InputSplit] =
{
log.info ("getSplits")

// Note: we don't need to override this yet,
// but the JobContext has the Credentials (@see org.apache.hadoop.mapreduce.JobContext.getCredentials())
// which would be checked here.
val ret = super.getSplits (context)

log.info (s"getSplits: ${ret.size ()} splits returned")
ret
super.getSplits (context)
}

/**
@@ -55,9 +48,6 @@ class CIMInputFormat extends FileInputFormat[String, Element]
*/
def createRecordReader (split: InputSplit, context: TaskAttemptContext): RecordReader[String, Element] =
{
log.info ("createRecordReader")
log.info (s"split: ${split.toString}")
log.info (s"context: ${context.getTaskAttemptID.toString}")
new CIMRecordReader ()
}
}
48 changes: 48 additions & 0 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMInputFormatDebug.scala
@@ -0,0 +1,48 @@
package ch.ninecode.cim

import org.apache.commons.logging.{Log, LogFactory}
import org.apache.hadoop.mapreduce.InputSplit
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.mapreduce.RecordReader
import org.apache.hadoop.mapreduce.TaskAttemptContext
import ch.ninecode.model.Element

class CIMInputFormatDebug extends CIMInputFormat
{
val log: Log = LogFactory.getLog (getClass)

/**
* Generate the list of files and make them into FileSplits.
*
* Logically split the set of input files for the job.
* Each InputSplit is then assigned to an individual Mapper for processing.
* Note: The split is a logical split of the inputs and the input files are not physically split into chunks.
     * For example, a split could be an <input-file-path, start, offset> tuple.
* The InputFormat also creates the RecordReader to consume the InputSplit objects produced.
*
* @param context the job context
* @return the list of FileSplit objects for the job
*/
override def getSplits (context: JobContext): java.util.List[InputSplit] =
{
log.info ("getSplits")
val ret = super.getSplits (context)
log.info (s"getSplits: ${ret.size ()} splits returned")
ret
}

/**
* Create a record reader for a given split.
*
* @param split the split to be read
* @param context the information about the task
* @return a new record reader
*/
override def createRecordReader (split: InputSplit, context: TaskAttemptContext): RecordReader[String, Element] =
{
log.info ("createRecordReader")
log.info (s"split: ${split.toString}")
log.info (s"context: ${context.getTaskAttemptID.toString}")
new CIMRecordReader (true)
}
}
24 changes: 17 additions & 7 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMRecordReader.scala
@@ -9,16 +9,19 @@ import org.apache.hadoop.mapreduce.lib.input.FileSplit

import ch.ninecode.model.Element

class CIMRecordReader extends RecordReader[String, Element]
class CIMRecordReader (debug: Boolean = false) extends RecordReader[String, Element]
{
val log: Log = LogFactory.getLog (classOf [CIMRecordReader])
val log: Log = LogFactory.getLog (getClass)
var cim: CHIM = _

def initialize (genericSplit: InputSplit, context: TaskAttemptContext): Unit =
{
log.info ("initialize")
log.info (s"genericSplit: ${genericSplit.toString}")
log.info (s"context: ${context.getTaskAttemptID.toString}")
if (debug)
{
log.info ("initialize")
log.info (s"genericSplit: ${genericSplit.toString}")
log.info (s"context: ${context.getTaskAttemptID.toString}")
}
val job = context.getConfiguration
val split = genericSplit.asInstanceOf [FileSplit]
val start = split.getStart
@@ -67,13 +70,20 @@ class CIMRecordReader extends RecordReader[String, Element]
// ToDo: using first here is approximate,
// the real character count would require reading the complete file
// from 0 to (start + first) and converting to characters
log.debug (s"XML text starting at byte offset ${start + first} of length $len characters begins with: ${xml.substring (0, 120)}")
if (debug)
log.debug (s"XML text starting at byte offset ${start + first} of length $len characters begins with: ${xml.substring (0, 120)}")
CIMContext.DEBUG = debug
cim = new CHIM (xml, first, first + len, start, start + bytes)
}

def close (): Unit =
{
log.info ("close")
if (debug)
{
log.info ("close")
for (error <- cim.context.errors)
log.error (error)
}
cim = null
}

17 changes: 12 additions & 5 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala
@@ -203,11 +203,18 @@ class CIMRelation (
configuration.set (FileInputFormat.INPUT_DIR, path)
configuration.setLong (FileInputFormat.SPLIT_MAXSIZE, _SplitSize)

var rdd = spark.sparkContext.newAPIHadoopRDD (
configuration,
classOf [CIMInputFormat],
classOf [String],
classOf [Element]).values
var rdd = if (_Debug)
spark.sparkContext.newAPIHadoopRDD (
configuration,
classOf [CIMInputFormatDebug],
classOf [String],
classOf [Element]).values
else
spark.sparkContext.newAPIHadoopRDD (
configuration,
classOf [CIMInputFormat],
classOf [String],
classOf [Element]).values

put (rdd, "Elements", true)
ret = rdd.asInstanceOf [RDD[Row]]
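
Since the InputFormat class is instantiated by name on each worker (see the commit message above), the only thing CIMRelation can vary is the class literal it hands to newAPIHadoopRDD, which is why the call is duplicated around the _Debug test. A minimal sketch of the same selection with the class literal factored out, purely to make that mechanism explicit (the helper read is hypothetical; spark, configuration and _Debug are the values already used above):

```scala
// Sketch (not the committed form): the debug behaviour travels as the class
// literal itself, so choosing the class is all that differs between the branches.
def read[F <: CIMInputFormat] (format: Class[F]): RDD[Element] =
    spark.sparkContext.newAPIHadoopRDD (
        configuration,
        format,
        classOf [String],
        classOf [Element]).values

var rdd = if (_Debug) read (classOf [CIMInputFormatDebug]) else read (classOf [CIMInputFormat])
```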
