In [1]:
import geopandas as gpd
from shapely.geometry import MultiPoint, Polygon, MultiPolygon
from shapely.ops import unary_union
from scipy.spatial import Delaunay
import numpy as np
import matplotlib.pyplot as plt
from scipy.sparse import csr_matrix
from scipy.sparse.csgraph import connected_components

In [None]:
def calculate_nearest_neighbor_distances(points_gdf):
    """Calculate nearest neighbor distance for each drillhole"""
    from scipy.spatial import cKDTree
    
    coords = np.array(list(zip(points_gdf.geometry.x, points_gdf.geometry.y)))
    tree = cKDTree(coords)
    
    # Query for 2 nearest neighbors (first one is the point itself)
    distances, indices = tree.query(coords, k=2)
    
    # Return the distance to the nearest neighbor (excluding self)
    return distances[:, 1]

def find_connected_clusters(points_gdf, max_distance):
    """
    Group drillholes into separate clusters based on distance threshold.
    Only drillholes within max_distance of each other are connected.
    """
    coords = np.array(list(zip(points_gdf.geometry.x, points_gdf.geometry.y)))
    n_points = len(coords)
    
    if n_points == 0:
        return np.array([])
    
    if n_points == 1:
        return np.array([0])
    
    # Build adjacency matrix based on distance threshold
    from scipy.spatial.distance import pdist, squareform
    
    # Calculate pairwise distances
    distances = squareform(pdist(coords))
    
    # Create adjacency matrix: 1 if distance <= max_distance, 0 otherwise
    adjacency = (distances <= max_distance).astype(int)
    np.fill_diagonal(adjacency, 0)  # Remove self-connections
    
    # Find connected components
    graph = csr_matrix(adjacency)
    n_components, labels = connected_components(csgraph=graph, directed=False)
    
    return labels

def calculate_angle(p1, p2, p3):
    """
    Calculate the angle at p2 in the triangle p1-p2-p3 (in degrees)
    """
    v1 = p1 - p2
    v2 = p3 - p2
    
    cos_angle = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
    # Clamp to avoid numerical errors
    cos_angle = np.clip(cos_angle, -1.0, 1.0)
    angle = np.arccos(cos_angle)
    
    return np.degrees(angle)

def is_valid_triangle(p1, p2, p3, max_short_edge, max_hypotenuse_ratio=1.5, max_angle=120):
    """
    Check if a triangle is valid for polygon creation.
    
    Rules for grid-like drilling patterns:
    1. The two shortest edges must be <= max_short_edge (grid spacing)
    2. The longest edge (hypotenuse) can be longer, but not more than max_hypotenuse_ratio * max_short_edge
    3. All angles must be <= max_angle degrees (to avoid very obtuse triangles)
    
    Parameters:
    -----------
    p1, p2, p3 : numpy arrays
        Triangle vertices
    max_short_edge : float
        Maximum allowed length for the two shorter edges (grid spacing)
    max_hypotenuse_ratio : float
        Maximum ratio of longest edge to max_short_edge (default 1.5, e.g., √2 ≈ 1.414 for diagonal)
    max_angle : float
        Maximum allowed angle in degrees (default 120)
    """
    # Calculate edge lengths
    edge1 = np.linalg.norm(p2 - p1)
    edge2 = np.linalg.norm(p3 - p2)
    edge3 = np.linalg.norm(p1 - p3)
    
    edges = np.array([edge1, edge2, edge3])
    edges_sorted = np.sort(edges)
    
    # Rule 1: Two shortest edges must be within grid spacing
    if edges_sorted[0] > max_short_edge or edges_sorted[1] > max_short_edge:
        return False
    
    # Rule 2: Longest edge (hypotenuse) can be longer but within ratio
    max_hypotenuse = max_short_edge * max_hypotenuse_ratio
    if edges_sorted[2] > max_hypotenuse:
        return False
    
    # Rule 3: Check all angles - reject if any angle is too obtuse
    angle1 = calculate_angle(p2, p1, p3)
    angle2 = calculate_angle(p1, p2, p3)
    angle3 = calculate_angle(p1, p3, p2)
    
    if max(angle1, angle2, angle3) > max_angle:
        return False
    
    return True

def create_constrained_polygon_from_cluster(points_gdf, max_edge_length, 
                                           hypotenuse_ratio=1.5, max_angle=120):
    """
    Create polygon from a cluster of points using Delaunay triangulation
    with grid-aware edge constraints
    
    Parameters:
    -----------
    points_gdf : GeoDataFrame
        Points to create polygon from
    max_edge_length : float
        Maximum edge length for grid spacing
    hypotenuse_ratio : float
        Ratio for diagonal edges (default 1.5 for ~√2)
    max_angle : float
        Maximum angle in degrees (default 120)
    """
    coords = np.array(list(zip(points_gdf.geometry.x, points_gdf.geometry.y)))
    
    if len(coords) < 3:
        # For 1-2 points, create a circular buffer
        if len(coords) == 1:
            return points_gdf.geometry.iloc[0].buffer(max_edge_length * 0.4)
        elif len(coords) == 2:
            line = MultiPoint(coords).convex_hull
            return line.buffer(max_edge_length * 0.4)
    
    # Create Delaunay triangulation
    tri = Delaunay(coords)
    
    # Filter triangles using grid-aware rules
    valid_triangles = []
    for simplex in tri.simplices:
        # Get the three points of the triangle
        p1, p2, p3 = coords[simplex]
        
        # Check if triangle is valid for grid pattern
        if is_valid_triangle(p1, p2, p3, max_edge_length, hypotenuse_ratio, max_angle):
            triangle = Polygon([p1, p2, p3])
            valid_triangles.append(triangle)
    
    if not valid_triangles:
        # If no valid triangles, create buffer around points
        points = MultiPoint(coords)
        return points.buffer(max_edge_length * 0.4)
    
    # Union all valid triangles
    polygon = unary_union(valid_triangles)
    
    return polygon

def generate_resource_polygons(gpkg_path, output_path=None, 
                               measured_distance=71, indicated_distance=121,
                               measured_buffer=25, indicated_buffer=50,
                               hypotenuse_ratio=1.5, max_angle=120,
                               min_drillholes=4):
    """
    Generate resource category polygons based on drillhole spacing
    
    Parameters:
    -----------
    gpkg_path : str
        Path to input GeoPackage with drillhole locations
    output_path : str, optional
        Path to save output GeoPackage
    measured_distance : float
        Maximum distance for Measured category (default 71m)
    indicated_distance : float
        Maximum distance for Indicated category (default 121m)
    measured_buffer : float
        Buffer distance for Measured polygons (default 25m)
    indicated_buffer : float
        Buffer distance for Indicated polygons (default 50m)
    hypotenuse_ratio : float
        Ratio for diagonal edges in grid (default 1.5, allows ~√2 ≈ 1.414)
    max_angle : float
        Maximum angle in triangles (default 120 degrees)
    min_drillholes : int
        Minimum number of drillholes required for a cluster (default 4)
    
    Returns:
    --------
    GeoDataFrame with resource category polygons (separate polygons for each cluster)
    """
    
    # Read drillhole data
    print(f"Reading drillhole data from {gpkg_path}...")
    drillholes = gpd.read_file(gpkg_path)
    
    print(f"Loaded {len(drillholes)} drillholes")
    print(f"CRS: {drillholes.crs}")
    print(f"Minimum drillholes per cluster: {min_drillholes}")
    
    # Calculate nearest neighbor distances
    print("\nCalculating nearest neighbor distances...")
    nn_distances = calculate_nearest_neighbor_distances(drillholes)
    drillholes['nn_distance'] = nn_distances
    
    # Classify drillholes
    drillholes['category'] = 'Inferred'
    drillholes.loc[nn_distances < indicated_distance, 'category'] = 'Indicated'
    drillholes.loc[nn_distances < measured_distance, 'category'] = 'Measured'
    
    print(f"\nDrillhole classification:")
    print(drillholes['category'].value_counts())
    
    # Store all polygons
    all_polygons = []
    rejected_clusters = {'Measured': 0, 'Indicated': 0}
    
    # Process Measured Resources (< 71m spacing)
    measured_holes = drillholes[drillholes['category'] == 'Measured'].copy()
    if len(measured_holes) > 0:
        print(f"\n{'='*60}")
        print(f"Processing MEASURED category ({len(measured_holes)} drillholes)")
        print(f"Distance threshold: {measured_distance}m | Buffer: {measured_buffer}m")
        print(f"Hypotenuse ratio: {hypotenuse_ratio} | Max angle: {max_angle}°")
        print(f"{'='*60}")
        
        # Find clusters within measured_distance
        clusters = find_connected_clusters(measured_holes, measured_distance)
        measured_holes['cluster'] = clusters
        
        n_clusters = len(np.unique(clusters))
        print(f"Found {n_clusters} separate cluster(s) for Measured category")
        
        # Create polygon for each cluster
        for cluster_id in np.unique(clusters):
            cluster_holes = measured_holes[measured_holes['cluster'] == cluster_id]
            cluster_size = len(cluster_holes)
            
            # Check minimum drillholes requirement
            if cluster_size < min_drillholes:
                print(f"  Cluster {cluster_id + 1}: {cluster_size} drillholes [REJECTED - below minimum of {min_drillholes}]")
                rejected_clusters['Measured'] += 1
                continue
            
            print(f"  Cluster {cluster_id + 1}: {cluster_size} drillholes [ACCEPTED]")
            
            if cluster_size >= 1:
                # Create base polygon
                polygon = create_constrained_polygon_from_cluster(
                    cluster_holes, measured_distance, hypotenuse_ratio, max_angle
                )
                
                # Apply buffer
                if polygon and not polygon.is_empty:
                    polygon_buffered = polygon.buffer(measured_buffer)
                    
                    # Handle MultiPolygon
                    if isinstance(polygon_buffered, MultiPolygon):
                        for poly in polygon_buffered.geoms:
                            all_polygons.append({
                                'geometry': poly,
                                'category': 'Measured',
                                'cluster_id': int(cluster_id),
                                'drillholes': cluster_size,
                                'max_spacing': measured_distance,
                                'buffer_m': measured_buffer
                            })
                    else:
                        all_polygons.append({
                            'geometry': polygon_buffered,
                            'category': 'Measured',
                            'cluster_id': int(cluster_id),
                            'drillholes': cluster_size,
                            'max_spacing': measured_distance,
                            'buffer_m': measured_buffer
                        })
    
    # Process Indicated Resources (< 121m spacing)
    indicated_holes = drillholes[drillholes['category'].isin(['Measured', 'Indicated'])].copy()
    if len(indicated_holes) > 0:
        print(f"\n{'='*60}")
        print(f"Processing INDICATED category ({len(indicated_holes)} drillholes)")
        print(f"Distance threshold: {indicated_distance}m | Buffer: {indicated_buffer}m")
        print(f"Hypotenuse ratio: {hypotenuse_ratio} | Max angle: {max_angle}°")
        print(f"{'='*60}")
        
        # Find clusters within indicated_distance
        clusters = find_connected_clusters(indicated_holes, indicated_distance)
        indicated_holes['cluster'] = clusters
        
        n_clusters = len(np.unique(clusters))
        print(f"Found {n_clusters} separate cluster(s) for Indicated category")
        
        # Get all measured polygons for subtraction
        measured_geoms = [p['geometry'] for p in all_polygons if p['category'] == 'Measured']
        measured_union = unary_union(measured_geoms) if measured_geoms else None
        
        # Create polygon for each cluster
        for cluster_id in np.unique(clusters):
            cluster_holes = indicated_holes[indicated_holes['cluster'] == cluster_id]
            cluster_size = len(cluster_holes)
            
            # Check minimum drillholes requirement
            if cluster_size < min_drillholes:
                print(f"  Cluster {cluster_id + 1}: {cluster_size} drillholes [REJECTED - below minimum of {min_drillholes}]")
                rejected_clusters['Indicated'] += 1
                continue
            
            print(f"  Cluster {cluster_id + 1}: {cluster_size} drillholes [ACCEPTED]")
            
            if cluster_size >= 1:
                # Create base polygon
                polygon = create_constrained_polygon_from_cluster(
                    cluster_holes, indicated_distance, hypotenuse_ratio, max_angle
                )
                
                # Apply buffer
                if polygon and not polygon.is_empty:
                    polygon_buffered = polygon.buffer(indicated_buffer)
                    
                    # Subtract measured areas from indicated
                    if measured_union:
                        polygon_buffered = polygon_buffered.difference(measured_union)
                    
                    if not polygon_buffered.is_empty:
                        # Handle MultiPolygon
                        if isinstance(polygon_buffered, MultiPolygon):
                            for poly in polygon_buffered.geoms:
                                all_polygons.append({
                                    'geometry': poly,
                                    'category': 'Indicated',
                                    'cluster_id': int(cluster_id),
                                    'drillholes': cluster_size,
                                    'max_spacing': indicated_distance,
                                    'buffer_m': indicated_buffer
                                })
                        else:
                            all_polygons.append({
                                'geometry': polygon_buffered,
                                'category': 'Indicated',
                                'cluster_id': int(cluster_id),
                                'drillholes': cluster_size,
                                'max_spacing': indicated_distance,
                                'buffer_m': indicated_buffer
                            })
    
    if not all_polygons:
        print("\nWarning: No polygons created. Check your distance thresholds and minimum drillhole requirement.")
        return gpd.GeoDataFrame()
    
    # Create output GeoDataFrame
    result_gdf = gpd.GeoDataFrame(all_polygons, crs=drillholes.crs)
    
    # Calculate areas
    result_gdf['area_m2'] = result_gdf.geometry.area
    result_gdf['area_ha'] = result_gdf['area_m2'] / 10000
    
    # Add polygon ID
    result_gdf['polygon_id'] = range(1, len(result_gdf) + 1)
    
    print("\n" + "="*80)
    print("RESOURCE CATEGORY SUMMARY")
    print("="*80)
    print(result_gdf[['polygon_id', 'category', 'cluster_id', 'drillholes', 'buffer_m', 'area_ha']].to_string(index=False))
    print("="*80)
    
    # Summary by category
    print("\nSummary by Category:")
    summary = result_gdf.groupby('category').agg({
        'polygon_id': 'count',
        'drillholes': 'sum',
        'area_ha': 'sum'
    }).rename(columns={'polygon_id': 'num_polygons'})
    print(summary)
    print("="*80)
    
    # Print rejected clusters summary
    if rejected_clusters['Measured'] > 0 or rejected_clusters['Indicated'] > 0:
        print(f"\nRejected Clusters (below minimum of {min_drillholes}):")
        print(f"  Measured: {rejected_clusters['Measured']} cluster(s)")
        print(f"  Indicated: {rejected_clusters['Indicated']} cluster(s)")
        print("="*80)
    
    # Save output if path provided
    if output_path:
        print(f"\nSaving results to {output_path}...")
        result_gdf.to_file(output_path, driver='GPKG')
        
        # Also save classified drillholes
        drillhole_output = output_path.replace('.gpkg', '_drillholes_classified.gpkg')
        drillholes.to_file(drillhole_output, driver='GPKG')
        print(f"Classified drillholes saved to {drillhole_output}")
    
    # Create visualization
    plot_results(drillholes, result_gdf)
    
    return result_gdf

def plot_results(drillholes, polygons):
    """Create visualization of results"""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(18, 8))
    
    # Plot 1: Drillholes colored by category
    colors = {'Measured': 'red', 'Indicated': 'orange', 'Inferred': 'yellow'}
    for category in ['Inferred', 'Indicated', 'Measured']:
        subset = drillholes[drillholes['category'] == category]
        if len(subset) > 0:
            subset.plot(ax=ax1, color=colors[category], markersize=40, 
                       label=f'{category} ({len(subset)})', alpha=0.7, edgecolor='black', linewidth=0.5)
    
    ax1.set_title('Drillhole Classification by Spacing', fontsize=14, fontweight='bold')
    ax1.legend(loc='upper right', fontsize=10)
    ax1.set_xlabel('Easting (m)')
    ax1.set_ylabel('Northing (m)')
    ax1.grid(True, alpha=0.3)
    ax1.set_aspect('equal')
    
    # Plot 2: Resource polygons with cluster separation
    if not polygons.empty:
        colors_poly = {'Measured': 'red', 'Indicated': 'orange'}
        
        for category in ['Indicated', 'Measured']:
            cat_polys = polygons[polygons['category'] == category]
            if len(cat_polys) > 0:
                cat_polys.plot(ax=ax2, color=colors_poly[category], 
                              alpha=0.3, edgecolor='black', linewidth=2)
        
        # Add labels for each polygon
        for idx, row in polygons.iterrows():
            centroid = row.geometry.centroid
            label = f"{row['category'][0]}{row['polygon_id']}\n{row['area_ha']:.1f}ha\n({row['buffer_m']}m buf)"
            ax2.annotate(label, 
                        xy=(centroid.x, centroid.y), ha='center', fontsize=7,
                        bbox=dict(boxstyle='round,pad=0.3', facecolor='white', alpha=0.8))
        
        # Plot drillholes on top
        drillholes.plot(ax=ax2, color='black', markersize=15, marker='x', alpha=0.6, linewidth=1)
        
        # Create legend
        from matplotlib.patches import Patch
        legend_elements = [
            Patch(facecolor='red', alpha=0.3, edgecolor='black', label='Measured (25m buffer)'),
            Patch(facecolor='orange', alpha=0.3, edgecolor='black', label='Indicated (50m buffer)')
        ]
        ax2.legend(handles=legend_elements, loc='upper right', fontsize=10)
    
    ax2.set_title('Resource Category Polygons (Buffered)', fontsize=14, fontweight='bold')
    ax2.set_xlabel('Easting (m)')
    ax2.set_ylabel('Northing (m)')
    ax2.grid(True, alpha=0.3)
    ax2.set_aspect('equal')
    
    plt.tight_layout()
    plt.savefig('resource_classification.png', dpi=300, bbox_inches='tight')
    print("\nVisualization saved as 'resource_classification.png'")
    plt.show()

# Example usage
if __name__ == "__main__":
    # Replace with your file path
    input_gpkg = "input_file.gpkg"
    output_gpkg = "output_file.gpkg"
    
    # Generate resource polygons
    result = generate_resource_polygons(
        gpkg_path=input_gpkg,
        output_path=output_gpkg,
        measured_distance=71,       # < 71m for Measured
        indicated_distance=121,     # < 121m for Indicated
        measured_buffer=25,         # 25m buffer for Measured
        indicated_buffer=50,        # 50m buffer for Indicated
        hypotenuse_ratio=1.5,       # Allow diagonals up to 1.5x grid spacing (√2 ≈ 1.414)
        max_angle=120,              # Maximum angle in triangles (degrees)
        min_drillholes=4            # Minimum 4 drillholes per cluster
    )
    
    print("\nDone! Check the output files and visualization.")