# LUP Overlap Tool

This was developed by North Ross for use with Tahltan, Blueberry River FN and Doig River FN LUP planning analysis. The example here is for the Doig analysis.

Developed with Python version: 3.13.1;

Geopandas 1.0.1;

Pandas 2.2.3

The optional last section requires xlsxwriter 3.2.3

In [None]:
import geopandas as gpd 
import pandas as pd
from shapely import geometry, wkt
from json import load
import os
import sys
from datetime import date
import oracledb

import logging
#set up logging 
logging.basicConfig(level=logging.DEBUG)
debug=logging.debug
info=logging.info
warning=logging.warning
error=logging.error

## Preform an Identity operation on the relevant layers in ArcPro. 

Note - Geopandas might run out of memory when working on data with lots of verticies. Using the .simplify() method might help, or you can do all the union operations in ArcPro, then read the output and start in the next section.

### Doig Layers
- Doig LPP
- Enhanced Planning Areas
- Doig Management Zones
- Consensus Protection Zones
- Enhanced Management

### FN Planning Zones
- LRMPs
- LRMP related Non Legal Planning Features
- Priority WMBs
- HV1 Zones
- Traplines
- PNG Zones
- HRFN AMPP
- HRFN EMC
- South Peace Strategy
- Saulteau Enhanced Management
- Liard Water and Land Stewardship Forum
- Liard Watershed Groups

In [3]:
# define BCGW connection from config file

# Either edit this to add your BCGW user and pass, or connect to it a separate config file as I've done here
with open('bcgw-config.json') as f:
    config = load(f)
    class connection:
        user_nm = config['user_nm']
        bcgw_pass = config['bcgw_pass']
        host_nm= "bcgw.bcgov"
        service_nm= "idwprod1.bcgov"
    

In [39]:
# Read the input definition excel sheets which list the output rows and columns.
define_xlsx = 'OutputRowsAndCols_2025-04-17.xlsx'

# Define the output file - this can be read to avoid re-running the intersect analysis portion if you need to restart the kernel
outdir = 'output-example'
output_union_file = outdir + '\\union_data.gpkg'
output_union_layer = f'union_{(date.today().strftime("%Y-%m-%d"))}'

define_inputs_df = pd.read_excel(define_xlsx, sheet_name='Inputs')
define_rows_df = pd.read_excel(define_xlsx, sheet_name='Rows')
define_columns_df = pd.read_excel(define_xlsx, sheet_name='Columns')

conn=oracledb.connect(user=connection.user_nm, password= connection.bcgw_pass, host=connection.host_nm, port=1521,
                        service_name=connection.service_nm)

In [40]:
# Define functions
def get_bcgw_layer(connection, table, clause, aoi_geom, do_aoi_select, col_list):
    conn=oracledb.connect(user=connection.user_nm, password= connection.bcgw_pass, host=connection.host_nm, port=1521,
                            service_name=connection.service_nm)
    #create cursor and excecute sql
    with conn.cursor() as cursor:
        
        # First, find the primary geometry column
        cursor.execute(f"SELECT * FROM {table}")
        for col in cursor.description:
            if col.type.name == 'SDO_GEOMETRY':
                info(f"Geometry column for {table}: {col.name}")
                geometry_column = col.name
        
        # Next, do the actual query
        # add a spatial selector to the end of the query after the clause (if exists)
        aoi_clause = f"""SDO_ANYINTERACT ({geometry_column},
                            SDO_GEOMETRY(2003, 3005, NULL,
                                SDO_ELEM_INFO_ARRAY(1,1003,3),
                                SDO_ORDINATE_ARRAY({",".join(map(str, aoi_geom.bounds))}) 
                            )
                        ) = 'TRUE'"""
        if do_aoi_select == 'Yes':
            if clause:
                clause = f'WHERE {clause} AND {aoi_clause}'
            else:
                clause = 'WHERE ' + aoi_clause
        else:
            if clause:
                clause = 'WHERE ' + clause
                
        # append geometry col to col list:
        
        query = f"""SELECT SDO_UTIL.TO_WKTGEOMETRY({geometry_column}) AS WKT_LOB, {','.join(col_list)} 
                    FROM {table} {clause}
                """
        info(f"executing query {query}")
        cursor.execute(query)
        columns = [col[0] for col in cursor.description]
        rows = cursor.fetchall()
        
        # read to df and convert to shapely geometry
        df = pd.DataFrame(rows, columns=columns)
        df['wkt'] = df['WKT_LOB'].apply(oracledb.LOB.read) # read oracle LOB object to WKT string
        df['geometry'] = df['wkt'].apply(wkt.loads) # read WKT string to shapely geometry with shapely.wkt.loads()
        df = df.drop(columns=['WKT_LOB', 'wkt'])
        
        info(f"Returned {len(df)} records")
    conn.close()
    # make wkt readable by gpd by using a geoseries and the function from_wkt
    # df['geometry']=gpd.GeoSeries.from_wkt(df['geometry'])
    gdf=gpd.GeoDataFrame(df, geometry='geometry')
    # set crs to bc albers
    gdf = gdf.set_crs(3005, allow_override=True)
    return gdf

def read_input(src, rename_dict, clause = None, aoi_geom = None, aoi_select = None, clip = None):
    # clean inputs
    if type(clause) != str or clause == '':
        clause = None
    if type(clip) != str or clip == '':
        clip = None
    rename_dict = eval(rename_dict)
    
    # Get BCGW layer as geodataframe
    if src[:4] == 'WHSE':
        df = get_bcgw_layer(connection, src, clause, aoi_geom, aoi_select, col_list = list(rename_dict.keys()))
    
    # Get feature class from geodatabase as gdf
    elif '.gdb' in src:
        gdf_i = src.find('.gdb')
        gdb_path = src[:gdf_i+4]
        layer = src[gdf_i+5:]
        df = gpd.read_file(gdb_path, layer = layer, where=clause)
    
    # read shp or other
    else:
        df = gpd.read_file(src, where=clause)
    
    # Drop fields, rename and dissolve
    col_list = list(rename_dict.keys())
    col_list.append('geometry')
    df = df[col_list]
    df = df.rename(columns=rename_dict)
    df = df.dissolve(by = list(rename_dict.values())).reset_index()
    
    # reproject to BC albers (3005)
    if df.crs.to_epsg() != 3005:
        df = df.to_crs("EPSG:3005")
    
    # Select only polygons within bounds of AOI (if defined). 
    if aoi_geom:
        df = df[df.intersects(geometry.box(*aoi_geom.bounds))]
        
        # Clip to AOI geometry if indicated in spreadsheet
        if clip != None:
            df = gpd.clip(df, aoi_geom)
    
    # make geometry valid
    df['geometry'] = df['geometry'].make_valid()
        
    # set precision to 0.1m to avoid errors:
    df['geometry'] = df['geometry'].set_precision(0.1)
        
    return df

# Read all inputs to a list    
def read_all_inputs_to_list(define_inputs_df, aoi_gdf):
    input_gdf_list = []
    aoi_geom = aoi_gdf.geometry.union_all()
    for i, r, in define_inputs_df.iterrows():
        info(f"reading layer {r['Name']}")
        input_gdf = read_input(r['Source'], r['Rename Dictionary'], r['Clause'], aoi_geom, r['Select AOI'], r['Clip to AOI'])
        input_gdf_list.append(input_gdf)
    
    return input_gdf_list

# Preform union of all inputs
def union_all_inputs(aoi_gdf, input_gdf_list):
    union_gdf = aoi_gdf
    for gdf in input_gdf_list[1:]:
        info(f"intersecting {list(gdf.columns)[0]}")
        for attempt in range(2):
            try:
                overlay_gdf = gpd.overlay(union_gdf, gdf, how='union')
                info("Successfully intersected")
                break
            except Exception as e:
                if 'Topology' in str(e):
                    if attempt != 1:
                        union_gdf.geometry = union_gdf.geometry.buffer(0)
                        gdf.geometry = gdf.geometry.simplify(1) # attempt to simplify new layer to 1m
                        gdf.geometry = gdf.geometry.buffer(0) # buffer by 0m - this sometimes fixes the problem
                        gdf.geometry = gdf.geometry.make_valid() # fix geometry
                        warning(e)
                        warning("trying again with 1m simplication, 0m buffer and fixing geometry")
                    else:
                        error(e)
                else:
                    error("could not intersect")
                    sys.exit()
        union_gdf = overlay_gdf
        info(f"output polygons: {len(union_gdf)}")
    return union_gdf

In [None]:
# optional - print outputs for reference
define_inputs_df

In [8]:
# execute read and union functions
aoi_row = define_inputs_df[define_inputs_df['Name'] == 'AOI']

aoi_gdf = read_input(
                aoi_row['Source'][0],
                aoi_row['Rename Dictionary'][0],
                aoi_row['Clause'][0],
            )
input_gdf_list = read_all_inputs_to_list(define_inputs_df, aoi_gdf)
df = union_all_inputs(aoi_gdf, input_gdf_list)
df['AreaHa'] = df.area / 10000
df
# save df to file so it can be retrieved
df.to_file(output_union_file, layer = output_union_layer)

INFO:root:reading layer AOI
INFO:root:reading layer DRFN Landscape Planning Pilot
INFO:root:reading layer DRFN Management Zones
INFO:root:reading layer DRFN Enhanced Planning Areas
INFO:root:reading layer DRFN Consensus Protection Zones
INFO:root:reading layer BRFN Priority WMB
INFO:root:reading layer BRFN HV1
  return ogr_read(
INFO:root:reading layer South Peace Strategy Polygon
INFO:root:reading layer Liard WLSF
INFO:root:reading layer HRFN AMPP
INFO:root:reading layer Boreal Caribou RRAs
INFO:root:intersecting DRFN_LPP
INFO:root:Successfully intersected
INFO:root:output polygons: 3
INFO:root:intersecting DRFN_MgmtZone_Name
  overlay_gdf = gpd.overlay(union_gdf, gdf, how='union')
INFO:root:Successfully intersected
INFO:root:output polygons: 15
INFO:root:intersecting Doig_EPA_Name
  overlay_gdf = gpd.overlay(union_gdf, gdf, how='union')
INFO:root:Successfully intersected
INFO:root:output polygons: 28
INFO:root:intersecting Doig_CPZ_Name
  overlay_gdf = gpd.overlay(union_gdf, gdf, how

### If you skip the overlay part and do it in another software - run the following cell:

In [None]:
# Read final polygon after union operations
gdb = r"\\<path_to_gdb>"
layer_name = 'Union_Planning_2025_03_12'

df = gpd.read_file(gdb, layer=layer_name)

# then rename all your fields to match the what's in the input sheet

## Create Output Dataframe

In [9]:
# Make any edits to the data required here:

# Replace "GRAZING RESERVE 1", "GRAZING RESERVE 2" etc. with 'GRAZING RESERVE' so that results are combined in output
df.replace(regex = r'GRAZING RESERVE \d', value = 'GRAZING RESERVE', inplace=True)
df = df.rename(columns={'RMZName': "RMZ Name", "RMZType": "RMZ Type"})
df.loc[
    (df['DRFN_MgmtZone_Name'].notnull()) &
    ~(df['BRFN_WMB'].notnull()) &
    ~(df['HRFN_AMPP'].notnull()) &
    ~(df['South Peace Strategy'].notnull()) &
    ~(df['Liard_WLSF'].notnull()), 
   
   'DRFN Core'] = True

In [29]:
# define functions used in 

# Prepare the output data frame. This can be exported to an Excel
def createOutputDf():
    # First define the empty dataframe to be added onto later
    outputdf = pd.DataFrame()

    # Loop over the rows df
    for i, r in define_rows_df.iterrows():
        # create a blank "row dataframe" that we will add all the columns to. This will be appended to the output df
        rdf = pd.DataFrame()
        group = r['GroupField']
        areaSeries = eval(r['Area'])
        # Loop over columns df
        for ic, c in define_columns_df.iterrows():
            # get some variables from the dict so it's easier to read:
            maskSeries = eval(c['Mask'])
            cname = c['Name']
            
            # Create "Column dataframe" using a subset of the one defined in the row dictionary.
            cdf = areaSeries.loc[maskSeries]
            
            # Choose the "Sum Field". If none is specified it does the Area.
            if pd.isnull(c['sumField']):
                sumField = 'AreaHa'
            else:
                sumField = c['sumField']
            
            # Group the results by designated field
            if pd.isnull(group):
                # If the group field is empty, sum the entire selection and remove all other fields
                cdf = pd.DataFrame([cdf[[sumField]].sum()], index=["All"])
            elif ', ' in group:
                # if the group has multiple fields, then concat them together
                group = group.split(", ")
                try:
                    cdf['merged'] = cdf[group[0]].astype('str') + " - " + cdf[group[1]].astype('str')
                    cdf = cdf[['merged', sumField]].groupby('merged').sum()
                except:
                    raise Exception(f"Cannot group by more than two fields. Remove extra commas from: {group} (row {ic+1} in columns).")
            else:
                # Remove all columns except group field and area, then preform group by and sum operations.
                cdf = cdf[[group, sumField]].groupby(group).sum()
        
            # Add category (from row dictionary)
            cdf['Category'] = r['Category']
            
            # Set index to a multi index of Category and "Polygon" name
            cdf = cdf.set_index(['Category', cdf.index.rename('Polygon')])

            # Rename the area field to the column name for output
            cdf = cdf.rename(columns={'AreaHa': cname})
            
            # If rdf is empty, set it to the "cdf"
            if len(rdf) == 0:
                rdf = cdf
            # otherwise, join the cdf on to the existing "rdf"
            else:
                rdf = rdf.join(cdf, how='outer')
        # Add the rdf to the output dataframe
        outputdf = pd.concat([outputdf, rdf])
    return outputdf

In [30]:
# save as Excel with date
now = date.today()
datestr = now.strftime("%Y-%m-%d")
outfile = os.path.join(outdir, f'LUP Analysis Pandas RMZ Planning {datestr}.xlsx')
outputdf = createOutputDf()
outputdf.style.map(lambda v: "number-format: #,##0").to_excel(outfile)

# Adding plots to the output excel

This optional part of the script takes geopandas plot images of the layers and embedded these in the columns for reference. This works best in cases when you have less than 10 columns with confusing geometry. The way this works is that after the Excel gets exported, this function opens the output XLSX, adds a row at the top and embeds all the PNGs of exported from the Column Dataframes used in the output function.


In [38]:
import openpyxl
from matplotlib import pyplot as plt

def savePlot(area, name, number, worksheet = "Sheet1"):
    """Save plot of area to png and embed in Excel worksheet above the column

    Args:
        area (geopandas.GeoDataFrame): Geodataframe of area for this column
        name (str): Column name
        number (int): Column number (index)
        worksheet (str): Name of worksheet to embed the analysis in
    """
    plt.ioff()
    
    ax = area.dissolve().plot(color='green', legend=True)
    # <add other reference layers here>
    
    # add full AOI
    p1 = aoi_gdf.plot(color='none', edgecolor='black', ax=ax)
    
    ax.set_axis_off()
    plotdir = os.path.join(outdir, worksheet)
    if not os.path.exists(plotdir):
        os.mkdir(plotdir)
    path = os.path.join(plotdir, f'{number}_{name}.png')
    plt.savefig(path, dpi=60.0, bbox_inches='tight', pad_inches=0)
    
# embed plot maps in Excel
def embedPlots(xlsx_str, worksheetName = "Sheet1"):
    """Embeds saved png plots into the output Excel in the top row above their relevant column

    Args:
        xlsx_str (str): Path to Excel file. 
        worksheetName (str): Name of output worksheet to embed in.
    """
    
    workbook = openpyxl.load_workbook(xlsx_str)
    if worksheetName:
        ws = workbook[worksheetName]
    else:
        ws = workbook.active

    ws.insert_rows(1)
    ws.row_dimensions[1].height = 170
    for f in os.listdir(os.path.join(outdir, worksheetName)):
        if f[-4:] == '.png':
            path = os.path.join(outdir, worksheetName, f)
            num = int(f.split('_')[0])
            col_letter = openpyxl.utils.get_column_letter(num+2)
            img = openpyxl.drawing.image.Image(path)
            ws.add_image(img, f'{col_letter}1')
            ws.column_dimensions[col_letter].width= 16

    workbook.save(outfile)

# export all plots:
for ic, c in define_columns_df.iterrows():
    # get some variables from the dict so it's easier to read:
    maskSeries = eval(c['Mask'])
    cname = c['Name']
    # remove non-alphanumeric characters in file name string
    cname_outstring = "".join(x for x in cname if x.isalnum())
    
    # get column df and save plot
    cdf = df.loc[maskSeries]
    savePlot(cdf, cname_outstring, ic+1)
    info(f"Saved plot {ic+1} for {cname_outstring}")

# embed in Excel
embedPlots(outfile)

DEBUG:PIL.PngImagePlugin:STREAM b'IHDR' 16 13
DEBUG:PIL.PngImagePlugin:STREAM b'tEXt' 41 58
DEBUG:PIL.PngImagePlugin:STREAM b'pHYs' 111 9
DEBUG:PIL.PngImagePlugin:STREAM b'IDAT' 132 6733
DEBUG:PIL.PngImagePlugin:STREAM b'IHDR' 16 13
DEBUG:PIL.PngImagePlugin:STREAM b'tEXt' 41 58
DEBUG:PIL.PngImagePlugin:STREAM b'pHYs' 111 9
DEBUG:PIL.PngImagePlugin:STREAM b'IDAT' 132 5454
DEBUG:PIL.PngImagePlugin:STREAM b'IHDR' 16 13
DEBUG:PIL.PngImagePlugin:STREAM b'tEXt' 41 58
DEBUG:PIL.PngImagePlugin:STREAM b'pHYs' 111 9
DEBUG:PIL.PngImagePlugin:STREAM b'IDAT' 132 5837
DEBUG:PIL.PngImagePlugin:STREAM b'IHDR' 16 13
DEBUG:PIL.PngImagePlugin:STREAM b'tEXt' 41 58
DEBUG:PIL.PngImagePlugin:STREAM b'pHYs' 111 9
DEBUG:PIL.PngImagePlugin:STREAM b'IDAT' 132 5245
DEBUG:PIL.PngImagePlugin:STREAM b'IHDR' 16 13
DEBUG:PIL.PngImagePlugin:STREAM b'tEXt' 41 58
DEBUG:PIL.PngImagePlugin:STREAM b'pHYs' 111 9
DEBUG:PIL.PngImagePlugin:STREAM b'IDAT' 132 5951
DEBUG:PIL.PngImagePlugin:STREAM b'IHDR' 16 13
DEBUG:PIL.PngImageP