## ES setup and Server launch

In [None]:
# Install txtai and elasticsearch python client
#!pip install git+https://github.com/neuml/txtai elasticsearch

# Download and extract elasticsearch
!wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.0.0-linux-x86_64.tar.gz -q
!tar -xzf elasticsearch-7.0.0-linux-x86_64.tar.gz
!chown -R daemon:daemon elasticsearch-7.0.0


In [None]:
# start server
import os
from subprocess import Popen, PIPE, STDOUT
es_server = Popen(['elasticsearch-7.0.0/bin/elasticsearch'], 
                  stdout=PIPE, stderr=STDOUT,
                  preexec_fn=lambda: os.setuid(1)  # as daemon
                 )

In [None]:
# wait a bit then test
!curl -X GET "localhost:9200/"

{
  "name" : "f852c08bc2c7",
  "cluster_name" : "elasticsearch",
  "cluster_uuid" : "rxEfohTLRzK5jzFx8h0KLQ",
  "version" : {
    "number" : "7.0.0",
    "build_flavor" : "default",
    "build_type" : "tar",
    "build_hash" : "b7e28a7",
    "build_date" : "2019-04-05T22:55:32.697037Z",
    "build_snapshot" : false,
    "lucene_version" : "8.0.0",
    "minimum_wire_compatibility_version" : "6.7.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}


# Install Required Libraries

In [None]:
# install requirements
!pip install elasticsearch
!pip install metaphone
!pip install pyjarowinkler
!pip install python-Levenshtein
!pip install python-dateutil

Collecting elasticsearch
  Downloading elasticsearch-7.14.0-py2.py3-none-any.whl (364 kB)
[?25l[K     |█                               | 10 kB 35.7 MB/s eta 0:00:01[K     |█▉                              | 20 kB 20.9 MB/s eta 0:00:01[K     |██▊                             | 30 kB 16.0 MB/s eta 0:00:01[K     |███▋                            | 40 kB 14.2 MB/s eta 0:00:01[K     |████▌                           | 51 kB 7.0 MB/s eta 0:00:01[K     |█████▍                          | 61 kB 6.9 MB/s eta 0:00:01[K     |██████▎                         | 71 kB 7.3 MB/s eta 0:00:01[K     |███████▏                        | 81 kB 8.2 MB/s eta 0:00:01[K     |████████                        | 92 kB 8.6 MB/s eta 0:00:01[K     |█████████                       | 102 kB 6.7 MB/s eta 0:00:01[K     |██████████                      | 112 kB 6.7 MB/s eta 0:00:01[K     |██████████▉                     | 122 kB 6.7 MB/s eta 0:00:01[K     |███████████▊                    | 133 kB 6.7 MB

In [None]:
# import lib for data ingestion
from elasticsearch import Elasticsearch
from argparse import ArgumentParser
import csv,time,logging, json
import pandas as pd
import numpy as np
from elasticsearch import helpers
from metaphone import doublemetaphone
from elasticsearch import exceptions

# Connecting Google drive 

In [None]:
# This step needs to be done for the first time when you're reading something from the Shared Project Folder 
# Please navigate to "Shared with me" on your Gdrive home sceen
# Right click on the "HNY_Project" (the primary folder for this project) and select "Add shortcut to Drive"
# This way the below code can find a link to the Project drive folder through your own drive.


# Mounting your personal Gdrive to the Colab notebook
from google.colab import drive
drive.mount('/gdrive',force_remount=True)

KeyboardInterrupt: ignored

In [None]:
bulk_insert_log = "gdrive/Mydrive/hnyc-spatial-linkage/ES_matching/doc/bulk_insert.log"
config_json_file = "gdrive/Mydrive/hnyc-spatial-linkage/ES_matching/doc/ES_config_1850_BK.json"

### Run Elastic Search on census data

In [None]:
# Wall Clock: 1m 24s
import re
es = Elasticsearch(hosts=["http://localhost:9200"], timeout=60, retry_on_timeout=True)

logging.basicConfig(filename= bulk_insert_log,
                            filemode='a',
                            format='%(asctime)s %(name)s %(message)s',
                            datefmt='%H:%M:%S',
                            level=logging.WARNING)
logger = logging.getLogger('InsertTime')

def ingest(config):
    df = pd.read_csv(config['census_filename'])
    bulk_data = []
    count = 0 
    for itr, row in df.iterrows():
        count+=1
        row = row.replace(np.nan,'',regex=True) #covnert nan to empty string
        data = row.to_dict() #converts the dataframe row to dictonary with their correct data type
        if 'LOCATION' in data:
          data['LOCATION'] = {"lat":data["LAT"],"lon":data["LONG"]} # find location
        
        if 'ADDNUMFROM' in data and type(data['ADDNUMFROM']) is str:
            data['ADDNUMFROM'] = data['ADDNUMFROM'].replace('`','')

        data[config['census_first_name']] = name_clean(data[config['census_first_name']])
        data[config['census_last_name']] = name_clean(data[config['census_last_name']])
        
        if config['metaphone'] is 1: # default metaphone is 1
          data['METAPHONE_NAMEFIRST'] = [i for i in doublemetaphone(data[config['census_first_name']]) if i]
          data['METAPHONE_NAMELAST'] = [i for i in doublemetaphone(data[config['census_last_name']]) if i]
        
        if id is not False:
          meta = {
              "_index": config['es-index'],
              "_id": data[config['es-id']],
              "_source": data
          }
        else:
          meta = {
              "_index": config['es-index'],
              "_source": data
          }

        bulk_data.append(meta)
        if itr%config['ingest_size'] == 0:
            helpers.bulk(es, bulk_data)
            bulk_data = []
            print("INSERTING NOW", itr)
            
    helpers.bulk(es, bulk_data)
    return count

def name_clean(name):
  name = max(name.split(' '), key=len) # remove middle name if any
  name = re.sub('[^A-Za-z0-9]+', '', name)
   
  return name

if __name__=='__main__':
    if __name__ == '__main__':

      with open(config_json_file) as json_data_file:
         config = json.load(json_data_file)

      st = time.time()
      ingest(config)
      end = time.time()
      logger.warning(config["es-index"] +" "+ str(end-st))
 
#Mapping used
'''
PUT census
{
    "mappings" : {
      "properties" : {
        "ADDNUM" : {
          "type" : "long"
        },
        "ADDNUMFROM" : {
          "type" : "long"
        },
        "ADDNUMTO" : {
          "type" : "long"
        },
        "ADDR_TYPE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_ADDRESSB" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_AGEB" : {
          "type" : "long"
        },
        "CENSUS_BUILDING_I" : {
          "type" : "long"
        },
        "CENSUS_CITY" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_ED" : {
          "type" : "long"
        },
        "CENSUS_ENUMDISTB" : {
          "type" : "long"
        },
        "CENSUS_EXTGROUP_I" : {
          "type" : "long"
        },
        "CENSUS_FID" : {
          "type" : "long"
        },
        "CENSUS_MATCH_ADDR" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_MERGEID" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_NAMEFRSTB" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_NAMELASTB" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_NEIGHBOR_1" : {
          "type" : "long"
        },
        "CENSUS_NEIGHBOR_2" : {
          "type" : "long"
        },
        "CENSUS_NPERHHB" : {
          "type" : "long"
        },
        "CENSUS_OCCLABELB" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_PAGENUMB" : {
          "type" : "long"
        },
        "CENSUS_RACEB" : {
          "type" : "long"
        },
        "CENSUS_RACENAMEB" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_REELB" : {
          "type" : "long"
        },
        "CENSUS_RELATEB" : {
          "type" : "long"
        },
        "CENSUS_RELATE_STR" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_SEGGROUP_I" : {
          "type" : "long"
        },
        "CENSUS_SEGMENT_ID" : {
          "type" : "long"
        },
        "CENSUS_SERIAL" : {
          "type" : "long"
        },
        "CENSUS_SERIALB" : {
          "type" : "long"
        },
        "CENSUS_SEXB" : {
          "type" : "long"
        },
        "CENSUS_STATE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_STREET" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_STREETB" : {
          "type" : "long"
        },
        "CENSUS_TYPEB" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_UNITTYPE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "CENSUS_VOLUMEB" : {
          "type" : "long"
        },
        "CITY" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "COUNTY" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "LOCATION":{
             "type": "geo_point"
        },
        "LAT" : {
          "type" : "float"
        },
        "LONG" : {
          "type" : "float"
        },
        "MATCH_ADDR" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "METAPHONE_NAMEFIRST": {
          "type": "text",
          "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
          }
        },
        "METAPHONE_NAMELAST": {
          "type": "text",
          "fields": {
            "keyword": {
                "type": "keyword",
                "ignore_above": 256
            }
          }
        },
        "OBJECTID" : {
          "type" : "long"
        },
        "SIDE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STADDR" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STATE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STDIR" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STNAME" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STPREDIR" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STPRETYPE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "STTYPE" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "WARD_NUM" : {
          "type" : "long"
        }
      }
    }
  }
'''



FileNotFoundError: [Errno 2] No such file or directory: '/content/bulk_insert.log'

### Match cd and census data

In [None]:
# Wall clock: 54s 

with open('/content/config_1850_B.json') as json_data_file:
    config = json.load(json_data_file)

es = Elasticsearch(host=config['host'], port=config['port'])

def match_addr():
    df = pd.read_csv(config['cd_filename'])
    count, match,unmatch = 0,0,0
    with open(config['match_output_filename'],'w') as fw, open(config['unmatch_output_filename'],'w') as fw2:
        writer = csv.writer(fw, delimiter="\t",quotechar='"')
        columns = config["output_census_cols"] + config["output_city_directory_cols"]
        rows=""
        for cols in columns:
            rows = rows + cols + "\t"
        
        writer.writerow(rows.rstrip("\t").split("\t"))
        for index, row in df.iterrows():
            row = row.replace(np.nan,'',regex=True) #covnert nan to empty string
            data = row.to_dict()
            
            first_name_metaphone = [i for i in doublemetaphone(data["CD_FIRST_NAME"]) if i]
            last_name_metaphone = [i for i in doublemetaphone(data["CD_LAST_NAME"]) if i]

            if config['edit_distance'] !=0:
                if config['metaphone'] is 1:
                    query = { "bool" : { "must" : [{ "fuzzy": { "CENSUS_FIRST_NAME": { "value": data["CD_FIRST_NAME"], "fuzziness": config["edit_distance"], "max_expansions": 50, "prefix_length": 0, "transpositions": True, "rewrite": "constant_score" } } },
                            {"fuzzy": { "CENSUS_LAST_NAME": { "value": data["CD_LAST_NAME"], "fuzziness": config["edit_distance"], "max_expansions": 50, "prefix_length": 0, "transpositions": True, "rewrite": "constant_score" } }},
                            { "match" : { "CENSUS_WARD_NUM": data["CD_WARD_NUM"]} },
                            {"terms": {"METAPHONE_NAMELAST.keyword": last_name_metaphone}},
                            {"terms": {"METAPHONE_NAMEFIRST.keyword": first_name_metaphone}}]}}

                else:
                    query = { "bool" : { "must" : [{ "fuzzy": { "CENSUS_FIRST_NAME": { "value": data["CD_FIRST_NAME"], "fuzziness": config["edit_distance"], "max_expansions": 50, "prefix_length": 0, "transpositions": True, "rewrite": "constant_score" } } },
                            {"fuzzy": { "CENSUS_LAST_NAME": { "value": data["CD_LAST_NAME"], "fuzziness": config["edit_distance"], "max_expansions": 50, "prefix_length": 0, "transpositions": True, "rewrite": "constant_score" } }},
                            { "match" : { "CENSUS_WARD_NUM": data["CD_WARD_NUM"]} }
                            ]}}
            

            if config['edit_distance'] is 0:
                if config['metaphone'] is 1:
                   query =  { "bool" : { "must" : [{ "match": { "CENSUS_FIRST_NAME": data["CD_FIRST_NAME"] } },
                            { "match": { "CENSUS_LAST_NAME": data["CD_LAST_NAME"] } },
                            { "match" : { "CENSUS_WARD_NUM": data["CD_WARD_NUM"]} },
                            {"terms": {"METAPHONE_NAMELAST.keyword": last_name_metaphone}},
                            {"terms": {"METAPHONE_NAMEFIRST.keyword": first_name_metaphone}}]}}
            
                else:
                    query = { "bool" : { "must" : [{ "match": { "CENSUS_FIRST_NAME": data["CD_FIRST_NAME"] } },
                            { "match": { "CENSUS_LAST_NAME": data["CD_LAST_NAME"] } }, 
                            { "match" : { "CENSUS_WARD_NUM": data["CD_WARD_NUM"]}}
                            ]}}

            
            try:
                res = es.search(index=config["es-index"], body={ "from": 0, "size": 10000, "query":query})
            except exceptions.RequestError:
                print("Exception at row id: ", index)
                continue
            
            if res['hits']['total']['value']!= 0:
                for i in res['hits']['hits']:
                    i = i['_source']
                    content = ""
                    for j in config["output_census_cols"]:
                        content = content + str(i[j]) + "\t"

                    for j in config["output_city_directory_cols"]:
                        content = content + str(data[j]) + "\t"

                    writer.writerow(content.rstrip("\t").split("\t"))
                    match+=1
            else:
                fw2.write(str(data['CD_RECORD_ID'])+"\n")
                unmatch+=1
        
    print(count,match,unmatch)

if __name__=='__main__':
    st = time.time()
    match_addr()
    end = time.time()
    print(config["es-index"] +" "+ str(end-st))

Exception at row id:  923
Exception at row id:  1455
Exception at row id:  1707
Exception at row id:  1969
Exception at row id:  2084
Exception at row id:  2425
Exception at row id:  2556
Exception at row id:  2627
Exception at row id:  3162
Exception at row id:  3165
Exception at row id:  3216
Exception at row id:  3496
Exception at row id:  3501
Exception at row id:  3746
Exception at row id:  3774
Exception at row id:  3775
Exception at row id:  3981
Exception at row id:  4034
Exception at row id:  4063
Exception at row id:  4545
Exception at row id:  4551
Exception at row id:  4886
Exception at row id:  4955
Exception at row id:  5039
Exception at row id:  5069
Exception at row id:  5699
Exception at row id:  5827
Exception at row id:  5866
Exception at row id:  5926
Exception at row id:  5944
Exception at row id:  6153
Exception at row id:  6287
Exception at row id:  6333
Exception at row id:  6446
Exception at row id:  7103
Exception at row id:  7104
Exception at row id:  7105
Ex