# -----import package-----

In [66]:
import geopandas as gpd
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import math
from shapely.ops import linemerge
from shapely.wkt import loads
from shapely.geometry import LineString
from shapely.geometry import Point, Polygon
from scipy.spatial import Delaunay

# -----point file-----

In [2]:
df_point = gpd.read_file('hydrant_location/hydrant_location.shp')

In [3]:
df_point['id'] = df_point.reset_index().index

In [4]:
df_point

Unnamed: 0,id,geometry
0,0,POINT (-74.00698 40.74748)
1,1,POINT (-74.00722 40.74722)
2,2,POINT (-74.00721 40.74697)
3,3,POINT (-74.00722 40.74722)
4,4,POINT (-74.00728 40.74667)
...,...,...
2295,2295,POINT (-73.97976 40.71559)
2296,2296,POINT (-73.97987 40.71361)
2297,2297,POINT (-73.98387 40.72077)
2298,2298,POINT (-73.97780 40.71833)


# -----road file-----

In [5]:
df_road = gpd.read_file('road_location/road_location.shp')

In [6]:
df_road.drop('road_id', axis=1, inplace=True)

In [7]:
df_road

Unnamed: 0,id,geometry
0,1,"LINESTRING (-74.01301 40.70215, -74.01167 40.7..."
1,1,"LINESTRING (-74.01171 40.70247, -74.00999 40.7..."
2,1,"LINESTRING (-74.00577 40.70566, -74.00396 40.7..."
3,1,"LINESTRING (-74.00385 40.70651, -74.00142 40.7..."
4,2,"LINESTRING (-74.01270 40.70275, -74.01180 40.7..."
...,...,...
331,248,"LINESTRING (-73.99741 40.72164, -73.99389 40.7..."
332,249,"LINESTRING (-74.00203 40.71159, -74.00252 40.7..."
333,250,"LINESTRING (-74.00296 40.71334, -74.00150 40.7..."
334,251,"LINESTRING (-74.00799 40.70899, -74.00720 40.7..."


# -----calculate distance between road and point-----

In [10]:
#find the 1 closest road for each point
def find_close_road(point, roads_df):
    close_road = None
    min_distance = float('inf')
    
    for idx, road in roads_df.iterrows():
        distance = point.distance(road['geometry'])
        if distance < min_distance:
            min_distance = distance
            close_road = road['id']
    return close_road

In [11]:
#apply above method to all points in df_point
df_point['close_road'] = df_point.apply(lambda row: find_close_road(row['geometry'], df_road), axis=1)

In [12]:
def find_closest_road(point, roads_df, min_distance):
    closest_road_id = []

    for idx, road in roads_df.iterrows():
        distance = point.distance(road['geometry'])
        if distance < min_distance:
            closest_road_id.append(road['id'])

    return closest_road_id

In [13]:
df_point['closest_road_id'] = df_point.apply(lambda row: find_closest_road(row['geometry'], df_road, 0.0004), axis=1)

In [14]:
df_point

Unnamed: 0,id,geometry,close_road,closest_road_id
0,0,POINT (-74.00698 40.74748),119,"[119, 170]"
1,1,POINT (-74.00722 40.74722),119,"[119, 120, 170]"
2,2,POINT (-74.00721 40.74697),120,"[120, 170]"
3,3,POINT (-74.00722 40.74722),119,"[119, 120, 170]"
4,4,POINT (-74.00728 40.74667),120,"[120, 121, 170]"
...,...,...,...,...
2295,2295,POINT (-73.97976 40.71559),234,"[33, 234]"
2296,2296,POINT (-73.97987 40.71361),30,"[30, 35, 235]"
2297,2297,POINT (-73.98387 40.72077),221,[221]
2298,2298,POINT (-73.97780 40.71833),199,"[199, 235]"


# -----block file-----

In [15]:
def blockInfo(shapefile):
    gdf = gpd.read_file(shapefile)
    block_info_list = []
    for idx, row in gdf.iterrows():
        block_id = row['id'] 
        polygon = row['geometry'] 
        corners = list(polygon.exterior.coords)
        block_info = {
            'blockID': block_id,
            'corners': corners
        }
        block_info_list.append(block_info)
    return block_info_list

In [16]:
def blockIdentify(hydrantCenter, blocks):
    hydrantPoint = Point(hydrantCenter)
    for block in blocks:
        blockCoords = block['corners'] 
        blockPolygon = Polygon(blockCoords)
        if hydrantPoint.within(blockPolygon):
            hydrantBlockID = block['blockID'] 
            return hydrantBlockID
    return None

In [17]:
blocks = blockInfo("block_location/blocksagain.shp")
df_point['block_id'] = df_point.apply(lambda row: blockIdentify(row['geometry'], blocks), axis=1)

In [18]:
# Check if the entire dataframe contains NA values
is_na_anywhere = df_point.isna().any().any()

print("Dataframe contains NA values:", is_na_anywhere)

Dataframe contains NA values: True


In [19]:
df_point

Unnamed: 0,id,geometry,close_road,closest_road_id,block_id
0,0,POINT (-74.00698 40.74748),119,"[119, 170]",1.0
1,1,POINT (-74.00722 40.74722),119,"[119, 120, 170]",2.0
2,2,POINT (-74.00721 40.74697),120,"[120, 170]",2.0
3,3,POINT (-74.00722 40.74722),119,"[119, 120, 170]",2.0
4,4,POINT (-74.00728 40.74667),120,"[120, 121, 170]",3.0
...,...,...,...,...,...
2295,2295,POINT (-73.97976 40.71559),234,"[33, 234]",505.0
2296,2296,POINT (-73.97987 40.71361),30,"[30, 35, 235]",901.0
2297,2297,POINT (-73.98387 40.72077),221,[221],437.0
2298,2298,POINT (-73.97780 40.71833),199,"[199, 235]",444.0


# -----Hydrant Class-----

In [40]:
class Hydrant:
    
    def __init__(self, id, x, y, block_id, closest_road_id, close_road): # constructor
        
        # From shapefile
        self.pointID = id
        self.x = x
        self.y = y
        self.blockID = block_id
        self.roadID = closest_road_id
        self.closeRoadID = close_road
        
        # Will initialize once into network generation
        self.branchID = 0
        self.neighbor = [] # List to store all the neighbors of this hydrant
        self.edge = [] # Dict to store edge information

# -----Edge Class-----

In [54]:
class Edge:
    
    def __init__(self, id, p1, p2, length, cross_road_id, angle_with_road, intersection_angle): # constructor
    
        self.edgeID = id
        self.p1 = p1
        self.p2 = p2
        self.length = length
        self.cross_road_id = cross_road_id
        self.angle_with_road = angle_with_road
        self.intersection_angle = intersection_angle

# -----convert the above dataframe to hydrant object-----

In [42]:
# Create an empty list to store the 'hydrant' instances
hydrant_instances = []

# Iterate through each row in the DataFrame and create 'hydrant' instances
for index, row in df_point.iterrows():
    # Get the Point object from the 'geometry' column
    point = row['geometry']
    
    # Extract 'x' and 'y' coordinates from the Point object
    x, y = point.x, point.y
    
    # Create 'hydrant' instance and add it to the list
    hydrant_obj = Hydrant(row['id'], x, y, row['block_id'], row['closest_road_id'], row['close_road'])
    hydrant_instances.append(hydrant_obj)

In [43]:
hydrant_instances[2].blockID

2.0

In [44]:
# Function to calculate the Euclidean distance between two points
def distance(p1, p2):
    return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

# -----Triangulation-----

In [45]:
# Extract x and y coordinates from hydrant instances
hydrant_points = [(hydrant.x, hydrant.y) for hydrant in hydrant_instances]

# Convert the list of points to a numpy array
points = np.array(hydrant_points)

# Perform Delaunay triangulation
triangulation = Delaunay(points)

# -----Filter out edges with different roadID-----

# -----Filter out edges cross Road that exceed the maximum-----

# -----Filter cross block with different roadID-----

In [46]:
# set the maximum length
max_edge_length = 0.002

In [47]:
#two functions altered for the closest point
def edge_in_block(hydrant1, hydrant2):
    return hydrant1.blockID == hydrant2.blockID

def blocked_by_block(hydrant1, hydrant2):
    if edge_in_block(hydrant1, hydrant2):
        if hydrant1.closeRoadID == hydrant2.closeRoadID:
            return False  
        return True
    else:
        return False

In [48]:
# Set to store filtered edges (edges that don't exceed the maximum length)
filtered_edges_road_cross = set()
edge_set_pro = []

In [49]:
# track edgeID
edge_id = 0

In [50]:
# Loop through each triangle (simplex)
for simplex in triangulation.simplices:
    
    # Get the indices of the points forming the edges of the triangle
    edge1 = frozenset([simplex[0], simplex[1]])
    edge2 = frozenset([simplex[1], simplex[2]])
    edge3 = frozenset([simplex[0], simplex[2]])

    # Get the actual points from the 'points' list using the indices
    point1, point2, point3 = points[simplex[0]], points[simplex[1]], points[simplex[2]]
    
    # Get the actual points from the 'hydrant_instances' list using the indices
    hydrant1, hydrant2, hydrant3 = hydrant_instances[simplex[0]], hydrant_instances[simplex[1]], hydrant_instances[simplex[2]]
    
    # Calculate the edge lengths
    edge1_length = distance(point1, point2)
    edge2_length = distance(point2, point3)
    edge3_length = distance(point1, point3)
        
    # Check each closest road ID for each point in the triangle
    for road_id in df_point.at[simplex[0], 'closest_road_id']:
        if edge1_length <= max_edge_length \
            and road_id in df_point.at[simplex[1], 'closest_road_id'] \
            and not blocked_by_block(hydrant1, hydrant2):
            filtered_edges_road_cross.add(edge1)
            edge_obj = Edge(edge_id, simplex[0], simplex[1], edge1_length)
            edge_set_pro.append(edge_obj)
            edge_id += 1
            hydrant1.neighbor.append(hydrant2)
            hydrant1.edge.append(edge_obj)
            break  # Break the loop if an edge is found for this road_id
    for road_id in df_point.at[simplex[1], 'closest_road_id']:
        if edge2_length <= max_edge_length \
            and road_id in df_point.at[simplex[2], 'closest_road_id'] \
            and not blocked_by_block(hydrant2, hydrant3):
            filtered_edges_road_cross.add(edge2)
            edge_obj = Edge(edge_id, simplex[1], simplex[2], edge2_length)
            edge_set_pro.append(edge_obj)
            edge_id += 1
            hydrant2.neighbor.append(hydrant3)
            hydrant2.edge.append(edge_obj)
            break
    for road_id in df_point.at[simplex[0], 'closest_road_id']:
        if edge3_length <= max_edge_length \
            and road_id in df_point.at[simplex[2], 'closest_road_id'] \
            and not blocked_by_block(hydrant1, hydrant3):
            filtered_edges_road_cross.add(edge3)
            edge_obj = Edge(edge_id, simplex[0], simplex[2], edge3_length)
            edge_set_pro.append(edge_obj)
            edge_id += 1
            hydrant1.neighbor.append(hydrant3)
            hydrant3.edge.append(edge_obj)
            break

In [51]:
# -----Create a GeoDataFrame for filtered edges-----

# Create a list of LineString objects for filtered edges
lines = []
for edge in filtered_edges_road_cross:
    edge_points = list(edge)
    line = LineString(np.array(points)[edge_points])
    lines.append(line)

# Create a GeoDataFrame from the LineString objects
gdf_filtered_edges_road_cross = gpd.GeoDataFrame(geometry=lines)

# Save the GeoDataFrame as a shapefile
gdf_filtered_edges_road_cross.to_file('filtered_edges_road_block.shp')

# -----Calculate angle cross road for each edge-----

In [59]:
def calculate_angle(p1, p2):
    dx = p2[0] - p1[0]
    dy = p2[1] - p1[1]
    return math.degrees(math.atan2(dy, dx))

In [60]:
def unit_vector(p1, p2):
    dx = p2[0] - p1[0]
    dy = p2[1] - p1[1]
    length = np.sqrt(dx**2 + dy**2)
    if length > 0:
        return dx / length, dy / length
    else:
        return 0, 0

In [61]:
def find_edge_crossing_road(edges, roads_df):
    for edge in edges:
        road_id = None
        min_distance = float('inf')
        intersection_angle = 0  # Initialize the angle at the intersection
        
        # Get the actual points from the 'points' list using the indices
        point1, point2 = points[edge.p1], points[edge.p2]

        # Calculate the unit direction vector of the edge
        edge_vector = unit_vector(point1, point2)

        # Calculate the angle between the edge and each road
        for _, road_row in roads_df.iterrows():
            road_geometry = road_row['geometry']

            # Assuming road_geometry is a LINESTRING object containing coordinates of the road
            road_coords = road_geometry.coords[:]
            road_angle = calculate_angle(road_coords[0], road_coords[-1])

            # Calculate the unit direction vector of the road
            road_vector = unit_vector(road_coords[0], road_coords[-1])

            # Calculate the angle between the edge and the road
            angle_diff = np.arccos(np.clip(np.dot(edge_vector, road_vector), -1.0, 1.0))

            # Convert the angle from radians to degrees
            angle_diff_degrees = np.degrees(angle_diff)

            # Check if this road is closer than previous ones
            if angle_diff_degrees < min_distance:
                min_distance = angle_diff_degrees
                road_id = road_row['id']
                intersection_angle = road_angle

        # Now you have the road_id that the edge crosses, the angle between the edge and the road,
        # and the angle at the intersection of the edge and the road.
        # You can store this information in the edge object if desired.
        edge.cross_road_id = road_id
        edge.angle_with_road = min_distance
        edge.intersection_angle = intersection_angle


In [64]:
find_edge_crossing_road(edge_set_pro, df_road)

In [68]:
# Define a function to create a list of dictionaries containing the relevant attributes of each Edge object
def edge_attributes_to_list(edges):
    edge_list = []
    for edge in edges:
        edge_dict = {
            'edgeID': edge.edgeID,
            'p1': edge.p1,
            'p2': edge.p2,
            'length': edge.length,
            'cross_road_id': edge.cross_road_id,
            'angle_with_road': edge.angle_with_road,
            'intersection_angle': edge.intersection_angle
        }
        edge_list.append(edge_dict)
    return edge_list

# Call the function to convert the Edge objects to a list of dictionaries
edge_list = edge_attributes_to_list(edge_set_pro)

# Create a DataFrame from the list of dictionaries
edge_df = pd.DataFrame(edge_list)

# Display the DataFrame containing edge information
edge_df

Unnamed: 0,edgeID,p1,p2,length,cross_road_id,angle_with_road,intersection_angle
0,0,293,312,0.001844,189,0.033167,-103.984768
1,1,312,304,0.000686,243,0.105671,75.330291
2,2,293,304,0.001158,221,0.016852,-103.480157
3,3,307,304,0.000651,45,0.977769,146.990835
4,4,312,304,0.000686,243,0.105671,75.330291
...,...,...,...,...,...,...,...
8777,8777,445,436,0.000762,180,2.150902,-123.486169
8778,8778,436,441,0.000576,248,0.822384,-20.160698
8779,8779,445,441,0.000855,3,0.077049,-80.134193
8780,8780,445,435,0.000629,110,5.426604,-130.537332


# -----not exclude in block-----

In [None]:
# Set to store filtered edges (edges that don't exceed the maximum length)
filtered_edges_cp = set()

In [None]:
edge_set = []

In [None]:
edge_id = 0

In [None]:
len(triangulation.simplices)

In [None]:
# Loop through each triangle (simplex)
for simplex in triangulation.simplices:
    # Get the indices of the points forming the edges of the triangle
    #####print(simplex[0], simplex[1], simplex[2])
    edge1 = frozenset([simplex[0], simplex[1]])
    edge2 = frozenset([simplex[1], simplex[2]])
    edge3 = frozenset([simplex[0], simplex[2]])

    # Get the actual points from the 'points' list using the indices
    point1, point2, point3 = points[simplex[0]], points[simplex[1]], points[simplex[2]]
    #####print(point1, point2, point3)
    # Check if any edge length exceeds the maximum allowed length
    # If any edge length exceeds the limit, remove the corresponding edges from the set
    if distance(point1, point2) <= max_edge_length:
        filtered_edges_cp.add(edge1)
        edge_obj = Edge(edge_id, simplex[0], simplex[1], distance(point1, point2))
        edge_set.append(edge_obj)
        edge_id += 1
    if distance(point2, point3) <= max_edge_length:
        filtered_edges_cp.add(edge2)
        edge_obj = Edge(edge_id, simplex[1], simplex[2], distance(point2, point3))
        edge_set.append(edge_obj)
        edge_id += 1
    if distance(point1, point3) <= max_edge_length:
        filtered_edges_cp.add(edge3)
        edge_obj = Edge(edge_id, simplex[0], simplex[2], distance(point1, point3))
        edge_set.append(edge_obj)
        edge_id += 1

In [None]:
len(filtered_edges_cp)

In [None]:
len(edge_set)

In [None]:
# -----Create a GeoDataFrame for filtered edges-----

# Create a list of LineString objects for filtered edges
lines = []
for edge in filtered_edges_cp:
    edge_points = list(edge)
    line = LineString(np.array(points)[edge_points])
    lines.append(line)

# Create a GeoDataFrame from the LineString objects
gdf_filtered_edges_cp = gpd.GeoDataFrame(geometry=lines)

# Save the GeoDataFrame as a shapefile
gdf_filtered_edges_cp.to_file('filtered_edges_cp.shp')