In [48]:
#!pip install --user pixiedust
#!pip install ggplot
#!pip install wget

In [1]:
from pixiedust.packageManager import PackageManager
pkg=PackageManager()
pkg.installPackage("graphframes:graphframes:0")
pkg.printAllPackages()

sqlContext=SQLContext(sc)

Pixiedust database opened successfully


Package already installed: graphframes:graphframes:0
graphframes:graphframes:0.5.0-spark2.1-s_2.11 => /gpfs/fs01/user/se1f-2003f1257f20bb-7f578b9e48a9/data/libs/graphframes-0.5.0-spark2.1-s_2.11.jar


# Librerias

In [2]:
from graphframes import GraphFrame
from pyspark.sql.functions import col, desc, when
from pyspark.sql import functions as F

#from ggplot import *
import matplotlib.pylab as plt
import pandas as pd
import seaborn as sns
import wget

# Cargamos archivos para crear grafo

Aqui cargamos los archivos de vertices y nodos, estos vienen de csv

Los vertices/nodos y aristas/relaciones son representados como dos dataframes de Spark. Mas sobre dataframes a continuacion.

En el dataframe de nodos debe existir una columna llamada `id` para designar el identificador del nodo. Igualmente, en el dataframe de relaciones deben existir columnas `src` y `dst` que denotan el origen y destino de la relacion. Estos ultimos, deben corresponder a ids validos del dataframe de nodos. 

In [3]:
nodes_file = wget.download("https://raw.githubusercontent.com/astenuz/AnalyticsForumWorkshop/master/data/trade_agro_nodes.csv")
edges_file = wget.download("https://raw.githubusercontent.com/astenuz/AnalyticsForumWorkshop/master/data/trade_agro_edges.csv")

El siguiente bloque carga los archivos csv como dataframes de spark.

In [4]:
nodes_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(nodes_file)
    
edges_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(edges_file)

# Intro a Dataframes

Antes de continuar vamos a ver una introduccion corta a Dataframes de Spark para entender las operaciones basicas con tablas de datos que usaremos en el taller.

Un dataframe es basicamente una tabla, como de excel. Tiene columnas que interpretamos generalmente como atributos y filas que interpretamos como observaciones. Por ejemplo, veamos que hay en el dataframe de nodos.

In [5]:
nodes_df.head(5)

[Row(id='AFG', name='Afghanistan', region='South Asia'),
 Row(id='ALB', name='Albania', region='Europe'),
 Row(id='ATA', name='Antarctica', region='Other'),
 Row(id='DZA', name='Algeria', region='Middle East and North Africa'),
 Row(id='ASM', name='American Samoa', region='Oceania')]

Head selecciona los primeros 5 `Row` del dataframe, para obtener una representacion mas amigable podemos usar show o display.

In [6]:
#nodes_df.show(5)
display(nodes_df)

id,name,region
AFG,Afghanistan,South Asia
ATA,Antarctica,Other
AGO,Angola,Sub-Saharan Africa
AZE,Azerbaijan,Central and Northern Asia
AUS,Australia,Oceania
ARM,Armenia,Central and Northern Asia
BRB,Barbados,Caribbean and Central America
BEL,Belgium,Europe
BTN,Bhutan,South Asia
BOL,Bolivia,South America


Ademas de lo que acaban de ver, las operaciones que usaremos sobre dataframes en este lab son: `filter`, `select`, `withColumn`, `count`, `groupby` y `sort`

`filter` sirve para escoger filas que cumplen cierta condicion.

In [7]:
nodes_df.filter(col("Region")=='Oceania')

DataFrame[id: string, name: string, region: string]

In [8]:
nodes_df.filter(col("Region")=='Oceania').show(5)

+---+--------------------+-------+
| id|                name| region|
+---+--------------------+-------+
|ASM|      American Samoa|Oceania|
|AUS|           Australia|Oceania|
|BVT|       Bouvet Island|Oceania|
|IOT|British Indian Oc...|Oceania|
|SLB|        Solomon Isds|Oceania|
+---+--------------------+-------+
only showing top 5 rows



Para escoger columnas, usamos `select`

In [9]:
nodes_df.select("name").show(5)

+--------------+
|          name|
+--------------+
|   Afghanistan|
|       Albania|
|    Antarctica|
|       Algeria|
|American Samoa|
+--------------+
only showing top 5 rows



Tambien podemos agregar columnas utilizando `withColumn`.

In [10]:
edges_df\
    .withColumn("weight_mas_1", col("weight")+1)\
    .show(5)

+---+---+--------------------+----------+----------+------------+
|src|dst|            resource|     value|    weight|weight_mas_1|
+---+---+--------------------+----------+----------+------------+
|AFG|DZA|        Horticulture|    26.313|         3|         4.0|
|AFG|DZA|     Rubber and gums|     1.507|     0.585|       1.585|
|AFG|AUS|        Horticulture|40.0930017|34.3058425|  35.3058425|
|AFG|AUS|            Oilseeds|     1.248|      0.46|        1.46|
|AFG|AUS|Stimulants, tobac...|    34.645|      0.02|        1.02|
+---+---+--------------------+----------+----------+------------+
only showing top 5 rows



Varias veces vamos a contar resultados usando `groupby` y `count`

In [11]:
nodes_df\
    .groupby("region").count()\
    .show()

+--------------------+-----+
|              region|count|
+--------------------+-----+
|          South Asia|    8|
|Middle East and N...|   20|
|              Europe|   43|
|  Sub-Saharan Africa|   50|
|               China|    3|
|               Other|    4|
|       North America|    5|
|East Asia excludi...|    3|
|       South America|   13|
|Caribbean and Cen...|   31|
|             Oceania|   30|
|Central and North...|   12|
|      Southeast Asia|   11|
+--------------------+-----+



In [12]:
#una variante
#note que esto permite contar ocurrencias de otras columnas dentro del grupo, 
#tambien debemos agregar el alias a la columna para que no quede nombrada como 'count(region)'
nodes_df\
    .groupby("region")\
    .agg(F.count("region").alias("count"))\
    .show()

+--------------------+-----+
|              region|count|
+--------------------+-----+
|          South Asia|    8|
|Middle East and N...|   20|
|              Europe|   43|
|  Sub-Saharan Africa|   50|
|               China|    3|
|               Other|    4|
|       North America|    5|
|East Asia excludi...|    3|
|       South America|   13|
|Caribbean and Cen...|   31|
|             Oceania|   30|
|Central and North...|   12|
|      Southeast Asia|   11|
+--------------------+-----+



`sort` permite ordenar valores, el orden por defecto es ascendente pero podemos invertirlo con `desc`

In [13]:
nodes_df\
    .groupby("region").count()\
    .sort(desc("count"))\
    .show()

+--------------------+-----+
|              region|count|
+--------------------+-----+
|  Sub-Saharan Africa|   50|
|              Europe|   43|
|Caribbean and Cen...|   31|
|             Oceania|   30|
|Middle East and N...|   20|
|       South America|   13|
|Central and North...|   12|
|      Southeast Asia|   11|
|          South Asia|    8|
|       North America|    5|
|               Other|    4|
|               China|    3|
|East Asia excludi...|    3|
+--------------------+-----+



Despues de todo esto, al fin podemos proseguir con los grafos.

# Grafos

GraphFrame permite crear el grafo

In [14]:
g = GraphFrame(nodes_df, edges_df)
print(g)

GraphFrame(v:[id: string, name: string ... 1 more field], e:[src: string, dst: string ... 3 more fields])


Atributos del grafo `g` como nodos y relaciones son Dataframes, asi que todo lo que vimos en la anterior seccion puede ser aplicado sobre los atributos `.vertices` y `.edges`. 

In [None]:
display(g.vertices)

g.vertices.describe().show()

id,name,region
AFG,Afghanistan,South Asia
ATG,Antigua and Barbuda,Caribbean and Central America
ARG,Argentina,South America
AUS,Australia,Oceania
BHS,Bahamas,Caribbean and Central America
BGD,Bangladesh,South Asia
BMU,Bermuda,Caribbean and Central America
BTN,Bhutan,South Asia
BVT,Bouvet Island,Oceania
BRA,Brazil,South America


Este grafo tiene demadiadas aristas, mostramos solo las primeras 10.

In [16]:
g.edges.count()

102566

In [17]:
g.edges.show(10)

+---+---+--------------------+----------+----------+
|src|dst|            resource|     value|    weight|
+---+---+--------------------+----------+----------+
|AFG|DZA|        Horticulture|    26.313|         3|
|AFG|DZA|     Rubber and gums|     1.507|     0.585|
|AFG|AUS|        Horticulture|40.0930017|34.3058425|
|AFG|AUS|            Oilseeds|     1.248|      0.46|
|AFG|AUS|Stimulants, tobac...|    34.645|      0.02|
|AFG|BHR|Dairy, eggs, and ...|      0.02|      0.02|
|AFG|BHR|        Horticulture|   105.007|    62.036|
|AFG|BHR|              Pulses|    14.976|     14.69|
|AFG|BHR|Stimulants, tobac...|    110.66|     0.297|
|AFG|BGD|Stimulants, tobac...|   133.438|    65.411|
+---+---+--------------------+----------+----------+
only showing top 10 rows



In [18]:
g.edges.describe().show()

+-------+------+------+--------+------------------+------------------+
|summary|   src|   dst|resource|             value|            weight|
+-------+------+------+--------+------------------+------------------+
|  count|102566|102566|  102566|            102566|            102566|
|   mean|  null|  null|    null|11050.712367125223|20483.579529860268|
| stddev|  null|  null|    null|112612.07404792846| 1970626.311174012|
|    min|   ABW|   ABW| Cereals|             0.001|             0.001|
|    max|   ZWE|   ZWE|   Water|                NA|                NA|
+-------+------+------+--------+------------------+------------------+



podemos tambien mostrar el grafo. Recuerde cambiar el tipo de grafico a Directed Graph (con el icono que parece un share)

In [19]:
#display(g)

# Que podemos saber de este grafo?

## Pattern matching

De una manera muy similar a Neo4j podemos hacer queries sobre el grafo. En GraphFrames queries son llamadas motifs.

In [20]:
#Ejemplos de pattern matching

Por ejemplo, encontremos todos los nodos que tienen relaciones bilaterales.

In [21]:
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)")
#display(motifs) #display con motif tiene problemas en bluemix, por eso usamos show, o take, o head.
motifs.show(10)

+--------------------+--------------------+--------------------+--------------------+
|                   a|                   e|                   b|                  e2|
+--------------------+--------------------+--------------------+--------------------+
|[AFG,Afghanistan,...|[AFG,AUS,Horticul...|[AUS,Australia,Oc...|[AUS,AFG,Other ag...|
|[AFG,Afghanistan,...|[AFG,AUS,Horticul...|[AUS,Australia,Oc...|[AUS,AFG,Meat,583...|
|[AFG,Afghanistan,...|[AFG,AUS,Oilseeds...|[AUS,Australia,Oc...|[AUS,AFG,Other ag...|
|[AFG,Afghanistan,...|[AFG,AUS,Oilseeds...|[AUS,Australia,Oc...|[AUS,AFG,Meat,583...|
|[AFG,Afghanistan,...|[AFG,AUS,Stimulan...|[AUS,Australia,Oc...|[AUS,AFG,Other ag...|
|[AFG,Afghanistan,...|[AFG,AUS,Stimulan...|[AUS,Australia,Oc...|[AUS,AFG,Meat,583...|
|[AFG,Afghanistan,...|[AFG,BHR,Dairy, e...|[BHR,Bahrain,Midd...|[BHR,AFG,Dairy, e...|
|[AFG,Afghanistan,...|[AFG,BHR,Horticul...|[BHR,Bahrain,Midd...|[BHR,AFG,Dairy, e...|
|[AFG,Afghanistan,...|[AFG,BHR,Pulses,1...|[BHR,Bahrai

Note que nodos en a y b se encuentre repetidos ya que un registro aparece tantas veces como haya relaciones `e` y `e2` que satisfagan la condicion. Puede listar los paises mejor eliminando duplicados.

In [22]:
motifs.count() #con combinaciones de paises repetidos

735102

In [23]:
bilaterals = motifs.selectExpr("a.name as aName","b.name as bName").dropDuplicates().sort(['aName', 'bName']) #solo seleccionamos los nombres con selectExpr
#display(bilaterals)
bilaterals.count() #eliminamos duplicados

14651

In [24]:
bilaterals.show(20)

+-----------+----------+
|      aName|     bName|
+-----------+----------+
|Afghanistan| Australia|
|Afghanistan|   Bahrain|
|Afghanistan|Bangladesh|
|Afghanistan|   Belgium|
|Afghanistan|    Brazil|
|Afghanistan|  Bulgaria|
|Afghanistan|    Canada|
|Afghanistan|     China|
|Afghanistan|Czech Rep.|
|Afghanistan|   Denmark|
|Afghanistan|     Egypt|
|Afghanistan|   Finland|
|Afghanistan|    France|
|Afghanistan|   Germany|
|Afghanistan|   Hungary|
|Afghanistan|     India|
|Afghanistan| Indonesia|
|Afghanistan|      Iran|
|Afghanistan|     Italy|
|Afghanistan|     Japan|
+-----------+----------+
only showing top 20 rows



Seleccionamos el grafo que corresponde a nodos de paises latinoamericanos, codificado en la columna `region` del nodo como `South America`.

In [25]:
#paths = g.find("(a)-[e]->(b)")\
#  .filter("a.region = 'South America' and b.region = 'South America'")\
#  .filter("e.resource = 'Pearls and gemstones'")
  

paths = g.find("(a)-[e]->(b)")\
  .filter("a.region = 'South America' and b.region = 'South America'")

In [26]:
paths.show(10)

+--------------------+--------------------+--------------------+
|                   a|                   e|                   b|
+--------------------+--------------------+--------------------+
|[ARG,Argentina,So...|[ARG,ARG,Cereals,...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Dairy, e...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Fish and...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Horticul...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Live ani...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Meat,536...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Oilseeds...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Other ag...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Pulses,3...|[ARG,Argentina,So...|
|[ARG,Argentina,So...|[ARG,ARG,Roots an...|[ARG,Argentina,So...|
+--------------------+--------------------+--------------------+
only showing top 10 rows



In [27]:
latam_edges = paths \
    .selectExpr("e.src as src", "e.dst as dst", "e.resource as resource")

In [28]:
latam_vertices = g.vertices.filter("region = 'South America'")

In [29]:
latam_g = GraphFrame(latam_vertices, latam_edges)

In [30]:
latam_g.vertices.count()

13

In [31]:
latam_g.edges.count()

921

In [32]:
display(latam_g)

Otro mas, vamos a quedarnos con paises que tengan movimientos con peso mayor a la media en carne

In [33]:
media_peso = 20000

In [None]:
big_edges = g.edges.filter(col("weight") >= media_peso).filter(col("resource")== "Meat")

In [None]:
big_edges.count()

In [None]:
big_g = GraphFrame(g.vertices, e = big_edges)

In [None]:
display(big_g)

En capacidades de pattern matching GraphFrames y GraphX resulta mas complicado y menos intuitivo que Neo4j, especialmente porque ver resultados como un grafo requiere construir otro. Sin embargo, el core de GraphFrames y GraphX es procesamiento y algoritmos avanzados sobre grafos a gran escala.

## Degree distribution

Grados de entrada. Podemos interpretarlo como que tanto importan los paises

In [None]:
display(g.inDegrees.sort(desc("inDegree")))

Grados de salida, Interpretamos como que tanto exportan los paises

In [None]:
display(g.outDegrees.sort(desc("outDegree")))

Claro, tambien tenemos la distribucion global.

In [None]:
display(g.degrees.sort(desc("degree")))

Todos estos resultados son simplemente tablas (DataFrames). Podemos graficarlos.

In [None]:
degree_dist = g.degrees.toPandas()\
    .sort_values(['degree'], ascending = False).reset_index(drop=True)

In [None]:
plt.figure(figsize=(8,12))
sns.barplot(x = 'degree', y= 'id', data = degree_dist.head(30), color='xkcd:sea blue')
plt.tight_layout()
plt.show()


## Comunidades

Las aplicamos sobre el grafo `big_g`

In [None]:
results = big_g.labelPropagation(maxIter=5)
display(results)

In [None]:
display(results.groupby('label').count().sort(desc('count')))

## Componentes conectados

In [65]:
#!pwd
#!mkdir /gpfs/fs01/user/se1f-2003f1257f20bb-7f578b9e48a9/notebook/work/checkpoints

In [46]:
spark.sparkContext.setCheckpointDir("/gpfs/fs01/user/se1f-2003f1257f20bb-7f578b9e48a9/notebook/work/checkpoints")

In [47]:
result = big_g.connectedComponents()
display(result)

id,name,region,component
ARG,Argentina,South America,42949672960
BOL,Bolivia,South America,42949672960
BRA,Brazil,South America,42949672960
CHL,Chile,South America,42949672960
COL,Colombia,South America,42949672960
ECU,Ecuador,South America,42949672960
FLK,Falkland Isds (Malvinas),South America,489626271744
GUY,Guyana,South America,42949672960
PRY,Paraguay,South America,42949672960
PER,Peru,South America,42949672960


## Componentes fuertemente conectados

In [None]:
result = big_g.stronglyConnectedComponents(maxIter=5)
display(result.select("id", "component"))

id,component
BRN,128849018880
LCA,283467841538
IND,1511828488192
TZA,1640677507072
HTI,8589934592
NZL,618475290624
IOT,266287972353
CCK,0
MKD,1563368095745
SOM,103079215106


In [37]:
result.select("id", "component").groupby("component").count().sort(desc("count")).show()

+-------------+-----+
|    component|count|
+-------------+-----+
|  17179869186|   15|
| 489626271744|    5|
|1537598291968|    2|
|1511828488192|    1|
| 970662608897|    1|
| 283467841538|    1|
| 652835028992|    1|
|1640677507072|    1|
|   8589934592|    1|
|  34359738369|    1|
|1245540515840|    1|
|1022202216448|    1|
| 618475290624|    1|
| 678604832768|    1|
|1683627180036|    1|
| 266287972353|    1|
|            0|    1|
| 919123001344|    1|
|1563368095745|    1|
|1314259992577|    1|
+-------------+-----+
only showing top 20 rows



In [42]:
result.filter(col('component') == "489626271744" ).show()

+---+-------------+-------------+------------+
| id|         name|       region|   component|
+---+-------------+-------------+------------+
|MEX|       Mexico|North America|489626271744|
|CHL|        Chile|South America|489626271744|
|USA|United States|North America|489626271744|
|AUS|    Australia|      Oceania|489626271744|
|CAN|       Canada|North America|489626271744|
+---+-------------+-------------+------------+



## Pagerank

In [34]:
results = latam_g.pageRank(resetProbability=0.15, tol=0.05)

In [35]:
display(results.vertices.sort(desc("pagerank")))

id,name,region,pagerank
COL,Colombia,South America,0.9110889674566296
CHL,Chile,South America,0.8885496004178979
BRA,Brazil,South America,0.753684719265628
VEN,Venezuela,South America,0.7208393826862725
BOL,Bolivia,South America,0.7117460704416794
ARG,Argentina,South America,0.6962886616301689
PER,Peru,South America,0.6956809881138851
ECU,Ecuador,South America,0.6516483954578789
URY,Uruguay,South America,0.6351416327433211
GUY,Guyana,South America,0.5778581422037792
