# Create the table of final layers tables

### Insert the libraries and connection

In [None]:
# Import Librabies
import osmnx as ox # to fetch data from OpenStreetMap
import geopandas as gpd # to work with geospatial data
import pandas as pd
from sqlalchemy import create_engine, text
import warnings
import json

warnings.filterwarnings("ignore")

# Credentials
user_name='postgres'
password='8006'

# Conection
host = 'localhost'
port = '5432'
database = 'postgres'
#schema='berlin_source_data'

# Connection to DB
engine = create_engine(f'postgresql+psycopg2://{user_name}:{password}@{host}:{port}/{database}')

### Create the table
    - This table is used to add each new layer in the future so the script can get the info to get from OSM

In [None]:
create_table = """
    DROP TABLE IF EXISTS berlin_unified.berlin_layers_final_tables CASCADE;

    CREATE TABLE berlin_unified.berlin_layers_final_tables (
        id VARCHAR PRIMARY KEY,
        table_name VARCHAR(255) NOT NULL UNIQUE,
        osm_tags JSON NOT NULL,
        columns JSON NOT NULL,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(create_table))

### Insert Rows
    - Each layer needs to include this code snippet in the final Notebook, once the table added to the berlin_source_data
    - The column names need to match what is on the OSM API
    - "id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry" must be included in column names even if the data isnt from OSM, will be added later in the script

- Blank Example

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'unique id',
        'table name',
        '[{what ever tags used to create the table}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", plus any extra columns kept in final table]'::jsonb
    )    
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Government Offices

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'gov_25_12_04',
        'government_offices',
        '[{"office": ["government", "administrative"], "amenity": ["townhall", "public_building"], "office": "employment_agency"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "type", "addr:housenumber", "addr:street", "addr:postcode", "contact:phone", 
            "contact:email", "website", "opening_hours", "wheelchair"]'::jsonb
    )    
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Banks

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'ban_25_12_04',
        'banks',
        '[{"amenity": "bank"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "brand", "operator", "addr:street", "addr:housenumber", "addr:postcode", "opening_hours", "atm", "wheelchair"]'::jsonb
    )    
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Galleries

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'gal_25_12_04',
        'galleries',
        '[{"tourism": "gallery"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:housenumber", "addr:street", "addr:postcode", "website", "opening_hours", "wheelchair", "fee"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Exhibition Centers

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'exh_25_12_08',
        'exhibitions',
        '[{"amenity": "exhibition_centre"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:housenumber", "addr:street", "addr:postcode", "website", "building", "wikipedia"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Gyms

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'gym_25_12_08',
        'gyms',
        '[{"leisure":"fitness_centre", "sport":"yoga"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:housenumber", "addr:street", "addr:postcode", "contact:phone", "email"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Hospitals

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'hos_25_12_08',
        'hospitals',
        '[{ "healthcare": ["hospital","clinic"], "amenity": ["hospital","clinic"]}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "operator", "addr:street", "addr:housenumber", "addr:postcode", "contact:phone", "contact:email", "website", "wheelchair", "toilets:wheelchair", "emergency", "healthcare:speciality", "opening_hours", "amenity", "source", "healthcare"  ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Kindergartens

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'kin_25_12_09',
        'kindergartens',
        '[{"amenity":"kindergarten" }]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry","addr:street", "addr:housenumber", "addr:postcode", "capacity", "operator"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns
    ;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Libraries

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'lib_25_12_09',
        'libraries',
        '[{ "amenity": "library"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", 
        "operator:type", "operator", "addr:street", "addr:housenumber", "addr:postcode", "email", "contact:phone", "website", "opening_hours", "wheelchair", "toilets:wheelchair", "internet_access"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""

with engine.begin() as conn:
    conn.execute(text(insert_rows))


- Malls

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'mall_25_12_09',
        'malls',
        '[{"shop":"mall" }]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "website", "opening_hours", "addr:street", "addr:housenumber", "addr:postcode", "wheelchair"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Museums


In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'mus_25_12_09',
        'museums',
        '[{"tourism": "museum"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:housenumber", "addr:street", "addr:postcode", "website", "contact:phone", "type", "operator", "building", "wikipedia", "opening_hours", "wheelchair", "toilets:wheelchair"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Public artworks

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'art_25_12_09',
        'public_artworks',
        '[{"tourism": "artwork"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "artwork_type", "artist_name", "addr:street", "addr:postcode", "material", "start_date", "image", "wikidata", "wikimedia_commons"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Nightclubs

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'nig_25_12_09',
        'nightclubs',
        '[{"amenity":"nightclub"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry",  "addr:postcode", "addr:street", "addr:housenumber", "contact:phone", "contact:email", "contact:website", "opening_hours", "wheelchair", "toilets:wheelchair", "wheelchair:description", "live_music"]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Parks

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'par_25_12_09',
        'parks',
        '[{ "leisure": ["park", "garden", "nature_reserve"], "landuse": "recreation_ground", "boundary": "national_park"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:housenumber","addr:postcode" , "addr:street", "area" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Playgrounds

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'play_25_12_09',
        'playgrounds',
        '[{"leisure": "playground"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:housenumber","addr:postcode" , "addr:street", "area"  ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))


- Pharmacies

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'pha_25_12_09',
        'pharmacies',
        '[{"amenity": "pharmacy"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:street", "addr:housenumber", "addr:postcode", "contact:phone", "opening_hours", "website", "health_facility:type", "wheelchair" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))

- Religious Institutions

In [22]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'rel_25_12_09',
        'religious_institutions',
        '[{"amenity": "place_of_worship"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "religion", "denomination", "type", "addr:street", "addr:housenumber", "addr:postcode", "contact:phone", "contact:email", "contact:website", "wheelchair", "service_times:en", "opening_hours", "heritage", "historic", "wikidata", "wikipedia" ]'::jsonb
    );
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Social Clubs

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'soc_25_12_09',
        'social_clubs',
        '[{
    "amenity": [
        "community_centre", "arts_centre", "social_centre", 
        "youth_centre", "social_club", "music_school","events_venue",
        "music_venue", 
        "dojo", "dancing_school","studio",
        "theatre"
    ],
    "leisure": [
       "sports_centre", "fitness_centre", "dance", 
        "hackerspace", "music_venue", "garden"
    ]
}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "club", "leisure", "sport", "amenity", "addr:street", "addr:housenumber", "addr:postcode", "contact:website", "contact:phone", "contact:email", "opening_hours", "wheelchair" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Supermarkets

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'sup_25_12_09',
        'supermarkets',
        '[{"shop": "supermarket"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:street","addr:housenumber","addr:postcode","opening_hours","brand","type","payment_credit_card","payment_debit_cards","payment_cash","payment_contactless","wheelchair","internet_access","contact:phone","contact:email","contact:website" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Theaters

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'the_25_12_09',
        'theatres',
        '[{"amenity": ["theatre", "cinema"]}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "type", "operator", "opening_hours","wheelchair","screen","contact:website","contact:phone","contact:email","addr:street","addr:housenumber","addr:postcode","amenity","cinema:type" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



- Venues

In [None]:
insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'ven_25_12_09',
        'venues',
        '[{"amenity": ["restaurant", "cafe", "bar"]}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "amenity","cuisine","contact:phone","addr:housenumber","addr:street","addr:postcode","contact:website","opening_hours","takeaway","wheelchair" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))


- Vets

In [None]:

insert_rows = """
    INSERT INTO berlin_unified.berlin_layers_final_tables (id,table_name, osm_tags, columns)
    VALUES (
        'vet_25_12_09',
        'veterinaries',
        '[{"amenity":"veterinary"}]'::jsonb,
        '["id", "name", "latitude", "longitude", "district", "district_id", "neighborhood_id", "neighborhood", "geometry", "addr:street", "addr:housenumber", "addr:postcode", "contact:phone", "website", "contact:email", "opening_hours", "operator", "wheelchair", "emergency" ]'::jsonb
    )
    ON CONFLICT (table_name) 
    DO UPDATE SET
        id = EXCLUDED.id,
        osm_tags = EXCLUDED.osm_tags,
        columns = EXCLUDED.columns;
"""
with engine.begin() as conn:
    conn.execute(text(insert_rows))



---

## Create table using table_name, tags and column names given by intern in code above

1) Import Libraries

In [None]:
import psycopg2
import requests
import osmnx as ox # to fetch data from OpenStreetMap
from shapely.geometry import Point
from shapely import wkt
import geopandas as gpd # to work with geospatial data
import pandas as pd
from sqlalchemy import create_engine, text
import warnings
import json


2) Create the connections

In [30]:
# --------------------------------------------------
# DB CONNECTION
# --------------------------------------------------
def get_db_connection():
    return psycopg2.connect(
        host="localhost",
        port = '5432',
        database='postgres',
        user='postgres',
        password='8006'
    )

3) Normalise the tags

In [31]:
# --------------------------------------------------
# NORMALIZE OSM TAGS (MOST IMPORTANT FIX)
# --------------------------------------------------
def normalize_osm_tags(raw_tags):
    """
    Convert DB JSON (often a list of dicts) into a dict compatible with OSMnx.
    
    Example input:
        [{"office": "government"}, {"office": "administrative"}, {"amenity": "townhall"}]
    
    Output:
        {"office": ["government", "administrative"], "amenity": "townhall"}
    """

    # Parse JSON string if necessary
    if isinstance(raw_tags, str):
        raw_tags = json.loads(raw_tags)

    # If already a dict, return as-is
    if isinstance(raw_tags, dict):
        return raw_tags

    # Expected case: list of dicts
    merged = {}
    if isinstance(raw_tags, list):
        for item in raw_tags:
            if not isinstance(item, dict):
                continue
            for key, val in item.items():
                if key not in merged:
                    merged[key] = []
                merged[key].append(val)

    # Convert lists with only 1 value to simple strings
    for key in merged:
        if len(merged[key]) == 1:
            merged[key] = merged[key][0]

    return merged

4) Fetch the layers configs

In [40]:
# --------------------------------------------------
# FETCH LAYER CONFIGS
# --------------------------------------------------
def get_layer_configs():
    """Fetch all layer configurations - NO AIRFLOW"""
    conn = get_db_connection()
    cursor = conn.cursor()
    
    sql = """
        SELECT table_name, osm_tags, columns
        FROM berlin_unified.berlin_layers_final_tables
        ORDER BY id;
    """
    
    cursor.execute(sql)
    results = cursor.fetchall()
    
    layer_configs = []
    for row in results:
        table_name, osm_tags, columns = row
        layer_configs.append({
            'table_name': table_name,
            'osm_tags': osm_tags,
            'columns': columns
        })
    
    cursor.close()
    conn.close()
    
    return layer_configs

configs = get_layer_configs()
for config in configs:
    print(config)


{'table_name': 'public_artworks', 'osm_tags': [{'tourism': 'artwork'}], 'columns': ['id', 'name', 'latitude', 'longitude', 'district', 'district_id', 'neighborhood_id', 'neighborhood', 'geometry', 'artwork_type', 'artist_name', 'addr:street', 'addr:postcode', 'material', 'start_date', 'image', 'wikidata', 'wikimedia_commons']}
{'table_name': 'banks', 'osm_tags': [{'amenity': 'bank'}], 'columns': ['id', 'name', 'latitude', 'longitude', 'district', 'district_id', 'neighborhood_id', 'neighborhood', 'geometry', 'brand', 'operator', 'addr:street', 'addr:housenumber', 'addr:postcode', 'opening_hours', 'atm', 'wheelchair']}
{'table_name': 'exhibitions', 'osm_tags': [{'amenity': 'exhibition_centre'}], 'columns': ['id', 'name', 'latitude', 'longitude', 'district', 'district_id', 'neighborhood_id', 'neighborhood', 'geometry', 'addr:housenumber', 'addr:street', 'addr:postcode', 'website', 'building', 'wikipedia']}
{'table_name': 'galleries', 'osm_tags': [{'tourism': 'gallery'}], 'columns': ['id',

5) Create the table

In [41]:
# --------------------------------------------------
# CREATE TABLE
# --------------------------------------------------
def create_table_for_layer(table_name, columns):
    """Create table - NO AIRFLOW"""
    conn = get_db_connection()
    cursor = conn.cursor()
    
    column_defs = []
    for col in columns:
        if col == 'latitude':
            column_defs.append(f"\"{col}\" DECIMAL(9,6)")
        elif col == 'longitude':
            column_defs.append(f"\"{col}\" DECIMAL(9,6)")
        else:
            column_defs.append(f"\"{col}\" VARCHAR")
    
    # Add NOT NULL + PK
    if "id" in columns:
        idx = columns.index("id")
        column_defs[idx] = column_defs[idx] + " NOT NULL PRIMARY KEY"

    if "name" in columns:
        idx = columns.index("name")
        column_defs[idx] = column_defs[idx] + " NOT NULL"

    if "district_id" in columns:
        idx = columns.index("district_id")
        column_defs[idx] = column_defs[idx] + " NOT NULL"

    # Add foreign key constraint
    if "district_id" in columns:
        column_defs.append(
            "FOREIGN KEY (\"district_id\") REFERENCES public.districts(district_id)"
        )
        
    create_table_sql = f"""
        DROP TABLE IF EXISTS berlin_data.{table_name};
        CREATE TABLE IF NOT EXISTS berlin_data.{table_name} (
            {', '.join(column_defs)}
        );
    """
    
    cursor.execute(create_table_sql)
    conn.commit()
    cursor.close()
    conn.close()
    print(f"✅ Table {table_name} created successfully")

# Create all tables  
for config in configs:
    create_table_for_layer(config['table_name'], config['columns'])

✅ Table public_artworks created successfully
✅ Table banks created successfully
✅ Table exhibitions created successfully
✅ Table galleries created successfully
✅ Table government_offices created successfully
✅ Table gyms created successfully
✅ Table hospitals created successfully
✅ Table kindergartens created successfully
✅ Table libraries created successfully
✅ Table malls created successfully
✅ Table museums created successfully
✅ Table nightclubs created successfully
✅ Table parks created successfully
✅ Table pharmacies created successfully
✅ Table playgrounds created successfully
✅ Table religious_institutions created successfully
✅ Table social_clubs created successfully
✅ Table supermarkets created successfully
✅ Table theatres created successfully
✅ Table venues created successfully
✅ Table veterinaries created successfully


6) Load the OSM data into each table

In [47]:
# --------------------------------------------------
# LOAD OSM DATA INTO TABLE
# --------------------------------------------------
def load_osm_data_into_table(table_name, osm_tags, columns,
                             districts_geojson_path="/Users/biancaniemann/Documents/Webeet/Python/create_layers/sources/geojson_files/lor_ortsteile.geojson"):
    """Fetch OSM data from OSM API (OSMnx), enrich with location attributes,
    and insert into the database."""

    print(f"Fetching OSM data for {table_name} ...")
    gdf = ox.features_from_place("Berlin, Germany", osm_tags)

    if gdf.empty:
        print(f"⚠️ No OSM data found for {table_name}")
        return

    # --- Prepare ID ---
    gdf = gdf.reset_index()
    gdf["id"] = gdf["id"].astype(str)
    gdf = gdf.drop(columns=["element"], errors="ignore")

    # ---- CRS and geometry cleanup ----
    gdf = gdf.to_crs(epsg=4326)

    # ensure geometry is always a point
    gdf["geometry"] = gdf["geometry"].apply(
        lambda geom: geom if geom and geom.geom_type == "Point"
        else geom.representative_point() if geom
        else None
    )

    # Latitude / longitude
    gdf["latitude"] = gdf.geometry.y
    gdf["longitude"] = gdf.geometry.x

    # Replace missing names
    if "name" in gdf.columns:
        gdf["name"] = gdf["name"].fillna("unknown")

    # ---- District + Neighborhood (Spatial Join) ----
    districts_gdf = gpd.read_file(districts_geojson_path).to_crs(epsg=4326)

    gdf = gpd.sjoin(
        gdf,
        districts_gdf[["BEZIRK", "spatial_name", "geometry"]],
        how="left",
        predicate="within",
    ).rename(columns={
        "BEZIRK": "district",
        "spatial_name": "neighborhood_id",
    }).drop(columns=["index_right"], errors="ignore")

    # ---- Remove rows where district is null ----
    initial_count = len(gdf)
    gdf = gdf[gdf["district"].notna()]
    removed_count = initial_count - len(gdf)
    if removed_count > 0:
        print(f"⚠️ Removed {removed_count} rows with no district match")

    # ---- District ID Mapping ----
    district_mapping = {
        'Mitte': '11001001',
        'Friedrichshain-Kreuzberg': '11002002',
        'Pankow': '11003003',
        'Charlottenburg-Wilmersdorf': '11004004',
        'Spandau': '11005005',
        'Steglitz-Zehlendorf': '11006006',
        'Tempelhof-Schöneberg': '11007007',
        'Neukölln': '11008008',
        'Treptow-Köpenick': '11009009',
        'Marzahn-Hellersdorf': '11010010',
        'Lichtenberg': '11011011',
        'Reinickendorf': '11012012'
    }
    gdf["district_id"] = gdf["district"].map(district_mapping).astype(str)

     # ---- Fetch neighborhood names from database ----
    conn = get_db_connection()
    cursor = conn.cursor()
    
    cursor.execute("""
        SELECT neighborhood_id, neighborhood
        FROM berlin_data.neighborhoods
    """)
    neighborhood_dict = {row[0]: row[1] for row in cursor.fetchall()}
    
    # Map neighborhood names using the neighborhood_id
    gdf["neighborhood"] = gdf["neighborhood_id"].map(neighborhood_dict)
    
    cursor.close()

    # ---- Ensure all expected columns exist ----
    for col in columns:
        if col not in gdf.columns:
            gdf[col] = None

    # Convert geometry to WKT for PostGIS insertion
    gdf["geometry"] = gdf["geometry"].apply(lambda geom: geom.wkt if geom is not None else None)

    # ---- Insert into DB ----
    conn = get_db_connection()
    cursor = conn.cursor()

    # Quote column names because of ":" and uppercase names
    col_list_str = ", ".join([f"\"{col}\"" for col in columns])
    placeholders = ", ".join(["%s"] * len(columns))

    insert_sql = f"""
        INSERT INTO berlin_data.{table_name} ({col_list_str})
        VALUES ({placeholders})
        ON CONFLICT DO NOTHING;
    """

    print(f"Inserting records into {table_name} ...")

    for _, row in gdf.iterrows():
        values = [row[col] for col in columns]
        cursor.execute(insert_sql, values)

    conn.commit()
    cursor.close()
    conn.close()

    print(f"✅ Inserted {len(gdf)} records into {table_name}")
   

7) Run the loaded OSM above

In [48]:
# --------------------------------------------------
# RUN OSM LOADING
# --------------------------------------------------
for config in configs:
    table = config["table_name"]
    raw_tags = config["osm_tags"]
    columns = config["columns"]

    osm_tags = normalize_osm_tags(raw_tags)

    print(f"Using normalized tags for {table}: {osm_tags}")

    load_osm_data_into_table(table, osm_tags, columns)

Using normalized tags for public_artworks: {'tourism': 'artwork'}
Fetching OSM data for public_artworks ...
Inserting records into public_artworks ...
✅ Inserted 2625 records into public_artworks
Using normalized tags for banks: {'amenity': 'bank'}
Fetching OSM data for banks ...
Inserting records into banks ...
✅ Inserted 324 records into banks
Using normalized tags for exhibitions: {'amenity': 'exhibition_centre'}
Fetching OSM data for exhibitions ...
Inserting records into exhibitions ...
✅ Inserted 4 records into exhibitions
Using normalized tags for galleries: {'tourism': 'gallery'}
Fetching OSM data for galleries ...
Inserting records into galleries ...
✅ Inserted 324 records into galleries
Using normalized tags for government_offices: {'office': 'employment_agency', 'amenity': ['townhall', 'public_building']}
Fetching OSM data for government_offices ...
Inserting records into government_offices ...
✅ Inserted 85 records into government_offices
Using normalized tags for gyms: {'s

---

# Below is the DAG tasks broken down into 3 seperate sections for testing purposes

### Create unified table

In [57]:
# Create the excluded_table_logs table
task_1 = """
 DROP TABLE IF EXISTS berlin_unified.excluded_tables_log CASCADE;

        -- Create the excluded_tables_log table
        CREATE TABLE berlin_unified.excluded_tables_log (           -- Adding to the berlin_unified schema
            table_name VARCHAR(255),
            reason VARCHAR(500),
            exclusion_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

        WITH all_tables AS (
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'berlin_data'           -- Source schema chnage to 'berlin_source_data' 
                AND table_name NOT ILIKE '%districts%'          -- Must not be included in final table so doesnt need to be checked
                AND table_name NOT ILIKE '%neighborhoods%'      -- Must not be included in final table so doesnt need to be checked
                AND table_name NOT ILIKE '%berlin_layers_final_tables%' 
        ),
        -- Standard schema for each table
        expected_schema AS (
            SELECT * FROM (VALUES
                ('id', 'character varying', NULL, 'PRIMARY KEY', 'NO'),
                ('district_id', 'character varying', NULL, 'FOREIGN KEY', 'NO'),
                ('name', 'character varying', NULL, NULL, 'YES'),
                ('latitude', 'numeric', NULL, NULL, 'YES'),
                ('longitude', 'numeric', NULL, NULL, 'YES'),
                ('neighborhood', 'character varying', NULL, NULL, 'YES'),
                ('district', 'character varying', NULL, NULL, 'YES'),
                ('neighborhood_id', 'character varying', NULL, NULL, 'YES')
            ) AS t(column_name, expected_type, expected_length, constraint_type, is_nullable)
        ),
        -- Checks all columns exist
        missing_columns AS (
            SELECT tc.table_name,
                STRING_AGG(DISTINCT col, ', ' ORDER BY col) AS missing_cols
            FROM all_tables tc
            CROSS JOIN (VALUES 
                ('id'), ('name'), ('district_id'), ('district'),
                ('latitude'), ('longitude'), ('neighborhood_id'),
                ('neighborhood'), ('geometry')
            ) AS req_cols(col)
            LEFT JOIN information_schema.columns c 
                ON tc.table_name = c.table_name 
                AND c.table_schema = 'berlin_data'
                AND LOWER(c.column_name) = req_cols.col
            WHERE c.column_name IS NULL
            GROUP BY tc.table_name
            HAVING COUNT(*) > 0
        ),
        -- Checks all data types correct and creates and adds an error to the excluded_tables_log table when incorrect
        datatype_issues AS (
            SELECT 
                c.table_name,
                'Expected data type ' || e.expected_type || ', got ' || c.data_type AS reason
            FROM information_schema.columns c
            JOIN expected_schema e 
                ON LOWER(c.column_name) = e.column_name
            WHERE c.table_schema = 'berlin_data'
            AND (
                    c.data_type <> e.expected_type
                    OR (c.data_type = 'numeric' AND (c.numeric_precision || ',' || c.numeric_scale) <> e.expected_length)
            )
        ),
        -- Checks all columns that cannot contain NULL are correct and adds an error to the excluded_tables_log table when incorrect
        null_issues AS (
            SELECT c.table_name,
                'Column ' || c.column_name || ' allows NULL, expected NOT NULL' AS reason
            FROM information_schema.columns c
            JOIN expected_schema e 
                ON LOWER(c.column_name) = e.column_name
            WHERE c.table_schema = 'berlin_data'
            AND e.is_nullable = 'NO'
            AND c.is_nullable = 'YES'
        ),
        -- Checks primary key on id column and creates and adds an error to the excluded_tables_log table when incorrect
        pk_issues AS (
            SELECT t.table_name,
                'Missing PRIMARY KEY on id column' AS reason
            FROM all_tables t
            WHERE NOT EXISTS (
                SELECT 1
                FROM information_schema.table_constraints tc
                JOIN information_schema.key_column_usage kcu
                    ON kcu.constraint_name = tc.constraint_name
                    AND kcu.table_name = tc.table_name
                WHERE tc.table_schema = 'berlin_data'
                AND tc.table_name = t.table_name
                AND tc.constraint_type = 'PRIMARY KEY'
                AND LOWER(kcu.column_name) = 'id'
            )
        ),
        -- Checks foreign key on district_id column and creates and adds an error to the excluded_tables_log table when incorrect
        fk_issues AS (
            SELECT t.table_name,
                'Missing or incorrect foreign key on district_id' AS reason
            FROM all_tables t
            WHERE NOT EXISTS (
                SELECT 1
                FROM information_schema.table_constraints tc
                WHERE tc.table_schema = 'berlin_data'
                AND tc.table_name = t.table_name
                AND tc.constraint_type = 'FOREIGN KEY'
            )
        ),
        -- Combines all invalid tables into one
        invalid_tables AS (
            SELECT table_name, 'Missing columns: ' || missing_cols AS reason FROM missing_columns
            UNION ALL SELECT table_name, reason FROM datatype_issues
            UNION ALL SELECT table_name, reason FROM null_issues
            UNION ALL SELECT table_name, reason FROM pk_issues
            UNION ALL SELECT table_name, reason FROM fk_issues
        )
        -- Inserts all invalid tables into excluded_tables_log
        INSERT INTO berlin_unified.excluded_tables_log (table_name, reason)
        SELECT table_name, reason FROM invalid_tables;
        ;
    """
with engine.begin() as conn:
    conn.execute(text(task_1))

In [58]:
# Create the unified_pois table with nearest POIs
task_2 = """
        DROP TABLE IF EXISTS berlin_unified.unified_pois CASCADE;
        DROP TABLE IF EXISTS berlin_unified.processed_tables_log CASCADE;

        CREATE TABLE IF NOT EXISTS berlin_unified.unified_pois (                           -- Adding to berlin_unified schema 
            poi_id VARCHAR(50) PRIMARY KEY,
            name VARCHAR(200),
            layer VARCHAR(100),
            district_id VARCHAR(20),
            district VARCHAR(100),
            neighborhood_id VARCHAR(20),
            neighborhood VARCHAR(100),
            latitude DECIMAL(9,6),
            longitude DECIMAL(9,6),
            geometry GEOMETRY, 
            attributes JSONB,
            nearest_pois JSONB
        );

        CREATE TABLE IF NOT EXISTS berlin_unified.processed_tables_log (                  -- Adding to berlin_unified schema (Creates log of processed tables)
            table_name VARCHAR(255) PRIMARY KEY,
            processed_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );

    ;
    """
with engine.begin() as conn:
    conn.execute(text(task_2))

In [59]:
# Insert the rows into the unified_pois table
task_3 = """
 DO $$
    DECLARE
        union_sql TEXT;
        final_sql TEXT;
    BEGIN
        -- Build the UNION ALL query dynamically from valid tables
        SELECT string_agg(
            format(
                $f$
                SELECT 
                    CONCAT(SUBSTRING('%s' FROM 1 FOR 4), '-', t.id) AS poi_id,
                    t.name,
                    '%s' AS layer,
                    t.district_id,
                    t.district,
                    t.neighborhood_id,
                    t.neighborhood,
                    t.latitude,
                    t.longitude,
                    ST_SetSRID(ST_GeomFromEWKT(t.geometry), 4326) AS geometry,
                    (to_jsonb(t) - 'id' - 'name' - 'district_id' - 'neighborhood_id' - 'latitude' - 'longitude' - 'geometry') AS attributes
                FROM berlin_data.%I t
                $f$,
                table_name, table_name, table_name
            ),
            ' UNION ALL '
        ) INTO union_sql
        FROM information_schema.tables
        WHERE table_schema = 'berlin_data'
            AND table_name NOT ILIKE '%districts%'
            AND table_name NOT ILIKE '%neighborhoods%'
            AND table_name NOT ILIKE '%berlin_layers_final_tables%' 
            AND table_name NOT IN (SELECT table_name FROM berlin_unified.excluded_tables_log)
            ;  --Remove this line after testing  'bike_lanes', 'dental_offices','doctors',

        -- Exit if no valid tables
        IF union_sql IS NULL THEN
            RAISE NOTICE 'No valid tables found to process';
            RETURN;
        END IF;

        -- Build final SQL with nearest POIs using LATERAL
       final_sql := '
            WITH all_pois AS (' || union_sql || '),
            unique_layers AS (
                SELECT DISTINCT layer FROM all_pois WHERE layer <> ''long_term_listings''
            ),
            pois_with_nearest AS (
                SELECT 
                    ap.poi_id,
                    ap.name,
                    ap.layer,
                    ap.district_id,
                    ap.district,
                    ap.neighborhood_id,
                    ap.neighborhood,
                    ap.latitude,
                    ap.longitude,
                    ap.geometry,
                    ap.attributes,
                    CASE 
                        WHEN ap.layer = ''long_term_listings'' THEN (
                            SELECT jsonb_object_agg(ul.layer, nearest_info)
                            FROM unique_layers ul
                            LEFT JOIN LATERAL (
                                SELECT jsonb_build_object(
                                    ''id'', p.poi_id,
                                    ''name'', p.name,
                                    ''distance'', ST_Distance(ap.geometry::geography, p.geometry::geography),
                                    ''address'', jsonb_build_object(
                                        ''street'', p.attributes->>''street'',
                                        ''housenumber'', p.attributes->>''housenumber''
                                    )
                                ) AS nearest_info
                                FROM all_pois p
                                WHERE p.layer = ul.layer
                                ORDER BY ap.geometry <-> p.geometry
                                LIMIT 1
                            ) AS nearest_per_layer ON TRUE
                        )
                        ELSE NULL
                    END AS nearest_pois
                FROM all_pois ap
            )
            INSERT INTO berlin_unified.unified_pois
                (poi_id, name, layer, district_id, district, neighborhood_id, neighborhood, 
                 latitude, longitude, geometry, attributes, nearest_pois)
            SELECT 
                poi_id, name, layer, district_id, district, neighborhood_id, neighborhood,
                latitude, longitude, geometry, attributes, nearest_pois
            FROM pois_with_nearest;

            -- Log processed tables
            INSERT INTO berlin_unified.processed_tables_log (table_name)
            SELECT DISTINCT layer
            FROM berlin_unified.unified_pois

            ON CONFLICT DO NOTHING;
        ';

        -- Execute the final SQL
        EXECUTE final_sql;

        RAISE NOTICE 'Successfully inserted POIs from dynamic tables';
    END;
    $$;
    """
with engine.begin() as conn:
    conn.execute(text(task_3))

## Testing the nearest_pois works on long_term_listings

In [63]:
# Checking the unified_pois table nearest_pois JSON field
query = f"""
SELECT poi_id, attributes, nearest_pois
FROM berlin_unified.unified_pois  
;
"""
poi_id_to_check = 'long-WOH_2772_178_14055'
# Execute the query
with engine.connect() as conn:
    df= pd.read_sql(text(query), conn)
    conn.commit()  # commit the transaction
df

# Select a specific POI by poi_id

row = df[df['poi_id'] == poi_id_to_check].iloc[0]  # get the first match

# Extract the JSON dict
nearest_json = row['nearest_pois']  

# Only keep id and name for each POI type
simplified_json = {
    key: {"id": value.get("id"), "name": value.get("name"), "distance": value.get("distance"), "address": value.get("address")}
    for key, value in nearest_json.items()
}

# Print
print(f"POI ID: {row['poi_id']}")
print(json.dumps(simplified_json, indent=2))

POI ID: long-WOH_2772_178_14055
{
  "gyms": {
    "id": "gyms-5796921707",
    "name": "Mrs. Sporty",
    "distance": 2296.64390594,
    "address": {
      "street": null,
      "housenumber": null
    }
  },
  "banks": {
    "id": "bank-671776291",
    "name": "Bankhaus Dr. Masel AG",
    "distance": 2194.40121109,
    "address": {
      "street": null,
      "housenumber": null
    }
  },
  "malls": {
    "id": "mall-25898004",
    "name": "Center am Juliusturm",
    "distance": 3896.04872281,
    "address": {
      "street": null,
      "housenumber": null
    }
  },
  "parks": {
    "id": "park-50338561",
    "name": "Reiterstadion",
    "distance": 743.10262509,
    "address": {
      "street": null,
      "housenumber": null
    }
  },
  "pools": {
    "id": "pool-1947",
    "name": "Schwimmhalle Forumbad Olympiastadion",
    "distance": 1374.72866302,
    "address": {
      "street": "Hanns-Braun-Stra\u00dfe",
      "housenumber": null
    }
  },
  "sbahn": {
    "id": "sbah-38"

## Count of rows per layer

In [62]:
query = f"""
SELECT layer, count(*)
FROM berlin_unified.unified_pois  
GROUP BY layer

;
"""
# Execute the query
with engine.connect() as conn:
    df= pd.read_sql(text(query), conn)
    conn.commit()  # commit the transaction
df

Unnamed: 0,layer,count
0,banks,324
1,bike_lanes,78833
2,bus_stops,6220
3,dental_offices,781
4,doctors,1615
5,exhibitions,4
6,food_markets,166
7,galleries,324
8,government_offices,85
9,gyms,444
