In [1]:
import json
import pandas as pd
from arcgis.gis import GIS
from arcgis.features import FeatureLayer, FeatureSet
from arcgis.geometry import Geometry, buffer, LengthUnits
from arcgis.geometry import SpatialReference
import tempfile
import os
import math 
from arcgis.geometry import Point, Polyline, Polygon
from IPython.display import clear_output

In [2]:
gis = GIS()
print(f"Connected to: {gis}")

Connected to: GIS @ https://www.arcgis.com version:2025.2


In [3]:
def load_geojson_from_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        return json.load(f)

lands_data = load_geojson_from_file('land.geojson')
boundary_data = load_geojson_from_file('boundary.geojson')

In [4]:
map = gis.map('Bangkok') 
map.zoom = 15
map.center = [100.5535, 13.7585]

In [5]:
def parse_land_features(lands_data):
    land_features = []
    features = lands_data['features']
    for feature in features:  
        geometry = Geometry(feature['geometry'])
        attributes = feature['properties']
        
        arcgis_feature = {
            'geometry': geometry,
            'attributes': attributes
        }
        land_features.append(arcgis_feature) 
        
    spatial_ref = SpatialReference(wkid=24047)
    land_feature_set = FeatureSet(land_features, spatial_reference=spatial_ref)  # Corrected list name
    return land_features, land_feature_set


land_features, land_feature_set = parse_land_features(lands_data)
map.content.add(land_feature_set)

In [6]:
def parse_boundary_feature(boundary_data):
    boundary_feature = []
    for feature in boundary_data['features']:
        geometry = Geometry(feature['geometry'])
        attributes = feature['properties']
        
        arcgis_feature = {
            'geometry': geometry,
            'attributes': attributes
        }
        boundary_feature.append(arcgis_feature)
    spatial_ref = SpatialReference(wkid=24047)
    boundary_feature_set = FeatureSet(boundary_feature, spatial_reference=spatial_ref)
        
    return boundary_feature, boundary_feature_set

boundary_feature, boundary_feature_set = parse_boundary_feature(boundary_data)

In [7]:
def min_area_parcel(lands):
    min_area = lands[0]['geometry'].area
    for land in lands: 
        geometry = land['geometry']
        area = geometry.area
        if area < min_area:
            min_area = area 
    return min_area 

min_area = min_area_parcel(land_features)
min_area

19431.00555419922

In [8]:
import math 
from arcgis.geometry import Point, Polyline, Polygon

def generate_road_features(boundary, lands):
    min_area = min_area_parcel(lands)
    cell_size = math.sqrt(min_area) 
    road_features = [] 
    boundary_extent = boundary[0]['geometry'].extent
    min_x, min_y, max_x, max_y = boundary_extent[0], boundary_extent[1], boundary_extent[2], boundary_extent[3]
    cols = int(math.ceil((max_x - min_x) / cell_size))
    rows = int(math.ceil((max_y - min_y) / cell_size))

    id = 0 

    for row in range(rows + 1):
        y = min_y + row * cell_size
        for col in range(cols):
            x1 = min_x + col * cell_size
            x2 = min_x + (col + 1) * cell_size
            road = generate_road_geometry(x1, y, x2, y, 12)
            arcgis_feature = {
                'geometry': road,
                'attributes': {
                    'id': id,
                    'road_price': float('inf'),
                    'connected_lands': [],
                    'connected_roads': [],
                    'efficiency': float('inf'),
                    'road_type': 'horizontal', 
                    'row': row, 
                    'col': col
                }
            }
            clear_output(wait=True) 
            print(f" working on road: {id}")
            road_features.append(arcgis_feature)
            id += 1 
            
    # Vertical segments (between cells horizontally)  
    for col in range(cols + 1):
        x = min_x + col * cell_size
        for row in range(rows):
            y1 = min_y + row * cell_size
            y2 = min_y + (row + 1) * cell_size
            road = generate_road_geometry(x, y1, x, y2, 12)
            arcgis_feature = {
                'geometry': road,
                'attributes': {
                    'id': id,
                    'road_price': float('inf'),
                    'connected_lands': [],
                    'connected_roads': [],
                    'efficiency': float('inf'),
                    'road_type': 'vertical',
                    'row': row, 
                    'col': col
                }
            }
            clear_output(wait=True) 
            print(f" working on road: {id}")
            road_features.append(arcgis_feature)
            id += 1
            
    return road_features  

def generate_road_geometry(x1, y1, x2, y2, buffer_size):
    border_line = Polyline({
                'paths': [[[x1, y1], [x2, y2]]],
                'spatialReference': {'wkid': 24047}
    })

    buffer_result = buffer(geometries=[border_line], distances=[12], in_sr={"wkid": 24047}, unit=LengthUnits.METER)
    if len(buffer_result) < 1:
        print(buffer_result)
    road = Geometry(buffer_result[0])
    return road 

    
print("start generating ...")
road_features = generate_road_features(boundary_feature, land_features) 
print("done")

 working on road: 179
done


In [9]:
def get_connected_roads(road, road_features):
    road_id = road['attributes']['row']
    road_type = road['attributes']['road_type']
    row = road['attributes']['row']
    col = road['attributes']['col']
    connected_roads = []

    for other_road in road_features:
        other_id = other_road['attributes']['id']
        other_type = other_road['attributes']['road_type']
        other_row = other_road['attributes']['row']
        other_col = other_road['attributes']['col']
        
        if other_id == road_id:
            continue
            
        if road_type == 'horizontal' and other_type == 'vertical':
            if (other_row == row - 1 or other_row == row) and (other_col == col or other_col == col + 1):
                connected_roads.append(other_id)
        elif road_type == 'vertical' and other_type == 'horizontal':
            if (other_row == row or other_row == row + 1) and (other_col == col - 1 or other_col == col):
                connected_roads.append(other_id)
    
    return connected_roads

def update_connected_roads(road_features):
    for road in road_features:
        road['attributes']['connected_roads'] = get_connected_roads(road, road_features)
        clear_output(wait=True) 
        print(f"updating road: {road['attributes']['id']}")
    return road_features 

print("start updating")
road_features = update_connected_roads(road_features) 
print("update connected road done")

updating road: 179
update connected road done


In [10]:
from arcgis.geometry import buffer, LengthUnits, Geometry
from arcgis.geometry import functions as geom_functions
from arcgis.geometry import SpatialReference
from arcgis.features import FeatureSet
from IPython.display import clear_output

def calculate_road_impact(road, land_features):
    connected_lands = [] 
    total_price = 0
    for land in land_features:
        intersect_geom = geom_functions.intersect(24047, [land['geometry']], road['geometry'])[0]
        land_id = land['attributes']['id']
        price_per_sqm = land['attributes']['price_per_sqm']
        area = intersect_geom.area
        if area > 0:
            total_price += area * price_per_sqm
            connected_lands.append(land_id)
    return connected_lands, total_price 

def update_connected_lands(road_features, land_features): 
    for i, road in enumerate(road_features):
        connected_lands, road_price = calculate_road_impact(road, land_features)
        clear_output(wait=True)  
        print(f"Updating connected lands: Road {i+1}/{len(road_features)} - Road ID: {road['attributes']['id']} Price: {road_price} Connected lands: {connected_lands}")
        road['attributes']['connected_lands'] = connected_lands
        road['attributes']['road_price'] = road_price
    return road_features

print("updating connected lands")
print(f"total road: {len(road_features)}")
road_features = update_connected_lands(road_features, land_features)
print("done")

Updating connected lands: Road 180/180 - Road ID: 179 Price: 0 Connected lands: []
done


In [None]:
# ------------------------------------------------------------------------------------------# --------------------------------------------------------

In [12]:
import copy 
road_features_copy = copy.deepcopy(road_features)
len(road_features_copy)

180

In [13]:
def calculate_road_efficiency(road_features, connected_land_ids):
    for road in road_features:
        road_connected_lands = road['attributes']['connected_lands']
        unconnected_land_count = 0
        for land_id in road_connected_lands:
            if land_id not in connected_land_ids:
                unconnected_land_count += 1
                
        if unconnected_land_count > 0:
            road['attributes']['efficiency'] = road['attributes']['road_price'] / unconnected_land_count
        else:
            road['attributes']['efficiency'] = float('inf')

connected_land_ids = [] 
selected_roads = [] 

while len(connected_land_ids) < 20:
    # Calculate the efficiency of each road
    calculate_road_efficiency(road_features_copy, connected_land_ids)
    
    # Filter out roads that can't connect any new land
    candidate_roads = [road for road in road_features_copy if road['attributes']['efficiency'] != float('inf')]
    
    if not candidate_roads:
        print("No more roads can connect new lands!")
        break
    
    # Sort roads by efficiency (lowest first)
    candidate_roads.sort(key=lambda x: x['attributes']['efficiency'])
    
    # Select the road with the highest efficiency
    selected_road = candidate_roads[0]
    selected_roads.append(selected_road)
    
    # Add the lands connected by the selected road to the connected land list
    for land_id in selected_road['attributes']['connected_lands']:
        if land_id not in connected_land_ids:
            connected_land_ids.append(land_id)
    
    # Remove the selected road from the list of roads
    road_features_copy.remove(selected_road)


In [14]:
spatial_ref = SpatialReference(wkid=24047)
selected_road_feature_set = FeatureSet(selected_roads, spatial_reference=spatial_ref)
map.content.add(selected_road_feature_set)
map 

Map(center=[100.5535, 13.7585], extent={'xmin': 11158024.913073143, 'ymin': 1515306.9157138057, 'xmax': 112176…

In [15]:

selected_road_copy = copy.deepcopy(selected_roads)
remaining_road_copy = copy.deepcopy(road_features_copy)
# road['attributes']['connected_roads']
# road['attributes']['road_price']


In [16]:
import heapq
from collections import defaultdict
from itertools import combinations

def steiner_tree_connect_roads(selected_roads, remaining_roads):
    """
    Find minimum cost Steiner tree to connect all selected roads.
    Returns the road IDs that need to be added to connect all selected roads.
    """
    
    selected_ids = {road['attributes']['id'] for road in selected_roads}
    all_roads = {road['attributes']['id']: road for road in selected_roads + remaining_roads}
    
    # Build adjacency graph with road prices as edge weights
    graph = defaultdict(list)
    road_prices = {}
    
    # First, collect all road prices
    for road in selected_roads + remaining_roads:
        road_id = road['attributes']['id']
        price = road['attributes']['road_price']
        road_prices[road_id] = price
    
    # Then build the graph, skipping invalid connections
    for road in selected_roads + remaining_roads:
        road_id = road['attributes']['id']
        
        for connected_id in road['attributes']['connected_roads']:
            # Only add edge if both roads exist in our dataset
            if connected_id in all_roads and connected_id in road_prices:
                # Edge weight is the price of the destination road
                graph[road_id].append((connected_id, road_prices[connected_id]))
    
    def dijkstra_shortest_paths(source, targets):
        """Find shortest paths from source to all targets"""
        distances = {node: float('inf') for node in all_roads}
        distances[source] = 0
        previous = {node: None for node in all_roads}
        pq = [(0, source)]
        
        while pq:
            current_dist, current = heapq.heappop(pq)
            
            if current_dist > distances[current]:
                continue
                
            for neighbor, weight in graph[current]:
                distance = current_dist + weight
                
                if distance < distances[neighbor]:
                    distances[neighbor] = distance
                    previous[neighbor] = current
                    heapq.heappush(pq, (distance, neighbor))
        
        # Return paths to targets
        paths = {}
        costs = {}
        for target in targets:
            if distances[target] < float('inf'):
                path = []
                current = target
                while current is not None:
                    path.append(current)
                    current = previous[current]
                path.reverse()
                paths[target] = path
                costs[target] = distances[target]
        
        return paths, costs
    
    def steiner_tree_dp():
        """Dynamic programming approach for Steiner tree"""
        terminals = list(selected_ids)
        n = len(terminals)
        
        if n <= 1:
            return set(), 0
        
        if n == 2:
            # Simple case: find shortest path between two terminals
            paths, costs = dijkstra_shortest_paths(terminals[0], {terminals[1]})
            if terminals[1] in paths:
                path_nodes = set(paths[terminals[1]])
                steiner_nodes = path_nodes - selected_ids
                return steiner_nodes, costs[terminals[1]]
            else:
                return set(), float('inf')  # No path exists
        
        # For larger sets, use approximation algorithm
        return steiner_approximation(terminals)
    
    def steiner_approximation(terminals):
        """
        2-approximation algorithm for Steiner tree:
        1. Find shortest paths between all pairs of terminals
        2. Build complete graph with these distances
        3. Find MST of complete graph
        4. Replace MST edges with actual shortest paths
        """
        
        # Step 1: Find shortest paths between all terminal pairs
        terminal_distances = {}
        terminal_paths = {}
        
        for i, terminal in enumerate(terminals):
            targets = set(terminals[i+1:])
            if targets:
                paths, costs = dijkstra_shortest_paths(terminal, targets)
                for target in targets:
                    if target in paths:
                        key = (min(terminal, target), max(terminal, target))
                        terminal_distances[key] = costs[target]
                        terminal_paths[key] = paths[target]
        
        # Step 2: Build MST on complete graph of terminals
        edges = [(cost, t1, t2) for (t1, t2), cost in terminal_distances.items()]
        edges.sort()
        
        # Kruskal's algorithm for MST
        parent = {}
        def find(x):
            if x not in parent:
                parent[x] = x
            if parent[x] != x:
                parent[x] = find(parent[x])
            return parent[x]
        
        def union(x, y):
            px, py = find(x), find(y)
            if px != py:
                parent[py] = px
                return True
            return False
        
        mst_edges = []
        total_cost = 0
        
        for cost, t1, t2 in edges:
            if union(t1, t2):
                mst_edges.append((t1, t2))
                total_cost += cost
                if len(mst_edges) == len(terminals) - 1:
                    break
        
        # Step 3: Collect all nodes in the shortest paths of MST edges
        steiner_nodes = set()
        
        for t1, t2 in mst_edges:
            key = (min(t1, t2), max(t1, t2))
            if key in terminal_paths:
                path = terminal_paths[key]
                steiner_nodes.update(path)
        
        # Remove terminals from steiner nodes (we only want intermediate nodes)
        steiner_nodes -= selected_ids
        
        return steiner_nodes, total_cost
    
    # Run the Steiner tree algorithm
    try:
        steiner_nodes, total_cost = steiner_tree_dp()
        
        # Verify connectivity by checking if all selected roads are connected
        # through the steiner tree
        connected_nodes = selected_ids | steiner_nodes
        
        # Build subgraph with only connected nodes
        subgraph = defaultdict(list)
        for road_id in connected_nodes:
            for neighbor, weight in graph[road_id]:
                if neighbor in connected_nodes:
                    subgraph[road_id].append(neighbor)
        
        # Check if all selected roads are reachable from one selected road
        def is_connected():
            if not selected_ids:
                return True
            
            start = next(iter(selected_ids))
            visited = set()
            stack = [start]
            
            while stack:
                current = stack.pop()
                if current in visited:
                    continue
                visited.add(current)
                
                for neighbor in subgraph[current]:
                    if neighbor not in visited:
                        stack.append(neighbor)
            
            return selected_ids.issubset(visited)
        
        if is_connected():
            return list(steiner_nodes), total_cost
        else:
            print("Warning: Could not find a connected Steiner tree")
            return list(steiner_nodes), float('inf')
            
    except Exception as e:
        print(f"Error in Steiner tree calculation: {e}")
        return [], float('inf')

# Example usage:
def add_steiner_roads(selected_roads, remaining_roads):
    """
    Main function to connect all selected roads optimally
    """
    print("Finding optimal roads to connect all selected roads...")
    
    steiner_road_ids, total_cost = steiner_tree_connect_roads(selected_roads, remaining_roads)
    
    if steiner_road_ids:
        print(f"Roads to add: {steiner_road_ids}")
        print(f"Total cost: {total_cost/1000000:.2f}M")
        
        # Add these roads to selected_roads
        remaining_roads_dict = {road['attributes']['id']: road for road in remaining_roads}
        
        roads_to_add = []
        for road_id in steiner_road_ids:
            if road_id in remaining_roads_dict:
                roads_to_add.append(remaining_roads_dict[road_id])
        
        return roads_to_add, total_cost
    else:
        print("No additional roads needed or no solution found")
        return [], total_cost

# Usage:
roads_to_add, cost = add_steiner_roads(selected_road_copy, remaining_road_copy)

Finding optimal roads to connect all selected roads...
Roads to add: [6, 7, 134, 143, 147, 149, 152, 153, 26, 27, 156, 157, 158, 160, 34, 162, 164, 36, 161, 32, 168, 43, 172, 45, 46, 52, 59, 61, 63, 70, 72, 75, 79, 82, 83, 84, 85, 86, 87, 92, 93, 94, 95, 96, 97, 104, 116, 125]
Total cost: 1855.75M


In [17]:
road_ids_to_add = [road['attributes']['id'] for road in roads_to_add]
print(len(roads_to_add))
print(len(selected_road_copy))
print(len(remaining_road_copy))

48
12
168


In [18]:
def add_roads_from_ids(ids, selected_roads, remaining_roads):
    roads_to_add = [road for road in remaining_roads if road['attributes']['id'] in ids]
    
    # Add roads to selected_roads
    selected_roads.extend(roads_to_add)

    # Remove added roads from remaining_roads
    remaining_roads = [road for road in remaining_roads if road['attributes']['id'] not in ids]
    
    return selected_roads, remaining_roads

print(f"before: {len(selected_road_copy)}")
selected_road_copy, remaining_road_copy = add_roads_from_ids(road_ids_to_add, selected_road_copy, remaining_road_copy)
print(f"after: {len(selected_road_copy)}")

spatial_ref = SpatialReference(wkid=24047)
update_selected_road_feature_set = FeatureSet(selected_road_copy, spatial_reference=spatial_ref)


before: 12
after: 60


In [19]:
map3 = gis.map('Bangkok') 
map3.zoom = 15
map3.center = [100.5535, 13.7585]
map3.content.add(update_selected_road_feature_set)
map3.content.add(land_feature_set)
map3

Map(center=[100.5535, 13.7585], extent={'xmin': 11158024.913073143, 'ymin': 1515306.9157138057, 'xmax': 112176…

In [26]:
def calculate_total_land_price(land_features):
    total_price = 0
    for land_feature in land_features:
        total_price += land_feature['attributes']['price_per_sqm'] * land_feature['geometry'].area
    return total_price 

total_land_price = calculate_total_land_price(land_features) 
print(f"total land price {total_land_price}")
        

total land price 39707075173.72131


In [27]:
def calculate_total_road_price(road_features):
    total_price = 0 
    for road_feature in road_features:
            total_price += road_feature['attributes']['road_price']
    return total_price 

total_road_price = calculate_total_road_price(selected_road_copy) 
print(f"total land price {total_land_price}")

total land price 39707075173.72131


In [29]:
original_land_price = total_land_price 
deducted_land_price = total_land_price - total_road_price 
new_land_price = deducted_land_price * 1.2 
new_land_price / original_land_price * 100 

percentage_increase = (new_land_price / original_land_price) * 100

print(f"Original Land Price: {original_land_price}")
print(f"Deducted Land Price (after road price deduction): {deducted_land_price}")
print(f"New Land Price (after 20% increase): {new_land_price}")
print(f"Percentage Increase: {percentage_increase-100:.2f}%")

Original Land Price: 39707075173.72131
Deducted Land Price (after road price deduction): 37498583473.66333
New Land Price (after 20% increase): 44998300168.395996
Percentage Increase: 13.33%


In [None]:
# ------------------------------------------------------------------------------------------# --------------------------------------------------------