In [7]:
# AuDroK XML → WAV + TXT extractor (multi-folder, split outputs)
# --------------------------------------------------------------

import re, base64, struct, hashlib
import xml.etree.ElementTree as ET
from pathlib import Path

# ---------------- CONFIG ----------------
# Folders that are searched recursively for XML measurement files.
INPUT_FOLDERS = [
    "/home/destrox-907/Husnian's FYP/Dataset/data_german/Measurements_1-19",
    "/home/destrox-907/Husnian's FYP/Dataset/data_german/Measurements_20-39",
    "/home/destrox-907/Husnian's FYP/Dataset/data_german/Measurements_40-59",
    "/home/destrox-907/Husnian's FYP/Dataset/data_german/Measurements_60-79",
]
# Output root; extracted audio and text reports go to separate subfolders.
OUTPUT_DIR = Path("/home/destrox-907/Husnian's FYP/Dataset/AuDro_Dataset")
WAV_DIR = OUTPUT_DIR.joinpath("wav")
TXT_DIR = OUTPUT_DIR.joinpath("txt")
# ----------------------------------------

# Make sure both output folders exist before anything is written to them.
for _out_dir in (WAV_DIR, TXT_DIR):
    _out_dir.mkdir(parents=True, exist_ok=True)

# Namespace prefix -> URI mapping passed to ElementTree ``find`` calls so the
# "ns3:" / "ns1:" prefixes in the path expressions can be resolved.
NS = dict(
    ns3="http://ws.bast.de/container/TrafficDataService",
    ns1="http://schemas.xmlsoap.org/ws/2002/07/utility",
)

def find_first_binary_b64(xml_root):
    """Return the first ``ns3:body/ns3:binary`` payload and its attributes.

    Args:
        xml_root: Parsed XML root element to search.

    Returns:
        A ``(text, attrib)`` tuple holding the whitespace-stripped element
        text and the element's attribute mapping, or ``(None, {})`` when the
        element is missing or its text is empty/whitespace only.
    """
    node = xml_root.find(".//ns3:body/ns3:binary", NS)
    if node is None:
        return None, {}

    payload = (node.text or "").strip()
    if not payload:
        return None, {}

    return payload, node.attrib

def get_header_info(xml_root):
    """Collect the identifier and timestamp fields found in the XML.

    Args:
        xml_root: Parsed XML root element to search.

    Returns:
        dict that may contain ``xml_identifier``, ``xml_created`` and
        ``xml_expires``; a key is present only when the matching element
        exists and has non-empty text. Values are whitespace-stripped.
    """
    # (output key, path expression) pairs, in the order they are reported.
    lookups = (
        ("xml_identifier", ".//ns3:header/ns3:Identifier"),
        ("xml_created", ".//ns1:TimeStamp/ns1:Created"),
        ("xml_expires", ".//ns1:TimeStamp/ns1:Expires"),
    )

    found = {}
    for key, xpath in lookups:
        node = xml_root.find(xpath, NS)
        if node is None or not node.text:
            continue
        found[key] = node.text.strip()
    return found

def parse_wav_info(b):
    """Extract basic format details from an in-memory RIFF/WAVE file.

    Walks the chunk list that follows the 12-byte RIFF header, reading the
    ``fmt `` chunk for the format fields and the ``data`` chunk for the
    payload size. Scanning stops as soon as both have been seen.

    Args:
        b: Complete file contents as a bytes-like object.

    Returns:
        dict with the keys ``format_tag``, ``channels``, ``sample_rate``,
        ``bits_per_sample``, ``duration_sec`` (rounded to 6 decimals) and
        ``data_bytes``. ``data_bytes`` is the declared ``data`` chunk size,
        capped at the number of bytes actually present after the chunk
        header.

    Raises:
        ValueError: if the RIFF/WAVE magic is missing, the ``fmt `` chunk is
            too short to hold the format fields, or the ``fmt `` or ``data``
            chunk is absent.
    """
    if b[:4] != b"RIFF" or b[8:12] != b"WAVE":
        raise ValueError("Not a valid WAVE header")

    total = len(b)
    pos = 12  # first chunk header starts right after "RIFF<size>WAVE"
    fmt = None
    data_bytes = None

    while pos + 8 <= total:
        chunk_id = b[pos:pos + 4]
        chunk_size = struct.unpack_from("<I", b, pos + 4)[0]
        pos += 8

        if chunk_id == b"fmt ":
            # The fields read below occupy the first 16 bytes of the chunk.
            # Without this check a short chunk would be decoded from the
            # bytes of whatever follows it.
            if chunk_size < 16 or pos + 16 > total:
                raise ValueError("Truncated fmt chunk")
            (audio_format, channels, sample_rate,
             _byte_rate, block_align, bits_per_sample) = struct.unpack_from(
                "<HHIIHH", b, pos)
            fmt = {
                "audio_format": audio_format,
                "channels": channels,
                "sample_rate": sample_rate,
                "bits_per_sample": bits_per_sample,
                "block_align": block_align,
            }
        elif chunk_id == b"data":
            # A truncated file can declare more data than it contains;
            # never report more bytes than are really there.
            data_bytes = min(chunk_size, total - pos)

        # Chunks are padded to an even length.
        pos += chunk_size + (chunk_size % 2)
        if fmt is not None and data_bytes is not None:
            break

    if fmt is None or data_bytes is None:
        raise ValueError("Missing fmt/data chunk")

    # ``or 1`` guards against division by zero on a zeroed header field.
    frames = data_bytes // (fmt["block_align"] or 1)
    duration = frames / float(fmt["sample_rate"] or 1)
    return {
        "format_tag": fmt["audio_format"],
        "channels": fmt["channels"],
        "sample_rate": fmt["sample_rate"],
        "bits_per_sample": fmt["bits_per_sample"],
        "duration_sec": round(duration, 6),
        "data_bytes": data_bytes,
    }

def human_format_tag(tag):
    """Return a readable name for a WAVE format tag.

    Tags 1 and 3 map to ``"PCM"`` and ``"IEEE_FLOAT"``; any other value is
    rendered as ``"UNKNOWN(<tag>)"``.
    """
    known = {1: "PCM", 3: "IEEE_FLOAT"}
    try:
        return known[tag]
    except KeyError:
        return f"UNKNOWN({tag})"

def md5(b):
    """Return the hexadecimal MD5 digest of the bytes-like object *b*."""
    digest = hashlib.md5()
    digest.update(b)
    return digest.hexdigest()

def sanitize(s):
    """Turn *s* into a filename-friendly token.

    Every run of characters other than word characters, ``-`` and ``.`` is
    collapsed into a single ``_``; leading and trailing underscores are then
    removed. If nothing is left, ``"file"`` is returned.
    """
    collapsed = re.sub(r"[^\w\-.]+", "_", s)
    trimmed = collapsed.strip("_")
    if trimmed:
        return trimmed
    return "file"

def process_one(xml_path: Path) -> None:
    """Extract the embedded WAV from one XML file and write a text report.

    The first base64 ``binary`` payload found in the XML is decoded and
    written to ``WAV_DIR``; a report with the source path, MD5, XML
    attributes, header fields and parsed WAV details goes to ``TXT_DIR``.
    Both files share a base name built from the parent folder name, the
    sanitized XML stem and, when present, the sanitized ``id`` attribute of
    the binary element.

    Problems with a single file are reported on stdout and the file is
    skipped, so one bad input does not stop a batch run.

    Args:
        xml_path: Path of the XML file to process.
    """
    try:
        root = ET.parse(xml_path).getroot()
    except Exception as e:
        print("Parse fail:", xml_path, e)
        return

    b64, attrs = find_first_binary_b64(root)
    if not b64:
        print("No binary in", xml_path)
        return

    # A malformed payload is skipped the same way as an unparsable XML file.
    # binascii.Error is a ValueError subclass.
    try:
        wav_bytes = base64.b64decode(b64)
    except ValueError as e:
        print("Base64 decode fail:", xml_path, e)
        return
    header = get_header_info(root)

    parent = xml_path.parent.name  # folder name (e.g., Measurements_1-19)
    base = f"{parent}_{sanitize(xml_path.stem)}"
    if "id" in attrs:
        # The attribute comes straight from the XML, so clean it like the
        # stem to keep path separators and the like out of the filename.
        base = f"{base}_{sanitize(attrs['id'])}"

    wav_path = WAV_DIR / f"{base}.wav"
    txt_path = TXT_DIR / f"{base}.txt"

    # The decoded bytes are written even if they later fail WAV parsing; the
    # report then carries the parse error instead of the format details.
    wav_path.write_bytes(wav_bytes)
    try:
        info = parse_wav_info(wav_bytes)
    except Exception as e:
        info = {"error": str(e)}

    lines = [
        f"Source XML: {xml_path}",
        f"Output WAV: {wav_path.name}",
        f"MD5(wav): {md5(wav_bytes)}",
        f"XML binary attrs: {attrs}",
    ]
    if header:
        lines.append("XML header:")
        lines.extend(f"  - {k}: {v}" for k, v in header.items())
    if "error" not in info:
        lines.extend([
            "WAV info:",
            f"  - format: {human_format_tag(info['format_tag'])} (tag={info['format_tag']})",
            f"  - channels: {info['channels']}",
            f"  - sample_rate_hz: {info['sample_rate']}",
            f"  - bits_per_sample: {info['bits_per_sample']}",
            f"  - data_bytes: {info['data_bytes']}",
            f"  - duration_sec: {info['duration_sec']}",
        ])
    else:
        lines.append(f"WAV parse note: {info['error']}")
    txt_path.write_text("\n".join(lines), encoding="utf-8")

# -------- Main loop: process all 4 folders ----------
# Gather every *.xml file found anywhere beneath the configured input folders.
all_xml = [
    xml_file
    for folder in INPUT_FOLDERS
    for xml_file in Path(folder).rglob("*.xml")
]

print(f"Found {len(all_xml)} XML files in all 4 folders")

# Files are handled in sorted path order.
for xp in sorted(all_xml):
    process_one(xp)

print(f"✅ Done. WAVs in {WAV_DIR}, TXTs in {TXT_DIR}")


Found 3276 XML files in all 4 folders
✅ Done. WAVs in /home/destrox-907/Husnian's FYP/Dataset/AuDro_Dataset/wav, TXTs in /home/destrox-907/Husnian's FYP/Dataset/AuDro_Dataset/txt
