In [1]:
import requests
import uuid
from datetime import datetime
from lxml import etree
import os

In [2]:
# Prefix -> namespace URI table used to qualify tag and attribute names.
# The nine isotc211.org entries share one base URI that ends in the prefix
# itself, so they are generated from the prefix list; the rest are spelled out.
namespaces = {
    **{
        prefix: f"http://www.isotc211.org/2005/{prefix}"
        for prefix in (
            "gmd", "gco", "gfc", "srv", "gmx", "gts", "gsr", "gss", "gmi",
        )
    },
    "napm": "http://www.geconnections.org/nap/napMetadataTools/napXsd/napm",
    "gml": "http://www.opengis.net/gml/3.2",
    "xlink": "http://www.w3.org/1999/xlink",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance",
}

In [3]:
def resolve_tag(tag):
    """
    Convert 'gmd:tag' to '{uri}tag' using namespace mappings.

    Tags without a prefix, and tags that are already in '{uri}tag' form, are
    returned unchanged, so calling this twice on the same name is harmless.

    Raises:
        KeyError: if the prefix is not a key of `namespaces`.
    """
    # A qualified name has ':' inside its URI ("http://..."); splitting it
    # would treat "{http" as the prefix.
    if ":" not in tag or tag.startswith("{"):
        return tag

    prefix, local_name = tag.split(":", 1)
    try:
        uri = namespaces[prefix]
    except KeyError:
        # Same exception type as before, but it now names the offending tag.
        raise KeyError(
            f"Unknown namespace prefix '{prefix}' in tag '{tag}'"
        ) from None
    return f"{{{uri}}}{local_name}"

In [4]:
def get_value(record, source, default=""):
  """
  Walk `record` along a dot-separated `source` path and return what is found.

  A segment applied to a list is read as an integer index; applied to anything
  else it is used as a subscript key. `default` is returned when `source` is
  empty or when any segment cannot be resolved.
  Example:
    - "edhProfile.characterSet": str
    - "files.0.id": str. get("files") is a list, "0" is the index
    - "topicCategory": []
  """
  if not source:
    return default

  node = record
  for segment in source.split("."):
    in_list = isinstance(node, list)
    # Only the failures that belong to the current kind of lookup are absorbed.
    lookup_errors = (ValueError, IndexError) if in_list else (KeyError, TypeError)
    try:
      node = node[int(segment)] if in_list else node[segment]
    except lookup_errors:
      return default

  return node

In [5]:
def ensure_child(parent, tag, attrib=None):
  """
  Return the first direct child of `parent` with the given tag, appending a
  new child when none exists.

  `attrib` is only applied to a newly created child. Its keys are passed
  through resolve_tag like the tag itself, so prefixed attribute names such
  as xsi:type are qualified too.
  """
  qualified_tag = resolve_tag(tag)

  existing = next(
    (node for node in parent if node.tag == qualified_tag),
    None
  )
  if existing is not None:
    return existing

  qualified_attrib = {}
  for name, value in (attrib or {}).items():
    qualified_attrib[resolve_tag(name)] = value

  return etree.SubElement(parent, qualified_tag, qualified_attrib)

In [6]:
def normalize_path_item(item):
  """
  Turn one path entry into a (tag, attrib) pair.

  Accepted forms
  - "tag" (a plain string; ("tag") is the same thing) -> (tag, None)
  - ("tag",) -> (tag, None)
  - ("tag", {attrib}) -> (tag, attrib)

  Raises ValueError for a tuple of any other length and for any other type.
  """
  if isinstance(item, str):
    return item, None

  if not isinstance(item, tuple):
    raise ValueError(f"Invalid path entry {item}")

  size = len(item)
  if size == 1:
    (tag,) = item
    return tag, None
  if size == 2:
    tag, attrib = item
    return tag, attrib

  raise ValueError(f"Invalid tuple length in path item {item}")

In [7]:
def add_bilingual_text(parent, en_value, fr_value=None):
  """
  Append the English text, and the French text when one is given, to `parent`.

  A gco:CharacterString holding str(en_value) is always added. A truthy
  `fr_value` additionally adds
  gmd:PT_FreeText / gmd:textGroup / gmd:LocalisedCharacterString
  with locale="#fra" holding str(fr_value).
  """
  english = etree.SubElement(parent, resolve_tag("gco:CharacterString"))
  english.text = str(en_value)

  # Nothing more to add without a French value.
  if not fr_value:
    return

  free_text = etree.SubElement(parent, resolve_tag("gmd:PT_FreeText"))
  text_group = etree.SubElement(free_text, resolve_tag("gmd:textGroup"))
  french = etree.SubElement(
    text_group,
    resolve_tag("gmd:LocalisedCharacterString"),
    {"locale": "#fra"}
  )
  french.text = str(fr_value)

In [8]:
def attach_value(node, value, fr_value=None, is_bilingual=False):
    """
    Write `value` onto a leaf node.

    Without `is_bilingual` the node's text becomes str(value). With it, the
    work is delegated to add_bilingual_text, which appends child elements
    for the English and (optional) French text instead.
    """
    if not is_bilingual:
        node.text = str(value)
        return

    add_bilingual_text(node, value, fr_value)

In [None]:
def _value_for_index(values, idx):
  """
  Pick the item that belongs to repetition `idx`.

  A list is read positionally and yields None once `idx` is past its end;
  any other value is shared by every repetition and returned unchanged.
  """
  if isinstance(values, list):
    return values[idx] if idx < len(values) else None
  return values


def apply_mapping(root, record, mapping):
  """
  Add the elements described by `mapping` to `root`, reading values from `record`.

  Each mapping entry is a dict that may contain:
    - "path": list of items accepted by normalize_path_item (required)
    - "source": dotted path into `record`, resolved with get_value
    - "text": literal value; when present, "source" and "source_fr" are ignored
    - "source_fr": dotted path of the French value; turns on bilingual output
    - "repeat": a tag from "path"; the subtree starting there is emitted once
      per item of the value

  Entries whose value is None, "" or [] are skipped. Entries are handled in
  mapping order, so an entry that decorates a repeated element only finds the
  repetitions created by entries that come before it.

  Raises:
    ValueError: if the "repeat" tag does not occur in the entry's path, or a
      path item is malformed.
    KeyError: if an entry that has a value has no "path".
  """
  # Every tag declared as repeating anywhere in the mapping. A non-repeat
  # entry whose path passes through one of them decorates the repetitions
  # that already exist instead of creating a single element.
  all_repeat_tags = {
    conf.get("repeat")
    for conf in mapping.values()
    if isinstance(conf.get("repeat"), str)
  }

  for conf in mapping.values():
    repeat_tag = conf.get("repeat")
    is_repeat = isinstance(repeat_tag, str)

    # Resolve value
    if "text" in conf:
      value = conf.get("text")
    else:
      value = get_value(record, conf.get("source"))

    if value in (None, "", []):
      continue

    # Normalize path
    path = [normalize_path_item(item) for item in conf["path"]]

    # Resolve bilingual values once
    fr_values = None
    is_bilingual = "source_fr" in conf and "text" not in conf
    if is_bilingual:
      fr_values = get_value(record, conf.get("source_fr"))

    # CASE 1: Defines repetition
    # ==========================
    if is_repeat:
      # A scalar counts as one repetition. Iterating it directly would emit
      # one element per character of a string and fail on a number.
      values = value if isinstance(value, list) else [value]

      repeat_idx = next(
        (i for i, (tag, _) in enumerate(path) if tag == repeat_tag),
        None
      )
      if repeat_idx is None:
        raise ValueError(
          f"Repeat tag '{repeat_tag}' not found in path: {path}"
        )

      # Split path into container -> repeat subtree
      container_path = path[:repeat_idx]
      repeat_path = path[repeat_idx:]

      # Build container once
      parent = root
      for tag, attrib in container_path:
        parent = ensure_child(parent, tag, attrib)

      # Emit a fresh copy of the repeated subtree for every item
      for idx, item in enumerate(values):
        current = parent
        for tag, attrib in repeat_path:
          current = etree.SubElement(
            current,
            resolve_tag(tag),
            {resolve_tag(k): v for k, v in (attrib or {}).items()}
          )

        fr_value = _value_for_index(fr_values, idx) if is_bilingual else None
        attach_value(current, item, fr_value, is_bilingual)
      continue

    # CASE 2: Decorates an existing repeating structure
    # =================================================
    repeat_idx = next(
      (i for i, (tag, _) in enumerate(path) if tag in all_repeat_tags),
      None
    )
    if repeat_idx is not None:
      container_path = path[:repeat_idx]
      post_repeat_path = path[repeat_idx + 1:]
      repeat_tag_name, _ = path[repeat_idx]

      # Walk to the parent of the repeated elements
      parent = root
      for tag, attrib in container_path:
        parent = ensure_child(parent, tag, attrib)

      repeated_nodes = parent.findall(resolve_tag(repeat_tag_name))

      for idx, repeated_node in enumerate(repeated_nodes):
        # A list value is matched to the repetitions by position, the same
        # way the French value is; writing the whole list to every node
        # would serialise its Python repr.
        node_value = _value_for_index(value, idx)
        if node_value in (None, ""):
          continue

        current = repeated_node
        for tag, attrib in post_repeat_path:
          current = ensure_child(current, tag, attrib)

        fr_value = _value_for_index(fr_values, idx) if is_bilingual else None
        attach_value(current, node_value, fr_value, is_bilingual)
      continue

    # CASE 3: Normal non-repeat field
    # ===============================
    parent = root
    for tag, attrib in path[:-1]:
      parent = ensure_child(parent, tag, attrib)

    leaf_tag, leaf_attrib = path[-1]
    leaf = ensure_child(parent, leaf_tag, leaf_attrib)

    fr_value = fr_values if is_bilingual else None
    attach_value(leaf, value, fr_value, is_bilingual)

In [10]:
def build_xml(record, record_id, field_mapping):
  """
  Build the metadata document for one record.

  Creates a gmd:MD_Metadata root that declares every prefix in `namespaces`,
  fills it through apply_mapping, and returns it wrapped in an ElementTree.
  `record_id` is accepted but not used by this function.
  """
  metadata_root = etree.Element(
    resolve_tag("gmd:MD_Metadata"),
    nsmap=namespaces,
  )
  apply_mapping(metadata_root, record, field_mapping)
  document = etree.ElementTree(metadata_root)
  return document

In [11]:
def fetch_json(api_url, timeout=30):
    """
    GET `api_url` and return the decoded JSON body.

    Args:
        api_url: URL to request.
        timeout: seconds handed to requests.get as its timeout. Without one,
            requests waits indefinitely for an unresponsive server.

    Raises:
        requests.HTTPError: if the response status signals an error.
        requests.Timeout: if the server does not answer within `timeout`.
    """
    response = requests.get(api_url, timeout=timeout)
    response.raise_for_status()
    return response.json()

In [12]:
def extract_record_id(record):
    """
    Return the id of the record's first file, or a fresh UUID4 string.

    The fallback is used whenever no usable id is present: "files" missing,
    null, empty or not a list, the first file not being a dict, or its "id"
    missing, null or empty.
    """
    # `or [{}]` also covers "files": null and "files": [].
    files = record.get("files") or [{}]
    first_file = files[0] if isinstance(files, list) else None
    record_id = first_file.get("id") if isinstance(first_file, dict) else None

    if record_id in (None, ""):
        # Generated only when actually needed.
        return str(uuid.uuid4())
    return record_id

In [13]:
def generate_xml(api_url, field_mapping, output_dir="output"):
    """
    Fetch records from `api_url` and write one XML file per record.

    Args:
        api_url: endpoint whose JSON response carries the records under "data".
        field_mapping: mapping passed to build_xml for every record.
        output_dir: directory for the files; created when missing.

    Each file is named "<record id>.xml", where the id comes from
    extract_record_id. Progress is reported with print().
    """
    os.makedirs(output_dir, exist_ok=True)

    raw = fetch_json(api_url)
    # `or []` also covers a payload with "data": null, which would otherwise
    # break len() and the loop below.
    records = raw.get("data") or []

    print(f"Found {len(records)} records.")

    for record in records:
        record_id = extract_record_id(record)
        xml_tree = build_xml(record, record_id, field_mapping)

        filename = f"{output_dir}/{record_id}.xml"
        # Write declared UTF-8 so accented text is stored as readable
        # characters; lxml's default output is believed to be ASCII with no
        # XML declaration, which escapes them as numeric references.
        xml_tree.write(
            filename,
            pretty_print=True,
            xml_declaration=True,
            encoding="UTF-8",
        )
        print(f"Saved {filename}")

In [14]:
"""
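source: dotted path of the value in the source record
source_fr: optional; dotted path of the French value. It builds a #fra block in the output
text: literal value for the element. If text exists, source is ignored
repeat: a tag from path; the subtree starting at that tag is emitted once per item of the source value
path: element chain from the document root down to the leaf; each item is a tag or a (tag, attributes) tuple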
"""

# Maps a field name to the value it takes and the element path it is written to.
# Keys must be unique: a repeated key silently replaces the earlier entry.
field_mapping = {
  "id": {
    "source": "files.0.id",
    "path": [
      ("gmd:fileIdentifier"),
      ("gco:CharacterString")
    ]
  },
  "dateStamp": {
    # Evaluated once, when this cell runs; every record gets the same stamp.
    "text": datetime.now().isoformat(),
    "path": [
      ("gmd:dateStamp"),
      ("gco:DateTime")
    ]
  },
  "language": {
    "source": "edhProfile.language",
    "path": [
      ("gmd:language"),
      ("gco:CharacterString")
    ]
  },
  "characterSet": {
    "source": "edhProfile.characterSet",
    "path": [
      ("gmd:characterSet"),
      ("gmd:MD_CharacterSetCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_95",
         "codeListValue": "RI_458"
       }
      )
    ]
  },
  "hierarchyLevel": {
    "source": "edhProfile.hierarchyLevel",
    "path": [
      ("gmd:hierarchyLevel"),
      ("gmd:MD_ScopeCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_108",
         "codeListValue": "RI_623"
       }
      )
    ]
  },
  "organizationName": {
    "source": "organizationName",
    "source_fr": "organizationName",
    "path": [
      ("gmd:contact"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:organisationName", {"xsi:type": "gmd:PT_FreeText_PropertyType"}),
    ]
  },
  "emailAddress": {
    "source": "emailAddress",
    "source_fr": "emailAddress",
    "path": [
      ("gmd:contact"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:contactInfo"),
      ("gmd:CI_Contact"),
      ("gmd:address"),
      # Same address wrapper as the other two e-mail paths in this mapping.
      ("gmd:CI_Address"),
      ("gmd:electronicMailAddress", {"xsi:type": "gmd:PT_FreeText_PropertyType"}),
    ]
  },
  "role": {
    "source": "edhProfile.contactRole",
    "path": [
      ("gmd:contact"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:role"),
      ("gmd:CI_RoleCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_90",
         "codeListValue": "RI_414"
       }
      ),
    ]
  },
  "title": {
    "source": "title",
    "source_fr": "titleFr",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:citation"),
      ("gmd:CI_Citation"),
      ("gmd:title", {"xsi:type": "gmd:PT_FreeText_PropertyType"})
    ]
  },
  "dataPublication": {
    "source": "edhProfile.dataPublication",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:citation"),
      ("gmd:CI_Citation"),
      ("gmd:date"),
      ("gmd:CI_Date"),
      ("gmd:date"),
      ("gco:Date")
    ]
  },
  "abstract": {
    "source": "abstractEN",
    "source_fr": "abstractFR",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:abstract", {"xsi:type": "gmd:PT_FreeText_PropertyType"})
    ]
  },
  "status": {
    "source": "edhProfile.datasetStatus",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:status"),
      ("gmd:MD_ProgressCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_106",
         "codeListValue": "RI_596"
       }
      ),
    ]
  },
  "identificationInfo_language": {
    "source": "edhProfile.language",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:language"),
      ("gco:CharacterString")
    ]
  },
  "topicCategory": {
    "source": "edhProfile.topicCategory",
    "repeat": "gmd:topicCategory",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:topicCategory"),
      ("gmd:MD_TopicCategoryCode")
    ]
  },
  "updateFrequency": {
    "source": "updateFrequency",
    # NOTE(review): this path starts at the document root, while ISO 19139
    # appears to place gmd:resourceMaintenance inside
    # gmd:MD_DataIdentification - confirm against the target schema.
    "path": [
      ("gmd:resourceMaintenance"),
      ("gmd:MD_MaintenanceInformation"),
      ("gmd:maintenanceAndUpdateFrequency"),
      ("gmd:MD_MaintenanceFrequencyCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_102",
         "codeListValue": "RI_539"
       }
      ),
    ]
  },
  "citedResponsiblePartyIndividualName": {
    "source": "edhProfile.citedResponsiblePartyIndividualName",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:citation"),
      ("gmd:CI_Citation"),
      ("gmd:citedResponsibleParty"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:individualName"),
      ("gco:CharacterString"),
    ]
  },
  "citedResponsiblePartyOrganizationName": {
    "source": "edhProfile.citedResponsiblePartyOrganizationName",
    "source_fr": "edhProfile.citedResponsiblePartyOrganizationName",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:citation"),
      ("gmd:CI_Citation"),
      ("gmd:citedResponsibleParty"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:organisationName", {"xsi:type": "gmd:PT_FreeText_PropertyType"}),
    ]
  },
  "citedResponsiblePartyEmail": {
    "source": "edhProfile.citedResponsiblePartyEmail",
    "source_fr": "edhProfile.citedResponsiblePartyEmail",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:citation"),
      ("gmd:CI_Citation"),
      ("gmd:citedResponsibleParty"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:contactInfo"),
      ("gmd:CI_Contact"),
      ("gmd:address"),
      ("gmd:CI_Address"),
      ("gmd:electronicMailAddress", {"xsi:type": "gmd:PT_FreeText_PropertyType"}),
    ]
  },
  "citedResponsiblePartyRole": {
    "source": "edhProfile.citedResponsiblePartyRole",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:citation"),
      ("gmd:CI_Citation"),
      ("gmd:citedResponsibleParty"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:role"),
      ("gmd:CI_RoleCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_90",
         "codeListValue": "RI_415"
       }
      ),
    ]
  },
  "descriptiveKeywords": {
    "source": "pacificSalmonTopicCategory",
    "source_fr": "pacificSalmonTopicCategory",
    "repeat": "gmd:keyword",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:descriptiveKeywords"),
      ("gmd:MD_Keywords"),
      ("gmd:keyword", {"xsi:type": "gmd:PT_FreeText_PropertyType"}),
    ]
  },
  "useLimitation": {
    "source": "license",
    "source_fr": "license",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:resourceConstraints"),
      ("gmd:MD_LegalConstraints"),
      ("gmd:useLimitation", {"xsi:type": "gmd:PT_FreeText_PropertyType"}),
    ]
  },
  "accessConstraints": {
    "source": "license",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:resourceConstraints"),
      ("gmd:MD_LegalConstraints"),
      ("gmd:accessConstraints"),
      ("gmd:MD_RestrictionCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_107",
         "codeListValue": "RI_606"
       }
      ),
    ]
  },
  "useConstraints": {
    "source": "license",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:resourceConstraints"),
      ("gmd:MD_LegalConstraints"),
      ("gmd:useConstraints"),
      ("gmd:MD_RestrictionCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_107",
         "codeListValue": "RI_606"
       }
      ),
    ]
  },
  # Was a second "useConstraints" key, which replaced the entry above so that
  # gmd:useConstraints was never written.
  "classification": {
    "source": "license",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:resourceConstraints"),
      ("gmd:MD_SecurityConstraints"),
      ("gmd:classification"),
      ("gmd:MD_ClassificationCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_96",
         "codeListValue": "RI_484"
       }
      ),
    ]
  },
  "beginDate": {
    "source": "beginDate",
    # NOTE(review): ISO 19139 appears to expect gmd:temporalElement between
    # gmd:EX_Extent and gmd:EX_TemporalExtent - confirm against the schema.
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:extent"),
      ("gmd:EX_Extent"),
      ("gmd:EX_TemporalExtent"),
      ("gmd:extent"),
      ("gml:TimePeriod", {"gml:id": "d135389e942"}),
      ("gml:beginPosition"),
    ]
  },
  "endDate": {
    "source": "endDate",
    "path": [
      ("gmd:identificationInfo"),
      ("gmd:MD_DataIdentification"),
      ("gmd:extent"),
      ("gmd:EX_Extent"),
      ("gmd:EX_TemporalExtent"),
      ("gmd:extent"),
      ("gml:TimePeriod", {"gml:id": "d135389e942"}),
      ("gml:endPosition"),
    ]
  },
  "code": {
    "source": "spatialCode",
    "path": [
      ("gmd:referenceSystemInfo"),
      ("gmd:MD_ReferenceSystem"),
      ("gmd:referenceSystemIdentifier"),
      ("gmd:RS_Identifier"),
      ("gmd:code"),
      ("gco:CharacterString"),
    ]
  },
  "codeSpace": {
    "text": "https://epsg.io",
    "path": [
      ("gmd:referenceSystemInfo"),
      ("gmd:MD_ReferenceSystem"),
      ("gmd:referenceSystemIdentifier"),
      ("gmd:RS_Identifier"),
      ("gmd:codeSpace"),
      ("gco:CharacterString"),
    ]
  },
  "fileFormatName": {
    "source": "edhProfile.fileFormatName",
    "repeat": "gmd:distributionFormat",
    "path": [
      ("gmd:distributionInfo"),
      ("gmd:MD_Distribution"),
      ("gmd:distributionFormat"),
      ("gmd:MD_Format"),
      ("gmd:name"),
      ("gco:CharacterString")
    ]
  },
  # Decorates the gmd:distributionFormat elements created by fileFormatName,
  # so it has to stay after that entry.
  "fileFormatVersion": {
    "source": "edhProfile.fileFormatVersion",
    "path": [
      ("gmd:distributionInfo"),
      ("gmd:MD_Distribution"),
      ("gmd:distributionFormat"),
      ("gmd:MD_Format"),
      ("gmd:version"),
      ("gco:CharacterString")
    ]
  },
  "distributionContactOrganizationName": {
    "source": "edhProfile.distributionContactOrganizationName",
    "source_fr": "edhProfile.distributionContactOrganizationName",
    "path": [
      ("gmd:distributionInfo"),
      ("gmd:MD_Distribution"),
      ("gmd:distributor"),
      ("gmd:MD_Distributor"),
      ("gmd:distributorContact"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:organisationName", {"xsi:type": "gmd:PT_FreeText_PropertyType"})
    ]
  },
  "distributionContactEmail": {
    "source": "edhProfile.distributionContactEmail",
    "source_fr": "edhProfile.distributionContactEmail",
    "path": [
      ("gmd:distributionInfo"),
      ("gmd:MD_Distribution"),
      ("gmd:distributor"),
      ("gmd:MD_Distributor"),
      ("gmd:distributorContact"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:contactInfo"),
      ("gmd:CI_Contact"),
      ("gmd:address"),
      ("gmd:CI_Address"),
      ("gmd:electronicMailAddress", {"xsi:type": "gmd:PT_FreeText_PropertyType"})
    ]
  },

  "distributionContactRole": {
    "source": "edhProfile.distributionContactRole",
    "path": [
      ("gmd:distributionInfo"),
      ("gmd:MD_Distribution"),
      ("gmd:distributor"),
      ("gmd:MD_Distributor"),
      ("gmd:distributorContact"),
      ("gmd:CI_ResponsibleParty"),
      ("gmd:role"),
      ("gmd:CI_RoleCode",
       {
         "codeList": "https://schemas.metadata.geo.ca/register/napMetadataRegister.xml#IC_90",
         "codeListValue": "RI_412"
       }
      ),
    ]
  },
}

In [15]:
# Harvest every record the endpoint returns and write one XML file per record
# into generate_xml's default output directory.
api_url = "http://qc-cdos-css-1:8815/api/portal/dataset/harvest"
generate_xml(api_url=api_url, field_mapping=field_mapping)

Found 15 records.
Saved output/7d9c74a1-dd80-4362-af74-bc1c4c5b10e4.xml
Saved output/b627e320-d417-47fa-9c4a-e7d9d84602ed.xml
Saved output/dc7ebc7f-fe37-4576-b901-5be5c35cc541.xml
Saved output/6b0cb02c-8811-481e-b6e1-7dc7b601067e.xml
Saved output/a751a4e5-4ea4-4a51-9ca5-024cd353483d.xml
Saved output/63bcaf24-5734-4e2f-8482-84f9aceef3b4.xml
Saved output/16871717-3b3f-41fc-b11d-28caf4dc3791.xml
Saved output/e8238156-750b-4d72-af7d-588d6910080a.xml
Saved output/9203e352-e949-4e69-a8b8-85ddd39446d6.xml
Saved output/56416564-bce5-4908-8184-a2f31104178c.xml
Saved output/9302799d-b17e-4a9e-be44-47c41bf13c6a.xml
Saved output/c3d8a14a-7eb3-453b-b054-5da9a9da3b1a.xml
Saved output/367a50eb-9588-41c6-94e2-fe6a7aa70aa3.xml
Saved output/54d071ff-d7fb-469d-af0c-1b29b1bc8e1c.xml
Saved output/637ffcf2-7505-40bb-bf2b-f999e78f89f6.xml
