# HandsOn Week 13
Welcome to hands-on week 13: graphs in distributed systems, using GraphFrames. To use GraphFrames, launch PySpark with ```pyspark --packages graphframes:graphframes:0.8.0-spark2.4-s_2.11```. That command targets Spark 2.4.x, the version installed in our VM; adjust the package coordinates to match your own Spark and Scala versions (the notebook below, for example, uses ```0.8.2-spark3.1-s_2.12```). Spark downloads the GraphFrames package automatically at launch, so make sure you have an internet connection.

Note: you may use any Spark and GraphFrames API; you do not need to build anything from scratch.

### Read The Dataset and Build The Graph

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType,
                               FloatType, IntegerType)
from graphframes import GraphFrame

# Initialize SparkSession with GraphFrames package
spark = SparkSession.builder \
    .appName("TransportGraphApp") \
    .config("spark.jars.packages", "graphframes:graphframes:0.8.2-spark3.1-s_2.12") \
    .config("spark.jars.repositories", "https://repos.spark-packages.org/") \
    .getOrCreate()

def create_transport_graph(nodes_filePath, edges_filePath):
    node_fields = [
        StructField("id", StringType(), True),
        StructField("latitude", FloatType(), True),
        StructField("longitude", FloatType(), True),
        StructField("population", IntegerType(), True)
    ]
    nodes = spark.read.csv(nodes_filePath, header=True,
                           schema=StructType(node_fields))
    edge_fields = [
        StructField("src", StringType(), True),
        StructField("dst", StringType(), True),
        StructField("relationship", StringType(), True),
        StructField("cost", FloatType(), True)
    ]
    rels = spark.read.csv(edges_filePath, header=True,
                         schema=StructType(edge_fields))
    # Mirror every relationship (swap src and dst) so the graph can be traversed in both directions
    reversed_rels = rels.select(rels.dst.alias("src"), rels.src.alias("dst"),
                                "relationship", "cost")

    relationships = rels.union(reversed_rels)

    return GraphFrame(nodes, relationships)

g = create_transport_graph("./transport-nodes.csv", "./transport-relationships.csv")
g.vertices.show()
g.edges.show()


+----------------+---------+---------+----------+
|              id| latitude|longitude|population|
+----------------+---------+---------+----------+
|       Amsterdam| 52.37919| 4.899431|    821752|
|         Utrecht|52.092876|  5.10448|    334176|
|        Den Haag|52.078663| 4.288788|    514861|
|       Immingham| 53.61239| -0.22219|      9642|
|       Doncaster| 53.52285| -1.13116|    302400|
|Hoek van Holland|  51.9775|  4.13333|      9382|
|      Felixstowe| 51.96375|   1.3511|     23689|
|         Ipswich| 52.05917|  1.15545|    133384|
|      Colchester| 51.88921|  0.90421|    104390|
|          London|51.509865|-0.118092|   8787892|
|       Rotterdam|  51.9225|  4.47917|    623652|
|           Gouda| 52.01667|  4.70833|     70939|
+----------------+---------+---------+----------+

+----------------+----------------+------------+-----+
|             src|             dst|relationship| cost|
+----------------+----------------+------------+-----+
|       Amsterdam|         Utrecht

### Milestone 01

In [20]:
# 1. Find the source-destination airport pair(s) with the maximum cost
from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in max()

max_cost = g.edges.agg(F.max("cost")).collect()[0][0]
max_cost_edges = g.edges.filter(F.col("cost") == max_cost)
max_cost_edges.show()

+---------+---------+------------+-----+
|      src|      dst|relationship| cost|
+---------+---------+------------+-----+
|Amsterdam|Immingham|       EROAD|369.0|
|Immingham|Amsterdam|       EROAD|369.0|
+---------+---------+------------+-----+



In [21]:
# 2. Find the source-destination airport pair(s) with the minimum cost
from pyspark.sql import functions as F  # namespaced import avoids shadowing Python's built-in min()

min_cost = g.edges.agg(F.min("cost")).collect()[0][0]
min_cost_edges = g.edges.filter(F.col("cost") == min_cost)
min_cost_edges.show()

+----------+----------+------------+----+
|       src|       dst|relationship|cost|
+----------+----------+------------+----+
|   Ipswich|Felixstowe|       EROAD|22.0|
|Felixstowe|   Ipswich|       EROAD|22.0|
+----------+----------+------------+----+



In [22]:
# 3. Calculate the average cost
from pyspark.sql.functions import avg

average_cost = g.edges.agg(avg("cost")).collect()[0][0]

print("Average cost:", average_cost)

Average cost: 91.33333333333333


### Milestone 02
1. Find flight routes that have no direct connection

In [30]:
# write your code here
from pyspark.sql.functions import col

# Every ordered pair (a, b) of distinct vertices
all_vertex_pairs = g.vertices.alias("a").crossJoin(g.vertices.alias("b")).filter("a.id != b.id")

# Pairs that do have a direct edge
direct_edges = g.edges.select(col("src").alias("a_id"), col("dst").alias("b_id"))

# Anti-join: keep only the pairs with no matching edge
no_direct_connection = all_vertex_pairs.join(direct_edges,
                                             (col("a.id") == col("a_id")) & (col("b.id") == col("b_id")),
                                             "left_anti")

no_direct_connection.select(col("a.id").alias("Source"), col("b.id").alias("Destination")).show()

+---------+----------------+
|   Source|     Destination|
+---------+----------------+
|Amsterdam|       Doncaster|
|Amsterdam|Hoek van Holland|
|Amsterdam|      Felixstowe|
|Amsterdam|         Ipswich|
|Amsterdam|      Colchester|
|Amsterdam|          London|
|Amsterdam|       Rotterdam|
|Amsterdam|           Gouda|
|  Utrecht|        Den Haag|
|  Utrecht|       Immingham|
|  Utrecht|       Doncaster|
|  Utrecht|Hoek van Holland|
|  Utrecht|      Felixstowe|
|  Utrecht|         Ipswich|
|  Utrecht|      Colchester|
|  Utrecht|          London|
|  Utrecht|       Rotterdam|
| Den Haag|         Utrecht|
| Den Haag|       Immingham|
| Den Haag|       Doncaster|
+---------+----------------+
only showing top 20 rows
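
The anti-join above works on ordered pairs, so every unconnected pair shows up twice, once per direction. As a minimal plain-Python sketch of the same set-difference idea, the snippet below lists each unordered pair only once. The node names and edges are a made-up toy graph, not the transport dataset.

```python
from itertools import combinations

# Hypothetical toy graph, not the transport dataset.
nodes = ["A", "B", "C", "D"]
edges = [("A", "B"), ("B", "C"), ("C", "D")]

# Treat edges as undirected by storing each one as a frozenset.
connected = {frozenset(e) for e in edges}

# Every unordered pair of distinct nodes that has no direct edge.
no_direct = [
    (a, b) for a, b in combinations(nodes, 2)
    if frozenset((a, b)) not in connected
]
print(no_direct)
```

On a large graph the cross join grows quadratically with the number of vertices, so this kind of exhaustive pairing is only practical for small graphs like this one.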



### Milestone 03
1. Find the most important airport. You may use any measure of importance, but explain why you chose that measure.

In [31]:
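# write your code here
# Measure: degree centrality, i.e. the number of edges touching each vertex.
# Every relationship is stored in both directions, so each degree is twice
# the number of directly connected neighbours.
degree_centrality = g.degrees.withColumnRenamed("id", "airport_id").orderBy(col("degree").desc())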

degree_centrality.show()



+----------------+------+
|      airport_id|degree|
+----------------+------+
|        Den Haag|     8|
|       Rotterdam|     6|
|       Amsterdam|     6|
|           Gouda|     6|
|Hoek van Holland|     6|
|       Doncaster|     4|
|          London|     4|
|       Immingham|     4|
|      Colchester|     4|
|         Utrecht|     4|
|         Ipswich|     4|
|      Felixstowe|     4|
+----------------+------+
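
Because `create_transport_graph` stores every relationship in both directions, `g.degrees` counts each neighbour twice (once for the outgoing edge and once for the incoming one). The following plain-Python sketch illustrates that effect on a hypothetical toy edge list; the names `rels`, `degree` and `neighbours` are illustrative only and the data is not the transport dataset.

```python
from collections import Counter

# Hypothetical undirected relationships (not the transport dataset).
rels = [("A", "B"), ("A", "C"), ("B", "C"), ("C", "D")]

# Mirror every edge, as create_transport_graph does with union(reversed_rels).
edges = rels + [(dst, src) for src, dst in rels]

# Degree = number of edge endpoints touching a vertex (in-degree + out-degree).
degree = Counter()
for src, dst in edges:
    degree[src] += 1
    degree[dst] += 1

# Number of distinct neighbours per vertex, for comparison.
neighbours = {v: len({d for s, d in edges if s == v}) for v in degree}

for v, d in degree.most_common():
    print(v, d, neighbours[v])
```

The ranking is unaffected by the doubling, so the vertex with the highest degree is still the one with the most direct connections.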



### Milestone 04
1. Find the shortest path by node count (the path with the fewest nodes) from Amsterdam to London.

In [32]:
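# write your code here
# shortestPaths returns, for every vertex, the hop count from that vertex to each landmark.
# The graph stores every edge in both directions, so the London -> Amsterdam hop count
# equals the Amsterdam -> London hop count.
shortest_path = g.shortestPaths(landmarks=["Amsterdam"]).select("id", "distances")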

shortest_path_distance = shortest_path.filter(shortest_path.id == "London").select("distances").collect()[0][0]

print("Shortest path distance from Amsterdam to London:", shortest_path_distance)

                                                                                

Shortest path distance from Amsterdam to London: {'Amsterdam': 3}
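
`shortestPaths` reports only the hop count, not the route itself. A minimal sketch of a breadth-first search that also reconstructs the path is shown here in plain Python. The function name `fewest_hops_path` and the toy adjacency dict are hypothetical; in the notebook the adjacency could presumably be built from `g.edges.collect()`, but that wiring is an assumption and is not shown.

```python
from collections import deque

def fewest_hops_path(adjacency, start, goal):
    """Return one path with the fewest edges from start to goal, or None."""
    parent = {start: None}          # also serves as the visited set
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if node == goal:
            # Walk the parent links back to the start.
            path = []
            while node is not None:
                path.append(node)
                node = parent[node]
            return path[::-1]
        for nxt in adjacency.get(node, []):
            if nxt not in parent:
                parent[nxt] = node
                queue.append(nxt)
    return None

# Hypothetical toy graph (undirected, stored in both directions).
adjacency = {
    "A": ["B", "C"],
    "B": ["A", "D"],
    "C": ["A", "D"],
    "D": ["B", "C", "E"],
    "E": ["D"],
}
path = fewest_hops_path(adjacency, "A", "E")
print(path, "hops:", len(path) - 1)
```

When several routes tie on hop count, this sketch returns whichever one the search reaches first. GraphFrames also provides `g.bfs(fromExpr, toExpr)`, which returns matching paths as a DataFrame.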


### Bonus
1. Find the shortest path by cost (see the 'cost' column in the edge/relationship DataFrame) from Amsterdam to London.

In [35]:
# write your code here


Shortest path distance from Amsterdam to London based on cost: {'Amsterdam': 3}
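
Note that `shortestPaths` counts hops and ignores the `cost` column, so a result such as `{'Amsterdam': 3}` is a hop count rather than a total cost. For a graph this small, one possible approach is to collect the edges and run Dijkstra's algorithm on the driver. The sketch below is plain Python on hypothetical toy data; the function name `cheapest_path` and the edge costs are made up for illustration and are not the notebook's solution or the transport dataset.

```python
import heapq

def cheapest_path(edges, start, goal):
    """Dijkstra's algorithm over (src, dst, cost) triples with non-negative costs.

    Returns (total_cost, path), or (float("inf"), []) if goal is unreachable.
    """
    adjacency = {}
    for src, dst, cost in edges:
        adjacency.setdefault(src, []).append((dst, cost))

    best = {start: 0.0}
    parent = {start: None}
    heap = [(0.0, start)]
    while heap:
        dist, node = heapq.heappop(heap)
        if dist > best.get(node, float("inf")):
            continue  # stale heap entry, a cheaper route was already found
        if node == goal:
            path = []
            while node is not None:
                path.append(node)
                node = parent[node]
            return dist, path[::-1]
        for nxt, cost in adjacency.get(node, []):
            new_dist = dist + cost
            if new_dist < best.get(nxt, float("inf")):
                best[nxt] = new_dist
                parent[nxt] = node
                heapq.heappush(heap, (new_dist, nxt))
    return float("inf"), []

# Hypothetical toy relationships, mirrored so that travel works in both directions.
rels = [("A", "B", 4.0), ("A", "C", 1.0), ("C", "B", 2.0),
        ("B", "D", 5.0), ("C", "D", 8.0)]
edges = rels + [(dst, src, cost) for src, dst, cost in rels]

total, path = cheapest_path(edges, "A", "D")
print(total, path)
```

In this toy graph the cheapest route uses three edges even though a two-edge route exists, which is why a hop-based result cannot stand in for a cost-based one.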


# Submission
Submit this ```ipynb``` file to the course portal, named in the format ```HandsOnWeek13_NIM_NamaLengkap.ipynb```. Make sure every code cell in the submitted file includes its output (no blank outputs).