In [None]:
# Initialise the QGIS application.
# TODO: add the path to your QGIS installation here (the `processing` module
# imported below is only available in a QGIS Python environment), then run this notebook.

In [None]:
# import statements
import processing
import os
from osgeo import gdal, osr
import numpy as np

In [None]:
# Define the input/output paths
# TODO: fill in all three paths before running the rest of the notebook.
input_dl = ''  # Drainage line vector shapefile
input_dem = ''   # Depressionless DEM file
output_dir = ''  # Output directory path to store the temporary and final outputs

# Fail early with a readable message if the placeholder was left empty;
# os.makedirs('') would otherwise raise a FileNotFoundError that does not
# say which setting is missing.
if not output_dir:
    raise ValueError("output_dir is empty - set it to the directory that should hold the outputs.")

# Ensure output directory exists. exist_ok=True removes the gap between
# checking for the directory and creating it.
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Temporary file that receives the catchment (basin) raster
catchment_path = os.path.join(output_dir, 'catchment_raster.tif')

# Parameters handed to the GRASS r.watershed algorithm.
# Only the 'basin' output is mapped to a file here.
watershed_params = {
    'elevation': input_dem,
    'depression': None,
    'flow': None,
    'disturbed_land': None,
    'blocking': None,
    'threshold': 100,
    'max_slope_length': None,
    'convergence': 5,
    'memory': 100000,
    '-s': False,
    '-m': False,
    '-4': False,
    '-a': False,
    '-b': False,
    'basin': catchment_path,
    # Map other outputs if required like accumulation or drainage or stream ...
    # 'accumulation': 'TEMPORARY_OUTPUT',
    # 'drainage': 'TEMPORARY_OUTPUT',
    # 'stream': 'TEMPORARY_OUTPUT',
    'GRASS_REGION_PARAMETER': None,
    'GRASS_REGION_CELLSIZE_PARAMETER': 0,
    'GRASS_RASTER_FORMAT_OPT': '',
    'GRASS_RASTER_FORMAT_META': ''
}

# Run the processing command
processing.run("grass:r.watershed", watershed_params)

In [None]:
# Define the temp path for the drainage raster
drainage_raster_path = os.path.join(output_dir, 'drainage_raster.tif')

# Read the grid definition (size, geotransform, CRS) of the catchment raster so
# that the rasterized drainage lines are produced on the same grid.
dataset = gdal.Open(catchment_path)
if dataset is None:
    # Stop here instead of continuing with a None dataset.
    raise RuntimeError(f"Unable to open file: {catchment_path}")

try:
    width = dataset.RasterXSize
    height = dataset.RasterYSize
    transform = dataset.GetGeoTransform()
    projection_wkt = dataset.GetProjection()
finally:
    dataset = None  # release the GDAL handle even if a read above failed

# Extent derived from the origin and pixel size of the geotransform.
# NOTE(review): this ignores the rotation terms (transform[2], transform[4]),
# i.e. it assumes a north-up raster - confirm for your data.
xmin = transform[0]
ymax = transform[3]
xmax = xmin + transform[1] * width
ymin = ymax + transform[5] * height

# Format the extent as 'xmin,xmax,ymin,ymax [AUTHORITY:CODE]'.
extent_str = f'{xmin},{xmax},{ymin},{ymax}'

# Get the CRS (Coordinate Reference System). The authority is read from the
# root of the CRS definition; the suffix is only appended when both parts are
# present, so the string never contains '[None:None]'.
if projection_wkt:
    srs = osr.SpatialReference()
    srs.ImportFromWkt(projection_wkt)
    authority_name = srs.GetAuthorityName(None)
    authority_code = srs.GetAuthorityCode(None)
    if authority_name and authority_code:
        extent_str += f' [{authority_name}:{authority_code}]'

# Run the processing command. Width/height are given in pixels ('UNITS': 0)
# and taken from the catchment raster. Errors are deliberately not caught:
# the following cells need this output, so a failure should stop the notebook.
rasterize_result = processing.run("gdal:rasterize", {
    'INPUT': input_dl,
    'FIELD': 'ORDER', 'BURN': 0, 'USE_Z': False, 'UNITS': 0,
    'WIDTH': width,
    'HEIGHT': height,
    'EXTENT': extent_str,
    'NODATA': 0, 'OPTIONS': '', 'DATA_TYPE': 11, 'INIT': None, 'INVERT': False, 'EXTRA': '',
    'OUTPUT': drainage_raster_path
})




In [None]:
# Function to get raster band data as numpy array
def raster_to_array(raster):
    """Return the contents of band 1 of ``raster``.

    Args:
        raster: An object exposing ``GetRasterBand(index)`` (e.g. an opened
            GDAL dataset).

    Returns:
        Whatever ``ReadAsArray()`` of band 1 returns.
    """
    return raster.GetRasterBand(1).ReadAsArray()

# Function to write array to raster
def array_to_raster(array, ref_raster, output_path):
    """Write a 2-D array to a single-band Float32 GeoTIFF.

    The geotransform and projection are copied from ``ref_raster``.

    Args:
        array: 2-D array; its shape is interpreted as (rows, cols).
        ref_raster: Opened GDAL dataset providing ``GetGeoTransform()`` and
            ``GetProjection()``.
        output_path: Path of the GeoTIFF to create.

    Raises:
        RuntimeError: If the GTiff driver is unavailable or the output file
            cannot be created.
    """
    driver = gdal.GetDriverByName('GTiff')
    if driver is None:
        raise RuntimeError("GDAL driver 'GTiff' is not available.")

    rows, cols = array.shape
    output_raster = driver.Create(output_path, cols, rows, 1, gdal.GDT_Float32)
    if output_raster is None:
        # Create() can return None instead of raising when GDAL exceptions are
        # not enabled; report that here rather than failing on a None below.
        raise RuntimeError(f"Could not create output raster: {output_path}")

    output_band = None
    try:
        output_raster.SetGeoTransform(ref_raster.GetGeoTransform())
        output_raster.SetProjection(ref_raster.GetProjection())
        output_band = output_raster.GetRasterBand(1)
        output_band.WriteArray(array)
        output_band.FlushCache()
    finally:
        # Drop the references so GDAL closes the file, also on error.
        output_band = None
        output_raster = None

In [None]:
# Open raster datasets
catchment_raster = gdal.Open(catchment_path, gdal.GA_ReadOnly)
drainage_raster = gdal.Open(drainage_raster_path, gdal.GA_ReadOnly)

if catchment_raster is None or drainage_raster is None:
    # Raise instead of calling exit(): an exception stops the run with a
    # traceback and leaves the session usable.
    raise RuntimeError("Error: Could not open raster files.")

# Get raster band data as numpy arrays
data1 = raster_to_array(catchment_raster)
data2 = raster_to_array(drainage_raster)

# The boolean masks built from data1 are used to index data2, so both grids
# must have identical dimensions.
if data1.shape != data2.shape:
    raise ValueError(
        f"Raster shapes differ: catchment {data1.shape} vs drainage {data2.shape}"
    )

# Find unique IDs in catchment_raster
unique_ids = np.unique(data1)

# Results are written to a separate array. Writing into data1 itself would
# make already-processed cells (now holding a drainage value) match the mask
# of a later catchment whose ID happens to equal that value.
stream_order = data1.copy()

# Iterate over unique IDs and assign highest pixel value from drainage_raster
for uid in unique_ids:
    # Mask of the current ID, always taken from the untouched input
    mask = (data1 == uid)

    # Extract values from drainage_raster where mask is True
    values_for_id = data2[mask]

    if values_for_id.size > 0:
        stream_order[mask] = np.max(values_for_id)

# Output path for the modified catchment_raster
output_raster_path = os.path.join(output_dir, 'Stream_Order_Raster.tif')

# Write the result to a new raster file, georeferenced like the catchment raster
array_to_raster(stream_order, catchment_raster, output_raster_path)

# Close raster files
catchment_raster = None
drainage_raster = None

print("Operation completed successfully.")

# Optionally delete the temporary files. Set this to False to keep them,
# e.g. to inspect them or to re-run this cell without recomputing them.
delete_temp_files = True
if delete_temp_files:
    os.remove(catchment_path)
    os.remove(drainage_raster_path)
