# Load all libraries and global variables.

In [28]:
import xml.etree.ElementTree as ET
import pandas as pd
import numpy as np
from datetime import datetime
import os

# Read the XML file
xml_file_path = r"..\data\originalData\emissiezones.xml"

output_folder_path = r"..\data\processedData"


# Parse the XML file
tree = ET.parse(xml_file_path)
root = tree.getroot()
print(root)

 # Define all namespaces used in the XML
namespaces = {
    'mc': 'http://datex2.eu/schema/3/messageContainer',
    'cz': 'http://datex2.eu/schema/3/controlledZone',
    'com': 'http://datex2.eu/schema/3/common',
    'tro': 'http://datex2.eu/schema/3/trafficRegulation',
    'loc': 'http://datex2.eu/schema/3/locationReferencing',
    'xsi': 'http://www.w3.org/2001/XMLSchema-instance'
}

# Helper functions to get text from elements
def get_text(elem):
            return elem.text if elem is not None else None
def get_all_text(elem):
            return ''.join(elem.itertext()).strip() if elem is not None else None  

def find_restriction_conditions(zone):
        restrictions = []
        
        # Get the main access restriction type
        access_restriction = zone.find('.//tro:accessRestrictionType', namespaces)
        if access_restriction is not None:
            restrictions.append(f"Access restriction: {access_restriction.text}")
        
        # Get vehicle conditions that are being restricted (not negated)
        restriction_conditions = zone.findall('.//tro:conditions[@xsi:type="tro:VehicleCondition"]', namespaces)
        for condition in restriction_conditions:
            condition_id = condition.get('id', '')
            if 'restriction' in condition_id.lower():
                # Extract vehicle characteristics from this restriction
                fuel_type = condition.find('.//com:fuelType', namespaces)
                vehicle_type = condition.find('.//com:vehicleType', namespaces)
                euro_class = condition.find('.//com:emissionClassificationEuro', namespaces)
                
                restriction_text = "Restricted: "
                if fuel_type is not None:
                    restriction_text += f"fuel={fuel_type.text} "
                if vehicle_type is not None:
                    restriction_text += f"type={vehicle_type.text} "
                if euro_class is not None:
                    restriction_text += f"euro={euro_class.text}"
                
                if restriction_text != "Restricted: ":
                    restrictions.append(restriction_text.strip())
        return restrictions

def find_exceptions(zone):
        exceptions = []
        
        # Find condition sets with negate=true (these are exceptions)
        negated_conditions = zone.findall('.//tro:conditions[@xsi:type="tro:ConditionSet"]', namespaces)
        for condition_set in negated_conditions:
            negate_elem = condition_set.find('tro:negate', namespaces)
            if negate_elem is not None and negate_elem.text == 'true':
                condition_id = condition_set.get('id', '')
                

                # Check the conditions if it is an exemption/exception
                if 'exemption' in condition_id.lower() or 'exception' in condition_id.lower():
                    for key in ['euro','camper','oldtimer','wheelchair','special','zero','emission',]:
                        if key in condition_id.lower():
                            exceptions.append(key)
                
                # Also extract specific fuel type exemptions
                fuel_exemptions = condition_set.findall('.//com:fuelType', namespaces)
                for fuel in fuel_exemptions:
                    if fuel.text in ['battery', 'hydrogen', 'electric']:
                        exceptions.append(f"Fuel exemption: {fuel.text}")
        return exceptions


def xml_to_dataframe(root):
    all_records = []
    zones = root.findall('.//cz:urbanVehicleAccessRegulation', namespaces)
    
    for zone in zones:
        
        zone_data = {
            'zone_id': zone.get('id'),
            'version': zone.get('version'),
            'zone_name': get_all_text(zone.find('.//cz:name', namespaces)),
            'zone_type': get_text(zone.find('cz:controlledZoneType', namespaces)),
            'record_version_time': get_text(zone.find('cz:controlledZoneRecordVersionTime', namespaces)),
            'url_info': get_text(zone.find('cz:urlForFurtherInformation', namespaces)),
            'status': get_text(zone.find('cz:status', namespaces)),
            'start_date': get_text(zone.find('.//com:overallStartTime', namespaces)),
            'end_date': get_text(zone.find('.//com:overallEndTime', namespaces)),
            'coordinate_sets': [get_text(coord) for coord in zone.findall('.//loc:posList', namespaces)],
            'restrictions': find_restriction_conditions(zone), # custom function
            'exceptions': find_exceptions(zone),  # custom function

        }

        all_records.append(zone_data)
    
    return pd.DataFrame(all_records)

df = xml_to_dataframe(root)


<Element '{http://datex2.eu/schema/3/messageContainer}messageContainer' at 0x000001B344085080>


# Find metadata about the XML


In [29]:
# Quick dataset overview
print(f"Dataset: {len(df)} zones, {len(df.columns)} columns")

print("\nColumns and types:")
print(df.dtypes)

# Show unique values for key columns
key_cols = ['zone_type', 'status', 'has_location_data', 'issuing_authority']
for col in key_cols:
    if col in df.columns:
        unique_vals = df[col]
        print(f"{col}: {unique_vals}")

# Check which columns have duplicates and show count for each column
duplicate_counts = {}
for col in df.columns:
    # Count duplicates for each column (excluding NaN)
    dup_count = df.duplicated(subset=[col], keep=False).sum()
    duplicate_counts[col] = dup_count

print("\nDuplicate counts per column:")
for col, count in duplicate_counts.items():
    print(f"{col}: {count}")

# Show what collumns can be used as identifiers (no duplicates)
identifier_cols = [col for col, count in duplicate_counts.items() if count == 0]
print(f"\nPotential identifier columns (no duplicates): {identifier_cols}")



# Missing data summary
missing_data = df.isnull().sum()
if missing_data.sum() > 0:
    print(f"\nMissing data:\n{missing_data[missing_data > 0]}")
else:
    print("\nNo missing data found")

print(df.info()) 

df.head()

Dataset: 42 zones, 12 columns

Columns and types:
zone_id                object
version                object
zone_name              object
zone_type              object
record_version_time    object
url_info               object
status                 object
start_date             object
end_date               object
coordinate_sets        object
restrictions           object
exceptions             object
dtype: object
zone_type: 0     lowEmissionZone
1     lowEmissionZone
2     lowEmissionZone
3     lowEmissionZone
4     lowEmissionZone
5     lowEmissionZone
6     lowEmissionZone
7     lowEmissionZone
8     lowEmissionZone
9     lowEmissionZone
10    lowEmissionZone
11    lowEmissionZone
12    lowEmissionZone
13    lowEmissionZone
14    lowEmissionZone
15    lowEmissionZone
16    lowEmissionZone
17    lowEmissionZone
18    lowEmissionZone
19    lowEmissionZone
20    lowEmissionZone
21    lowEmissionZone
22    lowEmissionZone
23    lowEmissionZone
24    lowEmissionZone
25    lowEmissi

Unnamed: 0,zone_id,version,zone_name,zone_type,record_version_time,url_info,status,start_date,end_date,coordinate_sets,restrictions,exceptions
0,NDW11_d4b4b627-a1ad-492a-90b3-d35a1396be7f,3,LEZ 's-Hertogenbosch,lowEmissionZone,2025-07-23T11:00:33.245126Z,https://www.s-hertogenbosch.nl/ondernemer/mili...,active,2020-01-01T00:00:00Z,2025-03-01T00:00:00Z,[51.691127953 5.294504572 51.6886171294 5.2935...,"[Access restriction: noEntry, Restricted: fuel...",[]
1,NDW11_cdfc54b4-103d-4f74-8278-3f21b43084ef,2,ZE Dordrecht,lowEmissionZone,2024-10-07T13:53:38.167822Z,https://cms.dordrecht.nl/Inwoners/Overzicht_In...,active,2026-01-01T00:00:00Z,2026-12-31T23:00:00Z,[51.811666 4.653311 51.811746 4.654824 51.8123...,[Access restriction: noEntry],"[Fuel exemption: battery, Fuel exemption: hydr..."
2,NDW11_adb2d781-4cf1-419f-a254-d4f656b89d85,3,LEZ Delft,lowEmissionZone,2025-07-23T11:00:33.245126Z,https://www.delft.nl/milieuzone,active,2020-01-01T00:00:00Z,2999-01-01T00:00:00Z,[52.013883 4.365207 52.013771 4.365007 52.0137...,"[Access restriction: noEntry, Restricted: fuel...",[]
3,NDW11_fab66db4-319d-471c-9a40-d37436243648,3,ZE zone Alphen aan den Rijn,lowEmissionZone,2025-07-23T11:00:33.245126Z,https://www.alphenaandenrijn.nl/Ondernemen/Zer...,active,2026-07-01T00:00:00Z,2999-01-01T00:00:00Z,[52.15498907983988 4.661329273134823 52.154647...,[Access restriction: noEntry],"[Fuel exemption: battery, Fuel exemption: hydr..."
4,NDW11_ea9ea2b6-4daf-4264-ab5b-b15f45da3a20,3,LEZ Haarlem,lowEmissionZone,2025-07-23T11:00:33.245126Z,https://haarlem.nl/ontheffing-milieuzone-voor-...,active,2020-01-01T00:00:00Z,2999-01-01T00:00:00Z,[52.387318 4.650521 52.387734 4.64918 52.38836...,"[Access restriction: noEntry, Restricted: fuel...",[]


# Display and proces the data in snippits

In [30]:
# Print the location lists for all zones
print(df['coordinate_sets'][0])

# Remove None, empty strings, or "\n" from each location list
def clean_location_list(location_list):
    if location_list is None:
        return []
    
    newList = []
    for location in location_list:
        if location and location.strip() != "":
            newList.append(location)
    return newList

df['coordinate_sets'] = df['coordinate_sets'].apply(clean_location_list)



['51.691127953 5.294504572 51.6886171294 5.2935798162 51.6874442351 5.2928948555 51.6867373725 5.2923222568 51.6862067599 5.2920239752 51.6853480644 5.2916141191 51.6844498378 5.2912230751 51.6835539914 5.2908994003 51.6828863462 5.2908633946 51.6828684221 5.2917016657 51.6829129214 5.2920720422 51.6830644108 5.2924272273 51.6832343612 5.2925990112 51.6834740298 5.2926883238 51.6839626124 5.2927920889 51.6841390984 5.2927948063 51.6841393075 5.2927954421 51.6843605479 5.2927688728 51.6844053047 5.293068968 51.6844243194 5.2932072507 51.6842605485 5.2935381882 51.6842604441 5.2935381002 51.6840966479 5.2938676964 51.6839114242 5.2940493411 51.6837878367 5.2940378994 51.6835006428 5.2939683233 51.6833080293 5.2939979537 51.6831300929 5.2942029671 51.6829635553 5.2943840102 51.6829633621 5.2943839687 51.6829259725 5.2944043611 51.6828701469 5.2944295136 51.6827346659 5.2944250013 51.6825321512 5.2943703312 51.6825071431 5.2947511775 51.6824063807 5.2953525091 51.6824258219 5.2954650319 51

# Make CSV Files from processed data

We are choosing zone_names as our unique identifier as it is unique for each emission zone and is the most userfriendly for humans. 

In [31]:
id_string = 'zone_name'
# make csv for identifiers and if it is active
identifier_df = df[[id_string, 'status']]
identifier_df.to_csv(os.path.join(output_folder_path, "NDW_identifiers.csv", ), index=False)

# make csv for locations, end and start date, based on identifiers
location_df = df[[id_string, 'coordinate_sets', 'start_date', 'end_date']]
location_df.to_csv(os.path.join(output_folder_path, "NDW_locations.csv"), index=False)


# make csv for restrictions and exceptions, based on identifiers
restrictions_df = df[[id_string, 'restrictions', 'exceptions']]
restrictions_df.to_csv(os.path.join(output_folder_path, "NDW_restrictions_exceptions.csv"), index=False)

# make csv for all data points
df.to_csv(os.path.join(output_folder_path, "NDW_all_data.csv"), index=False)