# HandsOn Week 14
Welcome to handsOn week 14: graph in distributed system, using GraphFrames. To use GraphFrames, you need to run ```pyspark --packages graphframes:graphframes:0.8.0-spark2.4-s_2.11``` (adjust with your Spark version. That script uses Spark 2.4.x, the one used in our VM, and it will download the graphframes package automatically, thus, make sure you have internet connection when launching the script).

Note: you can use any Spark & GraphFrames API (without building from-the-scratch).

### Read The Dataset and Build The Graph

In [1]:
from pyspark.sql.types import *
from graphframes import *

def create_transport_graph(nodes_filePath, edges_filePath):
    node_fields = [
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True)
    ]
    nodes = spark.read.csv(nodes_filePath, header=True,
                           schema=StructType(node_fields))
    edge_fields = [
        StructField("src", StringType(), True),
        StructField("dst", StringType(), True),
        StructField("relationship", StringType(), True),
        StructField("cost", FloatType(), True)
    ]
    rels = spark.read.csv(edges_filePath, header=True,
                         schema=StructType(edge_fields))
    reversed_rels = (rels.withColumn("newSrc", rels.dst)
                     .withColumn("newDst", rels.src)
                     .drop("dst", "src")
                     .withColumnRenamed("newSrc", "src")
                     .withColumnRenamed("newDst", "dst")
                     .select("src", "dst", "relationship", "cost"))

    relationships = rels.union(reversed_rels)
    
    return GraphFrame(nodes, relationships)

g = create_transport_graph("./transport-nodes.csv", "./transport-relationships.csv")
g.vertices.show()
g.edges.show()

+----------------+---------+---------+----------+
|              id| latitude|longitude|population|
+----------------+---------+---------+----------+
|       Amsterdam| 52.37919| 4.899431|    821752|
|         Utrecht|52.092876|  5.10448|    334176|
|        Den Haag|52.078663| 4.288788|    514861|
|       Immingham| 53.61239| -0.22219|      9642|
|       Doncaster| 53.52285| -1.13116|    302400|
|Hoek van Holland|  51.9775|  4.13333|      9382|
|      Felixstowe| 51.96375|   1.3511|     23689|
|         Ipswich| 52.05917|  1.15545|    133384|
|      Colchester| 51.88921|  0.90421|    104390|
|          London|51.509865|-0.118092|   8787892|
|       Rotterdam|  51.9225|  4.47917|    623652|
|           Gouda| 52.01667|  4.70833|     70939|
+----------------+---------+---------+----------+

+----------------+----------------+------------+-----+
|             src|             dst|relationship| cost|
+----------------+----------------+------------+-----+
|       Amsterdam|         Utrecht

## Milestone 01
1. Find source-destination airport pair/pairs with maximum cost
2. Find source-destination airport pair/pairs with minimum cost
3. Calculate the average cost

In [2]:
# write your code here
print('1. ')
g.edges.createOrReplaceTempView('g_edges')
spark.sql("SELECT src, dst, cost FROM g_edges WHERE cost == (SELECT max(cost) FROM g_edges)").show()

print('2. ')
spark.sql("SELECT src, dst, cost FROM g_edges WHERE cost == (SELECT min(cost) FROM g_edges)").show()

print('3. ')
spark.sql("SELECT avg(cost) FROM g_edges").show()

1. 
+---------+---------+-----+
|      src|      dst| cost|
+---------+---------+-----+
|Amsterdam|Immingham|369.0|
|Immingham|Amsterdam|369.0|
+---------+---------+-----+

2. 
+----------+----------+----+
|       src|       dst|cost|
+----------+----------+----+
|   Ipswich|Felixstowe|22.0|
|Felixstowe|   Ipswich|22.0|
+----------+----------+----+

3. 
+-----------------+
|        avg(cost)|
+-----------------+
|91.33333333333333|
+-----------------+



### Milestone 02
1. Find flight routes that have no direct connection

In [3]:
# write your code here
print('1. ')
g.find("!(city_1)-[]->(city_2)").filter("city_1.id != city_2.id").show()

1. 
+--------------------+--------------------+
|              city_1|              city_2|
+--------------------+--------------------+
|[Felixstowe, 51.9...|[Gouda, 52.01667,...|
|[London, 51.50986...|[Felixstowe, 51.9...|
|[Immingham, 53.61...|[Hoek van Holland...|
|[Rotterdam, 51.92...|[London, 51.50986...|
|[Immingham, 53.61...|[Gouda, 52.01667,...|
|[Utrecht, 52.0928...|[Colchester, 51.8...|
|[Amsterdam, 52.37...|[Hoek van Holland...|
|[Amsterdam, 52.37...|[Colchester, 51.8...|
|[Den Haag, 52.078...|[London, 51.50986...|
|[Felixstowe, 51.9...|[Doncaster, 53.52...|
|[Gouda, 52.01667,...|[Doncaster, 53.52...|
|[Hoek van Holland...|[Utrecht, 52.0928...|
|[Doncaster, 53.52...|[Colchester, 51.8...|
|[Felixstowe, 51.9...|[Utrecht, 52.0928...|
|[Colchester, 51.8...|[Amsterdam, 52.37...|
|[Hoek van Holland...|[Doncaster, 53.52...|
|[London, 51.50986...|[Den Haag, 52.078...|
|[Felixstowe, 51.9...|[Immingham, 53.61...|
|[Hoek van Holland...|[London, 51.50986...|
|[Utrecht, 52.0928...|[Ipswi

### Milestone 03
1. Find the most important airport (you can use any measurement to judge the importance level of airports, and please describe why you choose it -the measurement-)

In [4]:
# write your code here
print('1. ')

g.degrees.createOrReplaceTempView('g_degrees')
spark.sql("SELECT id, degree FROM g_degrees WHERE degree = (SELECT max(degree) FROM g_degrees)").show()


1. 
+--------+------+
|      id|degree|
+--------+------+
|Den Haag|     8|
+--------+------+



Degree of importance used is based on how busy the airport, it is depicted by how many routes does the airport served. So, the code finds the maximum value of degree.

### Milestone 04
1. Find the sortest path based on "node" -path with fewest nodes- from Amsterdam to London 

In [5]:
# write your code here
print('1. ')
g.bfs("id = 'Amsterdam'", "id = 'London'").select('e0', 'e1', 'e2').show()

1. 
+--------------------+--------------------+--------------------+
|                  e0|                  e1|                  e2|
+--------------------+--------------------+--------------------+
|[Amsterdam, Immin...|[Immingham, Donca...|[Doncaster, Londo...|
+--------------------+--------------------+--------------------+



### Bonus
1. Find the sortest path based on "cost value" -see 'cost' column in the edge/relationship dataframe - from Amsterdam to London 

In [6]:
# Code taken from oreilly.com

from graphframes.lib import AggregateMessages as AM
from pyspark.sql import functions as F

add_path_udf = F.udf(lambda path, id: path + [id], ArrayType(StringType()))

def shortest_path(g, origin, destination, column_name="cost"):
    if g.vertices.filter(g.vertices.id == destination).count() == 0:
        return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
                .withColumn("path", F.array()))

    vertices = (g.vertices.withColumn("visited", F.lit(False))
                .withColumn("distance", F.when(g.vertices["id"] == origin, 0)
                            .otherwise(float("inf")))
                .withColumn("path", F.array()))
    cached_vertices = AM.getCachedDataFrame(vertices)
    g2 = GraphFrame(cached_vertices, g.edges)

    while g2.vertices.filter('visited == False').first():
        current_node_id = g2.vertices.filter('visited == False').sort("distance").first().id

        msg_distance = AM.edge[column_name] + AM.src['distance']
        msg_path = add_path_udf(AM.src["path"], AM.src["id"])
        msg_for_dst = F.when(AM.src['id'] == current_node_id,
                             F.struct(msg_distance, msg_path))
        new_distances = g2.aggregateMessages(F.min(AM.msg).alias("aggMess"),
                                             sendToDst=msg_for_dst)

        new_visited_col = F.when(
            g2.vertices.visited | (g2.vertices.id == current_node_id), True).otherwise(False)
        new_distance_col = F.when(new_distances["aggMess"].isNotNull() &
                                 (new_distances.aggMess["col1"]
                                 < g2.vertices.distance),
                                 new_distances.aggMess["col1"]).otherwise(g2.vertices.distance)
        new_path_col = F.when(new_distances["aggMess"].isNotNull() &
                       (new_distances.aggMess["col1"]
                       < g2.vertices.distance), new_distances.aggMess["col2"]
                       .cast("array<string>")).otherwise(g2.vertices.path)

        new_vertices = (g2.vertices.join(new_distances, on="id",
                                         how="left_outer")
                        .drop(new_distances["id"])
                        .withColumn("visited", new_visited_col)
                        .withColumn("newDistance", new_distance_col)
                        .withColumn("newPath", new_path_col)
                        .drop("aggMess", "distance", "path")
                        .withColumnRenamed('newDistance', 'distance')
                        .withColumnRenamed('newPath', 'path'))
        cached_new_vertices = AM.getCachedDataFrame(new_vertices)
        g2 = GraphFrame(cached_new_vertices, g2.edges)
        if g2.vertices.filter(g2.vertices.id == destination).first().visited:
            return (g2.vertices.filter(g2.vertices.id == destination)
                    .withColumn("newPath", add_path_udf("path", "id"))
                    .drop("visited", "path")
                    .withColumnRenamed("newPath", "path"))
    return (spark.createDataFrame(sc.emptyRDD(), g.vertices.schema)
            .withColumn("path", F.array()))

result = shortest_path(g, "Amsterdam", "London", "cost")
result.select("id", "distance", "path").show(truncate=False)

+------+--------+--------------------------------------------------------------------------------+
|id    |distance|path                                                                            |
+------+--------+--------------------------------------------------------------------------------+
|London|453.0   |[Amsterdam, Den Haag, Hoek van Holland, Felixstowe, Ipswich, Colchester, London]|
+------+--------+--------------------------------------------------------------------------------+



# Submission
Submit this ```ipynb``` file to the course portal, with format: ```HandsOnWeek14_NIM_NamaLengkap.ipynb```. Make sure when submitting this file, each code cell has the outputs (not blank).