# Create SHETRAN Raster Data
*Ben Smith | 12/12/2025*

This script takes downloaded datasets and reconfigures them into raster layers that can be used to set up SHETRAN models.

Todo:
- Run at 100m, 200m, and 500m.
- Consider fixes for the catchments that are below sea level (but that may be one for a later script).

### Preamble

In [19]:
import os
import zipfile

import rasterio
from rasterio.merge import merge
import numpy as np

# from scipy.ndimage import generic_filter

root = 'I:/SHETRAN_GB_2021/02_Input_Data/National Data Inputs for SHETRAN UK/'
resolution_output = 100

def write_ascii(
        array: np.ndarray,
        ascii_ouput_path: str,
        xllcorner: float,
        yllcorner: float,
        cellsize: float,
        ncols: int = None,
        nrows: int = None,
        NODATA_value: int = -9999):

        if len(array.shape) > 0:
            nrows, ncols = array.shape

        file_head = "\n".join(
            ["ncols         " + str(ncols),
             "nrows         " + str(nrows),
             "xllcorner     " + str(xllcorner),
             "yllcorner     " + str(yllcorner),
             "cellsize      " + str(cellsize),
             "NODATA_value  " + str(NODATA_value)])

        with open(ascii_ouput_path, 'wb') as output_filepath:
            np.savetxt(fname=output_filepath, X=array,
                       delimiter=' ', newline='\n', fmt='%1.1f', comments="",
                       header=file_head
                       )


def read_ascii_raster(file_path, data_type=int, return_metadata=True):
    """
    Read ascii raster into numpy array, optionally returning headers.
    """
    headers = []
    dc = {}
    with open(file_path, 'r') as fh:
        for i in range(6):
            asc_line = fh.readline()
            headers.append(asc_line.rstrip())
            key, val = asc_line.rstrip().split()
            dc[key] = val
    ncols = int(dc['ncols'])
    nrows = int(dc['nrows'])
    xll = float(dc['xllcorner'])
    yll = float(dc['yllcorner'])
    cellsize = float(dc['cellsize'])
    nodata = float(dc['NODATA_value'])

    arr = np.loadtxt(file_path, dtype=data_type, skiprows=6)

    headers = '\n'.join(headers)
    headers = headers.rstrip()

    if return_metadata:
        return arr, ncols, nrows, xll, yll, cellsize, nodata, headers, dc
    else:
        return arr

# Function for cell aggregation
def cell_reduce(array, block_size, func=np.mean):
    """
    Resample a NumPy array by reducing its resolution using block aggregation.
    Parameters:
    - array: Input NumPy array.
    - block_size: Factor by which to reduce the resolution.
    - func: Aggregation function (e.g., np.mean, np.min, np.max).
    """
    shape = (array.shape[0] // block_size, block_size, array.shape[1] // block_size, block_size,)

    return func(array.reshape(shape), axis=(1, 3))

# Define a function to calculate the mean of valid neighbors:
def fill_holes(values):
    # Fills each hole that has at least one valid neighbouring cell with the mean of those neighbours.

    center = values[4]  # Center pixel in the 3x3 window
    if np.isnan(center):  # If the center is a hole
        neighbors = values[np.arange(len(values)) != 4]  # Exclude the center
        valid_neighbors = neighbors[~np.isnan(neighbors)]  # Keep valid neighbors
        if len(valid_neighbors) > 0:  # Fill only if there are valid neighbors
            return valid_neighbors.mean()
    return center  # Return the original value if not a hole
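
As a quick illustration of how `cell_reduce` behaves, the sketch below applies a copy of the function to a small 4x4 test grid (the grid and variable names are made up for this example). It also shows that the reshape fails when the array dimensions are not exact multiples of `block_size`, so the input extent needs to be a whole multiple of the block size.

```python
import numpy as np


def cell_reduce(array, block_size, func=np.mean):
    # Copy of the notebook function so that this example runs on its own.
    shape = (array.shape[0] // block_size, block_size,
             array.shape[1] // block_size, block_size)
    return func(array.reshape(shape), axis=(1, 3))


# Hypothetical 4x4 test grid holding the values 0..15:
grid = np.arange(16, dtype=float).reshape(4, 4)

mean_grid = cell_reduce(grid, 2, np.mean)  # mean of each 2x2 block
min_grid = cell_reduce(grid, 2, np.min)    # minimum of each 2x2 block
print(mean_grid)
print(min_grid)

# A 5x4 grid cannot be split into whole 2x2 blocks, so the reshape fails:
try:
    cell_reduce(np.zeros((5, 4)), 2)
    reshape_failed = False
except ValueError:
    reshape_failed = True
print("Reshape failed for 5x4 grid:", reshape_failed)
```

Each output cell summarises one 2x2 block of the input, so a 4x4 grid becomes a 2x2 grid.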

## Elevation Data

Elevation data for the DEM and minDEM is taken from the OS Terrain 50 dataset. This is free to download:
https://osdatahub.os.uk/downloads/open/Terrain50

Around the coastline, the OS data represents the sea using negative values (presumably taken from a low resolution bathymetry map). It is presumed that this will not impact SHETRAN elevations going forward, as the setups do not run to the coast. If much larger negative values were used (e.g. -9999), the impact on coastal cells might be greater than with the current OS values (roughly 0 to -2m), although these cells would still be unlikely to be included within the model domains.

The OS Terrain 50 data is used to create both the DEM and the minimum DEM (the latter is used for rivers).

OSNI 50m data for Northern Ireland was downloaded as a CSV of points. These were converted into an ascii grid using QGIS:
1. Reprojected from ING to BNG.
2. Converted from points to a gridded raster with extents rounded to the appropriate 50m.
3. No data cells (where there were no points in a raster cell) were filled using Fill No Data, searching only 1 cell away for a value. This does fill some water cells that should be missing data, but this is inconsequential.
4. Data written as an ascii grid for incorporation into the rasters below.
5. The NI data would not immediately merge with the GB data due to an issue with the projection. The two projections were very similar (see below), and so I simply copied a GB projection from a prj file to the NI prj file. I don't think this makes any tangible difference.

GB Projection:
<code>
PROJCS["British_National_Grid",GEOGCS["GCS_OSGB_1936",DATUM["D_OSGB_1936",SPHEROID["Airy_1830",6377563.396,299.3249646]],PRIMEM["Greenwich",0],UNIT["Degree",0.017453292519943295]],PROJECTION["Transverse_Mercator"],PARAMETER["False_Easting",400000],PARAMETER["False_Northing",-100000],PARAMETER["Central_Meridian",-2],PARAMETER["Scale_Factor",0.999601272],PARAMETER["Latitude_Of_Origin",49],UNIT["Meter",1]]
</code>

Original NI Projection:
<code>
PROJCS["British_National_Grid",GEOGCS["GCS_OSGB_1936",DATUM["D_OSGB_1936",SPHEROID["Airy_1830",6377563.396,299.3249646]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],PARAMETER["False_Easting",400000.0],PARAMETER["False_Northing",-100000.0],PARAMETER["Central_Meridian",-2.0],PARAMETER["Scale_Factor",0.9996012717],PARAMETER["Latitude_Of_Origin",49.0],UNIT["Meter",1.0]]
</code>
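
To back up the observation that the two definitions are very similar, the numeric parameters can be compared directly. This is a minimal sketch using only the standard library. The helper `numeric_parameters` and its regular expression are illustrative assumptions that only cover the PRIMEM, UNIT and PARAMETER entries of these two strings; it is not a general WKT parser.

```python
import re

gb_wkt = (
    'PROJCS["British_National_Grid",GEOGCS["GCS_OSGB_1936",DATUM["D_OSGB_1936",'
    'SPHEROID["Airy_1830",6377563.396,299.3249646]],PRIMEM["Greenwich",0],'
    'UNIT["Degree",0.017453292519943295]],PROJECTION["Transverse_Mercator"],'
    'PARAMETER["False_Easting",400000],PARAMETER["False_Northing",-100000],'
    'PARAMETER["Central_Meridian",-2],PARAMETER["Scale_Factor",0.999601272],'
    'PARAMETER["Latitude_Of_Origin",49],UNIT["Meter",1]]'
)
ni_wkt = (
    'PROJCS["British_National_Grid",GEOGCS["GCS_OSGB_1936",DATUM["D_OSGB_1936",'
    'SPHEROID["Airy_1830",6377563.396,299.3249646]],PRIMEM["Greenwich",0.0],'
    'UNIT["Degree",0.0174532925199433]],PROJECTION["Transverse_Mercator"],'
    'PARAMETER["False_Easting",400000.0],PARAMETER["False_Northing",-100000.0],'
    'PARAMETER["Central_Meridian",-2.0],PARAMETER["Scale_Factor",0.9996012717],'
    'PARAMETER["Latitude_Of_Origin",49.0],UNIT["Meter",1.0]]'
)


def numeric_parameters(wkt):
    """Return {name: value} for the PRIMEM, UNIT and PARAMETER entries."""
    pattern = r'(?:PRIMEM|UNIT|PARAMETER)\["([^"]+)",([-+0-9.eE]+)\]'
    return {name: float(value) for name, value in re.findall(pattern, wkt)}


gb_params = numeric_parameters(gb_wkt)
ni_params = numeric_parameters(ni_wkt)

# Absolute difference for every parameter that appears in the GB definition:
differences = {name: abs(gb_params[name] - ni_params[name]) for name in gb_params}
largest = max(differences, key=differences.get)
print(largest, differences[largest])
```

The largest difference is in `Scale_Factor`, where the GB file stores the value rounded to nine decimal places. A scale difference of roughly 3e-10 amounts to well under a millimetre over the extent of the national grid.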



In [2]:
# The data is within sub-folders, list these:
OS50_zip_path = os.path.join(root, "terr50_gagg_gb/data/")
OS50_zip_folders = os.listdir(OS50_zip_path)
OS50_zip_folders = [a for a in OS50_zip_folders if 'Unzipped_data' not in a]

# Setup a new folder to hold the unzipped data:
OS50_unzipped_folder = os.path.join(OS50_zip_path, 'Unzipped_data/')
if not os.path.exists(OS50_unzipped_folder):
    os.mkdir(OS50_unzipped_folder)

# Unzip the data:
for OS50_zip_folder in OS50_zip_folders:
    zip_folders = os.listdir(os.path.join(OS50_zip_path, OS50_zip_folder))
    for zip_folder in zip_folders:
        print(os.path.join(OS50_zip_path, OS50_zip_folder, zip_folder))
        with zipfile.ZipFile(os.path.join(OS50_zip_path, OS50_zip_folder, zip_folder), 'r') as zip_ref:
            zip_ref.extractall(OS50_unzipped_folder)

Join the elevation rasters into a single file.

In [None]:
# List all .asc files in the folder
asc_files = [os.path.join(OS50_unzipped_folder, f) for f in os.listdir(OS50_unzipped_folder) if f.endswith('.asc')]

# Open the GB files using rasterio:
raster_list = []
for count, asc_file in enumerate(asc_files, start=1):
    print(count, "/", len(asc_files))
    raster_list.append(rasterio.open(asc_file))

# Open the NI files using rasterio:
print('NI', "/", len(asc_files))
raster = rasterio.open(os.path.join(root, 'OSNI_OpenData_50m/OSNI_OpenData_50m_BNG.asc'),)
raster_list.append(raster)

# Combine (merge) the rasters:
merged_raster, merged_transform = merge(raster_list, nodata=-9999)

# Close the opened raster files (they need to stay open until the merge above has finished):
for raster in raster_list:
    raster.close()

# Extract the first raster band (no data cells are already set to -9999 by the merge):
merged_raster = merged_raster[0]
# merged_raster[merged_raster == 0] = -9999  # This was changed to merge(..., nodata=-9999) as it created issues in the fens

National_OS50_path = os.path.join(root, 'Processed Data', 'National_OS50.asc')

# Write the file as an ascii:
write_ascii(
    array=merged_raster,
    ascii_ouput_path=National_OS50_path,
    xllcorner=merged_transform[2],
    yllcorner=merged_transform[5]-(merged_raster.shape[0]*merged_transform[0]),
    cellsize=merged_transform[0],
)
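
The corner calculation above relies on the layout of the affine transform returned by `merge`. As a sketch (assuming a north-up raster with square cells, and using a plain tuple in the `(a, b, c, d, e, f)` coefficient order to stand in for the transform), the lower left corner can be derived as follows. The function name `lower_left_corner` and the example numbers are hypothetical.

```python
def lower_left_corner(transform, nrows):
    """Return (xllcorner, yllcorner, cellsize) for a north-up raster.

    transform: six coefficients (a, b, c, d, e, f), where a is the cell width,
    c is the x coordinate of the left edge, and f is the y coordinate of the
    top edge (the same positions indexed as [0], [2] and [5] above).
    """
    cellsize = transform[0]
    xllcorner = transform[2]
    # The transform stores the TOP edge, so step down by the raster height:
    yllcorner = transform[5] - nrows * cellsize
    return xllcorner, yllcorner, cellsize


# Hypothetical 50 m raster with 24,820 rows whose top edge is at y = 1,241,000:
example_transform = (50.0, 0.0, 0.0, 0.0, -50.0, 1241000.0)
xll, yll, cellsize = lower_left_corner(example_transform, nrows=24820)
print(xll, yll, cellsize)
```

The ascii grid header needs the lower left corner, whereas the transform is anchored at the upper left, which is why the raster height is subtracted from the top edge.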


Regrid the elevation rasters to the desired size.

Note that this assumes that the lower left corner of the national OS50 file is at an easting and northing of 0,0. Check this if you are redoing this work. You can load the header of the file using the following code:
<code>
headers = []
with open(National_OS50_path, 'r') as fh:
    for i in range(6):
        asc_line = fh.readline()
        headers.append(asc_line.rstrip())
headers
</code>

The first stage is to ensure that the 50m data has the same extent as the 1km data; rows and columns are added to achieve this. The extent is then a whole number of kilometres, so the data can be resampled to divisions of this (1km, 500m, 200m, 100m). Other resolutions may not work: the aggregation runs from the top left, not the bottom left, so the resampled dataset may not have llx/lly coordinates of 0,0. Think about this if you want to use other resolutions!
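
A conservative way to check a candidate resolution before running the resampling is sketched below. The function name `resolution_is_compatible` and its default arguments are assumptions made for this example; it presumes the padded grid has its lower left corner at 0,0 and an extent that is a whole number of kilometres.

```python
def resolution_is_compatible(resolution_output, resolution_input=50, extent_multiple=1000):
    """Check that an output resolution suits the padded national grid.

    Assumes the grid's lower left corner is at 0,0 and that its extent is a
    whole multiple of `extent_multiple` metres (both are assumptions here).
    """
    # The output cells must be built from whole input cells...
    whole_blocks = resolution_output % resolution_input == 0
    # ...and must tile the extent exactly, so that no cells are left over.
    tiles_extent = extent_multiple % resolution_output == 0
    return whole_blocks and tiles_extent


results = {res: resolution_is_compatible(res) for res in (100, 200, 500, 1000, 300)}
print(results)
```

In this sketch, 300m fails because 1km is not a whole number of 300m cells, so blocks counted from the top left would not line up with the bottom left corner.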

In [None]:
national_OS50, _, _, _, _, _, _, _, OS50_header = read_ascii_raster(National_OS50_path, data_type=float)

In [None]:
# # If you have not loaded in the dataset (perhaps because you are only testing the code), you can check the dimensions of the 50m dataset using this code:
#
# OS50_header = {}
# with open(National_OS50_path, 'r') as fh:
#     for i in range(6):
#         asc_line = fh.readline()
#         key, val = asc_line.rstrip().split()
#         OS50_header[key] = val
# OS50_header

In [None]:
# Resize the national dataset to match existing SHETRAN inputs.
# The desired SHETRAN grid has its top right corner at x: 661000, y: 1241000
# (this assumes that the lower left corner is at 0, 0):
cellsize = float(OS50_header['cellsize'])
# Rows run north-south (y) and columns run east-west (x); np.full needs integer counts:
row_difference = int(round((1241 * 1000 - float(OS50_header['nrows']) * cellsize) / cellsize))
col_difference = int(round((661 * 1000 - float(OS50_header['ncols']) * cellsize) / cellsize))

if row_difference > 0:
    # Create the rows of -9999
    new_rows = np.full((row_difference, national_OS50.shape[1]), -9999)
    # Add the new rows to the top
    national_OS50 = np.vstack((new_rows, national_OS50))

# Repeat for columns:
if col_difference > 0:
    new_cols = np.full((national_OS50.shape[0], col_difference), -9999)
    national_OS50 = np.hstack((national_OS50, new_cols))  # Remember that these need adding at the end.
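
The padding above can also be written with `np.pad`. The sketch below uses a small made-up array to show that rows are added at the top and columns at the right, which keeps the lower left corner (and therefore xllcorner/yllcorner) unchanged. The array and the counts are hypothetical.

```python
import numpy as np

# Hypothetical 2x3 grid standing in for the national raster:
small = np.array([[1.0, 2.0, 3.0],
                  [4.0, 5.0, 6.0]])

rows_to_add = 1  # extra rows needed at the top (north)
cols_to_add = 2  # extra columns needed at the right (east)

# ((before, after) for rows, (before, after) for columns):
padded = np.pad(small, ((rows_to_add, 0), (0, cols_to_add)),
                mode='constant', constant_values=-9999)
print(padded)
```

The bottom left value of `padded` is still the bottom left value of `small`, so the header coordinates written later remain valid.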

_I have removed the code chunk below as I think it is superfluous. There were some issues resulting from changing 0 values to NA when these are in fact valid elevations. This has been corrected, and the code designed to fill the holes is left below in case of potential future use._

*_It may be of use in the Northern Ireland catchments, where there is a greater presence of NA values over lakes._*

_This will fill the holes (NA/-9999 values) in the dataset. The code will only fill cells that have a valid value in an adjacent cell._

<code>
\# Requires: from scipy.ndimage import generic_filter
\# Replace the hole value with NaN for processing
national_OS50[national_OS50 == -9999] = np.nan
\# Apply the function (repeat this call to fill larger holes)
filled_national_OS50 = generic_filter(national_OS50, fill_holes, size=3, mode='constant', cval=np.nan)
filled_national_OS50[np.isnan(filled_national_OS50)] = -9999
\# Write the file as an ascii:
write_ascii(
    array=filled_national_OS50,
    ascii_ouput_path=f'{OS50_zip_path}National_OS50_DEM_preprocessed.asc',
    xllcorner=OS50_header['xllcorner'],
    yllcorner=OS50_header['yllcorner'],
    cellsize=float(OS50_header['cellsize'])
)
</code>
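
For reference, the hole filling can be tried on a tiny grid. The sketch below repeats `fill_holes` so that it runs on its own and uses a made-up 3x3 array with one missing cell. It also shows why NaN cells have to be located with `np.isnan` rather than with an equality test.

```python
import numpy as np
from scipy.ndimage import generic_filter


def fill_holes(values):
    # Copy of the notebook function so that this example runs on its own.
    center = values[4]
    if np.isnan(center):
        neighbors = values[np.arange(len(values)) != 4]
        valid_neighbors = neighbors[~np.isnan(neighbors)]
        if len(valid_neighbors) > 0:
            return valid_neighbors.mean()
    return center


# Hypothetical 3x3 grid with a single -9999 hole in the middle:
grid = np.array([[1.0, 2.0, 3.0],
                 [4.0, -9999.0, 6.0],
                 [7.0, 8.0, 9.0]])
grid[grid == -9999] = np.nan

filled = generic_filter(grid, fill_holes, size=3, mode='constant', cval=np.nan)

# NaN never compares equal to itself, so np.isnan is needed to find NaN cells:
equality_finds_nan = bool((grid == np.nan).any())
filled[np.isnan(filled)] = -9999
print(filled)
print(equality_finds_nan)
```

The central hole takes the mean of its eight neighbours, while cells that already held a value are returned unchanged.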

In [None]:
# Resample the data at the desired resolution:

# Define the block size for aggregation
resolution_input = float(OS50_header['cellsize'])
block_size = int(resolution_output/resolution_input)  # For 50m -> 100m, use a block size of 2

# Resample using the mean
DEM = cell_reduce(national_OS50, block_size, np.mean)

# Resample using the minimum
minDEM = cell_reduce(national_OS50, block_size, np.min)
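
One thing to be aware of is that `np.mean` and `np.min` treat -9999 as an ordinary elevation, so any block that touches a no data cell is pulled towards -9999. The sketch below is one possible alternative, not the method used in this notebook: it masks no data cells as NaN and aggregates with `np.nanmean`/`np.nanmin`. The helper name `cell_reduce_nodata` and the test grid are hypothetical.

```python
import warnings

import numpy as np

NODATA = -9999


def cell_reduce_nodata(array, block_size, func=np.nanmean, nodata=NODATA):
    """Block-aggregate while ignoring no data cells (hypothetical helper)."""
    masked = np.where(array == nodata, np.nan, array.astype(float))
    shape = (masked.shape[0] // block_size, block_size,
             masked.shape[1] // block_size, block_size)
    with warnings.catch_warnings():
        # Blocks made up entirely of no data trigger a RuntimeWarning.
        warnings.simplefilter('ignore', category=RuntimeWarning)
        reduced = func(masked.reshape(shape), axis=(1, 3))
    # Blocks with no valid cells come back as NaN; restore the no data value:
    return np.where(np.isnan(reduced), nodata, reduced)


grid = np.array([[10, 20, NODATA, NODATA],
                 [30, 40, NODATA, NODATA],
                 [1, NODATA, 5, 7],
                 [3, 5, 9, 11]], dtype=float)

dem = cell_reduce_nodata(grid, 2, np.nanmean)
min_dem = cell_reduce_nodata(grid, 2, np.nanmin)
print(dem)
print(min_dem)
```

In the bottom left block, a plain mean would give (1 - 9999 + 3 + 5) / 4 = -2497.5, whereas the masked version gives 3. Whether this matters depends on whether partially filled blocks ever fall inside a model domain.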

In [3]:
# Write the file as an ascii:
write_ascii(
    array=DEM,
    ascii_ouput_path=f'{root}/Processed Data/National_OS50_DEM_{resolution_output}m.asc',
    xllcorner=OS50_header['xllcorner'],
    yllcorner=OS50_header['yllcorner'],
    cellsize=resolution_output
)

# Write the file as an ascii:
write_ascii(
    array=minDEM,
    ascii_ouput_path=f'{root}/Processed Data/National_OS50_minDEM_{resolution_output}m.asc',
    xllcorner=OS50_header['xllcorner'],
    yllcorner=OS50_header['yllcorner'],
    cellsize=resolution_output
)
