<h1><center>Using Scala GraphX to Obtain Network-Related Features</center></h1>

#### Import modules for SQLContext, GraphX graph features, and other Spark functionality

In [34]:
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.graphx._          // Graph, Edge, VertexId and graph operators such as inDegrees and pageRank
import org.apache.spark.graphx.lib._      // library algorithms such as PageRank and TriangleCount

#### SQLContext (entry point for working with structured data in Spark; it creates DataFrames and executes SQL queries. In Spark 2.0+ SparkSession supersedes it, but SQLContext is still available)

In [35]:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

#### Load data from S3 (sc.textFile reads each line of the input files as one String element of an RDD)

In [36]:
val data = sc.textFile("s3://qubole-s3bucket-20170108/quobole_default_data_location/txn_pubkey_mapping_2011_2013/part*")

#### Data is converted to tabular (rows and columns) format for further analysis

##### Split each line on commas

In [37]:
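// A limit of -1 keeps trailing empty fields, so a line whose last columns are empty still yields 9 fields
val data_split = data.map(line => line.split(",", -1))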
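A quick plain-Scala check of why the split limit matters: Scala's `split` on a `String` delegates to Java's `String.split`, which drops trailing empty strings by default, so a row whose last columns are empty comes back with fewer than nine fields. The sample line below is made up for illustration.

```scala
object SplitCheck {
  // Hypothetical CSV line whose last two fields (e.g. TX_IN_VALUE, TX_OUT_VALUE) are empty
  val line = "b1,2013-11-02 10:00:00,tx1,s1,r1,0,1,,"

  // Default split discards trailing empty strings: only 7 fields survive
  val dropped: Array[String] = line.split(',')

  // A negative limit keeps trailing empty strings: all 9 fields survive
  val kept: Array[String] = line.split(",", -1)
}
```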

##### Define column names and data types for the DataFrame

In [38]:
case class X(BLOCK_ID:String,BLOCK_TIME:String,TX_ID:String,SENDER_PUBKEY_ID:String,RECEIVER_PUBKEY_ID:String,TX_IN_POS:String,
TX_OUT_POS:String,TX_IN_VALUE:String,TX_OUT_VALUE:String)

##### Create a DataFrame from the split data

In [39]:
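// collect with a partial function keeps only rows that split into exactly 9 fields;
// a plain map with this pattern would throw a MatchError on any malformed line
val df = data_split.collect { case Array(s0, s1, s2, s3, s4, s5, s6, s7, s8) => X(s0, s1, s2, s3, s4, s5, s6, s7, s8) }.toDF()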
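The difference between `map` and `collect` with a partial pattern can be seen on ordinary Scala collections; Spark's `RDD.collect(PartialFunction)` overload filters the same way. This sketch uses a hypothetical three-column `Txn` class instead of the notebook's nine-column `X`.

```scala
object ParseRows {
  // Hypothetical, shortened stand-in for the notebook's case class X
  case class Txn(txId: String, sender: String, receiver: String)

  val rows: Seq[Array[String]] = Seq(
    Array("tx1", "s1", "r1"),
    Array("tx2", "s2"),            // malformed: missing receiver
    Array("tx3", "s3", "r3")
  )

  // map with a partial pattern throws scala.MatchError on the malformed row
  def parseWithMap(): Seq[Txn] =
    rows.map { case Array(a, b, c) => Txn(a, b, c) }

  // collect treats the pattern as a PartialFunction and skips rows that don't match
  def parseWithCollect(): Seq[Txn] =
    rows.collect { case Array(a, b, c) => Txn(a, b, c) }
}
```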

##### Register it as a temporary table so that it can be queried with SQL (registerTempTable is deprecated in Spark 2.0+ in favor of createOrReplaceTempView)

In [40]:
df.registerTempTable("txn_pubkey_mapping_2011_2013")

#### Select sender and receiver IDs for the required time window from the temp table using sqlContext (defined earlier)

In [41]:
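// Despite the variable name, this window covers November 2013. BETWEEN is inclusive at both ends,
// so transactions stamped exactly 2013-12-01 00:00:00 are included as well.
val sept_h1 = sqlContext.sql("select SENDER_PUBKEY_ID, RECEIVER_PUBKEY_ID from txn_pubkey_mapping_2011_2013 where BLOCK_TIME between '2013-11-01 00:00:00' AND '2013-12-01 00:00:00' ")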
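Because `BLOCK_TIME` is declared as a `String`, the `BETWEEN` filter compares strings, which matches chronological order only if every timestamp uses the zero-padded `yyyy-MM-dd HH:mm:ss` layout seen in the query literals (an assumption about the data). A small plain-Scala sketch of the same inclusive comparison, plus a half-open variant that would cover exactly November:

```scala
object TimeWindow {
  val lo = "2013-11-01 00:00:00"
  val hi = "2013-12-01 00:00:00"

  // Inclusive at both ends, like SQL BETWEEN; String.compareTo is lexicographic
  def between(x: String, lo: String, hi: String): Boolean =
    x.compareTo(lo) >= 0 && x.compareTo(hi) <= 0

  // Half-open window [lo, hi): excludes the first instant of December
  def inNovember(x: String): Boolean =
    x.compareTo(lo) >= 0 && x.compareTo(hi) < 0
}
```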

In [42]:
sept_h1.printSchema

root
 |-- SENDER_PUBKEY_ID: string (nullable = true)
 |-- RECEIVER_PUBKEY_ID: string (nullable = true)



#### Convert to an RDD and cache it (optionally sample first, or work on the entire data). Without caching, lazy evaluation would re-run all of the steps above for every action that uses this data.

In [43]:
val sampledataRDD = sept_h1.rdd
sampledataRDD.cache() //Cache for reuse

MapPartitionsRDD[138] at rdd at <console>:72

#### Helper to convert a String ID to Long (returns None if the string is not a valid number)

In [45]:
def toLong(s: String):Option[Long] = {
    try {
        Some(s.toLong)
    } catch {
        case e: NumberFormatException => None
    }
}
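The same helper can be written more compactly with `scala.util.Try`. For `String.toLong` the only exception that can occur is `NumberFormatException`, so the behavior should match the version above. A short sketch with a few sample inputs:

```scala
import scala.util.Try

object ToLongSketch {
  // Some(value) for a valid base-10 long, None otherwise (including "" and null)
  def toLong(s: String): Option[Long] = Try(s.toLong).toOption
}
```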

#### To create a Graph object, we need a vertex RDD and an edge RDD

##### Create the vertex RDD from the distinct union of all sender and receiver IDs

In [46]:
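val d1 = sampledataRDD.map(field => field(0))   // sender IDs
val d2 = sampledataRDD.map(field => field(1))   // receiver IDs
val d3 = d1.union(d2)                           // union keeps duplicates...
val d4 = d3.distinct()                          // ...so de-duplicate
val d5 = d4.map(row => row + "")                // Row fields are typed Any; convert each ID to String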
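A plain-Scala sketch of the union-then-distinct step on a few made-up (sender, receiver) pairs. Like `RDD.union`, concatenating two collections keeps duplicates, which is why the `distinct()` call is needed before the IDs can serve as vertices.

```scala
object VertexIds {
  // Hypothetical (sender, receiver) pairs standing in for the rows of sampledataRDD
  val pairs: Seq[(String, String)] = Seq(("1", "2"), ("2", "3"), ("1", "3"), ("3", "1"))

  // Project each side, concatenate, then de-duplicate
  val senders: Seq[String]   = pairs.map(_._1)
  val receivers: Seq[String] = pairs.map(_._2)
  val vertexIds: Seq[String] = (senders ++ receivers).distinct
}
```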

##### Graph expects the vertex RDD as (VertexId, property) pairs, where VertexId is an alias for Long; here each vertex gets a placeholder (String, String) property

In [50]:
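// Unparseable IDs fall back to vertex 0 (0L keeps the ID typed as Long);
// ("ran", "dom") is only a placeholder vertex property
val vertex: RDD[(VertexId, (String, String))] = d5.map(row => (toLong(row).getOrElse(0L), ("ran", "dom")))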

#### Create edge RDD

In [51]:
val edgeArray = sampledataRDD.map(row => (row(0)+"", row(1)+""))

##### Graph expects an RDD of Edge objects: the first field is the source ("from") ID, the second is the destination ("to") ID, both Long, and the third is the edge property (here the String "1")

In [52]:
// Unparseable IDs fall back to vertex 0; "1" is a placeholder edge property
val edges: RDD[Edge[String]] = edgeArray.map(row => Edge(toLong(row._1).getOrElse(0L), toLong(row._2).getOrElse(0L), "1"))
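Mapping every unparseable ID to `0` merges all such IDs into a single artificial vertex, which can distort the degree and PageRank values of vertex 0. One alternative, sketched here on plain Scala collections with made-up IDs, is to drop unparseable IDs by `flatMap`-ing over the `Option`; `RDD.flatMap` should accept the same pattern. Whether dropping is preferable depends on how the features are used downstream.

```scala
object SafeIds {
  def toLong(s: String): Option[Long] =
    try Some(s.toLong) catch { case _: NumberFormatException => None }

  // Hypothetical raw IDs and (sender, receiver) pairs, one of them malformed
  val rawIds: Seq[String]             = Seq("10", "20", "bad", "30")
  val rawPairs: Seq[(String, String)] = Seq(("10", "20"), ("20", "bad"), ("30", "10"))

  // getOrElse(0L) maps every unparseable ID to the same vertex 0
  val withDefault: Seq[Long] = rawIds.map(id => toLong(id).getOrElse(0L))

  // flatMap over Option drops unparseable IDs instead
  val vertexIds: Seq[Long] = rawIds.flatMap(id => toLong(id))

  // Keep an edge only if both endpoints parse
  val edgePairs: Seq[(Long, Long)] = rawPairs.flatMap { case (src, dst) =>
    for { s <- toLong(src); d <- toLong(dst) } yield (s, d)
  }
}
```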

#### Create the graph from the vertex and edge RDDs created above

In [53]:
val graphtest = Graph(vertex, edges)
graphtest.cache() //cache graph for faster access

org.apache.spark.graphx.impl.GraphImpl@5a724ab2

### Calculate features from the graph

#### In-degree (in a directed graph, the in-degree of a vertex is the number of incoming edges; inDegrees omits vertices with no incoming edges)

In [54]:
val indeg = graphtest.inDegrees

#### Out-degree (in a directed graph, the out-degree of a vertex is the number of outgoing edges; outDegrees omits vertices with no outgoing edges)

In [56]:
val outdeg = graphtest.outDegrees
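What `inDegrees` and `outDegrees` compute can be mimicked by counting edges per destination or per source. This plain-Scala sketch uses a made-up edge list and shows that a vertex with no incoming edges gets no entry, plus one way to fill in zeros. In GraphX, a comparable zero-fill should be possible with `graphtest.vertices.leftJoin(indeg) { (id, attr, d) => d.getOrElse(0) }`.

```scala
object Degrees {
  // Hypothetical directed graph: vertex 4 has no edges at all
  val vertices: Seq[Long]      = Seq(1L, 2L, 3L, 4L)
  val edges: Seq[(Long, Long)] = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))

  // Count edges per destination (in-degree) and per source (out-degree);
  // vertices without such edges simply have no entry
  val inDeg: Map[Long, Int]  = edges.groupBy(_._2).map { case (v, es) => v -> es.size }
  val outDeg: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }

  // Fill in 0 for vertices missing from the in-degree counts
  val inDegAll: Map[Long, Int] = vertices.map(v => v -> inDeg.getOrElse(v, 0)).toMap
}
```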

#### PageRank (a centrality measure for vertices: a variant of eigenvector centrality that reflects the importance of a vertex in the graph)

In [58]:
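// pageRank(tol) iterates until ranks change by less than tol, so pageRank(10) uses a tolerance
// far too loose to be meaningful; staticPageRank(numIter) runs a fixed number of iterations (10 here)
val prGraph = graphtest.staticPageRank(10).vertices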
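For reference, the GraphX `PageRank` scaladoc describes the fixed-iteration update as `PR(i) = resetProb + (1 - resetProb) * sum of PR(j) / outDeg(j)` over the in-neighbors `j` of `i`, with every rank starting at 1.0. The plain-Scala sketch below implements that update on a made-up edge list. It omits any normalization that newer Spark versions may apply, so its values are not guaranteed to match GraphX output exactly.

```scala
object PageRankSketch {
  // Fixed-iteration PageRank; ranks start at 1.0 and are not normalized
  def staticPageRank(vertices: Seq[Long], edges: Seq[(Long, Long)],
                     numIter: Int, resetProb: Double = 0.15): Map[Long, Double] = {
    val outDeg: Map[Long, Int] = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    var ranks: Map[Long, Double] = vertices.map(v => v -> 1.0).toMap
    for (_ <- 1 to numIter) {
      // Each edge sends its source's rank divided by the source's out-degree
      val contribs: Map[Long, Double] = edges
        .map { case (src, dst) => dst -> ranks(src) / outDeg(src) }
        .groupBy(_._1)
        .map { case (v, cs) => v -> cs.map(_._2).sum }
      // Vertices with no in-edges receive only the reset probability
      ranks = vertices.map(v => v -> (resetProb + (1 - resetProb) * contribs.getOrElse(v, 0.0))).toMap
    }
    ranks
  }
}
```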

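#### Triangle count (the number of triangles a vertex belongs to, i.e. pairs of its neighbors that are also connected to each other. It is the numerator of the local clustering coefficient C = 2T / (d(d - 1)), where T is the triangle count and d the number of distinct neighbors. The more a vertex's immediate neighbors transact with each other, the higher both values are)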

In [60]:
val triCounts = graphtest.triangleCount().vertices
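GraphX's `triangleCount` returns raw triangle counts, so the output saved under `graphx_clusterCoef_nov1_nov30` holds counts rather than coefficients. To get local clustering coefficients, each count would be divided by `d(d - 1) / 2`, where `d` is the number of distinct neighbors when edge direction is ignored. The plain-Scala sketch below computes both on a made-up edge list, treating the graph as undirected and ignoring self-loops and duplicate edges.

```scala
object TriangleSketch {
  // Undirected neighbor sets; self-loops dropped, reverse/duplicate edges merged
  def neighbors(edges: Seq[(Long, Long)]): Map[Long, Set[Long]] =
    edges.filter { case (a, b) => a != b }
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, ps) => v -> ps.map(_._2).toSet }

  // For each vertex, count pairs of its neighbors that are themselves connected
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    val nbrs = neighbors(edges)
    nbrs.map { case (v, ns) =>
      val closedPairs = for { a <- ns; b <- ns if a < b && nbrs(a).contains(b) } yield (a, b)
      v -> closedPairs.size
    }
  }

  // Local clustering coefficient C = 2T / (d(d - 1)); 0.0 when d < 2
  def clusteringCoefficient(edges: Seq[(Long, Long)]): Map[Long, Double] = {
    val nbrs = neighbors(edges)
    val tri  = triangleCounts(edges)
    nbrs.map { case (v, ns) =>
      val d = ns.size
      v -> (if (d < 2) 0.0 else 2.0 * tri(v) / (d * (d - 1)))
    }
  }
}
```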

##### Save the features to an S3 bucket so that they can be used later to train the model

In [55]:
indeg.saveAsTextFile("s3://mcknightelp/notebooks/temp_results/graphx_indeg_nov1_nov30/")
outdeg.saveAsTextFile("s3://mcknightelp/notebooks/temp_results/graphx_outdeg_nov1_nov30/")
prGraph.saveAsTextFile("s3://mcknightelp/notebooks/temp_results/graphx_pageRank_nov1_nov30/")
triCounts.saveAsTextFile("s3://mcknightelp/notebooks/temp_results/graphx_clusterCoef_nov1_nov30/")
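Since `saveAsTextFile` writes each element's `toString`, each line of these outputs should look like `(vertexId,value)`, e.g. `(123,4)` for a degree. A minimal parser for reading the features back (for instance from `sc.textFile(...)` before training) might look like this; it assumes the default tuple formatting and returns `None` for lines it cannot parse.

```scala
object ReadFeatures {
  // Parses one "(id,value)" line into (vertexId, value); None for anything else
  def parseLine(line: String): Option[(Long, Double)] = {
    val t = line.trim
    if (!t.startsWith("(") || !t.endsWith(")")) None
    else t.substring(1, t.length - 1).split(",", -1) match {
      case Array(id, value) =>
        try { Some((id.toLong, value.toDouble)) }
        catch { case _: NumberFormatException => None }
      case _ => None
    }
  }
}
```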