# TransXChange XML â†’ CSV with PySpark

This notebook parses all XML files in the Falcon Buses folder and produces clean CSV tables for:
- `stops`
- `lines`
- `routes`
- `route_links`
- `journey_patterns`
- `timing_links`
- `operators`
- `services`
- `service_lines`
- `service_journey_patterns`
- `operating_profile`
- `serviced_organisations`
- `serviced_org_working_days`
- `vehicle_journeys`

The outputs are saved under `parsed_data/<table_name>/` as CSV files.

In [1]:
# Setup Spark session (similar to start_check.ipynb)
import os
import sys
import glob
import re
import xml.etree.ElementTree as ET

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

spark = SparkSession.builder.appName("transxchange_parse").getOrCreate()

26/02/06 15:19:05 WARN Utils: Your hostname, Biplovs-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.76 instead (on interface en0)
26/02/06 15:19:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/02/06 15:19:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/02/06 15:19:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
# Configuration
# ROOT_FOLDER = "/Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/timetable_london/Falcon Buses_237/21332_106265_2026-01-02_16-02-13_current"
ROOT_FOLDER = os.path.join(os.getcwd(), "../timetable_london/Falcon Buses_237/21332_106265_2026-01-02_16-02-13_current")
OUTPUT_BASE = os.path.join(os.getcwd(), "../timetable_parsed_data")


xml_files = sorted(glob.glob(os.path.join(ROOT_FOLDER, "*.xml")))
print(f"Found {len(xml_files)} XML files")
xml_files[:5]

Found 103 XML files


['/Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_london/Falcon Buses_237/21332_106265_2026-01-02_16-02-13_current/FALC_12_FALCPK11099692512_20250831_-_2139939.xml',
 '/Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_london/Falcon Buses_237/21332_106265_2026-01-02_16-02-13_current/FALC_12_FALCPK11099692512_20250831_-_2139940.xml',
 '/Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_london/Falcon Buses_237/21332_106265_2026-01-02_16-02-13_current/FALC_12_FALCPK11099692512_20251229_20251231_2251476.xml',
 '/Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_london/Falcon Buses_237/21332_106265_2026-01-02_16-02-13_current/FALC_28_FALCPK1109969828_20250804_-_2142541.xml',
 '/Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_london/Falcon Buses_237/21332_106265_2026-01-

In [3]:
def _get_ns(root):
    match = re.match(r"\{.*\}", root.tag)
    return match.group(0) if match else ""

def _text(elem):
    if elem is None or elem.text is None:
        return None
    return elem.text.strip()

def _findtext(elem, path, ns):
    return _text(elem.find(path, ns))

def _strip_ns(tag):
    return tag.split("}", 1)[1] if "}" in tag else tag

def _children_tag_names(elem):
    if elem is None:
        return None
    names = [_strip_ns(child.tag) for child in list(elem)]
    return ",".join(names) if names else None

def _date_ranges(elem, ns):
    if elem is None:
        return None
    ranges = []
    for date_range in elem.findall("txc:DateRange", ns):
        start = _findtext(date_range, "txc:StartDate", ns)
        end = _findtext(date_range, "txc:EndDate", ns)
        if start or end:
            ranges.append(f"{start}:{end}")
    return ",".join(ranges) if ranges else None

def parse_transxchange_file(file_path):
    tree = ET.parse(file_path)
    root = tree.getroot()

    ns_url = _get_ns(root).strip("{}")
    ns = {"txc": ns_url} if ns_url else {}
    file_name = os.path.basename(file_path)

    tables = {
        "stops": [],
        "lines": [],
        "routes": [],
        "route_links": [],
        "journey_patterns": [],
        "timing_links": [],
        "operators": [],
        "services": [],
        "service_lines": [],
        "service_journey_patterns": [],
        "operating_profile": [],
        "serviced_organisations": [],
        "serviced_org_working_days": [],
        "vehicle_journeys": [],
    }

    # Stops
    for stop in root.findall(".//txc:AnnotatedStopPointRef", ns):
        tables["stops"].append({
            "file_name": file_name,
            "stop_id": _findtext(stop, "txc:StopPointRef", ns),
            "common_name": _findtext(stop, "txc:CommonName", ns),
            "latitude": _findtext(stop, ".//txc:Latitude", ns),
            "longitude": _findtext(stop, ".//txc:Longitude", ns),
        })

    # Lines
    for line in root.findall(".//txc:Line", ns):
        tables["lines"].append({
            "file_name": file_name,
            "line_id": line.get("id"),
            "line_name": _findtext(line, "txc:LineName", ns),
            "line_ref": _findtext(line, "txc:LineRef", ns),
            "inbound_description": _findtext(line, "txc:InboundDescription", ns),
            "outbound_description": _findtext(line, "txc:OutboundDescription", ns),
        })

    # Routes
    for route in root.findall(".//txc:Route", ns):
        tables["routes"].append({
            "file_name": file_name,
            "route_id": route.get("id"),
            "route_section_ref": _findtext(route, "txc:RouteSectionRef", ns),
            "description": _findtext(route, "txc:Description", ns),
        })

    # Route links (RouteSections)
    for section in root.findall(".//txc:RouteSection", ns):
        section_id = section.get("id")
        for link in section.findall("txc:RouteLink", ns):
            tables["route_links"].append({
                "file_name": file_name,
                "route_section_id": section_id,
                "route_link_id": link.get("id"),
                "from_stop": _findtext(link, "txc:From/txc:StopPointRef", ns),
                "to_stop": _findtext(link, "txc:To/txc:StopPointRef", ns),
                "distance": _findtext(link, "txc:Distance", ns),
            })

    # Journey patterns (Service > StandardService > JourneyPattern)
    for jp in root.findall(".//txc:JourneyPattern", ns):
        tables["journey_patterns"].append({
            "file_name": file_name,
            "journey_pattern_id": jp.get("id"),
            "destination_display": _findtext(jp, "txc:DestinationDisplay", ns),
            "direction": _findtext(jp, "txc:Direction", ns),
            "route_ref": _findtext(jp, "txc:RouteRef", ns),
            "section_refs": _findtext(jp, "txc:JourneyPatternSectionRefs", ns),
            "operator_ref": _findtext(jp, "txc:OperatorRef", ns),
        })

    # Timing links (JourneyPatternTimingLink)
    for jp_section in root.findall(".//txc:JourneyPatternSection", ns):
        jp_section_id = jp_section.get("id")
        for timing_link in jp_section.findall("txc:JourneyPatternTimingLink", ns):
            from_elem = timing_link.find("txc:From", ns)
            to_elem = timing_link.find("txc:To", ns)
            tables["timing_links"].append({
                "file_name": file_name,
                "journey_pattern_section_id": jp_section_id,
                "timing_link_id": timing_link.get("id"),
                "route_link_ref": _findtext(timing_link, "txc:RouteLinkRef", ns),
                "from_stop": _findtext(timing_link, "txc:From/txc:StopPointRef", ns),
                "to_stop": _findtext(timing_link, "txc:To/txc:StopPointRef", ns),
                "from_sequence": from_elem.get("SequenceNumber") if from_elem is not None else None,
                "to_sequence": to_elem.get("SequenceNumber") if to_elem is not None else None,
                "from_activity": _findtext(from_elem, "txc:Activity", ns) if from_elem is not None else None,
                "to_activity": _findtext(to_elem, "txc:Activity", ns) if to_elem is not None else None,
                "from_timing_status": _findtext(from_elem, "txc:TimingStatus", ns) if from_elem is not None else None,
                "to_timing_status": _findtext(to_elem, "txc:TimingStatus", ns) if to_elem is not None else None,
                "from_fare_stage": _findtext(from_elem, "txc:FareStageNumber", ns) if from_elem is not None else None,
                "to_fare_stage": _findtext(to_elem, "txc:FareStageNumber", ns) if to_elem is not None else None,
                "run_time": _findtext(timing_link, "txc:RunTime", ns),
            })

    # Operators
    for operator in root.findall(".//txc:Operator", ns):
        tables["operators"].append({
            "file_name": file_name,
            "operator_id": operator.get("id"),
            "operator_code": _findtext(operator, "txc:OperatorCode", ns),
            "operator_short_name": _findtext(operator, "txc:OperatorShortName", ns),
            "operator_name": _findtext(operator, "txc:OperatorName", ns),
            "licence_number": _findtext(operator, "txc:LicenceNumber", ns),
            "national_operator_code": _findtext(operator, "txc:NationalOperatorCode", ns),
        })

    # Services + Operating Profile
    for service in root.findall(".//txc:Service", ns):
        service_code = _findtext(service, "txc:ServiceCode", ns)
        tables["services"].append({
            "file_name": file_name,
            "service_id": service.get("id"),
            "service_code": service_code,
            "service_description": _findtext(service, "txc:Description", ns),
            "line_ref": _findtext(service, ".//txc:LineRef", ns),
            "start_date": _findtext(service, ".//txc:OperatingPeriod/txc:StartDate", ns),
            "end_date": _findtext(service, ".//txc:OperatingPeriod/txc:EndDate", ns),
            "registered_operator_ref": _findtext(service, "txc:RegisteredOperatorRef", ns),
            "public_use": _findtext(service, "txc:PublicUse", ns),
        })

        for service_line in service.findall("txc:Lines/txc:Line", ns):
            tables["service_lines"].append({
                "file_name": file_name,
                "service_code": service_code,
                "line_name": _findtext(service_line, "txc:LineName", ns),
                "inbound_description": _findtext(service_line, "txc:InboundDescription", ns),
                "outbound_description": _findtext(service_line, "txc:OutboundDescription", ns),
            })

        standard_service = service.find("txc:StandardService", ns)
        if standard_service is not None:
            origin = _findtext(standard_service, "txc:Origin", ns)
            destination = _findtext(standard_service, "txc:Destination", ns)
            for idx, jp in enumerate(standard_service.findall("txc:JourneyPattern", ns), start=1):
                tables["service_journey_patterns"].append({
                    "file_name": file_name,
                    "service_code": service_code,
                    "journey_pattern_index": str(idx),
                    "origin": origin,
                    "destination": destination,
                    "destination_display": _findtext(jp, "txc:DestinationDisplay", ns),
                    "direction": _findtext(jp, "txc:Direction", ns),
                    "route_ref": _findtext(jp, "txc:RouteRef", ns),
                    "section_refs": _findtext(jp, "txc:JourneyPatternSectionRefs", ns),
                    "operator_ref": _findtext(jp, "txc:OperatorRef", ns),
                })

        operating_profile = service.find("txc:OperatingProfile", ns)
        if operating_profile is not None:
            regular_days = _children_tag_names(operating_profile.find("txc:RegularDayType/txc:DaysOfWeek", ns))
            serviced_org_ref = _findtext(
                operating_profile,
                "txc:ServicedOrganisationDayType/txc:DaysOfNonOperation/txc:WorkingDays/txc:ServicedOrganisationRef",
                ns,
            )
            special_non_op = _date_ranges(
                operating_profile.find("txc:SpecialDaysOperation/txc:DaysOfNonOperation", ns),
                ns,
            )
            bank_op = _children_tag_names(operating_profile.find("txc:BankHolidayOperation/txc:DaysOfOperation", ns))
            bank_non_op = _children_tag_names(operating_profile.find("txc:BankHolidayOperation/txc:DaysOfNonOperation", ns))
            tables["operating_profile"].append({
                "file_name": file_name,
                "service_code": service_code,
                "regular_days": regular_days,
                "serviced_org_ref": serviced_org_ref,
                "special_days_non_operation": special_non_op,
                "bank_holiday_operation": bank_op,
                "bank_holiday_non_operation": bank_non_op,
            })

    # Serviced organisations + working days
    for org in root.findall(".//txc:ServicedOrganisation", ns):
        org_code = _findtext(org, "txc:OrganisationCode", ns)
        tables["serviced_organisations"].append({
            "file_name": file_name,
            "organisation_code": org_code,
            "name": _findtext(org, "txc:Name", ns),
        })

        for date_range in org.findall("txc:WorkingDays/txc:DateRange", ns):
            tables["serviced_org_working_days"].append({
                "file_name": file_name,
                "organisation_code": org_code,
                "start_date": _findtext(date_range, "txc:StartDate", ns),
                "end_date": _findtext(date_range, "txc:EndDate", ns),
            })

    # Vehicle journeys
    for journey in root.findall(".//txc:VehicleJourney", ns):
        journey_code = _findtext(journey, "txc:VehicleJourneyCode", ns)
        tables["vehicle_journeys"].append({
            "file_name": file_name,
            "vehicle_journey_code": journey_code,
            "service_ref": _findtext(journey, "txc:ServiceRef", ns),
            "line_ref": _findtext(journey, "txc:LineRef", ns),
            "journey_pattern_ref": _findtext(journey, "txc:JourneyPatternRef", ns),
            "departure_time": _findtext(journey, "txc:DepartureTime", ns),
            "operator_ref": _findtext(journey, "txc:OperatorRef", ns),
            "journey_ticket_machine_code": _findtext(journey, "txc:Operational/txc:TicketMachine/txc:JourneyCode", ns),
        })

    return tables

In [4]:
# Parse all files and collect rows for each table
all_tables = {
    "stops": [],
    "lines": [],
    "routes": [],
    "route_links": [],
    "journey_patterns": [],
    "timing_links": [],
    "operators": [],
    "services": [],
    "service_lines": [],
    "service_journey_patterns": [],
    "operating_profile": [],
    "serviced_organisations": [],
    "serviced_org_working_days": [],
    "vehicle_journeys": [],
}

for i, file_path in enumerate(xml_files, start=1):
    file_tables = parse_transxchange_file(file_path)
    for table_name, rows in file_tables.items():
        all_tables[table_name].extend(rows)
    if i % 10 == 0:
        print(f"Parsed {i} / {len(xml_files)} files")

{name: len(rows) for name, rows in all_tables.items()}

Parsed 10 / 103 files
Parsed 20 / 103 files
Parsed 30 / 103 files
Parsed 40 / 103 files
Parsed 50 / 103 files
Parsed 60 / 103 files
Parsed 70 / 103 files
Parsed 80 / 103 files
Parsed 90 / 103 files
Parsed 100 / 103 files


{'stops': 10588,
 'lines': 103,
 'routes': 602,
 'route_links': 28031,
 'journey_patterns': 602,
 'timing_links': 28031,
 'operators': 103,
 'services': 103,
 'service_lines': 103,
 'service_journey_patterns': 602,
 'operating_profile': 103,
 'serviced_organisations': 29,
 'serviced_org_working_days': 174,
 'vehicle_journeys': 1718}

In [5]:
# Write each table to CSV (one folder per table)
def write_table(table_name, rows):
    if not rows:
        print(f"Skipping {table_name}: no rows")
        return

    # Build a stable schema (all columns as strings) to avoid inference errors
    field_names = sorted({key for row in rows for key in row.keys()})
    schema = StructType([StructField(name, StringType(), True) for name in field_names])
    normalized_rows = [
        {name: (None if row.get(name) is None else str(row.get(name))) for name in field_names}
        for row in rows
    ]

    df = spark.createDataFrame(normalized_rows, schema=schema)
    output_path = os.path.join(OUTPUT_BASE, table_name)
    (
        df.coalesce(1)
        .write.mode("overwrite")
        .option("header", True)
        .csv(output_path)
    )
    print(f"Saved {table_name} -> {output_path}")

for table_name, rows in all_tables.items():
    write_table(table_name, rows)

                                                                                

Saved stops -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/stops
Saved lines -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/lines
Saved routes -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/routes


26/02/06 15:19:13 WARN TaskSetManager: Stage 3 contains a task of very large size (1536 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Saved route_links -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/route_links
Saved journey_patterns -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/journey_patterns


26/02/06 15:19:15 WARN TaskSetManager: Stage 5 contains a task of very large size (3214 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Saved timing_links -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/timing_links
Saved operators -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/operators
Saved services -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/services
Saved service_lines -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/service_lines


26/02/06 15:19:18 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
                                                                                

Saved service_journey_patterns -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/service_journey_patterns
Saved operating_profile -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/operating_profile
Saved serviced_organisations -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/serviced_organisations
Saved serviced_org_working_days -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/serviced_org_working_days
Saved vehicle_journeys -> /Users/biplovgautam/Desktop/college/4 semester/BigDataCourseWork/parser_notebooks/../timetable_parsed_data/vehicle_journeys


In [6]:
# Optional: stop Spark when finished
spark.stop()