---

# Simplified access to Veluwe ecological data through OGC Web Services

**Author:** Hudson Passos  
**Internship host:** Netherlands Institute of Ecology (NIOO-KNAW)  
**Host supervisor:** Stefan Vriend (NIOO-KNAW)  
**WUR supervisor:** Liesbeth Bakker (WUR, NIOO-KNAW)  
**Repository:** [research-project-internship-nioo](https://github.com/hudsonpassos/research-project-internship-nioo)  
**Date:** July 18, 2025  
**Python version:** 3.11.9  
**License:** MIT  
**Description:**  
This notebook is part of a research internship project. It focuses on the automated selection, filtering, 
and preprocessing of open ecological geospatial datasets for the Veluwe region using OGC Web Services (WCS and WFS).


---

# Part 1: Harvesting metadata from NGR

### 1.1. Initialization: packages, paths, and spatial inputs

**Packages**

In [11]:
import pandas as pd
import geopandas as gpd
import time
from PIL import Image
import os
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from owslib.csw import CatalogueServiceWeb
from owslib.ows import ServiceIdentification
import requests
from lxml import etree
import xml.etree.ElementTree as ET
import pandas as pd
import re
from functools import lru_cache
from typing import List, Optional

**Setting paths**

In [2]:
#root = "C:/Users/hudso/OneDrive/Documents/02. WUR/11. INTERNSHIP"
#outlines_path = os.path.join(root, "02 data/outlines")
#output_path = os.path.join(root, "05 python_project/output")

In [12]:
# Resolve the project folders relative to the current working directory.
root = os.getcwd()
outlines_path = os.path.join(root, "outlines")
output_path = os.path.join(root, "output")

# exist_ok=True makes directory creation idempotent and avoids the
# check-then-create race of testing os.path.exists() first.
for path in (output_path, outlines_path):
    os.makedirs(path, exist_ok=True)

### 1.2. Namespaces and functions:

In [13]:
# Namespaces for XML parsing.
# Prefix -> namespace URI map passed to ElementTree find()/findall() so the
# XPath expressions in this notebook can use short prefixes such as
# 'gmd:title' instead of fully qualified '{uri}title' names.
NS = {
    'gmd':   'http://www.isotc211.org/2005/gmd',
    'gco':   'http://www.isotc211.org/2005/gco',
    'gmx':   'http://www.isotc211.org/2005/gmx',
    'xlink': 'http://www.w3.org/1999/xlink',
}

@lru_cache(maxsize=128)
def fetch_root(identifier: str) -> ET.Element | None:
    """
    Download the XML-formatted metadata record for ``identifier`` and return
    its parsed root element.

    Returns None when the server answers with HTTP 404. Any other error
    status is raised by ``raise_for_status``. Results (including None) are
    memoised by ``lru_cache``, so repeated lookups of the same identifier
    reuse a single download.
    """
    record_url = (
        "https://www.nationaalgeoregister.nl/geonetwork/srv/api/records/"
        f"{identifier}/formatters/xml"
    )
    response = requests.get(record_url, timeout=20)
    if response.status_code != 404:
        response.raise_for_status()
        return ET.fromstring(response.content)
    return None

@lru_cache(maxsize=128)
def fetch_json(identifier: str) -> dict:
    """
    Download the JSON-formatted metadata record for ``identifier``.

    An HTTP 400 or 404 answer is treated as "no JSON available" and yields
    an empty dict. Every other error status is raised by
    ``raise_for_status``. Results are memoised by ``lru_cache``.
    """
    record_url = (
        "https://www.nationaalgeoregister.nl/geonetwork/srv/api/records/"
        f"{identifier}/formatters/json"
    )
    response = requests.get(record_url, timeout=20)
    no_json_available = response.status_code in (400, 404)
    if not no_json_available:
        response.raise_for_status()
        return response.json()
    return {}

def first_text(elem: ET.Element, path: str) -> str | None:
    """
    Return the stripped text of the first node under ``elem`` matching
    ``path`` (resolved with the ``NS`` prefix map), or None when nothing
    matches or the matching node has no text.
    """
    match = elem.find(path, NS)
    if match is None or not match.text:
        return None
    return match.text.strip()

def get_resource_type(identifier: str) -> str | None:
    """
    Determine the resource type of a record.

    Lookup order:
      1. the "type" entry of the JSON record, when present and non-empty;
      2. the ``codeListValue`` attribute of
         ``gmd:hierarchyLevel/gmd:MD_ScopeCode`` in the XML record;
      3. the text of ``gmd:hierarchyLevelName/gco:CharacterString``.

    Returns None when the XML record is unavailable or none of the above
    yields a value.
    """
    # Step 1: JSON endpoint
    json_type = fetch_json(identifier).get("type")
    if json_type:
        return json_type

    # Step 2: XML scope code attribute
    root = fetch_root(identifier)
    if root is None:
        return None

    scope_code = root.find('.//gmd:hierarchyLevel/gmd:MD_ScopeCode', NS)
    code_value = scope_code.attrib.get('codeListValue') if scope_code is not None else None
    if code_value:
        return code_value

    # Step 3: human-readable hierarchy level name
    level_name = root.find('.//gmd:hierarchyLevelName/gco:CharacterString', NS)
    if level_name is None or not level_name.text:
        return None
    return level_name.text.strip()

def get_metadata_date(identifier: str) -> str | None:
    """
    Return the record's ``gmd:dateStamp`` value, truncated before the first
    'T' so that date-time stamps are reduced to their date part.

    ``gco:Date`` is tried before ``gco:DateTime``. Returns None when the
    record is unavailable or carries no date stamp text.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    stamp_paths = ('.//gmd:dateStamp/gco:Date', './/gmd:dateStamp/gco:DateTime')
    for stamp_path in stamp_paths:
        stamp = root.find(stamp_path, NS)
        if stamp is None or not stamp.text:
            continue
        date_part, _, _ = stamp.text.partition('T')
        return date_part
    return None

def get_language(identifier: str) -> str | None:
    """
    Return the language of the first ``gmd:LanguageCode`` element that
    provides one, preferring its ``codeListValue`` attribute over its text.

    Returns None when the record is unavailable or no element carries a
    usable value.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    for node in root.findall('.//gmd:LanguageCode', NS):
        # Attribute wins; element text is only used when the attribute is empty/absent.
        value = node.attrib.get('codeListValue') or node.text
        if value:
            return value.strip()
    return None

def get_title(identifier: str) -> str | None:
    """
    Return the citation title of a record.

    The title is searched inside ``gmd:MD_DataIdentification`` when that
    element exists, otherwise in the whole record. Returns None when the
    record is unavailable or no title text is found.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    # Compare against None explicitly: an Element without children is falsy,
    # so `root.find(...) or root` could discard an element that was found,
    # and truth-testing Elements is deprecated.
    ident = root.find('.//gmd:MD_DataIdentification', NS)
    if ident is None:
        ident = root
    return first_text(ident, './/gmd:citation//gmd:title//gco:CharacterString')

def get_keywords(identifier: str) -> str | None:
    """
    Collect the text of every descriptive keyword in the record and return
    them joined with '; '.

    Returns None when the record is unavailable or contains no keyword text.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    keywords = []
    for node in root.findall('.//gmd:descriptiveKeywords//gmd:keyword//gco:CharacterString', NS):
        if node.text:
            keywords.append(node.text.strip())

    if not keywords:
        return None
    return '; '.join(keywords)

def get_abstract(identifier: str) -> str | None:
    """
    Return the abstract of a record.

    The abstract is searched inside ``gmd:MD_DataIdentification`` when that
    element exists, otherwise in the whole record. Returns None when the
    record is unavailable or no abstract text is found.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    # Compare against None explicitly: an Element without children is falsy,
    # so `root.find(...) or root` could discard an element that was found,
    # and truth-testing Elements is deprecated.
    ident = root.find('.//gmd:MD_DataIdentification', NS)
    if ident is None:
        ident = root
    return first_text(ident, './/gmd:abstract//gco:CharacterString')

def get_md_responsibleparty(identifier: str) -> str | None:
    """
    Return the organisation name of the first ``gmd:contact`` whose
    ``gmd:CI_ResponsibleParty`` child provides one.

    Returns None when the record is unavailable or no contact has an
    organisation name.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    for contact in root.findall('.//gmd:contact', NS):
        party = contact.find('gmd:CI_ResponsibleParty', NS)
        org_node = (
            party.find('gmd:organisationName/gco:CharacterString', NS)
            if party is not None
            else None
        )
        if org_node is not None and org_node.text:
            return org_node.text.strip()
    return None

def get_md_contact_email(identifier: str) -> str | None:
    """
    Return the e-mail address of the first ``gmd:contact`` whose
    ``gmd:CI_ResponsibleParty`` child contains one.

    Returns None when the record is unavailable or no contact has an
    e-mail address.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    for contact in root.findall('.//gmd:contact', NS):
        party = contact.find('gmd:CI_ResponsibleParty', NS)
        mail_node = (
            party.find('.//gmd:electronicMailAddress/gco:CharacterString', NS)
            if party is not None
            else None
        )
        if mail_node is not None and mail_node.text:
            return mail_node.text.strip()
    return None

def get_data_creators(identifier: str) -> list[str]:
    """
    List the responsible parties whose role code is 'originator'.

    Each entry is the party's individual name, or its organisation name
    when no individual name is present. Parties without either are skipped.
    Returns an empty list when the record is unavailable.
    """
    root = fetch_root(identifier)
    if root is None:
        return []

    originators: list[str] = []
    for party in root.findall('.//gmd:CI_ResponsibleParty', NS):
        role = party.find('gmd:role/gmd:CI_RoleCode', NS)
        if role is None or role.attrib.get('codeListValue') != 'originator':
            continue
        label = first_text(party, './/gmd:individualName/gco:CharacterString')
        if not label:
            label = first_text(party, './/gmd:organisationName/gco:CharacterString')
        if label:
            originators.append(label)
    return originators

def get_data_publishers(identifier: str) -> list[str]:
    """
    List the responsible parties whose role code is 'publisher'.

    Each entry is the party's individual name, or its organisation name
    when no individual name is present. Parties without either are skipped.
    Returns an empty list when the record is unavailable.
    """
    root = fetch_root(identifier)
    if root is None:
        return []

    pubs: list[str] = []
    for party in root.findall('.//gmd:CI_ResponsibleParty', NS):
        rl = party.find('gmd:role/gmd:CI_RoleCode', NS)
        # Exact comparison: the former `in ('publisher')` tested against a
        # plain string (no trailing comma, so not a tuple). That meant
        # substring matching and a TypeError when codeListValue was missing.
        if rl is not None and rl.attrib.get('codeListValue') == 'publisher':
            name = (
                first_text(party, './/gmd:individualName/gco:CharacterString')
                or first_text(party, './/gmd:organisationName/gco:CharacterString')
            )
            if name:
                pubs.append(name)
    return pubs

def get_data_pointcontact(identifier: str) -> list[str]:
    """
    List the responsible parties whose role code is 'pointOfContact'.

    Each entry is formatted as "individual (organisation)" when both names
    are present, otherwise as whichever of the two exists. Parties with
    neither name are skipped. Returns an empty list when the record is
    unavailable.
    """
    root = fetch_root(identifier)
    if root is None:
        return []

    contacts: list[str] = []
    for party in root.findall('.//gmd:CI_ResponsibleParty', NS):
        role = party.find('gmd:role/gmd:CI_RoleCode', NS)
        if role is None or role.attrib.get('codeListValue') != 'pointOfContact':
            continue

        individual = first_text(party, './/gmd:individualName/gco:CharacterString')
        organisation = first_text(party, './/gmd:organisationName/gco:CharacterString')

        if individual and organisation:
            label = f"{individual} ({organisation})"
        else:
            # At most one of the two is usable here; take whichever it is.
            label = individual or organisation

        if label:
            contacts.append(label.strip())

    return contacts

def get_bounding_box(identifier: str) -> tuple[str,str,str,str] | None:
    """
    Return the first geographic bounding box of the record as a
    (west, south, east, north) tuple of the ``gco:Decimal`` texts.

    Individual entries are None when the corresponding bound has no text.
    Returns None when the record is unavailable or has no
    ``gmd:EX_GeographicBoundingBox`` element.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    box = root.find('.//gmd:EX_GeographicBoundingBox', NS)
    if box is None:
        return None

    # Order matters: west, south, east, north.
    bound_tags = (
        'westBoundLongitude',
        'southBoundLatitude',
        'eastBoundLongitude',
        'northBoundLatitude',
    )
    return tuple(first_text(box, f'.//gmd:{tag}/gco:Decimal') for tag in bound_tags)

def get_license(identifier: str) -> str | None:
    """
    Return the first usable licence statement found in
    ``gmd:MD_LegalConstraints/gmd:otherConstraints``.

    For each constraint, a non-blank ``gco:CharacterString`` is preferred.
    Otherwise a ``gmx:Anchor`` is used and rendered as "text (href)", or as
    just the text or just the href when only one is present. Returns None
    when the record is unavailable or nothing usable is found.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    href_attr = '{http://www.w3.org/1999/xlink}href'
    for constraint in root.findall('.//gmd:MD_LegalConstraints/gmd:otherConstraints', NS):
        plain = constraint.find('gco:CharacterString', NS)
        if plain is not None and plain.text and plain.text.strip():
            return plain.text.strip()

        anchor = constraint.find('gmx:Anchor', NS)
        if anchor is None:
            continue

        label = (anchor.text or '').strip()
        link = anchor.attrib.get(href_attr)
        if label and link:
            return f"{label} ({link})"
        if label or link:
            return label or link
    return None

def get_access_rights(identifier: str) -> str | None:
    """
    Return the ``codeListValue`` of the first
    ``gmd:accessConstraints/gmd:MD_RestrictionCode`` found under any
    ``gmd:MD_LegalConstraints`` element.

    Returns None when the record is unavailable, no restriction code
    exists, or the first one found lacks the attribute.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    for constraints in root.findall('.//gmd:MD_LegalConstraints', NS):
        restriction = constraints.find('gmd:accessConstraints/gmd:MD_RestrictionCode', NS)
        if restriction is None:
            continue
        return restriction.attrib.get('codeListValue')
    return None

def get_crs_epsg_codes(identifier: str) -> str | None:
    """
    Collect the numeric EPSG codes listed under ``gmd:referenceSystemInfo``.

    For every ``gmd:code`` element, a ``gmx:Anchor`` child is inspected
    first: when its xlink:href contains "EPSG", the last path segment of
    the href is used. Without an anchor, the ``gco:CharacterString`` text
    is used. Only all-digit values are kept. Distinct codes are sorted as
    strings and joined with '; '. Returns None when the record is
    unavailable or no code qualifies.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    href_attr = '{http://www.w3.org/1999/xlink}href'
    found = set()
    for code_el in root.findall('.//gmd:referenceSystemInfo//gmd:RS_Identifier//gmd:code', NS):
        anchor = code_el.find('gmx:Anchor', NS)
        if anchor is None:
            # No anchor: fall back to the plain character string.
            plain = code_el.find('gco:CharacterString', NS)
            candidate = plain.text.strip() if plain is not None and plain.text else ''
        else:
            href = anchor.attrib.get(href_attr, '')
            candidate = href.rstrip('/').split('/')[-1] if "EPSG" in href else ''

        # ''.isdigit() is False, so empty candidates are dropped here too.
        if candidate.isdigit():
            found.add(candidate)

    if not found:
        return None
    return '; '.join(sorted(found))


def detect_md_standard(identifier: str) -> str | float:
    """
    Detect which metadata standard(s) the catalogue can serve for a record.

    A CSW ``GetRecordById`` request is POSTed once per output schema:
      * the gmd schema  -> "ISO 19115" when an ``MD_Metadata`` element is returned;
      * the gfc schema  -> "ISO 19110" when an ``FC_FeatureCatalogue`` element is returned;
      * the csw schema  -> "Dublin Core" when a ``Record`` element is returned
        (only tried when neither ISO schema matched).

    Args:
        identifier: The metadata record identifier.

    Returns:
        The detected standard names joined with ', ', or ``np.nan`` when
        none is detected (or every request failed).
    """
    csw_url = "https://www.nationaalgeoregister.nl/geonetwork/srv/dut/csw"
    headers = {'Content-Type': 'application/xml'}

    def fetch(outputschema: str) -> ET.Element | None:
        """POST a GetRecordById request for ``outputschema``; None on failure."""
        # NOTE(review): identifier is interpolated into the XML body unescaped.
        xml = f"""<?xml version="1.0" encoding="UTF-8"?>
        <csw:GetRecordById
            xmlns:csw="http://www.opengis.net/cat/csw/2.0.2"
            service="CSW" version="2.0.2"
            outputSchema="{outputschema}"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://www.opengis.net/cat/csw/2.0.2
            http://schemas.opengis.net/csw/2.0.2/CSW-discovery.xsd">
          <csw:Id>{identifier}</csw:Id>
          <csw:ElementSetName>full</csw:ElementSetName>
        </csw:GetRecordById>"""
        try:
            resp = requests.post(csw_url, data=xml, headers=headers, timeout=20)
            resp.raise_for_status()
            return ET.fromstring(resp.content)
        except (requests.RequestException, ET.ParseError):
            # Best-effort: network/HTTP failures and unparsable responses mean
            # "not available in this schema". A bare `except:` would also have
            # swallowed KeyboardInterrupt and genuine programming errors.
            return None

    results = []

    r_gmd = fetch("http://www.isotc211.org/2005/gmd")
    if r_gmd is not None and r_gmd.find('.//{http://www.isotc211.org/2005/gmd}MD_Metadata') is not None:
        results.append("ISO 19115")

    r_gfc = fetch("http://www.isotc211.org/2005/gfc")
    if r_gfc is not None and r_gfc.find('.//{http://www.isotc211.org/2005/gfc}FC_FeatureCatalogue') is not None:
        results.append("ISO 19110")

    # Dublin Core is only probed as a fallback when no ISO schema matched.
    if not results:
        r_csw = fetch("http://www.opengis.net/cat/csw/2.0.2")
        if r_csw is not None and r_csw.find('.//{http://www.opengis.net/cat/csw/2.0.2}Record') is not None:
            results.append("Dublin Core")

    return ', '.join(results) if results else np.nan

def get_landing_page_url(identifier: str) -> str | None:
    """
    Find the landing page URL of a metadata record.

    The online resources under ``gmd:distributionInfo`` are scanned in
    order. The first one whose ``gmd:protocol/gmx:Anchor`` text contains
    'landingpage' (case-insensitive) and which has a ``gmd:linkage/gmd:URL``
    supplies the result.

    Args:
        identifier (str): The metadata record UUID or identifier.

    Returns:
        str | None: The landing page URL if found, else None.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    resource_path = (
        './/gmd:distributionInfo//gmd:MD_DigitalTransferOptions//gmd:onLine//gmd:CI_OnlineResource'
    )
    for resource in root.findall(resource_path, NS):
        protocol = resource.find('gmd:protocol/gmx:Anchor', NS)
        if protocol is None or not protocol.text:
            continue
        if 'landingpage' not in protocol.text.lower():
            continue
        link = first_text(resource, 'gmd:linkage/gmd:URL')
        if link:
            return link.strip()
    return None


def get_ogc_web_services(identifier: str) -> str | None:
    """
    List the OGC service types mentioned by the record's online resources.

    For every online resource under ``gmd:distributionInfo``, the protocol
    string and URL are searched (case-insensitively, as substrings) for
    each of WMS, WFS, WCS, WMTS, CSW, SOS and WPS. Matches are reported
    once, in order of first appearance, joined with '; '. Returns None
    when the record is unavailable or nothing matches.
    """
    root = fetch_root(identifier)
    if root is None:
        return None

    known = ['WMS', 'WFS', 'WCS', 'WMTS', 'CSW', 'SOS', 'WPS']
    resource_path = (
        './/gmd:distributionInfo//gmd:MD_DigitalTransferOptions//gmd:onLine//gmd:CI_OnlineResource'
    )

    found = []
    for resource in root.findall(resource_path, NS):
        protocol = first_text(resource, 'gmd:protocol/gco:CharacterString') or ''
        link = first_text(resource, 'gmd:linkage/gmd:URL') or ''
        haystack = (protocol + ' ' + link).lower()
        for svc in known:
            if svc not in found and svc.lower() in haystack:
                found.append(svc)

    if not found:
        return None
    return '; '.join(found)

def get_ogc_capabilities_url(identifier: str, service: str) -> str | None:
    """
    Pick an online-resource URL from the record that refers to the given
    OGC service.

    An online resource qualifies when the lower-cased service name occurs
    in its URL or in its protocol string. Among the qualifying URLs, the
    first one containing 'version=' is preferred. Otherwise the first
    qualifying URL is returned.

    Parameters:
        identifier (str): CSW record identifier
        service (str): Target OGC service type, e.g., 'wcs', 'wfs', 'wms' (case-insensitive)

    Returns:
        str | None: The selected URL, or None when the record is
        unavailable, nothing qualifies, or an exception occurred (in which
        case a message is printed).
    """
    try:
        wanted = service.lower()
        root = fetch_root(identifier)
        if root is None:
            return None

        resource_path = (
            './/gmd:distributionInfo//gmd:MD_DigitalTransferOptions//gmd:onLine//gmd:CI_OnlineResource'
        )
        matches = []
        for resource in root.findall(resource_path, NS):
            link = first_text(resource, 'gmd:linkage/gmd:URL')
            protocol = first_text(resource, 'gmd:protocol/gco:CharacterString') or ''
            if not link:
                continue

            link_lc = link.lower()
            if (
                wanted in link_lc
                or wanted in protocol.lower()
                or f'service={wanted}' in link_lc
            ):
                matches.append(link)

        if not matches:
            return None

        # A URL that pins a version is considered the most specific.
        for candidate in matches:
            if 'version=' in candidate.lower():
                return candidate
        return matches[0]

    except Exception as e:
        print(f"Failed for identifier={identifier}: {e}")
        return None

# ─── Get Identifiers via OWSLib ──────────────────────────────────────────────
def get_all_identifiers(max_records: int | None = 100) -> list[str]:
    """
    Page through the NGR CSW catalogue and return record identifiers.

    A first one-record query reads the total number of matches. Identifiers
    are then fetched in pages of 100 "brief" records until enough have been
    collected or a page comes back empty.

    Args:
        max_records: Upper bound on the number of identifiers returned;
            None means "all matches reported by the catalogue".

    Returns:
        At most ``max_records`` identifiers (or all, when None).
    """
    csw_url = "https://www.nationaalgeoregister.nl/geonetwork/srv/dut/csw"
    catalogue = CatalogueServiceWeb(csw_url)

    # Probe query: only used to learn how many records match.
    catalogue.getrecords2(maxrecords=1, startposition=1, esn="brief")
    total = catalogue.results['matches']
    print(f"Total available: {total}")

    if max_records is None:
        to_do = total
    else:
        to_do = min(total, max_records)
    print(f"Will fetch: {to_do} identifiers")

    page_size = 100
    position = 1
    collected = []
    while len(collected) < to_do:
        catalogue.getrecords2(maxrecords=page_size, startposition=position, esn="brief")
        page = list(catalogue.records.keys())
        if not page:
            break
        collected += page
        position += page_size

    # The last page may overshoot the requested amount.
    return collected[:to_do]

def extract_all_metadata(max_records: int | None = 10) -> pd.DataFrame:
    """
    Harvest metadata for catalogue records into a DataFrame.

    Identifiers are obtained from ``get_all_identifiers(max_records)``.
    For each identifier, every ``get_*`` / ``detect_*`` extractor is called
    and the results form one row.

    Args:
        max_records: Maximum number of records to process. None is passed
            through to ``get_all_identifiers``, which then returns every
            identifier the catalogue reports.

    Returns:
        A DataFrame with one row per identifier. List-valued fields
        (creators, publishers, points of contact) are joined with '; ' and
        stored as None when empty.
    """
    ids = get_all_identifiers(max_records)
    rows = []
    for identifier in tqdm(ids, desc="Extracting metadata"):
        rows.append({
            "identifier":         identifier,
            "resource_type":      get_resource_type(identifier),
            "md_standard":        detect_md_standard(identifier),
            "ogc_web_services":   get_ogc_web_services(identifier),
            "md_date":            get_metadata_date(identifier),
            "language":           get_language(identifier),
            "crs_epsg_codes":     get_crs_epsg_codes(identifier),
            "title":              get_title(identifier),
            "keywords":           get_keywords(identifier),
            "abstract":           get_abstract(identifier),
            "md_contact_name":    get_md_responsibleparty(identifier),
            "md_contact_email":   get_md_contact_email(identifier),
            "landing_page":       get_landing_page_url(identifier),
            # An empty list joins to "", which `or None` turns into a missing value.
            "data_creators":      "; ".join(get_data_creators(identifier))   or None,
            "data_publishers":    "; ".join(get_data_publishers(identifier)) or None,
            "data_pointcontact":      "; ".join(get_data_pointcontact(identifier))  or None,
            "bounding_box":       get_bounding_box(identifier),
            "license":            get_license(identifier),
            "access_rights":      get_access_rights(identifier),
            "wcs_getcapabilities_url": get_ogc_capabilities_url(identifier, "wcs"),
            "wfs_getcapabilities_url": get_ogc_capabilities_url(identifier, "wfs"),
        })
    return pd.DataFrame(rows)


### 1.3. Run metadata extraction:

In [None]:
# Harvest metadata into `df`; passing max_records=None removes the record cap.
df = extract_all_metadata(max_records=None) # To fetch all records, use max_records=None

Total available: 9510
Will fetch: 9510 identifiers


**Checkpoint 01:**

In [9]:
# Save the harvested metadata table as checkpoint 01 (CSV in the output
# folder, written without the DataFrame index column).
df.to_csv(os.path.join(output_path, "checkpoint01_ngr_all_metadata.csv"), index=False)


---