# Delineate cities using A-DBSCAN

This notebook contains the code used to create the city delineations. Because the procedure is computationally intensive, these steps were run in a series of hardware and in separate steps. Taken together, they implement the A-DBSCAN algorithm. The code presented here is thus published as an illustration of the steps followed. For a more flexible implementation of A-DBSCAN, please check the one published in PySAL on the [`esda`](https://github.com/pysal/esda) package.

## Dependencies & input data

In [None]:
import tools, time, os, sys, traceback, sqlite3
from tools import logger
from time import gmtime, strftime
import pandas as pd
import numpy as np
import geopandas as gpd
from sqlalchemy import create_engine

from tools import ADBSCAN

# Path to sqlite created in the `extract_merge_buildings` notebook
db_path = '/home/jovyan/work/cadastro.db'
engine = create_engine('sqlite:////'+db_path)

Read relevant columns for all building footprints:

In [None]:
varlist=['X', 'Y', 'geometry', 'localId', 'numberOfBuildingUnits']
%time db = pd.read_sql(("SELECT X, Y, localId, numberOfBuildingUnits "\
                        "FROM cadastro"), engine)

Now let's set up parameters, the logging function and check if we've done already any run, so we do not repeat it:

In [None]:
epss = [2000]
min_ptss = [2000]
pct = 0.1
reps = 1000

out_dir = './revision/'

def logger(txt, f='log_revision.txt', mode='a'):
    fo = open(f, 'a')
    fo.write(txt)
    fo.close()
    return txt

done_fmp = [f.strip('bcn_').strip('sp_').strip('.feather').split('_') \
        for f in os.listdir(out_dir) if ('sp_' in f) and ('.feather' in f)]
done_fmp = [(int(i[0].strip('mp')), int(i[1].strip('eps'))) for i in done_fmp]

print("Completed solutions:")
[i for i in done_fmp if i[0]==2000]

## Computation of DBSCAN draws

In the published version, city delineations are based on an ensemble of 1,000 draws, each of which computes DBSCAN on a sample and extends it to the rest of the dataset through a nearest-neighbor algorithm.

Given our hardware, we could not hold in RAM 1,000 candidate solutions _and_ perform the subsequent steps required in the A-DBSCAN algorithm. For that reason, we broke the original algorithm into steps and performed each of them separate, writing to disk intermediate results.

The following code computes A-DBSCAN 1,000, computing DBSCAN for 10% of the sample and extending it to the rest of the dataset through a nearest-neighbor regression. The result of each draw is written into a SQLite database (`results.db`) and further indexed on the observation and replication ID, as well as on its parameters. 

In [None]:
%%time

errors = {}

! rm log_revision.txt
! mv results.db results_bu.db
resDB = '/home/jovyan/work/results.db'
results = create_engine('sqlite:////' + resDB)

logger('', mode='w')

for eps in epss:
    for min_pts in min_ptss:
        if (min_pts, eps) not in done_fmp:
            # Random blocks
            for rep_id in range(reps):
                now = strftime("%Y-%m-%d %H:%M:%S", gmtime())
                print(logger('[eps=%i, min_pts=%i] %s\n'%(eps, min_pts, now)))
                try:
                    # Computation
                    t0 = time.time()
                    np.random.seed(1234 + rep_id)
                    solu = ADBSCAN(eps=eps, min_samples=min_pts, algorithm='ball_tree',
                                   pct_exact=pct, reps=1, n_jobs=-1, keep_solus=True)\
                           .fit(db[['X', 'Y']])
                    t1 = time.time()
                    txt = '\tComputation completed in %.2f sec.\n'%(t1-t0)
                    print(logger(txt))
                    # Write out to results DB
                    out = pd.DataFrame({'id': solu.solus.index,
                                        'lbls': solu.solus['rep-0'].values,
                                        'rep': rep_id,
                                        'min_pts': min_pts,
                                        'eps': eps})
                    out.to_sql('results', 
                               con=results, 
                               if_exists='append',
                               index=False)
                    txt = '\tSaved to disk in %.2f sec.\n'%(time.time()-t1)
                    print(logger(txt))
                except:
                    errors[(eps, min_pts)] = traceback.format_exc()
                    print(logger(str(traceback.format_exc())))

t2 = time.time()
conn = sqlite3.connect(resDB)
c = conn.cursor()

c.execute("CREATE INDEX id ON results (id);")
c.execute("CREATE INDEX rep ON results (rep);")
c.execute("CREATE INDEX min_pts ON results (min_pts);")
c.execute("CREATE INDEX eps ON results (eps);")
txt = '\tIndices created in %.2f sec.\n'%(time.time()-t2)
print(logger(txt))

## Export SQLite file to plain csv

The output turned out to be relatively large in size (+200GB) and thus not efficient to work with in SQLite. After the computations above, we decided to switch and work with [`dask`](https://dask.org/), which does not play well with SQLite. Below the output database is converted into a `results.csv` file.

Set up the operation:

In [None]:
import traceback
import pandas as pd
from sqlalchemy import create_engine
from time import gmtime, strftime

cadastro = '/Users/dani/Dropbox/Cadastre/01 Catastro maps/sqlite_db/cadastro.db'
con_cat = create_engine('sqlite:////'+cadastro)

resDB = '/Users/dani/Desktop/results.db'
results = create_engine('sqlite:////' + resDB)

And computation:

In [None]:
%%time
read = 0
fo = open('results.csv', 'w')
fo.write('id,lbls,rep\n')
fo.close()
def logger(txt, f='/Users/dani/Dropbox/Cadastre/02_area_delineation/log_tocsv_hut8.txt', mode='a'):
    fo = open(f, 'a')
    fo.write(txt)
    fo.close()
    return txt
logger('', mode='w')
errors = {}

reader = pd.read_sql("SELECT * FROM results;", results, chunksize=12069635)
for chunk in reader:
    try:
        now = strftime('%Y-%m-%d %H:%M:%S', gmtime())
        print(logger(f"{now} | Currently {read} read\n"))
        if (chunk.loc[0, 'min_pts']==2000) and (chunk.loc[0, 'eps']==2000) and (chunk['rep'].unique().shape[0] == 1):
            chunk = chunk.loc[:, ['id', 'rep', 'lbls']]
            chunk.to_csv('results.csv', mode='a',
                         header=False, index=False)
            read+=chunk.shape[0]
        else:
            print(logger("min_pts or eps are not 2,000 so exiting..."))
            break
    except:
        errors[(eps, min_pts)] = traceback.format_exc()
        print(logger(str(traceback.format_exc())))

## Re-labelling of solutions

With independent candidate solutions from each draw written on a `.csv` file, the next step involved re-labelling them so they are comparable across solutions. This remapping uses the same logic as the one embedded in the A-DBSCAN implementation in `pysal/esda`, but it is performed in a modular way, separate from the previous and subsequent steps.

The final output of this process is a folder (`remapped_lbls/`) with a separate `.parquet` file for each solution and using a standardised labelling across files.

In [None]:
import os
import tools
import pandas as pd
import dask.dataframe as dd
from dask.diagnostics import ProgressBar
from sqlalchemy import create_engine
from scipy.spatial import cKDTree
from importlib import reload
from tools import logger
from time import gmtime, strftime
import multiprocessing as mp

In [None]:
url = 'results.csv'
db = dd.read_csv(url).rename(columns={'lbls': 'rep',
                                      'rep': 'lbls'})

### Pick solution with most labels

In [None]:
try:
    # read
    ref_soluIDO = pd.read_csv('results_most_labels.csv', index_col='rep')
except:
    ref_soluID = db.groupby(['rep', 'lbls'])\
                   .size()\
                   .reset_index()\
                   .drop(0, axis=1)\
                   .groupby('rep')\
                   .size()\
                   .nlargest(1)

    ! rm log_dask.txt
    print(logger(f"Executing pick of solutions with most labels | "\
                 f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask.txt'))
    with ProgressBar():
        ref_soluIDO = ref_soluID.compute()
    print(logger(f"Computed pick of solutions with most labels | "\
                 f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask.txt'))

    ref_soluIDO.to_csv(url.replace('.csv', '_most_labels.csv'),
                       header=['Count'])
    print(logger(f"Written pick of solutions with most labels | "\
                 f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask.txt'))

### Remap labels to standardise across solutions

* Load reference solution and XY coordinates (from `parquet` file, which was extracted from `cadastro.db`)

In [None]:
%time xys = pd.read_parquet('xys.parquet.gzip')

try:
    ref_soluO = pd.read_parquet('ref_soluO.parquet.gzip')\
                  .set_index('id')\
                  ['lbls']
except:
    print("Extracting reference solution...")
    ref_solu = db.loc[db['rep']==ref_soluIDO.index[0], ['id', 'lbls']]
    with ProgressBar():
        ref_soluO = ref_solu.compute()
    ref_soluO = ref_soluO.set_index('id')['lbls']

* Compute centroids from reference solution

In [None]:
%%time
ref_centroids = xys.groupby(ref_soluO)\
                   .apply(lambda xys: xys.mean())\
                   .drop(-1, errors='ignore')
ref_kdt = cKDTree(ref_centroids)
pars = (ref_centroids, ref_kdt, xys, ['X', 'Y'])

* Remap labels across all solutions

In [None]:
reload(tools)
def align_and_remap(new_lbls, pars, write=False):
    ref_centroids, ref_kdt, xys, xy_name = pars
    topass = new_lbls.join(xys, on='id')
    remapped_ids = tools.remap_lbls_single(topass['lbls'], 
                                           (ref_centroids, ref_kdt, 
                                            topass[xy_name], xy_name))
    remapped_lbls = new_lbls['lbls'].map(remapped_ids)\
                                    .fillna(-1)\
                                    .astype(int)
    rep_id = new_lbls['rep'].iloc[0]
    outF = f"remapped_lbls/rep_{rep_id}.parquet"
    remapped_lbls = remapped_lbls.reset_index()\
                                 .assign(rep=rep_id)\
                                 .rename(columns={'index': 'id'})
    if write:
        remapped_lbls.to_parquet(outF)
    return remapped_lbls

In [None]:
n_chunks = 5
n = ref_soluO.shape[0]
reps = 1000
rows_by_chunk = n * reps / n_chunks

In [None]:
fo = open('log_dask_remapping.txt', 'w')
fo.close()
! rm -r remapped_lbls
! mkdir remapped_lbls

print(logger(f"Starting sequential remapping of labels | "\
             f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_remapping.txt'))
reader = pd.read_csv('results.csv', chunksize=n)
for i, solu in enumerate(reader):
    solu = solu.rename(columns={'lbls': 'rep',
                                'rep': 'lbls'})
    outF = align_and_remap(solu, pars, write=True)
    logger(f"Solution {i+1}/1000 remapped | "\
           f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_remapping.txt')


## Collect winner solution and votes

In this step, each re-labelled solution is used to count how often each label is attributed to each observation. This is used later to determine whether a building is part of a city in a robust manner or not (if not, they are not assigned into any city). The input is the folder of relabelled `.parquet` files; the output is a single file (`solution_rep1000_eps2000_mp2000_thr90.parquet`) containing a table with a row for each building and two columns, ones for the winning label, and another for its frequency.

In [None]:
def update_store(store, lbls):
    def _update_record(entry_lbl):
        entry, lbl = entry_lbl
        if lbl in entry:
            entry[lbl] += 1
        else:
            entry[lbl] = 1
    _ = list(map(_update_record, zip(store, lbls)))
    return store

def pick_winner_vote(store_entry):
    lbl_count = pd.Series(store_entry)
    return lbl_count.nlargest(1)

In [None]:
fo = open('log_dask_win_vote.txt', 'w')
fo.close()

print(logger(f"Starting to count votes | "\
             f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_win_vote.txt'))
# Set up a `store`
store = pd.read_parquet('remapped_lbls/rep_0.parquet')
store = [{} for _ in range(store.shape[0])]
# Loop over each remapped solution
for solu in os.listdir('remapped_lbls/'):
    print(logger(f"Processing {solu} | "\
                 f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_win_vote.txt'))
    ## Load file
    tmp = pd.read_parquet(f"remapped_lbls/{solu}")
    ## Update `store` by checking if lbl in store[id]
    ##and creating it otherwise
    store = update_store(store, 
                         tmp['lbls'].values)
print(logger(f"Done updating votes | "\
             f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_win_vote.txt'))
# Pick winner and vote from `store`
pool = mp.Pool(mp.cpu_count())
win_vote = pool.map(pick_winner_vote, store)
pool.close()
print(logger(f"Done counting votes in parallel | "\
             f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_win_vote.txt'))
# Write winner and vote to disk
win_vote = pd.concat(win_vote).reset_index()\
                              .rename(columns={'index':'lbls',
                                               0: 'pct'})\
                              .reset_index()\
                              .rename(columns={'index': 'id'})
win_vote['pct'] = win_vote['pct'] * 100 / 1000
win_vote.to_parquet('win_vote.parquet')
print(logger(f"Done with the process | "\
             f"{strftime('%Y-%m-%d %H:%M:%S', gmtime())}\n", 'log_dask_win_vote.txt'))

* Filter extra-noise (<90% = `-1`)

In [None]:
try:
    _ = win_vote.head()
except:
    win_vote = pd.read_parquet('../output/revision/win_vote.parquet')

lbl_type = type(win_vote.loc[0, 'lbls'])
win_vote.loc[win_vote['pct'] < 90, 'lbls'] = lbl_type(-1)

In [None]:
win_vote.to_parquet('../output/revision/solution_rep1000_eps2000_mp2000_thr90.parquet')

### Turn labels into polygons

At this point, we know in which city each building is, and we are ready to create the delineation of city boundaries. The intput of this step is the `solution_rep1000_eps2000_mp2000_thr90.parquet` table, and the output is a GeoPackage (`solution_rep1000_eps2000_mp2000_thr90.gpkg`) with the polygons of all identified cities.

In [None]:
import pandas
import tools

win_vote = pandas.read_parquet('solution_rep1000_eps2000_mp2000_thr90.parquet')
lbl_type = type(win_vote.loc[0, 'lbls'])

In [None]:
# replace `outF` with final set of labels
xys = pd.read_parquet('xys.parquet.gzip')
%time polys = tools.lbls2polys_parallel(win_vote['lbls'], xys, \
                               noise=lbl_type(-1), gdf=True)
polys.to_file('solution_rep1000_eps2000_mp2000_thr90.gpkg', driver='GPKG')