
Update to Spark 3.0.1
    Update CIMSpark to Spark 3.0.1.
    Update spark-cassandra-connector_2.12 to version 3.0.0.
    Update list of contributors.
    Rename the RDD[Element] from "Elements" to "Element".
    This is a breaking change: programs must update the name
    they search for when retrieving the RDD[Element]:
      - CIMRDD subclasses use getOrElse[Element]
      - others use sc.getPersistentRDDs.filter(_._2.name == "Element").head._2.asInstanceOf[RDD[Element]]
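For code outside CIMRDD subclasses, the migration looks roughly like this (a minimal sketch; the `session` variable name and the `ch.ninecode.model.Element` import are assumptions, not part of this commit):

```scala
import org.apache.spark.rdd.RDD
import ch.ninecode.model.Element

// before this commit the RDD was registered as "Elements" (plural),
// after it the RDD is registered as "Element" (singular)
val elements: RDD[Element] = session.sparkContext
    .getPersistentRDDs
    .filter (_._2.name == "Element")
    .head._2
    .asInstanceOf[RDD[Element]]
```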
Derrick Oswald committed Oct 6, 2020
1 parent 1eb90d9 commit e665a1d
Showing 24 changed files with 86 additions and 52 deletions.
3 changes: 2 additions & 1 deletion CIMExport/pom.xml
@@ -5,7 +5,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMSpark</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.12-3.0.0-5.0.4</version>
<version>2.12-3.0.1-5.1.0</version>
</parent>
<artifactId>CIMExport</artifactId>
<name>${project.artifactId}</name>
@@ -380,6 +380,7 @@ enable_scripted_user_defined_functions: true
<goal>sign</goal>
</goals>
<configuration>
<!--suppress UnresolvedMavenProperty -->
<keyname>${gpg.keyname}</keyname>
</configuration>
</execution>
6 changes: 3 additions & 3 deletions CIMExport/src/main/scala/ch/ninecode/cim/CIMExport.scala
@@ -529,7 +529,7 @@ class CIMExport (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMOR
*/
def exportAll (filename: String): Unit =
{
val elements = getOrElse [Element]("Elements")
val elements = getOrElse [Element]
export (elements, filename)
}

@@ -642,7 +642,7 @@ class CIMExport (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMOR
// make a mapping of mRID to mRID
// "if you include me, you have to include him" and vice-versa for some relations
val relationships = classes.map (x => (x.name, x.relations)).toMap
val ying_yang = getOrElse [Element]("Elements").flatMap (dependents (relationships, stop.map (_._1))).persist (storage)
val ying_yang = getOrElse [Element].flatMap (dependents (relationships, stop.map (_._1))).persist (storage)

// done is a list of keyed PairRDD, the keys are mRID_Island and each pair is an mRID and the Island it belongs to
var done: List[RDD[KeyedItem]] = List ()
@@ -685,7 +685,7 @@ class CIMExport (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMOR
.flatMap (p => p._2.map (q => (p._1, q)))
.persist (storage)

val ret = getOrElse [Element]("Elements").keyBy (_.id).join (all_done).values.map (_.swap).persist (storage)
val ret = getOrElse [Element].keyBy (_.id).join (all_done).values.map (_.swap).persist (storage)
log.info (s"${ret.count} elements")
done.foreach (_.unpersist (false))
ying_yang.unpersist (false)
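The bidirectional dependency mapping above ("if you include me, you have to include him") reduces to a simple pairing; a simplified sketch using a hypothetical RDD of related mRID pairs in place of the real CIM relations:

```scala
import org.apache.spark.rdd.RDD

// emit each related pair in both directions, so that joining on either
// mRID drags the other element into the export as well
def bidirectional (related: RDD[(String, String)]): RDD[(String, String)] =
    related.flatMap { case (me, him) => Seq ((me, him), (him, me)) }
```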
8 changes: 4 additions & 4 deletions CIMExport/src/test/scala/ch/ninecode/SparkSuite.scala
@@ -138,10 +138,10 @@ class SparkSuite extends FixtureAnyFunSuite
* @param spark The Spark session which persisted the named RDD.
* @tparam T The type of objects contained in the named RDD.
* @return The typed RDD, e.g. <code>RDD[T]</code>.
* @example The RDD of all elements is somewhat special,
* currently it is named Elements (plural), so this method must be used:
* {{{val elements: RDD[Element] = get[Element]("Elements")}}}.
*
* @example If there were two CIM files read in and stored with names having suffixes _1 and _2,
* this would get both of the RDD[Element]:
* {{{val elements_1: RDD[Element] = get[Element]("Element_1")}}}
* {{{val elements_2: RDD[Element] = get[Element]("Element_2")}}}
*/
def get[T: ClassTag] (name: String)(implicit spark: SparkSession): RDD[T] =
{
2 changes: 1 addition & 1 deletion CIMJDBC/CIMClientJDBC/pom.xml
@@ -4,7 +4,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMJDBC</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.12-3.0.0-5.0.4</version>
<version>2.12-3.0.1-5.1.0</version>
</parent>
<artifactId>CIMClientJDBC</artifactId>
<name>${project.artifactId}</name>
2 changes: 1 addition & 1 deletion CIMJDBC/CIMServerJDBC/pom.xml
@@ -4,7 +4,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMJDBC</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.12-3.0.0-5.0.4</version>
<version>2.12-3.0.1-5.1.0</version>
</parent>
<artifactId>CIMServerJDBC</artifactId>
<name>${project.artifactId}</name>
2 changes: 1 addition & 1 deletion CIMJDBC/pom.xml
@@ -4,7 +4,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMSpark</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.12-3.0.0-5.0.4</version>
<version>2.12-3.0.1-5.1.0</version>
</parent>
<artifactId>CIMJDBC</artifactId>
<name>${project.artifactId}</name>
4 changes: 2 additions & 2 deletions CIMReader/README.md
@@ -103,7 +103,7 @@ This should print out the Scala shell welcome screen with cool ASCII art:
```
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/root/spark/spark-3.0.0-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
:: loading settings :: url = jar:file:/root/spark/spark-3.0.1-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
ch.ninecode.cim#CIMReader added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-69ac0dfd-a56d-46a5-9b45-b9754495a263;1.0
confs: [default]
@@ -129,7 +129,7 @@ Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.0.0
/___/ .__/\_,_/_/ /_/\_\ version 3.0.1
/_/
Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 11.0.7)
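For context, a plausible spark-shell invocation that produces the session shown above (the package coordinates are inferred from this commit's version bump, so treat them as an assumption):

```
spark-shell --master local[*] --packages ch.ninecode.cim:CIMReader:2.12-3.0.1-5.1.0
```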
12 changes: 6 additions & 6 deletions CIMReader/build.sbt
@@ -2,19 +2,19 @@ lazy val cimreader = (project in file(".")).
settings(
organization := "ch.ninecode.cim",
name := "CIMReader",
version := "3.0.0-5.0.4",
version := "3.0.1-5.1.0",
scalaVersion := "2.12.10",
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
javaOptions += "-Xss4m",
bintrayPackageLabels := Seq("CIM", "9code")
)

resolvers ++= Seq (
"releases" at "http://oss.sonatype.org/content/repositories/releases"
"releases" at "https://repo1.maven.org/maven2/",
)
libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.0.0"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.0.0"
libraryDependencies += "org.apache.spark" % "spark-hive-thriftserver_2.12" % "3.0.0"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.12" % "3.0.0"
libraryDependencies += "org.apache.spark" % "spark-core_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-sql_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-hive-thriftserver_2.12" % "3.0.1"
libraryDependencies += "org.apache.spark" % "spark-graphx_2.12" % "3.0.1"
libraryDependencies += "com.github.scopt" % "scopt_2.12" % "4.0.0-RC2"
libraryDependencies += "org.scalatest" % "scalatest_2.12" % "3.0.8" % "test"
3 changes: 2 additions & 1 deletion CIMReader/pom.xml
@@ -5,7 +5,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMSpark</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.12-3.0.0-5.0.4</version>
<version>2.12-3.0.1-5.1.0</version>
</parent>
<artifactId>CIMReader</artifactId>
<name>${project.artifactId}</name>
@@ -194,6 +194,7 @@
<goal>sign</goal>
</goals>
<configuration>
<!--suppress UnresolvedMavenProperty -->
<keyname>${gpg.keyname}</keyname>
</configuration>
</execution>
6 changes: 3 additions & 3 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMAbout.scala
@@ -77,7 +77,7 @@ class CIMAbout (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
* Since RDD are immutable, another copy is created containing only
* primary elements (elements with rdf:ID attributes) and this replaces
* the current RDD[Element] referenced by the persistent
* RDD registry. The old element RDD is renamed to "about_Elements".
* RDD registry.
* Multiple (duplicate) primary elements each have all rdf:about elements
* merged into them (no de-duplication).
*
@@ -88,7 +88,7 @@ class CIMAbout (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
def do_about (): RDD[Element] =
{
// get the elements RDD
val elements = getOrElse [Element]("Elements")
val elements = getOrElse [Element]

// get the elements flagged as "rdf:about"
val about_elements = elements.filter (_.about).groupBy (_.id)
Expand All @@ -103,7 +103,7 @@ class CIMAbout (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
val new_elements = element_groups.map (merge)

// swap the old Elements RDD for the new one
put (new_elements, "Elements", false)
put (new_elements, false)

new_elements
}
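A condensed sketch of the do_about flow (the leftOuterJoin pairing and the merge signature here are assumptions; the real combiner lives in the collapsed lines):

```scala
import org.apache.spark.rdd.RDD
import ch.ninecode.model.Element

// assuming a CIMRDD context for getOrElse and put
val elements: RDD[Element] = getOrElse [Element]
val about = elements.filter (_.about).groupBy (_.id)   // rdf:about fragments by mRID
val primary = elements.filter (!_.about).keyBy (_.id)  // primary rdf:ID elements

// placeholder for the real combiner, which folds each fragment's set
// attributes into its primary element (here the primary is kept as-is)
def merge (group: (Element, Option[Iterable[Element]])): Element = group._1

val new_elements = primary.leftOuterJoin (about).values.map (merge)
// put (new_elements, false) then swaps this in as the registered RDD[Element]
```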
2 changes: 1 addition & 1 deletion CIMReader/src/main/scala/ch/ninecode/cim/CIMChange.scala
@@ -286,7 +286,7 @@ case class CIMChange (spark: SparkSession, storage: StorageLevel = StorageLevel.
def apply_changes: RDD[Element] =
{
// get the elements RDD
var elements = getOrElse [Element]("Elements")
var elements = getOrElse [Element]

// get the ChangeSet(s)
val changes = elements.filter (x => changeset_classes.contains (x.baseclass)) // reduce the work as much as possible
6 changes: 3 additions & 3 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMDeDup.scala
@@ -89,7 +89,7 @@ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
*
* Since RDD are immutable, another copy is created containing only unique elements
* and this replaces the current RDD[Element] referenced by the persistent
* RDD registry. The old element RDD is renamed to "duplicate_Elements".
* RDD registry.
*
* The new RDD is cached and checkpointed (if checkpointing is enabled by the Spark context having a CheckpointDir).
*
@@ -100,13 +100,13 @@ class CIMDeDup (spark: SparkSession, storage: StorageLevel = StorageLevel.MEMORY
log.info ("eliminating duplicates")

// get the elements RDD
val elements = getOrElse [Element]("Elements")
val elements = getOrElse [Element]

// deduplicate
val new_elements = elements.keyBy (_.id).groupByKey ().values.map (deduplicate)

// swap the old Elements RDD for the new one
put (new_elements, "Elements", false)
put (new_elements, false)

new_elements
}
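The collapsed deduplicate helper does the per-group work above; a plausible shape for it, assuming one representative is kept per mRID group (the real method also checks whether the duplicates are truly identical):

```scala
import ch.ninecode.model.Element

// groupByKey never yields an empty group, so head is safe here
def deduplicate (elements: Iterable[Element]): Element =
    elements.toList match
    {
        case single :: Nil => single       // the common case: no duplicates
        case duplicates => duplicates.head // several copies collapse to one
    }
```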
2 changes: 1 addition & 1 deletion CIMReader/src/main/scala/ch/ninecode/cim/CIMEdges.scala
@@ -388,7 +388,7 @@ class CIMEdges (spark: SparkSession, storage: StorageLevel)
log.info ("making Edges RDD")

// get the elements RDD
val elements = getOrElse [Element]("Elements")
val elements = getOrElse [Element]

// get the terminals
val terminals = getOrElse [Terminal]
4 changes: 2 additions & 2 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMJoin.scala
@@ -306,7 +306,7 @@ class CIMJoin (spark: SparkSession, storage: StorageLevel) extends CIMRDD with S
union (updated_locations.asInstanceOf [RDD[Element]])

// replace elements in Elements
val old_elements = getOrElse [Element]("Elements")
val old_elements = getOrElse [Element]
val new_elements = old_elements.keyBy (_.id).leftOuterJoin (newelem.keyBy (_.id)).
values.flatMap (
(arg: (Element, Option[Element])) =>
@@ -318,7 +318,7 @@ class CIMJoin (spark: SparkSession, storage: StorageLevel) extends CIMRDD with S
)

// swap the old Elements RDD for the new one
put (new_elements, "Elements", false)
put (new_elements, false)

new_elements
}
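The flatMap above applies a prefer-the-update rule; a condensed equivalent of that replace-in-place pattern, written with map and getOrElse for clarity rather than as the commit's exact code:

```scala
import org.apache.spark.rdd.RDD
import ch.ninecode.model.Element

// for every element, take the updated copy when one exists,
// otherwise keep the original
def replaceElements (old_elements: RDD[Element], updated: RDD[Element]): RDD[Element] =
    old_elements
        .keyBy (_.id)
        .leftOuterJoin (updated.keyBy (_.id))
        .values
        .map { case (original, update) => update.getOrElse (original) }
```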
CIMReader/src/main/scala/ch/ninecode/cim/CIMNetworkTopologyProcessor.scala
@@ -432,7 +432,7 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession, options: CIMTopolog
def makeGraph (): Graph[CIMVD, CIMEdgeData] =
{
// get Element that are ConductingEquipment
val equipment = getOrElse [Element]("Elements").filter (conductingEquipment (_).isDefined)
val equipment = getOrElse [Element].filter (conductingEquipment (_).isDefined)

// get the terminals with ConnectivityNode keyed by equipment
val terms = getOrElse [Terminal]
@@ -868,7 +868,7 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession, options: CIMTopolog
graph = identifyIslands (graph)

// get the terminals keyed by equipment with equipment
val elements = getOrElse [Element]("Elements").keyBy (_.id)
val elements = getOrElse [Element].keyBy (_.id)
val terms = getOrElse [Terminal].keyBy (_.ConductingEquipment).join (elements).values
// map each graph vertex to the terminals
val vertices = getOrElse [ConnectivityNode].map (x => (asVertexId (x.id), x))
@@ -978,13 +978,13 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession, options: CIMTopolog
union (new_terminals.asInstanceOf [RDD[Element]])

// replace elements in Elements
val old_elements = getOrElse [Element]("Elements")
val old_elements = getOrElse [Element]
val new_elements = old_elements.keyBy (_.id).subtract (oldelem.keyBy (_.id)).values.union (newelem)

// swap the old Elements RDD for the new one
if (options.debug && log.isDebugEnabled)
log.debug (s"RDD[Element]")
put (new_elements, "Elements", true)
put (new_elements, true)

log.info ("finished Network Topology Processing")
new_elements
@@ -1009,7 +1009,7 @@ case class CIMNetworkTopologyProcessor (spark: SparkSession, options: CIMTopolog
if (options.identify_islands && islands.isEmpty)
process
else
get [Element]("Elements")
get [Element]
}
}
}
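Conceptually, identifyIslands is a connected-components computation over the graph built by makeGraph; a hedged GraphX sketch of that idea (the processor's actual traversal may differ):

```scala
import org.apache.spark.graphx.{Graph, VertexId}

// each connected component id becomes a candidate topological island label;
// CIMVD and CIMEdgeData are the processor's vertex and edge payload types
def islandLabels (graph: Graph[CIMVD, CIMEdgeData]): Graph[VertexId, CIMEdgeData] =
    graph.connectedComponents ()
```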
4 changes: 2 additions & 2 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMNormalize.scala
@@ -167,7 +167,7 @@ class CIMNormalize (spark: SparkSession, storage: StorageLevel = StorageLevel.ME
def do_normalization (): RDD[Element] =
{
// get the elements RDD keyed by id
val old_elements = getOrElse [Element]("Elements")
val old_elements = getOrElse [Element]
val elements = old_elements.keyBy (_.id)
val all = elements.count

@@ -197,7 +197,7 @@ class CIMNormalize (spark: SparkSession, storage: StorageLevel = StorageLevel.ME
val new_elements: RDD[Element] = cleaned.subtractByKey (fixed2).union (fixed2).values

// swap the old Elements RDD for the new one
put (new_elements, "Elements", true)
put (new_elements, true)

new_elements
}
7 changes: 4 additions & 3 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMRDD.scala
@@ -76,9 +76,10 @@ trait CIMRDD
* @param log A logger for error messages.
* @tparam T The type of objects contained in the named RDD.
* @return The typed RDD, e.g. <code>RDD[T]</code>.
* @example The RDD of all elements is somewhat special,
* currently it is named Elements (plural), so this method must be used:
* {{{val elements: RDD[Element] = get[Element]("Elements")}}}.
* @example If there were two CIM files read in and stored with names having suffixes _1 and _2,
* this would get both of the RDD[Element]:
* {{{val elements_1: RDD[Element] = get[Element]("Element_1")}}}
* {{{val elements_2: RDD[Element] = get[Element]("Element_2")}}}
*
*/
def get[T: ClassTag] (name: String)(implicit spark: SparkSession, log: Logger): RDD[T] =
6 changes: 3 additions & 3 deletions CIMReader/src/main/scala/ch/ninecode/cim/CIMRelation.scala
@@ -158,7 +158,7 @@ class CIMRelation (
var ret: RDD[Row] = null

// remove any existing RDD created by this relation
spark.sparkContext.getPersistentRDDs.find (_._2.name == "Elements").foreach (
spark.sparkContext.getPersistentRDDs.find (_._2.name == "Element").foreach (
x =>
{
val (_, old) = x
@@ -190,7 +190,7 @@
{
log.info (s"reading cache: ${_Cache}")
val rdd: RDD[Element] = spark.sparkContext.objectFile (_Cache)
put (rdd, "Elements", true)
put (rdd, true)
make_tables (rdd)
ret = rdd.asInstanceOf [RDD[Row]]
}
@@ -218,7 +218,7 @@
classOf [String],
classOf [Element]).values

put (rdd, "Elements", true)
put (rdd, true)
ret = rdd.asInstanceOf [RDD[Row]]

// about processing if requested
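The collapsed body of the foreach above retires the stale RDD; a sketch of what that cleanup plausibly does (the setName(null) trick and the blocking flag are assumptions):

```scala
import org.apache.spark.sql.SparkSession

def dropOldElements (spark: SparkSession): Unit =
    spark.sparkContext.getPersistentRDDs.find (_._2.name == "Element").foreach
    {
        case (_, old) =>
            old.setName (null)               // so future name lookups miss it
            old.unpersist (blocking = false) // release storage asynchronously
    }
```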
7 changes: 4 additions & 3 deletions CIMReader/src/test/scala/ch/ninecode/SparkSuite.scala
@@ -68,9 +68,10 @@ class SparkSuite extends FixtureAnyFunSuite with Unzip
* @param spark The Spark session which persisted the named RDD.
* @tparam T The type of objects contained in the named RDD.
* @return The typed RDD, e.g. <code>RDD[T]</code>.
* @example The RDD of all elements is somewhat special,
* currently it is named Elements (plural), so this method must be used:
* {{{val elements: RDD[Element] = get[Element]("Elements")}}}.
* @example If there were two CIM files read in and stored with names having suffixes _1 and _2,
* this would get both of the RDD[Element]:
* {{{val elements_1: RDD[Element] = get[Element]("Element_1")}}}
* {{{val elements_2: RDD[Element] = get[Element]("Element_2")}}}
*
*/
def get[T: ClassTag] (name: String)(implicit spark: SparkSession): RDD[T] =
3 changes: 2 additions & 1 deletion CIMReader/src/test/scala/ch/ninecode/cim/TestSuite.scala
@@ -28,5 +28,6 @@ class TestSuite extends Suites (
new CIMAboutSuite,
new CIMNormalizeSuite,
new CIMCacheSuite,
new CIMSerializeSuite
new CIMSerializeSuite,
new CIMDiffSuite
)
2 changes: 1 addition & 1 deletion CIMTool/pom.xml
@@ -4,7 +4,7 @@
<groupId>ch.ninecode.cim</groupId>
<artifactId>CIMSpark</artifactId>
<relativePath>../pom.xml</relativePath>
<version>2.12-3.0.0-5.0.4</version>
<version>2.12-3.0.1-5.1.0</version>
</parent>
<artifactId>CIMTool</artifactId>
<name>${project.artifactId}</name>
Binary file added img/Luca.jpeg
Binary file added img/Rijad.jpeg
