In [44]:
from uk_postcodes_parsing import ukpostcode, fix, postcode_utils
import pandas as pd
import json
import numpy as np
import re, os, sys
from pprint import pprint

def move_working_dir_to_repo_root(repo_name="orgsync"):
    """
    Move the current working directory to the root of the repository.
    """
    current_dir = os.getcwd()
    while os.path.basename(current_dir).lower() != repo_name:
        current_dir = os.path.dirname(current_dir)
    os.chdir(current_dir)
    print("Current working directory: ", os.getcwd())

move_working_dir_to_repo_root(repo_name="orgsync")

Current working directory:  /home/ubuntu/OrgSync


In [45]:
### Load full gtr and cordis datasets (update the cleaning code...)
data_dir = os.path.join("data", "raw")
gtr_path = os.path.join(data_dir, "gtr_data.json")
cordis_path = os.path.join(data_dir, "uk_data.json")

# with open(gtr_path, "r") as f:
#     gtr_data = json.load(f)

# with open(cordis_path, "r") as f:
#     cordis_data = json.load(f)

def rename_fields(data, field_renaming):
    for record in data:
        for old_field, new_field in field_renaming.items():
            record[new_field] = record.pop(old_field)
    return data

def keep_fields(original_data, fields_to_keep, inplace=False):
    # avoid modifying original json by accident!
    if not inplace:
        data = original_data.copy()
    else:
        data = original_data
    for record in data:
        for field in list(record.keys()):
            if field not in fields_to_keep:
                record.pop(field)
    return data

# def parse_postcode(postcode):
#     parsed = ukpostcode.parse_from_corpus(postcode, attempt_fix=True)
#     # parsed is an empty list if the postcode is invalid
#     if len(parsed) == 0:
#         return None
#     return parsed[0].__dict__

def create_single_valued_field(data, field_name, value):
    for record in data:
        record[field_name] = value
    return data


def convert_field_to_str(data, field):
    """
    convert all values in a field to strings
    """
    for record in data:
        print(record[field])
        print(type(record[field]))
        record[field] = str(record[field])
        print(record[field])
    return data

data_path = os.path.join("data","splink","all_data.json")
with open(data_path, "r") as f:
    data = json.load(f)

postcodes = keep_fields(data, ["dataset", "unique_id", "postcode"])


## Update cleaning code...
# gtr_field_renaming = {
#     "id": "unique_id",
#     "postCode": "postcode",
# }

# cordis_field_renaming = {
#     "organisationID": "unique_id",
#     # "postCode": "postcode",
# }

# gtr_data = rename_fields(gtr_data, gtr_field_renaming)
# cordis_data = rename_fields(cordis_data, cordis_field_renaming)

# create_single_valued_field(gtr_data, "dataset", "gtr")
# create_single_valued_field(cordis_data, "dataset", "cordis")


# fields_to_keep = ["dataset","unique_id", "postcode"]

# # isolate poctcodes, keeping ids to link back to original data
# cordis_postcodes = keep_fields(cordis_data, fields_to_keep)
# gtr_postcodes = keep_fields(gtr_data, fields_to_keep)

# # combine
# all_postcodes = cordis_postcodes + gtr_postcodes

# df = pd.DataFrame(all_postcodes)
# df.tail()




In [None]:
def remove_whitespace(postcode):
    return re.sub(r"\s+", "", postcode)

def parse_postcode(postcode):
    parsed = try_parsing(postcode)
    if parsed:
        return parsed
    parsed = try_parsing(remove_whitespace(postcode))
    return parsed

        
def try_parsing(postcode):
    try:
        parsed = a(postcode)
        if parsed:
            return parsed
    except Exception as e:
        pass
    try:
        parsed = b(postcode)
        if parsed:
            return parsed
    except Exception as e:
        pass
    try:
        parsed = c(postcode)
        if parsed:
            return parsed
    except Exception as e:
        pass
    else:
        return None

def a(postcode):
    try:
        parsed = ukpostcode.parse(postcode)
        return parsed[0].__dict__
    except Exception as e:
        pass
    else:
        return None
    
def b(postcode):
    try:
        parsed = ukpostcode.parse_from_corpus(postcode, attempt_fix=True)
        return parsed[0].__dict__
    except Exception as e:
        pass
    else:
        return None
    
def c(postcode):
    try:
        parsed = ukpostcode.parse_from_corpus(postcode, attempt_fix=True, try_all_fix_options=True)
        return parsed[0].__dict__
    except Exception as e:
        pass
    else:
        return None



def create_parsed_postcodes_fields(data):
    """
    Takes in json of records with fields "unique_id" and "postcode". Parses postcodes with
    ukpostcode library and adds parsed postcode fields to each record.

    If len(parse_postcode) == 0, then the postcode is not valid, and the record should be 
    populated with None values.
    
    e.g.
    parse_postcode("SW1A 1AA") ->
        {'original': 'SW1A 1AA',
        'postcode': 'SW1A 1AA',
        'incode': '1AA',
        'outcode': 'SW1A',
        'area': 'SW',
        'district': 'SW1',
        'sub_district': 'SW1A',
        'sector': 'SW1A 1',
        'unit': 'AA',
        'fix_distance': 0,
        'is_in_ons_postcode_directory': True} 
    """
    empty_fields = {
            "original": None,  
            "postcode": None,
            "incode": None,
            "outcode": None,
            "area": None,
            "district": None,
            "sub_district": None,
            "sector": None,
            "unit": None,
            "fix_distance": None,
            "is_in_ons_postcode_directory": None
    }

    for record in data:
        if not record["postcode"]:
            record.update(empty_fields)
            continue
        parsed = parse_postcode(record["postcode"])
        if not parsed:
            record.update(empty_fields)
            continue
        record.update(parsed)

    # for all fields other than unique_id and dataset, modify to "parsed." + field
    for record in data:
        for field in list(record.keys()):
            if field not in ["unique_id", "dataset"]:
                record["parsed." + field] = record.pop(field)
    return data

all_postcodes = create_parsed_postcodes_fields(postcodes)
all_postcodes
# save to json
save_path = os.path.join("data", "splink", "parsed_postcodes.json")
# check if path exists, else create
if not os.path.exists(os.path.dirname(save_path)):
    os.makedirs(os.path.dirname(save_path))
with open(save_path, "w") as f:
    json.dump(all_postcodes, f, indent=2)


INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'le13 opb' => 'LE13 0PB'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'le13 opb' => 'LE13 0PB'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'ox3 obp' => 'OX3 0BP'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'ox3 obp' => 'OX3 0BP'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'nw1 otu' => 'NW1 0TU'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'nw1 otu' => 'NW1 0TU'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'rh13 osz' => 'RH13 0SZ'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'rh13 osz' => 'RH13 0SZ'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'rh13 osz' => 'RH13 0SZ'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'rh13 osz' => 'RH13 0SZ'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'cw10 ohx' => 'CW10 0HX'
INFO:uk-postcodes-parsing.ukpostcode:Postcode Fixed: 'cw10 ohx' => 'CW10 0HX'
ERROR:uk-postcodes-parsing.ukpostcode:Unable to fix postcode
ERROR:uk-po

In [4]:
ukpostcode.Postcode.__dict__

mappingproxy({'__module__': 'uk_postcodes_parsing.ukpostcode',
              '__annotations__': {'is_in_ons_postcode_directory': bool,
               'fix_distance': int,
               'original': str,
               'postcode': str,
               'incode': str,
               'outcode': str,
               'area': str,
               'district': str,
               'sub_district': typing.Optional[str],
               'sector': str,
               'unit': str},
              '__doc__': 'Class to hold the parsed postcode.\n    Constructor arguments:\n        original (str): The raw (original) string of the postcode.\n        postcode (str): The postcode as a string.\n        incode (str): The inward code (the first 3 characters) of the postcode.\n        outcode (str): The outward code (the last 4 characters) of the postcode.\n        area (str): The area of the postcode.\n        district (str): The district of the postcode.\n        sub_district (str): The sub-district of the postco

In [31]:
good_code = "SW1A 1AA"
bad_code = "w1 t 5 hd"

def remove_whitespace(postcode):
    return re.sub(r"\s+", "", postcode)

In [10]:
parsed = parse_postcode(good_code)
parsed

{'original': 'SW1A 1AA',
 'postcode': 'SW1A 1AA',
 'incode': '1AA',
 'outcode': 'SW1A',
 'area': 'SW',
 'district': 'SW1',
 'sub_district': 'SW1A',
 'sector': 'SW1A 1',
 'unit': 'AA',
 'fix_distance': 0,
 'is_in_ons_postcode_directory': True}

In [30]:
parsed = parse_postcode(bad_code)
parsed

In [32]:
ukpostcode.parse(bad_code)

ERROR:uk-postcodes-parsing.ukpostcode:Unable to fix postcode
ERROR:uk-postcodes-parsing.ukpostcode:Failed to parse postcode: w1 t 5 hd


In [34]:
ukpostcode.parse(remove_whitespace(bad_code))

Postcode(is_in_ons_postcode_directory=True, fix_distance=0, original='w1t5hd', postcode='W1T 5HD', incode='5HD', outcode='W1T', area='W', district='W1', sub_district='W1T', sector='W1T 5', unit='HD')

In [19]:
parsed =ukpostcode.parse_from_corpus(bad_code, attempt_fix=True, try_all_fix_options=True)
parsed[1].__dict__

IndexError: list index out of range

In [42]:
good_code = "SW1A 1AA"
bad_code = "w1 t 5 hd"

def remove_whitespace(postcode):
    return re.sub(r"\s+", "", postcode)

def postcode_parser(postcode):
    parsed = try_parsing(postcode)
    if parsed:
        return parsed
    parsed = try_parsing(remove_whitespace(postcode))
    return parsed

        
def try_parsing(postcode):
    try:
        parsed = a(postcode)
        if parsed:
            return parsed
    except Exception as e:
        pass
    try:
        parsed = b(postcode)
        if parsed:
            return parsed
    except Exception as e:
        pass
    try:
        parsed = c(postcode)
        if parsed:
            return parsed
    except Exception as e:
        pass
    else:
        return None

def a(postcode):
    try:
        parsed = ukpostcode.parse(postcode)
        return parsed[0].__dict__
    except Exception as e:
        pass
    else:
        return None
    
def b(postcode):
    try:
        parsed = ukpostcode.parse_from_corpus(postcode, attempt_fix=True)
        return parsed[0].__dict__
    except Exception as e:
        pass
    else:
        return None
    
def c(postcode):
    try:
        parsed = ukpostcode.parse_from_corpus(postcode, attempt_fix=True, try_all_fix_options=True)
        return parsed[0].__dict__
    except Exception as e:
        pass
    else:
        return None

parsed = postcode_parser("bloo")


ERROR:uk-postcodes-parsing.ukpostcode:Unable to fix postcode
ERROR:uk-postcodes-parsing.ukpostcode:Failed to parse postcode: bloo
ERROR:uk-postcodes-parsing.ukpostcode:Unable to fix postcode
ERROR:uk-postcodes-parsing.ukpostcode:Failed to parse postcode: bloo


In [43]:
print(parsed)

None


KeyError: 0