In [1]:
%load_ext autoreload
%autoreload 2

# 3-months postprocessing

## Get files ready for Torben

In [None]:
from src.constants import PREDS_PATH
from src.utils.geometry import load_ukraine_admin_polygons

# Ukraine admin polygons for levels 1 to 4, each table keyed by its admin_id
adm1, adm2, adm3, adm4 = (
    load_ukraine_admin_polygons(adm_level=level).set_index('admin_id')
    for level in (1, 2, 3, 4)
)

In [None]:
# (rows, columns) of each admin-level table, from level 1 to level 4
tuple(admin_table.shape for admin_table in (adm1, adm2, adm3, adm4))

In [None]:
from src.utils.geometry import get_best_utm_crs
import geopandas as gpd

def prepare_gdf_preds(gdf, adm3_id):
    """Reshape the per-date predictions of one ADM3 unit into one row per building.

    Reads the notebook-level admin tables ``adm1``, ``adm2``, ``adm3`` and ``adm4``
    (each indexed by admin id).

    Args:
        gdf: GeoDataFrame providing ``building_id`` and ``post_date`` (the callers in
            this notebook pass them as the index) together with ``weighted_mean``,
            ``geometry`` and ``dataset`` columns.
        adm3_id: Label of the ADM3 unit in ``adm3`` the buildings belong to.

    Returns:
        GeoDataFrame with one row per ``building_id``: one column per ``post_date``
        holding ``weighted_mean``, plus ``area``, the ADM* names, the admin ids
        (levels 1-4), ``geometry_wkt``, ``geometry`` and ``dataset``. Columns are
        sorted by name.
    """
    # Wide table: one row per building, one column per post_date
    scores_wide = gdf.reset_index().pivot_table(
        values='weighted_mean',
        index='building_id',
        columns='post_date',
    )
    # Geometry and dataset are taken from the first record of each building
    building_attrs = gdf.groupby('building_id').agg({'geometry': 'first', 'dataset': 'first'})
    gdf = gpd.GeoDataFrame(scores_wide.join(building_attrs), crs=gdf.crs)

    # Area is measured after reprojecting to the CRS returned by get_best_utm_crs
    gdf['area'] = gdf.to_crs(get_best_utm_crs(gdf)).area

    # Broadcast every ADM* field of the ADM3 record to all buildings
    d_admins = {}
    for field, value in adm3.loc[adm3_id].to_dict().items():
        if field.startswith('ADM'):
            d_admins[field] = value
            gdf[field] = value

    adm1_name = d_admins['ADM1_EN']
    adm2_name = d_admins['ADM2_EN']

    # Admin ids of the parent levels are looked up by name
    gdf['adm1_id'] = adm1[adm1['ADM1_EN'] == adm1_name].index[0]
    gdf['adm2_id'] = adm2[(adm2['ADM1_EN'] == adm1_name) & (adm2['ADM2_EN'] == adm2_name)].index[0]
    gdf['adm3_id'] = adm3_id

    # ADM4 cannot be derived from the ADM3 record: test each building against the
    # ADM4 polygons that belong to this ADM3 unit
    in_this_adm3 = (
        (adm4.ADM1_EN == adm1_name)
        & (adm4.ADM2_EN == adm2_name)
        & (adm4.ADM3_EN == d_admins['ADM3_EN'])
    )
    gdf['ADM4_EN'] = None
    gdf['adm4_id'] = None
    for adm4_id, adm4_row in adm4[in_this_adm3].iterrows():
        # Buildings not fully within any ADM4 polygon keep None
        inside = gdf.within(adm4_row.geometry)
        gdf.loc[inside, 'ADM4_EN'] = adm4_row.ADM4_EN
        gdf.loc[inside, 'adm4_id'] = adm4_id

    # Geometry as WKT text
    gdf['geometry_wkt'] = gdf['geometry'].apply(lambda geom: geom.wkt)

    # building_id becomes a regular column; columns are returned in sorted order
    flat = gdf.reset_index()
    return flat[sorted(flat.columns)]

In [None]:
import geopandas as gpd
import duckdb
from tqdm import tqdm

run_name = '240307'

db_name = PREDS_PATH / run_name / 'buildings_preds.db'

db = duckdb.connect(f'{db_name}')
db.execute("INSTALL spatial; INSTALL httpfs; LOAD spatial; LOAD httpfs; SET s3_region='us-west-2';")

# Path to the output Parquet file
output_parquet_file = './test.parquet'

# All geojson files to ingest (one file per adm3 unit, named after its id)
admin_preds_folder = PREDS_PATH / run_name / 'admin_preds'
fps = sorted(admin_preds_folder.glob('*.geojson'))

# Number of files ingested per run of this cell
max_files = 3

# Check whether table buildings_preds exists. Only the "table is missing" error is
# treated as a fresh start; any other failure (I/O, corrupted file, ...) propagates.
try:
    existing_adm3_ids = db.execute("SELECT DISTINCT adm3_id FROM buildings_preds").fetchdf().adm3_id.values
except duckdb.CatalogException:
    table_exists = False
    print(f'{len(fps)} files to process.')
else:
    table_exists = True

    # Filter out adm3 units that are already stored
    already_ingested = set(existing_adm3_ids)
    fps = [fp for fp in fps if fp.stem not in already_ingested]
    print(f'Found {len(existing_adm3_ids)} existing adm3_ids. {len(fps)} new files to process.')

# tqdm infers the total from the list, so it stays correct with fewer than max_files files
for fp in tqdm(fps[:max_files]):

    print(fp.stem)

    gdf = prepare_gdf_preds(gpd.read_file(fp).set_index(['building_id', 'post_date']), fp.stem)
    # Missing values are stored as the sentinel 999. The frame must be called `df`
    # because the SQL below refers to it by that name.
    df = gdf.drop(columns=['geometry']).fillna(999)

    if not table_exists:
        db.execute("CREATE TABLE buildings_preds AS SELECT * FROM df")
        table_exists = True
    else:
        # Columns are matched by position, so every file must yield the same column set
        db.execute('INSERT INTO buildings_preds SELECT * FROM df')

In [None]:
# Distinct adm3 ids currently stored in buildings_preds
adm3_ids_frame = db.execute("SELECT DISTINCT adm3_id FROM buildings_preds").fetchdf()
unique_adm3_ids = adm3_ids_frame['adm3_id'].values
unique_adm3_ids

# Multithreading

In [None]:
import pandas
import duckdb
from threading import Thread, current_thread
from src.utils.time import timeit

run_name = '240307'
db_name = PREDS_PATH / run_name / 'buildings_preds.db'
db = duckdb.connect(str(db_name))

# adm3 units that already have rows in buildings_preds
existing_adm3_ids = db.execute("SELECT DISTINCT adm3_id FROM buildings_preds").fetchdf().adm3_id.values

# Remaining geojson files, i.e. those whose name is not yet present in the table
admin_preds_folder = PREDS_PATH / run_name / 'admin_preds'
fps = [
    fp
    for fp in sorted(admin_preds_folder.glob('*.geojson'))
    if fp.stem not in existing_adm3_ids
]

@timeit
def write_from_thread(adm3_id, db):
    """Prepare the predictions of one adm3 unit and append them to buildings_preds.

    Args:
        adm3_id: Name (without extension) of the geojson file in ``admin_preds_folder``.
        db: DuckDB connection from which a cursor is derived for this call.
    """
    # Each call works on its own cursor derived from the shared connection
    local_db = db.cursor()

    geojson_path = admin_preds_folder / f'{adm3_id}.geojson'
    preds = gpd.read_file(geojson_path).set_index(['building_id', 'post_date'])
    prepared = prepare_gdf_preds(preds, geojson_path.stem)

    # Missing values become the sentinel 999. The frame must be called `df` because
    # the SQL below refers to it by that name.
    df = prepared.drop(columns=['geometry']).fillna(999)
    local_db.execute('INSERT INTO buildings_preds SELECT * FROM df')

# Upper bound on the number of files written concurrently (one thread per file)
write_thread_count = 20

# Slice instead of indexing fps[i]: with fewer pending files than
# write_thread_count the original indexing raised IndexError.
threads = []
for i, fp in enumerate(fps[:write_thread_count]):
    print(f'Starting thread {i}. {fp.stem}')
    threads.append(Thread(target=write_from_thread, args=(fp.stem, db)))

for thread in threads:
    thread.start()

# Wait for every write to finish before the next cell queries the table
for thread in threads:
    thread.join()

In [None]:
# Refresh the list of adm3 ids stored in buildings_preds
stored_ids_frame = db.execute("SELECT DISTINCT adm3_id FROM buildings_preds").fetchdf()
existing_adm3_ids = stored_ids_frame['adm3_id'].values
existing_adm3_ids

In [None]:
# First 20 entries of the pending file list
fps_to_compute = fps[:20]
# NOTE(review): this displays the full `fps` list, not `fps_to_compute` — confirm which was intended
fps

# Area

In [None]:
# Consecutive 50-wide area bins (lower, upper], from 0 to 1000
bins = [(i, i+50) for i in range(0, 1000, 50)]

# Number of buildings per bin. Buildings with area above the last upper bound are not
# counted: the former open-ended case was keyed on an upper bound of 2500, which no
# bin has, so it never applied and has been removed. The loop variables no longer
# shadow the builtin `bin`.
n_buildings_area = []
for lower, upper in bins:
    n_buildings_area.append(db.execute(
        f"""
            SELECT
                COUNT(*)
            FROM
                buildings_preds
            WHERE
                area > {lower} AND area <= {upper}
        """
    ).fetchone()[0])

In [None]:
# plot the histogram of building areas
import matplotlib.pyplot as plt
import numpy as np

# Bar chart of the number of buildings per area bin, one bar per entry of `bins`
fig, ax = plt.subplots()
bar_positions = range(len(bins))
ax.bar(bar_positions, n_buildings_area)
ax.set_xticks(bar_positions)
ax.set_xticklabels([f'{lower}-{upper}' for lower, upper in bins], rotation=90)
ax.set_xlabel('Area (m2)')
ax.set_ylabel('Number of buildings')
ax.set_title(f'Histogram of building areas (N={sum(n_buildings_area):.2e})')
plt.show()

# Add UNOSAT damage

In [2]:
# Slow but ok, do it once TODO: DO LIKE WITH CLASS (SEE BELOW)
import duckdb
from src.data import get_all_aois
from src.data.buildings.overture_unosat import load_overture_buildings_aoi
from src.utils.time import timeit

@timeit
def add_unosat_damage_to_db():
    """Copy the UNOSAT damage label of each AOI building into buildings_preds.

    Adds an integer column ``unosat_damage`` (left NULL by default) and, for every
    AOI returned by ``get_all_aois()``, sets it on the rows whose ``geometry_wkt``
    equals the WKT of a building loaded by ``load_overture_buildings_aoi``.
    Buildings whose ``damage_5m`` is 6 are skipped.
    """
    run_name = '240307'
    db_name = PREDS_PATH / run_name / 'buildings_preds.db'

    db = duckdb.connect(f'{db_name}')
    try:
        # Idempotent: re-running the function does not fail on the existing column,
        # while unrelated errors (e.g. missing table) are no longer swallowed
        db.execute("ALTER TABLE buildings_preds ADD COLUMN IF NOT EXISTS unosat_damage INT")

        # Fill for all AOIs
        for aoi in get_all_aois():

            print(f'Processing {aoi}...')

            gdf_unosat = load_overture_buildings_aoi(aoi)  # contains damage_5m per overture building
            # NOTE(review): 6 is presumably the "no damage" label — confirm
            gdf_unosat = gdf_unosat[gdf_unosat.damage_5m != 6]

            print(f'{len(gdf_unosat)} buildings to update.')

            # One UPDATE per building; could be batched if this becomes a bottleneck.
            # building_id do not match, so rows are matched on geometry_wkt instead.
            # Values are bound as parameters rather than interpolated into the SQL text.
            for damage_5m, geometry in zip(gdf_unosat['damage_5m'], gdf_unosat['geometry']):
                db.execute(
                    "UPDATE buildings_preds SET unosat_damage = ? WHERE geometry_wkt = ?",
                    [int(damage_5m), geometry.wkt],
                )

            # Number of buildings processed; rows actually matched in the table may be fewer
            print(f'Updated {len(gdf_unosat)} buildings.')
    finally:
        # Release the connection even when an update fails
        db.close()

# add_unosat_damage_to_db()

# Add class

In [None]:
from src.constants import PREDS_PATH
import duckdb

from src.constants import OVERTURE_BUILDINGS_RAW_PATH

FP_RAW_PARQUET = OVERTURE_BUILDINGS_RAW_PATH / "ukraine_buildings.parquet"

def add_class_to_db():
    """Fill a ``class`` column of buildings_preds from the raw buildings parquet file.

    Creates (or replaces) the view ``all_buildings`` over ``FP_RAW_PARQUET`` exposing
    each geometry as WKT text with its non-null ``class``, then copies ``class`` onto
    the rows of buildings_preds that have the same ``geometry_wkt``.
    """
    run_name = '240307'
    db_name = PREDS_PATH / run_name / 'buildings_preds.db'

    db = duckdb.connect(f'{db_name}')
    try:
        db.execute("INSTALL spatial; INSTALL httpfs; LOAD spatial; LOAD httpfs; SET s3_region='us-west-2';")

        # Idempotent: re-running does not fail on the existing column, while
        # unrelated errors (e.g. missing table) are no longer swallowed
        db.execute('ALTER TABLE buildings_preds ADD COLUMN IF NOT EXISTS class STRING')

        # Expose the parquet file as a view keyed by the WKT text of the geometry
        db.execute(
            f"""
            CREATE OR REPLACE VIEW all_buildings AS
            SELECT ST_AsText(ST_GeomFromWKB(geometry)) as geometry_wkt, class
            FROM read_parquet('{FP_RAW_PARQUET}', hive_partitioning=1)
            WHERE class IS NOT NULL
            """
        )

        # Join on the WKT text: only rows whose geometry_wkt is textually identical
        # to the ST_AsText output receive a class
        db.execute(
            """
            UPDATE buildings_preds
            SET class = all_buildings.class
            FROM all_buildings
            WHERE buildings_preds.geometry_wkt = all_buildings.geometry_wkt
            """
        )
    finally:
        # Release the connection even when a statement fails
        db.close()
    print('added class to db')

add_class_to_db()

# Add date unosat damage

In [None]:
from src.data import load_unosat_labels

# NOTE(review): `aoi` is not assigned at notebook level in the visible cells (it is only
# a loop variable inside add_unosat_damage_to_db), so this call raises NameError on a
# fresh kernel unless `aoi` is defined elsewhere — TODO confirm which AOI was intended.
load_overture_buildings_aoi(aoi)

In [None]:
from src.constants import PREDS_PATH
import duckdb

from src.constants import OVERTURE_BUILDINGS_RAW_PATH


FP_RAW_PARQUET = OVERTURE_BUILDINGS_RAW_PATH / "ukraine_buildings.parquet"


def add_date_unosat_to_db():
    """Work-in-progress step meant to add UNOSAT date information to the database.

    As written, the body mirrors ``add_class_to_db``: it tries to add a ``class``
    column to a table named ``unosat_date_analysis``, creates a view ``all_labels``
    over ``FP_RAW_PARQUET``, and then updates ``buildings_preds.class`` from the
    view ``all_buildings``. It is not called anywhere in the visible cells.

    NOTE(review): the statements below look inconsistent with the function name and
    with each other — see the inline notes; confirm the intended logic before use.
    """

    run_name = "240307"
    db_name = PREDS_PATH / run_name / "buildings_preds.db"

    db = duckdb.connect(f"{db_name}")
    db.execute("INSTALL spatial; INSTALL httpfs; LOAD spatial; LOAD httpfs; SET s3_region='us-west-2';")

    # NOTE(review): this alters a table called `unosat_date_analysis`, which no visible
    # cell creates, so the except branch presumably always runs; possibly the intent was
    # to add a column of that name to buildings_preds — TODO confirm.
    try:
        db.execute("ALTER TABLE unosat_date_analysis ADD COLUMN class STRING")
    except Exception:
        print("already added column unosat_date_analysis")

    # Load the parquet file into the database as a view
    # NOTE(review): the view selects `date` and `unosat_id` from the buildings parquet
    # file; verify that this file actually has those columns. The view `all_labels`
    # is not used by the update below.
    db.execute(
        f"""
        CREATE OR REPLACE VIEW all_labels AS
        SELECT date, unosat_id
        FROM read_parquet('{FP_RAW_PARQUET}', hive_partitioning=1)
        WHERE class not NULL
        """
    )

    # perform join operation
    # NOTE(review): this is the same statement as in add_class_to_db; it reads the view
    # `all_buildings` (created there, not here) and writes `class`, not a date.
    db.execute(
        f"""
        UPDATE buildings_preds
        SET class = all_buildings.class
        FROM all_buildings
        WHERE buildings_preds.geometry_wkt = all_buildings.geometry_wkt
        """
    )
    print("added class to db")


# add_class_to_db()

# To Parquet

In [None]:
import duckdb
from src.constants import PREDS_PATH
import geopandas as gpd

run_name = '240307'
db_name = PREDS_PATH / run_name / 'buildings_preds.db'
# The Parquet file is written next to the database
filepath = PREDS_PATH / run_name / 'buildings_preds.parquet'

db = duckdb.connect(str(db_name))
db.execute("INSTALL spatial; INSTALL httpfs; LOAD spatial; LOAD httpfs; SET s3_region='us-west-2';")

# Dump the whole buildings_preds table to Parquet
db.execute(
    f"""
    COPY (
        SELECT * FROM buildings_preds
    ) TO '{filepath}' WITH (FORMAT 'Parquet');
    """
)

# Damage but area filtered

In [None]:
from src.utils.geometry import load_ukraine_admin_polygons
# Level-3 admin polygons keyed by admin_id; show (rows, columns)
adm3_polygons = load_ukraine_admin_polygons(adm_level=3)
adm3 = adm3_polygons.set_index('admin_id')
adm3.shape

In [None]:
import duckdb
from src.constants import PREDS_PATH
import geopandas as gpd

# Connection to the predictions database of run 240307
run_name = '240307'
db_name = PREDS_PATH / run_name / 'buildings_preds.db'
db = duckdb.connect(str(db_name))

In [None]:
# Quarterly dates (24th of Feb/May/Aug/Nov): 2021 columns must stay below the
# threshold, 2022-2023 columns must reach it at least once
quarter_months = ('02', '05', '08', '11')
post_dates_neg = [f'2021-{month}-24' for month in quarter_months]
post_dates = [f'{year}-{month}-24' for year in (2022, 2023) for month in quarter_months]

# SQL predicate: (any post date >= 255*0.65) AND (every 2021 date < 255*0.65)
any_post_date_hit = ' OR '.join(f'"{post_date}" >= 255*0.65' for post_date in post_dates)
conditon_neg = ' AND '.join(f'"{post_date}" < 255*0.65' for post_date in post_dates_neg)
condition = f'({any_post_date_hit}) AND ({conditon_neg})'

# Number of buildings satisfying the predicate per adm3 unit, largest first
counts_per_adm3 = db.execute(
    f"""
        SELECT adm3_id, ADM3_EN, COUNT(*) as count
        FROM buildings_preds
        WHERE {condition}
        GROUP BY adm3_id, ADM3_EN
        ORDER BY count DESC
    """
).fetchdf()
df = counts_per_adm3.set_index('adm3_id')

# Attach the adm3 polygons so the counts can be mapped
gdf = gpd.GeoDataFrame(df.join(adm3[['geometry']], how='left'), crs=adm3.crs)

In [None]:
import numpy as np

# Minimum-area thresholds: steps of 10 below 100, steps of 50 up to 1450, then 1500/2000/2500
area_lims = np.concatenate((
    np.arange(0, 100, 10),
    np.arange(100, 1500, 50),
    np.array([1500, 2000, 2500]),
))

# For each threshold, count the buildings matching `condition` with a larger area
n_tot_buildings = []
for area_lim in area_lims:
    count_cursor = db.execute(
        f"""
            SELECT COUNT(*)
            FROM buildings_preds
            WHERE {condition} AND area > {area_lim}
        """
    )
    n_tot_buildings.append(count_cursor.fetchall()[0][0])

In [None]:
import matplotlib.pyplot as plt

# Log-scale bar chart: buildings matching `condition` for each minimum-area threshold
fig, ax = plt.subplots(figsize=(15,8))
bar_positions = range(len(area_lims))
ax.bar(bar_positions, n_tot_buildings)
ax.semilogy()
ax.set_xticks(bar_positions)
ax.set_xticklabels([f'{area_lim}' for area_lim in area_lims], rotation=90)
ax.set_xlabel('Minimum Area (m2)')
ax.set_ylabel('Number of buildings')
# The counts are nested (each is "area > threshold"), so N sums overlapping sets
ax.set_title(f'Building destroyed vs minimum area (N={sum(n_tot_buildings):.2e})')

# Annotate every bar with its count
for position, count in enumerate(n_tot_buildings):
    ax.text(position, count, f'{count:.2e}', ha='center', va='bottom', fontsize=6)
plt.show()

In [None]:
# Number of buildings with area above 50 per source dataset
dataset_counts_sql = """
        SELECT COUNT(*), dataset
        FROM buildings_preds
        WHERE area > 50
        GROUP BY dataset
    """
db.execute(dataset_counts_sql).fetchdf()

In [None]:
# 31 min 13 s expressed in minutes, divided by 6.09, then printed as minutes and seconds
t = (31 + 13 / 60) / 6.09
whole_minutes = np.floor(t)
print(f'{whole_minutes} min and {np.ceil((t - whole_minutes) * 60)} sec')

In [None]:
# 30 min 16 s expressed in minutes, divided by 6.38, then printed as minutes and seconds
t = (30 + 16 / 60) / 6.38
whole_minutes = np.floor(t)
print(f'{whole_minutes} min and {np.ceil((t - whole_minutes) * 60)} sec')