## Exercise List 6

### 1. Using Apache Spark and related tools, and building on the graph constructed in Exercise List 5, implement the following:

####   a. Determine the central node through the node degree.
####   b. Determine the central node by closeness centrality, using the Wasserman and harmonic distances.
####   c. Determine the central node by betweenness (intermediation).

In [1]:
from pyspark import SparkConf
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from graphframes import *

In [2]:
# Configure a Spark application named 'appName' that runs on the local master.
conf = SparkConf()
conf.setAppName('appName')
conf.setMaster('local')

# Reuse an already-running context if there is one, otherwise start a new one,
# and wrap it in a session so the DataFrame reader is available.
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

# Both CSV files carry a header row; load them as the vertex and edge tables
# and combine them into the graph used by the cells below.
vertexes = spark.read.csv(path='transport-nodes.csv', header=True)
edges = spark.read.csv(path='transport-relationships.csv', header=True)
graph = GraphFrame(vertexes, edges)

#### a. Determine the central node through the node degree

In [41]:
# Determine the central node through the node degree:
# the vertex with the highest degree is reported as the central one.
graph_degree = graph.degrees
city = dict(degree=0, city_id='')

# Bring the (id, degree) rows to the driver and pick the largest degree.
# max() returns the first maximal row, so ties resolve to the earliest row,
# and default=None covers a graph with no edges (empty degree table).
degree_rows = graph_degree.select("id", "degree").collect()
top_row = max(degree_rows, key=lambda row: int(row.degree), default=None)

# Only replace the placeholder when a row with a positive degree exists;
# otherwise the report below shows the empty placeholder values.
if top_row is not None and int(top_row.degree) > city['degree']:
    city['degree'] = int(top_row.degree)
    city['city_id'] = top_row.id
print(f"The central node is {city.get('city_id')}, with degree {city.get('degree')}")

The central node is Igarassu, with degree 4


#### b. Determine the central node by closeness centrality, using the Wasserman and harmonic distances.

In [40]:
# Determination of the central node through the harmonic and Wasserman distances.
#
# Every vertex gets one record in distances_list:
#   id              - the vertex id
#   vertex_distance - [(neighbour id, cost), ...] for each edge touching it
#   harmonic_score  - 1 / (sum of those costs)
#   wasserman_score - (number of those edges - 1) / (sum of those costs)
#
# NOTE(review): only direct (one-hop) edges are considered, not shortest paths,
# and harmonic centrality is usually defined as the sum of 1/distance rather
# than 1/(sum of distances). The formulas are kept as they were so the recorded
# results stay valid - confirm against the exercise statement.
graph_vertexes = graph.vertices.select("id").collect()
graph_edges = graph.edges.select("src", "dst", "relationship", "cost").collect()
distances_list = list()
vertex_list = list()

# Index the edges once instead of rescanning all of them for every vertex:
# vertex id -> [(neighbour id, cost), ...] in edge order. Edges are treated as
# undirected, so each one is filed under both endpoints; a self-loop is filed
# a single time.
adjacency = dict()
for edge_value in graph_edges:
    adjacency.setdefault(edge_value.src, []).append(
        (edge_value.dst, edge_value.cost))
    if edge_value.dst != edge_value.src:
        adjacency.setdefault(edge_value.dst, []).append(
            (edge_value.src, edge_value.cost))

for vertex in graph_vertexes:
    # Score each id once. The ids already handled live in vertex_list;
    # distances_list holds dict records, so an id can never be found in it.
    if vertex.id in vertex_list:
        continue
    vertex_list.append(vertex.id)

    distance = adjacency.get(vertex.id, [])
    # Costs arrive as text (the CSV is read without schema inference),
    # so convert before summing.
    cost_sum = sum(int(cost) for _, cost in distance)
    number_nodes = len(distance)

    if cost_sum:
        harmonic_score = 1 / cost_sum
        wasserman_score = (number_nodes - 1) / cost_sum
    else:
        # A vertex with no edges (or only zero-cost edges) would divide by
        # zero; give it the lowest score so it is never chosen as central.
        harmonic_score = 0
        wasserman_score = 0

    # Key order is kept because the record is printed as-is further down.
    data = dict(id=vertex.id, vertex_distance=distance,
                harmonic_score=harmonic_score, wasserman_score=wasserman_score)
    distances_list.append(data)

def harmonic_closeness(distances_list):
    """Return the record with the highest positive 'harmonic_score'.

    Args:
        distances_list: iterable of dict records, each holding a numeric
            'harmonic_score' entry.

    Returns:
        The first record whose 'harmonic_score' is the largest, or None when
        the input is empty or no record has a score above zero.
    """
    # max() keeps the first maximal record, matching a strict ">" scan.
    central_node = max(
        distances_list,
        key=lambda record: record['harmonic_score'],
        default=None,
    )
    # A best score of zero or less means nothing qualified as central.
    if central_node is None or not central_node['harmonic_score'] > 0:
        return None
    return central_node

def wasserman_closeness(distances_list):
    """Return the record with the highest positive 'wasserman_score'.

    Args:
        distances_list: iterable of dict records, each holding a numeric
            'wasserman_score' entry.

    Returns:
        The first record whose 'wasserman_score' is the largest, or None when
        the input is empty or no record has a score above zero.
    """
    # max() keeps the first maximal record, matching a strict ">" scan.
    central_node = max(
        distances_list,
        key=lambda record: record['wasserman_score'],
        default=None,
    )
    # A best score of zero or less means nothing qualified as central.
    if central_node is None or not central_node['wasserman_score'] > 0:
        return None
    return central_node


# Report the top-ranked vertex record under each measure, one after the other.
harmonic_winner = harmonic_closeness(distances_list)
print(f"The central node is, by harmonic distance: {harmonic_winner}\n")

wasserman_winner = wasserman_closeness(distances_list)
print(f"The central node is, by Wasserman distance: {wasserman_winner}")


The central node is, by harmonic distance: {'id': 'Ouricuri', 'vertex_distance': [('Arco Verde', '22'), ('Surubim', '32')], 'harmonic_score': 0.018518518518518517, 'wasserman_score': 0.018518518518518517}

The central node is, by Wasserman distance: {'id': 'Salgueiro', 'vertex_distance': [('Igarassu', '26'), ('Exu', '25'), ('Serra Talhada', '33')], 'harmonic_score': 0.011904761904761904, 'wasserman_score': 0.023809523809523808}
