In [5]:
import duckdb
import glob
import os
import polars as pl
from IPython.display import display
from lxml import etree

# --- Configuration ---
DUCKDB_FILE = 'data/roadworks_data.duckdb'
RAW_NEW_TABLE_NAME = 'raw_new_roadworks'
RAW_OLD_TABLE_NAME = 'raw_old_roadworks'
UNIFIED_TABLE_NAME = 'uk_roadworks'

NEW_DATA_DIRECTORY = 'data/new_format'     # data from Sept 2017 onwards
OLD_DATA_DIRECTORY = 'data/old_format'     # data from August 2017 and earlier

# Define the namespace map
NSMAP = {'d': 'WebTeam'}

# XPath to find the repeating record element
NEW_ROADWORK_RECORD_XPATH = './/d:HE_PLANNED_WORKS'
OLD_ROADWORK_RECORD_XPATH = './/ha_planned_works'

# Columns for the 'new' format raw table
RAW_NEW_COLUMNS = [
    'source_filename', 'NEW_EVENT_NUMBER', 'OLD_REFERENCE_NUMBER', 'SDATE', 'EDATE',
    'EXPDEL', 'DESCRIPTION', 'CLOSURE_TYPE', 'STATUS', 'PUBLISHED_DATE',
    'CENTRE_EASTING', 'CENTRE_NORTHING', 'ROAD_NUMBERS'
]

# Columns for the 'old' format raw table
RAW_OLD_COLUMNS = [
    'source_filename', 'reference_number', 'start_date', 'end_date', 'expected_delay',
    'description', 'closure_type', 'status', 'published_date', 'centre_easting',
    'centre_northing', 'road', 'location', 'local_authority', 'traffic_management'
]

# Define XPaths for nested data relative to the NEW format HE_PLANNED_WORKS element
NEW_COORD_XPATH = './d:EASTNORTH/d:Report/d:EASTINGNORTHING/d:EASTNORTH_Collection/d:EASTNORTH'
NEW_ROAD_XPATH = './d:ROADS/d:Report/d:ROADS/d:ROAD_Collection/d:ROAD'


# --- Helper function to run queries (optional, or use con.sql().pl() directly) ---
def run_query_df(connection, sql_query):
    """Helper function to run a query and return a Polars DataFrame."""
    if not connection:
        print("Error: Database connection is not established.")
        return None
    try:
        return connection.sql(sql_query).pl()
    except duckdb.Error as e:
        print(f"Error running query:\n{sql_query}\nError: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

# --- Connect to DuckDB ---
con = None
try:
    con = duckdb.connect(database=DUCKDB_FILE, read_only=False)
    print(f"Successfully connected to {DUCKDB_FILE}")
except Exception as e:
    print(f"Failed to connect to DuckDB: {e}")

pl.Config.set_tbl_rows(50)

Successfully connected to data/roadworks_data.duckdb


polars.config.Config

## A. Define XML Parsing and Data Loading Utilities

In [None]:
def extract_record_new_format(record_element, source_filename):
    """
    Extracts raw data from a 'new' format <HE_PLANNED_WORKS> element
    into a dictionary matching RAW_NEW_COLUMNS.
    """
    data = {col: None for col in RAW_NEW_COLUMNS} 
    data['source_filename'] = source_filename

    data['NEW_EVENT_NUMBER'] = record_element.get('NEW_EVENT_NUMBER')
    data['OLD_REFERENCE_NUMBER'] = record_element.get('OLD_REFERENCE_NUMBER')
    data['SDATE'] = record_element.get('SDATE')
    data['EDATE'] = record_element.get('EDATE')
    data['EXPDEL'] = record_element.get('EXPDEL')
    data['DESCRIPTION'] = record_element.get('DESCRIPTION')
    data['CLOSURE_TYPE'] = record_element.get('CLOSURE_TYPE')
    data['STATUS'] = record_element.get('STATUS')
    data['PUBLISHED_DATE'] = record_element.get('PUBLISHED_DATE')

    if data.get('NEW_EVENT_NUMBER') is None:
        return None

    coord_elements = record_element.xpath(NEW_COORD_XPATH, namespaces=NSMAP)
    if coord_elements:
        coord_element = coord_elements[0]
        data['CENTRE_EASTING'] = coord_element.get('CENTRE_EASTING')
        data['CENTRE_NORTHING'] = coord_element.get('CENTRE_NORTHING')

    road_elements = record_element.xpath(NEW_ROAD_XPATH, namespaces=NSMAP)
    if road_elements:
        road_numbers_list = [road.get('ROAD_NUMBER') for road in road_elements if road.get('ROAD_NUMBER')]
        data['ROAD_NUMBERS'] = '; '.join(road_numbers_list) if road_numbers_list else None
    return data

def extract_record_old_format(record_element, source_filename):
    """
    Extracts raw data from an 'old' format <ha_planned_works> element
    into a dictionary matching RAW_OLD_COLUMNS.
    """
    data = {col: None for col in RAW_OLD_COLUMNS}
    data['source_filename'] = source_filename

    def get_text(tag_name):
        element = record_element.find(tag_name)
        return element.text.strip() if element is not None and element.text else None

    for col_name in RAW_OLD_COLUMNS:
        if col_name != 'source_filename':
             data[col_name] = get_text(col_name)

    if data.get('reference_number') is None:
        return None
    return data

def process_directory(directory_path, record_xpath, extraction_func, nsmap=None):
    """
    Processes all XML files in a directory, yielding each processed record.
    """
    xml_files = glob.glob(os.path.join(directory_path, '*.xml'))
    parser = etree.XMLParser(recover=True, ns_clean=True)

    if not xml_files:
        print(f"Warning: No XML files found in directory: {directory_path}")
        return

    print(f"\n--- Processing Directory: {directory_path} ---")
    total_yielded_records = 0
    for file_path in xml_files:
        filename = os.path.basename(file_path)
        try:
            tree = etree.parse(file_path, parser)
            root = tree.getroot()
            records = root.xpath(record_xpath, namespaces=nsmap)
            for record in records:
                extracted_dict = extraction_func(record, filename)
                if extracted_dict:
                    yield extracted_dict
                    total_yielded_records += 1
        except Exception as e_file:
            print(f"  Error processing file {filename}: {e_file}. Skipping file.")
    print(f"--- Directory Scan Complete: {directory_path}. Yielded {total_yielded_records} records. ---")

def load_data_in_batches(db_connection, table_name, target_columns, data_iterator, batch_size=1000):
    """
    Loads data from an iterator into a DuckDB table in batches.
    """
    batch_data = []
    total_inserted = 0
    placeholders = ', '.join(['?'] * len(target_columns))
    insert_sql = f'INSERT INTO "{table_name}" VALUES ({placeholders})'

    print(f"Starting batch insertion into '{table_name}'...")
    for record_dict in data_iterator:
        row_values = [record_dict.get(col_name) for col_name in target_columns]
        batch_data.append(row_values)
        if len(batch_data) >= batch_size:
            try:
                db_connection.executemany(insert_sql, batch_data)
                total_inserted += len(batch_data)
                batch_data = []
            except duckdb.Error as e:
                print(f"  Error inserting batch: {e}")
                batch_data = [] 
    if batch_data:
        try:
            db_connection.executemany(insert_sql, batch_data)
            total_inserted += len(batch_data)
        except duckdb.Error as e:
            print(f"  Error inserting final batch: {e}")
    print(f"Batch insertion complete for '{table_name}'. Total records inserted: {total_inserted}")

## B. Load Raw XML Data into Staging Tables

In [None]:
if con:
    # --- Create/Replace RAW NEW Table Structure ---
    print(f"Creating or replacing table: {RAW_NEW_TABLE_NAME}")
    new_column_defs = [f'"{col}" VARCHAR' for col in RAW_NEW_COLUMNS]
    create_new_table_sql = f'CREATE OR REPLACE TABLE "{RAW_NEW_TABLE_NAME}" ({", ".join(new_column_defs)})'
    con.execute(create_new_table_sql)
    print(f"Table '{RAW_NEW_TABLE_NAME}' created/replaced successfully.")

    # --- Process and Load New Format Raw Data ---
    print("\nProcessing NEW format data...")
    new_data_iterator = process_directory(
        directory_path=NEW_DATA_DIRECTORY,
        record_xpath=NEW_ROADWORK_RECORD_XPATH,
        extraction_func=extract_record_new_format,
        nsmap=NSMAP
    )
    load_data_in_batches(con, RAW_NEW_TABLE_NAME, RAW_NEW_COLUMNS, new_data_iterator)

    # --- Create/Replace RAW OLD Table Structure ---
    print(f"\nCreating or replacing table: {RAW_OLD_TABLE_NAME}")
    old_column_defs = [f'"{col}" VARCHAR' for col in RAW_OLD_COLUMNS]
    create_old_table_sql = f'CREATE OR REPLACE TABLE "{RAW_OLD_TABLE_NAME}" ({", ".join(old_column_defs)})'
    con.execute(create_old_table_sql)
    print(f"Table '{RAW_OLD_TABLE_NAME}' created/replaced successfully.")

    # --- Process and Load Old Format Raw Data ---
    print("\nProcessing OLD format data...")
    old_data_iterator = process_directory(
        directory_path=OLD_DATA_DIRECTORY,
        record_xpath=OLD_ROADWORK_RECORD_XPATH,
        extraction_func=extract_record_old_format,
        nsmap=None 
    )
    load_data_in_batches(con, RAW_OLD_TABLE_NAME, RAW_OLD_COLUMNS, old_data_iterator)

    con.commit()
    print("\nRaw data loading transaction committed.")

    # Verify final counts
    count_new_df = run_query_df(con, f'SELECT COUNT(*) as count FROM "{RAW_NEW_TABLE_NAME}"')
    count_old_df = run_query_df(con, f'SELECT COUNT(*) as count FROM "{RAW_OLD_TABLE_NAME}"')
    if count_new_df is not None: print(f"Verification: Table '{RAW_NEW_TABLE_NAME}' now contains {count_new_df[0, 'count']} rows.")
    if count_old_df is not None: print(f"Verification: Table '{RAW_OLD_TABLE_NAME}' now contains {count_old_df[0, 'count']} rows.")
else:
    print("Database connection not established. Skipping raw data loading.")

Creating or replacing table: raw_new_roadworks
Table 'raw_new_roadworks' created/replaced successfully.

Processing NEW format data...
Starting batch insertion into 'raw_new_roadworks'...

--- Processing Directory: data/new_format ---


## C. Perform Data Type Conversions on Staging Tables

In [None]:
# Numeric Conversions
if con:
    print(f"--- Adding Numeric Columns and Converting Data ---")
    # --- Process RAW_NEW_TABLE_NAME ---
    print(f"\n--- Processing table for numeric conversion: {RAW_NEW_TABLE_NAME} ---")
    cols_to_convert_new_numeric = { "OLD_REFERENCE_NUMBER": "BIGINT", "CENTRE_EASTING": "INTEGER", "CENTRE_NORTHING": "INTEGER" }
    for original_col, numeric_type in cols_to_convert_new_numeric.items():
        new_col_name = f"{original_col}_NUMERIC"
        con.execute(f'ALTER TABLE "{RAW_NEW_TABLE_NAME}" ADD COLUMN IF NOT EXISTS "{new_col_name}" {numeric_type};')
        con.execute(f'UPDATE "{RAW_NEW_TABLE_NAME}" SET "{new_col_name}" = TRY_CAST("{original_col}" AS {numeric_type});')
    print(f"Numeric columns processed for {RAW_NEW_TABLE_NAME}.")

    # --- Process RAW_OLD_TABLE_NAME ---
    print(f"\n--- Processing table for numeric conversion: {RAW_OLD_TABLE_NAME} ---")
    cols_to_convert_old_numeric = { "reference_number": "BIGINT", "centre_easting": "INTEGER", "centre_northing": "INTEGER" }
    for original_col, numeric_type in cols_to_convert_old_numeric.items():
        new_col_name = f"{original_col}_numeric"
        con.execute(f'ALTER TABLE "{RAW_OLD_TABLE_NAME}" ADD COLUMN IF NOT EXISTS "{new_col_name}" {numeric_type};')
        con.execute(f'UPDATE "{RAW_OLD_TABLE_NAME}" SET "{new_col_name}" = TRY_CAST("{original_col}" AS {numeric_type});')
    print(f"Numeric columns processed for {RAW_OLD_TABLE_NAME}.")
    con.commit()
    print("\nNumeric conversion changes committed.")
else:
    print("Database connection not established. Skipping numeric conversions.")

In [None]:
# Datetime Conversions
if con:
    print(f"--- Adding Timestamp Columns and Converting Data ---")
    # --- Process RAW_NEW_TABLE_NAME ---
    print(f"\n--- Processing table for datetime conversion: {RAW_NEW_TABLE_NAME} ---")
    cols_to_convert_new_dt = {
        "SDATE": {"new_col": "SDATE_DT", "format": "%d-%b-%Y %H:%M"},
        "EDATE": {"new_col": "EDATE_DT", "format": "%d-%b-%Y %H:%M"},
        "PUBLISHED_DATE": {"new_col": "PUBLISHED_DATE_DT", "format": "ISO"}
    }
    for original_col, details in cols_to_convert_new_dt.items():
        new_col_name = details["new_col"]
        original_format = details["format"]
        con.execute(f'ALTER TABLE "{RAW_NEW_TABLE_NAME}" ADD COLUMN IF NOT EXISTS "{new_col_name}" TIMESTAMP;')
        if original_format == "ISO":
            con.execute(f'UPDATE "{RAW_NEW_TABLE_NAME}" SET "{new_col_name}" = TRY_CAST("{original_col}" AS TIMESTAMP);')
        else:
            con.execute(f'UPDATE "{RAW_NEW_TABLE_NAME}" SET "{new_col_name}" = TRY_STRPTIME(trim("{original_col}"), \'{original_format}\');')
    print(f"Timestamp columns processed for {RAW_NEW_TABLE_NAME}.")

    # --- Process RAW_OLD_TABLE_NAME ---
    print(f"\n--- Processing table for datetime conversion: {RAW_OLD_TABLE_NAME} ---")
    cols_to_convert_old_dt = {
        "start_date": {"new_col": "start_date_dt", "format": "ISO"},
        "end_date": {"new_col": "end_date_dt", "format": "ISO"},
        "published_date": {"new_col": "published_date_dt", "format": "ISO"}
    }
    for original_col, details in cols_to_convert_old_dt.items():
        new_col_name = details["new_col"]
        con.execute(f'ALTER TABLE "{RAW_OLD_TABLE_NAME}" ADD COLUMN IF NOT EXISTS "{new_col_name}" TIMESTAMP;')
        con.execute(f'UPDATE "{RAW_OLD_TABLE_NAME}" SET "{new_col_name}" = TRY_CAST("{original_col}" AS TIMESTAMP);')
    print(f"Timestamp columns processed for {RAW_OLD_TABLE_NAME}.")
    con.commit()
    print("\nDatetime conversion changes committed.")
else:
    print("Database connection not established. Skipping datetime conversions.")

## D. Perform Coordinate System Conversion (OSGB36 to WGS84)

In [None]:
if con:
    print(f"--- Converting Coordinates to WGS84 ---")
    try:
        con.execute("INSTALL spatial; LOAD spatial;")
        gsb_file_path = 'OSTN15_NTv2_OSGBtoETRS.gsb' # Assumed to be in the working directory or accessible path
        
        source_crs_epsg27700 = 'EPSG:27700'
        source_crs_nadgrids = f'+proj=tmerc +lat_0=49 +lon_0=-2 +k=0.9996012717 +x_0=400000 +y_0=-100000 +ellps=airy +units=m +no_defs +nadgrids={gsb_file_path} +type=crs'
        target_crs_epsg = 'EPSG:4326'
        chosen_source_crs = source_crs_epsg27700 # Default
        crs_method_message = f"Using '{source_crs_epsg27700}' (fallback)."

        if os.path.exists(gsb_file_path):
            # Simple test with nadgrids to see if it works without erroring out immediately
            try:
                test_nad_df = run_query_df(con, f"SELECT ST_Transform(ST_Point(529090, 179645), '{source_crs_nadgrids}', '{target_crs_epsg}', always_xy := true) AS test_geom;")
                if test_nad_df is not None and not test_nad_df.is_empty() and test_nad_df[0, "test_geom"] is not None:
                     # Check if result is not inf
                    test_val_x = run_query_df(con, f"SELECT ST_X(ST_Transform(ST_Point(529090, 179645), '{source_crs_nadgrids}', '{target_crs_epsg}', always_xy := true)) as val;")
                    if test_val_x is not None and abs(test_val_x[0,'val']) != float('inf'):
                        chosen_source_crs = source_crs_nadgrids
                        crs_method_message = f"Using '{source_crs_nadgrids}' (more accurate, with GSB file)."
                    else:
                        print(f"Warning: NADGRIDS transformation resulted in INF, falling back to {source_crs_epsg27700}.")
                else:
                    print(f"Warning: NADGRIDS transformation test failed or returned empty/null, falling back to {source_crs_epsg27700}.")
            except Exception as e_nad:
                print(f"Warning: NADGRIDS transformation test failed with error: {e_nad}. Falling back to {source_crs_epsg27700}.")
        else:
            print(f"INFO: GSB file '{gsb_file_path}' not found. Using '{source_crs_epsg27700}'.")
        
        print(crs_method_message)

        tables_to_transform = {
            RAW_NEW_TABLE_NAME: {"easting_col": "CENTRE_EASTING_NUMERIC", "northing_col": "CENTRE_NORTHING_NUMERIC"},
            RAW_OLD_TABLE_NAME: {"easting_col": "centre_easting_numeric", "northing_col": "centre_northing_numeric"}
        }

        for table_name, cols_info in tables_to_transform.items():
            easting_col = cols_info["easting_col"]
            northing_col = cols_info["northing_col"]
            print(f"\nProcessing table for coordinate conversion: {table_name}")
            
            con.execute(f'ALTER TABLE "{table_name}" ADD COLUMN IF NOT EXISTS longitude_wgs84 DOUBLE;')
            con.execute(f'ALTER TABLE "{table_name}" ADD COLUMN IF NOT EXISTS latitude_wgs84 DOUBLE;')
            con.execute(f'UPDATE "{table_name}" SET longitude_wgs84 = NULL, latitude_wgs84 = NULL;') # Clear existing

            update_sql = f'''
            UPDATE "{table_name}"
            SET
                longitude_wgs84 = ST_X(ST_Transform(ST_Point("{easting_col}", "{northing_col}"), '{chosen_source_crs}', '{target_crs_epsg}', always_xy := true)),
                latitude_wgs84 = ST_Y(ST_Transform(ST_Point("{easting_col}", "{northing_col}"), '{chosen_source_crs}', '{target_crs_epsg}', always_xy := true))
            WHERE "{easting_col}" IS NOT NULL AND "{northing_col}" IS NOT NULL AND "{easting_col}" != 0 AND "{northing_col}" != 0;
            '''
            con.execute(update_sql)
            print(f"Coordinate transformation complete for {table_name}.")
        
        con.commit()
        print("\nCoordinate transformation changes committed.")

    except Exception as e:
        print(f"An error occurred during coordinate transformation: {e}")
        if con: con.rollback()
else:
    print("Database connection not established. Skipping coordinate conversions.")

# Display some samples with new WGS84 coordinates
if con and chosen_source_crs:
    print(f"\nSample from '{RAW_NEW_TABLE_NAME}' with WGS84 coordinates:")
    sample_new_coords = run_query_df(con, f'SELECT "NEW_EVENT_NUMBER", "CENTRE_EASTING_NUMERIC", "CENTRE_NORTHING_NUMERIC", longitude_wgs84, latitude_wgs84 FROM "{RAW_NEW_TABLE_NAME}" WHERE longitude_wgs84 IS NOT NULL LIMIT 3')
    if sample_new_coords is not None: display(sample_new_coords)

    print(f"\nSample from '{RAW_OLD_TABLE_NAME}' with WGS84 coordinates:")
    sample_old_coords = run_query_df(con, f'SELECT "reference_number", "centre_easting_numeric", "centre_northing_numeric", longitude_wgs84, latitude_wgs84 FROM "{RAW_OLD_TABLE_NAME}" WHERE longitude_wgs84 IS NOT NULL LIMIT 3')
    if sample_old_coords is not None: display(sample_old_coords)

## E. Define and create unified table

In [None]:
# --- 1. Define and Create Unified Table ---
# Uses the transformed columns (_NUMERIC, _DT, WGS84) from the raw tables.

create_unified_table_sql = f"""
CREATE OR REPLACE TABLE "{UNIFIED_TABLE_NAME}" (
    event_id VARCHAR,                        -- NEW_EVENT_NUMBER (new) or reference_number (old)
    legacy_reference_id BIGINT,              -- OLD_REFERENCE_NUMBER_NUMERIC (new)
    start_datetime TIMESTAMP,                -- SDATE_DT (new) or start_date_dt (old)
    end_datetime TIMESTAMP,                  -- EDATE_DT (new) or end_date_dt (old)
    published_datetime TIMESTAMP,            -- PUBLISHED_DATE_DT (new) or published_date_dt (old)
    expected_delay VARCHAR,                  -- EXPDEL (new) or expected_delay (old)
    description VARCHAR,                     -- DESCRIPTION (new) or description (old)
    closure_type VARCHAR,                    -- CLOSURE_TYPE (new) or closure_type (old)
    status VARCHAR,                          -- STATUS (new) or status (old)
    road_names VARCHAR,                      -- ROAD_NUMBERS (new) or road (old)
    easting_osgb INTEGER,                    -- CENTRE_EASTING_NUMERIC (new) or centre_easting_numeric (old)
    northing_osgb INTEGER,                   -- CENTRE_NORTHING_NUMERIC (new) or centre_northing_numeric (old)
    longitude_wgs84 DOUBLE,
    latitude_wgs84 DOUBLE,
    location_detail VARCHAR,                 -- location (old only)
    local_authority VARCHAR,                 -- local_authority (old only)
    traffic_management_type VARCHAR,         -- traffic_management (old only)
    source_filename VARCHAR,
    data_source_format VARCHAR              -- 'new_xml' or 'old_xml'
);
"""
if con:
    con.execute(create_unified_table_sql)
    print(f"Table '{UNIFIED_TABLE_NAME}' created/re-created successfully.")

    # --- 2. Populate Unified Table from New Format Data ---
    insert_from_new_sql = f"""
    INSERT INTO "{UNIFIED_TABLE_NAME}"
    SELECT
        "NEW_EVENT_NUMBER" AS event_id,
        "OLD_REFERENCE_NUMBER_NUMERIC" AS legacy_reference_id,
        "SDATE_DT" AS start_datetime,
        "EDATE_DT" AS end_datetime,
        "PUBLISHED_DATE_DT" AS published_datetime,
        "EXPDEL" AS expected_delay,
        "DESCRIPTION" AS description,
        "CLOSURE_TYPE" AS closure_type,
        "STATUS" AS status,
        "ROAD_NUMBERS" AS road_names,
        "CENTRE_EASTING_NUMERIC" AS easting_osgb,
        "CENTRE_NORTHING_NUMERIC" AS northing_osgb,
        longitude_wgs84,
        latitude_wgs84,
        NULL AS location_detail,
        NULL AS local_authority,
        NULL AS traffic_management_type,
        source_filename,
        'new_xml' AS data_source_format
    FROM "{RAW_NEW_TABLE_NAME}";
    """
    con.execute(insert_from_new_sql)
    print(f"Data inserted from '{RAW_NEW_TABLE_NAME}' into '{UNIFIED_TABLE_NAME}'.")

    # --- 3. Populate Unified Table from Old Format Data ---
    insert_from_old_sql = f"""
    INSERT INTO "{UNIFIED_TABLE_NAME}"
    SELECT
        "reference_number" AS event_id, -- Using original string ID, could also use reference_number_numeric if desired and schema changed
        NULL AS legacy_reference_id,
        "start_date_dt" AS start_datetime,
        "end_date_dt" AS end_datetime,
        "published_date_dt" AS published_datetime,
        "expected_delay" AS expected_delay,
        "description" AS description,
        "closure_type" AS closure_type,
        "status" AS status,
        "road" AS road_names,
        "centre_easting_numeric" AS easting_osgb,
        "centre_northing_numeric" AS northing_osgb,
        longitude_wgs84,
        latitude_wgs84,
        "location" AS location_detail,
        "local_authority" AS local_authority,
        "traffic_management" AS traffic_management_type,
        source_filename,
        'old_xml' AS data_source_format
    FROM "{RAW_OLD_TABLE_NAME}";
    """
    con.execute(insert_from_old_sql)
    print(f"Data inserted from '{RAW_OLD_TABLE_NAME}' into '{UNIFIED_TABLE_NAME}'.")

    # --- 4. Verification (Example) ---
    print(f"\n--- Verifying {UNIFIED_TABLE_NAME} ---")
    total_rows_unified_df = run_query_df(con, f'SELECT COUNT(*) as count FROM "{UNIFIED_TABLE_NAME}"')
    if total_rows_unified_df is not None:
        print(f"Total rows in '{UNIFIED_TABLE_NAME}': {total_rows_unified_df[0, 'count']}")

    print(f"\nSample of 5 rows from '{UNIFIED_TABLE_NAME}':")
    sample_unified_df = run_query_df(con, f'SELECT * FROM "{UNIFIED_TABLE_NAME}" LIMIT 5')
    if sample_unified_df is not None:
        display(sample_unified_df)

    con.commit()
    print("Changes for unified table committed.")
else:
    print("Database connection not established. Skipping unified table creation and population.")

Table 'uk_roadworks' created/re-created successfully.
Data inserted from 'raw_new_roadworks' into 'uk_roadworks'.
Data inserted from 'raw_old_roadworks' into 'uk_roadworks'.

--- Verifying uk_roadworks ---
Total rows in 'uk_roadworks': 23421

Sample of 5 rows from 'uk_roadworks':


event_id,legacy_reference_id,start_datetime,end_datetime,published_datetime,expected_delay,description,closure_type,status,road_names,easting_osgb,northing_osgb,longitude_wgs84,latitude_wgs84,location_detail,local_authority,traffic_management_type,source_filename,data_source_format
str,i64,datetime[μs],datetime[μs],datetime[μs],str,str,str,str,str,i32,i32,f64,f64,str,str,str,str,str
"""00026976-005""",,2018-02-26 21:00:00,2018-02-28 06:00:00,2018-02-22 16:49:17,"""Slight (less than 10 mins)""","""A3 northbound Sheet Link entry…","""Area Renewals""","""Published""","""A3""",475209,124975,-0.929132,51.019241,,,,"""he_roadworks_2018_02_26.xml""","""new_xml"""
"""00004020-008""",4188720.0,2018-01-08 20:00:00,2018-03-10 06:00:00,2018-02-22 10:13:27,"""Moderate (10 - 30 mins)""","""A14 Westbound Jct 58 to Jct 57…","""Area Schemes""","""Published""","""A14""",614569,241115,1.126274,52.026925,,,,"""he_roadworks_2018_02_26.xml""","""new_xml"""
"""00001459-026""",4215713.0,2017-07-31 14:47:00,2018-04-01 06:00:00,2018-02-15 14:38:05,"""Slight (less than 10 mins)""","""M1 northbound and southbound T…","""Major Schemes""","""Published""","""M1""",445124,364308,-1.32637,53.173976,,,,"""he_roadworks_2018_02_26.xml""","""new_xml"""
"""00027883-003""",,2018-02-12 20:00:00,2018-03-17 06:00:00,2018-02-21 10:36:47,"""Moderate (10 - 30 mins)""","""A259, east and westbound betwe…","""Area Schemes""","""Published""","""A259""",596442,123787,0.797101,50.979974,,,,"""he_roadworks_2018_02_26.xml""","""new_xml"""
"""00026799-002""",,2018-02-10 22:00:00,2018-03-22 06:00:00,2018-02-22 14:08:43,"""Slight (less than 10 mins)""","""A3 northbound Compton to Denni…","""Regional Technology Works""","""Published""","""A3""",498261,150727,-0.593562,51.247262,,,,"""he_roadworks_2018_02_26.xml""","""new_xml"""


Changes committed.


## F. Final Data Quality Checks on Unified Table

### F.1. Unique Categorical Values

In [None]:
pl.Config.set_tbl_rows(20)

# (Re-)open the connection
if con:
    con.close()
con = duckdb.connect(database=DUCKDB_FILE, read_only=False)

# Define common placeholders for checking original string values
PLACEHOLDERS_LOWER = ["", "none", "n/a", "null", "unknown"]
PLACEHOLDERS_SQL_LIST_STR = f"({', '.join([f'{pl!r}' for pl in PLACEHOLDERS_LOWER])})"

categorical_columns_unified = [
    'expected_delay',
    'closure_type',
    'status',
    'road_names',
    'local_authority',
    'traffic_management_type',
    'data_source_format'
]
print(f"\n--- Unique Categorical Values in '{UNIFIED_TABLE_NAME}' ---")
for col in categorical_columns_unified:
    print(f"\nDistinct values for '{col}':")
    query = f"""
        SELECT "{col}", COUNT(*) as count
        FROM "{UNIFIED_TABLE_NAME}"
        GROUP BY "{col}"
        ORDER BY count DESC;
    """
    df = run_query_df(con, query)
    if df is not None:
        display(df)
    else:
        print(f"Could not retrieve distinct values for '{col}'.")

### F.2. Validate Date Ranges (End Date before Start Date)

In [None]:
print(f"\n--- Checking for Invalid Date Ranges in '{UNIFIED_TABLE_NAME}' (end_datetime < start_datetime) ---")
query_invalid_dates = f"""
    SELECT event_id, source_filename, start_datetime, end_datetime, description
    FROM "{UNIFIED_TABLE_NAME}"
    WHERE end_datetime < start_datetime;
"""
df_invalid_dates = run_query_df(con, query_invalid_dates)
if df_invalid_dates is not None:
    if not df_invalid_dates.is_empty():
        print(f"Found {df_invalid_dates.height} records where end_datetime is before start_datetime:")
        display(df_invalid_dates)
    else:
        print("No records found where end_datetime is before start_datetime.")
else:
    print("Could not execute query to check for invalid date ranges.")


--- Checking for Invalid Date Ranges in 'uk_roadworks' (end_datetime < start_datetime) ---
No records found where end_datetime is before start_datetime.


### F.3. Extreme Coordinate Check (WGS84)

In [15]:
print(f"\n--- Extreme Coordinates in '{UNIFIED_TABLE_NAME}' (WGS84) ---")
extreme_coords_queries = {
    "Northmost": f"""SELECT event_id, description, road_names, latitude_wgs84, longitude_wgs84, source_filename FROM "{UNIFIED_TABLE_NAME}" WHERE latitude_wgs84 IS NOT NULL ORDER BY latitude_wgs84 DESC LIMIT 1""",
    "Southmost": f"""SELECT event_id, description, road_names, latitude_wgs84, longitude_wgs84, source_filename FROM "{UNIFIED_TABLE_NAME}" WHERE latitude_wgs84 IS NOT NULL ORDER BY latitude_wgs84 ASC LIMIT 1""",
    "Eastmost":  f"""SELECT event_id, description, road_names, latitude_wgs84, longitude_wgs84, source_filename FROM "{UNIFIED_TABLE_NAME}" WHERE longitude_wgs84 IS NOT NULL ORDER BY longitude_wgs84 DESC LIMIT 1""",
    "Westmost":  f"""SELECT event_id, description, road_names, latitude_wgs84, longitude_wgs84, source_filename FROM "{UNIFIED_TABLE_NAME}" WHERE longitude_wgs84 IS NOT NULL ORDER BY longitude_wgs84 ASC LIMIT 1"""
}
for name, query in extreme_coords_queries.items():
    print(f"\n{name} point:")
    df_coord = run_query_df(con, query)
    if df_coord is not None:
        display(df_coord)
    else:
        print(f"Could not retrieve {name} coordinate.")


--- Extreme Coordinates in 'uk_roadworks' (WGS84) ---

Northmost point:


event_id,description,road_names,latitude_wgs84,longitude_wgs84,source_filename
str,str,str,f64,f64,str
"""00034223-002""","""A1 Berwick upon Tweed. Southb…","""A1""",55.806145,-2.042906,"""he_roadworks_2018_09_03.xml"""



Southmost point:


event_id,description,road_names,latitude_wgs84,longitude_wgs84,source_filename
str,str,str,f64,f64,str
"""00331971-001""","""A30 Lands End Roundabout used …","""A30""",50.129302,-5.513296,"""nh_roadworks_2024_10_7.xml"""



Eastmost point:


event_id,description,road_names,latitude_wgs84,longitude_wgs84,source_filename
str,str,str,f64,f64,str
"""2646860""","""S/B lane 1 closure for Inspec…","""A12""",52.485251,1.756125,"""ha-roadworks_2013_11_04.xml"""



Westmost point:


event_id,description,road_names,latitude_wgs84,longitude_wgs84,source_filename
str,str,str,f64,f64,str
"""00331971-001""","""A30 Lands End Roundabout used …","""A30""",50.129302,-5.513296,"""nh_roadworks_2024_10_7.xml"""


### F.4. Type Conversion Discrepancy Checks (Unified NULL vs. Raw Original Not NULL/Placeholder)

In [16]:
print(f"\n--- Checking Numeric Conversion Discrepancies ---")
# Numeric: legacy_reference_id, easting_osgb, northing_osgb
# Check legacy_reference_id (from new format OLD_REFERENCE_NUMBER)
query_legacy_ref_fail = f"""
SELECT u.event_id, u.source_filename, r."OLD_REFERENCE_NUMBER" as original_raw_value, u.legacy_reference_id
FROM "{UNIFIED_TABLE_NAME}" u
JOIN "{RAW_NEW_TABLE_NAME}" r ON u.event_id = r."NEW_EVENT_NUMBER" AND u.source_filename = r.source_filename
WHERE u.data_source_format = 'new_xml'
    AND u.legacy_reference_id IS NULL
    AND r."OLD_REFERENCE_NUMBER" IS NOT NULL
    AND lower(trim(r."OLD_REFERENCE_NUMBER")) NOT IN {PLACEHOLDERS_SQL_LIST_STR}
LIMIT 10;
"""
print("\nDiscrepancies for 'legacy_reference_id' (unified NULL, original raw was valid):")
df_legacy_fail = run_query_df(con, query_legacy_ref_fail)
if df_legacy_fail is not None: display(df_legacy_fail if not df_legacy_fail.is_empty() else "No discrepancies found.")

# Check easting_osgb
query_easting_fail_new = f"""
SELECT u.event_id, u.source_filename, r."CENTRE_EASTING" as original_raw_value, u.easting_osgb
FROM "{UNIFIED_TABLE_NAME}" u
JOIN "{RAW_NEW_TABLE_NAME}" r ON u.event_id = r."NEW_EVENT_NUMBER" AND u.source_filename = r.source_filename
WHERE u.data_source_format = 'new_xml'
    AND u.easting_osgb IS NULL
    AND r."CENTRE_EASTING" IS NOT NULL
    AND lower(trim(r."CENTRE_EASTING")) NOT IN {PLACEHOLDERS_SQL_LIST_STR}
LIMIT 10;
"""
print("\nDiscrepancies for 'easting_osgb' from new_xml (unified NULL, original raw was valid):")
df_easting_fail_new = run_query_df(con, query_easting_fail_new)
if df_easting_fail_new is not None: display(df_easting_fail_new if not df_easting_fail_new.is_empty() else "No discrepancies found.")

query_easting_fail_old = f"""
SELECT u.event_id, u.source_filename, r."centre_easting" as original_raw_value, u.easting_osgb
FROM "{UNIFIED_TABLE_NAME}" u
JOIN "{RAW_OLD_TABLE_NAME}" r ON u.event_id = r."reference_number" AND u.source_filename = r.source_filename
WHERE u.data_source_format = 'old_xml'
    AND u.easting_osgb IS NULL
    AND r."centre_easting" IS NOT NULL
    AND lower(trim(r."centre_easting")) NOT IN {PLACEHOLDERS_SQL_LIST_STR}
LIMIT 10;
"""
print("\nDiscrepancies for 'easting_osgb' from old_xml (unified NULL, original raw was valid):")
df_easting_fail_old = run_query_df(con, query_easting_fail_old)
if df_easting_fail_old is not None: display(df_easting_fail_old if not df_easting_fail_old.is_empty() else "No discrepancies found.")

# Similar checks for northing_osgb can be added here following the pattern for easting_osgb
print("\n(Skipping northing_osgb check for brevity, pattern is similar to easting_osgb)")


--- Checking Numeric Conversion Discrepancies ---

Discrepancies for 'legacy_reference_id' (unified NULL, original raw was valid):


'No discrepancies found.'


Discrepancies for 'easting_osgb' from new_xml (unified NULL, original raw was valid):


'No discrepancies found.'


Discrepancies for 'easting_osgb' from old_xml (unified NULL, original raw was valid):


'No discrepancies found.'


(Skipping northing_osgb check for brevity, pattern is similar to easting_osgb)


In [17]:
print(f"\n--- Checking Datetime Conversion Discrepancies ---")
# Datetime: start_datetime, end_datetime, published_datetime
# Check start_datetime
query_start_dt_fail_new = f"""
SELECT u.event_id, u.source_filename, r."SDATE" as original_raw_value, u.start_datetime
FROM "{UNIFIED_TABLE_NAME}" u
JOIN "{RAW_NEW_TABLE_NAME}" r ON u.event_id = r."NEW_EVENT_NUMBER" AND u.source_filename = r.source_filename
WHERE u.data_source_format = 'new_xml'
    AND u.start_datetime IS NULL
    AND r."SDATE" IS NOT NULL
    AND lower(trim(r."SDATE")) NOT IN {PLACEHOLDERS_SQL_LIST_STR}
LIMIT 10;
"""
print("\nDiscrepancies for 'start_datetime' from new_xml (unified NULL, original raw was valid):")
df_start_dt_fail_new = run_query_df(con, query_start_dt_fail_new)
if df_start_dt_fail_new is not None: display(df_start_dt_fail_new if not df_start_dt_fail_new.is_empty() else "No discrepancies found.")

query_start_dt_fail_old = f"""
SELECT u.event_id, u.source_filename, r."start_date" as original_raw_value, u.start_datetime
FROM "{UNIFIED_TABLE_NAME}" u
JOIN "{RAW_OLD_TABLE_NAME}" r ON u.event_id = r."reference_number" AND u.source_filename = r.source_filename
WHERE u.data_source_format = 'old_xml'
    AND u.start_datetime IS NULL
    AND r."start_date" IS NOT NULL
    AND lower(trim(r."start_date")) NOT IN {PLACEHOLDERS_SQL_LIST_STR}
LIMIT 10;
"""
print("\nDiscrepancies for 'start_datetime' from old_xml (unified NULL, original raw was valid):")
df_start_dt_fail_old = run_query_df(con, query_start_dt_fail_old)
if df_start_dt_fail_old is not None: display(df_start_dt_fail_old if not df_start_dt_fail_old.is_empty() else "No discrepancies found.")

# Similar checks for end_datetime and published_datetime can be added here
print("\n(Skipping end_datetime and published_datetime checks for brevity, pattern is similar)")


--- Checking Datetime Conversion Discrepancies ---

Discrepancies for 'start_datetime' from new_xml (unified NULL, original raw was valid):


'No discrepancies found.'


Discrepancies for 'start_datetime' from old_xml (unified NULL, original raw was valid):


'No discrepancies found.'


(Skipping end_datetime and published_datetime checks for brevity, pattern is similar)


### F.5. WGS84 Coordinate Conversion Check

In [18]:
print(f"\n--- Checking WGS84 Coordinate Conversion Failures (WGS84 NULL but OSGB Not NULL) ---")
query_wgs84_fail = f"""
    SELECT event_id, source_filename, easting_osgb, northing_osgb, longitude_wgs84, latitude_wgs84, description
    FROM "{UNIFIED_TABLE_NAME}"
    WHERE (longitude_wgs84 IS NULL OR latitude_wgs84 IS NULL)
        AND easting_osgb IS NOT NULL AND northing_osgb IS NOT NULL 
        AND easting_osgb != 0 AND northing_osgb != 0 /* Exclude (0,0) OSGB as potentially invalid input */
    LIMIT 20;
"""
df_wgs84_fail = run_query_df(con, query_wgs84_fail)
if df_wgs84_fail is not None:
    if not df_wgs84_fail.is_empty():
        print(f"Found {df_wgs84_fail.height} records where WGS84 coordinates are NULL but OSGB coordinates were present and non-zero:")
        display(df_wgs84_fail)
    else:
        print("No records found where WGS84 is NULL but OSGB was validly populated (and non-zero).")
else:
    print("Could not execute check for WGS84 conversion failures.")


--- Checking WGS84 Coordinate Conversion Failures (WGS84 NULL but OSGB Not NULL) ---
No records found where WGS84 is NULL but OSGB was validly populated (and non-zero).


## G. ...

In [None]:
if con:
    con.close()
    print("Database connection closed.")