In [None]:
import pandas as pd
import urllib

import numpy as np

import json

from tqdm.autonotebook import tqdm

#%matplotlib inline

tqdm.pandas()

import dask.dataframe as dd

from dask.multiprocessing import get
from dask.diagnostics import ProgressBar

from datetime import datetime
import matplotlib.pyplot as plt

from IPython.display import display


In [None]:
import urllib3

In [None]:
http = urllib3.PoolManager()

In [None]:
from config_batch import * 

# Functions

In [None]:
ws_hostname = "127.0.1.1"
ws_hostname = "172.27.0.64"

# ws_hostname = "192.168.1.3"

In [None]:
def call_ws(addr_data, check_result=True, structured_osm=False): #lg = "en,fr,nl"
    t = datetime.now()
    
    try: 
        r = http.request(
        'POST',
        f'http://{ws_hostname}:5000/search',
        fields= { 
             "street": addr_data[street_field],
             "housenumber": addr_data[housenbr_field],
             "city": addr_data[city_field],
             "postcode": addr_data[postcode_field],
             "country": addr_data[country_field],
             "check_result" : "yes" if check_result else "no",
             "struct_osm" : "yes" if structured_osm else "no"

        })
    except Exception as e:
        print("Exception !")
        print(addr_data)
        print(e)
        raise e
        
    if r.status == 204:
        print("No result!")
        return
    elif r.status == 200:
        try:
            res = json.loads(r.data.decode('utf-8'))
            res["time"] = (datetime.now() - t).total_seconds()
        except ValueError as ve:

            print("Cannot decode result:")
            print(ve)
            print(r.data.decode('utf-8'))
            return r.data
        return res
    else: 
        print(f"Unknown return code: {r.status} ")



In [None]:
def call_ws_batch(addr_data, mode="geo", with_rejected=False, check_result=True, structured_osm=False): #lg = "en,fr,nl"
    file_data = addr_data.rename(columns = {
        street_field : "street",
        housenbr_field: "housenumber",
        postcode_field: "postcode",
        city_field: "city",
        country_field: "country",
        addr_key_field : "addr_key"
    }).to_csv(index=False)
    
    r = http.request(
    'POST',
    f'http://{ws_hostname}:5000/batch',
    fields= { 
        'media': ('addresses.csv', file_data),
        'mode': mode,
        "with_rejected": "yes" if with_rejected else "no",
        "check_result" : "yes" if check_result else "no",
        "struct_osm"   : "yes" if structured_osm else "no",
        #"extra_house_nbr": "no"
    })
    
    try:
        res = pd.DataFrame(json.loads(r.data.decode('utf-8')))
    except ValueError as ve:
        
        print("Cannot decode result:")
        print(ve)
        print(r.data.decode('utf-8'))
     
        return r.data
#     display(res)
    return res

In [None]:
def expand_json(addresses):
    addresses["status"]= addresses.json.apply(lambda d: "error" if "error" in d else "match" if "match" in d else "no_result")
    addresses["time"]  = addresses.json.apply(lambda d: d["time"])

    addresses["timing"]  = addresses.json.apply(lambda d: d["timing"] if "timing" in d else {})
    
    for f in ["method", "place_rank"]:
        addresses[f]= addresses.json.apply(lambda d: d["match"][0][f] if len(d)>0 and "match" in d else "none")
    
    for field in ["street", "number", "postcode", "city"]:
        addresses["addr_out_"+field]= addresses.json.apply(lambda d: d["match"][0]["addr_out_"+field] if len(d)>0 and "match" in d else "")
    return 

# Calls

## Single address calls

In [None]:
res=call_ws({street_field:   "Avenue Fonsny", \
         housenbr_field: "20",\
         city_field:     "Saint-Gilles",\
         postcode_field: "1060",\
         country_field:  "Belgique"}, check_result=False, structured_osm=False)
res

## Batch calls (row by row)

In [None]:
addresses = pd.concat([
    #get_addresses("../GISAnalytics/data/geocoding/resto_1000_sample.csv.gz"),
    #get_addresses("../GISAnalytics/data/geocoding/best_1000_sample.csv.gz"),
    get_addresses("address.csv.gz")])
addresses = addresses.reset_index(drop=True)
addresses["addr_key"] = addresses.index.astype(str)
addresses

In [None]:
# addresses = addresses[addresses.addr_key.isin([ "2078829"])]#"1622",
# addresses


### Simple way

In [None]:
addresses_seq = addresses.copy()

t = datetime.now()
addresses_seq["json"] = addresses_seq.fillna("").progress_apply(call_ws, check_result=False, structured_osm=False, axis=1)
tot_time = (datetime.now() - t).total_seconds()
print(f"{tot_time:.2f} seconds, {addresses_seq.shape[0]/tot_time:.2f} it/s")
# KBO dataset:
# Normal mode: 128.78 seconds, 7.77 it/s
# Fastmode:     68.80 seconds, 14.54 it/s

#Resto dataset: 
# Normal mode: 145.73 seconds, 6.86 it/s
# Fast mode:    82.99 seconds, 12.05 it/s

# Best dataset:
# Normal mode: 108.53 seconds, 9.21 it/s
# Fast mode: 37.44 seconds, 26.71 it/s

In [None]:
expand_json(addresses_seq)
addresses_seq

In [None]:
# addresses_seq.iloc[0].json

In [None]:
addresses_seq.method.value_counts()

### Using Dask

In [None]:
addresses_dask = addresses.copy()

In [None]:
t = datetime.now()
dd_addresses = dd.from_pandas(addresses_dask.fillna(""), npartitions=8)

dask_task = dd_addresses.apply(call_ws, check_result=False, meta=('x', 'str'), axis=1)

with ProgressBar(): 
    addresses_dask["json"] = dask_task.compute()
    
tot_time = (datetime.now() - t).total_seconds()
print(f"{tot_time:.2f} seconds, {addresses_dask.shape[0]/tot_time:.2f} it/s")
# KBO dataset:
# Normal mode: 24.52 seconds, 40.79 it/s
# Fastmode:    15.81 seconds, 63.27 it/s


# Resto dataset:
# Normal mode: 27.86 seconds, 35.89 it/s
# Fast mode:   18.44 seconds, 54.23 it/s

# Best dataset: 
# Normal mode: 16.11 seconds, 62.07 it/s
# Fast mode:    9.76 seconds, 102.42 it/s

In [None]:
# 1000, 1 worker: 4m18
# 4 workers, npart=4 : 1m20
# 8 workers, npart=4 : 1m20
# 8 workers, npart=8 : 44s

# with checker=False:
# 8 workers, npart=8 : 24s


In [None]:
expand_json(addresses_dask)
addresses_dask

In [None]:
addresses_dask.method.value_counts()#.json.loc[550]

In [None]:
mg = addresses_seq[["addr_key", "city", "postcode","street", "housenumber", "addr_out_street", "addr_out_number", "addr_out_postcode", "addr_out_city"]].merge(
    addresses_dask[["addr_key", "city", "postcode","street", "housenumber", "addr_out_street", "addr_out_number", "addr_out_postcode", "addr_out_city"]], how="outer", indicator=True)
if mg.shape[0] == addresses.shape[0]:
    print("Same result in seq and dask run!")
else: 
    print("!!! Not the same result in seq and dask run!")
    

In [None]:
mg[mg._merge != "both"].sort_values("addr_key")


## Batch calls (batch WS)

### Single block

In [None]:
t = datetime.now()

addresses_batch = call_ws_batch(addresses[[addr_key_field, 
                                           street_field, housenbr_field, postcode_field, city_field, country_field]], 
                                mode="long", 
                                check_result=False, 
                                structured_osm=False,
                               with_rejected=True)

tot_time = (datetime.now() - t).total_seconds()
print(f"{tot_time:.2f} seconds, {addresses.shape[0]/tot_time:.2f} it/s")
# KBO dataset: 33.94 seconds, 29.46 it/s
# Best:        24.99 seconds, 40.01 it/s
# Resto:       38.33 seconds, 26.09 it/s

addresses_batch

In [None]:
rejected_addresses = addresses_batch[addresses_batch.reject.apply(lambda lst: len(lst)>0)].reject.apply(pd.Series).unstack().dropna().apply(pd.Series)
rejected_addresses

In [None]:
rejected_addresses.reject_reason.value_counts()

In [None]:
rejected_addresses[rejected_addresses.reject_reason=="mismatch"]

In [None]:
addresses_batch

In [None]:
mg = addresses_seq[[ "city", "postcode","street", "housenumber", "method", "addr_out_street", "addr_out_number", "addr_out_postcode", "addr_out_city", "addr_key"]].fillna("").replace("fast", "orig").merge(
    addresses_batch[["city", "postcode","street", "housenumber", "method", "addr_out_street", "addr_out_number", "addr_out_postcode", "addr_out_city", "addr_key"]].fillna(""), how="outer", indicator=True)
if mg[mg._merge == "both"].shape[0] == addresses.shape[0]:
    print("Same result in seq and dask run!")
else: 
    print("!!! Not the same result in seq and dask run!")
    

In [None]:
addresses_batch

In [None]:
mg[mg._merge != "both"]

In [None]:
# Geocode + address
call_ws_batch(addresses[[addr_key_field, street_field, housenbr_field, postcode_field, city_field, country_field]], mode="geo", check_result=False) 

In [None]:
# Geocode + address
call_ws_batch(addresses[[addr_key_field, street_field, housenbr_field, postcode_field, city_field, country_field]], mode="short", check_result=False) 

In [None]:
# Geocode + address, with rejected addresses
call_ws_batch(addresses, mode="long", with_reject=True)

### Batch blocs

In [None]:
# addresses = addresses.sample(10000, replace=True)
# addresses = addresses.reset_index(drop=True)
# addresses["addr_key"]= addresses.index.astype(str)

In [None]:
t = datetime.now()

nb_threads=8

chunks = np.array_split(addresses, nb_threads) # addresses.shape[0]//100)

print(f"{len(chunks)} chunks on {nb_threads} threads")

import multiprocess as mp

p = mp.Pool(nb_threads)

def f(chunk):
    return call_ws_batch(chunk, mode="long", 
                        check_result=False, 
                        structured_osm=False)

with p:
     res= list(tqdm(p.imap(f, chunks), total=len(chunks)))
    
addresses_batch2 = pd.concat(res).reset_index(drop=True)

tot_time = (datetime.now() - t).total_seconds()
print(f"{tot_time:.2f} seconds, {addresses.shape[0]/tot_time:.2f} it/s")
# KBO:    9.28 seconds, 107.72 it/s
# Best:   6.88 seconds, 145.43 it/s
# Resto: 11.79 seconds,  84.85 it/s

In [None]:
# addresses_batch2

In [None]:
mg = addresses_seq[[ "city", "postcode","street", "housenumber", "method", "addr_out_street", "addr_out_number", "addr_out_postcode", "addr_out_city", "addr_key"]].fillna("").replace("fast", "orig").merge(
    addresses_batch2[["city", "postcode","street", "housenumber", "method", "addr_out_street", "addr_out_number", "addr_out_postcode", "addr_out_city", "addr_key"]].fillna(""), how="outer", indicator=True)
if mg[mg._merge == "both"].shape[0] == addresses.shape[0]:
    print("Same result in seq and dask run!")
else: 
    print("!!! Not the same result in seq and dask run!")
    

In [None]:
mg[mg._merge != "both"].sort_values("addr_key")

## Comparing options

In [None]:
addresses = get_addresses("address.csv.gz")
addresses = addresses[addresses[country_field] == "Belgique"]
addresses = addresses.sample(10000).copy()

In [None]:
results = {}
it_per_seconds=pd.DataFrame()

for check_label in ["check", "nocheck"]:
    for struct_label in ["struct", "unstruct" ]:
        print(check_label, struct_label)
        start=datetime.now()
        
        results[(check_label, struct_label)] = call_ws_batch_chunks(addresses, 
                                                                    mode="short", 
                                                                    check_result   =  check_label == "check", 
                                                                    structured_osm =  struct_label == "struct")
        
        it_per_seconds.loc[check_label, struct_label] = addresses.shape[0] / (datetime.now()-start).total_seconds()
print("Iterations per seconds:")
it_per_seconds

In [None]:
print("Match rate")
pd.DataFrame({k1: {k2: results[(k1,k2)].shape[0]/addresses.shape[0] for k2 in ["struct", "unstruct"]} 
                  for k1 in  ["check","nocheck"]})

In [None]:
print("Match rate (without nostreet)")
pd.DataFrame({k1: {k2: results[(k1,k2)].query("method!='nostreet'").shape[0]/addresses.shape[0] for k2 in ["struct", "unstruct"]} 
                  for k1 in  ["check","nocheck"]})

In [None]:
print("Unmatched addresses")
for k1 in results:
    print(k1)
    nomatch=addresses[~addresses[addr_key_field].isin(results[k1]["addr_key"])]
    display(nomatch)
    print(nomatch[country_field].value_counts())

In [None]:
vc_values = pd.DataFrame(columns=results.keys(), index=results.keys())

for k1 in results:
    vc_values.loc[k1, k1] = results[k1].shape[0]
    for k2 in results:
        if k1>k2:
            r1=results[k1]
            r2=results[k2]
            mg = r1[["addr_key", "place_id"]].merge(r2[["addr_key", "place_id"]], on="addr_key", how="outer", indicator=True)
 
            vc = mg._merge.value_counts()

            mismatches = mg[mg.place_id_x != mg.place_id_y][["addr_key"]]
            mismatches = mismatches.merge(addresses.rename({addr_key_field:"addr_key"}, axis=1))
            mismatches = mismatches.merge(r1[["addr_key", "addr_out_street", "addr_out_number", "extra_house_nbr", "addr_out_postcode", "addr_out_city"]], on="addr_key")
            mismatches = mismatches.merge(r2[["addr_key", "addr_out_street", "addr_out_number", "extra_house_nbr", "addr_out_postcode", "addr_out_city"]], on="addr_key")
            mismatches.columns = pd.MultiIndex.from_arrays([["Input"]*6 + [f"x:{k1}"]*5 + [f"y:{k2}"]*5, mismatches.columns])

            mismatch_values = mismatches[(mismatches[f"x:{k1}"].rename(lambda x: x.replace("_x", ""), axis=1).fillna("") != 
                                          mismatches[f"y:{k2}"].rename(lambda x: x.replace("_y", ""), axis=1).fillna("")).any(axis=1)]
            
            mismatch_values_no_nmbr = mismatches[(mismatches[f"x:{k1}"].rename(lambda x: x.replace("_x", ""), axis=1).drop("addr_out_number", axis=1).fillna("") != 
                                                  mismatches[f"y:{k2}"].rename(lambda x: x.replace("_y", ""), axis=1).drop("addr_out_number", axis=1).fillna("")).any(axis=1)]
            
            
            vc_label = f"{vc['both']} ({mismatches.shape[0]} - {mismatch_values.shape[0]} - {mismatch_values_no_nmbr.shape[0]}) / {vc['left_only']} / {vc['right_only']}"
            vc_values.loc[k1, k2]=vc_label

                
            print(f"{k1} vs {k2}")
            print(vc_label)
            print("-----------------------------")
            
            print(f"Only in {k1}")
            display(r1[r1.addr_key.isin(mg[mg._merge=="left_only"].addr_key)].merge(addresses.rename({addr_key_field:"addr_key"}, axis=1)))
            
            print(f"Only in {k2}")
            display(r2[r2.addr_key.isin(mg[mg._merge=="right_only"].addr_key)].merge(addresses.rename({addr_key_field:"addr_key"}, axis=1)))
            
            print("Mismatch on place_id")
            display(mismatches)
            
            print("Mismatch on values")
            
            display(mismatch_values)
            
            print("Mismatch on values (no nbr)")
            display(mismatch_values_no_nmbr)
            
            print("#######################")
            
# display(vc_values.fillna(""))

In [None]:
print("Common in both (disagree on place_id - disagree on values - disagree on values, ignoring number) / results only for row / results only for columns")
vc_values.fillna("")

# tests

In [None]:
osm_host ="172.27.0.64:8080"
def get_osm(addr, accept_language = ""): #lg = "en,fr,nl"
    params = urllib.parse.urlencode({"q": addr,
                                    "format":"jsonv2",
                                    "accept-language":accept_language,
                                    "addressdetails":"1",
                                    "namedetails" : "1",
                                    "limit": "50"
                                    })
    
    url = "http://%s/search.php?%s"%(osm_host, params)
#     print(f"Call to OSM: {url}")
    try: 
        with urllib.request.urlopen(url) as response:
            res = response.read()
            res = json.loads(res)
#             return res
            return [ {field: item[field] for field in ["place_id", "lat", "lon", "display_name", "address", "namedetails", "place_rank", "category", "type"]} for item in res] 
    except Exception as e:
        raise Exception (f"Cannot get OSM results ({osm_host}): {e}") 

In [None]:
%timeit get_osm("Av. Fonsny 20, 1060 Bruxelles")

In [None]:
%timeit call_ws_test({street_field:   "Av. Fonsny", \
         housenbr_field: "20",\
         city_field:     "Saint-Gilles",\
         postcode_field: "1060",\
         country_field:  "Belgium"}, check_result=False, structured_osm=False)
# res