In [1]:
from elasticsearch import Elasticsearch, helpers
import json
import pandas as pd

# connect to Elasticsearch
es = Elasticsearch(['http://localhost:9200'])

# load sample data (only the columns needed for the point-in-polygon lookup)
sample = pd.read_csv('../data/ukpostcodes.csv',
                     usecols=['postcode', 'latitude', 'longitude'])

# filter to Manchester postcodes: an "M" followed by at least one digit (e.g. "M9 8PA")
# - raw string so that "\d" reaches the regex engine instead of being an invalid str escape
# - na=False so rows with a missing postcode count as "no match" rather than breaking the mask
is_manchester = sample.postcode.str.contains(r'^M\d+', na=False)
man_postcodes = sample[is_manchester].reset_index(drop=True)

In [2]:
# preview the first rows of the filtered postcodes
man_postcodes.head()

Unnamed: 0,postcode,latitude,longitude
0,M9 8PA,53.533045,-2.23408
1,M9 8PB,53.532879,-2.236327
2,M9 8PD,53.532303,-2.236429
3,M9 8PE,53.532425,-2.238195
4,M9 8PF,53.533084,-2.237158


In [3]:
# construct Multi-Search API call
# build one geo_shape query per postcode; each asks for a single matching document
queries = []
for lon, lat in zip(man_postcodes.longitude, man_postcodes.latitude):
    # coordinates are passed as [longitude, latitude]
    point = {'type': 'point', 'coordinates': [lon, lat]}
    shape_filter = {'geo_shape': {'geometry': {'shape': point}}}
    queries.append({
        'size': 1,
        # NOTE(review): recent Elasticsearch docs spell this key 'excludes' - confirm
        # against the cluster version before changing it
        '_source': {'exclude': ['geometry', 'level1*', 'level2*', 'level3_id', 'level_mapping']},
        'query': {'bool': {'filter': shape_filter}},
    })

# serialise every query dict to a JSON string
queries_str = list(map(json.dumps, queries))

# split list into batches of 1,000 queries
def make_batches(lst, batch_size):
    """Lazily yield consecutive slices of ``lst`` with at most ``batch_size`` items each.

    The last slice is shorter when ``len(lst)`` is not a multiple of ``batch_size``;
    an empty ``lst`` yields nothing.
    """
    starts = range(0, len(lst), batch_size)
    yield from (lst[start:start + batch_size] for start in starts)
batches = list(make_batches(queries_str, 1_000))

# format as new line delimited JSON: each query line is preceded by an empty `{}` header line
ndjson_batches = []
for batch in batches:
    body = '\n{}\n'.join(batch)
    ndjson_batches.append('{}\n' + body + '\n')

In [8]:
%%timeit
# TEST on 1,000 docs - query Elasticsearch with Multi-Search API
results = es.msearch(body=ndjson_batches[0], index='census_boundaries')

520 ms ± 124 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [11]:
%%timeit
# TEST - query Elasticsearch with Search API in a for-loop
results = [es.search(body=queries[i], index='census_boundaries') for i, _ in enumerate(queries[0:999])]

4.64 s ± 64.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
# query Elasticsearch with Multi-Search API (one request per batch)
results = [es.msearch(body=batch, index='census_boundaries') for batch in ndjson_batches]


def _first_hit_source(item):
    """Return the `_source` of the first hit in one msearch response item.

    Returns an empty dict when the query matched nothing, so that every query still
    contributes exactly one record and the records stay aligned with the postcodes.

    Raises:
        RuntimeError: if the item carries an `error` entry instead of hits.
    """
    if 'error' in item:
        raise RuntimeError('msearch sub-request failed: {}'.format(item['error']))
    hits = item.get('hits', {}).get('hits', [])
    if not hits:
        return {}
    return hits[0].get('_source') or {}


# parse results: flatten the per-batch responses into one record per query, in query order
responses = [result.get('responses') for result in results]
source = [_first_hit_source(item) for response in responses
                                  for item in response]

In [13]:
# turn the list of `_source` records into a dataframe (one row per query)
census_blocks = pd.DataFrame(source)

# column bind with original dataframe, then keep only the columns of interest
output_columns = ['postcode', 'level4_name', 'level3_name',
                  'level0_id', 'latitude', 'longitude']
mapped_postcodes = pd.concat([man_postcodes, census_blocks], axis=1)[output_columns]

In [14]:
# preview the first rows of the postcode-to-census-block mapping
mapped_postcodes.head()

Unnamed: 0,postcode,level4_name,level3_name,level0_id,latitude,longitude
0,M9 8PA,England,Manchester,E00025808,53.533045,-2.23408
1,M9 8PB,England,Manchester,E00025812,53.532879,-2.236327
2,M9 8PD,England,Manchester,E00025811,53.532303,-2.236429
3,M9 8PE,England,Manchester,E00025811,53.532425,-2.238195
4,M9 8PF,England,Manchester,E00025812,53.533084,-2.237158
