# 스파크 고급 분석 7장

## 그래프X로 동시발생 네트워크 분석하기

In [1]:
%%configure -f
{
    "name": "medline-graphX",
    "proxyUser": "hduser",
    "driverMemory": "4000M", 
    "conf": {"spark.jars.packages": "graphframes:graphframes:0.3.0-spark2.0-s_2.11",
             "spark.master": "local[2]",
             "spark.jars": "hdfs://localhost:54310/jars/ch06-lsa-2.0.0-jar-with-dependencies.jar",
             "spark.sql.crossJoin.enabled": "true"}
}

In [6]:
import edu.umd.cloud9.collection.XMLInputFormat

import java.nio.charset.StandardCharsets
import java.security.MessageDigest

import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession, Row}
import org.apache.spark.sql.functions._

import scala.xml.XML

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

import edu.umd.cloud9.collection.XMLInputFormat
import java.nio.charset.StandardCharsets
import java.security.MessageDigest
import org.apache.hadoop.io.{Text, LongWritable}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession, Row}
import org.apache.spark.sql.functions._
import scala.xml.XML


In [3]:
// Base HDFS directory that the MEDLINE input path below is built from.
val base = "hdfs://localhost:54310/medline/"


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

base: String = hdfs://localhost:54310/medline/


## 데이터 준비 

In [4]:
/**
 * Loads a MEDLINE XML file as a Dataset holding one raw XML string per citation record.
 *
 * Records are cut out of the file by `XMLInputFormat`, using the start tag
 * `<MedlineCitation ` and the end tag `</MedlineCitation>`.
 *
 * @param spark session used to read the file and to provide the Dataset encoders
 * @param path  location of the XML file to read
 * @return the text of every record found between the configured start and end tags
 */
def loadMedline(spark: SparkSession, path: String): Dataset[String] = {
    // Take the String encoder from the session that was passed in, so the function
    // does not depend on implicits imported by the surrounding notebook/REPL.
    import spark.implicits._

    val conf = new Configuration()
    // NOTE(review): the start tag literal ends with a space, presumably so that only
    // `<MedlineCitation ...>` elements with attributes match -- confirm against the data.
    conf.set(XMLInputFormat.START_TAG_KEY, "<MedlineCitation ")
    conf.set(XMLInputFormat.END_TAG_KEY, "</MedlineCitation>")

    // Keys are LongWritable and values are Text; only the record text is kept.
    val kvs = spark.sparkContext.newAPIHadoopFile(
        path, classOf[XMLInputFormat], classOf[LongWritable], classOf[Text], conf)
    kvs.map(_._2.toString).toDS()
}

// Read the sample file and count how many citation records it contains.
val path = base + "medsamp2016a.xml"
val medlineRaw = loadMedline(spark, path)
medlineRaw.count

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

loadMedline: (spark: org.apache.spark.sql.SparkSession, path: String)org.apache.spark.sql.Dataset[String]
path: String = hdfs://localhost:54310/medline/medsamp2016a.xml
medlineRaw: org.apache.spark.sql.Dataset[String] = [value: string]
res6: Long = 30000


In [7]:
/**
 * Extracts the major topics of one citation record.
 *
 * @param record XML text of a single citation
 * @return the text of every `DescriptorName` element (at any depth) whose
 *         `MajorTopicYN` attribute equals "Y", in document order
 */
def majorTopics(record: String): Seq[String] = {
    val citation = XML.loadString(record)
    val descriptors = citation \\ "DescriptorName"

    // Keep only descriptors flagged as major and project them to their text content.
    descriptors.collect {
        case node if (node \ "@MajorTopicYN").text == "Y" => node.text
    }
}

// One Seq of major-topic names per citation; cached because later cells reuse it.
val medline = medlineRaw.map(majorTopics)
medline.cache()
// Peek at the topics of the first record.
medline.take(1)(0)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

majorTopics: (record: String)Seq[String]
medline: org.apache.spark.sql.Dataset[Seq[String]] = [value: array<string>]
res12: medline.type = [value: array<string>]
res13: Seq[String] = List(Intellectual Disability, Maternal-Fetal Exchange, Pregnancy Complications)


## 동시 발생 조사하기

In [8]:
// Flatten the per-record topic lists into one row per topic occurrence.
val topics = medline.flatMap(n => n).toDF("topic")
topics.createOrReplaceTempView("topics")

// Number of occurrences of each topic, most frequent first.
val topicDist = spark.sql("""
SELECT topic, COUNT(*) as cnt
FROM topics
GROUP BY topic
ORDER BY cnt DESC
""")

// Total topic occurrences, then the five most frequent topics.
topics.count()
topicDist.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

topics: org.apache.spark.sql.DataFrame = [topic: string]
topicDist: org.apache.spark.sql.DataFrame = [topic: string, cnt: bigint]
res17: Long = 82195
+------------+----+
|       topic| cnt|
+------------+----+
|     Disease|1202|
|   Neoplasms| 983|
|Tuberculosis| 950|
|       Blood| 518|
|  Anesthesia| 379|
+------------+----+
only showing top 5 rows



In [9]:
// Distribution of the counts themselves: for each occurrence count `cnt`,
// how many topics (`dist`) occur exactly that many times.
topicDist.createOrReplaceTempView("topic_dist")
spark.sql("""
SELECT cnt, COUNT(*) as dist
FROM topic_dist
GROUP BY cnt
ORDER BY dist DESC
""").show

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---+----+
|cnt|dist|
+---+----+
|  1|2347|
|  2|1136|
|  3| 724|
|  4| 487|
|  5| 379|
|  6| 307|
|  7| 199|
|  8| 193|
|  9| 169|
| 10| 141|
| 11| 123|
| 12| 102|
| 15|  89|
| 13|  84|
| 14|  73|
| 16|  63|
| 18|  58|
| 17|  51|
| 20|  50|
| 22|  42|
+---+----+
only showing top 20 rows



#### 동시 발생쌍 만들기

In [10]:
/**
 * Counts how often each unordered pair of topics appears together in the same record.
 *
 * As a side effect the intermediate pairs are registered as the temp view `pairs_`.
 *
 * @param ds one sequence of topic names per record
 * @return a DataFrame with columns `pair` (two topic names, in sorted order) and `cnt`
 *         (number of records containing both), ordered by `cnt` descending
 */
def getCooccur(ds: Dataset[Seq[String]]): DataFrame = {
    // Use the session the Dataset belongs to instead of a notebook-global `spark`.
    val session = ds.sparkSession
    import session.implicits._

    // `combinations` keeps the input order, so without sorting (A, B) and (B, A) would be
    // counted as two different pairs. `distinct` prevents a repeated topic from producing
    // a meaningless (A, A) self-pair.
    val pairs = ds.flatMap { list => list.distinct.sorted.combinations(2) }.toDF("pair")

    pairs.createOrReplaceTempView("pairs_")
    session.sql("""
    SELECT pair, COUNT(*) as cnt
    FROM pairs_
    GROUP BY pair
    ORDER BY cnt DESC
    """)
}

// Co-occurrence counts for every topic pair; cached because the graph is built from it.
val cooccurs = getCooccur(medline)
cooccurs.cache()
// Show the five most frequent pairs without truncating the column text.
cooccurs.show(5,false)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

getCooccur: (ds: org.apache.spark.sql.Dataset[Seq[String]])org.apache.spark.sql.DataFrame
cooccurs: org.apache.spark.sql.DataFrame = [pair: array<string>, cnt: bigint]
res22: cooccurs.type = [pair: array<string>, cnt: bigint]
+--------------------------------------------------+---+
|pair                                              |cnt|
+--------------------------------------------------+---+
|[Antibiotics, Antitubercular, Dermatologic Agents]|195|
|[Analgesia, Anesthesia]                           |181|
|[Analgesia, Anesthesia and Analgesia]             |179|
|[Anesthesia, Anesthesia and Analgesia]            |177|
|[Anesthesia, Anesthesiology]                      |153|
+--------------------------------------------------+---+
only showing top 5 rows



## 그래프X로 동시발생 네트워크 구성하기

In [12]:
/**
 * Derives a 64-bit vertex id from a topic name.
 *
 * The id is built from the first 8 bytes of the 16-byte MD5 digest of the UTF-8 encoded
 * string, with byte 0 as the least-significant byte of the result. The original author notes
 * that this mirrors Guava's Hashing, inlined to avoid depending on Guava.
 *
 * @param str text to hash
 * @return the first 8 digest bytes packed little-endian into a Long
 */
def hashId(str: String): Long = {
    val digest = MessageDigest.getInstance("MD5").digest(str.getBytes(StandardCharsets.UTF_8))
    // Byte i contributes bits [8*i, 8*i + 7]; masking with 0xFFL avoids sign extension.
    digest.take(8).zipWithIndex.foldLeft(0L) { case (acc, (b, i)) =>
        acc | ((b & 0xFFL) << (8 * i))
    }
}

// Vertices come from the topic list: (64-bit hash of the topic name, topic name).
val vertices = topics.map { case Row(topic: String) => (hashId(topic), topic) }.toDF("hash", "topic")
// Number of distinct vertex ids. NOTE(review): presumably meant as a hash-collision check --
// compare it with the number of distinct topic names.
val uniqueHashes = vertices.agg(countDistinct("hash")).take(1)

// Edges come from the co-occurrence pairs. The element type of the array column is matched
// as Seq[_] because a Seq[String] pattern cannot be checked at runtime (type erasure) and only
// produces a compiler warning; elements are converted with toString instead.
val edges = cooccurs.map { case Row(pair: Seq[_], cnt: Long) =>
    // Sort the two ids so that an edge always points from the smaller id to the larger one.
    val ids = pair.map(_.toString).map(hashId).sorted
    Edge(ids(0), ids(1), cnt)
}

// Graph with the topic name as vertex attribute and the co-occurrence count as edge attribute.
val vertexRDD = vertices.rdd.map{ case Row(hash:Long, topic:String) => (hash, topic)}
val topicGraph = Graph(vertexRDD, edges.rdd)
topicGraph.cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

hashId: (str: String)Long
vertices: org.apache.spark.sql.DataFrame = [hash: bigint, topic: string]
uniqueHashes: Array[org.apache.spark.sql.Row] = Array([7699])
       val edges = cooccurs.map { case Row(topics:Seq[String], cnt:Long) =>
                                                  ^
edges: org.apache.spark.sql.Dataset[org.apache.spark.graphx.Edge[Long]] = [srcId: bigint, dstId: bigint ... 1 more field]
vertexRDD: org.apache.spark.rdd.RDD[(Long, String)] = MapPartitionsRDD[99] at map at <console>:54
topicGraph: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@59affc97
res31: org.apache.spark.graphx.Graph[String,Long] = org.apache.spark.graphx.impl.GraphImpl@59affc97


In [13]:
// Rows in the raw vertex RDD: one per topic occurrence, since it is built from `topics`.
vertexRDD.count

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res32: Long = 82195


In [14]:
// Vertices actually held by the graph (compare with the raw vertexRDD row count).
topicGraph.vertices.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res33: Long = 7699


In [15]:
// Number of edges, i.e. rows of the co-occurrence table.
edges.count

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res34: Long = 72560


In [16]:
// Inspect the type of the graph's vertex collection.
topicGraph.vertices

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

res35: org.apache.spark.graphx.VertexRDD[String] = VertexRDDImpl[116] at RDD at VertexRDD.scala:57


## 네트워크 구조 - Connected Components

원리 - 각 vertex는 자신이 알고 있는 가장 작은 vertex id를 이웃에게 반복해서 전달한다. 전파가 끝났을 때 같은 최소 id를 공유하는 vertex들이 하나의 Connected Component(CC)이다.

In [17]:

// Label every vertex with the id of its connected component.
val ccGraph = topicGraph.connectedComponents()
// (vertex id, component id) pairs as a DataFrame, exposed to SQL as the view `cc`.
val ccDF = ccGraph.vertices.toDF("vid", "cid")
ccDF.createOrReplaceTempView("cc")
// Sizes of the five largest components.
spark.sql("""
SELECT cid, COUNT(*) as cnt
FROM cc
GROUP BY cid
ORDER BY cnt DESC
LIMIT 5
""").show

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

ccGraph: org.apache.spark.graphx.Graph[org.apache.spark.graphx.VertexId,Long] = org.apache.spark.graphx.impl.GraphImpl@6d86a336
ccDF: org.apache.spark.sql.DataFrame = [vid: bigint, cid: bigint]
+--------------------+----+
|                 cid| cnt|
+--------------------+----+
|-9215470674759766104|7572|
| 1765411469112156596|   3|
| 3608770526546285755|   3|
|  731949936574312042|   2|
|-4492732731552733030|   2|
+--------------------+----+



In [18]:
// Signature of VertexRDD.innerJoin, kept for reference:
// def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2]

// Pair every topic name with the id of the connected component it belongs to.
val topicCCDF = topicGraph.vertices.innerJoin(ccGraph.vertices) {
    (topicId, name, cid) => (name, cid)
}.values.toDF("topic", "cid")

// List topics of one component. The component id is hard-coded from a previous run's output.
// It is compared as a typed Long so that the bigint column is not matched against a String
// through implicit type coercion.
topicCCDF.where($"cid" === -9215470674759766104L).show

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

topicCCDF: org.apache.spark.sql.DataFrame = [topic: string, cid: bigint]
+--------------------+--------------------+
|               topic|                 cid|
+--------------------+--------------------+
|Spinal Cord Injuries|-9215470674759766104|
|              Pyuria|-9215470674759766104|
|         Hepatitis A|-9215470674759766104|
|     Uveal Neoplasms|-9215470674759766104|
|                Skin|-9215470674759766104|
|     Radiodermatitis|-9215470674759766104|
|   Hand Disinfection|-9215470674759766104|
|Forced Expiratory...|-9215470674759766104|
|    Hemifacial Spasm|-9215470674759766104|
|Benzophenanthridines|-9215470674759766104|
|      Duodenal Ulcer|-9215470674759766104|
|   Spinal Dysraphism|-9215470674759766104|
|        Polymyositis|-9215470674759766104|
|             Ketosis|-9215470674759766104|
|         Naturopathy|-9215470674759766104|
|  Social Environment|-9215470674759766104|
|             Syringa|-9215470674759766104|
|Unconscious (Psyc...|-9215470674759766104|
|  

## 네트워크 구조 - Degree Distribution

In [20]:
// Degree (number of incident edges) of each vertex that has at least one edge; cached for reuse.
val degrees:VertexRDD[Int] = topicGraph.degrees.cache()
// Summary statistics (count, mean, stdev, max, min) of the degree values.
degrees.map(_._2).stats()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

degrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[257] at RDD at VertexRDD.scala:57
res42: org.apache.spark.util.StatCounter = (count: 7596, mean: 19.104792, stdev: 40.490526, max: 1482.000000, min: 1.000000)


In [36]:
// Join degrees with the topic names so the result is readable.
val namesAndDegree = degrees.innerJoin(topicGraph.vertices) {
    (topicId, degree, name) => (name, degree.toInt)
}.values.toDF("topic", "degree")
// Five topics with the highest degree.
namesAndDegree.orderBy($"degree".desc).show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

namesAndDegree: org.apache.spark.sql.DataFrame = [topic: string, degree: int]
+-------------------+------+
|              topic|degree|
+-------------------+------+
|            Disease|  1482|
|          Neoplasms|   918|
|              Blood|   706|
|       Tuberculosis|   665|
|Wounds and Injuries|   449|
+-------------------+------+
only showing top 5 rows



## 카이제곱 통계량으로 관련이 낮은 관계 필터링하기

* 특정 두 주제(A, B)에 대해서 카이제곱 통계량 분석

|  | B가 나왔음 | B가 나오지 않음 | A 총계 |
|---|:---:|---:|---:|
| A가 나왔음 |YY | YN | YA |
| A가 나오지 않음 | NY | NN | NA |
| B 총계 | YB | NB | T |


In [69]:
// Total number of documents, T.
val T = medline.count()

// Per-topic occurrence counts keyed by vertex id (these play the role of YA / YB).
val topicDistRdd = topicDist.map {
    case Row(topic:String, cnt:Long) => (hashId(topic), cnt)
}.rdd
// Same edges as topicGraph, but each vertex now carries its topic count instead of its name.
val topicDistGraph = Graph(topicDistRdd, topicGraph.edges)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

T: Long = 30000
topicDistRdd: org.apache.spark.rdd.RDD[(Long, Long)] = MapPartitionsRDD[643] at rdd at <console>:59
topicDistGraph: org.apache.spark.graphx.Graph[Long,Long] = org.apache.spark.graphx.impl.GraphImpl@21c50ba9


In [75]:
/**
 * Chi-squared statistic of a 2x2 contingency table for two topics A and B, with the
 * continuity term `T / 2` subtracted from `|YY*NN - YN*NY|` before squaring.
 *
 * The remaining table cells are derived from the arguments:
 * NB = T - YB, NA = T - YA, YN = YA - YY, NY = YB - YY, NN = T - NY - YN - YY.
 *
 * The formula is symmetric in YA and YB, so the two marginal counts may be passed in either order.
 * If any marginal (YA, NA, YB, NB) is zero the denominator is zero and the result is not finite.
 *
 * @param YY number of documents containing both topics
 * @param YB number of documents containing topic B
 * @param YA number of documents containing topic A
 * @param T  total number of documents
 * @return the chi-squared statistic as a Double
 */
def chiSq(YY: Long, YB: Long, YA: Long, T: Long): Double = {
    val NB = T - YB
    val NA = T - YA
    val YN = YA - YY
    val NY = YB - YY
    val NN = T - NY - YN - YY

    val inner = math.abs(YY * NN - YN * NY) - T / 2.0
    // Multiply the four marginals as Double: their product is of order T^4 and overflows
    // a Long once T reaches roughly 1e5, which would silently corrupt the statistic.
    val marginals = YA.toDouble * NA * YB * NB
    T * math.pow(inner, 2) / marginals
}

// Replace every edge attribute with the chi-squared statistic of its two endpoint topics.
val chiSquaredGraph = topicDistGraph.mapTriplets(triplet => {
    // Edge attribute: comes from topicGraph, so it is the co-occurrence count.
    // Vertex attributes: come from topicDistGraph, so they are the per-topic counts (YA or YB).
    chiSq(triplet.attr, triplet.srcAttr, triplet.dstAttr, T)
    
    // The resulting triplet's edge therefore holds the chi-squared value.
})

// Summary statistics of the chi-squared values, and the number of edges before filtering.
chiSquaredGraph.edges.map(e=>e.attr).stats()
chiSquaredGraph.edges.count

// With 1 degree of freedom the 99.999% critical value is 19.5; pairs above it are treated as
// not independent (i.e. strongly related) and are kept.
val interesting = chiSquaredGraph.subgraph( triplet => triplet.attr > 19.5 )
interesting.edges.count  // 72560 -> 36938 edges in the recorded run

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

chiSq: (YY: Long, YB: Long, YA: Long, T: Long)Double
chiSquaredGraph: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@2b8fb024
res148: org.apache.spark.util.StatCounter = (count: 72560, mean: 315.119299, stdev: 1354.301605, max: 29096.810219, min: 0.000000)
res149: Long = 72560
interesting: org.apache.spark.graphx.Graph[Long,Double] = org.apache.spark.graphx.impl.GraphImpl@518c838e
res152: Long = 36938


In [79]:
// Degrees in the filtered graph; cached because the clustering-coefficient cell reuses them.
val interestingDegrees = interesting.degrees.cache()
interestingDegrees.map(_._2).stats()  // connectivity is much sparser than before filtering

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

interestingDegrees: org.apache.spark.graphx.VertexRDD[Int] = VertexRDDImpl[704] at RDD at VertexRDD.scala:57
res155: org.apache.spark.util.StatCounter = (count: 7587, mean: 9.737182, stdev: 10.634602, max: 247.000000, min: 1.000000)


# Small World 이론 검증

* 노드 대부분 차수(degree)가 작고, 밀도가 높은 군집에 속해있다. (군집계수가 크다)
* 한 노드에서 다른 노드로 빠르게 도달 (small shortest path)

* 각 Vertex에서의 군집계수 (k:이웃수, t:트라이앵글수)
 $$C=\frac{2t}{k(k-1)}$$

In [84]:
// Count the triangles through each vertex, then summarise the counts.
val triangleGraph = interesting.triangleCount()
triangleGraph.vertices.map(_._2).stats()

// Maximum possible number of triangles for a vertex of degree k: k(k-1)/2.
val maxTriGraph = interestingDegrees.mapValues( d => d*(d-1)/2.0 )

// Local clustering coefficient: actual triangles / possible triangles
// (defined as 0 when no triangle is possible).
val localClusterCoef = triangleGraph.vertices.innerJoin(maxTriGraph) {
    (vid, triangleCout, maxTriangleCount) => {
        if (maxTriangleCount == 0) 0 else 1.0 * triangleCout / maxTriangleCount
    }
}

// Average local clustering coefficient: sum of the coefficients divided by the
// number of vertices in the filtered graph.
localClusterCoef.map(_._2).sum() / interesting.vertices.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

triangleGraph: org.apache.spark.graphx.Graph[Int,Double] = org.apache.spark.graphx.impl.GraphImpl@5cf9f4c5
res168: org.apache.spark.util.StatCounter = (count: 7699, mean: 12.089622, stdev: 20.924037, max: 691.000000, min: 0.000000)
maxTriGraph: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[971] at RDD at VertexRDD.scala:57
localClusterCoef: org.apache.spark.graphx.VertexRDD[Double] = VertexRDDImpl[973] at RDD at VertexRDD.scala:57
res175: Double = 0.35686064879743434


### 프레겔(대량 동기 병렬 그래프 처리)을 사용한 평균 경로 길이 계산하기

프레겔 사용법
1) 각 Vertex의 상태 정의 : 자신이 알고있는 경로 길이 정보 = Map[VertexId, Int]

2) 이웃으로부터의 메세지와 현재 상태를 종합하여 내보낼 메세지 갱신 함수 : 누가 더 작은 경로 길이를 알면 이걸로 replace해야함

3) 자신의 상태 갱신 

In [92]:
/**
 * Merges two distance maps (the Pregel message-combine function).
 *
 * @param m1 first map of vertex id -> path length
 * @param m2 second map of vertex id -> path length
 * @return a map over the union of both key sets holding, for each key, the smaller of the
 *         two lengths (a key present in only one map keeps that map's value)
 */
def mergeMaps(m1: Map[VertexId, Int], m2: Map[VertexId, Int]): Map[VertexId, Int] = {
    val allIds = m1.keySet ++ m2.keySet

    allIds.map { vid =>
        // A missing entry counts as Int.MaxValue, so the existing value always wins.
        val d1 = m1.getOrElse(vid, Int.MaxValue)
        val d2 = m2.getOrElse(vid, Int.MaxValue)
        vid -> (if (d2 < d1) d2 else d1)
    }.toMap
}

/**
 * Vertex update function for Pregel: folds an incoming message into the vertex state.
 *
 * @param id    id of the vertex being updated (not used)
 * @param state distances currently stored on the vertex
 * @param msg   combined distances received in this round
 * @return the result of `mergeMaps(state, msg)`
 */
def update(id: VertexId, state: Map[VertexId, Int], msg: Map[VertexId, Int]): Map[VertexId, Int] =
    mergeMaps(state, msg)

/**
 * Builds the message (if any) that carries vertex a's distances to vertex b.
 *
 * Every distance in `a` is increased by 1 to account for the extra hop, and a message is
 * produced only when merging it into `b` would change `b`.
 *
 * @param a   distance map of the sending side
 * @param b   distance map of the receiving side
 * @param bid vertex id the message is addressed to
 * @return an iterator with the single pair (bid, incremented map), or an empty iterator
 *         when `b` would be unchanged
 */
def checkIncrement(
    a: Map[VertexId, Int],
    b: Map[VertexId, Int],
    bid: VertexId) = {

    // One more hop for every known distance.
    val shifted = a.map { case (vid, dist) => vid -> (dist + 1) }
    val merged = mergeMaps(shifted, b)

    // Propagate only when the receiver would learn something new.
    if (merged == b) Iterator.empty else Iterator((bid, shifted))
}

/**
 * Pregel send-message function: offers distance updates across an edge in both directions.
 *
 * The source's distances are offered to the destination (addressed to `e.dstId`) and the
 * destination's distances are offered to the source (addressed to `e.srcId`). Each message
 * must go to the vertex whose state it was compared against: if the second message were
 * addressed to the destination, the source would never be updated, the "would this change
 * the receiver?" test would stay true, and the computation would never converge.
 *
 * @param e edge triplet whose two vertex attributes are distance maps
 * @return zero, one or two (target vertex id, distance map) messages
 */
def iterate(e: EdgeTriplet[Map[VertexId, Int], _]) = {
    checkIncrement(e.srcAttr, e.dstAttr, e.dstId) ++
    checkIncrement(e.dstAttr, e.srcAttr, e.srcId)
}

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

mergeMaps: (m1: Map[org.apache.spark.graphx.VertexId,Int], m2: Map[org.apache.spark.graphx.VertexId,Int])Map[org.apache.spark.graphx.VertexId,Int]
update: (id: org.apache.spark.graphx.VertexId, state: Map[org.apache.spark.graphx.VertexId,Int], msg: Map[org.apache.spark.graphx.VertexId,Int])Map[org.apache.spark.graphx.VertexId,Int]
checkIncrement: (a: Map[org.apache.spark.graphx.VertexId,Int], b: Map[org.apache.spark.graphx.VertexId,Int], bid: org.apache.spark.graphx.VertexId)Iterator[(org.apache.spark.graphx.VertexId, scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int])]
iterate: (e: org.apache.spark.graphx.EdgeTriplet[Map[org.apache.spark.graphx.VertexId,Int], _])Iterator[(org.apache.spark.graphx.VertexId, scala.collection.immutable.Map[org.apache.spark.graphx.VertexId,Int])]


In [96]:
// Compute paths from a 2% sample of the vertices only.
val fraction = 0.02 
val replacement = false
val sample = interesting.vertices.map(v => v._1).sample(replacement, fraction, 1792L)
// Sampled vertex ids, collected to the driver as a Set.
val ids = sample.collect().toSet
// Initial state: a sampled vertex knows distance 0 to itself; every other vertex knows nothing.
val mapGraph = interesting.mapVertices((id, v) => {
    if (ids.contains(id)) {
        Map(id -> 0)
    } else {
        Map[VertexId, Int]()
    }
})

// Run Pregel. No iteration limit is passed, so it runs until no more messages are sent.
val start = Map[VertexId, Int]() // initial message
val res = mapGraph.ops.pregel(start)(update, iterate, mergeMaps)

// Result: (vertex id, vertex id, path length) triples, with the smaller id first so that
// both directions of a pair collapse into one entry after distinct().
val paths = res.vertices.flatMap { case (id, m) =>
    m.map { case (k, v) =>
        if (id < k) {
            (id, k, v)
        } else {
            (k, id, v)
        }
    }
}.distinct().cache()

// Beware of OOM

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Invalid status code '500' from http://localhost:8998/sessions/0/statements/95 with error payload: "java.util.concurrent.ExecutionException: org.apache.livy.rsc.rpc.RpcException: java.lang.OutOfMemoryError: GC overhead limit exceeded\n"


In [97]:
// Summary statistics of the path lengths, ignoring the zero-length self paths.
paths.map(_._3).filter(_ > 0).stats()

An error was encountered:
Session 0 did not reach idle status in time. Current status is busy.


In [98]:
// Histogram of path lengths: (length, number of pairs), printed in ascending order.
val hist = paths.map(_._3).countByValue()
hist.toSeq.sorted.foreach(println)

An error was encountered:
Session 0 did not reach idle status in time. Current status is busy.
