# GRAPH WITH SPARK: AN INTRODUCTION

This tutorial gives *an introduction to graph analysis with Spark.*

We will work on a railway problem in which we imagine a new train company that operates a set of tracks.

We are interested in how the stations are connected, and in particular in:
- *the importance of each railway station*
- *how the different cities are clustered*
- *the connectivity around the most important stations*
- *the best path from one point to another*

## Settings

Follow these steps to run the notebook:

1. First of all, **start your Databricks cluster and attach it to this notebook**.

2. After that, install the **graphframes jar library** that matches your Spark and Scala versions. This jar is required if you want to use the graphframes library.

3. Finally, install these Python libraries: **geopy**, **pandas**, **scikit-learn** (imported as `sklearn`), **graphframes**, **folium**.

First of all let's import all the libraries that we need.

In [0]:
# for geodata
from geopy.distance import geodesic
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import folium
# for our computations
import pandas as pd
import numpy as np
from sklearn.datasets import make_blobs
import random
# for working with Spark
from graphframes import *
from pyspark.sql import functions as F

Let's fix a seed.

In [0]:
random.seed(10)

## 1. CREATION OF THE GRAPH DATASET

In this part of the notebook *we will create the railway dataset.*

To do so, we introduce some Italian cities, located by their latitude and longitude, and connect them with edges.

After that we create additional nodes around each city and add some random links between them.

Finally, note that creating a GraphFrame requires two things:
1. **A vertex DataFrame** containing a special column named **id** which specifies unique IDs for each vertex in the graph.
2. **An edge DataFrame** containing two special columns: **src** (source vertex ID of edge) and **dst** (destination vertex ID of edge).
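
As a minimal sketch of these two requirements, the snippet below checks them on plain Python records instead of Spark DataFrames. The toy data and the helper `check_graph_tables` are hypothetical and not part of graphframes; the check that every edge endpoint is a known vertex is an extra sanity check we add here, not something the list above demands.

```python
# Plain-Python stand-ins for the vertex and edge tables (hypothetical toy data).
vertices = [
    {"id": 0, "name": "Torino"},
    {"id": 1, "name": "Milano"},
    {"id": 2, "name": "Genova"},
]
edges = [
    {"src": 0, "dst": 1, "relationship": "has_train"},
    {"src": 1, "dst": 2, "relationship": "has_train"},
]


def check_graph_tables(vertices, edges):
    """Return a list of problems; an empty list means the tables look usable."""
    problems = []
    ids = [v.get("id") for v in vertices]
    if any(i is None for i in ids):
        problems.append("every vertex needs an 'id'")
    if len(set(ids)) != len(ids):
        problems.append("vertex ids must be unique")
    known = set(ids)
    for e in edges:
        if "src" not in e or "dst" not in e:
            problems.append(f"edge {e} lacks 'src' or 'dst'")
        elif e["src"] not in known or e["dst"] not in known:
            # Extra sanity check (our assumption, see the text above).
            problems.append(f"edge {e} points at an unknown vertex")
    return problems


print(check_graph_tables(vertices, edges))  # prints []
```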

We start by creating a geolocator that retrieves more information about each city from its latitude and longitude.

In [0]:
geocoder = Nominatim(user_agent = 'your_app_name')
geocode = RateLimiter(geocoder.reverse, min_delay_seconds = 1,   return_value_on_exception = None) 

We create a row for each city with its latitude and longitude, a unique id, some information about the city, and a cluster label. The cluster label will be used later.

In [0]:
id_cities = [0,1,2,3,4]
dict_geo = {
    "coordTorino" : [45.07049,7.68682],
    "coordMilano" : [45.46427, 9.18951],
    "coordGenova":[44.4264, 8.91519],
    "coordRoma" : [41.9027,12.4963],
    "coordReggioCalabria":[38.11047, 15.66129], 
}
df_citta=pd.DataFrame(
                np.column_stack((id_cities, (list(map(lambda x:x[0],dict_geo.values()))), 
                        list(map(lambda x:x[1],dict_geo.values())), id_cities
                        )),
                        columns = ["cluster", "latitude", "longitude", "id"])
df_citta["nome_citta"] = list(map(lambda x, y: geocode((x, y), exactly_one=True).raw["address"], df_citta["latitude"], df_citta["longitude"]))
df_citta.head()  

Unnamed: 0,cluster,latitude,longitude,id,nome_citta
0,0.0,45.07049,7.68682,0.0,"{'road': 'Piazza Castello', 'neighbourhood': '..."
1,1.0,45.46427,9.18951,1.0,{'historic': 'Monumento a Vittorio Emanuele II...
2,2.0,44.4264,8.91519,2.0,"{'road': 'Via Bartolomeo Bianco', 'suburb': 'L..."
3,3.0,41.9027,12.4963,3.0,"{'amenity': 'Fontana delle Naiadi', 'road': 'P..."
4,4.0,38.11047,15.66129,4.0,"{'highway': 'Ospedale', 'road': 'Raccordo auto..."


In [0]:
colors_dict = {
    0.0:"blue",
    1.0:"red",
    2.0:"green",
    3.0:"yellow",
    4.0:"pink"
}

Let's see the **results in a map.**

In [0]:
mapCity = folium.Map(
    location=dict_geo["coordRoma"],
    tiles='cartodbpositron',
    zoom_start=6,
)
df_citta.apply(lambda row:folium.CircleMarker(location=[row["latitude"], row["longitude"]],  color = colors_dict[row["cluster"]]).add_to(mapCity), axis=1)
mapCity

Now we want to **create the links among the northern cities and, separately, among the southern cities.**

We suppose there is a train between each pair and **we take the distance as the link weight.**

In [0]:
edges = pd.DataFrame(columns=["src", "dst","relationship",  "distance"])
for origin in df_citta.iterrows():
  for destination in df_citta.iterrows():
    if (origin[1][0] != destination[1][0]) and (
      (origin[1]["cluster"] <= 2 and destination[1]["cluster"] <= 2) or 
      ((origin[1]["cluster"] > 2 and destination[1]["cluster"] > 2))):
      new_edge = pd.DataFrame({
                    "src":[origin[1]["id"]],
                    "dst":[destination[1]["id"]],
                    "relationship": ["has_train"],
                    "distance": [geodesic(list(origin[1][["latitude", "longitude"]]), list(destination[1][["latitude", "longitude"]])).km]
               }, columns=["src", "dst","relationship",  "distance"])
      # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement.
      edges = pd.concat([edges, new_edge])
      folium.PolyLine([tuple(origin[1][["latitude", "longitude"]]), tuple(destination[1][["latitude", "longitude"]])], color ="blue").add_to(mapCity)
                
edges

Unnamed: 0,src,dst,relationship,distance
0,0.0,1.0,has_train,125.78595
0,0.0,2.0,has_train,120.768859
0,1.0,0.0,has_train,125.78595
0,1.0,2.0,has_train,117.353317
0,2.0,0.0,has_train,120.768859
0,2.0,1.0,has_train,117.353317
0,3.0,4.0,has_train,500.240974
0,4.0,3.0,has_train,500.240974
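
The `distance` column above comes from geopy's `geodesic`, which works on an ellipsoidal model of the Earth. To make the idea of "distance as weight" concrete, here is a rough sketch using the simpler haversine (great-circle) formula. Note that this is a swapped-in approximation, not what `geodesic` computes: `haversine_km` and the mean Earth radius used as its default are our assumptions, so the result should land close to, but not exactly on, the 125.79 km shown for Torino and Milano.

```python
import math


def haversine_km(a, b, radius_km=6371.0088):
    """Great-circle distance in km between two (latitude, longitude) pairs in degrees."""
    lat1, lon1 = map(math.radians, a)
    lat2, lon2 = map(math.radians, b)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    h = math.sin(dlat / 2) ** 2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    return 2 * radius_km * math.asin(math.sqrt(h))


# Coordinates taken from dict_geo in this notebook.
torino = (45.07049, 7.68682)
milano = (45.46427, 9.18951)
print(round(haversine_km(torino, milano), 1))
```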


Let's see what we have done.

In [0]:
mapCity

Now we have the main cities and links and **we want to create the other cities randomly.**

In [0]:
first = 50
second = 20
third = 20
fourth = 20
fifth = 10

n_samples = [first, second, third, fourth, fifth]

In [0]:
centers = [dict_geo["coordTorino"],
           dict_geo["coordMilano"],
           dict_geo["coordGenova"],
           dict_geo["coordRoma"],
           dict_geo["coordReggioCalabria"],
          ]

# Standard deviation of the points around each cluster center
cluster_std = [0.01, 0.01,0.01, 0.01, 0.01]  

X, labels_true = make_blobs(n_samples=n_samples, centers=centers, cluster_std=cluster_std, random_state=0)
df_generato=pd.DataFrame(
                np.column_stack((labels_true, X)),
                        columns = ["cluster","latitude", "longitude"])

df_generato = df_generato.reset_index()
df_generato["id"] = df_generato.index + len(id_cities)
df_generato = df_generato.drop(["index"], axis = 1)
df_generato["nome_citta"] = list(map(lambda x, y: geocode((x, y), exactly_one=True).raw["address"], df_generato["latitude"], df_generato["longitude"]))
df_generato.head()

Unnamed: 0,cluster,latitude,longitude,id,nome_citta
0,0.0,45.061535,7.690689,5,"{'house_number': '29 scala B', 'road': 'Via de..."
1,0.0,45.064147,7.683193,6,"{'house_number': '32/B', 'road': 'Via Carlo Al..."
2,4.0,38.125415,15.64059,7,"{'village': 'Archi', 'city': 'Reggio di Calabr..."
3,1.0,45.468034,9.178516,8,"{'shop': 'Carminati parrucchieri', 'house_numb..."
4,0.0,45.066617,7.683797,9,"{'amenity': 'Banca Popolare di Vicenza', 'road..."


In [0]:
df_generato.apply(lambda row:folium.CircleMarker(location=[row["latitude"], row["longitude"]], color = colors_dict[row["cluster"]]).add_to(mapCity), axis=1)
mapCity

Each new city is assigned a unique id and the cluster of the main city it was generated from.

We concatenate this new dataset with the previous one.

In [0]:
df_tot = pd.concat([df_citta, df_generato])
df_tot.head()

Unnamed: 0,cluster,latitude,longitude,id,nome_citta
0,0.0,45.07049,7.68682,0.0,"{'road': 'Piazza Castello', 'neighbourhood': '..."
1,1.0,45.46427,9.18951,1.0,{'historic': 'Monumento a Vittorio Emanuele II...
2,2.0,44.4264,8.91519,2.0,"{'road': 'Via Bartolomeo Bianco', 'suburb': 'L..."
3,3.0,41.9027,12.4963,3.0,"{'amenity': 'Fontana delle Naiadi', 'road': 'P..."
4,4.0,38.11047,15.66129,4.0,"{'highway': 'Ospedale', 'road': 'Raccordo auto..."


Now let's **create some new random links in each city.**

Within the cluster around each city, we give each node a random number of outgoing edges.

In [0]:
edges_random = pd.DataFrame(columns=["src", "dst", "relationship",  "distance"])
for city in id_cities:
    df_city = df_tot[df_tot["cluster"] == city].reset_index()
    max_rows = df_city.shape[0]
    
    for origin in df_city.iterrows():
        if not len(np.where(origin[1][0] == edges.src)[0]):
          rand_num_edges = random.randint(0,round(max_rows/4))
          random_edge = random.sample(range(0, max_rows), rand_num_edges)
          df_city_random = df_city.iloc[random_edge]
          for destination in df_city_random.iterrows():
            if origin[1][0] != destination[1][0]:
              new_edge = pd.DataFrame({
                        "src":[origin[1]["id"]],
                        "dst":[destination[1]["id"]],
                        "relationship": ["random_connection"],
                        "distance": [geodesic(list(origin[1][["latitude", "longitude"]]), list(destination[1][["latitude", "longitude"]])).km]
                    }, columns=["src", "dst","relationship",  "distance"])
              folium.PolyLine([tuple(origin[1][["latitude", "longitude"]]), tuple(destination[1][["latitude", "longitude"]])], color = "black").add_to(mapCity)

              # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement.
              edges_random = pd.concat([edges_random, new_edge])

In [0]:
mapCity

In [0]:
edges_random

Unnamed: 0,src,dst,relationship,distance
0,10.0,6.0,random_connection,2.954243
0,10.0,61.0,random_connection,2.293505
0,10.0,66.0,random_connection,2.495179
0,10.0,77.0,random_connection,2.167865
0,10.0,0.0,random_connection,2.228013
...,...,...,...,...
0,88.0,96.0,random_connection,1.889797
0,89.0,4.0,random_connection,1.638836
0,96.0,4.0,random_connection,1.711160
0,96.0,56.0,random_connection,0.760816
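
The sampling step used to build these edges can be isolated in a few lines. The sketch below assumes a hypothetical helper `pick_random_targets` and its own `random.Random` instance; it mirrors the two calls in the cell above (`random.randint` for the number of edges, `random.sample` for the destinations). In the notebook, a node that draws itself is skipped afterwards, so the real number of edges can be one lower.

```python
import random


def pick_random_targets(n_nodes, rng):
    """Pick between 0 and round(n_nodes / 4) distinct row positions as destinations."""
    max_edges = round(n_nodes / 4)
    n_edges = rng.randint(0, max_edges)         # inclusive on both ends
    return rng.sample(range(n_nodes), n_edges)  # sampling without replacement


rng = random.Random(10)  # a separate generator, seeded for repeatability
targets = pick_random_targets(20, rng)
print(len(targets), targets)
```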


Concatenate with the previous edges.

In [0]:
edges_total = pd.concat([edges_random, edges])
edges_total.shape

In [0]:
edges_total.relationship.unique()

## 3. ANALYTICS WITH GRAPHFRAME

Let's move on to graph analytics with Spark.

First of all we need to create our graph.

We will divide this section into three parts:
1. Create a graph
2. Basic queries
3. Graph algorithms

### 3.1 CREATE A GRAPH

First of all we need to create our two spark dataframes.

In [0]:
df_citta_spark, edge_spark = spark.createDataFrame(df_tot), spark.createDataFrame(edges_total)

**With the command <code>GraphFrame</code> we can create our graph.**

In [0]:
g = GraphFrame(df_citta_spark, edge_spark)
g.cache()

Let's note that the **nodes and edges remain spark dataframes.**

In [0]:
display(g.vertices)

cluster,latitude,longitude,id,nome_citta
0.0,45.07049,7.68682,0.0,"Map(city -> Torino, state -> Piemonte, postcode -> 1023, country -> Italia, county -> Torino, suburb -> Centro, country_code -> it, road -> Piazza Castello, neighbourhood -> Quadrilatero Romano)"
1.0,45.46427,9.18951,1.0,"Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II)"
2.0,44.4264,8.91519,2.0,"Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco)"
3.0,41.9027,12.4963,3.0,"Map(city -> Roma, state -> Lazio, postcode -> 00184, amenity -> Fontana delle Naiadi, country -> Italia, county -> Roma Capitale, suburb -> Municipio Roma I, country_code -> it, road -> Piazza della Repubblica)"
4.0,38.11047,15.66129,4.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89124, highway -> Ospedale, country -> Italia, county -> Reggio di Calabria, country_code -> it, road -> Raccordo autostradale RA4)"
0.0,45.06153533438806,7.690689024978592,5.0,"Map(house_number -> 29 scala B, city -> Torino, state -> Piemonte, postcode -> 10123, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via dei Mille, neighbourhood -> Borgo Nuovo)"
0.0,45.06414677906319,7.683192588340129,6.0,"Map(house_number -> 32/B, city -> Torino, state -> Piemonte, postcode -> 10123, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Carlo Alberto, neighbourhood -> Borgo Nuovo)"
4.0,38.12541484544492,15.640590149749864,7.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89121, country -> Italia, county -> Reggio di Calabria, village -> Archi, country_code -> it)"
1.0,45.46803425531156,9.17851599209416,8.0,"Map(house_number -> 55, shop -> Carminati parrucchieri, city -> Milano, state -> Lombardia, postcode -> 20121, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> Foro Buonaparte, neighbourhood -> Duomo)"
0.0,45.06661673182592,7.683796972494247,9.0,"Map(city -> Torino, state -> Piemonte, postcode -> 10123, amenity -> Banca Popolare di Vicenza, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Giovanni Giolitti, neighbourhood -> Borgo Nuovo)"


In [0]:
display(g.edges)

src,dst,relationship,distance
10.0,6.0,random_connection,2.9542426216275905
10.0,61.0,random_connection,2.293504947845127
10.0,66.0,random_connection,2.4951790640347404
10.0,77.0,random_connection,2.167864844253848
10.0,0.0,random_connection,2.2280134585287192
10.0,29.0,random_connection,0.6280226803181408
10.0,65.0,random_connection,2.931582074959745
10.0,68.0,random_connection,1.9035995086094697
10.0,42.0,random_connection,2.1896876957289604
12.0,21.0,random_connection,1.83316063401975


### 3.2 BASIC QUERIES

We can find the nodes with the **largest number of railways, counting both incoming and outgoing ones**.

In [0]:
display(g.degrees.sort(F.col("degree").desc()))

id,degree
124.0,23
123.0,22
77.0,21
55.0,21
83.0,20
21.0,20
61.0,19
76.0,19
13.0,19
121.0,19


Sometimes it is more interesting to know which cities are the most attractive, that is, the cities with **the most incoming railways,** filtering out the nodes that have no incoming connections.

In [0]:
display(g.inDegrees.filter("inDegree >= 1").sort("inDegree", ascending=False))

id,inDegree
123.0,15
63.0,13
21.0,12
52.0,11
61.0,11
83.0,10
51.0,10
27.0,10
6.0,10
124.0,10


At other times it is interesting to know which cities are the easiest to leave, that is, those with **the highest out-degree.**

In [0]:
display(g.outDegrees.filter("outDegree >= 1").sort("outDegree", ascending=False))

id,outDegree
55.0,13
124.0,13
13.0,13
121.0,13
38.0,13
76.0,12
99.0,12
73.0,12
31.0,12
77.0,11
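
To see what the three degree queries above count, here is a small sketch on a plain edge list. The edges are a hypothetical toy sample, not the notebook's data, and `collections.Counter` stands in for the Spark aggregation.

```python
from collections import Counter

# Hypothetical (src, dst) pairs for illustration only.
edges = [(10, 6), (10, 61), (10, 0), (12, 21), (61, 10), (6, 0)]

out_degree = Counter(src for src, _ in edges)  # edges leaving each node
in_degree = Counter(dst for _, dst in edges)   # edges arriving at each node
degree = out_degree + in_degree                # both directions together

print(degree.most_common(3))
```

A node only appears in a counter if at least one edge touches it in that direction, so looking up a node such as `out_degree[0]` simply gives 0 here.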


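If we are interested in **starting from one city and reaching another** we can use <code>motif</code> finding. The query below looks for two-hop paths that leave a station over a random connection, pass through one of the five main cities, and end in a different cluster.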

In [0]:
paths = g.find("(a)-[e]->(b);(b)-[e2]->(c)")\
  .filter("e.relationship = 'random_connection'")\
  .filter("b.id in (0,1,2,3,4)")\
  .filter("a.cluster != c.cluster")\
  .select("a", "c")
display(paths)

a,c
"List(0.0, 45.085372521937956, 7.705778891760306, 10.0, Map(city -> Torino, state -> Piemonte, postcode -> 10154, amenity -> I.I.S Bodoni Paravia, country -> Italia, county -> Torino, suburb -> Circoscrizione 6, country_code -> it, road -> Via Amilcare Ponchielli))","List(2.0, 44.4264, 8.91519, 2.0, Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco))"
"List(0.0, 45.085372521937956, 7.705778891760306, 10.0, Map(city -> Torino, state -> Piemonte, postcode -> 10154, amenity -> I.I.S Bodoni Paravia, country -> Italia, county -> Torino, suburb -> Circoscrizione 6, country_code -> it, road -> Via Amilcare Ponchielli))","List(1.0, 45.46427, 9.18951, 1.0, Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II))"
"List(0.0, 45.07810037725147, 7.688036750164929, 38.0, Map(house_number -> 20, city -> Torino, state -> Piemonte, postcode -> 10152, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Corso Undici Febbraio, neighbourhood -> Porta Palazzo))","List(2.0, 44.4264, 8.91519, 2.0, Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco))"
"List(0.0, 45.07810037725147, 7.688036750164929, 38.0, Map(house_number -> 20, city -> Torino, state -> Piemonte, postcode -> 10152, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Corso Undici Febbraio, neighbourhood -> Porta Palazzo))","List(1.0, 45.46427, 9.18951, 1.0, Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II))"
"List(0.0, 45.093187546239875, 7.672276343254012, 51.0, Map(house_number -> 21/G, city -> Torino, state -> Piemonte, postcode -> 10149, country -> Italia, county -> Torino, suburb -> Circoscrizione 5, country_code -> it, road -> Corso Benedetto Brin, neighbourhood -> Borgata Tesso))","List(2.0, 44.4264, 8.91519, 2.0, Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco))"
"List(0.0, 45.093187546239875, 7.672276343254012, 51.0, Map(house_number -> 21/G, city -> Torino, state -> Piemonte, postcode -> 10149, country -> Italia, county -> Torino, suburb -> Circoscrizione 5, country_code -> it, road -> Corso Benedetto Brin, neighbourhood -> Borgata Tesso))","List(1.0, 45.46427, 9.18951, 1.0, Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II))"
"List(0.0, 45.07778090562177, 7.6881098291075745, 73.0, Map(house_number -> 20, city -> Torino, state -> Piemonte, postcode -> 10152, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Corso Undici Febbraio, neighbourhood -> Porta Palazzo))","List(2.0, 44.4264, 8.91519, 2.0, Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco))"
"List(0.0, 45.07778090562177, 7.6881098291075745, 73.0, Map(house_number -> 20, city -> Torino, state -> Piemonte, postcode -> 10152, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Corso Undici Febbraio, neighbourhood -> Porta Palazzo))","List(1.0, 45.46427, 9.18951, 1.0, Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II))"
"List(1.0, 45.46823006712662, 9.178579384912695, 115.0, Map(house_number -> 55, shop -> Carminati parrucchieri, city -> Milano, state -> Lombardia, postcode -> 20121, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> Foro Buonaparte, neighbourhood -> Duomo))","List(2.0, 44.4264, 8.91519, 2.0, Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco))"
"List(1.0, 45.46823006712662, 9.178579384912695, 115.0, Map(house_number -> 55, shop -> Carminati parrucchieri, city -> Milano, state -> Lombardia, postcode -> 20121, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> Foro Buonaparte, neighbourhood -> Duomo))","List(0.0, 45.07049, 7.68682, 0.0, Map(city -> Torino, state -> Piemonte, postcode -> 1023, country -> Italia, county -> Torino, suburb -> Centro, country_code -> it, road -> Piazza Castello, neighbourhood -> Quadrilatero Romano))"

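The logic of the motif query can be sketched in plain Python as a join of the edge list with itself. The helper `two_hop_paths` and the four toy edges are hypothetical; the three conditions correspond to the three `filter` calls in the query.

```python
# Hypothetical toy data: station 10 sits in Torino's cluster (0).
edges = [
    {"src": 10, "dst": 0, "relationship": "random_connection"},
    {"src": 0, "dst": 1, "relationship": "has_train"},
    {"src": 0, "dst": 2, "relationship": "has_train"},
    {"src": 0, "dst": 10, "relationship": "random_connection"},
]
cluster = {10: 0, 0: 0, 1: 1, 2: 2}
main_cities = {0, 1, 2, 3, 4}


def two_hop_paths(edges, cluster, main_cities):
    """Return (a, c) pairs for paths a -> b -> c matching the three filters."""
    by_src = {}
    for e in edges:
        by_src.setdefault(e["src"], []).append(e)
    result = []
    for e in edges:
        # First hop must be a random connection that ends in a main city.
        if e["relationship"] != "random_connection" or e["dst"] not in main_cities:
            continue
        for e2 in by_src.get(e["dst"], []):
            a, c = e["src"], e2["dst"]
            if cluster[a] != cluster[c]:  # the path must end in another cluster
                result.append((a, c))
    return result


print(two_hop_paths(edges, cluster, main_cities))  # prints [(10, 1), (10, 2)]
```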

### 3.3 GRAPH ALGORITHMS

Let's move on and use some graph algorithms.

We start with some community detection algorithms:
- *Strongly connected components*
- *Weakly connected components*
- *Label propagation*

In [0]:
resultSCC = g.stronglyConnectedComponents(maxIter=10)
display(resultSCC.select("component").distinct())

component
1511828488192
1443109011456
755914244096
1065151889408
1030792151040
283467841536
25769803777
120259084288
68719476736
858993459200


In [0]:
pd_resultSCC = resultSCC.toPandas()


no_of_colors=pd_resultSCC["component"].nunique()
color=["#"+''.join([random.choice('0123456789ABCDEF') for i in range(6)])
       for j in range(no_of_colors)]
color_dict = {}
clusters = pd_resultSCC["component"].unique()
for i in range(0,no_of_colors):
  color_dict[clusters[i]] =  color[i]

We can see the results in a map.

In [0]:
mapResultSCC = folium.Map(
    location=dict_geo["coordRoma"],
    tiles='cartodbpositron',
    zoom_start=6,
)
pd_resultSCC.apply(lambda row:folium.CircleMarker(location=[row["latitude"], row["longitude"]],  color = color_dict[row["component"]]).add_to(mapResultSCC), axis=1)
mapResultSCC

As we can see, not all the stations in the same starting cluster belong to the same component according to the algorithm.

The reason is that a **strongly connected component groups the stations that can all reach each other: starting from any node in the group, we can travel to any other node and come back to the first one.** Sometimes it is not possible to come back. When it is not, the first node and the last one are not in the same component.
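
A small sketch can make the "come back" condition concrete. The function below uses Kosaraju's two-pass algorithm on a hypothetical four-node graph; it is only meant to illustrate the definition and says nothing about how graphframes computes its result.

```python
def strongly_connected_components(nodes, edges):
    """Map each node to a representative of its strongly connected component."""
    forward = {n: [] for n in nodes}
    backward = {n: [] for n in nodes}
    for s, d in edges:
        forward[s].append(d)
        backward[d].append(s)

    # Pass 1: record nodes in order of DFS completion on the original graph.
    visited, order = set(), []

    def visit(n):
        visited.add(n)
        for m in forward[n]:
            if m not in visited:
                visit(m)
        order.append(n)

    for n in nodes:
        if n not in visited:
            visit(n)

    # Pass 2: walk the reversed graph, starting from the latest-finished nodes.
    component = {}

    def assign(n, root):
        component[n] = root
        for m in backward[n]:
            if m not in component:
                assign(m, root)

    for n in reversed(order):
        if n not in component:
            assign(n, n)
    return component


# A, B and C form a cycle; D can be reached from C but offers no way back.
nodes = ["A", "B", "C", "D"]
edges = [("A", "B"), ("B", "C"), ("C", "A"), ("C", "D")]
comp = strongly_connected_components(nodes, edges)
print(comp)
```

Here A, B and C share a component, while D ends up alone, just like the stations that have incoming but no returning connections.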

In [0]:
spark.sparkContext.setCheckpointDir("/tmp/graphframes-example-connected-components")
resultCC = g.connectedComponents()
display(resultCC)

cluster,latitude,longitude,id,nome_citta,component
0.0,45.07049,7.68682,0.0,"Map(city -> Torino, state -> Piemonte, postcode -> 1023, country -> Italia, county -> Torino, suburb -> Centro, country_code -> it, road -> Piazza Castello, neighbourhood -> Quadrilatero Romano)",25769803776
1.0,45.46427,9.18951,1.0,"Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II)",25769803776
2.0,44.4264,8.91519,2.0,"Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco)",25769803776
3.0,41.9027,12.4963,3.0,"Map(city -> Roma, state -> Lazio, postcode -> 00184, amenity -> Fontana delle Naiadi, country -> Italia, county -> Roma Capitale, suburb -> Municipio Roma I, country_code -> it, road -> Piazza della Repubblica)",25769803778
4.0,38.11047,15.66129,4.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89124, highway -> Ospedale, country -> Italia, county -> Reggio di Calabria, country_code -> it, road -> Raccordo autostradale RA4)",25769803778
0.0,45.06153533438806,7.690689024978592,5.0,"Map(house_number -> 29 scala B, city -> Torino, state -> Piemonte, postcode -> 10123, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via dei Mille, neighbourhood -> Borgo Nuovo)",25769803776
0.0,45.06414677906319,7.683192588340129,6.0,"Map(house_number -> 32/B, city -> Torino, state -> Piemonte, postcode -> 10123, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Carlo Alberto, neighbourhood -> Borgo Nuovo)",25769803776
4.0,38.12541484544492,15.640590149749864,7.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89121, country -> Italia, county -> Reggio di Calabria, village -> Archi, country_code -> it)",68719476736
1.0,45.46803425531156,9.17851599209416,8.0,"Map(house_number -> 55, shop -> Carminati parrucchieri, city -> Milano, state -> Lombardia, postcode -> 20121, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> Foro Buonaparte, neighbourhood -> Duomo)",25769803776
0.0,45.06661673182592,7.683796972494247,9.0,"Map(city -> Torino, state -> Piemonte, postcode -> 10123, amenity -> Banca Popolare di Vicenza, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Giovanni Giolitti, neighbourhood -> Borgo Nuovo)",25769803776


In [0]:
pd_resultCC = resultCC.toPandas()


no_of_colors=pd_resultCC["component"].nunique()
color=["#"+''.join([random.choice('0123456789ABCDEF') for i in range(6)])
       for j in range(no_of_colors)]
color_dict = {}
clusters = pd_resultCC["component"].unique()
for i in range(0,no_of_colors):
  color_dict[clusters[i]] =  color[i]

In [0]:
mapResultCC = folium.Map(
    location=dict_geo["coordRoma"],
    tiles='cartodbpositron',
    zoom_start=6,
)
pd_resultCC.apply(lambda row:folium.CircleMarker(location=[row["latitude"], row["longitude"]],  color = color_dict[row["component"]]).add_to(mapResultCC), axis=1)
mapResultCC

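In this case we get the result we were expecting, except that we have only **two major clusters plus some isolated nodes**: the `has_train` links join the three northern cities into one component and the two southern cities into another.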
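
The same outcome can be reproduced on a toy graph. The sketch below ignores edge direction and merges nodes with a union-find structure; `connected_components` and the node names are hypothetical, and this is not necessarily the algorithm graphframes runs internally.

```python
def connected_components(nodes, edges):
    """Map each node to a representative of its (weakly) connected component."""
    parent = {n: n for n in nodes}

    def find(n):
        # Follow parent links to the root, shortening the chain on the way.
        while parent[n] != n:
            parent[n] = parent[parent[n]]
            n = parent[n]
        return n

    for s, d in edges:
        root_s, root_d = find(s), find(d)
        if root_s != root_d:
            parent[root_d] = root_s  # merge the two groups
    return {n: find(n) for n in nodes}


nodes = ["Torino", "Milano", "Genova", "Roma", "Reggio Calabria", "isolated station"]
edges = [("Torino", "Milano"), ("Milano", "Genova"), ("Roma", "Reggio Calabria")]
components = connected_components(nodes, edges)
print(components)
```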

Let's try with the label propagation algorithm.

In [0]:
resultLP = g.labelPropagation(maxIter=5)
display(resultLP)

cluster,latitude,longitude,id,nome_citta,label
0.0,45.0891655799015,7.677047221201236,27.0,"Map(city -> Torino, state -> Piemonte, postcode -> 10147, country -> Italia, county -> Torino, suburb -> San Donato, country_code -> it, road -> Corso Principe Oddone, neighbourhood -> Parco Dora (Spina 3))",558345748481
0.0,45.0579620463995,7.694594903558319,58.0,"Map(house_number -> 10/B, city -> Torino, state -> Piemonte, postcode -> 10131, country -> Italia, county -> Torino, suburb -> Circoscrizione 8, country_code -> it, road -> Via Curtatone, neighbourhood -> Crimea)",558345748481
3.0,41.902307171817725,12.484619065022589,120.0,"Map(shop -> OVS, city -> Roma, state -> Lazio, postcode -> 00187, country -> Italia, quarter -> Trevi, county -> Roma Capitale, suburb -> Municipio Roma I, country_code -> it, road -> Via del Tritone)",292057776128
1.0,45.45253876594886,9.208946211856492,49.0,"Map(house_number -> 6, city -> Milano, state -> Lombardia, postcode -> 20135, country -> Italia, quarter -> Porta Romana, suburb -> Municipio 4, country_code -> it, province -> Milano, road -> Via Gerolamo Tiraboschi)",627065225216
4.0,38.12541484544492,15.640590149749864,7.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89121, country -> Italia, county -> Reggio di Calabria, village -> Archi, country_code -> it)",68719476736
0.0,45.06235853717956,7.669557173976683,13.0,"Map(house_number -> 30 scala B, city -> Torino, state -> Piemonte, postcode -> 10128, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Rodolfo Montevecchio, neighbourhood -> San Secondo)",558345748481
0.0,45.07999088417525,7.685306427917023,14.0,"Map(house_number -> 15, city -> Torino, state -> Piemonte, postcode -> 10152, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Corso Giulio Cesare, neighbourhood -> Porta Palazzo)",558345748481
0.0,45.08279290680728,7.698843798487844,31.0,"Map(house_number -> 7/A, city -> Torino, state -> Piemonte, postcode -> 10154, country -> Italia, county -> Torino, suburb -> Circoscrizione 6, country_code -> it, road -> Via Mottalciata, neighbourhood -> Monte Rosa)",558345748481
1.0,45.45156515001514,9.19920396708158,118.0,"Map(house_number -> 24, city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, quarter -> Porta Romana, suburb -> Municipio 5, country_code -> it, province -> Milano, road -> Viale Angelo Filippetti)",627065225216
0.0,45.066458230530266,7.699044450703824,77.0,"Map(house_number -> 16 scala A, city -> Torino, state -> Piemonte, postcode -> 10124, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Via Eusebio Bava)",558345748481


In [0]:
pd_resultLP = resultLP.toPandas()

no_of_colors=pd_resultLP["label"].nunique()
color=["#"+''.join([random.choice('0123456789ABCDEF') for i in range(6)])
       for j in range(no_of_colors)]
color_dict = {}
clusters = pd_resultLP["label"].unique()
for i in range(0,no_of_colors):
  color_dict[clusters[i]] =  color[i]

In [0]:
mapResultLP = folium.Map(
    location=dict_geo["coordRoma"],
    tiles='cartodbpositron',
    zoom_start=6,
)
pd_resultLP.apply(lambda row:folium.CircleMarker(location=[row["latitude"], row["longitude"]],  color = color_dict[row["label"]]).add_to(mapResultLP), axis=1)
mapResultLP

Here we can make comments similar to those for the SCC.

The algorithm is different, but the different colors within the same original cluster are likely due to the fact that these nodes are isolated.
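
To illustrate why an isolated node keeps its own color, here is a simplified, deterministic sketch of label propagation. It is an assumption-laden toy: `label_propagation` is a hypothetical helper, edges are treated as undirected, updates are synchronous, and ties are broken by taking the smallest label, which need not match what graphframes does.

```python
from collections import Counter


def label_propagation(nodes, edges, max_iter=5):
    """Each node repeatedly adopts the most common label among its neighbours."""
    neighbours = {n: set() for n in nodes}
    for s, d in edges:
        neighbours[s].add(d)
        neighbours[d].add(s)
    labels = {n: n for n in nodes}  # every node starts with its own label
    for _ in range(max_iter):
        new_labels = {}
        for n in nodes:
            counts = Counter(labels[m] for m in neighbours[n])
            if not counts:
                new_labels[n] = labels[n]  # isolated node: nothing to adopt
                continue
            best = max(counts.values())
            new_labels[n] = min(l for l, c in counts.items() if c == best)
        if new_labels == labels:
            break
        labels = new_labels
    return labels


# Two separate triangles plus one isolated node (hypothetical ids).
nodes = [0, 1, 2, 3, 4, 5, 6]
edges = [(0, 1), (1, 2), (2, 0), (3, 4), (4, 5), (5, 3)]
labels = label_propagation(nodes, edges)
print(labels)  # prints {0: 0, 1: 0, 2: 0, 3: 3, 4: 3, 5: 3, 6: 6}
```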

Let's move on and analyze the most influential nodes. For this purpose we use **PageRank centrality**.

In [0]:
results_pageRank = g.pageRank(resetProbability=0.15, tol=0.01)
display(results_pageRank.vertices.sort(F.col("pagerank").desc()))

cluster,latitude,longitude,id,nome_citta,pagerank
3.0,41.9027,12.4963,3.0,"Map(city -> Roma, state -> Lazio, postcode -> 00184, amenity -> Fontana delle Naiadi, country -> Italia, county -> Roma Capitale, suburb -> Municipio Roma I, country_code -> it, road -> Piazza della Repubblica)",9.835514523328142
4.0,38.11047,15.66129,4.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89124, highway -> Ospedale, country -> Italia, county -> Reggio di Calabria, country_code -> it, road -> Raccordo autostradale RA4)",9.270861515809786
2.0,44.4264,8.91519,2.0,"Map(city -> Genova, state -> Liguria, postcode -> 16127, country -> Italia, county -> Genova, suburb -> Lagaccio, country_code -> it, road -> Via Bartolomeo Bianco)",6.431625061706512
0.0,45.07049,7.68682,0.0,"Map(city -> Torino, state -> Piemonte, postcode -> 1023, country -> Italia, county -> Torino, suburb -> Centro, country_code -> it, road -> Piazza Castello, neighbourhood -> Quadrilatero Romano)",5.612734317935344
1.0,45.46427,9.18951,1.0,"Map(city -> Milano, state -> Lombardia, postcode -> 20122, country -> Italia, suburb -> Municipio 1, country_code -> it, province -> Milano, road -> 1_33051, neighbourhood -> Duomo, historic -> Monumento a Vittorio Emanuele II)",5.492139806189937
0.0,45.06414677906319,7.683192588340129,6.0,"Map(house_number -> 32/B, city -> Torino, state -> Piemonte, postcode -> 10123, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Carlo Alberto, neighbourhood -> Borgo Nuovo)",2.552978640096065
4.0,38.11203506537965,15.663611810362005,88.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89124, country -> Italia, county -> Reggio di Calabria, country_code -> it)",1.9421861486859548
4.0,38.09890817568178,15.6691019810171,56.0,"Map(city -> Reggio di Calabria, state -> Calabria, postcode -> 89131, country -> Italia, county -> Reggio di Calabria, village -> Gallina, country_code -> it, neighbourhood -> San Sperato)",1.8212852911239144
0.0,45.07115517222383,7.689844718977397,123.0,"Map(city -> Torino, state -> Piemonte, postcode -> 1023, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Viale Alessandro Luzio, leisure -> Giardino di Levante)",1.6464135436393716
0.0,45.074513416411776,7.679971899090597,32.0,"Map(house_number -> 16, city -> Torino, state -> Piemonte, postcode -> 10122, country -> Italia, county -> Torino, suburb -> Circoscrizione 1, country_code -> it, road -> Via Sant'Agostino, neighbourhood -> Quadrilatero Romano)",1.5866129237805648


As we might expect, the most important nodes are the five main cities, led by Roma and Reggio Calabria in the south and followed by Genova, Torino, and Milano in the north. After them come several nodes connected to them.

This is the effect of PageRank centrality.
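
For intuition, PageRank can be sketched as a simple power iteration. The version below is a textbook variant under our own assumptions: `pagerank` is a hypothetical helper, scores are normalised to sum to 1, and the rank of nodes without outgoing edges is spread evenly. The scores in the table above are on a different scale, so only the ordering is comparable.

```python
def pagerank(edges, reset_probability=0.15, tol=1e-8, max_iter=100):
    """Power-iteration PageRank on a list of (src, dst) pairs."""
    nodes = sorted({n for edge in edges for n in edge})
    out = {n: [] for n in nodes}
    for s, d in edges:
        out[s].append(d)
    n = len(nodes)
    rank = {v: 1.0 / n for v in nodes}
    for _ in range(max_iter):
        # Rank held by nodes with no outgoing edges is shared by everyone.
        dangling = sum(rank[v] for v in nodes if not out[v])
        new = {v: (reset_probability + (1 - reset_probability) * dangling) / n
               for v in nodes}
        for v in nodes:
            for d in out[v]:
                new[d] += (1 - reset_probability) * rank[v] / len(out[v])
        delta = sum(abs(new[v] - rank[v]) for v in nodes)
        rank = new
        if delta < tol:
            break
    return rank


# Hypothetical toy graph: stations 1, 2 and 3 all point at hub 0.
ranks = pagerank([(1, 0), (2, 0), (3, 0), (0, 1)])
print(max(ranks, key=ranks.get))  # prints 0
```

The hub collects rank from every station that links to it, which is why heavily linked main cities float to the top.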

Sometimes we may be interested in visiting a new city starting from a given one.

Let's use the <code>bfs</code> algorithm to find a path from one point to another.

In [0]:
paths = g.bfs("id = 9.0", "id = 51.0")

display(paths)

cluster,latitude,longitude,id,nome_citta


In this case we see that we have no path between the two points.
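
An empty result like this is what breadth-first search gives on a directed graph when the start node has no usable outgoing edges. The sketch below shows the idea with a hypothetical helper `bfs_path` that returns a single shortest path or `None`. The edges between 10, 66, 77 and 51 appear elsewhere in this notebook's output, while the edge `(10, 9)` is invented for the example.

```python
from collections import deque


def bfs_path(edges, start, goal):
    """Return one shortest directed path from start to goal, or None."""
    successors = {}
    for s, d in edges:
        successors.setdefault(s, []).append(d)
    previous = {start: None}  # also serves as the visited set
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            path = []
            while node is not None:  # walk back to the start
                path.append(node)
                node = previous[node]
            return path[::-1]
        for nxt in successors.get(node, []):
            if nxt not in previous:
                previous[nxt] = node
                queue.append(nxt)
    return None


edges = [(10, 66), (66, 51), (10, 77), (77, 51), (10, 9)]
print(bfs_path(edges, 10, 51))  # prints [10, 66, 51]
print(bfs_path(edges, 9, 51))   # prints None
```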

Let's try another example with a path.

In [0]:
paths = g.bfs("id = 10.0", "id = 51.0")

display(paths)

from,e0,v1,e1,to
"List(0.0, 45.085372521937956, 7.705778891760306, 10.0, Map(city -> Torino, state -> Piemonte, postcode -> 10154, amenity -> I.I.S Bodoni Paravia, country -> Italia, county -> Torino, suburb -> Circoscrizione 6, country_code -> it, road -> Via Amilcare Ponchielli))","List(10.0, 66.0, random_connection, 2.4951790640347404)","List(0.0, 45.081884006845435, 7.674471741796464, 66.0, Map(house_number -> 15/B, city -> Torino, state -> Piemonte, postcode -> 10152, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Via Brindisi, neighbourhood -> Valdocco))","List(66.0, 51.0, random_connection, 1.2680360016957888)","List(0.0, 45.093187546239875, 7.672276343254012, 51.0, Map(house_number -> 21/G, city -> Torino, state -> Piemonte, postcode -> 10149, country -> Italia, county -> Torino, suburb -> Circoscrizione 5, country_code -> it, road -> Corso Benedetto Brin, neighbourhood -> Borgata Tesso))"
"List(0.0, 45.085372521937956, 7.705778891760306, 10.0, Map(city -> Torino, state -> Piemonte, postcode -> 10154, amenity -> I.I.S Bodoni Paravia, country -> Italia, county -> Torino, suburb -> Circoscrizione 6, country_code -> it, road -> Via Amilcare Ponchielli))","List(10.0, 77.0, random_connection, 2.167864844253848)","List(0.0, 45.066458230530266, 7.699044450703824, 77.0, Map(house_number -> 16 scala A, city -> Torino, state -> Piemonte, postcode -> 10124, country -> Italia, county -> Torino, suburb -> Circoscrizione 7, country_code -> it, road -> Via Eusebio Bava))","List(77.0, 51.0, random_connection, 3.6422736204820465)","List(0.0, 45.093187546239875, 7.672276343254012, 51.0, Map(house_number -> 21/G, city -> Torino, state -> Piemonte, postcode -> 10149, country -> Italia, county -> Torino, suburb -> Circoscrizione 5, country_code -> it, road -> Corso Benedetto Brin, neighbourhood -> Borgata Tesso))"


In this case we have several ways of going from one point to the other, so we can choose our preferred path according to the cities visited or according to the minimum distance. For the latter, we have to write a function that computes the total distance of each path and selects the minimum.
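
A minimal sketch of such a function is shown below. It works on plain dictionaries that mimic the two rows returned by `bfs` above (the distances are copied from that output); the helper `total_distance` and the dictionary layout are our assumptions, and with Spark you would first bring the rows into Python, for example with `collect()`.

```python
# The two paths from station 10 to station 51, as plain dictionaries.
paths = [
    {"e0": {"src": 10.0, "dst": 66.0, "distance": 2.4951790640347404},
     "e1": {"src": 66.0, "dst": 51.0, "distance": 1.2680360016957888}},
    {"e0": {"src": 10.0, "dst": 77.0, "distance": 2.167864844253848},
     "e1": {"src": 77.0, "dst": 51.0, "distance": 3.6422736204820465}},
]


def total_distance(row):
    """Sum the distance of every edge column (e0, e1, ...) in one path."""
    return sum(edge["distance"] for name, edge in row.items() if name.startswith("e"))


best = min(paths, key=total_distance)
print(best["e0"]["dst"], round(total_distance(best), 3))  # prints 66.0 3.763
```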

## 4. CONCLUSION

This was only a brief introduction to graph analysis with Spark.

Look at the [user guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html) to find many other features that you can use.

In addition, the [Graph Algorithms book](https://neo4j.com/graph-algorithms-book/) is a very good resource for studying further details and understanding how to apply them to interesting use cases.

Another very good source is the [Databricks documentation](https://docs.databricks.com/spark/latest/graph-analysis/graphframes/user-guide-scala.html) where you can find a lot of examples.

Finally, you can read [this article](https://towardsdatascience.com/graphframes-in-jupyter-a-practical-guide-9b3b346cebc5) on Medium for an explanation of graph analysis in a local Jupyter notebook.