In [None]:
import pandas as pd
import urllib

import numpy as np

import json

from tqdm.autonotebook import tqdm

#%matplotlib inline

tqdm.pandas()

import dask.dataframe as dd

# from dask.multiprocessing import get
from dask.diagnostics import ProgressBar

from datetime import datetime
import matplotlib.pyplot as plt

from IPython.display import display
import requests

try: 
    import seaborn as sbn
except ModuleNotFoundError: 
    print("Seaborn not installed, limiting plotting quality")

# Functions

In [14]:
# bePelias web service location, written as "hostname:port"
ws_hostname = "172.27.0.64:4001"

# Names of the address fields: used both as CSV column names and as
# query-parameter names of the geocode call.
street_field = "streetName"
housenbr_field = "houseNumber"
postcode_field = "postCode"
city_field = "postName"

# A csv file with as header "streetName,houseNumber,postCode,postName"
filename = "data.csv"

In [15]:
def call_ws(addr_data, mode="advanced"):
    """Geocode one address with the bePelias ``geocode`` endpoint.

    Parameters
    ----------
    addr_data : dict or pandas.Series
        Address fields, sent as query parameters. The object passed in is
        left untouched: the request parameters are built on a copy.
    mode : str, default "advanced"
        Value sent as the ``mode`` query parameter.

    Returns
    -------
    dict, str or None
        * HTTP 200 with a JSON body: the decoded body, with an extra
          ``"time"`` entry holding the elapsed wall-clock seconds.
        * HTTP 200 with a body that is not valid JSON: the raw response text.
        * Any other status (204, 400, unexpected code): ``None``, after
          printing a diagnostic.

    Raises
    ------
    Exception
        Anything raised by ``requests.get`` is printed and re-raised.
    """
    t = datetime.now()

    # Work on a copy so that the caller's dict does not silently gain the
    # "mode" and "withPeliasResult" entries.
    if isinstance(addr_data, pd.Series):
        params = addr_data.to_dict()
    else:
        params = dict(addr_data)

    params["mode"] = mode
    params["withPeliasResult"] = True

    try:
        r = requests.get(
            f'http://{ws_hostname}/REST/bepelias/v1/geocode',
            params=params)
    except Exception as e:
        print("Exception !")
        print(params)
        print(e)
        raise

    if r.status_code == 204:
        print("No result!")
        print(params)
        print(r.text)
        return None
    elif r.status_code == 400:
        print("Argument error")
        print(r.text)
        return None
    elif r.status_code == 200:
        try:
            res = json.loads(r.text)
            res["time"] = (datetime.now() - t).total_seconds()
        except ValueError as ve:
            print("Cannot decode result:")
            print(ve)
            print(r.text)
            return r.text
        return res
    else:
        print(f"Unknown return code: {r.status_code} ")
        print(r.text)
        return None



In [16]:
def call_ws_by_id(best_id):
    """Fetch one record from the bePelias ``id`` endpoint.

    Parameters
    ----------
    best_id : str or None
        Identifier to look up. ``None`` short-circuits to ``None`` without
        any HTTP call.

    Returns
    -------
    dict, str or None
        * HTTP 200 with a JSON body: the decoded body.
        * HTTP 200 with a body that is not valid JSON: the raw response text.
        * Anything else (204, 400, unexpected code, or an ``AttributeError``
          while decoding): ``None``; a diagnostic is printed except for 204.

    Raises
    ------
    Exception
        Anything raised while building the URL or by ``requests.get`` is
        printed and re-raised.
    """
    if best_id is None:
        return None

    try:
        # The identifier is percent-encoded twice before being put in the path.
        encoded_id = urllib.parse.quote_plus(urllib.parse.quote_plus(best_id))
        url = f'http://{ws_hostname}/REST/bepelias/v1/id/{encoded_id}'
        r = requests.get(url)
    except Exception as e:
        print("Exception !")
        print(best_id)
        print(e)
        raise

    if r.status_code == 204:
        # No result: stay silent
        return None
    elif r.status_code == 400:
        print("Argument error")
        print(r.text)
        return None
    elif r.status_code == 200:
        try:
            res = json.loads(r.text)
        except ValueError as ve:
            print("Cannot decode result:")
            print(ve)
            print(r.text)
            return r.text
        except AttributeError as ae:
            print(ae)
            print(type(r.text))
            print(r.text)
            # Nothing was decoded; previously this fell through to
            # `return res` and raised UnboundLocalError.
            return None
        return res
    else:
        print(f"Unknown return code: {r.status_code} ")
        print(r.text)
        return None



In [17]:
def call_ws_search_city(postcode=None, postname=None):
    """Query the bePelias ``searchCity`` endpoint.

    Parameters
    ----------
    postcode : optional
        Sent as the ``postCode`` query parameter.
    postname : optional
        Sent as the ``postName`` query parameter.

    The request always carries ``raw=True``.

    Returns
    -------
    dict/list, str or None
        * HTTP 200 with a JSON body: the decoded body.
        * HTTP 200 with a body that is not valid JSON: the raw response text.
        * Anything else (204, 400, unexpected code, or an ``AttributeError``
          while decoding): ``None``; a diagnostic is printed except for 204.

    Raises
    ------
    Exception
        Anything raised by ``requests.get`` is printed and re-raised.
    """
    data = {"postCode": postcode,
            "postName": postname,
            "raw": True
            }

    try:
        r = requests.get(
            f'http://{ws_hostname}/REST/bepelias/v1/searchCity',
            params=data)
    except Exception as e:
        print("Exception !")
        print(e)
        raise

    if r.status_code == 204:
        # No result: stay silent
        return None
    elif r.status_code == 400:
        print("Argument error")
        print(r.text)
        return None
    elif r.status_code == 200:
        try:
            res = json.loads(r.text)
        except ValueError as ve:
            print("Cannot decode result:")
            print(ve)
            print(r.text)
            return r.text
        except AttributeError as ae:
            print(ae)
            print(type(r.text))
            print(r.text)
            # Nothing was decoded; previously this fell through to
            # `return res` and raised UnboundLocalError.
            return None
        return res
    else:
        print(f"Unknown return code: {r.status_code} ")
        print(r.text)
        return None



In [18]:
def call_ws_reverse(lat=None, lon=None): #lg = "en,fr,nl"
    """Query the bePelias ``reverse`` endpoint for one coordinate pair.

    Parameters
    ----------
    lat, lon : optional
        Sent as the ``lat`` and ``lon`` query parameters.

    The request always carries ``radius=0.01`` and ``size=1``.

    Returns
    -------
    dict/list, str or None
        * HTTP 200 with a JSON body: the decoded body.
        * HTTP 200 with a body that is not valid JSON: the raw response text.
        * Anything else (204, 400, unexpected code, or an ``AttributeError``
          while decoding): ``None``; a diagnostic is printed except for 204.

    Raises
    ------
    Exception
        Anything raised by ``requests.get`` is printed and re-raised.
    """
    data = {"lat": lat,
            "lon": lon,
            "radius": 0.01,
            "size": 1
            }

    try:
        r = requests.get(
            f'http://{ws_hostname}/REST/bepelias/v1/reverse',
            params=data)
    except Exception as e:
        print("Exception !")
        print(e)
        raise

    if r.status_code == 204:
        # No result: stay silent
        return None
    elif r.status_code == 400:
        print("Argument error")
        print(r.text)
        return None
    elif r.status_code == 200:
        try:
            res = json.loads(r.text)
        except ValueError as ve:
            print("Cannot decode result:")
            print(ve)
            print(r.text)
            return r.text
        except AttributeError as ae:
            print(ae)
            print(type(r.text))
            print(r.text)
            # Nothing was decoded; previously this fell through to
            # `return res` and raised UnboundLocalError.
            return None
        return res
    else:
        print(f"Unknown return code: {r.status_code} ")
        print(r.text)
        return None



In [19]:
def get(dct, keys):
    """Follow a path of keys/indices through nested containers.

    Parameters
    ----------
    dct : any
        Starting object, typically nested dicts and lists; may be ``None``.
    keys : iterable
        Successive keys (for mappings) or indices (for sequences).

    Returns
    -------
    The value found at the end of the path, or ``None`` as soon as a step
    cannot be followed: current value is ``None``, key missing, index out of
    range, or the current value cannot be indexed that way (for example a
    raw-text string indexed with a string key, or a number).
    """
    for k in keys:
        if dct is None:
            return None
        try:
            dct = dct[k]
        except (KeyError, IndexError, TypeError):
            # TypeError: a str indexed with a str key, a scalar met mid-path...
            return None
    return dct

# Calls

## Single address calls

In [None]:
# Geocode one hard-coded address and show the raw answer
res = call_ws({
    street_field: "Avenue Fonsny",
    housenbr_field: "20",
    city_field: "Saint-Gilles",
    postcode_field: "1060",
})
res

## Batch calls (row by row)

In [None]:
# Load the addresses to geocode from the CSV file named in `filename`
addresses = pd.read_csv(filename)
addresses

In [22]:
# addresses = addresses.sample(10000)

In [23]:
# Throughput (iterations per second) of each run, keyed by run name
iter_per_sec_stats={}


### Simple way

In [None]:
# Sequential run: one call_ws request per row, with a tqdm progress bar
addresses_seq = addresses.copy()

t = datetime.now()
addresses_seq["json"] = (
    addresses_seq[[street_field, housenbr_field, postcode_field, city_field]]
    .fillna("")
    .progress_apply(call_ws, mode="advanced", axis=1)
)
tot_time = (datetime.now() - t).total_seconds()

# Throughput of this run, kept for later comparison
ips = addresses_seq.shape[0] / tot_time
iter_per_sec_stats["sequential"] = ips
print(f"{tot_time:.2f} seconds, {ips:.2f} it/s")

addresses_seq

In [None]:
# Identifier of the first returned item: its bestId, else its street id,
# else its municipality id
addresses_seq["best_id"] = addresses_seq.json.apply(
    lambda r: (
        get(r, ["items", 0, "bestId"])
        or get(r, ["items", 0, "street", "id"])
        or get(r, ["items", 0, "municipality", "id"])
    )
)

addresses_seq

In [None]:
# Rows whose first returned item carries no bestId
addresses_seq[
    addresses_seq.json.apply(lambda r: get(r, ["items", 0, "bestId"])).isna()
]

In [None]:
# Per-call duration (the "time" entry added by call_ws) and the method used
# (the "transformers" and "callType" entries of the result joined with "-").
# call_ws can return None (no result) or the raw response text when the body
# is not valid JSON; only non-empty dict results carry these entries, so
# anything else yields None instead of raising.
addresses_seq["time"] = addresses_seq.json.apply(
    lambda j: j["time"] if j and isinstance(j, dict) else None)
addresses_seq["method"] = addresses_seq.json.apply(
    lambda j: (j["transformers"] + "-" + j["callType"]) if j and isinstance(j, dict) else None)

addresses_seq

In [None]:
# addresses_seq.iloc[0].json["bepelias"]#["call_type"]
addresses_seq.time.describe()

In [None]:
# Mean call duration for each method
addresses_seq.groupby("method")["time"].mean().plot.bar(title="Avg call duration")

In [None]:

addresses_seq[["time"]].boxplot()

### Using Dask

In [20]:
addresses_dask = addresses.copy()

In [None]:
# Same geocoding as the sequential run, spread over 32 dask partitions
t = datetime.now()
dd_addresses = dd.from_pandas(addresses_dask.fillna(""), npartitions=32)

dask_task = dd_addresses[[street_field, housenbr_field, postcode_field, city_field]].apply(
    call_ws, axis=1, meta=('x', 'str'))

with ProgressBar():
    addresses_dask["json"] = dask_task.compute()

tot_time = (datetime.now() - t).total_seconds()

# Throughput of this run, kept for later comparison
ips = addresses_dask.shape[0] / tot_time
iter_per_sec_stats["dask"] = ips
print(f"{tot_time:.2f} seconds, {ips:.2f} it/s, {ips*3600:.0f} it/h")


In [None]:
# Identifier of the first returned item: its bestId, else its street id,
# else its municipality id
addresses_dask["best_id"] = addresses_dask.json.apply(
    lambda r: (
        get(r, ["items", 0, "bestId"])
        or get(r, ["items", 0, "street", "id"])
        or get(r, ["items", 0, "municipality", "id"])
    )
)

addresses_dask

In [None]:
# Outer-merge both runs on the columns they share; the `_merge` indicator
# says, for each merged row, whether it was found in both runs.
mg = addresses_dask.drop(columns="json").merge(addresses_seq.drop(columns="json"), how="outer", indicator=True)

# Rely on the indicator rather than on the row count: duplicated addresses in
# the input multiply rows in the merge, which made the row-count comparison
# report a mismatch even when both runs agreed.
if (mg["_merge"] == "both").all():
    print("Same result in seq and dask run!")
else:
    print("!!! Not the same result in seq and dask run!")
    

# Test modes

In [24]:
addresses_mode = addresses.copy()

In [None]:
# Run the whole address list through each geocoding mode and keep, per mode,
# the raw result, the precision of the first item and the call duration.
for mode in ["basic", "simple", "advanced"]:
    print(mode)
    t = datetime.now()

    # Only the address columns go to dask: the result columns added by the
    # previous iterations are not needed for the calls.
    dd_addresses = dd.from_pandas(
        addresses_mode[[street_field, housenbr_field, postcode_field, city_field]].fillna(""),
        npartitions=32)

    dask_task = dd_addresses.apply(call_ws, meta=('x', 'str'), mode=mode, axis=1)

    with ProgressBar():
        addresses_mode[f"json_{mode}"] = dask_task.compute()

    tot_time = (datetime.now() - t).total_seconds()

    # Throughput is computed on the frame actually processed and stored under
    # a per-mode key, so the three modes no longer overwrite each other nor
    # the "dask" entry of the earlier run.
    ips = addresses_mode.shape[0] / tot_time
    iter_per_sec_stats[f"dask_{mode}"] = ips

    addresses_mode[f"precision_{mode}"] = addresses_mode[f"json_{mode}"].apply(
        lambda r: get(r, ["items", 0, "precision"])).fillna("[no result]")

    # call_ws may return None or raw text; only non-empty dict results carry "time"
    addresses_mode[f"time_{mode}"] = addresses_mode[f"json_{mode}"].apply(
        lambda j: j["time"] if j and isinstance(j, dict) else None)

    print(f"{tot_time:.2f} seconds, {ips:.2f} it/s, {ips*3600:.0f} it/h")


## Match rate

In [26]:
# addresses_mode[["precision_basic", "precision_simple", "precision_advanced"]].unstack().reset_index().groupby(["level_0", 0]).level_1.count()#[0].value_counts().plot.bar(stacked=True)

In [None]:
# Long format: one row per (mode, address) with the precision of the first item
precision = (
    addresses_mode[["precision_basic", "precision_simple", "precision_advanced"]]
    .unstack()
    .rename("precision")
    .reset_index()
)

# Collapse detailed precisions into coarse levels; anything unmapped
# (including "[no result]") becomes "[no res]"
precision["precision_short"] = (
    precision["precision"]
    .map({
        "address":              "building",
        "address_00":           "country",
        "address_streetcenter": "street",
        "address_interpol":     "building",
        "street_interpol":      "building",
        "street_00":            "country",
        "street":               "street",
        "city":                 "city",
        "country":              "country",
    })
    .fillna("[no res]")
)

# "precision_basic" -> "basic", etc.
precision = precision.rename(columns={"level_0": "mode"})
precision["mode"] = precision["mode"].str.split("_").str[1]
precision

In [None]:
# Number of addresses per coarse precision level, one row per mode
vc = precision.groupby(["mode"]).precision_short.value_counts().unstack()

# Fixed column order, from most to least precise. "country" is included
# because the precision mapping produces it; leaving it out silently dropped
# those addresses from the chart and from the percentages.
vc = vc.reindex(["building", "street", "city", "country", "[no res]"], axis=1)
vc.plot.barh(stacked=True, color=["tab:green", "tab:orange", "tab:red", "tab:brown", "tab:grey"])

In [None]:
vc/addresses.shape[0]*100

## Timing

In [None]:
addresses_mode[["time_basic", "time_simple", "time_advanced"]].boxplot()

In [None]:
# Long format: one row per (mode, address) with the call duration
timing = (
    addresses_mode[["time_basic", "time_simple", "time_advanced"]]
    .unstack()
    .rename("time")
    .reset_index()
    .rename(columns={"level_0": "mode"})
)
# "time_basic" -> "basic", etc.
timing["mode"] = timing["mode"].str.split("_").str[1]
timing

In [None]:
timing.groupby("mode").time.describe()

In [None]:
# Distribution of call durations per mode.
# `sbn` only exists when the optional seaborn import at the top succeeded,
# hence the NameError guard.
try:
    sbn.histplot(timing, x="time", hue="mode", element="poly")
except NameError:
    print("Seaborn is not installed, cannot plot this histogram")

In [None]:
addresses_mode[addresses_mode.precision_simple=="[no result]"]


In [None]:
# Source of the first raw Pelias feature for each address in "basic" mode.
# "NONE" when there is no result at all (call_ws returned None), no feature,
# or no source; indexing the result directly raised TypeError on None.
addresses_mode.json_basic.apply(
    lambda x: get(x, ["peliasRaw", "features", 0, "properties", "source"]) or "NONE"
).value_counts()

# res

In [None]:
# Source of the first raw Pelias feature for each address in "simple" mode.
# "NONE" when there is no result at all (call_ws returned None), no feature,
# or no source; indexing the result directly raised TypeError on None.
addresses_mode.json_simple.apply(
    lambda x: get(x, ["peliasRaw", "features", 0, "properties", "source"]) or "NONE"
).value_counts()

In [None]:
# Source of the first raw Pelias feature for each address in "advanced" mode.
# "NONE" when there is no result at all (call_ws returned None), no feature,
# or no source; indexing the result directly raised TypeError on None.
addresses_mode.json_advanced.apply(
    lambda x: get(x, ["peliasRaw", "features", 0, "properties", "source"]) or "NONE"
).value_counts()

# Get By ID

In [None]:
addresses_seq["json_from_id"] = addresses_seq["best_id"].progress_apply(call_ws_by_id)

In [None]:
addresses_seq["json_from_id"]

In [149]:
# Identifier found in the by-id answer: bestId, else street id, else
# municipality id of the first item
x = addresses_seq.json_from_id.apply(
    lambda r: (
        get(r, ["items", 0, "bestId"])
        or get(r, ["items", 0, "street", "id"])
        or get(r, ["items", 0, "municipality", "id"])
    )
)


In [None]:
addresses_seq[addresses_seq.best_id != x]

# Search city

In [None]:
addresses_seq.postCode.progress_apply(lambda r: call_ws_search_city(postcode=r))

In [None]:
addresses_seq.postName.progress_apply(lambda r: call_ws_search_city(postname=r))

In [None]:
addresses_seq.progress_apply(lambda r: call_ws_search_city(postname=r.postName, postcode=r.postCode), axis=1)

# Reverse

In [None]:
def reverse_first_result(r):
    """Reverse-geocode the coordinates of the first item of a geocode result.

    Returns None without calling the service when the result has no
    coordinates (no result, no item, ...): unpacking a missing value with
    ``**`` raised TypeError.
    """
    # NOTE(review): assumes "coordinates" is a dict whose keys are the
    # keyword arguments of call_ws_reverse (lat, lon), as the original
    # `**` call did; confirm against the service answer.
    coords = get(r, ["items", 0, "coordinates"])
    if not coords:
        return None
    return call_ws_reverse(**coords)


addresses_seq["json_from_reverse"] = addresses_seq.json.progress_apply(reverse_first_result)

In [None]:
addresses_seq

In [None]:
# Identifier found by the reverse call: bestId, else street id, else
# municipality id of the first item
addresses_seq["best_id_from_reverse"] = addresses_seq.json_from_reverse.apply(
    lambda r: (
        get(r, ["items", 0, "bestId"])
        or get(r, ["items", 0, "street", "id"])
        or get(r, ["items", 0, "municipality", "id"])
    )
)

addresses_seq

In [None]:
# Rows where the id found by geocoding differs from the id found by reverse geocoding.
# A mismatch is acceptable when:
# - the primary result (in json) is not at "address" level (street name, city...)
# - the coordinates are interpolated (precision='address_streetcenter', 'address_interpol', ...)
# - best_id is null (and by chance a Best address is found at the coordinates given by WhosOnFirst)
# - several addresses share the same coordinates

addresses_seq[addresses_seq.best_id != addresses_seq.best_id_from_reverse]#.iloc[0:60]