# Creating the unified POI table
- Used notebook to start so I can see the output and use markdwon to make notes as I go along
- Once done will copy accross to a .py script

In [None]:
# Import Librabies
import osmnx as ox # to fetch data from OpenStreetMap
import geopandas as gpd # to work with geospatial data
import pandas as pd
from sqlalchemy import create_engine, text
import warnings
import json


warnings.filterwarnings("ignore")

# Credentials
user_name=''
password=''

# Conection
host = 'localhost'
port = '5433'
database = 'layereddb'
schema='berlin_source_data'

# Connection to DB
engine = create_engine(f'postgresql+psycopg2://{user_name}:{password}@{host}:{port}/{database}')

## Step 1 - Creating missing data table


In [None]:
query = """
    DROP TABLE IF EXISTS excluded_tables_log CASCADE;

    CREATE TABLE excluded_tables_log (
        table_name VARCHAR(255),
        reason VARCHAR(500),
        exclusion_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    WITH all_tables AS (
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'berlin_source_data' 
        AND table_name NOT ILIKE '%districts%'
        AND table_name NOT ILIKE '%neighborhoods%'
    ),

    -- Required schema and constraints
    expected_schema AS (
        SELECT * FROM (VALUES
            ('id', 'character varying', NULL, 'PRIMARY KEY', 'NO'),
            ('district_id', 'character varying', NULL, 'FOREIGN KEY', 'NO'),
            ('name', 'character varying', NULL, NULL, 'YES'),
            ('latitude', 'numeric', NULL, NULL, 'YES'),
            ('longitude', 'numeric', NULL, NULL, 'YES'),
            ('neighborhood', 'character varying', NULL, NULL, 'YES'),
            ('district', 'character varying', NULL, NULL, 'YES'),
            ('neighborhood_id', 'character varying', NULL, NULL, 'YES')
        ) AS t(column_name, expected_type, expected_length, constraint_type, is_nullable)
    ),

    table_columns AS (
        SELECT 
            t.table_name,
            COUNT(CASE WHEN LOWER(c.column_name) IN (
                'id','name','district_id','district','latitude','longitude','neighborhood_id','neighborhood','geometry'
            ) THEN 1 END) AS required_col_count
        FROM all_tables t
        LEFT JOIN information_schema.columns c 
            ON t.table_name = c.table_name 
        AND c.table_schema = 'berlin_source_data'
        GROUP BY t.table_name
    ),

    missing_columns AS (
        SELECT 
            tc.table_name,
            STRING_AGG(DISTINCT col, ', ' ORDER BY col) AS missing_cols
        FROM table_columns tc
        CROSS JOIN (VALUES 
            ('id'), ('name'), ('district_id'), ('district'),
            ('latitude'), ('longitude'), ('neighborhood_id'),
            ('neighborhood'), ('geometry')
        ) AS req_cols(col)
        LEFT JOIN information_schema.columns c 
            ON tc.table_name = c.table_name 
        AND c.table_schema = 'berlin_source_data'
        AND LOWER(c.column_name) = req_cols.col
        WHERE c.column_name IS NULL
        GROUP BY tc.table_name
        HAVING COUNT(*) > 0
    ),

    -- ❗ NEW: Data type checks (VARCHAR length ignored)
    datatype_issues AS (
        SELECT 
            c.table_name,
            e.column_name,
            'Expected data type ' || e.expected_type || ', got ' || c.data_type AS reason
        FROM information_schema.columns c
        JOIN expected_schema e 
            ON LOWER(c.column_name) = e.column_name
        WHERE c.table_schema = 'berlin_source_data'
        AND (
                c.data_type <> e.expected_type
                OR (
                    c.data_type = 'numeric'
                    AND (c.numeric_precision || ',' || c.numeric_scale) <> e.expected_length
                )
        )
    ),

    -- ❗ NEW: NULL constraint issues
    null_issues AS (
        SELECT 
            c.table_name,
            'Column ' || c.column_name || ' allows NULL, expected NOT NULL' AS reason
        FROM information_schema.columns c
        JOIN expected_schema e 
            ON LOWER(c.column_name) = e.column_name
        WHERE c.table_schema = 'berlin_source_data'
        AND e.is_nullable = 'NO'
        AND c.is_nullable = 'YES'
    ),

    -- ❗ PRIMARY KEY check on id
    pk_issues AS (
        SELECT 
            t.table_name,
            'Missing PRIMARY KEY on id column' AS reason
        FROM all_tables t
        WHERE NOT EXISTS (
            SELECT 1
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
            ON kcu.constraint_name = tc.constraint_name
            AND kcu.table_name = tc.table_name
            WHERE tc.table_schema = 'berlin_source_data'
            AND tc.table_name = t.table_name
            AND tc.constraint_type = 'PRIMARY KEY'
            AND LOWER(kcu.column_name) = 'id'
        )
    ),

-- ❗ FOREIGN KEY check on district_id with RESTRICT / CASCADE
fk_issues AS (
    SELECT t.table_name,
           'Missing or incorrect foreign key on district_id (expected → berlin_data.districts(district_id) ON DELETE RESTRICT ON UPDATE CASCADE)' AS reason
    FROM all_tables t
    WHERE NOT EXISTS (
        SELECT 1
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
          ON kcu.constraint_name = tc.constraint_name
         AND kcu.table_name = tc.table_name
        JOIN information_schema.referential_constraints rc
          ON rc.constraint_name = tc.constraint_name
         AND rc.constraint_schema = tc.table_schema
        JOIN information_schema.constraint_column_usage ccu
          ON ccu.constraint_name = rc.unique_constraint_name
         AND ccu.table_schema = 'berlin_source_data'
         AND ccu.table_name = 'districts'
         AND LOWER(ccu.column_name) = 'district_id'
        WHERE tc.table_schema = 'berlin_source_data'
          AND tc.table_name = t.table_name
          AND tc.constraint_type = 'FOREIGN KEY'
          AND tc.constraint_name = 'district_id_fk'  -- <== exact constraint name
          AND LOWER(kcu.column_name) = 'district_id'
          AND rc.delete_rule = 'RESTRICT'
          AND rc.update_rule = 'CASCADE'
        LIMIT 1
    )
)

-- INSERT all results
INSERT INTO excluded_tables_log (table_name, reason)
SELECT table_name, 'Missing columns: ' || missing_cols FROM missing_columns
UNION ALL
SELECT table_name, reason FROM datatype_issues
UNION ALL
SELECT table_name, reason FROM null_issues
UNION ALL
SELECT table_name, reason FROM pk_issues
UNION ALL
SELECT table_name, reason FROM fk_issues;



"""

with engine.begin() as conn:
    conn.execute(text(query))

## Step 2 - Creating missing data table, valid table and full unified_pois table

In [None]:
# Create the excluded_table_logs table
validation_query = """
DROP TABLE IF EXISTS excluded_tables_log CASCADE;

CREATE TABLE excluded_tables_log (
    table_name VARCHAR(255),
    reason VARCHAR(500),
    exclusion_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Dont include the districts and neighborhood tables as seperate entities
WITH all_tables AS (
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'berlin_source_data'
      AND table_name NOT ILIKE '%districts%'
      AND table_name NOT ILIKE '%neighborhoods%'
),

-- What the data types should look like on tables already created
expected_schema AS (
    SELECT * FROM (VALUES
        ('id', 'character varying', NULL, 'PRIMARY KEY', 'NO'),
        ('district_id', 'character varying', NULL, 'FOREIGN KEY', 'NO'),
        ('name', 'character varying', NULL, NULL, 'YES'),
        ('latitude', 'numeric', NULL, NULL, 'YES'),
        ('longitude', 'numeric', NULL, NULL, 'YES'),
        ('neighborhood', 'character varying', NULL, NULL, 'YES'),
        ('district', 'character varying', NULL, NULL, 'YES'),
        ('neighborhood_id', 'character varying', NULL, NULL, 'YES')
    ) AS t(column_name, expected_type, expected_length, constraint_type, is_nullable)
),

-- Missing columns checks that table has all columns needed else rejects it
missing_columns AS (
    SELECT 
        tc.table_name,
        STRING_AGG(DISTINCT col, ', ' ORDER BY col) AS missing_cols
    FROM all_tables tc
    CROSS JOIN (VALUES 
        ('id'), ('name'), ('district_id'), ('district'),
        ('latitude'), ('longitude'), ('neighborhood_id'),
        ('neighborhood'), ('geometry')
    ) AS req_cols(col)
    LEFT JOIN information_schema.columns c 
        ON tc.table_name = c.table_name 
        AND c.table_schema = 'berlin_source_data'
        AND LOWER(c.column_name) = req_cols.col
    WHERE c.column_name IS NULL
    GROUP BY tc.table_name
    HAVING COUNT(*) > 0
),

-- using above schema setup checks the tables in the database have correct data type, nulls, primary key & foreign key
-- Datatype issues 
datatype_issues AS (
    SELECT 
        c.table_name,
        'Expected data type ' || e.expected_type || ', got ' || c.data_type AS reason
    FROM information_schema.columns c
    JOIN expected_schema e 
        ON LOWER(c.column_name) = e.column_name
    WHERE c.table_schema = 'berlin_source_data'
      AND (
            c.data_type <> e.expected_type
            OR (c.data_type = 'numeric' AND (c.numeric_precision || ',' || c.numeric_scale) <> e.expected_length)
      )
),

-- Null issues
null_issues AS (
    SELECT 
        c.table_name,
        'Column ' || c.column_name || ' allows NULL, expected NOT NULL' AS reason
    FROM information_schema.columns c
    JOIN expected_schema e 
        ON LOWER(c.column_name) = e.column_name
    WHERE c.table_schema = 'berlin_source_data'
      AND e.is_nullable = 'NO'
      AND c.is_nullable = 'YES'
),

-- PK issues
pk_issues AS (
    SELECT 
        t.table_name,
        'Missing PRIMARY KEY on id column' AS reason
    FROM all_tables t
    WHERE NOT EXISTS (
        SELECT 1
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
            ON kcu.constraint_name = tc.constraint_name
            AND kcu.table_name = tc.table_name
        WHERE tc.table_schema = 'berlin_source_data'
          AND tc.table_name = t.table_name
          AND tc.constraint_type = 'PRIMARY KEY'
          AND LOWER(kcu.column_name) = 'id'
    )
),

-- FK issues
fk_issues AS (
    SELECT t.table_name,
           'Missing or incorrect foreign key on district_id (expected → berlin_data.districts(district_id) ON DELETE RESTRICT ON UPDATE CASCADE)' AS reason
    FROM all_tables t
    WHERE NOT EXISTS (
        SELECT 1
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
          ON kcu.constraint_name = tc.constraint_name
         AND kcu.table_name = tc.table_name
        JOIN information_schema.referential_constraints rc
          ON rc.constraint_name = tc.constraint_name
         AND rc.constraint_schema = tc.table_schema
        JOIN information_schema.constraint_column_usage ccu
          ON ccu.constraint_name = rc.unique_constraint_name
         AND ccu.table_schema = 'berlin_source_data'
         AND ccu.table_name = 'districts'
         AND LOWER(ccu.column_name) = 'district_id'
        WHERE tc.table_schema = 'berlin_source_data'
          AND tc.table_name = t.table_name
          AND tc.constraint_type = 'FOREIGN KEY'
          AND tc.constraint_name = 'district_id_fk'
          AND LOWER(kcu.column_name) = 'district_id'
          AND rc.delete_rule = 'RESTRICT'
          AND rc.update_rule = 'CASCADE'
        LIMIT 1
    )
),

-- All invalid tables - creates rows from tables that dont match above
invalid_tables AS (
    SELECT table_name, 'Missing columns: ' || missing_cols AS reason FROM missing_columns
    UNION ALL
    SELECT table_name, reason FROM datatype_issues
    UNION ALL
    SELECT table_name, reason FROM null_issues
    UNION ALL
    SELECT table_name, reason FROM pk_issues
    UNION ALL
    SELECT table_name, reason FROM fk_issues
)

-- Insert invalid tables into log
INSERT INTO excluded_tables_log (table_name, reason)
SELECT table_name, reason FROM invalid_tables;
"""

# Runs the above Validation Query 
with engine.begin() as conn:
    conn.execute(text(validation_query))

print("✅ Validation query complete")

# Creates a list of valid tables (excludes districts, neighborhoods and invalid tables) 
valid_tables_sql = """
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'berlin_source_data'
        AND table_name NOT ILIKE '%districts%'
        AND table_name NOT ILIKE '%neighborhoods%'
        AND table_name NOT IN (SELECT table_name FROM excluded_tables_log)
        AND table_name IN ('galleries', 'food_markets', 'long_term_listings')
"""

# Runs above valid_tables query 
with engine.begin() as conn:
    valid_tables = [row[0] for row in conn.execute(text(valid_tables_sql))]

print("valid tables complete")
print(valid_tables)

# Drop table if exists (this is just for practice - must be removed)
with engine.begin() as conn:
   conn.execute(text("DROP TABLE IF EXISTS unified_pois CASCADE;"))

# Create the unified_pois table
create_unified_table = """
        CREATE TABLE IF NOT EXISTS public.unified_pois (
            poi_id VARCHAR(50) PRIMARY KEY,
            name VARCHAR(200),
            layer VARCHAR(100),
            district_id VARCHAR(20),
            district VARCHAR(100),
            neighborhood_id VARCHAR(20),
            neighborhood VARCHAR(100),
            latitude DECIMAL(9,6),
            longitude DECIMAL(9,6),
            geometry GEOMETRY, -- changed from geometry to text for testing
            attributes JSONB,
            nearest_pois JSONB
        );
    """
# Runs the above create_unified_table query
with engine.connect() as conn:
    conn.execute(text(create_unified_table))
    conn.commit()
    print('Unified Table created')

# Step 2: Insert all data with nearest_pois computed via CTEs
union_queries = []
for table in valid_tables:
        union_queries.append(f"""
            SELECT 
                CONCAT(SUBSTRING('{table}' FROM 1 FOR 4), '-', t.id) AS poi_id,
                t.name,
                '{table}' AS layer,
                t.district_id,
                t.district,
                t.neighborhood_id,
                t.neighborhood,
                t.latitude,
                t.longitude,
                ST_SetSRID(ST_GeomFromEWKT(t.geometry), 4326) AS geometry,
                (to_jsonb(t) - 'id' - 'name' - 'district_id' - 'neighborhood_id' - 'latitude' - 'longitude' - 'geometry') AS attributes
            FROM berlin_source_data.{table} t
        """)
    
union_sql = " UNION ALL ".join(union_queries)
    
insert_sql = f"""
        WITH all_pois AS (
            {union_sql}
        ),
        unique_layers AS (
            SELECT DISTINCT layer FROM all_pois WHERE layer <> 'listings'
        ),
        pois_with_nearest AS (
            SELECT 
                ap.poi_id,
                ap.name,
                ap.layer,
                ap.district_id,
                ap.district,
                ap.neighborhood_id,
                ap.neighborhood,
                ap.latitude,
                ap.longitude,
                ap.geometry,
                ap.attributes,
                (
                    SELECT jsonb_object_agg(layername, nearestinfo)
                    FROM (
                        SELECT 
                            ul.layer AS layername,
                            (
                                SELECT jsonb_build_object(
                                    'id', p.poi_id,
                                    'name', p.name,
                                    'distance', ST_Distance(ap.geometry, p.geometry),
                                    'address', jsonb_build_object(
                                        'street', p.attributes->>'street',
                                        'housenumber', p.attributes->>'housenumber'
                                    )
                                )
                                FROM all_pois p
                                WHERE p.layer = ul.layer 
                                ORDER BY ap.geometry <-> p.geometry
                                LIMIT 1
                            ) AS nearestinfo
                        FROM unique_layers ul
                        WHERE ap.layer = 'long_term_listings'
                    ) AS layer_nearest
                ) AS nearest_pois
            FROM all_pois ap
        )
        INSERT INTO unified_pois 
            (poi_id, name, layer, district_id, district, neighborhood_id, neighborhood, 
             latitude, longitude, geometry, attributes, nearest_pois)
        SELECT 
            poi_id, name, layer, district_id, district, neighborhood_id, neighborhood,
            latitude, longitude, geometry, attributes, nearest_pois
        FROM pois_with_nearest;
    """
with engine.begin() as conn:
    conn.execute(text(insert_sql))
    print(f"Inserted {len(valid_tables)} tables with computed nearest_pois.")

# Step 3: Create spatial index
with engine.begin() as conn:
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_poi_geom ON unified_pois
        USING GIST (geometry);
    """))
    
print("Spatial index created.")

 

✅ Validation query complete
valid tables complete
['food_markets', 'galleries', 'long_term_listings']
Unified Table created
Inserted 3 tables with computed nearest_pois.
Spatial index created.


## Step 3 - Full code including adding tables
    - Originally setup to just add layers to the unified table but decided better to recreate the table from scratch each time

In [None]:

# Step 1: Validation query for excluded tables
validation_query = """
    DROP TABLE IF EXISTS excluded_tables_log CASCADE;

    -- Create the excluded_tables_log table
    CREATE TABLE public.excluded_tables_log (           -- Adding to public schema for now
        table_name VARCHAR(255),
        reason VARCHAR(500),
        exclusion_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    );

    WITH all_tables AS (
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'berlin_source_data'
        AND table_name NOT ILIKE '%districts%'          -- Must not be included in final table so doesnt need to be checked
        AND table_name NOT ILIKE '%neighborhoods%'      -- Must not be included in final table so doesnt need to be checked
    ),
    -- Standard schema for each table
    expected_schema AS (
        SELECT * FROM (VALUES
            ('id', 'character varying', NULL, 'PRIMARY KEY', 'NO'),
            ('district_id', 'character varying', NULL, 'FOREIGN KEY', 'NO'),
            ('name', 'character varying', NULL, NULL, 'YES'),
            ('latitude', 'numeric', NULL, NULL, 'YES'),
            ('longitude', 'numeric', NULL, NULL, 'YES'),
            ('neighborhood', 'character varying', NULL, NULL, 'YES'),
            ('district', 'character varying', NULL, NULL, 'YES'),
            ('neighborhood_id', 'character varying', NULL, NULL, 'YES')
        ) AS t(column_name, expected_type, expected_length, constraint_type, is_nullable)
    ),
    -- Checks all columns exist
    missing_columns AS (
        SELECT tc.table_name,
            STRING_AGG(DISTINCT col, ', ' ORDER BY col) AS missing_cols
        FROM all_tables tc
        CROSS JOIN (VALUES 
            ('id'), ('name'), ('district_id'), ('district'),
            ('latitude'), ('longitude'), ('neighborhood_id'),
            ('neighborhood'), ('geometry')
        ) AS req_cols(col)
        LEFT JOIN information_schema.columns c 
            ON tc.table_name = c.table_name 
            AND c.table_schema = 'berlin_source_data'
            AND LOWER(c.column_name) = req_cols.col
        WHERE c.column_name IS NULL
        GROUP BY tc.table_name
        HAVING COUNT(*) > 0
    ),
    -- Checks all data types correct and creates and adds an error to the excluded_tables_log table when incorrect
    datatype_issues AS (
        SELECT 
            c.table_name,
            'Expected data type ' || e.expected_type || ', got ' || c.data_type AS reason
        FROM information_schema.columns c
        JOIN expected_schema e 
            ON LOWER(c.column_name) = e.column_name
        WHERE c.table_schema = 'berlin_source_data'
        AND (
                c.data_type <> e.expected_type
                OR (c.data_type = 'numeric' AND (c.numeric_precision || ',' || c.numeric_scale) <> e.expected_length)
        )
    ),
    -- Checks all columns that cannot contain NULL are correct and adds an error to the excluded_tables_log table when incorrect
    null_issues AS (
        SELECT c.table_name,
            'Column ' || c.column_name || ' allows NULL, expected NOT NULL' AS reason
        FROM information_schema.columns c
        JOIN expected_schema e 
            ON LOWER(c.column_name) = e.column_name
        WHERE c.table_schema = 'berlin_source_data'
        AND e.is_nullable = 'NO'
        AND c.is_nullable = 'YES'
    ),
    -- Checks primary key on id column and creates and adds an error to the excluded_tables_log table when incorrect
    pk_issues AS (
        SELECT t.table_name,
            'Missing PRIMARY KEY on id column' AS reason
        FROM all_tables t
        WHERE NOT EXISTS (
            SELECT 1
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
                ON kcu.constraint_name = tc.constraint_name
                AND kcu.table_name = tc.table_name
            WHERE tc.table_schema = 'berlin_source_data'
            AND tc.table_name = t.table_name
            AND tc.constraint_type = 'PRIMARY KEY'
            AND LOWER(kcu.column_name) = 'id'
        )
    ),
    -- Checks foreign key on district_id column and creates and adds an error to the excluded_tables_log table when incorrect
    fk_issues AS (
        SELECT t.table_name,
            'Missing or incorrect foreign key on district_id' AS reason
        FROM all_tables t
        WHERE NOT EXISTS (
            SELECT 1
            FROM information_schema.table_constraints tc
            WHERE tc.table_schema = 'berlin_source_data'
            AND tc.table_name = t.table_name
            AND tc.constraint_type = 'FOREIGN KEY'
        )
    ),
    -- Combines all invalid tables into one
    invalid_tables AS (
        SELECT table_name, 'Missing columns: ' || missing_cols AS reason FROM missing_columns
        UNION ALL SELECT table_name, reason FROM datatype_issues
        UNION ALL SELECT table_name, reason FROM null_issues
        UNION ALL SELECT table_name, reason FROM pk_issues
        UNION ALL SELECT table_name, reason FROM fk_issues
    )
    -- Inserts all invalid tables into excluded_tables_log
    INSERT INTO excluded_tables_log (table_name, reason)
    SELECT table_name, reason FROM invalid_tables;
"""

with engine.begin() as conn:
    conn.execute(text(validation_query))
print("✅ Validation query complete")

# Step 2: Drop unified_pois and processed_tables_log if they exist, then create them again from scratch
with engine.begin() as conn:
   conn.execute(text("DROP TABLE IF EXISTS unified_pois CASCADE;"))
   conn.execute(text("DROP TABLE IF EXISTS processed_tables_log CASCADE;"))

with engine.begin() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS public.unified_pois (                                                                    -- Adding to public schema for now
            poi_id VARCHAR(50) PRIMARY KEY,
            name VARCHAR(200),
            layer VARCHAR(100),
            district_id VARCHAR(20),
            district VARCHAR(100),
            neighborhood_id VARCHAR(20),
            neighborhood VARCHAR(100),
            latitude DECIMAL(9,6),
            longitude DECIMAL(9,6),
            geometry GEOMETRY, 
            attributes JSONB,
            nearest_pois JSONB
        );
    """))
    conn.execute(text("""                                                                                                   
        CREATE TABLE IF NOT EXISTS public.processed_tables_log (                                                            -- Adding to public schema for now (Creates log of processed tables)
            table_name VARCHAR(255) PRIMARY KEY,
            processed_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """))
print("✅ Unified table and log table ready")

# Step 3: Get new tables to process
valid_tables_sql = """
    SELECT table_name
    FROM information_schema.tables
    WHERE table_schema = 'berlin_source_data'
        AND table_name NOT ILIKE '%districts%'                                                                                  -- doesn't need to be included in final table
        AND table_name NOT ILIKE '%neighborhoods%'                                                                              -- doesn't need to be included in final table
        AND table_name NOT IN (SELECT table_name FROM excluded_tables_log)                                                      -- Exclude invalid tables
        AND table_name IN ('galleries', 'food_markets','long_term_listings', 'banks', 'bike_lanes', 'malls', 'museums', 'bike_lanes', 'dental_offices', 'bus_tram_stops' )  -- This line is just for testing purposes so can load a few tables at a time 
;
"""

with engine.begin() as conn:
    new_tables = [row[0] for row in conn.execute(text(valid_tables_sql))]                                                       # Get list of tables to process

print(f"✅ New tables to process: {new_tables}")

# Step 4: Insert only new tables
if new_tables:
    union_queries = []                                                                                                          # Reset union_queries for new tables
    for table in new_tables:
        union_queries.append(f"""
            SELECT 
                CONCAT(SUBSTRING('{table}' FROM 1 FOR 4), '-', t.id) AS poi_id,                                                   -- Unique poi_id created so no duplicates can happen in error
                t.name,
                '{table}' AS layer,
                t.district_id,
                t.district,
                t.neighborhood_id,
                t.neighborhood,
                t.latitude,
                t.longitude,
                ST_SetSRID(ST_GeomFromEWKT(t.geometry), 4326) AS geometry,                                                         -- Ensure geometry is in correct SRID
                (to_jsonb(t) - 'id' - 'name' - 'district_id' - 'neighborhood_id' - 'latitude' - 'longitude' - 'geometry') AS attributes     -- All other columns as attributes in JSONB
            FROM berlin_source_data.{table} t
        """)
    union_sql = " UNION ALL ".join(union_queries)                                                                                   # Combine all SELECTs into one UNION ALL query (one for each valid table)

    insert_sql = f"""
        WITH all_pois AS (
            {union_sql}
        ),
        unique_layers AS (
            SELECT DISTINCT layer FROM all_pois WHERE layer <> 'long_term_listings'                                                  -- So that it doesnt add the listing itself to the nearest_pois json doc
        ),
        pois_with_nearest AS ( 
            SELECT 
                ap.poi_id,
                ap.name,
                ap.layer,
                ap.district_id,
                ap.district,
                ap.neighborhood_id,
                ap.neighborhood,
                ap.latitude,
                ap.longitude,
                ap.geometry,
                ap.attributes,
            (
                    SELECT jsonb_object_agg(layername, nearestinfo)                                                                 -- Create jsonb object for nearest pois to the listing
                    FROM (
                        SELECT 
                            ul.layer AS layername,
                            (
                                SELECT jsonb_build_object(
                                    'id', p.poi_id,
                                    'name', p.name,
                                    'distance', ST_Distance(ap.geometry, p.geometry),
                                    'address', jsonb_build_object(
                                        'street', p.attributes->>'street',
                                        'housenumber', p.attributes->>'housenumber'
                                    )
                                )
                                FROM all_pois p
                                WHERE p.layer = ul.layer 
                                ORDER BY ap.geometry <-> p.geometry
                                LIMIT 1
                            ) AS nearestinfo
                        FROM unique_layers ul
                        WHERE ap.layer = 'long_term_listings' AND ul.layer <> 'long_term_listings'                                  -- Only add nearest pois for long_term_listings layer
                    ) AS layer_nearest
                ) AS nearest_pois
            FROM all_pois ap
        )
        INSERT INTO unified_pois 
            (poi_id, name, layer, district_id, district, neighborhood_id, neighborhood, 
             latitude, longitude, geometry, attributes, nearest_pois)
        SELECT 
            poi_id, name, layer, district_id, district, neighborhood_id, neighborhood,
            latitude, longitude, geometry, attributes, nearest_pois
        FROM pois_with_nearest;
    """
    

    with engine.begin() as conn:
        conn.execute(text(insert_sql))
        for table in new_tables:
            conn.execute(text(f"""
                INSERT INTO public.processed_tables_log (table_name)                                                                -- Create a list of tables that have been processed
                VALUES ('{table}') ON CONFLICT DO NOTHING;
                """))
                
    print(f"✅ Added {len(new_tables)} new tables to unified_pois.")
else:
    print(f"✅ Insert complete.")

# Step 3: Create spatial index
with engine.begin() as conn:
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_poi_geom ON unified_pois                                                                     -- Create an INDEX so queries run faster on GIST
        USING GIST (geometry);
    """))
    
print("✅Spatial index created.")


✅ Validation query complete
✅ Unified table and log table ready
✅ New tables to process: ['banks', 'bike_lanes', 'food_markets', 'galleries', 'long_term_listings', 'malls', 'museums']
✅ Added 7 new tables to unified_pois.
✅Spatial index created.


### Check that the table looks correct - including the nearerst_pois

In [73]:
query = f"""
    SELECT distinct layer
    FROM unified_pois  
"""

# Execute the query
with engine.connect() as conn:
    df= pd.read_sql(text(query), conn)
    conn.commit()  # commit the transaction
df

Unnamed: 0,layer
0,bike_lanes
1,food_markets
2,galleries
3,malls
4,banks
5,long_term_listings
6,museums


### Check the nearest_pois is correct

In [74]:
query = f"""
SELECT poi_id ,nearest_pois
FROM unified_pois  
;
"""
poi_id_to_check = 'long-HAU_2950_165_12589'
# Execute the query
with engine.connect() as conn:
    df= pd.read_sql(text(query), conn)
    conn.commit()  # commit the transaction
df

# Select a specific POI by poi_id

row = df[df['poi_id'] == poi_id_to_check].iloc[0]  # get the first match

# Extract the JSON dict
nearest_json = row['nearest_pois']  

# Only keep id and name for each POI type
simplified_json = {
    key: {"id": value.get("id"), "name": value.get("name"), "distance": value.get("distance"), "address": value.get("address")}
    for key, value in nearest_json.items()
}

# Print
print(f"POI ID: {row['poi_id']}")
print(json.dumps(simplified_json, indent=2))

POI ID: long-HAU_2950_165_12589
{
  "banks": {
    "id": "bank-439234898",
    "name": "santander",
    "distance": 0.0832779671080533,
    "address": {
      "street": "b\u00f6lschestra\u00dfe",
      "housenumber": "63"
    }
  },
  "malls": {
    "id": "mall-26285059",
    "name": "Allende-Center",
    "distance": 0.12074375494790712,
    "address": {
      "street": "Pablo-Neruda-Stra\u00dfe",
      "housenumber": "2-4"
    }
  },
  "museums": {
    "id": "muse-738500475",
    "name": "heimatmuseum",
    "distance": 0.05624731648576706,
    "address": {
      "street": null,
      "housenumber": null
    }
  },
  "galleries": {
    "id": "gall-1051272400",
    "name": "ausstellungszentrum pyramide",
    "distance": 0.12685044474281532,
    "address": {
      "street": "riesaer stra\u00dfe",
      "housenumber": null
    }
  },
  "bike_lanes": {
    "id": "bike-way/1161732832",
    "name": "Gr\u00fcnheider Weg",
    "distance": 0.0003777766451616758,
    "address": {
      "street":

## Step 4 - Final code
    - using only 1 engine.begin() at the start to speed up queries
    - removed any unneeded text and code

In [82]:
with engine.begin() as conn:
# Step 1: Validation query for excluded tables
    validation_query = """
        DROP TABLE IF EXISTS excluded_tables_log CASCADE;

        -- Create the excluded_tables_log table
        CREATE TABLE public.excluded_tables_log (           -- Adding to public schema for now
            table_name VARCHAR(255),
            reason VARCHAR(500),
            exclusion_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        WITH all_tables AS (
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'berlin_source_data'
            AND table_name NOT ILIKE '%districts%'          -- Must not be included in final table so doesnt need to be checked
            AND table_name NOT ILIKE '%neighborhoods%'      -- Must not be included in final table so doesnt need to be checked
        ),
        -- Standard schema for each table
        expected_schema AS (
            SELECT * FROM (VALUES
                ('id', 'character varying', NULL, 'PRIMARY KEY', 'NO'),
                ('district_id', 'character varying', NULL, 'FOREIGN KEY', 'NO'),
                ('name', 'character varying', NULL, NULL, 'YES'),
                ('latitude', 'numeric', NULL, NULL, 'YES'),
                ('longitude', 'numeric', NULL, NULL, 'YES'),
                ('neighborhood', 'character varying', NULL, NULL, 'YES'),
                ('district', 'character varying', NULL, NULL, 'YES'),
                ('neighborhood_id', 'character varying', NULL, NULL, 'YES')
            ) AS t(column_name, expected_type, expected_length, constraint_type, is_nullable)
        ),
        -- Checks all columns exist
        missing_columns AS (
            SELECT tc.table_name,
                STRING_AGG(DISTINCT col, ', ' ORDER BY col) AS missing_cols
            FROM all_tables tc
            CROSS JOIN (VALUES 
                ('id'), ('name'), ('district_id'), ('district'),
                ('latitude'), ('longitude'), ('neighborhood_id'),
                ('neighborhood'), ('geometry')
            ) AS req_cols(col)
            LEFT JOIN information_schema.columns c 
                ON tc.table_name = c.table_name 
                AND c.table_schema = 'berlin_source_data'
                AND LOWER(c.column_name) = req_cols.col
            WHERE c.column_name IS NULL
            GROUP BY tc.table_name
            HAVING COUNT(*) > 0
        ),
        -- Checks all data types correct and creates and adds an error to the excluded_tables_log table when incorrect
        datatype_issues AS (
            SELECT 
                c.table_name,
                'Expected data type ' || e.expected_type || ', got ' || c.data_type AS reason
            FROM information_schema.columns c
            JOIN expected_schema e 
                ON LOWER(c.column_name) = e.column_name
            WHERE c.table_schema = 'berlin_source_data'
            AND (
                    c.data_type <> e.expected_type
                    OR (c.data_type = 'numeric' AND (c.numeric_precision || ',' || c.numeric_scale) <> e.expected_length)
            )
        ),
        -- Checks all columns that cannot contain NULL are correct and adds an error to the excluded_tables_log table when incorrect
        null_issues AS (
            SELECT c.table_name,
                'Column ' || c.column_name || ' allows NULL, expected NOT NULL' AS reason
            FROM information_schema.columns c
            JOIN expected_schema e 
                ON LOWER(c.column_name) = e.column_name
            WHERE c.table_schema = 'berlin_source_data'
            AND e.is_nullable = 'NO'
            AND c.is_nullable = 'YES'
        ),
        -- Checks primary key on id column and creates and adds an error to the excluded_tables_log table when incorrect
        pk_issues AS (
            SELECT t.table_name,
                'Missing PRIMARY KEY on id column' AS reason
            FROM all_tables t
            WHERE NOT EXISTS (
                SELECT 1
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                    ON kcu.constraint_name = tc.constraint_name
                    AND kcu.table_name = tc.table_name
                WHERE tc.table_schema = 'berlin_source_data'
                AND tc.table_name = t.table_name
                AND tc.constraint_type = 'PRIMARY KEY'
                AND LOWER(kcu.column_name) = 'id'
            )
        ),
        -- Checks foreign key on district_id column and creates and adds an error to the excluded_tables_log table when incorrect
        fk_issues AS (
            SELECT t.table_name,
                'Missing or incorrect foreign key on district_id' AS reason
            FROM all_tables t
            WHERE NOT EXISTS (
                SELECT 1
                FROM information_schema.table_constraints tc
                WHERE tc.table_schema = 'berlin_source_data'
                AND tc.table_name = t.table_name
                AND tc.constraint_type = 'FOREIGN KEY'
            )
        ),
        -- Combines all invalid tables into one
        invalid_tables AS (
            SELECT table_name, 'Missing columns: ' || missing_cols AS reason FROM missing_columns
            UNION ALL SELECT table_name, reason FROM datatype_issues
            UNION ALL SELECT table_name, reason FROM null_issues
            UNION ALL SELECT table_name, reason FROM pk_issues
            UNION ALL SELECT table_name, reason FROM fk_issues
        )
        -- Inserts all invalid tables into excluded_tables_log
        INSERT INTO excluded_tables_log (table_name, reason)
        SELECT table_name, reason FROM invalid_tables;
"""

    conn.execute(text(validation_query))
    print("✅ Validation query complete")

# Step 2: Drop unified_pois and processed_tables_log if they exist, then create them again from scratch

    conn.execute(text("DROP TABLE IF EXISTS unified_pois CASCADE;"))
    conn.execute(text("DROP TABLE IF EXISTS processed_tables_log CASCADE;"))

    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS public.unified_pois (                                                                    -- Adding to public schema for now
            poi_id VARCHAR(50) PRIMARY KEY,
            name VARCHAR(200),
            layer VARCHAR(100),
            district_id VARCHAR(20),
            district VARCHAR(100),
            neighborhood_id VARCHAR(20),
            neighborhood VARCHAR(100),
            latitude DECIMAL(9,6),
            longitude DECIMAL(9,6),
            geometry GEOMETRY, 
            attributes JSONB,
            nearest_pois JSONB
        );
    """))
    conn.execute(text("""                                                                                                   
        CREATE TABLE IF NOT EXISTS public.processed_tables_log (                                                            -- Adding to public schema for now (Creates log of processed tables)
            table_name VARCHAR(255) PRIMARY KEY,
            processed_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """))
    print("✅ Unified table and log table ready")

    # Step 3: Get new tables to process
    valid_tables_sql = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'berlin_source_data'
            AND table_name NOT ILIKE '%districts%'                                                                          -- doesn't need to be included in final table
            AND table_name NOT ILIKE '%neighborhoods%'                                                                      -- doesn't need to be included in final table
            AND table_name NOT IN (SELECT table_name FROM excluded_tables_log)                                              -- Exclude invalid tables
            AND table_name IN ('galleries', 'food_markets','long_term_listings', 'banks', 'malls', 'museums', 'dental_offices', 'bus_tram_stops' )  -- This line is just for testing purposes so can load a few tables at a time , 'bike_lanes'
    ;
    """

    new_tables = [row[0] for row in conn.execute(text(valid_tables_sql))]                                                    # Get list of tables to process
    print(f"✅ New tables to process: {new_tables}")

    # Step 4: Insert only new tables
    if new_tables:
        union_queries = []                                                                                                   # Reset union_queries for new tables
        for table in new_tables:
            union_queries.append(f"""
                SELECT 
                    CONCAT(SUBSTRING('{table}' FROM 1 FOR 4), '-', t.id) AS poi_id,                                          -- Unique poi_id created so no duplicates can happen in error
                    t.name,
                    '{table}' AS layer,
                    t.district_id,
                    t.district,
                    t.neighborhood_id,
                    t.neighborhood,
                    t.latitude,
                    t.longitude,
                    ST_SetSRID(ST_GeomFromEWKT(t.geometry), 4326) AS geometry,                                                -- Ensure geometry is in correct SRID
                    (to_jsonb(t) - 'id' - 'name' - 'district_id' - 'neighborhood_id' - 'latitude' - 'longitude' - 'geometry') AS attributes     -- All other columns as attributes in JSONB
                FROM berlin_source_data.{table} t
            """)
        union_sql = " UNION ALL ".join(union_queries)                                                                          # Combine all SELECTs into one UNION ALL query (one for each valid table)

        insert_sql = f"""
            WITH all_pois AS (
                {union_sql}
            ),
            unique_layers AS (
                SELECT DISTINCT layer FROM all_pois WHERE layer <> 'long_term_listings'                                          -- Won't add the listing itself to the nearest_pois json doc
            ),
            pois_with_nearest AS ( 
                SELECT 
                    ap.poi_id,
                    ap.name,
                    ap.layer,
                    ap.district_id,
                    ap.district,
                    ap.neighborhood_id,
                    ap.neighborhood,
                    ap.latitude,
                    ap.longitude,
                    ap.geometry,
                    ap.attributes,
                (
                    SELECT jsonb_object_agg(layername, nearestinfo)                                                               -- Create jsonb object for nearest pois to the listing
                    FROM (
                        SELECT 
                            ul.layer AS layername,
                            (
                                SELECT jsonb_build_object(
                                    'id', p.poi_id,
                                    'name', p.name,
                                    'distance', ST_Distance(ap.geometry, p.geometry),
                                    'address', jsonb_build_object(
                                        'street', p.attributes->>'street',
                                        'housenumber', p.attributes->>'housenumber'
                                    )
                                )
                                FROM all_pois p
                                WHERE p.layer = ul.layer 
                                ORDER BY ap.geometry <-> p.geometry
                                LIMIT 1
                            ) AS nearestinfo
                        FROM unique_layers ul
                        WHERE ap.layer = 'long_term_listings' AND ul.layer <> 'long_term_listings'                                -- Only add nearest pois for long_term_listings layer
                    ) AS layer_nearest
                ) AS nearest_pois
            FROM all_pois ap
        )
        INSERT INTO unified_pois 
            (poi_id, name, layer, district_id, district, neighborhood_id, neighborhood, 
             latitude, longitude, geometry, attributes, nearest_pois)
        SELECT 
            poi_id, name, layer, district_id, district, neighborhood_id, neighborhood,
            latitude, longitude, geometry, attributes, nearest_pois
        FROM pois_with_nearest;
    """
    
    conn.execute(text(insert_sql))

    for table in new_tables:
        conn.execute(text(f"""
            INSERT INTO public.processed_tables_log (table_name)                                                                    -- Create a list of tables that have been processed
            VALUES ('{table}') ON CONFLICT DO NOTHING;
        """))
                    
        print(f"✅ Adding {table} to unified_pois.")
    else:
        print(f"✅ Insert complete.")

    # Step 3: Create spatial index
    conn.execute(text("""
        CREATE INDEX IF NOT EXISTS idx_poi_geom ON unified_pois                                                                   -- Create an INDEX so queries run faster on GIST
        USING GIST (geometry);
    """))
        
    print("✅Spatial index created.")


✅ Validation query complete
✅ Unified table and log table ready
✅ New tables to process: ['banks', 'bus_tram_stops', 'dental_offices', 'food_markets', 'galleries', 'long_term_listings', 'malls', 'museums']
✅ Adding banks to unified_pois.
✅ Adding bus_tram_stops to unified_pois.
✅ Adding dental_offices to unified_pois.
✅ Adding food_markets to unified_pois.
✅ Adding galleries to unified_pois.
✅ Adding long_term_listings to unified_pois.
✅ Adding malls to unified_pois.
✅ Adding museums to unified_pois.
✅ Insert complete.
✅Spatial index created.


## Show excluded table logs

In [83]:
query = f"""
    SELECT *
    FROM excluded_tables_log
"""

# Execute the query
with engine.connect() as conn:
    df= pd.read_sql(text(query), conn)
    conn.commit()  # commit the transaction
df

Unnamed: 0,table_name,reason,exclusion_date
0,parking_spaces,"Missing columns: latitude, longitude",2025-11-26 11:28:11.275156
1,veterinary_clinics,"Column district_id allows NULL, expected NOT NULL",2025-11-26 11:28:11.275156
2,schools,"Column district_id allows NULL, expected NOT NULL",2025-11-26 11:28:11.275156


### Notes on the missing data

- parking_spaces has many types of geometries, maybe we can ask Dido to adjust her script as looking at it scares me
- schools - district_id column has NULLS so cant add constraint
- vets - district_id column has NULLS so cant add constraint



## Show processed table logs


In [84]:
query = f"""
    SELECT *
    FROM processed_tables_log
"""

# Execute the query
with engine.connect() as conn:
    df= pd.read_sql(text(query), conn)
    conn.commit()  # commit the transaction
df

Unnamed: 0,table_name,processed_date
0,banks,2025-11-26 11:28:11.275156
1,bus_tram_stops,2025-11-26 11:28:11.275156
2,dental_offices,2025-11-26 11:28:11.275156
3,food_markets,2025-11-26 11:28:11.275156
4,galleries,2025-11-26 11:28:11.275156
5,long_term_listings,2025-11-26 11:28:11.275156
6,malls,2025-11-26 11:28:11.275156
7,museums,2025-11-26 11:28:11.275156


### Example of what the Union query looks like

In [85]:
print(union_sql)


                SELECT 
                    CONCAT(SUBSTRING('banks' FROM 1 FOR 4), '-', t.id) AS poi_id,                                          -- Unique poi_id created so no duplicates can happen in error
                    t.name,
                    'banks' AS layer,
                    t.district_id,
                    t.district,
                    t.neighborhood_id,
                    t.neighborhood,
                    t.latitude,
                    t.longitude,
                    ST_SetSRID(ST_GeomFromEWKT(t.geometry), 4326) AS geometry,                                                -- Ensure geometry is in correct SRID
                    (to_jsonb(t) - 'id' - 'name' - 'district_id' - 'neighborhood_id' - 'latitude' - 'longitude' - 'geometry') AS attributes     -- All other columns as attributes in JSONB
                FROM berlin_source_data.banks t
             UNION ALL 
                SELECT 
                    CONCAT(SUBSTRING('bus_tram_stops' FROM 1 FOR 4), 