Messing around with hadoop file system uri resolution...
Aklakan committed Mar 1, 2021
1 parent 416e14d commit dcc24d4
Showing 1 changed file with 23 additions and 7 deletions.
@@ -1,9 +1,11 @@
 package net.sansa_stack.spark.cli.cmd.impl
 
+import java.net.URI
+
 import com.google.common.base.Stopwatch
 import net.sansa_stack.rdf.spark.model.rdd.RddOfDatasetOps
 import net.sansa_stack.spark.cli.cmd.{CmdSansaTrigMerge, CmdSansaTrigQuery}
-import org.aksw.commons.io.util.StdIo
+import org.aksw.commons.io.util.{StdIo, UriToPathUtils, UriUtils}
 import org.aksw.jena_sparql_api.rx.RDFLanguagesEx
 import org.aksw.jena_sparql_api.utils.io.{StreamRDFDeferred, WriterStreamRDFBaseUtils}
 import org.apache.jena.query.Dataset
@@ -70,19 +72,33 @@ object CmdSansaTrigMergeImpl {
.config("spark.sql.crossJoin.enabled", true)
.getOrCreate()

val fileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
StreamManager.get().addLocator(new LocatorHdfs(fileSystem));
// StreamManager.get().addLocator(new LocatorHdfs(fileSystem))

val hadoopConf = spark.sparkContext.hadoopConfiguration

val paths = cmd.trigFiles.asScala
.map(new Path(_))
.map(str => {
val path = new Path(str)
val fs = UriUtils.tryNewURI(str).map(
FileSystem.get(_, hadoopConf))
.orElseThrow(() => new RuntimeException("Could not parse as URI: " + str))

val r = fs.resolvePath(path)
(fs, r)
})
.filter(e => e._1.isFile(e._2))
.map(_._2)
.toList

/*
val validPaths = paths
.filter(fileSystem.exists(_))
.filter(_.getFileSystem(hadoopConf).get)
.filter(!fileSystem.isFile(_))
.toSet
*/
val validPathSet = paths.toSet

val invalidPaths = paths.toSet.diff(validPaths)
val invalidPaths = paths.toSet.diff(validPathSet)
if (!invalidPaths.isEmpty) {
throw new IllegalArgumentException("The following paths are invalid (do not exist or are not a (readable) file): " + invalidPaths)
}
@@ -91,7 +107,7 @@ object CmdSansaTrigMergeImpl {
     import net.sansa_stack.rdf.spark.io._
 
     val initialRdd: RDD[Dataset] = spark.sparkContext.union(
-      validPaths
+      validPathSet
         .map(path => spark.datasets(Lang.TRIG)(path.toString)).toSeq)
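
For context, the core of this change swaps the single default FileSystem for per-input resolution: each CLI argument is parsed as a URI, a FileSystem matching that URI's scheme is looked up via FileSystem.get(uri, conf), and the path is resolved and kept only if it is a regular file. Below is a minimal standalone sketch of the same pattern against plain Hadoop APIs, not the commit's code: the object name and sample URIs are made up, and java.net.URI stands in for UriUtils.tryNewURI.

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FsUriResolutionSketch {
  def main(args: Array[String]): Unit = {
    val hadoopConf = new Configuration()
    // Illustrative inputs; any scheme with a registered FileSystem works.
    val inputs = Seq("file:///tmp/data.trig", "hdfs://namenode:8020/data/more.trig")

    val resolved = inputs.map { str =>
      // Dispatches on the URI scheme (file://, hdfs://, ...) instead of
      // always using the default file system from the configuration.
      val fs = FileSystem.get(new URI(str), hadoopConf)
      // resolvePath normalizes the path and throws if it does not exist.
      (fs, fs.resolvePath(new Path(str)))
    }

    // Keep only regular files, mirroring the commit's isFile filter.
    val filePaths = resolved.filter { case (fs, p) => fs.isFile(p) }.map(_._2)
    filePaths.foreach(println)
  }
}

Unlike this sketch, the commit surfaces inputs that fail to parse or to validate as hard errors (RuntimeException / IllegalArgumentException) rather than silently dropping them.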


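The final hunk then merges the inputs by creating one RDD of Jena Datasets per resolved path and unioning them. A sketch of that step, assuming the SANSA implicit spark.datasets(Lang.TRIG) brought in by net.sansa_stack.rdf.spark.io._ as shown in the diff (the helper name is hypothetical):

import net.sansa_stack.rdf.spark.io._
import org.apache.hadoop.fs.Path
import org.apache.jena.query.Dataset
import org.apache.jena.riot.Lang
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object TrigUnionSketch {
  // Hypothetical helper mirroring the merge step from the diff.
  def loadTrigUnion(spark: SparkSession, validPathSet: Set[Path]): RDD[Dataset] =
    // One RDD per input file; the union exposes all named graphs
    // to downstream stages as a single RDD.
    spark.sparkContext.union(
      validPathSet
        .map(path => spark.datasets(Lang.TRIG)(path.toString))
        .toSeq)
}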
