Implement append mode
    Previously, invoking the CIMReader would unpersist all RDD[<CIM Class>] resulting from prior invocations.
    These RDD were also hidden from searches of the spark.sparkContext.getPersistentRDDs() list by setting their names to null.
    Now, with the option ch.ninecode.cim.append=true, the contents of the new CIM files are appended (via union) to the existing RDD[<CIM Class>].
    This enables two similar use-cases on hot clusters (where one or more CIM files have already been read in):
    - rdf:about modifications to a hot cluster, for example opening or closing Switch elements
    - the more general application of a ChangeSet to a hot cluster, allowing additions and deletions of elements as well as modifications
    Of course, if the appended elements change the topology, the CIMNetworkTopologyProcessor should be executed (again).
    Other processing steps may also need to be repeated, depending on the contents of the CIM file(s).
    Reading cached contents (the ch.ninecode.cim.cache option) takes precedence, so the cache should be deleted prior to using append mode.
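    The append workflow might look like the following (a minimal sketch: the file paths and SparkSession setup are hypothetical, but the option keys are the ones named in this commit message):

    ```scala
    import org.apache.spark.sql.SparkSession

    // hypothetical session; in practice the cluster is already "hot"
    val spark = SparkSession.builder.appName ("AppendDemo").getOrCreate ()

    // initial read: populates the RDD[<CIM Class>] in the persistent RDD list
    val base = spark.read
        .format ("ch.ninecode.cim")
        .load ("hdfs://server/data/DemoData.rdf")

    // subsequent read: append the difference file (union) and apply its ChangeSet,
    // instead of unpersisting the existing RDD
    val updated = spark.read
        .format ("ch.ninecode.cim")
        .option ("ch.ninecode.cim.append", "true")
        .option ("ch.ninecode.cim.apply_changesets", "true")
        .load ("hdfs://server/data/DemoData_diff.rdf")
    ```

    Note that any object file written via ch.ninecode.cim.cache should be deleted beforehand, since reading cached contents takes precedence over append mode.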
Derrick Oswald committed Oct 8, 2020
1 parent 380a7a5 commit b1c8416
Showing 4 changed files with 99 additions and 64 deletions.
10 changes: 6 additions & 4 deletions CIMReader/README.md
@@ -242,17 +242,19 @@ where:
* opts is pairs of named options in a Map[String,String], where values are usually "true" or "false",
but for some topology options "ForceTrue", "ForceFalse" or "Unforced".
CIM reader specific option names and their meaning are:
* ch.ninecode.cim.append - on a subsequent read operation, append these CIM file(s) contents to existing RDD in memory
* ch.ninecode.cim.apply_changesets - merge difference model elements by applying all ChangeSet
* ch.ninecode.cim.do_about - merge rdf:about elements into rdf:ID elements with the same mRID
* ch.ninecode.cim.do_normalize - normalize 1:N relations which are denormalized
* ch.ninecode.cim.do_deduplication - eliminate duplicates based on CIM mRID
* ch.ninecode.cim.make_edges - generate the Edges RDD and table
* ch.ninecode.cim.do_join - merge CIM files (by UserAttribute)
* ch.ninecode.cim.do_topo - generate TopologicalNode elements
* ch.ninecode.cim.do_topo_islands - generate TopologicalIsland elements (forces ch.ninecode.cim.do_topo true also)
* ch.ninecode.cim.force_retain_switches - force switches to have two TopologicalNode regardless of the retain value
* ch.ninecode.cim.force_retain_fuses - force fuses to have two TopologicalNode regardless of the retain value
* ch.ninecode.cim.force_switch_separate_islands - force switches to have two TopologicalIsland regardless of the retain value
* ch.ninecode.cim.force_fuse_separate_islands - force fuses to have two TopologicalIsland regardless of the retain value
* ch.ninecode.cim.default_switch_open_state - default open value when neither normalOpen nor open is specified
* ch.ninecode.cim.debug - add additional checks and messages for debugging purposes
* ch.ninecode.cim.cache - save resulting RDD as an object file using this name, for subsequent reads use this as a cache
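As a sketch of how these options are typically supplied (the file path and the particular option values chosen here are hypothetical; the option keys are the ones documented above):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder.appName ("CIMOptionsDemo").getOrCreate ()

// options are plain String pairs; most take "true"/"false",
// while some topology options take "ForceTrue", "ForceFalse" or "Unforced"
val opts = Map [String, String](
    "ch.ninecode.cim.do_topo_islands" -> "true", // also forces do_topo true
    "ch.ninecode.cim.force_retain_switches" -> "ForceTrue",
    "ch.ninecode.cim.debug" -> "false"
)

val elements: DataFrame = spark.read
    .format ("ch.ninecode.cim")
    .options (opts)
    .load ("hdfs://server/data/sample.rdf")
```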
122 changes: 62 additions & 60 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala
@@ -61,6 +61,8 @@ class CIMRelation (

// check for a storage level option
implicit val _StorageLevel: StorageLevel = StorageLevel.fromString (parameters.getOrElse ("StorageLevel", "MEMORY_AND_DISK_SER"))
// check for append option
val _Append: Boolean = parameters.getOrElse ("ch.ninecode.cim.append", "false").toBoolean
// check for rdf:about option
val _About: Boolean = parameters.getOrElse ("ch.ninecode.cim.do_about", "false").toBoolean
// check for normalization option
@@ -150,6 +152,27 @@ class CIMRelation (
)
}

def removeSubclassRDD (elements: RDD[Element]): Unit =
{
// aggregate the set of subclass names
val names = elements
.aggregate (Set [String]())(
(set, element) => element.classes.toSet.union (set),
(set1, set2) => set1.union (set2)
)
// remove subclass RDD if they exist
for (name <- names;
target = applyPattern (name))
{
spark.sparkContext.getPersistentRDDs.find (_._2.name == target) match
{
case Some ((_: Int, existing: RDD[_])) =>
existing.setName (null).unpersist (true)
case Some (_) | None =>
}
}
}

// For a non-partitioned relation, this method builds an RDD[Row] containing all rows within this relation.
override def buildScan (): RDD[Row] =
{
@@ -158,33 +181,27 @@
// register the ElementUDT
ElementRegistration.register ()

var ret: RDD[Element] = null

// if appending get the old RDD[Element] and remove any existing subclass RDD
// else also remove the old RDD[Element]
val target = applyPattern ("Element")
val previous = spark.sparkContext.getPersistentRDDs.find (_._2.name == target) match
{
case Some ((_, old)) =>
val rdd = old.asInstanceOf[RDD[Element]]
removeSubclassRDD (rdd)
if (_Append)
Some (rdd)
else
{
// remove the old RDD[Element]
rdd.setName (null).unpersist (true)
None
}
case None =>
None
}

if (_Cache != "")
{
@@ -197,7 +214,7 @@
val rdd: RDD[Element] = spark.sparkContext.objectFile (_Cache)
put (rdd, true)
make_tables (rdd)
ret = rdd
}
}

@@ -210,7 +227,7 @@
configuration.set (FileInputFormat.INPUT_DIR, path)
configuration.setLong (FileInputFormat.SPLIT_MAXSIZE, _SplitSize)

ret = if (_Debug)
spark.sparkContext.newAPIHadoopRDD (
configuration,
classOf [CIMInputFormatDebug],
@@ -223,50 +240,38 @@
classOf [String],
classOf [Element]).values

ret = previous match
{
case Some (old) =>
old.union (ret)
case None =>
ret
}
put (ret, true)

// about processing if requested
if (_About)
ret = new CIMAbout (spark, _StorageLevel).do_about ()

// normalize if requested
if (_Normalize)
ret = new CIMNormalize (spark, _StorageLevel).do_normalization ()

// dedup if requested
if (_DeDup)
ret = new CIMDeDup (spark, _StorageLevel).do_deduplicate ()

// apply changes if requested
if (_Changes)
ret = CIMChange (spark, _StorageLevel).apply_changes

// as a side effect, define all the other temporary tables
log.info ("creating temporary tables")
make_tables (ret)

// merge ISU and NIS ServiceLocations if requested
if (_Join)
ret = new CIMJoin (spark, _StorageLevel).do_join ()

// perform topological processing if requested
if (_Topo)
@@ -284,15 +289,12 @@
storage = _StorageLevel
)
)
ret = ntp.process
}

// set up edge graph if requested
if (_Edges)
ret = new CIMEdges (spark, _StorageLevel).make_edges (_Topo)

// cache elements if requested
if (_Cache != "")
@@ -302,6 +304,6 @@
}
}

ret.asInstanceOf [RDD[Row]]
}
}
2 changes: 2 additions & 0 deletions CIMReader/src/test/resources/log4j.properties
@@ -42,3 +42,5 @@ log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR

log4j.logger.ch.ninecode=INFO
29 changes: 29 additions & 0 deletions CIMReader/src/test/scala/ch/ninecode/cim/CIMDiffSuite.scala
@@ -155,4 +155,33 @@ case class CIMDiffSuite () extends ch.ninecode.SparkSuite
assert (get[ACLineSegment](TEMPLATE2.format ("ACLineSegment")) == null, "deleted")
assert (get[ACLineSegment](TEMPLATE1.format ("ACLineSegment")) != null, "not deleted")
}

test ("append changeset")
{
implicit spark: SparkSession =>

val elements1 = readFile (s"${FILE_DEPOT}DemoData.rdf")
val count1 = elements1.count
assert (count1 == 1742, "# elements before applying ChangeSet")
val elements2 = readFile (s"$FILE_DEPOT${CIMFILE}_diff.rdf", Map [String, String](
"ch.ninecode.cim.append" -> "true",
"ch.ninecode.cim.apply_changesets" -> "true"
))
val count2 = elements2.count
val demo = 1742 // the _diff file also has 12 elements which are added and then removed
val deletions = 5
val additions = 1
assert (demo - deletions + additions == count2, "# elements after applying ChangeSet")

val fuses = get[Fuse]
assert (fuses.filter (_.id == "FUS0053").count () == 0, "FUS0053 is deleted")
val networks = get[EquivalentNetwork]
assert (networks.count == 1, "one network added")
assert (networks.filter (_.id == "Network1").count () == 1, "Network1 is added")
val consumers = get[EnergyConsumer]
val user19 = consumers.filter (_.id == "USR0019").collect
assert (user19.length == 1, "USR0019 is still present")
assert (user19(0).p == 14000.0, "USR0019 p")
assert (user19(0).q == 1200.0, "USR0019 q")
}
}
