In [None]:
# initializing Pyspark
%%time
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xf spark-3.3.2-bin-hadoop2.tgz


!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

import findspark
findspark.init("spark-3.3.2-bin-hadoop2")# SPARK_HOME

import pyspark
from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

CPU times: user 203 ms, sys: 59.1 ms, total: 262 ms
Wall time: 21.8 s


In [None]:
#  Data read
# Each CSV row is "vertex,neighbour,neighbour,..."; rows may carry empty padding
# fields (presumably to give every row the same width -- confirm against the file).
textFile = sc.textFile('/content/finalData.csv')
count = textFile.count()
print(count)

def _row_to_edges(line):
  """Split one CSV row into (vertex, field) pairs, one per field after the first.

  The row's own width is used (not the number of rows in the file), so rows of
  any length work. Surrounding whitespace -- including a stray carriage return
  from Windows line endings -- is stripped from every field. A blank line yields
  no pairs.
  """
  fields = [field.strip() for field in line.split(",")]
  return [(fields[0], field) for field in fields[1:]]

rdd = textFile.flatMap(_row_to_edges)
print(rdd.collect())

# vertex -> adjacency list; empty padding fields are dropped before grouping
lists = rdd.filter(lambda x: x[1] != '').groupByKey().mapValues(list)

In [None]:
# Bring the adjacency lists to the driver as {vertex: [neighbours]}.
# One collectAsMap() keeps every key paired with its own list; zipping two
# separate collect() calls relies on both jobs returning rows in the same order.
data = lists.collectAsMap()
keys = list(data.keys())
values = list(data.values())
print(data)

def Mapper_Pcss(v,nlist):
  """PCSS map step: emit one record per edge incident to vertex v.

  Args:
    v: the vertex id.
    nlist: v's adjacency list.

  Yields:
    ((a, b), nlist) for every neighbour of v, where (a, b) is the edge with its
    endpoints in ascending order so both endpoints produce the same key.
  """
  for neighbour in nlist:
    edge = (v, neighbour) if v < neighbour else (neighbour, v)
    yield edge, nlist

# Fan every (vertex, adjacency list) record out into one record per incident edge.
# flatMap is a lazy transformation: Spark distributes and runs it only when an
# action such as the collect() below is called.
result = lists.flatMap(lambda vertex_and_nbrs: Mapper_Pcss(*vertex_and_nbrs))
print(result.collect())


In [None]:
# combiner
def combiner_Pcss(r):
  """Group the edge records of r by key, materialising each group as a list.

  Returns an RDD of (key, [value, ...]) records.
  """
  grouped = r.groupByKey()
  return grouped.mapValues(list)

# Cache the grouped edge records: they are collected here and again in the next
# cell. Caching avoids re-running the shuffle on every collect(). As long as the
# cached partitions are not evicted, it also keeps the row order identical
# across those collects.
combiner1 = combiner_Pcss(result).cache()
print(combiner1.collect())

[(('0', '1'), [['1', '4'], ['0', '2', '3', '4']]), (('0', '4'), [['1', '4'], ['0', '1', '3']]), (('1', '4'), [['0', '2', '3', '4'], ['0', '1', '3']]), (('2', '3'), [['1', '3'], ['1', '2', '4']]), (('1', '2'), [['0', '2', '3', '4'], ['1', '3']]), (('1', '3'), [['0', '2', '3', '4'], ['1', '2', '4']]), (('3', '4'), [['0', '1', '3'], ['1', '2', '4']])]


In [None]:

# Pull the grouped edge records to the driver in ONE action, so each key stays
# aligned with its own pair of adjacency lists. Three separate collect() calls
# re-run the job and need not return rows in the same order.
all_records = combiner1.collect()
# Only records with exactly two adjacency lists (one per endpoint) can be scored;
# e.g. an edge listed by only one of its endpoints would otherwise raise IndexError.
edge_records = [m for m in all_records if len(m[1]) == 2]
if len(edge_records) != len(all_records):
  print("Skipping", len(all_records) - len(edge_records), "edge(s) without two adjacency lists")
Key = [m[0] for m in edge_records]
l1 = [m[1][0] for m in edge_records]
l2 = [m[1][1] for m in edge_records]
# Similarity threshold: an edge is kept only if its similarity is strictly greater.
epsilon = 0.25

#Reducer
def reducer_Pcss(Key,Adl1,Adl2):
  """PCSS reduce step: keep an edge only if its endpoints are similar enough.

  Args:
    Key: the edge, as the (vertex, vertex) tuple produced by Mapper_Pcss.
    Adl1, Adl2: adjacency lists of the two endpoints.

  Returns:
    (Key, similarity) when Jack(Adl1, Adl2) is strictly greater than the global
    ``epsilon``; otherwise None (such entries are filtered out downstream).
  """
  score = Jack(Adl1, Adl2)
  if not score > epsilon:
    return None
  return (Key, score)

#Calculating structural similarity
def Jack(a,b,verbose=False):
  """Jaccard similarity of two adjacency lists: |A intersect B| / |A union B|.

  Args:
    a, b: iterables of vertex ids. Elements that are lists are converted to
      tuples so they can be hashed; any other element (str, int, ...) is used
      as-is.
    verbose: if True, print both sets and the intersection/union sizes
      (debugging aid; off by default).

  Returns:
    A float in [0, 1]. Returns 0.0 when both inputs are empty, instead of
    dividing by zero.
  """
  set_a = {tuple(x) if isinstance(x, list) else x for x in a}
  set_b = {tuple(x) if isinstance(x, list) else x for x in b}
  intersection = len(set_a & set_b)
  union = len(set_a | set_b)
  if verbose:
    print(set_a, set_b)
    print("Intersection:", float(intersection), "Union:", float(union))
  if union == 0:
    # Two empty neighbourhoods share nothing.
    return 0.0
  return intersection / union

# Score every edge on the driver (None = below threshold), then hand the results
# back to Spark as an RDD.
reducer_one = [reducer_Pcss(edge, adj_1, adj_2) for edge, adj_1, adj_2 in zip(Key, l1, l2)]
reducer1_rdd = sc.parallelize(reducer_one)
print(reducer1_rdd.collect())

{('4',), ('1',)} {('2',), ('0',), ('4',), ('3',)}
Intersection: 1.0 Union: 5.0
{('4',), ('1',)} {('0',), ('1',), ('3',)}
Intersection: 1.0 Union: 4.0
{('2',), ('0',), ('4',), ('3',)} {('0',), ('1',), ('3',)}
Intersection: 2.0 Union: 5.0
{('1',), ('3',)} {('2',), ('4',), ('1',)}
Intersection: 1.0 Union: 4.0
{('2',), ('0',), ('4',), ('3',)} {('1',), ('3',)}
Intersection: 1.0 Union: 5.0
{('2',), ('0',), ('4',), ('3',)} {('2',), ('4',), ('1',)}
Intersection: 2.0 Union: 5.0
{('0',), ('1',), ('3',)} {('2',), ('4',), ('1',)}
Intersection: 1.0 Union: 5.0
[None, None, (('1', '4'), 0.4), None, None, (('1', '3'), 0.4), None]


In [None]:
# Keep only the edges that passed the similarity threshold.
reducer_res = reducer1_rdd.filter(lambda x: x is not None)

# Build the undirected graph of similar edges. Each edge is emitted in both
# directions and grouped ONCE, so every vertex gets exactly one record holding
# all of its similar neighbours. Grouping each direction separately and taking
# the union would produce two records for a vertex that is the smaller endpoint
# of one edge and the larger endpoint of another.
lpcc_network = (
  reducer_res
  .flatMap(lambda x: [(x[0][0], x[0][1]), (x[0][1], x[0][0])])
  .groupByKey()
  .mapValues(list)
)

#finding outliers: vertices with no edge above the similarity threshold
outliers = lists.keys().subtract(lpcc_network.keys())
print(lpcc_network.collect())

[('1', ['4', '3']), ('4', ['1']), ('3', ['1'])]
['0', '2']


In [None]:
import builtins as __builtin__

#class for managing the structure information of each node
class Structure:
  """Per-vertex state carried through the LPCC label-propagation rounds.

  Attributes:
    status: 1 = active (the vertex will send its label to its neighbours in the
      next map step), 0 = inactive.
    label: the vertex's current cluster label.
    adlist: list of neighbouring vertex ids.
  """
  def __init__(self, status, label, adlist):
    self.status = status
    self.label = label
    self.adlist = adlist

  def _describe(self):
    # Shared "Status: ..., Label: ..., List: ..." text for repr and str.
    # str() on adlist is required: concatenating a list to a str raises TypeError.
    return "Status: " + str(self.status) + ", Label: " + str(self.label) + ", List: " + str(self.adlist)

  #for printing adjustment
  def __repr__(self):
    return "{" + self._describe() + "}"

  def __str__(self):
    return self._describe()

In [None]:
def LPCC_Mapper(v,status,label,adlist):
  """Map step of one label-propagation round for vertex v.

  Yields:
    (neighbour, label) for every neighbour, but only when v is active
    (status == 1); then always (v, Structure(status, label, adlist)), so the
    reducer can rebuild v's state.
  """
  if status == 1:
    # An active vertex broadcasts its current label to all of its neighbours.
    yield from ((neighbour, label) for neighbour in adlist)
  # Every vertex re-emits its own state, keyed by itself.
  yield v, Structure(status, label, adlist)

In [None]:
def aux_combiner(t):
  """Split one grouped record into the vertex's Structure and the labels sent to it.

  Args:
    t: (vertex, values), where values mixes the vertex's own Structure with zero
      or more neighbour labels, as emitted by LPCC_Mapper.

  Returns:
    (vertex, [structure, labels]), with labels in their original order. If
    several Structures are present, the last one wins.

  Raises:
    ValueError: if values contains no Structure, e.g. a label was sent to a
      vertex that has no record of its own in the graph.
  """
  struc = None
  labels = []
  for elem in t[1]:
    if isinstance(elem, Structure):
      struc = elem
    else:
      labels.append(elem)
  if struc is None:
    # Fail with a clear message instead of an UnboundLocalError.
    raise ValueError("no Structure record for vertex %r" % (t[0],))
  return (t[0], [struc, labels])

def LPCC_combiner(r):
  """Group LPCC_Mapper output by vertex and split each group with aux_combiner.

  Returns an RDD of (vertex, [Structure, labels]) records.
  """
  per_vertex = r.groupByKey().mapValues(list)
  return per_vertex.map(aux_combiner)

In [None]:

def LPCC_Reducer(v,status,label,adlist,labels):
  """Reduce step of one label-propagation round for vertex v.

  If the smallest received label (compared as int) is strictly smaller than v's
  current label, v adopts it (as an int) and becomes active (status 1).
  Otherwise v keeps its label and becomes inactive (status 0). The incoming
  ``status`` is not consulted.

  Returns:
    (v, Structure(new_status, new_label, adlist))
  """
  # Built-in min() is avoided on purpose: the notebook's star import from
  # pyspark.sql.functions shadows it.
  new_status, new_label = 0, label
  if labels:
    smallest = None
    for value in map(int, labels):
      if smallest is None or value < smallest:
        smallest = value
    if smallest < int(label):
      new_status, new_label = 1, smallest
  return (v, Structure(new_status, new_label, adlist))


In [None]:
def LPCC(network, verbose=True):
  """Cluster a similarity graph by iterative min-label propagation.

  Every vertex starts active, with its own id as its label. In each round:
    - active vertices send their label to their neighbours (LPCC_Mapper);
    - messages are grouped per vertex (LPCC_combiner);
    - each vertex adopts the smallest label it received if that label is smaller
      than its own (LPCC_Reducer).
  The loop stops once no vertex is active.

  Args:
    network: RDD of (vertex, [neighbour, ...]) records. Vertex ids must be
      int-convertible, because labels are compared with int().
    verbose: print the intermediate RDD contents of every round (debugging aid).

  Returns:
    The list of (label, [vertex, ...]) clusters, which is also printed under
    "FINAL CLUSTERS".
  """
  graph = network.map(lambda x: (x[0], Structure(1, x[0], x[1])))
  if verbose:
    print(graph.collect())

  # Number of active vertices. RDD.sum() returns 0 for an empty RDD, where
  # reduce() would raise.
  s = graph.map(lambda x: x[1].status).sum()

  while s > 0:
    m2 = graph.flatMap(lambda x: LPCC_Mapper(x[0], x[1].status, x[1].label, x[1].adlist))
    if verbose:
      print("After mapper: ")
      print(m2.collect())

    combined = LPCC_combiner(m2)
    if verbose:
      print("After combiner: ")
      print(combined.collect())

    # Reduce on the driver from ONE collect, so each vertex stays paired with its
    # own structure and labels. Zipping separately collected columns relies on
    # identical row order across jobs. flatMap over the key would also split a
    # multi-character id such as '10' into '1', '0'.
    r2 = [
      LPCC_Reducer(v, struc.status, struc.label, struc.adlist, labels)
      for v, (struc, labels) in combined.collect()
    ]
    graph = sc.parallelize(r2)

    if verbose:
      print("After reducer: ")
      print(graph.collect())
    s = graph.map(lambda x: x[1].status).sum()
    if verbose:
      print(s)

  clusters = graph.map(lambda x: (int(x[1].label), x[0])).groupByKey().mapValues(list)
  final_clusters = clusters.collect()
  print("FINAL CLUSTERS: ")
  print(final_clusters)
  return final_clusters

final_clusters = LPCC(network=lpcc_network)

[('1', {Status: 1, Label: 1, List: ['4', '3']}), ('4', {Status: 1, Label: 4, List: ['1']}), ('3', {Status: 1, Label: 3, List: ['1']})]
After mapper: 
[('4', '1'), ('3', '1'), ('1', {Status: 1, Label: 1, List: ['4', '3']}), ('1', '4'), ('4', {Status: 1, Label: 4, List: ['1']}), ('1', '3'), ('3', {Status: 1, Label: 3, List: ['1']})]
After combiner: 
[('4', [{Status: 1, Label: 4, List: ['1']}, ['1']]), ('1', [{Status: 1, Label: 1, List: ['4', '3']}, ['4', '3']]), ('3', [{Status: 1, Label: 3, List: ['1']}, ['1']])]
After reducer: 
[('4', {Status: 1, Label: 1, List: ['1']}), ('1', {Status: 0, Label: 1, List: ['4', '3']}), ('3', {Status: 1, Label: 1, List: ['1']})]
2
After mapper: 
[('1', 1), ('4', {Status: 1, Label: 1, List: ['1']}), ('1', {Status: 0, Label: 1, List: ['4', '3']}), ('1', 1), ('3', {Status: 1, Label: 1, List: ['1']})]
After combiner: 
[('1', [{Status: 0, Label: 1, List: ['4', '3']}, [1, 1]]), ('4', [{Status: 1, Label: 1, List: ['1']}, []]), ('3', [{Status: 1, Label: 1, List: 

In [None]:
# Vertices with no sufficiently similar edge. subtract() returns them in no
# guaranteed order, so sort them for a stable, reproducible listing.
print("OUTLIERS")
print(sorted(outliers.collect()))

OUTLIERS
['0', '2']


In [None]:
# Shut down the SparkSession created in the setup cell. This also stops its
# underlying SparkContext (`sc`), so no further Spark work can run in this kernel
# without creating a new session.
spark.stop()