In [1]:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.graphx._
import org.apache.spark.graphx.{Edge, Graph, PartitionID, VertexId}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}

// Schema for the input CSV: six nullable string attributes followed by a numeric record id.
// NOTE(review): Spark's file-based readers generally relax a user-supplied schema to nullable,
// so `nullable = false` on `id` records intent rather than enforcing it — confirm for your version.
val record_schema = StructType(
    Seq("ip", "mac", "hostname", "serial_no", "owner", "source_name")
        .map(fieldName => StructField(fieldName, StringType, nullable = true)) :+
        StructField("id", LongType, nullable = false)
)

// Load the raw records: the explicit schema replaces type inference, and the first
// line of the file is treated as a header row.
val records = spark.read
    .schema(record_schema)
    .format("csv")
    .options(Map("header" -> "true"))
    .load("data/small_test_data.csv")

// A copy of `records` whose columns are all renamed to "_<name>", so the two can be
// self-joined without ambiguous column references.
val mirrorColNames = records.columns.map(columnName => "_" + columnName)
val mirror = records.toDF(mirrorColNames: _*)

/**
 * Join predicate pairing a record with a *different* record from the mirrored DataFrame
 * when at least one of `matchCols` holds the same value on both sides.
 *
 * Nulls never match: SQL `null = null` evaluates to null, which a join treats as false.
 * Because the column comparisons are OR-ed together, Spark cannot use them as equi-join
 * keys, so the join is typically planned as a nested-loop join — acceptable for small data.
 *
 * @param matchCols columns compared against their mirrored counterparts; an empty list
 *                  yields a predicate that matches nothing (instead of throwing).
 * @param prefix    prefix of the mirrored column names; defaults to "_".
 * @return the combined join condition.
 */
def conditions(matchCols: Seq[String], prefix: String = "_"): Column = {
    val anyFieldMatches = matchCols
        .map(c => col(c) === col(prefix + c))
        .reduceOption(_ || _)
        .getOrElse(lit(false)) // plain `reduce` throws UnsupportedOperationException on an empty Seq
    col("id") =!= col(prefix + "id") && anyFieldMatches
}

// Candidate duplicate pairs: each record joined to every *other* record sharing an ip,
// mac or hostname. Matches are found in both directions, so each linked pair yields two rows.
val edges = records.join(mirror, conditions(Seq("ip", "mac", "hostname")))
// GraphX edge list: source and destination are the two record ids; edges carry no attribute.
val edgesRDD = edges.select("id", "_id").rdd.map { pair =>
    Edge(pair.getAs[VertexId](0), pair.getAs[VertexId](1), null)
}

// Vertex set: one vertex per record, keyed by its `id`, with a constant placeholder attribute 1.
// Rows with a null `id` are dropped. The schema's `nullable = false` is not enforced by the CSV
// reader, and `getAs[VertexId]` unboxes a null to 0L, so such a row would silently become
// (and merge with) vertex 0. It can never take part in an edge anyway, since null never
// satisfies the join's comparisons.
val nodesRDD = records
    .where(col("id").isNotNull)
    .map(r => (r.getAs[VertexId]("id"), 1))
    .rdd

record_schema = StructType(StructField(ip,StringType,true), StructField(mac,StringType,true), StructField(hostname,StringType,true), StructField(serial_no,StringType,true), StructField(owner,StringType,true), StructField(source_name,StringType,true), StructField(id,LongType,false))
records = [ip: string, mac: string ... 5 more fields]
mirrorColNames = Array(_ip, _mac, _hostname, _serial_no, ...


Array(_ip, _mac, _hostname, _serial_no, ...

In [2]:
// Record graph: vertices are record ids, edges link records that share an identifier.
val graph = Graph(vertices = nodesRDD, edges = edgesRDD)

graph = org.apache.spark.graphx.impl.GraphImpl@7c6c33be


org.apache.spark.graphx.impl.GraphImpl@7c6c33be

In [3]:
// Label every vertex with its connected component; GraphX uses the lowest vertex id
// in the component as the label.
val cc = graph.ops.connectedComponents()

cc = org.apache.spark.graphx.impl.GraphImpl@c5644bf


org.apache.spark.graphx.impl.GraphImpl@c5644bf

In [4]:
// Print the component graph: every edge, then every (vertexId, componentId) pair.
locally {
    def dumpLines(rows: Array[_]): Unit = println(rows.mkString("\n"))
    dumpLines(cc.edges.collect())
    dumpLines(cc.vertices.collect())
}

Edge(3,4,null)
Edge(3,5,null)
Edge(4,3,null)
Edge(4,5,null)
Edge(5,3,null)
Edge(5,4,null)
(4,3)
(0,0)
(1,1)
(3,3)
(5,3)
(2,2)


In [5]:
// Print the input graph: every edge, then every (vertexId, attribute) pair.
locally {
    def showAll(items: Array[_]): Unit = println(items.mkString("\n"))
    showAll(graph.edges.collect())
    showAll(graph.vertices.collect())
}

Edge(3,4,null)
Edge(3,5,null)
Edge(4,3,null)
Edge(4,5,null)
Edge(5,3,null)
Edge(5,4,null)
(4,1)
(0,1)
(1,1)
(3,1)
(5,1)
(2,1)


In [6]:
// Attach each record's component id. As a DataFrame, `cc.vertices` has columns
// _1 (vertex id, i.e. record id) and _2 (component id); join them back onto the records.
val out = cc.vertices.toDF()
val temp = records.join(out, records("id") === out("_1"))
println(temp.collect().mkString("\n"))

[106.60.189.138,null,bdruhan1m@1688.com,null,null,SY,4,4,3]
[null,29:0b:58:3d:26:6d,isoulsby3v@behance.net,KXHGN8D8KM,Izzy Soulsby,LD,0,0,0]
[null,a9:11:e3:35:23:fa,cpollak74@nifty.com,Y5K30C65M3,Cathlene Pollak,LD,1,1,1]
[null,4c:31:07:e9:9a:90,bdruhan1m@1688.com,U0UOS9R43J,Brittaney Druhan,LD,3,3,3]
[106.60.189.138,4c:31:07:e9:9a:90,null,null,null,DT,5,5,3]
[null,90:22:03:25:11:d8,ckissock53@php.net,7ZGA85QA5H,Colin Kissock,LD,2,2,2]


out = [_1: bigint, _2: bigint]
temp = [ip: string, mac: string ... 7 more fields]


[ip: string, mac: string ... 7 more fields]

In [28]:
// Show the joined schema (record columns followed by the vertex id / component id pair).
println(temp.schema.toString)

StructType(StructField(ip,StringType,true), StructField(mac,StringType,true), StructField(hostname,StringType,true), StructField(serial_no,StringType,true), StructField(owner,StringType,true), StructField(source_name,StringType,true), StructField(record_id,LongType,true), StructField(_1,LongType,false), StructField(_2,LongType,false))


In [13]:
// One row per component: the component id (exposed as entity_id) and the distinct MAC
// addresses found across its records. collect_set skips null values.
val gg = temp
    .groupBy(col("_2").as("entity_id"))
    .agg(collect_set(col("mac")))

gg = [entity_id: bigint, collect_set(mac): array<string>]


[entity_id: bigint, collect_set(mac): array<string>]

In [14]:
// Bring the per-entity MAC sets back to the driver (suitable for small data only);
// the notebook renders the returned Array as this cell's output.
gg.collect()

Array([0,WrappedArray(29:0b:58:3d:26:6d)], [1,WrappedArray(a9:11:e3:35:23:fa)], [3,WrappedArray(4c:31:07:e9:9a:90)], [2,WrappedArray(90:22:03:25:11:d8)])