# XSD to CSV Conversion Testing

### Import needed libraries

- `zipfile` – for working with ZIP archives
- `json` – for reading the JSON configuration file
- `xmlschema` – to read and validate XSD files (`pip install xmlschema`)
- `pandas` – for working with tables and dataframes (`pip install pandas`)
- `lxml.etree` – to parse and read XML/XSD structure (`pip install lxml`)
- `pathlib.Path` – for path handling

Install any missing third-party packages (`xmlschema`, `pandas`, `lxml`) with `pip`; the other modules are part of the Python standard library.

In [26]:
import zipfile
import json
import xmlschema
import pandas as pd
from lxml import etree
from pathlib import Path
from collections import OrderedDict
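
Before running the rest of the notebook, it can help to check which of the third-party packages are actually installed. The following is a minimal sketch using only the standard library; the `required` list simply mirrors the packages named above and is an assumption about what the environment needs.

```python
import importlib.util

# Import names of the third-party packages used in this notebook
required = ["xmlschema", "pandas", "lxml"]

# find_spec returns None when a top-level module cannot be located
missing = [name for name in required if importlib.util.find_spec(name) is None]

if missing:
    print("Missing packages, install with: pip install " + " ".join(missing))
else:
    print("All required packages are available.")
```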

### Set paths for input ZIP and output folder

This cell sets:
- `input_zip_path`: path to the input ZIP file with XSDs
- `output_path`: where to extract files

If the folder doesn't exist, it will be created.

In [22]:
# Define the path to the input zip file and output directory
input_zip_path = Path("../tests/data/JVF_DTM_150_beta3_XSD.zip")
output_path = Path("../tests/output/JVF_DTM_150_beta3_XSD")

# Create the output directory if it doesn't exist
output_path.mkdir(parents=True, exist_ok=True)

### Unzip the input file

Extracts the ZIP archive to the output folder and prints the extracted file names.

In [23]:
# Extract the zip file
with zipfile.ZipFile(input_zip_path, 'r') as zip_ref:
    zip_ref.extractall(output_path)

# Show extracted files
print(f"Extracted to: {output_path}")
print(sorted(f.name for f in output_path.iterdir()))

Extracted to: ..\tests\output\JVF_DTM_150_beta3_XSD
['xsd']
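
To see what an archive contains before extracting it, the member names can be listed directly. The sketch below is illustrative only: `list_xsd_members` is a hypothetical helper, and the archive is built in memory with made-up member paths so the example runs without the real test data.

```python
import io
import zipfile

def list_xsd_members(zip_file):
    """Return the sorted names of all .xsd members in a ZIP archive."""
    with zipfile.ZipFile(zip_file, "r") as zip_ref:
        return sorted(n for n in zip_ref.namelist() if n.lower().endswith(".xsd"))

# Build a small in-memory archive (hypothetical contents) for demonstration
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("xsd/common.xsd", "<schema/>")
    zf.writestr("xsd/objekty/zed-linie.xsd", "<schema/>")
    zf.writestr("xsd/readme.txt", "not a schema")

xsd_members = list_xsd_members(buffer)
print(xsd_members)
```

With the real data, the same helper could be called as `list_xsd_members(input_zip_path)`.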


### Load and parse XSD files

Defines a function that finds all `.xsd` files in the folder, parses them into XML trees, and returns a list of (file name, XML root element) pairs.

In [120]:
# Set the path to the folder with extracted XSD files
xsd_dir = output_path / "xsd"

def load_xsd_files(directory):
    """Load all XSD files from the given directory and parse them into XML trees.
    
    Args:
        directory (Path): Base directory to search for .xsd files.

    Returns:
        list of (filename, XML root element) tuples.
    """
    
    # Initialize an empty list to store parsed XSD file information
    xsd_files = []
    
    # Recursively find all .xsd files
    for path in directory.rglob("*.xsd"):
        try:
            # Parse the file into an XML tree
            tree = etree.parse(str(path))
    
            # Add the file name and root element to the list
            xsd_files.append((path.name, tree.getroot()))
    
        except etree.XMLSyntaxError as e:
            # Handle invalid XML syntax
            print(f"[XMLSyntaxError] Skipping {path.name}: {e}")
    
        except Exception as e:
            # Handle all unexpected errors
            print(f"[UnexpectedError] Could not process {path.name}: {e}")
            raise  # Re-raise the exception

    return xsd_files

# Call the function to load and parse the XSD files
xsd_files = load_xsd_files(xsd_dir)
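
The skip-on-syntax-error behaviour can be tried out on a throwaway directory. This is a sketch under stated assumptions: it uses the standard library's `xml.etree.ElementTree` instead of `lxml` so that it runs without extra installs, and `load_xsd_files_stdlib` and the file names are hypothetical.

```python
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path

XS_NS = "http://www.w3.org/2001/XMLSchema"

def load_xsd_files_stdlib(directory):
    """Parse every .xsd file under `directory`; skip files that are not well-formed."""
    parsed = []
    for path in sorted(directory.rglob("*.xsd")):
        try:
            parsed.append((path.name, ET.parse(str(path)).getroot()))
        except ET.ParseError as e:
            print(f"[ParseError] Skipping {path.name}: {e}")
    return parsed

with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    (base / "sub").mkdir()
    # One well-formed schema and one truncated file in a subfolder
    (base / "good.xsd").write_text(f'<xs:schema xmlns:xs="{XS_NS}"/>', encoding="utf-8")
    (base / "sub" / "broken.xsd").write_text("<xs:schema", encoding="utf-8")
    demo_files = load_xsd_files_stdlib(base)

print([(name, root.tag) for name, root in demo_files])
```

Only the well-formed file ends up in the result; the truncated one is reported and skipped.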

### Create summary table of XSD files

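Builds a simple table showing each XSD file's name, root tag, and the number of direct children of the root element (`len(root)`), which is not the total number of elements in the file.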

In [5]:
# Initialize an empty list for the summary rows
summary = []

for file_name, root in xsd_files:
    summary.append({
        "File Name": file_name,
        "Root Tag": root.tag,
        "Number of Elements": len(root)
    })

df_summary = pd.DataFrame(summary)
df_summary

| | File Name | Root Tag | Number of Elements |
|---|---|---|---|
| 0 | atributy.xsd | {http://www.w3.org/2001/XMLSchema}schema | 243 |
| 1 | common.xsd | {http://www.w3.org/2001/XMLSchema}schema | 10 |
| 2 | doprovodne_informace.xsd | {http://www.w3.org/2001/XMLSchema}schema | 56 |
| 3 | extenze.xsd | {http://www.w3.org/2001/XMLSchema}schema | 1 |
| 4 | servis.xsd | {http://www.w3.org/2001/XMLSchema}schema | 2 |
| ... | ... | ... | ... |
| 514 | spatialReferencing.xsd | {http://www.w3.org/2001/XMLSchema}schema | 14 |
| 515 | geometry.xsd | {http://www.w3.org/2001/XMLSchema}schema | 19 |
| 516 | gss.xsd | {http://www.w3.org/2001/XMLSchema}schema | 7 |
| 517 | gts.xsd | {http://www.w3.org/2001/XMLSchema}schema | 7 |
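
The "Number of Elements" column is the length of the root node, so it counts only top-level schema components. The sketch below contrasts that with the number of `xs:element` declarations at any depth. It uses the standard library's `ElementTree` rather than `lxml`, and the inline schema (including the names `Zed`, `TypZdi`, and `Vyska`) is a made-up example.

```python
import xml.etree.ElementTree as ET

XS = "{http://www.w3.org/2001/XMLSchema}"

# Tiny hypothetical schema: one global element and one complex type
schema_text = """<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
  <xs:element name="Zed" type="ZedType"/>
  <xs:complexType name="ZedType">
    <xs:sequence>
      <xs:element name="TypZdi" type="xs:string"/>
      <xs:element name="Vyska" type="xs:decimal" minOccurs="0"/>
    </xs:sequence>
  </xs:complexType>
</xs:schema>"""

root = ET.fromstring(schema_text)

# Direct children of <xs:schema>: the global element and the complex type
direct_children = len(root)

# All <xs:element> declarations, including those nested in the complex type
element_declarations = sum(1 for _ in root.iter(f"{XS}element"))

print(direct_children, element_declarations)
```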


### Table of XSD Element References

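This cell builds the DataFrame `df_str2` and saves it as `summary_1.5.0.csv`.

Each row includes:
- **filename**: the XSD file name,
- **nazev**: the referenced element name (an `atr:` reference, or the list of `gml:` references collected for a complex type),
- **minOccurs**: the value of the `minOccurs` attribute (`0` if the element is optional, empty if the attribute is not set).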

In [6]:
# Path to the folder with extracted XSD files
xsd_objects_path = output_path / "xsd" / "objekty"

records = []
seen_global = set()  # Track all (filename, nazev) pairs to avoid duplicates

# Loop through all .xsd files in the directory
for file_path in xsd_objects_path.glob("*.xsd"):
    try:
        # Parse the XSD file into an XML tree
        tree = etree.parse(str(file_path))
        root = tree.getroot()

        # Find all <complexType> elements in the schema
        complex_types = root.xpath(".//*[local-name()='complexType']")

        for complex_type in complex_types:
            atr_normal = []
            atr_ki = []
            gml_refs = []
            gml_min_flags = []
            atr_ki_with_0 = False

            # Find nested <element> definitions
            for element in complex_type.xpath(".//*[local-name()='element']"):
                ref = element.get("ref")
                min_occurs = element.get("minOccurs")

                if not ref:
                    continue

                key = (file_path.name, ref)
                if key in seen_global:
                    continue
                seen_global.add(key)

                # Handle atr: references
                if ref.startswith("atr:"):
                    entry = {
                        "filename": file_path.name,
                        "nazev": ref,
                        "minOccurs": min_occurs  # May be None
                    }
                    if ref.endswith("KI"):
                        atr_ki.append(entry)
                        if min_occurs == "0":
                            atr_ki_with_0 = True
                    else:
                        atr_normal.append(entry)

                # Handle gml: references
                elif ref.startswith("gml:"):
                    gml_refs.append(ref)
                    gml_min_flags.append(min_occurs)

            # Add atr: not ending with KI
            records.extend(atr_normal)

            # Add gml: references
            if gml_refs:
                min_occurs_final = "0" if "0" in gml_min_flags or atr_ki_with_0 else None
                records.append({
                    "filename": file_path.name,
                    "nazev": str(gml_refs),  # Format as list string e.g. ['gml:...']
                    "minOccurs": min_occurs_final
                })

            # Add atr: ending with KI
            records.extend(atr_ki)

    except etree.XMLSyntaxError as e:
        print(f"[XMLSyntaxError] {file_path.name}: {e}")
    except Exception as e:
        print(f"[Error] Failed to process {file_path.name}: {e}")
        raise

# Create DataFrame from extracted records
df_str2 = pd.DataFrame(records)

# Save the DataFrame to CSV
df_str2.to_csv(output_path.parent / "summary_1.5.0.csv", index=False)

# Display the resulting DataFrame
df_str2

| | filename | nazev | minOccurs |
|---|---|---|---|
| 0 | BP_plynovodni_site-plocha.xsd | atr:OblastObjektuKI | 0 |
| 1 | BP_podzemniho_zasobniku_plynu-plocha.xsd | atr:TypPodzemnihoZasobnikuPlynu | |
| 2 | BP_podzemniho_zasobniku_plynu-plocha.xsd | atr:OblastObjektuKI | 0 |
| 3 | BP_zarizeni_PKO-plocha.xsd | atr:OblastObjektuKI | 0 |
| 4 | chodnik-defbod.xsd | atr:PrevazujiciPovrch | |
| ... | ... | ... | ... |
| 552 | zed-linie.xsd | atr:TypZdi | |
| 553 | zed-linie.xsd | atr:HraniceJinehoObjektu | |
| 554 | zed-plocha.xsd | atr:TypZdi | |
| 555 | zemedelska_plocha-defbod.xsd | atr:TypZemedelskePlochy | |
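
The row ordering produced by the cell above (plain `atr:` references, then one aggregated `gml:` row, then `atr:` references ending in `KI`) can be traced on a tiny inline schema. This is a simplified sketch: it uses the standard library's `ElementTree` instead of `lxml`, leaves out the duplicate tracking, and the schema content and namespace URIs are invented for illustration.

```python
import xml.etree.ElementTree as ET

XS = "{http://www.w3.org/2001/XMLSchema}"

# Hypothetical schema with one complex type and mixed references
schema_text = """<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
    xmlns:atr="urn:example:atr" xmlns:gml="urn:example:gml">
  <xs:complexType name="ZedLinieType">
    <xs:sequence>
      <xs:element ref="atr:OblastObjektuKI" minOccurs="0"/>
      <xs:element ref="gml:curveProperty"/>
      <xs:element ref="atr:TypZdi"/>
      <xs:element name="Local" type="xs:string"/>
    </xs:sequence>
  </xs:complexType>
</xs:schema>"""

root = ET.fromstring(schema_text)
records = []

for ctype in root.iter(f"{XS}complexType"):
    atr_normal, atr_ki, gml_refs, gml_min = [], [], [], []
    for el in ctype.iter(f"{XS}element"):
        ref, occ = el.get("ref"), el.get("minOccurs")
        if not ref:
            continue  # locally declared element without a reference
        if ref.startswith("atr:"):
            (atr_ki if ref.endswith("KI") else atr_normal).append((ref, occ))
        elif ref.startswith("gml:"):
            gml_refs.append(ref)
            gml_min.append(occ)

    records.extend(atr_normal)
    if gml_refs:
        # The gml row is marked optional if any gml ref or any KI ref is optional
        ki_optional = any(occ == "0" for _, occ in atr_ki)
        records.append((str(gml_refs), "0" if "0" in gml_min or ki_optional else None))
    records.extend(atr_ki)

for row in records:
    print(row)
```

Note that the `gml:` row inherits `minOccurs="0"` from the optional `KI` reference even though the `gml:` reference itself has no `minOccurs` attribute.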


## Detailed CSV

* Target fields are defined in a JSON config file (the cell below loads `config_1.5.0.json`).
* The same code can process both schema versions (1.4.3 and 1.5.0).

Detailed CSV: testing the new version on old data (1.4.3)
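
The config file itself is not shown in this notebook. The sketch below is a guess at its shape, inferred only from how the code reads it: top-level booleans for `filename`, `namespace`, `name`, and `type`, plus an `element_types` mapping whose entries may carry `match`, `exist`, flag values (`true` or `"unique"`), and `attributes`. The concrete entries, in particular `ExampleElement`, are hypothetical. The helper `expected_columns` is also hypothetical and mirrors the column ordering rules used later.

```python
import json

# Hypothetical config; the real config_1.5.0.json may differ
example_config = {
    "filename": True,
    "namespace": True,
    "name": True,
    "type": True,
    "element_types": {
        "ZaznamObjektu": {"type": True},          # collect every 'type' value
        "GeometrieObjektu": {"type": "unique"},   # collect distinct 'type' values
        "atr:OblastObjektuKI": {"match": "ref", "exist": True},  # 0/1 existence column
        "ExampleElement": {"attributes": {"code_base": ["fixed", "use"]}},
    },
}

def expected_columns(config):
    """List the output columns in the order implied by the config."""
    cols = [key for key in ("filename", "namespace", "name", "type") if config.get(key)]
    for key, desc in config["element_types"].items():
        base = key.split(":")[-1]  # drop a namespace prefix such as 'atr:'
        if desc.get("exist"):
            cols.append(base)
        for flag, val in desc.items():
            if flag in {"exist", "match"}:
                continue
            if val is True or val == "unique":
                cols.append(f"{base}_{flag}")
        for attr, props in desc.get("attributes", {}).items():
            cols.extend(f"{attr}_{prop}" for prop in props)
    return cols

print(json.dumps(example_config, indent=2))
print(expected_columns(example_config))
```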

In [31]:
def process_flag_value(data, element, matched_column, flag, is_unique=False):
    """
    Extracts the value of a specific attribute (flag) from an XML element and stores it in the data dictionary.
    
    Parameters:
    - data: dict where extracted values are stored.
    - element: XML element being processed.
    - matched_column: base name used to construct the output column name.
    - flag: the attribute name to extract from the element.
    - is_unique: if True, prevents duplicate values in the list for this column.
    """
    val = element.get(flag)
    if not val:
        return

    output_column_name = f"{matched_column}_{flag}"

    if output_column_name not in data:
        data[output_column_name] = [val]
    else:
        if not isinstance(data[output_column_name], list):
            data[output_column_name] = [data[output_column_name]]

        if is_unique and val in data[output_column_name]:
            return

        data[output_column_name].append(val)

def order_dataframe_columns_by_config(df, config, element_types):
    """
    Orders DataFrame columns based on the config and element_types definition.
    Columns from config (e.g. filename, namespace) and element attributes/flags
    will appear first, in the defined order. Any remaining columns are added at the end.
    """
    ordered_cols = []
    element_keys_order = list(config["element_types"].keys())

    # Add basic config-defined columns
    for key in ["filename", "namespace", "name", "type"]:
        if config.get(key):
            ordered_cols.append(key)

    # Add columns defined in element_types
    for col in element_keys_order:
        desc = element_types[col]
        base = col.split(":")[-1]

        # Add existence flag
        if desc.get("exist"):
            ordered_cols.append(base)

        # Add element flags like required, prohibited, etc.
        for flag, val in desc.items():
            if flag in {"exist", "match"}:
                continue
            if val is True or val == "unique":
                ordered_cols.append(f"{base}_{flag}")

        # Add attribute columns
        if "attributes" in desc:
            for attr, props in desc["attributes"].items():
                for prop in props:
                    ordered_cols.append(f"{attr}_{prop}")

    # Add any remaining columns from DataFrame
    remaining = [c for c in df.columns if c not in ordered_cols]
    return df.reindex(columns=ordered_cols + remaining)


# Load JSON config
with open("../tests/data/config_1.5.0.json", "r", encoding="utf-8") as f:
    config = json.load(f)

# Path to the folder with extracted .xsd files
xsd_objects_path = output_path / "xsd" / "objekty"
records = []
element_types = config["element_types"]

for file_path in xsd_objects_path.glob("*.xsd"):
    try:
        tree = etree.parse(str(file_path))
        root = tree.getroot()
        namespace = root.attrib.get("targetNamespace", "")

        data = {}
        if config.get("filename") is True:
            data["filename"] = file_path.name
        if config.get("namespace") is True:
            data["namespace"] = namespace

        # Get the first top-level <element> declaration, if there is one
        if config.get("name") is True or config.get("type") is True:
            top_level_elems = root.xpath('./*[local-name()="element"]')
            main_elem = top_level_elems[0] if top_level_elems else None
            if config.get("name") is True:
                data["name"] = main_elem.get("name", "") if main_elem is not None else ""
            if config.get("type") is True:
                data["type"] = main_elem.get("type", "") if main_elem is not None else ""
        
        # Initialize the value of exist to 0 when asking for existence
        for column_name, description in element_types.items():
            if description.get("exist") is True:
                cleaned_column_name = column_name.split(":")[-1]
                data[cleaned_column_name] = 0

        all_elements = root.xpath(".//*[local-name()='element']")
        for element in all_elements:
                etype = None
                match_key = None
                matched_column = None
                # Find "match" values from config file
                for column_name, description in element_types.items():
                    config_match = description.get("match")
                    # if value of "match" is not given, use "name"
                    if not config_match:
                        config_match = "name"
                        
                    # Try to find element match
                    match_element = element.get(config_match)
                    # if it is the right element save description, column_name and match value
                    if match_element == column_name:
                        etype = description
                        matched_column = column_name.split(":")[-1]
                        match_key = config_match
                        break
                        
                if not etype:
                    # skip if matching element type wasn't found
                    continue
                # Handle attributes
                if "attributes" in etype:
                    # Iterate over attributes and their properties
                    for attr_name, props in etype["attributes"].items():
                        for prop in props:
                            # find values using xpath
                            val = element.xpath(f".//*[local-name()='attribute' and @{match_key}='{attr_name}']/@{prop}")
                            # save to output data
                            if val:
                                data[f"{attr_name}_{prop}"] = val[0]
                
                # Handle existence of element (matched_column already has its prefix removed)
                if etype.get("exist") is True:
                    data[matched_column] = 1
                
                # Special case for geometry in version 1.4.3 without explicit 'type' attribute
                if matched_column == "GeometrieObjektu" and not element.get("type"):
                    # Find all child <element> nodes with a 'ref' attribute
                    ref_values = element.xpath(".//*[local-name()='element' and @ref]/@ref")
                    if ref_values:
                        data["GeometrieObjektu"] = ref_values
                
                # Collect element properties requested directly in the config
                true_flags = [key for key, value in etype.items() if value is True]
                unique_flags = [key for key, value in etype.items() if value == "unique"]
                unique_cols = [f"{matched_column}_{flag}" for flag in unique_flags]
                for flag in true_flags:
                    process_flag_value(data, element, matched_column, flag, is_unique=False)

                for flag in unique_flags:
                    process_flag_value(data, element, matched_column, flag, is_unique=True)

                for col in list(data):
                    if col in unique_cols:
                        if not isinstance(data[col], list):
                            data[col] = [data[col]]
                    else:
                        if isinstance(data[col], list) and len(data[col]) == 1 and col not in ("GeometrieObjektu_type","GeometrieObjektu"):
                            data[col] = data[col][0]

        records.append(data)

    except Exception as e:
        print(f"[Error] {file_path.name}: {e}")
        raise

# Output DataFrame
df_str1 = pd.DataFrame(records, dtype=object)
df_str1 = order_dataframe_columns_by_config(df_str1, config, element_types)
output_csv_path = output_path.parent / "detailed_1.5.0.csv"
output_csv_path.parent.mkdir(parents=True, exist_ok=True)
df_str1.to_csv(output_csv_path, index=False)
df_str1

| | filename | namespace | name | type | ZaznamObjektu_type | GeometrieObjektu_type | OblastObjektuKI | code_base_fixed | code_base_use | code_suffix_fixed | code_suffix_use | dim_fixed | dim_use | gia_fixed | gia_use | KategorieObjektu_fixed | SkupinaObjektu_fixed | ObsahovaCast_fixed |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | autobusova_zastavka-bod.xsd | autzas | AutobusovaZastavka | AutobusovaZastavkaType | [RefV, RefV, Ins, Upd, Del] | [cmn:GeometrieObjektuBodType] | 0 | 0100000374 | required | 01 | required | 3 | prohibited | 1 | prohibited | Dopravní stavby | Silniční doprava | DI |
| 1 | BP_plynovodni_site-plocha.xsd | bpplsi | BPPlynovodniSite | BPPlynovodniSiteType | [RefN, Ins, Upd, Del] | [cmn:GeometrieObjektuPlochaDTIType] | 1 | 0100000290 | required | 03 | required | 2 | prohibited | 0 | prohibited | Ochranná a bezpečnostní pásma | Ochranné a bezpečnostní pásmo | TI |
| 2 | BP_podzemniho_zasobniku_plynu-plocha.xsd | bpppol | BPPodzemnihoZasobnikuPlynu | BPPodzemnihoZasobnikuPlynuType | [RefN, Ins, Upd, Del] | [cmn:GeometrieObjektuPlochaDTIType] | 1 | 0100000369 | required | 03 | required | 2 | prohibited | 0 | prohibited | Ochranná a bezpečnostní pásma | Ochranné a bezpečnostní pásmo | TI |
| 3 | BP_zarizeni_PKO-plocha.xsd | bpzpko | BPZarizeniPKO | BPZarizeniPKOType | [RefN, Ins, Upd, Del] | [cmn:GeometrieObjektuPlochaDTIType] | 1 | 0100000291 | required | 03 | required | 2 | prohibited | 0 | prohibited | Ochranná a bezpečnostní pásma | Ochranné a bezpečnostní pásmo | TI |
| 4 | budova-defbod.xsd | buddef | BudovaDefinicniBod | BudovaDefinicniBodType | [RefV, Ins, Upd, Del] | [cmn:GeometrieObjektuBodType] | 0 | 0100000001 | required | 04 | required | 3 | prohibited | 0 | prohibited | Budovy | Objekt budovy | ZPS |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 425 | zed-plocha.xsd | zedpol | ZedPlocha | ZedPlochaType | [RefV, Ins, Upd, Del] | [cmn:GeometrieObjektuPlochaZPSType] | 0 | 0100000168 | required | 03 | required | 3 | prohibited | 0 | prohibited | Součásti a příslušenství staveb | Stavba společná pro více skupin | ZPS |
| 426 | zeleznicni_navestidlo-bod.xsd | zelnav | ZeleznicniNavestidlo | ZeleznicniNavestidloType | [RefV, RefV, Ins, Upd, Del] | [cmn:GeometrieObjektuBodType] | 0 | 0100000376 | required | 01 | required | 3 | prohibited | 1 | prohibited | Dopravní stavby | Drážní doprava | DI |
| 427 | zeleznicni_prejezd-plocha.xsd | zprpol | ZeleznicniPrejezd | ZeleznicniPrejezdType | [RefV, RefV, Ins, Upd, Del] | [cmn:GeometrieObjektuPlochaDTIType] | 0 | 0100000022 | required | 03 | required | 3 | prohibited | 1 | prohibited | Dopravní stavby | Drážní doprava | DI |
| 428 | zemedelska_plocha-defbod.xsd | zepdef | ZemedelskaPlochaDefinicniBod | ZemedelskaPlochaDefinicniBodType | [RefV, Ins, Upd, Del] | [cmn:GeometrieObjektuBodType] | 0 | 0100000207 | required | 04 | required | 3 | prohibited | 0 | prohibited | Vodstvo, vegetace a terén | Hospodářská plocha | ZPS |
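
The repeated `RefV` values in `ZaznamObjektu_type` come from the way flag values are accumulated: without the `unique` setting every occurrence is appended, with it duplicates are dropped. The sketch below is a condensed restatement of `process_flag_value` (it omits the branch that wraps a non-list value), named `collect_flag` here to mark it as hypothetical. Plain dicts stand in for XML elements because they also provide `.get()`.

```python
def collect_flag(data, element, matched_column, flag, is_unique=False):
    """Append element[flag] to data['<matched_column>_<flag>'], optionally de-duplicated."""
    val = element.get(flag)
    if not val:
        return  # attribute missing or empty: nothing to record
    values = data.setdefault(f"{matched_column}_{flag}", [])
    if is_unique and val in values:
        return
    values.append(val)

data = {}

# Non-unique collection keeps repeated values
for type_value in ["RefV", "RefV", "Ins"]:
    collect_flag(data, {"type": type_value}, "ZaznamObjektu", "type")

# Unique collection stores each distinct value once
for _ in range(2):
    collect_flag(data, {"type": "cmn:GeometrieObjektuBodType"},
                 "GeometrieObjektu", "type", is_unique=True)

# A missing attribute creates no column at all
collect_flag(data, {}, "ZaznamObjektu", "use")

print(data)
```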
