## Configure - Environment

In [0]:
%sql
USE CATALOG ggw;
USE SCHEMA hr;

SELECT current_catalog(), current_database();

current_catalog(),current_schema()
ggw,hr


## Configure - Setup

In [0]:
%sql
CREATE OR REPLACE TABLE Employees (
    EmployeeID INT PRIMARY KEY,
    Name VARCHAR(100),
    ManagerID INT,
    ManagementHierarchy STRING
);

SELECT *
  FROM employees;

EmployeeID,Name,ManagerID,ManagementHierarchy


In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Define the schema
schema = StructType([
    StructField("EmployeeID", IntegerType(), True),
    StructField("Name", StringType(), True),
    StructField("ManagerID", IntegerType(), True),
    StructField("ManagementHierarchy", StringType(), True)
])

# Create sample data for the Employees table
data = [
    # (1, 'Alice', None),
    (1, 'Alice', 0, None),
    (2, 'Bob', 1, 1),
    (3, 'Charlie', 1, 1),
    (4, 'David', 2, 2),
    (5, 'Eve', 2, 2),
    (6, 'Frank', 3, 3),
    (7, 'Grace', 3, 3),
    (8, 'Hank', 4, 4),
    (9, 'Ivy', 4, 4),
    (10, 'Jack', 5, 5),
    (11, 'Karen', 5, 5),
    (12, 'Leo', 6, 6),
    (13, 'Mona', 6, 6),
    (14, 'Nina', 7, 7),
    (15, 'Oscar', 7, 7),
    (16, 'Paul', 8, 8),
    (17, 'Quinn', 8, 8),
    (18, 'Rose', 9, 9),
    (19, 'Steve', 9, 9),
    (20, 'Tina', 10, 10)
]

# Create a Spark DataFrame using the schema
spark_df = spark.createDataFrame(data, schema)

# Display the sample data
display(spark_df)

# Create the table schema
spark.sql("""
CREATE TABLE IF NOT EXISTS employees (
    EmployeeID INT,
    Name STRING,
    ManagerID INT,
    ManagementHierarchy STRING
)
""")

# Save the Spark DataFrame as a table with overwrite mode
# spark_df.write.mode('append').saveAsTable('employees')
spark_df.write.mode('overwrite').saveAsTable('employees')

EmployeeID,Name,ManagerID,ManagementHierarchy
1,Alice,0,
2,Bob,1,1.0
3,Charlie,1,1.0
4,David,2,2.0
5,Eve,2,2.0
6,Frank,3,3.0
7,Grace,3,3.0
8,Hank,4,4.0
9,Ivy,4,4.0
10,Jack,5,5.0


## Management Hierarchy Logic  
  
1. Self-join to employee (alias "Manager")
2. If the Last Management Hierarchy ManagerID was Top/"0" then stop (i.e. only repeat for non-0 records)

In [0]:
%sql
SELECT e.*,
       SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
       m.ManagerID AS ManagersManagerID
  FROM employees e
  JOIN employees m ON SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] = m.EmployeeID
 WHERE m.ManagerID != 0 -- Don't update if you are the top already
 ORDER BY e.EmployeeID ASC
;

EmployeeID,Name,ManagerID,ManagementHierarchy,LastHierarchyManagerID,ManagersManagerID
4,David,2,2,2,1
5,Eve,2,2,2,1
6,Frank,3,3,3,1
7,Grace,3,3,3,1
8,Hank,4,4,4,2
9,Ivy,4,4,4,2
10,Jack,5,5,5,2
11,Karen,5,5,5,2
12,Leo,6,6,6,3
13,Mona,6,6,6,3


In [0]:
%sql
WITH EmployeeHierarchy AS (
  SELECT e.EmployeeID,
         e.ManagementHierarchy,
         SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
         m.ManagerID AS ManagersManagerID
    FROM employees e
    JOIN employees m ON CAST(SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS INT) = m.EmployeeID
   WHERE m.ManagerID != 0 -- Don't update if you are the top already
)
MERGE INTO employees e
USING EmployeeHierarchy eh
ON e.EmployeeID = eh.EmployeeID
WHEN MATCHED
THEN UPDATE SET e.ManagementHierarchy = CONCAT(e.ManagementHierarchy, '-', COALESCE(eh.ManagersManagerID, '0'));

SELECT *
  FROM employees;

EmployeeID,Name,ManagerID,ManagementHierarchy
1,Alice,0,
2,Bob,1,1
3,Charlie,1,1
4,David,2,2-1
5,Eve,2,2-1
6,Frank,3,3-1
7,Grace,3,3-1
8,Hank,4,4-2
9,Ivy,4,4-2
10,Jack,5,5-2


In [0]:
%sql
SELECT e.*,
       SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
       m.ManagerID AS ManagersManagerID
  FROM employees e
  JOIN employees m ON SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] = m.EmployeeID
 WHERE m.ManagerID != 0 -- Don't update if you are the top already
 ORDER BY e.EmployeeID ASC
;

EmployeeID,Name,ManagerID,ManagementHierarchy,LastHierarchyManagerID,ManagersManagerID
8,Hank,4,4-2,2,1
9,Ivy,4,4-2,2,1
10,Jack,5,5-2,2,1
11,Karen,5,5-2,2,1
12,Leo,6,6-3,3,1
13,Mona,6,6-3,3,1
14,Nina,7,7-3,3,1
15,Oscar,7,7-3,3,1
16,Paul,8,8-4,4,2
17,Quinn,8,8-4,4,2


In [0]:
%sql
WITH EmployeeHierarchy AS (
  SELECT e.EmployeeID,
         e.ManagementHierarchy,
         SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
         m.ManagerID AS ManagersManagerID
    FROM employees e
    JOIN employees m ON CAST(SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS INT) = m.EmployeeID
   WHERE m.ManagerID != 0 -- Don't update if you are the top already
)
MERGE INTO employees e
USING EmployeeHierarchy eh
ON e.EmployeeID = eh.EmployeeID
WHEN MATCHED
THEN UPDATE SET e.ManagementHierarchy = CONCAT(e.ManagementHierarchy, '-', COALESCE(eh.ManagersManagerID, '0'));

SELECT *
  FROM employees;

EmployeeID,Name,ManagerID,ManagementHierarchy
1,Alice,0,
2,Bob,1,1
3,Charlie,1,1
4,David,2,2-1
5,Eve,2,2-1
6,Frank,3,3-1
7,Grace,3,3-1
8,Hank,4,4-2-1
9,Ivy,4,4-2-1
10,Jack,5,5-2-1


In [0]:
%sql
WITH EmployeeHierarchy AS (
  SELECT e.EmployeeID,
         e.ManagementHierarchy,
         SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
         m.ManagerID AS ManagersManagerID
    FROM employees e
    JOIN employees m ON CAST(SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS INT) = m.EmployeeID
   WHERE m.ManagerID != 0 -- Don't update if you are the top already
)
MERGE INTO employees e
USING EmployeeHierarchy eh
ON e.EmployeeID = eh.EmployeeID
WHEN MATCHED
THEN UPDATE SET e.ManagementHierarchy = CONCAT(e.ManagementHierarchy, '-', COALESCE(eh.ManagersManagerID, '0'));

SELECT *
  FROM employees;

EmployeeID,Name,ManagerID,ManagementHierarchy
1,Alice,0,
2,Bob,1,1
3,Charlie,1,1
4,David,2,2-1
5,Eve,2,2-1
6,Frank,3,3-1
7,Grace,3,3-1
8,Hank,4,4-2-1
9,Ivy,4,4-2-1
10,Jack,5,5-2-1


In [0]:
%sql
SELECT e.*,
       SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
       m.ManagerID AS ManagersManagerID
  FROM employees e
  LEFT OUTER JOIN employees m ON SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] = m.EmployeeID
 WHERE SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] != 1
 ORDER BY e.EmployeeID ASC
;

EmployeeID,Name,ManagerID,ManagementHierarchy,LastHierarchyManagerID,ManagersManagerID


## Re-write the above pattern in a single loop

In [0]:
from delta.tables import *

# Reference to the Delta table
empTable = DeltaTable.forName(spark, "employees")

rows_updated = 1

while rows_updated > 0:
    result = spark.sql("""
    WITH EmployeeHierarchy AS (
      SELECT e.EmployeeID,
             e.ManagementHierarchy,
             SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS LastHierarchyManagerID,
             m.ManagerID AS ManagersManagerID
        FROM employees e
        JOIN employees m ON CAST(SPLIT(e.ManagementHierarchy, '-')[SIZE(SPLIT(e.ManagementHierarchy, '-')) - 1] AS INT) = m.EmployeeID
       WHERE m.ManagerID != 0 -- Don't update if you are the top already
    )
    MERGE INTO employees e
    USING EmployeeHierarchy eh
    ON e.EmployeeID = eh.EmployeeID
    WHEN MATCHED
    THEN UPDATE SET e.ManagementHierarchy = CONCAT(e.ManagementHierarchy, '-', COALESCE(eh.ManagersManagerID, '0'))
    """)
    
    # result.count() does not work with merge!!! See below
    # merge_count = result.count()
    # print(f"Rows updated via merge: {merge_count}")

    # Get the history of the table
    history = empTable.history()
    # Get latest merge results from the history table
    latest_operation = history.select("operationMetrics").limit(1).collect()[0][0]
    # Extract the row counts from the history
    rows_inserted = int(latest_operation["numTargetRowsInserted"])
    rows_updated = int(latest_operation["numTargetRowsUpdated"])
    rows_deleted = int(latest_operation["numTargetRowsDeleted"])

    print(f"Rows inserted: {rows_inserted}; Rows updated: {rows_updated}; Rows deleted: {rows_deleted}")

display(spark.sql("SELECT *, SIZE(SPLIT(ManagementHierarchy, '-')) AS HierarchySize FROM employees"))

Rows inserted: 0; Rows updated: 17; Rows deleted: 0
Rows inserted: 0; Rows updated: 13; Rows deleted: 0
Rows inserted: 0; Rows updated: 5; Rows deleted: 0
Rows inserted: 0; Rows updated: 0; Rows deleted: 0


EmployeeID,Name,ManagerID,ManagementHierarchy,HierarchySize
1,Alice,0,,
2,Bob,1,1,1.0
3,Charlie,1,1,1.0
4,David,2,2-1,2.0
5,Eve,2,2-1,2.0
6,Frank,3,3-1,2.0
7,Grace,3,3-1,2.0
8,Hank,4,4-2-1,3.0
9,Ivy,4,4-2-1,3.0
10,Jack,5,5-2-1,3.0


In [0]:
%sql
SELECT *,
       SIZE(SPLIT(ManagementHierarchy, '-')) AS HierarchySize
  FROM employees

EmployeeID,Name,ManagerID,ManagementHierarchy,HierarchySize
1,Alice,0,,
2,Bob,1,1,1.0
3,Charlie,1,1,1.0
4,David,2,2-1,2.0
5,Eve,2,2-1,2.0
6,Frank,3,3-1,2.0
7,Grace,3,3-1,2.0
8,Hank,4,4-2-1,3.0
9,Ivy,4,4-2-1,3.0
10,Jack,5,5-2-1,3.0


In [0]:
dbutils.notebook.exit("Success")

## Sample Queries

In [0]:
%sql
        --  CASE 
        --    WHEN l4.ManagerID IS NULL THEN '0' 
        --    ELSE CAST(l4.ManagerID AS STRING) 
        --  END,
        --  '-',
        --  CASE 
        --    WHEN l3.ManagerID IS NULL THEN '0' 
        --    ELSE CAST(l3.ManagerID AS STRING) 
        --  END,
        --  '-',
        --  CASE 
        --    WHEN l2.ManagerID IS NULL THEN '0' 
        --    ELSE CAST(l2.ManagerID AS STRING) 
        --  END,
        --  '-',
        --  CASE 
        --    WHEN l1.ManagerID IS NULL THEN '0' 
        --    ELSE CAST(l1.ManagerID AS STRING) 
        --  END,
        --  '-',
        --  CAST(e.ManagerID AS STRING)

SELECT e.*, -- EXCEPT (ManagementHierarchy), <== This is the right answer, so hide
      --  l1.Name AS L1_ManagerName,
      --  l1.ManagerID AS L1_ManagerID,
       CASE 
         WHEN l1.Name IS NULL THEN 'Top' 
         ELSE l1.Name
       END AS L1_ManagerName,
       CASE 
         WHEN l1.ManagerID IS NULL THEN '0' 
         ELSE CAST(l1.ManagerID AS STRING) 
       END AS L1_ManagerID,
      --  l2.Name AS L2_ManagerName,
      --  l2.ManagerID AS L2_ManagerID,
       CASE 
         WHEN l2.Name IS NULL THEN 'Top' 
         ELSE l2.Name
       END AS L2_ManagerName,
       CASE 
         WHEN l2.ManagerID IS NULL THEN '0' 
         ELSE CAST(l2.ManagerID AS STRING) 
       END AS L2_ManagerID,
      --  l3.Name AS L3_ManagerName,
      --  l3.ManagerID AS L3_ManagerID,
       CASE 
         WHEN l3.Name IS NULL THEN 'Top' 
         ELSE l3.Name
       END AS L3_ManagerName,
       CASE 
         WHEN l3.ManagerID IS NULL THEN '0' 
         ELSE CAST(l3.ManagerID AS STRING) 
       END AS L3_ManagerID,
      --  l4.Name AS L4_ManagerName,
      --  l4.ManagerID AS L4_ManagerID,
       CASE 
         WHEN l4.Name IS NULL THEN 'Top' 
         ELSE l4.Name
       END AS L4_ManagerName,
       CASE 
         WHEN l4.ManagerID IS NULL THEN '0' 
         ELSE CAST(l4.ManagerID AS STRING) 
       END AS L4_ManagerID,
       CONCAT(
         L4_ManagerID,
         '-',
         L3_ManagerID,
         '-',
         L2_ManagerID,
         '-',
         L1_ManagerID,
         '-',
         CAST(e.ManagerID AS STRING)
       ) AS ManagerHierarchy
  FROM employees e
  LEFT OUTER JOIN employees l1
    ON l1.EmployeeID = e.ManagerID
  LEFT OUTER JOIN employees l2
    ON l2.EmployeeID = l1.ManagerID
  LEFT OUTER JOIN employees l3
    ON l3.EmployeeID = l2.ManagerID
  LEFT OUTER JOIN employees l4
    ON l4.EmployeeID = l3.ManagerID
--  ORDER BY ManagerHierarchy ASC
 ORDER BY e.EmployeeID ASC
;

EmployeeID,Name,ManagerID,ManagementHierarchy,L1_ManagerName,L1_ManagerID,L2_ManagerName,L2_ManagerID,L3_ManagerName,L3_ManagerID,L4_ManagerName,L4_ManagerID,ManagerHierarchy
1,Alice,0,,Top,0,Top,0,Top,0,Top,0,0-0-0-0-0
2,Bob,1,1,Alice,0,Top,0,Top,0,Top,0,0-0-0-0-1
3,Charlie,1,1,Alice,0,Top,0,Top,0,Top,0,0-0-0-0-1
4,David,2,2-1,Bob,1,Alice,0,Top,0,Top,0,0-0-0-1-2
5,Eve,2,2-1,Bob,1,Alice,0,Top,0,Top,0,0-0-0-1-2
6,Frank,3,3-1,Charlie,1,Alice,0,Top,0,Top,0,0-0-0-1-3
7,Grace,3,3-1,Charlie,1,Alice,0,Top,0,Top,0,0-0-0-1-3
8,Hank,4,4-2-1,David,2,Bob,1,Alice,0,Top,0,0-0-1-2-4
9,Ivy,4,4-2-1,David,2,Bob,1,Alice,0,Top,0,0-0-1-2-4
10,Jack,5,5-2-1,Eve,2,Bob,1,Alice,0,Top,0,0-0-1-2-5


### GraphFrames?

In [0]:
# Install the GraphFrames library
%pip install graphframes

In [0]:
dbutils.library.restartPython()

In [0]:
%sql
USE CATALOG ggw;
USE SCHEMA hr;

SELECT current_catalog(), current_database(), current_timestamp();

current_catalog(),current_database(),current_timestamp()
ggw,hr,2025-01-31T16:34:07.124+0000


In [0]:
employees_df = spark.table("employees")
managers_df = spark.sql("SELECT EmployeeID, ManagerID, 'Manager' AS Relationship FROM employees WHERE ManagerID IS NOT NULL")

display(managers_df)

In [0]:
from graphframes import GraphFrame

# Create a GraphFrame
g = GraphFrame(v=employees_df, e=managers_df)

# Display Graph
display(g)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkTypeError[0m                          Traceback (most recent call last)
File [0;32m<command-971429592434590>, line 4[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mgraphframes[39;00m [38;5;28;01mimport[39;00m GraphFrame
[1;32m      3[0m [38;5;66;03m# Create a GraphFrame[39;00m
[0;32m----> 4[0m g [38;5;241m=[39m GraphFrame(v[38;5;241m=[39mspark[38;5;241m.[39mcreateDataFrame(employees_df), e[38;5;241m=[39mspark[38;5;241m.[39mcreateDataFrame(managers_df))
[1;32m      6[0m [38;5;66;03m# Display Graph[39;00m
[1;32m      7[0m display(g)

File [0;32m/databricks/spark/python/pyspark/sql/connect/session.py:462[0m, in [0;36mSparkSession.createDataFrame[0;34m(self, data, schema, samplingRatio, verifySchema)[0m
[1;32m    460[0m [38;5;28;01massert[39;00m data [38;5;129;01mis[39;00m [38;5;129;01mnot[39;00m [38;5;28;01mNone[39;00m
[1;32m    461

In [0]:
from graphframes import GraphFrame

# Ensure employees_df and managers_df are Spark DataFrames
emp_df = spark.createDataFrame(employees_df)
mgr_df = spark.createDataFrame(managers_df)

# Create a GraphFrame
g = GraphFrame(v=employees_df, e=managers_df)

# Display Graph
display(g)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkTypeError[0m                          Traceback (most recent call last)
File [0;32m<command-971429592433441>, line 4[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mgraphframes[39;00m [38;5;28;01mimport[39;00m GraphFrame
[1;32m      3[0m [38;5;66;03m# Ensure employees_df and managers_df are Spark DataFrames[39;00m
[0;32m----> 4[0m emp_df [38;5;241m=[39m spark[38;5;241m.[39mcreateDataFrame(employees_df)
[1;32m      5[0m mgr_df [38;5;241m=[39m spark[38;5;241m.[39mcreateDataFrame(managers_df)
[1;32m      7[0m [38;5;66;03m# Create a GraphFrame[39;00m

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;

In [0]:
from graphframes import GraphFrame

# Create the vertices DataFrame
vertices = employees_df.select("EmployeeID").withColumnRenamed("EmployeeID", "id")

# Create the edges DataFrame
edges = managers_df.withColumnRenamed("EmployeeID", "src").withColumnRenamed("ManagerID", "dst")

# Create the GraphFrame
g = GraphFrame(vertices, edges)


[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkAttributeError[0m                     Traceback (most recent call last)
File [0;32m<command-971429592434280>, line 10[0m
[1;32m      7[0m edges [38;5;241m=[39m managers_df[38;5;241m.[39mwithColumnRenamed([38;5;124m"[39m[38;5;124mEmployeeID[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124msrc[39m[38;5;124m"[39m)[38;5;241m.[39mwithColumnRenamed([38;5;124m"[39m[38;5;124mManagerID[39m[38;5;124m"[39m, [38;5;124m"[39m[38;5;124mdst[39m[38;5;124m"[39m)
[1;32m      9[0m [38;5;66;03m# Create the GraphFrame[39;00m
[0;32m---> 10[0m g [38;5;241m=[39m GraphFrame(vertices, edges)

File [0;32m/local_disk0/.ephemeral_nfs/envs/pythonEnv-b880d600-b475-472f-be3f-2d91d3097461/lib/python3.11/site-packages/graphframes/graphframe.py:63[0m, in [0;36mGraphFrame.__init__[0;34m(self, v, e)[0m
[1;32m     61[0m [38;5;28mself[39m[38;5;241m.[39m_vertices [38;5;241m=[

In [0]:
vertices = spark.createDataFrame([
    ("1", "Carter", "Derrick", 50),
    ("2", "May", "Derrick", 26),
    # ... other vertices ...
], ["id", "name", "firstname", "age"])

edges = spark.createDataFrame([
    ("1", "2", "friend"),
    ("2", "1", "friend"),
    # ... other edges ...
], ["src", "dst", "type"])


In [0]:
from graphframes import GraphFrame
g = GraphFrame(vertices, edges)


[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkAttributeError[0m                     Traceback (most recent call last)
File [0;32m<command-971429592434321>, line 2[0m
[1;32m      1[0m [38;5;28;01mfrom[39;00m [38;5;21;01mgraphframes[39;00m [38;5;28;01mimport[39;00m GraphFrame
[0;32m----> 2[0m g [38;5;241m=[39m GraphFrame(vertices, edges)

File [0;32m/local_disk0/.ephemeral_nfs/envs/pythonEnv-0436c599-57f3-41b2-9b5b-fad5881fb54b/lib/python3.11/site-packages/graphframes/graphframe.py:63[0m, in [0;36mGraphFrame.__init__[0;34m(self, v, e)[0m
[1;32m     61[0m [38;5;28mself[39m[38;5;241m.[39m_vertices [38;5;241m=[39m v
[1;32m     62[0m [38;5;28mself[39m[38;5;241m.[39m_edges [38;5;241m=[39m e
[0;32m---> 63[0m [38;5;28mself[39m[38;5;241m.[39m_sqlContext [38;5;241m=[39m v[38;5;241m.[39msql_ctx
[1;32m     64[0m [38;5;28mself[39m[38;5;241m.[39m_sc [38;5;241m=[39m [38;5;28mself[39m[38;5;24

In [0]:
# Perform some graph operations
print("In-degrees:")
g.inDegrees.show()

print("Out-degrees:")
g.outDegrees.show()

# Find the shortest paths from 'a' to all other vertices
results = g.shortestPaths(landmarks=["a"])
print("Shortest paths from 'a':")
results.select("id", "distances").show()

# Stop the SparkSession
spark.stop()
