Skip to content

Commit

Permalink
Merge pull request #66 from SANSA-Stack/bugfix/spark/rdfxml-dataframe
Browse files Browse the repository at this point in the history
Fixes #33
  • Loading branch information
GezimSejdiu committed Jul 18, 2018
2 parents 4b1e3ab + b53d107 commit 413fe98
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ class JenaParser(val options: RdfXmlOptions) {
recordLiteral: T => UTF8String)
: Seq[InternalRow] = {
try {
println(recordLiteral(record))
val triples = new CollectorStreamRDF()
createParser(parserBuilder, record).parse(triples)
triples.getTriples.asScala.map(convertTriple(_))
triples.getTriples.asScala.map(convertTriple)
} catch {
case e@(_: RuntimeException | _: RiotException) =>
throw BadRecordException(() => recordLiteral(record), () => None, e)
Expand All @@ -30,7 +29,9 @@ class JenaParser(val options: RdfXmlOptions) {

def convertTriple(triple: org.apache.jena.graph.Triple): InternalRow = {
val row = new GenericInternalRow(3)
row.update(0, triple.getSubject)
row.update(0, UTF8String.fromBytes(triple.getSubject.toString.getBytes()))
row.update(1, UTF8String.fromBytes(triple.getPredicate.toString.getBytes()))
row.update(2, UTF8String.fromBytes(triple.getObject.toString.getBytes()))
row
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ object RdfXmlDataSource {
if (options.wholeFile) {
WholeFileRdfXmlDataSource
} else {
TextInputRdfXmlDataSource
WholeFileRdfXmlDataSource
// TextInputRdfXmlDataSource
}
}

Expand All @@ -87,7 +88,7 @@ object RdfXmlDataSource {
object TextInputRdfXmlDataSource extends RdfXmlDataSource[Text] {
override val isSplitable: Boolean = {
// splittable if the underlying source is
true
false
}

override protected def createBaseRdd(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,17 @@ import org.apache.spark.sql.types.{StringType, StructField, StructType}
class RdfXmlFileFormat extends TextBasedFileFormat with DataSourceRegister {
override val shortName: String = "rdfxml"

override def isSplitable(
sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {
override def isSplitable(sparkSession: SparkSession,
options: Map[String, String],
path: Path): Boolean = {

val parsedOptions = new RdfXmlOptions(
options,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)

val rdfDataSource = RdfXmlDataSource(parsedOptions)

rdfDataSource.isSplitable && super.isSplitable(sparkSession, options, path)
}

Expand Down

0 comments on commit 413fe98

Please sign in to comment.