# Graph Analytics by AllegroGraph and Apache Spark

[Apache Spark](https://spark.apache.org/) is one of the most popular platforms for large-scale data processing. In addition to machine learning and SQL database solutions, the Spark ecosystem offers [GraphX](https://spark.apache.org/graphx/) and [GraphFrames](https://graphframes.github.io/graphframes/docs/_site/index.html), two frameworks for running graph computations on your data. In this notebook, we show how to read data from AllegroGraph and then perform graph analytics with Spark.

## Apache Spark Basics

Apache Spark extends the MapReduce model popularized by Hadoop to efficiently support more types of computation. It provides interfaces (including interactive shells) for programming entire clusters with implicit data parallelism and fault tolerance. For a quick introduction to more Spark APIs, see the [Quick Start guide](https://spark.apache.org/docs/latest/quick-start.html).

**SparkContext** is the entry point to Apache Spark functionality. The next cell shows some basic parameters of the current *SparkContext*.

In [1]:
sc

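In this notebook, the entry point into Spark SQL functionality is the **SQLContext** class (in Spark 2.0 and later, `SparkSession` is the preferred entry point, and `SQLContext` remains available for backward compatibility). The next cell shows an instance of it.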

In [2]:
sqlContext

<pyspark.sql.context.SQLContext at 0x7f53dc870710>

## Set up AllegroGraph connection to repository _kennedy_

An instance of AllegroGraph is running on port **10035** in the same Docker network as our Spark cluster. However, if you want to access the WebView page, please visit [http://localhost:10015](http://localhost:10015) instead.

In this example we are using a Kennedy family graph. It is a rather small dataset as our main focus is to transform RDF triples to what Spark and GraphFrames need as input.

![Kennedy](img/kennedy.png)

The next cell defines our parameters for connecting to AllegroGraph.

In [3]:
from franz.openrdf.connect import ag_connect
from franz.openrdf.query.query import QueryLanguage

conn_args = {
    "host": "ag",
    "port": 10035,
    "user": "test",
    "password": "xyzzy",
    "create": False,
    "clear": False
}

## _Person_ DataFrame

Spark's original interface for parallel computation is the [**RDD**](https://spark.apache.org/docs/latest/rdd-programming-guide.html) API. **Dataset**, an interface added in version 1.6, provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) together with the benefits of Spark SQL's optimized execution engine. Finally, a **DataFrame** is a Dataset organized into named columns. It is conceptually equivalent to a table in a relational database.

In the next cell, we use a SPARQL query to collect each person's metadata (first name, last name, sex and birth year), and then use the `sqlContext.createDataFrame` method to construct a DataFrame. Note that we use each person's IRI as the row identifier (column `id`). As we will see shortly, this is what GraphFrames expects.

In [4]:
def person_iter():
    q = """PREFIX : <http://www.franz.com/simple#>
SELECT DISTINCT ?person ?first_name ?last_name ?sex ?birth_year {
  ?person a :person ;
        :first-name ?first_name ;
        :last-name ?last_name ;
        :sex ?sex ;
        :birth-year ?birth_year .           
}
ORDER BY ?birth_year"""
    with ag_connect("kennedy", **conn_args) as conn:
        with conn.prepareTupleQuery(QueryLanguage.SPARQL, q).evaluate() as res:
            for binding_set in res:
                person_iri = binding_set.getValue("person").getURI()
                first_name = binding_set.getValue("first_name").toPython()
                last_name = binding_set.getValue("last_name").toPython()
                sex = binding_set.getValue("sex").getLocalName()
                birth_year = int(binding_set.getValue("birth_year").toPython())
                yield (person_iri, first_name, last_name, sex, birth_year)
                
df_person = sqlContext.createDataFrame(person_iter(), ("id", "first_name", "last_name", "sex", "birth_year"))
print("{:,} people collected in total".format(df_person.count()))
df_person.show()

75 people collected in total
+--------------------+----------+-----------+------+----------+
|                  id|first_name|  last_name|   sex|birth_year|
+--------------------+----------+-----------+------+----------+
|http://www.franz....|    Joseph|    Kennedy|  male|      1888|
|http://www.franz....|      Rose| Fitzgerald|female|      1890|
|http://www.franz....|    Robert|    Shriver|  male|      1915|
|http://www.franz....|    Joseph|    Kennedy|  male|      1915|
|http://www.franz....|   William|  Cavendish|  male|      1917|
|http://www.franz....|      John|    Kennedy|  male|      1917|
|http://www.franz....|      Rose|    Kennedy|female|      1918|
|http://www.franz....|  Kathleen|    Kennedy|female|      1920|
|http://www.franz....|    Eunice|    Kennedy|female|      1921|
|http://www.franz....|     Peter|    Lawford|  male|      1923|
|http://www.franz....|  Patricia|    Kennedy|female|      1924|
|http://www.franz....|    Robert|    Kennedy|  male|      1925|
|http://www

## _Relations_ DataFrame

The next step is to collect information about the **relations** among people. We focus on these three types of relations:

1. Spouse
2. Has Child
3. Has Parent

As with the person DataFrame, we construct it from the results of a SPARQL query. Note the two special columns `src` and `dst`, which are important for constructing the final graph in the next section.

In [5]:
from itertools import chain

def relation_iter(rel: str):
    q = f"""PREFIX : <http://www.franz.com/simple#>
SELECT DISTINCT ?p1 ?p2 {{
  ?p1 a :person .
  ?p2 a :person .
  ?p1 :{rel} ?p2 .
}}"""
    with ag_connect("kennedy", **conn_args) as conn:
        with conn.prepareTupleQuery(QueryLanguage.SPARQL, q).evaluate() as res:
            for binding_set in res:
                p1_iri = binding_set.getValue("p1").getURI()
                p2_iri = binding_set.getValue("p2").getURI()
                yield (p1_iri, p2_iri, rel)

all_relations = chain(*[relation_iter(rel) for rel in ("spouse", "has-child", "has-parent")])
df_relation = sqlContext.createDataFrame(all_relations, ("src", "dst", "relation"))
print("{:,} relations collected in total".format(df_relation.count()))
df_relation.show()

226 relations collected in total
+--------------------+--------------------+--------+
|                 src|                 dst|relation|
+--------------------+--------------------+--------+
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.franz....|http://www.franz....|  spouse|
|http://www.f

## GraphX and GraphFrames

[GraphX](https://spark.apache.org/graphx/) is a Spark component for graphs and graph-parallel computation. Just as Dataset and DataFrame are higher-level APIs over RDDs, [GraphFrames](https://graphframes.github.io/graphframes/docs/_site/index.html) provides both the functionality of GraphX and extended functionality that takes advantage of Spark DataFrames. In this tutorial, we will use GraphFrames.

To create GraphFrames, vertex and edge DataFrames are needed:

* Vertex DataFrame: A vertex DataFrame should contain a special column named “id” which specifies unique IDs for each vertex in the graph.
* Edge DataFrame: An edge DataFrame should contain two special columns: “src” (source vertex ID of edge) and “dst” (destination vertex ID of edge).
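
As a rough illustration of these requirements, the sketch below uses plain Python tuples as stand-ins for DataFrame rows and checks that vertex IDs are unique and that every edge endpoint refers to a known vertex. This is only a sanity check one might run before building a graph; the helper name `check_graph_inputs` and the sample IRIs are hypothetical, made up for this example, and are not part of GraphFrames or the Kennedy dataset.

```python
def check_graph_inputs(vertices, edges):
    """Return a list of problems found in (id, ...) vertex rows and (src, dst, ...) edge rows."""
    problems = []
    seen = set()
    # The first field of each vertex row plays the role of the "id" column.
    for row in vertices:
        vertex_id = row[0]
        if vertex_id in seen:
            problems.append(f"duplicate vertex id: {vertex_id}")
        seen.add(vertex_id)
    # The first two fields of each edge row play the roles of "src" and "dst".
    for src, dst, *_rest in edges:
        for endpoint in (src, dst):
            if endpoint not in seen:
                problems.append(f"edge endpoint not in vertices: {endpoint}")
    return problems


# Hypothetical sample rows; the IRIs are placeholders, not taken from the repository.
sample_vertices = [
    ("http://example.org/person1", "Ann"),
    ("http://example.org/person2", "Bob"),
]
sample_edges = [
    ("http://example.org/person1", "http://example.org/person2", "spouse"),
    ("http://example.org/person2", "http://example.org/person3", "has-child"),
]

for problem in check_graph_inputs(sample_vertices, sample_edges):
    print(problem)
```

Here the second edge points at a vertex that is missing from the vertex rows, so the check reports it.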

In the cell below, we use the _person_ DataFrame as the vertices and the _relation_ DataFrame as the edges. The variable `g` is an instance of our GraphFrame.

In [6]:
from graphframes import GraphFrame

g = GraphFrame(
    df_person, # vertices
    df_relation, # edges
)

## Graph Analysis

We can start using `g` to perform various graph analytics provided by GraphFrames APIs, including but not limited to:
    
* [Motif finding](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#motif-finding)
* [Connected components](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#connected-components)
* [Label Propagation Algorithm](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#label-propagation-algorithm-lpa)
* [PageRank](https://graphframes.github.io/graphframes/docs/_site/user-guide.html#pagerank)

For more information on the APIs, please visit the [GraphFrames User Guide](https://graphframes.github.io/graphframes/docs/_site/user-guide.html).
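
To give a feel for what one of these algorithms computes, here is a small single-machine sketch of connected components over `(src, dst)` pairs, using a union-find structure. It only illustrates the idea: GraphFrames runs a distributed algorithm on DataFrames, and the function name `connected_components_sketch` and the toy vertex names are invented for this example.

```python
def connected_components_sketch(vertices, edges):
    """Map each vertex to a representative of its component, ignoring edge direction."""
    parent = {v: v for v in vertices}

    def find(v):
        # Walk up to the root, shortening the path along the way.
        while parent[v] != v:
            parent[v] = parent[parent[v]]
            v = parent[v]
        return v

    for src, dst in edges:
        root_src, root_dst = find(src), find(dst)
        if root_src != root_dst:
            # Merge the two components by attaching one root to the other.
            parent[root_dst] = root_src

    return {v: find(v) for v in vertices}


# Toy graph (hypothetical): two separate groups, {a, b, c} and {d, e}.
toy_vertices = ["a", "b", "c", "d", "e"]
toy_edges = [("a", "b"), ("b", "c"), ("d", "e")]
components = connected_components_sketch(toy_vertices, toy_edges)
print(components)
```

Vertices that share a representative belong to the same component, which is the same kind of grouping a connected-components run assigns to each vertex.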

### PageRank

We run a simple PageRank analysis in the next cell. The results indicate that [Robert Kennedy](https://en.wikipedia.org/wiki/Robert_F._Kennedy) is very "important" in the family.

In [7]:
from pyspark.sql.functions import desc

results = g.pageRank(resetProbability=0.15, tol=0.01)
results.vertices.select("first_name", "last_name", "birth_year", "pagerank") \
    .orderBy(desc("pagerank")) \
    .show(30, False)

+-----------+----------+----------+------------------+
|first_name |last_name |birth_year|pagerank          |
+-----------+----------+----------+------------------+
|Robert     |Kennedy   |1925      |4.029459626495298 |
|Ethel      |Skakel    |1928      |3.576717463298322 |
|Joseph     |Kennedy   |1888      |2.6219862110262917|
|Rose       |Fitzgerald|1890      |2.6219862110262917|
|Eunice     |Kennedy   |1921      |2.298703166856844 |
|Edward     |Kennedy   |1932      |2.0938386383501943|
|Patricia   |Kennedy   |1924      |2.0240009400459353|
|Jean       |Kennedy   |1928      |1.9843031856876374|
|Robert     |Shriver   |1915      |1.8386363287897425|
|John       |Kennedy   |1917      |1.7415387136767322|
|Peter      |Lawford   |1923      |1.560736771187538 |
|Stephen    |Smith     |1927      |1.5226742073189046|
|Mary       |Kennedy   |1956      |1.4077893004500306|
|Joseph     |Kennedy   |1952      |1.4077893004500306|
|Robert     |Kennedy   |1954      |1.4077893004500306|
|Virginia 
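
For intuition about the `resetProbability` and `tol` arguments used above, the following is a minimal single-machine sketch of an un-normalized PageRank iteration. It is an illustration under stated assumptions, not the GraphX or GraphFrames implementation: the function name `pagerank_sketch`, its `max_iter` safeguard, and the toy graph are all made up for this example, and vertices without outgoing edges get no special treatment.

```python
from collections import defaultdict


def pagerank_sketch(vertices, edges, reset_probability=0.15, tol=0.01, max_iter=100):
    """Iterate rank = reset + (1 - reset) * sum(rank[src] / out_degree[src]) until stable."""
    out_degree = defaultdict(int)
    for src, _dst in edges:
        out_degree[src] += 1

    ranks = {v: 1.0 for v in vertices}
    for _ in range(max_iter):
        # Each vertex splits its current rank evenly among its outgoing edges.
        incoming = defaultdict(float)
        for src, dst in edges:
            incoming[dst] += ranks[src] / out_degree[src]

        new_ranks = {
            v: reset_probability + (1 - reset_probability) * incoming[v]
            for v in vertices
        }
        # Stop once no vertex changed by more than the tolerance.
        largest_change = max(abs(new_ranks[v] - ranks[v]) for v in vertices)
        ranks = new_ranks
        if largest_change < tol:
            break
    return ranks


# Toy directed graph (hypothetical): "a" receives links from both "b" and "c".
toy_vertices = ["a", "b", "c"]
toy_edges = [("a", "b"), ("b", "a"), ("c", "a")]
toy_ranks = pagerank_sketch(toy_vertices, toy_edges, tol=1e-6)
for vertex, rank in sorted(toy_ranks.items(), key=lambda item: -item[1]):
    print(vertex, round(rank, 3))
```

In this formulation the scores are not scaled to sum to 1, which would be consistent with the table above showing values greater than 1. A smaller `tol` means more iterations before stopping, and a larger `resetProbability` pulls all scores toward the same baseline.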