Skip to content

Commit

Permalink
Merge branch 'master' of github.com:derrickoswald/CIMSpark
Browse files — browse the repository at this point in the history
  • Loading branch information
Derrick Oswald committed Oct 29, 2020
2 parents bbb3f43 + 2bf9297 commit d287c89
Show file tree
Hide file tree
Showing 170 changed files with 101,310 additions and 91,677 deletions.
2 changes: 1 addition & 1 deletion .editorconfig
Expand Up @@ -397,7 +397,7 @@ ij_scala_space_before_if_parentheses = true
ij_scala_space_before_infix_like_method_parentheses = false
ij_scala_space_before_infix_method_call_parentheses = false
ij_scala_space_before_infix_operator_like_method_call_parentheses = true
ij_scala_space_before_method_call_parentheses = true
ij_scala_space_before_method_call_parentheses = false
ij_scala_space_before_method_left_brace = true
ij_scala_space_before_method_parentheses = true
ij_scala_space_before_type_colon = false
Expand Down
452 changes: 226 additions & 226 deletions CIMExport/src/main/scala/ch/ninecode/cim/CIMExport.scala

Large diffs are not rendered by default.

92 changes: 46 additions & 46 deletions CIMExport/src/main/scala/ch/ninecode/cim/CIMExportMain.scala
Expand Up @@ -22,35 +22,35 @@ object CIMExportMain
{
val properties: Properties =
{
val in = this.getClass.getResourceAsStream ("/application.properties")
val p = new Properties ()
p.load (in)
in.close ()
val in = this.getClass.getResourceAsStream("/application.properties")
val p = new Properties()
p.load(in)
in.close()
p
}
val APPLICATION_NAME: String = properties.getProperty ("artifactId")
val APPLICATION_VERSION: String = properties.getProperty ("version")
val SPARK: String = properties.getProperty ("spark")
val APPLICATION_NAME: String = properties.getProperty("artifactId")
val APPLICATION_VERSION: String = properties.getProperty("version")
val SPARK: String = properties.getProperty("spark")

def jarForObject (obj: Object): String =
{
// see https://stackoverflow.com/questions/320542/how-to-get-the-path-of-a-running-jar-file
var ret = obj.getClass.getProtectionDomain.getCodeSource.getLocation.getPath
try
{
ret = URLDecoder.decode (ret, "UTF-8")
ret = URLDecoder.decode(ret, "UTF-8")
}
catch
{
case e: UnsupportedEncodingException => e.printStackTrace ()
case e: UnsupportedEncodingException => e.printStackTrace()
}
if (!ret.toLowerCase ().endsWith (".jar"))
if (!ret.toLowerCase().endsWith(".jar"))
{
// as an aid to debugging, make jar in tmp and pass that name
val name = s"/tmp/${Random.nextInt (99999999)}.jar"
val writer = new Jar (new scala.reflect.io.File (new java.io.File (name))).jarWriter ()
writer.addDirectory (new scala.reflect.io.Directory (new java.io.File (s"${ret}ch/")), "ch/")
writer.close ()
val name = s"/tmp/${Random.nextInt(99999999)}.jar"
val writer = new Jar(new scala.reflect.io.File(new java.io.File(name))).jarWriter()
writer.addDirectory(new scala.reflect.io.Directory(new java.io.File(s"${ret}ch/")), "ch/")
writer.close()
ret = name
}

Expand All @@ -64,70 +64,70 @@ object CIMExportMain
*/
def main (args: Array[String])
{
val optionparser = new CIMExportOptionsParser (APPLICATION_NAME, APPLICATION_VERSION)
val optionparser = new CIMExportOptionsParser(APPLICATION_NAME, APPLICATION_VERSION)

optionparser.parse (args, CIMExportOptions ()) match
optionparser.parse(args, CIMExportOptions()) match
{
case Some (options) =>
case Some(options) =>
if (options.valid)
{
if (options.loglevel != LogLevels.OFF)
{
org.apache.log4j.LogManager.getLogger ("ch.ninecode.cim.CIMExport").setLevel (LogLevels.toLog4j (options.loglevel))
org.apache.log4j.LogManager.getLogger ("ch.ninecode.cim.CIMNetworkTopologyProcessor").setLevel (LogLevels.toLog4j (options.loglevel))
org.apache.log4j.LogManager.getLogger("ch.ninecode.cim.CIMExport").setLevel(LogLevels.toLog4j(options.loglevel))
org.apache.log4j.LogManager.getLogger("ch.ninecode.cim.CIMNetworkTopologyProcessor").setLevel(LogLevels.toLog4j(options.loglevel))
}
val log = LoggerFactory.getLogger (this.getClass)
val log = LoggerFactory.getLogger(this.getClass)

val configuration = new SparkConf ()
configuration.setAppName (APPLICATION_NAME)
val configuration = new SparkConf()
configuration.setAppName(APPLICATION_NAME)
if ("" != options.master)
configuration.setMaster (options.master)
configuration.setMaster(options.master)
if (options.sparkopts.nonEmpty)
options.sparkopts.map (pair => configuration.set (pair._1, pair._2))
options.sparkopts.map(pair => configuration.set(pair._1, pair._2))
// get the necessary jar files to send to the cluster
configuration.setJars (Array (jarForObject (new DefaultSource ())))
configuration.setJars(Array(jarForObject(new DefaultSource())))
// register for Kryo serialization
configuration.registerKryoClasses (CIMClasses.list)
configuration.registerKryoClasses(CIMClasses.list)
// choose which Cassandra
configuration.set ("spark.cassandra.connection.host", options.host)
configuration.set ("spark.cassandra.connection.port", options.port.toString)
configuration.set("spark.cassandra.connection.host", options.host)
configuration.set("spark.cassandra.connection.port", options.port.toString)

val session_builder = SparkSession.builder ()
val session = session_builder.config (configuration).getOrCreate ()
val session_builder = SparkSession.builder()
val session = session_builder.config(configuration).getOrCreate()
val version = session.version
log.info (s"Spark $version session established")
if (version.take (SPARK.length) != SPARK.take (version.length))
log.warn (s"Spark version ($version) does not match the version ($SPARK) used to build $APPLICATION_NAME")
log.info(s"Spark $version session established")
if (version.take(SPARK.length) != SPARK.take(version.length))
log.warn(s"Spark version ($version) does not match the version ($SPARK) used to build $APPLICATION_NAME")

try
{
// read the file
val reader_options = scala.collection.mutable.HashMap[String, String]()
options.cimopts.map (pair => reader_options.put (pair._1, pair._2))
val filelist = options.files.mkString (",")
reader_options.put ("path", filelist)
log.info (s"reading CIM files $filelist")
val elements = session.read.format ("ch.ninecode.cim").options (reader_options).load (options.files: _*)
log.info (s"${elements.count} elements")
options.cimopts.map(pair => reader_options.put(pair._1, pair._2))
val filelist = options.files.mkString(",")
reader_options.put("path", filelist)
log.info(s"reading CIM files $filelist")
val elements = session.read.format("ch.ninecode.cim").options(reader_options).load(options.files: _*)
log.info(s"${elements.count} elements")

val export = new CIMExport (session)
val export = new CIMExport(session)
if (options.all)
export.exportAll (options.outputfile)
export.exportAll(options.outputfile)
if (options.islands)
export.exportAllIslands (options.outputdir)
export.exportAllIslands(options.outputdir)
else
if (options.transformers)
export.exportAllTransformers (options)
export.exportAllTransformers(options)
}
finally
{
session.stop ()
session.stop()
}
}
if (!options.unittest)
sys.exit (0)
sys.exit(0)
case None =>
sys.exit (1)
sys.exit(1)
}
}
}
Expand Up @@ -49,14 +49,14 @@ final case class CIMExportOptions
unittest: Boolean = false,
loglevel: LogLevels.Value = LogLevels.OFF,
master: String = "",
sparkopts: Map[String, String] = Map (
sparkopts: Map[String, String] = Map(
"spark.graphx.pregel.checkpointInterval" -> "8",
"spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
"spark.kryo.registrator" -> "ch.ninecode.cim.CIMRegistrator",
"spark.ui.showConsoleProgress" -> "false",
"spark.sql.debug.maxToStringFields" -> "250",
"spark.sql.catalog.casscatalog" -> "com.datastax.spark.connector.datasource.CassandraCatalog"),
cimopts: Map[String, String] = Map (
cimopts: Map[String, String] = Map(
"ch.ninecode.cim.do_topo_islands" -> "true"
),
all: Boolean = false,
Expand All @@ -70,5 +70,5 @@ final case class CIMExportOptions
keyspace: String = "cimexport",
replication: Int = 1,
topology: Boolean = false,
files: Seq[String] = Seq ()
files: Seq[String] = Seq()
)

0 comments on commit d287c89

Please sign in to comment.