In [24]:
#Estudo de Grafos em transportes

In [25]:
#importa a biblioteca que cria a seção do spark
from pyspark.sql import SparkSession 

In [26]:
#inicia a seção para a utilização do spark
spark = SparkSession\
        .builder\
        .appName("transporteGrafos")\
        .getOrCreate() #cria a seção caso não exista ou obtém a já criada

Carregando o arquivo

In [27]:
#diretório que contém o arquivo a ser utilizado para os nos
nosTransporte="./data/transport_nodes-7c826.csv"

#diretório que contém o arquivo com as relações
relacionamentosTransporte="./data/transport_relationships-c2bfc.csv" 

In [28]:
#importando os banco de dados
dfNos = spark.read.format('csv').options(header='true',delimiter=',', inferSchema=True).load(nosTransporte)

In [29]:
dfNos.show()

+----------------+---------+---------+----------+
|              id| latitude|longitude|population|
+----------------+---------+---------+----------+
|       Amsterdam|52.379189| 4.899431|    821752|
|         Utrecht|52.092876|  5.10448|    334176|
|        Den Haag|52.078663| 4.288788|    514861|
|       Immingham| 53.61239| -0.22219|      9642|
|       Doncaster| 53.52285| -1.13116|    302400|
|Hoek van Holland|  51.9775|  4.13333|      9382|
|      Felixstowe| 51.96375|   1.3511|     23689|
|         Ipswich| 52.05917|  1.15545|    133384|
|      Colchester| 51.88921|  0.90421|    104390|
|          London|51.509865|-0.118092|   8787892|
|       Rotterdam|  51.9225|  4.47917|    623652|
|           Gouda| 52.01667|  4.70833|     70939|
+----------------+---------+---------+----------+



In [30]:
dfRelacionamentos = spark.read.format('csv').options(header='true',delimiter=',', inferSchema=True).load(relacionamentosTransporte)

In [31]:
dfRelacionamentos.show()

+----------------+----------------+------------+----+
|             src|             dst|relationship|cost|
+----------------+----------------+------------+----+
|       Amsterdam|         Utrecht|       EROAD|  46|
|       Amsterdam|        Den Haag|       EROAD|  59|
|        Den Haag|       Rotterdam|       EROAD|  26|
|       Amsterdam|       Immingham|       EROAD| 369|
|       Immingham|       Doncaster|       EROAD|  74|
|       Doncaster|          London|       EROAD| 277|
|Hoek van Holland|        Den Haag|       EROAD|  27|
|      Felixstowe|Hoek van Holland|       EROAD| 207|
|         Ipswich|      Felixstowe|       EROAD|  22|
|      Colchester|         Ipswich|       EROAD|  32|
|          London|      Colchester|       EROAD| 106|
|           Gouda|       Rotterdam|       EROAD|  25|
|           Gouda|         Utrecht|       EROAD|  35|
|        Den Haag|           Gouda|       EROAD|  32|
|Hoek van Holland|       Rotterdam|       EROAD|  33|
+----------------+----------

Definindo e criando o grafo

In [32]:
#importando as funções para utilizar os grafos
from pyspark.sql.types import *

#contém os métodos para serem utilizados no processamento através dos grafos
from graphframes import *  

In [33]:
#definindo o "Esquema" para cada um dos nós
atributosNo = [
StructField("id", StringType(), True),\
StructField("latitude", FloatType(), True),\
StructField("longitude", FloatType(), True),\
StructField("population", IntegerType(), True)\
]

In [34]:
#importa os dados como nós
nos = spark.read.csv(nosTransporte, header=True,schema=StructType(atributosNo))

Definindo os relacionamentos diretos

In [35]:
#importa os dados como relacionamentos
relaci_direto = spark.read.csv(relacionamentosTransporte, header=True)

In [36]:
relaci_direto.show(5)

+---------+---------+------------+----+
|      src|      dst|relationship|cost|
+---------+---------+------------+----+
|Amsterdam|  Utrecht|       EROAD|  46|
|Amsterdam| Den Haag|       EROAD|  59|
| Den Haag|Rotterdam|       EROAD|  26|
|Amsterdam|Immingham|       EROAD| 369|
|Immingham|Doncaster|       EROAD|  74|
+---------+---------+------------+----+
only showing top 5 rows



Definindo os relacionamentos inversos

In [37]:
#define os relacionamentos com a troca das fontes e destinos
relaci_inverso = (relaci_direto.withColumn("newSrc", relaci_direto.dst).withColumn("newDst", relaci_direto.src)\
.drop("dst", "src")\
.withColumnRenamed("newSrc", "src")\
.withColumnRenamed("newDst", "dst")\
.select("src", "dst", "relationship", "cost"))

In [38]:
relaci_inverso.show(5)

+---------+---------+------------+----+
|      src|      dst|relationship|cost|
+---------+---------+------------+----+
|  Utrecht|Amsterdam|       EROAD|  46|
| Den Haag|Amsterdam|       EROAD|  59|
|Rotterdam| Den Haag|       EROAD|  26|
|Immingham|Amsterdam|       EROAD| 369|
|Doncaster|Immingham|       EROAD|  74|
+---------+---------+------------+----+
only showing top 5 rows



In [39]:
#cria o df com os relacionamentos diretos e inverso (torna o grafo bidirecional)
relacionamentos=relaci_direto.union(relaci_inverso)

In [40]:
relacionamentos.show()

+----------------+----------------+------------+----+
|             src|             dst|relationship|cost|
+----------------+----------------+------------+----+
|       Amsterdam|         Utrecht|       EROAD|  46|
|       Amsterdam|        Den Haag|       EROAD|  59|
|        Den Haag|       Rotterdam|       EROAD|  26|
|       Amsterdam|       Immingham|       EROAD| 369|
|       Immingham|       Doncaster|       EROAD|  74|
|       Doncaster|          London|       EROAD| 277|
|Hoek van Holland|        Den Haag|       EROAD|  27|
|      Felixstowe|Hoek van Holland|       EROAD| 207|
|         Ipswich|      Felixstowe|       EROAD|  22|
|      Colchester|         Ipswich|       EROAD|  32|
|          London|      Colchester|       EROAD| 106|
|           Gouda|       Rotterdam|       EROAD|  25|
|           Gouda|         Utrecht|       EROAD|  35|
|        Den Haag|           Gouda|       EROAD|  32|
|Hoek van Holland|       Rotterdam|       EROAD|  33|
|         Utrecht|       Ams

In [41]:
#define o grafo 
grafo=GraphFrame(nos, relacionamentos)

Py4JJavaError: An error occurred while calling o133.loadClass.
: java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
	at java.base/java.lang.reflect.Method.invoke(Method.java:577)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


23/01/16 01:39:30 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 934294 ms exceeds timeout 120000 ms
23/01/16 01:39:30 WARN SparkContext: Killing executors is not supported by current scheduler.


Explorando o nosso grafo

In [None]:
#encontrando quais são as cidades com mais de 100000 habitantes e menos de 300000
popMedia=grafo.vertices\
.filter("population > 100000 and population < 300000")\
.sort("population")\
.show()

In [None]:
#mostrando a quantidade de caminhos diretos (chegando)
display(grafo.inDegrees)

In [None]:
#mostrando a quantidade de caminhos inversos (saindo)
display(grafo.outDegrees)

In [None]:
#qual é o nó mais "importante" (tem mais caminhos que levam até ele)
total_degree = grafo.degrees
in_degree = grafo.inDegrees
out_degree = grafo.outDegrees


In [None]:
total_degree.show()

In [None]:
total_degree.join(in_degree, "id", how="left")\
.join(out_degree, "id", how="left")\
.fillna(0)\
.sort("inDegree", ascending=False)\
.show()

In [None]:
#realizando consultas
motifs = grafo.find("(Amsterdam)-[e]->(Utrecht)")
display(motifs)

In [None]:
#filtrando os resultados da consulta anterior
filtered = motifs.filter("e.cost < 30")
display(filtered)

In [None]:
#encontrando o menor caminho entre a cidade de Den Haag e alguma das cidades com tamanho médio
origem = "id='Den Haag'"
destino = "population > 100000 and population < 300000 and id <> 'Den Haag'"
resultado = grafo.bfs(origem, destino) #bfs encontra o menor caminho entre dois nós

In [None]:
print(resultado.columns)  #colunas com 'e' representam as arestas (edges) e colunas com 'v' representam os nós (vértices)

In [None]:
#selecionamdo apenas os nós (selecionando quem não começa com e)
colunas = [coluna for coluna in resultado.columns if not coluna.startswith("e")]
resultado.select(colunas).show(5,False)

In [None]:
grafo.vertices\
.filter("population > 100000 and population < 300000")\
.sort("population")\
.show()