In [2]:
# Import necessary libraries
import numpy as np                                                      # type: ignore
import pycuda.autoinit                                                  # type: ignore
import pycuda.driver as cuda                                            # type: ignore
from pycuda import gpuarray, compiler                                   # type: ignore
from osgeo import gdal                                                  # type: ignore
from osgeo import osr                                                   # type: ignore
import matplotlib.pyplot as plt                                         # type: ignore
from scipy.ndimage import gaussian_filter                               # type: ignore
from datetime import datetime
import zipfile
import os

In [3]:
# Helper function to print available CUDA devices
def printGPU():
    """
    Print information about available CUDA devices.
    """
    num_devices = cuda.Device.count()
    print(f"{num_devices} device(s) found.")
    for ordinal in range(num_devices):
        dev = cuda.Device(ordinal)
        print(f"Device #{ordinal}: {dev.name()}")
    print(cuda)

In [4]:
def svfCalculator_RayTracingOnGPU(dsm, cdsm, tdsm, scale,
                                  traceRadius, azimuth_start, azimuth_end, azimuth_interval, altitude_interval):
    """
    Calculate Sky View Factor (SVF) using ray tracing on a GPU.

    Parameters:
        dsm (np.ndarray): Digital Surface Model.
        cdsm (np.ndarray): Vegetation height model.
        tdsm (np.ndarray): Vegetation canopy bottom height model.
        scale (float): Scale factor for the height.
        theta_start (int): Starting angle for theta in degrees.
        theta_end (int): Ending angle for theta in degrees.
        traceRadius (float): The radius for ray tracing.
        azimuth_start (int): Starting azimuth angle in degrees.
        azimuth_end (int): Ending azimuth angle in degrees.
        azimuth_interval (int): Azimuth interval in degrees.
        altitude_interval (int): Altitude interval in degrees.

    Returns:
        np.ndarray: Calculated Sky View Factor as a 2D array of different anisotropic directions and svf types.
    """
    # Convert input arrays to float32
    px = np.array(dsm, dtype=np.float32)
    veg_px = np.array(cdsm, dtype=np.float32)
    veg_px2 = np.array(tdsm, dtype=np.float32)
    height, width = px.shape

    # Allocate GPU memory
    d_px = gpuarray.to_gpu(px)
    d_veg_px = gpuarray.to_gpu(veg_px)
    d_veg_px2 = gpuarray.to_gpu(veg_px2)

    d_svf_out = gpuarray.empty((height, width), np.float32)
    d_svfE_out = gpuarray.empty((height, width), np.float32)
    d_svfS_out = gpuarray.empty((height, width), np.float32)
    d_svfW_out = gpuarray.empty((height, width), np.float32)
    d_svfN_out = gpuarray.empty((height, width), np.float32)

    d_svfveg_out = gpuarray.empty((height, width), np.float32)
    d_svfEveg_out = gpuarray.empty((height, width), np.float32)
    d_svfSveg_out = gpuarray.empty((height, width), np.float32)
    d_svfWveg_out = gpuarray.empty((height, width), np.float32)
    d_svfNveg_out = gpuarray.empty((height, width), np.float32)

    d_svfaveg_out = gpuarray.empty((height, width), np.float32)
    d_svfEaveg_out = gpuarray.empty((height, width), np.float32)
    d_svfSaveg_out = gpuarray.empty((height, width), np.float32)
    d_svfWaveg_out = gpuarray.empty((height, width), np.float32)
    d_svfNaveg_out = gpuarray.empty((height, width), np.float32)

    with open("2025d_svf_kernel.cu", "r") as f:
        kernel_code = f.read()

    mod = pycuda.compiler.SourceModule(kernel_code)
    svf_kernel = mod.get_function("svfcalculator")

    block_size = (32, 32, 1)
    grid_size = (int(np.ceil(width / block_size[0])), int(np.ceil(height / block_size[1])))

    # Launch the kernel
    svf_kernel(
        d_svf_out, d_svfE_out, d_svfS_out, d_svfW_out, d_svfN_out,
        d_svfveg_out, d_svfEveg_out, d_svfSveg_out, d_svfWveg_out, d_svfNveg_out,
        d_svfaveg_out, d_svfEaveg_out, d_svfSaveg_out, d_svfWaveg_out, d_svfNaveg_out,
        d_px, d_veg_px, d_veg_px2, np.float32(scale), 
        np.int32(width), np.int32(height), np.float32(traceRadius),
        np.float32(azimuth_start), np.float32(azimuth_end), np.float32(azimuth_interval), np.float32(altitude_interval),       
        block=block_size, grid=grid_size
    )

    # Synchronize the CUDA context
    cuda.Context.synchronize()

    svf = d_svf_out.get()
    svfE = d_svfE_out.get()
    svfS = d_svfS_out.get()
    svfW = d_svfW_out.get()
    svfN = d_svfN_out.get()

    svfveg = d_svfveg_out.get()
    svfEveg = d_svfEveg_out.get()
    svfSveg = d_svfSveg_out.get()
    svfWveg = d_svfWveg_out.get()
    svfNveg = d_svfNveg_out.get()

    svfaveg = d_svfaveg_out.get()
    svfEaveg = d_svfEaveg_out.get()
    svfSaveg = d_svfSaveg_out.get()
    svfWaveg = d_svfWaveg_out.get()
    svfNaveg = d_svfNaveg_out.get()

    return svf, svfE, svfS, svfW, svfN, svfveg, svfEveg, svfSveg, svfWveg, svfNveg, svfaveg, svfEaveg, svfSaveg, svfWaveg, svfNaveg

In [None]:
# Main function for processing DSM and vegetation data
def main(dsm_path, cdsm_path, tdsm_path, traceRadius, output_dir, azimuth_interval, altitude_interval):
    """
    Main function to calculate SVF from DSM and vegetation models.

    Parameters:
        dsm_path (str): Path to the DSM raster.
        cdsm_path (str): Path to the vegetation height raster.
        tdsm_path (str): Path to the vegetation canopy bottom height raster.
        output_dir (str): Directory for saving output files.
    """
    # Print the start time and input file paths
    startTime = datetime.now()
    print(f"Starting SVF calculation at {startTime.strftime("%H:%M:%S")}")

    # Load DSM and vegetation rasters
    dsm_ds = gdal.Open(dsm_path)
    cdsm_ds = gdal.Open(cdsm_path)
    tdsm_ds = gdal.Open(tdsm_path)

    # Potentially rescaling the resolution to match 1mx1m
    geotransform = dsm_ds.GetGeoTransform()
    scale = 1 / geotransform[1]                            
    print(f"Scale factor: {scale}; Geotransform: {geotransform[1]}")                             

    # Read arrays from raster files
    dsm = dsm_ds.ReadAsArray().astype(np.float32)
    cdsm = cdsm_ds.ReadAsArray().astype(np.float32)
    tdsm = tdsm_ds.ReadAsArray().astype(np.float32)

    # Adjust vegetation heights by adding DSM height
#   tdsm = (cdsm * 0.25)
    tdsm = tdsm + dsm
    cdsm = cdsm + dsm

    #print("Initializing CUDA context:")
    #printGPU()
    driver = gdal.GetDriverByName("GTiff")

    # Helper function to save SVF results to GeoTIFF
    def save_svf(filename, svf_data):
        #svf_data = np.clip (svf_data, 0, 1)
        svf_data = np.round (svf_data, 4)

        out_ds = driver.Create(
            filename, dsm.shape[1], dsm.shape[0], 1, gdal.GDT_Float32
        )
        out_ds.GetRasterBand(1).WriteArray(svf_data)
        out_ds.SetGeoTransform(dsm_ds.GetGeoTransform())
        
        # Hardcode to EPSG:28992
        srs = osr.SpatialReference()
        srs.ImportFromEPSG(28992)
        out_ds.SetProjection(srs.ExportToWkt())  # Set the hardcoded CRS
        out_ds.FlushCache()

    # Helper function to plot SVF data
    def plot_svf(svf_data, title):
        plt.figure(figsize=(5, 3))
        plt.imshow(svf_data, cmap="magma", vmin=0, vmax=1)
        plt.colorbar(label="SVF Value")
        plt.title(title)
        plt.show()
   
    #plot_svf(dsm, "DSM")
    #plot_svf(cdsm, "CDSM")
    #plot_svf(tdsm, "TDSM")

    # Prepare a list to hold the file paths for zipping
    file_paths = []

    # Calculate different types of SVF
    svf, svfE, svfS, svfW, svfN, svfveg, svfEveg, svfSveg, svfWveg, svfNveg, svfaveg, svfEaveg, svfSaveg, svfWaveg, svfNaveg = svfCalculator_RayTracingOnGPU(dsm, cdsm, tdsm, scale, traceRadius, 0, 360, azimuth_interval, altitude_interval)
    
    # Define file names based on svf_type
    svf_files = [
        (svf, f"{output_dir}/svf.tif"),
        (svfE, f"{output_dir}/svfE.tif"),
        (svfS, f"{output_dir}/svfS.tif"),
        (svfW, f"{output_dir}/svfW.tif"),
        (svfN, f"{output_dir}/svfN.tif"), 
        (svfveg, f"{output_dir}/svfveg.tif"),
        (svfEveg, f"{output_dir}/svfEveg.tif"),
        (svfSveg, f"{output_dir}/svfSveg.tif"),
        (svfWveg, f"{output_dir}/svfWveg.tif"),
        (svfNveg, f"{output_dir}/svfNveg.tif"),
        (svfaveg, f"{output_dir}/svfaveg.tif"),
        (svfEaveg, f"{output_dir}/svfEaveg.tif"),
        (svfSaveg, f"{output_dir}/svfSaveg.tif"),
        (svfWaveg, f"{output_dir}/svfWaveg.tif"),
        (svfNaveg, f"{output_dir}/svfNaveg.tif")              
    ]

    # Plot the SVF files
    for svf_data, filename in svf_files:
        plot_svf(svf_data, os.path.splitext(os.path.basename(filename))[0])

    # Save the SVF files and collect the file paths
    for svf_data, filename in svf_files:
        save_svf(filename, svf_data)
        file_paths.append(filename)
        #print(f"Min. value for {os.path.splitext(os.path.basename(filename))[0]} : {np.min(svf_data):.4f}")
        #print(f"Max. value for {os.path.splitext(os.path.basename(filename))[0]} : {np.max(svf_data):.4f}")
    

    # Zip all the generated files
    zip_filename = f"{output_dir}/svf.zip"
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for file in file_paths:
            zipf.write(file, os.path.basename(file))                

    # Delete the original individual files (not in .zip)
    for file in file_paths:
        os.remove(file)

    # Close datasets
    dsm_ds = cdsm_ds = tdsm_ds = None
    
    print(f"Finished SVF calculation at {datetime.now().strftime("%H:%M:%S")}")
    print(f"Total processing time for tile: {datetime.now() - startTime}")
    print(f"Output files saved in {zip_filename}")
    print("-" * 50)

Run SVF-calculation on selected DSM, CDSM and TDSM

In [None]:
# Run main function
if __name__ == "__main__":
    dsm_path            = r"path\to\dsm.tif"                # DSM raster file path
    cdsm_path           = r"path\to\dsm.tif"                # CDSM raster file path
    tdsm_path           = r"path\to\dsm.tif"                # TDSM raster file path
    traceRadius         =  400                              # Radius to check for obstructions in dsm resolution/meters
    azimuth_interval    =  5.0                              # Azimuth interval for ray tracing [degrees]
    altitude_interval   =  5.0                              # Altitude interval for ray tracing [degrees]
    output_dir          = r"path\to\output_directory"       # Output directory to save results
    os.makedirs(output_dir, exist_ok=True)                  # Create output directory if it doesn't exist
    gdal.DontUseExceptions()                                # Suppress warnings from GDAL

    main(dsm_path, cdsm_path, tdsm_path, traceRadius, output_dir, azimuth_interval, altitude_interval)