In [1]:
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Reuse the already-running SparkContext when this cell is executed again:
# constructing a second SparkContext in the same kernel raises a ValueError,
# whereas getOrCreate returns the existing one (or creates it on first run).
conf = SparkConf().setAppName("pyspark")
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [2]:
# Spark SQL has no set column type, so for the DataFrame version each
# neighbourhood is stored as a list (ArrayType); G_old keeps the set-based form.
btc_raw = sc.parallelize([(0,1), (1,2), (2,5), (5,8), (7,8), (3,7), (3,4), (3,6), (10,11), (10,12), (12,13)])

# Emit every edge in both directions, then gather the neighbours of each node.
neighbours_by_node = btc_raw.flatMap(lambda edge: [edge, (edge[1], edge[0])]).groupByKey()

# DataFrame-friendly layout: ([node], [neighbours...]).
G = neighbours_by_node.map(lambda pair: ([pair[0]], list(set(pair[1]))))
# Original RDD layout: (node, {neighbours...}).
G_old = neighbours_by_node.map(lambda pair: (pair[0], set(pair[1])))

In [3]:
# Bring both adjacency representations to the driver for a side-by-side look.
g_new_rows = G.collect()
print("G_new:", g_new_rows)
g_old_rows = G_old.collect()
print("\nG_old:", g_old_rows)

G_new: [([0], [1]), ([8], [5, 7]), ([4], [3]), ([12], [10, 13]), ([1], [0, 2]), ([5], [8, 2]), ([13], [12]), ([2], [1, 5]), ([6], [3]), ([10], [11, 12]), ([7], [8, 3]), ([3], [4, 6, 7]), ([11], [10])]

G_old: [(0, {1}), (8, {5, 7}), (4, {3}), (12, {10, 13}), (1, {0, 2}), (5, {8, 2}), (13, {12}), (2, {1, 5}), (6, {3}), (10, {11, 12}), (7, {8, 3}), (3, {4, 6, 7}), (11, {10})]


DataFrame Schema

In [4]:
# Column layout of the graph DataFrame: both columns are integer arrays;
# "Node" is declared non-nullable, "NN" nullable.
schemaList = ["Node", "NN"]
schemaType = [ArrayType(IntegerType()), ArrayType(IntegerType())]
schemaNull = [False, True]

# One StructField per (name, type, nullable) triple.
fields = [
    StructField(name, dtype, nullable)
    for name, dtype, nullable in zip(schemaList, schemaType, schemaNull)
]

schema = StructType(fields)

Applying schema to RDD

In [5]:
# Turn the RDD into a DataFrame with the explicit schema.
dfG = sqlContext.createDataFrame(G, schema=schema)
# Register it under the name "graph" so it can be queried with SQL.
dfG.createOrReplaceTempView("graph")
dfG.printSchema()

root
 |-- Node: array (nullable = false)
 |    |-- element: integer (containsNull = true)
 |-- NN: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [6]:
# Same data through the two access paths: a SQL query on the temp view and a
# column projection on the DataFrame object.
graph_rows = sqlContext.sql("SELECT * FROM graph")
graph_rows.show()
node_column = dfG.select("Node")
node_column.show()

+----+---------+
|Node|       NN|
+----+---------+
| [0]|      [1]|
| [8]|   [5, 7]|
| [4]|      [3]|
|[12]| [10, 13]|
| [1]|   [0, 2]|
| [5]|   [8, 2]|
|[13]|     [12]|
| [2]|   [1, 5]|
| [6]|      [3]|
|[10]| [11, 12]|
| [7]|   [8, 3]|
| [3]|[4, 6, 7]|
|[11]|     [10]|
+----+---------+

+----+
|Node|
+----+
| [0]|
| [8]|
| [4]|
|[12]|
| [1]|
| [5]|
|[13]|
| [2]|
| [6]|
|[10]|
| [7]|
| [3]|
|[11]|
+----+



# Min_Selection_Step

In [29]:
# Min_Selection_Step, written out as one chain of transformations:
#   1. v_min = smallest id in Node ∪ NN for every row,
#   2. one (neighbour, v_min) row per element of NN,
#   3. collect the distinct v_min values received by each neighbour.
new_nodes = (
    dfG.select("NN", array_min(array_union(col("Node"), col("NN"))).alias("v_min"))
    .select(explode(col("NN")).alias("Node"), "v_min")
    .groupBy(array("Node"))
    .agg(collect_set("v_min"))
)
new_nodes.show()

+-----------+------------------+
|array(Node)|collect_set(v_min)|
+-----------+------------------+
|       [12]|          [12, 10]|
|        [1]|            [0, 1]|
|       [13]|              [10]|
|        [6]|               [3]|
|        [3]|               [3]|
|        [5]|            [1, 5]|
|        [4]|               [3]|
|        [8]|            [2, 3]|
|        [7]|            [5, 3]|
|       [10]|              [10]|
|       [11]|              [10]|
|        [2]|            [0, 2]|
|        [0]|               [0]|
+-----------+------------------+



In [34]:
def Min_Selection_Step(df_G):
    """Build the DataFrame H of the min-selection step.

    Args:
        df_G: DataFrame with array columns ``Node`` and ``NN``.

    Returns:
        DataFrame with columns ``Node`` (a one-element array wrapping each
        exploded element of ``NN``) and ``v_min`` (the distinct minima that
        element received, as an array).

    NOTE(review): only the elements of ``NN`` are exploded, so a row's own
    ``Node`` is never paired with its own minimum; a node only gets entries
    contributed by its neighbours' rows. Confirm this matches the intended
    algorithm.
    """
    # Smallest id among the node itself and its neighbours.
    closed_neighbourhood = array_union(col("Node"), col("NN"))
    with_min = df_G.select("NN", array_min(closed_neighbourhood).alias("v_min"))

    # One (neighbour, v_min) row per neighbour.
    edges_to_min = with_min.select(explode(with_min["NN"]).alias("Node"), "v_min")

    # Re-wrap the neighbour id in an array and gather the distinct minima.
    per_node = edges_to_min.groupBy(array("Node").alias("Node"))
    return per_node.agg(collect_set("v_min").alias("v_min"))

In [33]:
# Run the min-selection step on the graph DataFrame and inspect the result.
dfH = Min_Selection_Step(df_G=dfG)
dfH.show()

+-----------+------------------+
|array(Node)|collect_set(v_min)|
+-----------+------------------+
|       [12]|          [12, 10]|
|        [1]|            [0, 1]|
|       [13]|              [10]|
|        [6]|               [3]|
|        [3]|               [3]|
|        [5]|            [1, 5]|
|        [4]|               [3]|
|        [8]|            [2, 3]|
|        [7]|            [5, 3]|
|       [10]|              [10]|
|       [11]|              [10]|
|        [2]|            [0, 2]|
|        [0]|               [0]|
+-----------+------------------+



# Pruning_Step

In [None]:
# Selecting an empty column name cannot be resolved and makes the cell fail.
# Assumed intent (mirrors the first step of the RDD Pruning_Step below):
# the minimum of each node's v_min array — TODO confirm.
v_min = dfH.select("Node", array_min(col("v_min")).alias("v_min"))

In [None]:
def Pruning_Step(H, T, Seeds):
    """RDD implementation of the pruning step.

    Args:
        H: pair RDD of ``(node, neighbourhood)``; the neighbourhood must be a
            Python ``set`` because set difference is applied to it.
        T: RDD of tree edges accumulated so far. The edges ``(v_min, node)``
            of the nodes deactivated in this step are appended to it.
        Seeds: RDD of seeds accumulated so far. The seeds found in this step
            are appended to it.

    Returns:
        ``[G, T, Seeds]``: the next graph as a pair RDD of
        ``(node, set of neighbours)``, followed by the extended tree and seed
        RDDs.

    Notes:
        Uses the notebook globals ``sc`` and ``set_join``. ``set_join`` is not
        defined in the cells shown here; it has to exist before this function
        is called.
    """
    # The notebook does `from pyspark.sql.functions import *`, which rebinds the
    # global name `min` to Spark's Column aggregate. That function cannot be
    # applied to a Python set inside an RDD lambda, so bind the builtin here.
    import builtins
    py_min = builtins.min

    #H = H.cache()
    #minimum node of the neighborhood: shared for following parts
    v_min = H.mapValues(lambda x: py_min(x))
    v_min_bc = sc.broadcast(dict(v_min.collect())) #Broadcasting v_min

    #---------------G construction-------------------
    # Only neighbourhoods with more than one element produce edges.
    H_filtered = H.filter(lambda x: len(x[1]) > 1)
    NN_H_u = H_filtered.mapValues(lambda x: x - {py_min(x)} )
    #With Broadcasting: connect v_min(u) to every other element of u's neighbourhood
    addEdge2=NN_H_u.map(lambda x:(x[0],(x[1],v_min_bc.value[x[0]]))).flatMap(lambda x:[(x[1][1],y) for y in x[1][0]])
    #Without broadcasting
    #addEdge2 = NN_H_u.join(v_min).flatMap(lambda x: [(x[1][1], y) for y in x[1][0]])
    # Make the edges symmetric and regroup them into adjacency sets.
    G = addEdge2.flatMap(lambda x: [x, (x[1], x[0])]).groupByKey().mapValues(lambda x: set(x))

    #---------------Tree construction--------------
    #The deactivated Nodes do not appear in G_{t+1}
    # A node is deactivated when it is not contained in its own neighbourhood in H.
    deactiveNodes = H.filter(lambda x: x[0] not in x[1]).mapValues(lambda x: False)
    #Without broadcasting
    #addEdge3 = deactiveNodes.join(v_min).map(lambda x: (x[1][1], x[0]))
    #With Broadcasting: tree edge (v_min(u), u) for every deactivated u
    addEdge3 = deactiveNodes.map(lambda x: (x[0], (x[1], v_min_bc.value[x[0]]))).map(lambda x: (x[1][1], x[0]))
    T = T.union(addEdge3)

    #--------------Find Seed-----------------
    #Elements in H with neighborhood from G_{t+1}
    NN_G_H = H.cogroup(G).mapValues(lambda x: (list(x[0]), list(x[1])) ).mapValues(lambda x: set_join(x) )

    #Not sure is necessary to use True/False
    #deactivated = NN_G_H.cogroup(deactiveNodes).map(lambda x: (x[0], (list(x[1][0]), list(x[1][1])) ))
    #seed = deactivated.filter(lambda x: (len(x[1][0]) <= 1) & (x[0] in x[1][0]) & x[1][1])

    # A seed has at most one element in its joined neighbourhood and contains itself.
    seed = NN_G_H.filter(lambda x: len(x[1]) <= 1 and x[0] in x[1])
    Seeds = Seeds.union(seed)

    return [G, T, Seeds]