In [34]:
// Smoke-test cell: prints a fixed greeting so it is visible that the interpreter is responding.
Console.println("Hello")

Hello


In [35]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD


In [36]:
// Obtain the Spark session for this notebook (an already-running session is reused by getOrCreate).
val spark: SparkSession = SparkSession
    .builder()
    .appName("AcademicGraphAnalysis")
    .getOrCreate()

// Resolve column names case-sensitively.
// NOTE(review): presumably needed because the input JSON has field names differing only by case — confirm.
spark.conf.set("spark.sql.caseSensitive", value = true)

// Spark SQL implicit conversions (encoders, $-column syntax).
import spark.implicits._

spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@67ffe9e7
import spark.implicits._


In [37]:
// Input: a DBLP v14 export stored as a single multi-line JSON document.
val jsonFilePath = "dblp_v14-part2266-2.json"

// Parse the file and project it down to the paper fields of interest.
val papersDF = spark.read
    .option("multiline", "true")
    .json(jsonFilePath)
    .select(
        "id", "title", "doi", "keywords", "n_citation", "year", "issn",
        "url", "abstract", "authors", "doc_type", "v12_authors", "references", "v12_id"
    )

// Inspect the inferred schema, then print one full (untruncated) row as a sample.
papersDF.printSchema()
papersDF.show(numRows = 1, truncate = false)

root
 |-- id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- doi: string (nullable = true)
 |-- keywords: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- n_citation: long (nullable = true)
 |-- year: long (nullable = true)
 |-- issn: string (nullable = true)
 |-- url: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- abstract: string (nullable = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- org: string (nullable = true)
 |-- doc_type: string (nullable = true)
 |-- v12_authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- org: string (nullable = true)
 |-- references: array (nullable = true)
 |    |-- element: string (containsNull = tru

jsonFilePath: String = dblp_v14-part2266-2.json
papersDF: org.apache.spark.sql.DataFrame = [id: string, title: string ... 12 more fields]


In [45]:
// One GraphX vertex per paper: (vertex id, (original paper id, title, year)).
// The vertex id is the paper id's String.hashCode widened to Long; the edge cell derives
// endpoint ids the same way, so the two derivations must stay in sync.
// NOTE(review): String.hashCode is only 32 bits, so distinct paper ids can collide and be
// merged into a single vertex — confirm this is acceptable for the dataset size.
val vertices: RDD[(VertexId, (String, String, Long))] = papersDF
    .select("id", "title", "year")
    .where(col("id").isNotNull) // hashCode on a null id would throw a NullPointerException
    .rdd
    .map { row =>
        val paperId = row.getAs[String]("id")
        // A null `year` unboxes to 0L here (Scala unboxes a null Long to 0).
        (paperId.hashCode.toLong, (paperId, row.getAs[String]("title"), row.getAs[Long]("year")))
    }

vertices: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, (String, String, Long))] = MapPartitionsRDD[391] at map at <console>:111


In [39]:
// Directed citation edges: paper -> each paper it references, labelled "cites".
// Endpoint ids use the same String.hashCode-to-Long mapping as the vertex cell, so they match
// vertex ids for papers present in the dataset. References to papers outside the dataset
// still produce edges here; they are filtered out later where needed.
val edges: RDD[Edge[String]] = papersDF
    .select("id", "references")
    .where(col("id").isNotNull) // hashCode on a null id would throw a NullPointerException
    .rdd
    .flatMap { row =>
        val paperId = row.getAs[String]("id").hashCode.toLong
        // Both the references array and its elements are nullable in the schema.
        // Treat a missing array as empty and skip null entries instead of throwing on hashCode.
        Option(row.getAs[Seq[String]]("references"))
            .getOrElse(Seq.empty[String])
            .collect { case ref if ref != null => Edge(paperId, ref.hashCode.toLong, "cites") }
    }

edges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[227] at flatMap at <console>:98


In [47]:
// Citation graph: papers as vertices, "cites" relationships as directed edges.
// Edges can point at referenced papers that are not in the loaded dataset. GraphX creates those
// vertices implicitly with `defaultVertexAttr`, whose default is null. A null tuple attribute
// throws a MatchError in any later pattern match on graph.vertices, so use an explicit
// empty placeholder (empty id/title, year 0) for such "external" papers instead.
val graph: Graph[(String, String, Long), String] =
    Graph(vertices, edges, defaultVertexAttr = ("", "", 0L))

graph: org.apache.spark.graphx.Graph[(String, String, Long),String] = org.apache.spark.graphx.impl.GraphImpl@753b26e4


In [48]:
// Label each vertex with the lowest vertex id in its connected component (GraphX semantics).
val connectedComponents = graph.connectedComponents().vertices

// Attach paper metadata to each component label. The inner join keeps only vertex ids that
// also appear in `vertices`, i.e. papers loaded from the dataset.
val connectedComponentsWithTitles = connectedComponents
    .join(vertices)
    .map { case (_, (componentId, (paperId, title, year))) =>
        (componentId, paperId, title, year)
    }

// Print a small sample of (componentId, paperId, title, year) tuples.
connectedComponentsWithTitles.take(10).foreach(row => println(row))

(-1897842877,558b108284ae84d265c14849,Accident Risk Analysis and Model Applications of Railway Level Crossings,2008)
(-1964527228,558b1090e4b031bae1fb5238,An Enhanced Data-Reduction Algorithm for Event-Triggered Networks,2009)
(103442663,558b1092e4b031bae1fb5247,Polarimetric Differential SAR Interferometry: First Results With Ground-Based Measurements,2009)
(-216907331,558b11c5e4b031bae1fb59aa,Guidance laws for planar motion control,2008)
(-474470741,558b11c6e4b031bae1fb59ae,Geometrical conditions for output depending observability normal form,2008)
(-2147335190,558b113fe4b0b32fcb3a8e05,Industrial enterprises business processes simulation with BPsim.MAS,2008)
(-1971396260,558b11c3e4b031bae1fb5995,Control of distributed parameter systems subject to convex constraints: Applications to irrigation systems and Hypersonic Vehicles,2008)
(-2003521330,558b11c6e4b031bae1fb59ad,Estimation over heterogeneous sensor networks,2008)
(-2147335190,558b1140e4b0b32fcb3a8e0c,Emergence of simulations for 

connectedComponents: org.apache.spark.graphx.VertexRDD[org.apache.spark.graphx.VertexId] = VertexRDDImpl[526] at RDD at VertexRDD.scala:57
connectedComponentsWithTitles: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.VertexId, String, String, Long)] = MapPartitionsRDD[544] at map at <console>:114


In [49]:
import org.json4s.JsonDSL._        // For the `~` operator
import org.json4s.JsonAST._        // For working with JSON objects
import org.json4s.jackson.JsonMethods._  // For rendering JSON
import org.apache.spark.graphx._   // For GraphX
import java.nio.file.{Files, Paths} // For writing to file

// Export the citation graph as {"nodes": [...], "links": [...]} to ./graph.json.
// Assumes the `vertices` and `edges` RDDs from the earlier cells are in scope.

// Nodes: one JSON object per paper vertex, collected to the driver.
val verticesJSON: JArray = JArray(vertices.map { case (id, (paperId, title, year)) =>
  JObject(
    "id" -> JInt(id),
    "paperId" -> JString(paperId),
    "title" -> JString(title),
    "year" -> JInt(year)
  )
}.collect().toList)  // Collect to List[JObject] before constructing JArray

// Vertex ids of papers present in the dataset, collected to the driver.
val validVertexIds: Set[VertexId] = vertices.map(_._1).collect().toSet

// Links: keep only citations whose source and target are both known papers, so every link
// refers to a node in the exported "nodes" array.
val edgesJSON: JArray = JArray(edges.filter { edge =>
    validVertexIds.contains(edge.srcId) && validVertexIds.contains(edge.dstId)
}.map { case Edge(srcId, dstId, relationship) =>
    JObject(
        "source" -> JInt(srcId),
        "target" -> JInt(dstId),
        "relationship" -> JString(relationship)
    )
}.collect().toList)  // Collect to List[JObject] before constructing JArray

// Combine nodes and links. Reuse the node array built above instead of re-mapping and
// re-collecting `vertices`, which would repeat the whole Spark job.
val graphJSON: JObject = JObject(
    "nodes" -> verticesJSON,
    "links" -> edgesJSON
)

// Convert to compact JSON string
val jsonString = compact(render(graphJSON))

// Print the JSON (for debugging purposes)
// println(jsonString)

// Write the JSON to a file. Encode explicitly as UTF-8: the no-argument getBytes uses the
// platform default charset, which corrupts non-ASCII titles on non-UTF-8 JVMs.
Files.write(Paths.get("./graph.json"), jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8))


{"nodes":[{"id":913341741,"paperId":"558b11bde4b031bae1fb5972","title":"Close target reconnaissance using autonomous UAV formations","year":2008},{"id":913341744,"paperId":"558b11bde4b031bae1fb5975","title":"Input-output analysis of the 2D/3C model in channel flows of viscoelastic fluids","year":2008},{"id":-773990059,"paperId":"558b113ae4b0b32fcb3a8ddf","title":"Applicability of hybrid simulation to different modes of governance in UK healthcare","year":2008},{"id":-1135808098,"paperId":"558b107584ae84d265c147fc","title":"HMM Based Segmentation of Continuous Eddy Current Sensor Signals","year":2008},{"id":217500835,"paperId":"558b107684ae84d265c14800","title":"Implementation of the ERI standard and Evaluation of Applications with several low-cost technologies","year":2008},{"id":-675006111,"paperId":"558b11bfe4b031bae1fb597d","title":"Probability of error bounds for failure diagnosis and classification in hidden Markov models","year":2008},{"id":217500836,"paperId":"558b107684ae84d265

import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.graphx._
import java.nio.file.{Files, Paths}
verticesJSON: org.json4s.JsonAST.JArray = JArray(List(JObject(List((id,JInt(913341741)), (paperId,JString(558b11bde4b031bae1fb5972)), (title,JString(Close target reconnaissance using autonomous UAV formations)), (year,JInt(2008)))), JObject(List((id,JInt(913341744)), (paperId,JString(558b11bde4b031bae1fb5975)), (title,JString(Input-output analysis of the 2D/3C model in channel flows of viscoelastic fluids)), (year,JInt(2008)))), JObject(List((id,JInt(-773990059)), (paperId,JString(558b113ae4b0b32fcb3a8ddf)), (title,JString(Applicability of hybrid simulation to different modes of governance in UK healthcare)), (year,JInt(2008...


In [58]:
import scala.util.Random

// Export the citation graph in the {"nodes": [...], "edges": [...]} shape used by Sigma.js
// to ./graph2.json. Nodes get random positions; edges are limited to citations between papers
// present in the dataset.

// Step 1: Collect valid vertex IDs into a Set
val validVertexIds: Set[VertexId] = vertices.map(_._1).collect().toSet

// Step 2: Filter edges to include only those whose endpoints are both valid vertices
val filteredEdges = edges.filter { edge =>
    validVertexIds.contains(edge.srcId) && validVertexIds.contains(edge.dstId)
}

// Step 3: Convert vertices to Sigma.js nodes.
// Coordinates are drawn on the driver, after collect(). A Random captured in an RDD closure is
// serialized into every task with the same internal state, so each partition would replay the
// same coordinate sequence and nodes from different partitions would land on top of each other.
val random = new Random()
val nodesJSON = vertices.collect().toList.map { case (id, (_, title, _)) =>
  val x = random.nextDouble() * 800  // Assuming width of the canvas is 800
  val y = random.nextDouble() * 600  // Assuming height of the canvas is 600
  JObject(
    "id" -> JInt(id),   // hashed vertex id, so it matches the edges' source/target values
    "label" -> JString(title),
    "x" -> JDouble(x),
    "y" -> JDouble(y),
    "size" -> JInt(3)   // Size can be arbitrary; adjust as necessary
  )
}

// Step 4: Convert the filtered edges to Sigma.js edges with unique ids "e0", "e1", ...
val edgesJSON = filteredEdges.zipWithIndex.map { case (Edge(srcId, dstId, _), index) =>
  JObject(
    "id" -> JString(s"e$index"),
    "source" -> JInt(srcId),
    "target" -> JInt(dstId)
  )
}.collect().toList

// Step 5: Combine nodes and edges into a single JSON object for Sigma
val graphJSON: JObject = JObject(
  "nodes" -> JArray(nodesJSON),
  "edges" -> JArray(edgesJSON)
)

// Step 6: Convert to compact JSON string
val jsonString = compact(render(graphJSON))

// Step 7: Write the JSON to a file for Sigma.js, explicitly UTF-8 encoded (the no-argument
// getBytes would use the platform default charset and could corrupt non-ASCII titles).
Files.write(Paths.get("./graph2.json"), jsonString.getBytes(java.nio.charset.StandardCharsets.UTF_8))


import scala.util.Random
validVertexIds: Set[org.apache.spark.graphx.VertexId] = Set(217500836, 1697590430, 344281490, 970261337, -1971396257, -1897842837, -773990059, 2088534119, -309494961, -309494965, -1897842877, -383048363, -1971396256, 217500841, -2132881009, 913341741, -675006111, -1971396260, -854992323, 735224419, 742092196, -383048341, 2088534118, 808776546, 808776542, 636238517, 1932629363, -1964527228, 913341744, 1989548197, -1736358089, 735224377, 1043814715, -1135808098, 735224418, -1897842836, 913341742, -675006110, 1932629343, -1736358084, -1971396258, -1897842879, 808776543, 217500835, 1697590429, -618085324, 344281493, 808776548, -1009028250)
filteredEdges: org.apache.spark.rdd.RDD[org.apache.spark.graphx.Edge[String]] = MapPartitionsRDD[581] at filter at <console>:134...
