# Inserting Individual PSCs into the graph

The largest class of PSC are the individual "human" owners. Here we load in the full dataset and select out the individuals before converting the raw data into a form that will be stored within the neo4j graph.

In [18]:
import json
import os

import numpy as np
import pandas as pd
from neo4j.v1 import GraphDatabase
from pandas.io.json import json_normalize

In [2]:
# Read the raw PSC snapshot; the code below uses its 'company_number' and 'data' columns.
original_psc_data = pd.read_json('./data/psc_snapshot-2017-09-08.json')
# Flatten the nested 'data' records into dotted columns (e.g. address.postal_code) and
# place them next to the company number.
all_records_psc = pd.concat([original_psc_data['company_number'],json_normalize(original_psc_data['data'])],axis=1)
# The raw frame is no longer needed once flattened; drop the reference to free memory.
del original_psc_data

In [3]:
# Preview the first rows of the flattened snapshot.
all_records_psc.head()

Unnamed: 0,company_number,address.address_line_1,address.address_line_2,address.care_of,address.country,address.locality,address.po_box,address.postal_code,address.premises,address.region,...,name_elements.middle_name,name_elements.surname,name_elements.title,nationality,natures_of_control,notified_on,persons_of_significant_control_count,restrictions_notice_withdrawal_reason,statement,statements_count
0,9145694,St. Andrews Road,,,England,Henley-On-Thames,,RG9 1HP,2,,...,Thanh,Wildman,Mrs,Vietnamese,[ownership-of-shares-50-to-75-percent],2016-04-06,,,,
1,8581893,High Street,Wendover,,England,Aylesbury,,HP22 6EA,14a,Buckinghamshire,...,Robert Charles,Davies,Mr,British,"[ownership-of-shares-25-to-50-percent, ownersh...",2016-06-30,,,,
2,8581893,Holywells Road,,,United Kingdom,Ipswich,,IP3 0DL,37-41,Suffolk,...,Fiona,Tarrant,,British,"[ownership-of-shares-25-to-50-percent, voting-...",2016-04-06,,,,
3,1605766,20-22 Wenlock Road,,,England,London,,N1 7GU,Suite Lp33221,,...,,,,,[ownership-of-shares-75-to-100-percent],2016-04-06,,,,
4,10259080,38 Church Road,Worcester Park,,,Surrey,,KT4 7RD,,,...,Peter,Ollett,Mr,British,[ownership-of-shares-50-to-75-percent],2016-06-30,,,,


In [5]:
# Count records per 'kind' to see which PSC types the snapshot contains.
all_records_psc.kind.value_counts()

individual-person-with-significant-control          4225140
persons-with-significant-control-statement           404603
corporate-entity-person-with-significant-control     344866
legal-person-person-with-significant-control           5490
super-secure-person-with-significant-control            186
exemptions                                               37
totals#persons-of-significant-control-snapshot            1
Name: kind, dtype: int64

## Filter the data to only handle individual people with control

In [4]:
# Keep only the individual ("human") PSC records. The explicit .copy() makes human_psc an
# independent frame, so adding columns to it later is not treated by pandas as writing to
# a slice of all_records_psc (the SettingWithCopyWarning case).
human_psc = all_records_psc[all_records_psc.kind == "individual-person-with-significant-control"].copy()
# Release the full snapshot now that the subset has been taken.
del all_records_psc

In [5]:
# Spot-check the individual PSC rows recorded against a single company number.
human_psc[human_psc['company_number'] == '10262086']

Unnamed: 0,company_number,address.address_line_1,address.address_line_2,address.care_of,address.country,address.locality,address.po_box,address.postal_code,address.premises,address.region,...,name_elements.middle_name,name_elements.surname,name_elements.title,nationality,natures_of_control,notified_on,persons_of_significant_control_count,restrictions_notice_withdrawal_reason,statement,statements_count
10903,10262086,Wimborne Road,Colehill,,England,Wimborne,,BH21 2QS,1 Richmond Villas,Dorset,...,Louise,Wilkey,Mrs,English,"[ownership-of-shares-25-to-50-percent, voting-...",2016-04-06,,,,


In [10]:
def convert_control_list(control_list):
    """Map every control in a list to the value 1.

    :param control_list: a list of control names; any non-list value is treated as
        "no controls"
    :return: dict keyed by control name with value 1, or an empty dict when the
        input is not a list
    """
    if not isinstance(control_list, list):
        return {}
    return dict.fromkeys(control_list, 1)

In [11]:
# Turn each list of control types into a {control: 1} dict (non-list values become {});
# this dict is later written as the properties of the person-to-company relationship.
human_psc['DICTIONARY_OF_CONTROLS'] = human_psc['natures_of_control'].map(convert_control_list)

In [12]:
# Check that the new DICTIONARY_OF_CONTROLS column looks as expected.
human_psc.head()

Unnamed: 0,company_number,address.address_line_1,address.address_line_2,address.care_of,address.country,address.locality,address.po_box,address.postal_code,address.premises,address.region,...,name_elements.surname,name_elements.title,nationality,natures_of_control,notified_on,persons_of_significant_control_count,restrictions_notice_withdrawal_reason,statement,statements_count,DICTIONARY_OF_CONTROLS
0,9145694,St. Andrews Road,,,England,Henley-On-Thames,,RG9 1HP,2,,...,Wildman,Mrs,Vietnamese,[ownership-of-shares-50-to-75-percent],2016-04-06,,,,,{'ownership-of-shares-50-to-75-percent': 1}
1,8581893,High Street,Wendover,,England,Aylesbury,,HP22 6EA,14a,Buckinghamshire,...,Davies,Mr,British,"[ownership-of-shares-25-to-50-percent, ownersh...",2016-06-30,,,,,"{'ownership-of-shares-25-to-50-percent': 1, 'o..."
2,8581893,Holywells Road,,,United Kingdom,Ipswich,,IP3 0DL,37-41,Suffolk,...,Tarrant,,British,"[ownership-of-shares-25-to-50-percent, voting-...",2016-04-06,,,,,"{'ownership-of-shares-25-to-50-percent': 1, 'v..."
4,10259080,38 Church Road,Worcester Park,,,Surrey,,KT4 7RD,,,...,Ollett,Mr,British,[ownership-of-shares-50-to-75-percent],2016-06-30,,,,,{'ownership-of-shares-50-to-75-percent': 1}
6,10259081,Albion Dockside Estate,Hanover Place,,United Kingdom,Bristol,,BS1 6UT,Albion Dockside Building,,...,Dance,Mr,British,"[ownership-of-shares-50-to-75-percent, voting-...",2016-07-18,,,,,"{'ownership-of-shares-50-to-75-percent': 1, 'v..."


In [13]:
# Count the rows that have no company number, and only warn / filter when there are
# some, so a clean dataset does not print a misleading "0 companies" warning.
number_missing_companyID = int(human_psc['company_number'].isnull().sum())
if number_missing_companyID:
    print("WARNING: Missing company number for {} companies in list".format(number_missing_companyID))
    print("Removing these rows from data; if significant you should investigate")
    human_psc = human_psc[human_psc['company_number'].notnull()]

Removing these rows from data; if significant you should investigate


In [7]:
# Load the pre-built lookup objects. nationality_map and combined_map are queried with
# .get(...) further down to derive the Citizen_of / Registered_in codes.
# NOTE(review): unpickling executes arbitrary code, so only load pickles you created
# yourself or otherwise trust.
country_code_map = pd.read_pickle('./data/clean_country_code_map.pkl')
combined_map = pd.read_pickle('./data/combined_country_map.pkl')
nationality_map = pd.read_pickle('./data/nation_map.pkl')

### Defining specific functions to handle human PSCs

These functions convert the raw data for human PSCs into a format that can then be inserted into Neo4j easily.

In [28]:
def psc_dob(record):
    """Build the 'MM/YYYY' date-of-birth identifier for a person.

    :param record: mapping-like row with 'date_of_birth.month' and 'date_of_birth.year'
    :return: zero-padded month and year joined by '/', or '00/0000' when either
        part cannot be converted to an integer
    """
    unknown_dob = '00/0000'
    try:
        month = int(record['date_of_birth.month'])
        year = int(record['date_of_birth.year'])
    except (TypeError, ValueError):
        # int() raises TypeError for None and ValueError for NaN or non-numeric text.
        return unknown_dob
    return "{}/{}".format(str(month).zfill(2), year)

def psc_name_id(record):
    """Build the name part of a person's identifier: 'SURNAME_middle_forename'.

    The surname is upper-cased, the middle name and forename are lower-cased, and
    spaces are replaced by '-'. A missing component (None or NaN) becomes an empty
    string. Missing values are detected on the raw value rather than by deleting the
    text 'nan' from the result, so real names containing those letters (e.g. "Nancy",
    "Fernando") are kept intact and a missing surname no longer shows up as "NAN".

    :param record: mapping-like row with the 'name_elements.*' fields
    :return: the identifier string, or 'NONAME__BLANK' if the record cannot be read
    """
    def _component(value):
        # NaN is the only float that is not equal to itself.
        if value is None or (isinstance(value, float) and value != value):
            return ''
        return str(value)

    try:
        surname = _component(record['name_elements.surname']).upper()
        middle_name = _component(record['name_elements.middle_name']).lower()
        forename = _component(record['name_elements.forename']).lower()
    except (TypeError, AttributeError):
        return 'NONAME__BLANK'
    return "{}_{}_{}".format(surname, middle_name, forename).replace(' ', '-')
    
def psc_uid(record):
    """Combine the name identifier and the date-of-birth identifier into one uid.

    :param record: mapping-like row describing one person
    :return: dict with a single 'uid' key holding '<name id>:<dob id>'
    """
    name_part = psc_name_id(record)
    dob_part = psc_dob(record)
    return {'uid': name_part + ':' + dob_part}

def psc_name(record):
    """Build the name fields stored on a person.

    Missing values (None or NaN) become empty strings. They are detected on the raw
    value instead of deleting the text 'nan' from the result, which used to damage
    real names such as "Fernandez". A missing full 'name' now yields an empty string
    rather than discarding every name field.

    :param record: mapping-like row with the 'name_elements.*' fields and 'name'
    :return: dict with 'surname', 'middle_name', 'forename', 'title' and the
        upper-cased 'name'; an empty dict if the record cannot be read
    """
    def _text(value):
        # NaN is the only float that is not equal to itself.
        if value is None or (isinstance(value, float) and value != value):
            return ''
        return str(value)

    try:
        return {
            'surname': _text(record['name_elements.surname']),
            'middle_name': _text(record['name_elements.middle_name']),
            'forename': _text(record['name_elements.forename']),
            'title': _text(record['name_elements.title']),
            'name': _text(record.get('name', '')).upper(),
        }
    except (TypeError, AttributeError):
        return {}

def psc_address(record):
    """Build the address fields stored on a person.

    Each field is read with record.get(...), missing values (None or NaN) become
    empty strings, and the post town, county, postcode and country are upper-cased.
    Missing values are detected on the raw value: previously a NaN locality raised
    inside the dict construction (".replace" was called before "str"), so the whole
    address, including the postcode, was replaced by the fallback. Deleting the text
    'nan' also damaged real values such as "Buchanan Street".

    :param record: mapping-like row with the 'address.*' fields
    :return: dict of address fields, or {'address_Country': 'UNKNOWN'} if the record
        cannot be read
    """
    def _text(value):
        # NaN is the only float that is not equal to itself.
        if value is None or (isinstance(value, float) and value != value):
            return ''
        return str(value)

    try:
        fetch = record.get
        return {
            'premises': _text(fetch('address.premises', '')),
            'address_Line1': _text(fetch('address.address_line_1', '')),
            'address_Line2': _text(fetch('address.address_line_2', '')),
            'address_PostTown': _text(fetch('address.locality', '')).upper(),
            'address_POBox': _text(fetch('address.po_box', '')),
            'address_County': _text(fetch('address.region', '')).upper(),
            'address_PostCode': _text(fetch('address.postal_code', '')).upper(),
            'address_Country': _text(fetch('address.country', '')).upper(),
            'address_CareOf': _text(fetch('address.care_of', '')),
        }
    except (TypeError, AttributeError):
        return {'address_Country': "UNKNOWN"}
    
def psc_details(record):
    """Collect the control-relationship details for a person.

    :param record: mapping-like row for one PSC
    :return: dict with 'company_id' (third '/'-separated piece of 'links.self'),
        'control_kind', 'nationality' and 'ceased_on'; 'DOB' is added only when a
        date of birth other than the '00/0000' placeholder is available
    """
    details = {
        'company_id': record['links.self'].split('/')[2],
        'control_kind': record['kind'],
        'nationality': record['nationality'],
        'ceased_on': record['ceased_on'],
    }
    dob = psc_dob(record)
    if dob == '00/0000':
        return details
    details['DOB'] = dob
    return details
    

def new_record(record):
    """Flatten one PSC row into a single dict.

    The uid, address, relationship details and name fields are merged in that order
    (a later part overrides an earlier one on a key clash), and the row's
    DICTIONARY_OF_CONTROLS is stored under 'natures_of_control'.

    :param record: mapping-like row for one PSC
    :return: flat dict combining all of the above
    """
    flat = {}
    for part in (psc_uid(record), psc_address(record), psc_details(record), psc_name(record)):
        flat.update(part)
    flat['natures_of_control'] = record['DICTIONARY_OF_CONTROLS']
    return flat


In [14]:
# Try the conversion on the first five rows; each flat record becomes one output row.
human_psc.iloc[0:5].apply(lambda s: pd.Series(new_record(s)), axis=1)

Unnamed: 0,DOB,address_CareOf,address_Country,address_County,address_Line1,address_Line2,address_POBox,address_PostCode,address_PostTown,ceased_on,...,control_kind,forename,middle_name,name,nationality,natures_of_control,premises,surname,title,uid
0,02/1977,,ENGLAND,,St. Andrews Road,,,RG9 1HP,HENLEY-ON-THAMES,,...,individual-person-with-significant-control,Nga,Thanh,MRS NGA THANH WILDMAN,Vietnamese,{'ownership-of-shares-50-to-75-percent': 1},2,Wildman,Mrs,WILDMAN_thanh_nga:02/1977
1,09/1947,,ENGLAND,BUCKINGHAMSHIRE,High Street,Wendover,,HP22 6EA,AYLESBURY,2016-07-01,...,individual-person-with-significant-control,Stephen,Robert Charles,MR STEPHEN ROBERT CHARLES DAVIES,British,"{'ownership-of-shares-25-to-50-percent': 1, 'o...",14a,Davies,Mr,DAVIES_robert-charles_stephen:09/1947
2,01/1966,,UNITED KINGDOM,SUFFOLK,Holywells Road,,,IP3 0DL,IPSWICH,,...,individual-person-with-significant-control,Gayle,Fiona,GAYLE FIONA TARRANT,British,"{'ownership-of-shares-25-to-50-percent': 1, 'v...",37-41,Tarrant,,TARRANT_fiona_gayle:01/1966
4,06/1948,,,,38 Church Road,Worcester Park,,KT4 7RD,SURREY,,...,individual-person-with-significant-control,Jocelyn,Peter,MR JOCELYN PETER OLLETT,British,{'ownership-of-shares-50-to-75-percent': 1},,Ollett,Mr,OLLETT_peter_jocelyn:06/1948
6,12/1959,,UNITED KINGDOM,,Albion Dockside Estate,Hanover Place,,BS1 6UT,BRISTOL,,...,individual-person-with-significant-control,Nigel,Keith,MR NIGEL KEITH DANCE,British,"{'ownership-of-shares-50-to-75-percent': 1, 'v...",Albion Dockside Building,Dance,Mr,DANCE_keith_nigel:12/1959


In [15]:
# Small trial run on ten rows before processing the full dataset.
neo_records_df = human_psc.head(10).apply(lambda s: pd.Series(new_record(s)), axis=1)

# Look up a code for the nationality and for the address country; values that are
# not in the maps fall back to ''.
neo_records_df['Citizen_of'] = neo_records_df.nationality.apply(lambda x: nationality_map.get(x.lower(), ''))
neo_records_df['Registered_in'] = neo_records_df.address_Country.apply(lambda x: combined_map.get(x.upper(), ''))

neo_records_df.tail(5)

Unnamed: 0,DOB,address_CareOf,address_Country,address_County,address_Line1,address_Line2,address_POBox,address_PostCode,address_PostTown,ceased_on,...,middle_name,name,nationality,natures_of_control,premises,surname,title,uid,Citizen_of,Registered_in
7,04/1960,,UNITED KINGDOM,,41 Walsingham Road,,,EN2 6EY,ENFIELD,2016-10-11,...,Donald,MR CLIFFORD DONALD WING,British,"{'ownership-of-shares-75-to-100-percent': 1, '...",Parkside House,Wing,Mr,WING_donald_clifford:04/1960,GB,GB
8,07/1962,,SCOTLAND,,85 East London Street,,,EH7 4BQ,EDINBURGH,,...,,MR MARK HARRISON,British,{'ownership-of-shares-75-to-100-percent': 1},Flat 7,Harrison,Mr,HARRISON__mark:07/1962,GB,GB
9,07/1942,,,,250 York Road,,,SW11 3SJ,LONDON,,...,Van,MR JOHN VAN SOMEREN,British,{'significant-influence-or-control': 1},Office 209,Someren,Mr,SOMEREN_van_john:07/1942,GB,
10,08/1971,,UNITED KINGDOM,,Battlefield Enterprise Park,,,SY1 3AF,SHREWSBURY,,...,Kikis,MS MARIA KIKIS ZACHARIADES,British,"{'ownership-of-shares-25-to-50-percent': 1, 'v...",10 Park Plaza,Zachariades,Ms,ZACHARIADES_kikis_maria:08/1971,GB,GB
12,10/1979,,ENGLAND,HAMPSHIRE,6 St. Cross Road,,,SO23 9HX,WINCHESTER,,...,,MR RAJAN VENUGOPAL SOUNDRA,British,"{'ownership-of-shares-75-to-100-percent': 1, '...",Sg House,Venugopal Soundra,Mr,VENUGOPAL-SOUNDRA__rajan:10/1979,GB,GB


In [82]:
# Summing the per-date counts gives the number of rows that have a ceased_on date.
human_psc.ceased_on.value_counts().sum()

59144

## Let us first insert old PSCs that have now ceased controlling a company

If we have a "ceased_on" date then we know we have a former PSC.

In [29]:
# Build flat records only for PSCs that have a ceased_on date (former controllers).
neo_records_df = human_psc[human_psc.ceased_on.notnull()].apply(lambda s: pd.Series(new_record(s)), axis=1)

# str() stops a missing (NaN) value from raising on .lower()/.upper(); unless the map
# happens to contain a 'nan'/'NAN' key it simply falls through to the '' default.
neo_records_df['Citizen_of'] = neo_records_df.nationality.apply(lambda x: nationality_map.get(str(x).lower(), ''))
neo_records_df['Registered_in'] = neo_records_df.address_Country.apply(lambda x: combined_map.get(str(x).upper(), ''))

# The previous bare .tail(5) was not the last expression of the cell, so it displayed
# nothing; only the shape is reported.
print(neo_records_df.shape)

(59144, 24)


#### Need a little more enrichment for when the control relationship ceased

In [30]:
def extend_control_relations(row):
    """Return the row's control properties with the 'ceased_on' date added.

    A new dict is returned; the dict stored in row['natures_of_control'] is left
    untouched, so other objects that share that dict are not modified as a side effect.

    :param row: mapping-like row with 'natures_of_control' (a dict) and 'ceased_on'
    :return: new dict containing the original controls plus a 'ceased_on' entry
    """
    extended = dict(row['natures_of_control'])
    extended['ceased_on'] = row['ceased_on']
    return extended

# Add the ceased_on date to every row's relationship properties.
neo_records_df['natures_of_control'] = neo_records_df.apply(extend_control_relations, axis=1)

We need to convert the dataframe into a list of record details before we start to insert into the Neo4j database

In [31]:
# One dict per row. Going through .T.to_dict() keyed the rows by index label, which
# transposes the whole frame and silently keeps only one row per duplicated label.
input_data = neo_records_df.to_dict('records')

In [19]:
# Connection settings are read from the environment so that real credentials never have
# to be saved in the notebook; the fallbacks are the original placeholder values.
driver = GraphDatabase.driver(
    os.environ.get("NEO4J_URI", "bolt://10.0.0.1:7687"),
    auth=(os.environ.get("NEO4J_USER", "myusername"),
          os.environ.get("NEO4J_PASSWORD", "mypassword")))

**Now** we are ready to run over the data and insert the people and relationships

In [103]:
# Create one Person node per uid; the :PSC label and the properties are only written
# when the node is first created (ON CREATE), so an existing Person is left unchanged.
# NOTE(review): the records built by new_record carry no 'URI' key, so d.URI is
# presumably null and c.uri is never populated - confirm.
with driver.session() as session:
    session.run(("UNWIND {list} AS d "
                 "MERGE (c:Person {uid: d.uid}) "
                 "ON CREATE SET c:PSC, c.forename=d.forename, "
                 "c.middle_name=d.middle_name, "
                 "c.surname=d.surname, "
                 "c.title=d.title, "
                 "c.name=d.name, "
                 "c.dob=d.DOB, "
                 "c.nationality=d.nationality, "
                 "c.address_premises=d.premises, "
                 "c.address_Line1=d.address_Line1, "
                 "c.address_Line2=d.address_Line2, "
                 "c.address_PostTown=d.address_PostTown, "
                 "c.address_POBox=d.address_POBox, "
                 "c.address_County=d.address_County, "
                 "c.address_Postcode=d.address_PostCode, "                 
                 "c.address_Country=d.address_Country, "                 
                 "c.uri=d.URI;"), {"list": input_data})

In [104]:
# Link each Person to the Country node for the code in Registered_in, creating the
# Country node if it does not exist yet.
# NOTE(review): Registered_in is '' when the country was not found in combined_map,
# so this also merges a Country node with an empty code.
with driver.session() as session:
    session.run(("UNWIND {list} AS d "
                 "MATCH (c:Person {uid: d.uid}) "
                 "MERGE (country:Country {code: d.Registered_in}) "
                 "MERGE (c)-[:REGISTERED_IN]->(country);"), {"list": input_data})

In [105]:
# Link each Person to the Country node for the code in Citizen_of, creating the
# Country node if it does not exist yet.
with driver.session() as session:
    session.run(("UNWIND {list} AS d "
                 "MATCH (c:Person {uid: d.uid}) "
                 "MERGE (country:Country {code: d.Citizen_of}) "
                 "MERGE (c)-[:CITIZEN_OF]->(country);"), {"list": input_data})

In [106]:
# Link each Person to a ControllingEntity node keyed by the record's control_kind.
with driver.session() as session:
    session.run(("UNWIND {list} AS d "
                 "MATCH (p:Person {uid: d.uid}) "
                 "MERGE (ce:ControllingEntity {type: d.control_kind}) "
                 "MERGE (p)-[:HAS_CONTROL_KIND]->(ce);"), {"list": input_data})

In [120]:
# Link each Person to the Company it used to control. The natures_of_control dict
# becomes the relationship's properties, but only when the relationship is first created.
with driver.session() as session:
    session.run(("UNWIND {list} AS d "
                 "MATCH (p:Person {uid: d.uid}) "
                 "MERGE (c:Company {uid: d.company_id}) "
                 "MERGE (p)-[r:CONTROLLED]->(c) ON CREATE SET r=d.natures_of_control;"), {"list": input_data})

In [34]:
# Link each Person to a Postcode node keyed by the record's address_PostCode.
# Note that this reuses the REGISTERED_IN relationship type that also points at Country.
with driver.session() as session:
    session.run(("UNWIND {list} AS d "
                 "MATCH (p:Person {uid: d.uid}) "
                 "MERGE (pc:Postcode {uid: d.address_PostCode}) "
                 "MERGE (p)-[:REGISTERED_IN]->(pc);"), {"list": input_data})

Let's check how many people, controlled relationships and companies we've connected together

In [122]:
# Sanity check: distinct people, CONTROLLED relationships and companies now connected.
with driver.session() as session:
    result = session.run("MATCH (p:Person)-[r:CONTROLLED]-(c:Company) RETURN COUNT(distinct p), COUNT(distinct r), COUNT(distinct c);")
    print(result.data())

[{'COUNT(distinct p)': 47705, 'COUNT(distinct r)': 59131, 'COUNT(distinct c)': 58485}]


## Now for active PSCs

Let's turn our CYPHER writes into a function we can use in a loop to move over the data. 

Now we can loop over the larger set of data and insert all of the active PSCs

In [6]:
def insert_individual_PSC_nodes_and_relationships(input_data, driver):
    """Write the Person nodes and their relationships for a batch of active PSCs.

    Every statement receives the whole batch as the ``list`` parameter and is run in
    its own session, in the order given below.

    :param input_data: a list of dictionaries that have all the required information
        for nodes and relationships
    :param driver: an active driver object to connect to a neo4j instance
    :return: None
    """
    statements = (
        # Make sure a Person node exists for every record. Properties are only written
        # when the node is first created.
        # The postcode is read from d.address_PostCode, which is the key the records
        # actually carry; the former d.address_Postcode never matched, so the property
        # was never stored. The property name c.address_Postcode matches the one
        # written by the earlier one-off cell for ceased PSCs.
        # NOTE(review): that earlier cell also sets a :PSC label on create and this
        # statement does not - confirm whether active PSCs should carry it too.
        # NOTE(review): the records carry no 'URI' key, so c.uri is presumably never
        # populated - confirm.
        ("UNWIND {list} AS d "
         "MERGE (c:Person {uid: d.uid}) "
         "ON CREATE SET c.forename=d.forename, "
         "c.middle_name=d.middle_name, "
         "c.surname=d.surname, "
         "c.title=d.title, "
         "c.name=d.name, "
         "c.dob=d.DOB, "
         "c.nationality=d.nationality, "
         "c.address_premises=d.premises, "
         "c.address_Line1=d.address_Line1, "
         "c.address_Line2=d.address_Line2, "
         "c.address_PostTown=d.address_PostTown, "
         "c.address_POBox=d.address_POBox, "
         "c.address_County=d.address_County, "
         "c.address_Postcode=d.address_PostCode, "
         "c.address_Country=d.address_Country, "
         "c.uri=d.URI;"),

        # Connect Person with Country of registration
        ("UNWIND {list} AS d "
         "MATCH (c:Person {uid: d.uid}) "
         "MERGE (country:Country {code: d.Registered_in}) "
         "MERGE (c)-[:REGISTERED_IN]->(country);"),

        # Connect Person with Country of citizenship
        ("UNWIND {list} AS d "
         "MATCH (c:Person {uid: d.uid}) "
         "MERGE (country:Country {code: d.Citizen_of}) "
         "MERGE (c)-[:CITIZEN_OF]->(country);"),

        # Connect Person to its kind of controlling entity
        ("UNWIND {list} AS d "
         "MATCH (p:Person {uid: d.uid}) "
         "MERGE (ce:ControllingEntity {type: d.control_kind}) "
         "MERGE (p)-[:HAS_CONTROL_KIND]->(ce);"),

        # Connect Person to the Company it controls; the natures_of_control dict
        # becomes the relationship properties when the relationship is created.
        ("UNWIND {list} AS d "
         "MATCH (p:Person {uid: d.uid}) "
         "MERGE (c:Company {uid: d.company_id}) "
         "MERGE (p)-[r:CONTROLS]->(c) ON CREATE SET r=d.natures_of_control;"),

        # Connect Person to the Postcode of its registered address
        ("UNWIND {list} AS d "
         "MATCH (p:Person {uid: d.uid}) "
         "MERGE (pc:Postcode {uid: d.address_PostCode}) "
         "MERGE (p)-[:REGISTERED_IN]->(pc);"),
    )

    for statement in statements:
        with driver.session() as session:
            session.run(statement, {"list": input_data})

Using the odo library within blaze we can loop over our input data in chunks

In [None]:
import blaze as bz

# Active PSCs are the rows without a ceased_on date.
active_psc = human_psc[human_psc.ceased_on.isnull()]

proc_records = 0
# Take the total from the data itself instead of subtracting a hard-coded ceased count.
total_records = active_psc.shape[0]
for chunk in bz.odo(active_psc, target=bz.chunks(pd.DataFrame), chunksize=100000):
    neo_records_df = chunk.apply(lambda s: pd.Series(new_record(s)), axis=1)
    # str() stops a missing (NaN) value from raising on .lower()/.upper(); unless the map
    # happens to contain a 'nan'/'NAN' key it falls through to the 'UNKNOWN' default.
    neo_records_df['Citizen_of'] = neo_records_df.nationality.apply(lambda x: nationality_map.get(str(x).lower(), 'UNKNOWN'))
    neo_records_df['Registered_in'] = neo_records_df.address_Country.apply(lambda x: combined_map.get(str(x).upper(), 'UNKNOWN'))

    # One dict per row, without transposing the frame or keying rows by index label.
    input_data = neo_records_df.to_dict('records')
    del neo_records_df
    
    insert_individual_PSC_nodes_and_relationships(input_data, driver)
    proc_records += len(input_data)
    print("Processed {} of {} .... {:5.2f}% complete".format(proc_records, total_records, 100*proc_records/total_records))

Processed 100000 of 4165996 ....  2.40% complete
Processed 200000 of 4165996 ....  4.80% complete
Processed 300000 of 4165996 ....  7.20% complete
Processed 400000 of 4165996 ....  9.60% complete
Processed 500000 of 4165996 .... 12.00% complete
Processed 600000 of 4165996 .... 14.40% complete
Processed 700000 of 4165996 .... 16.80% complete
Processed 800000 of 4165996 .... 19.20% complete
Processed 900000 of 4165996 .... 21.60% complete
Processed 2600000 of 4165996 .... 62.41% complete
Processed 2700000 of 4165996 .... 64.81% complete
Processed 2800000 of 4165996 .... 67.21% complete
Processed 2900000 of 4165996 .... 69.61% complete
Processed 3000000 of 4165996 .... 72.01% complete
Processed 3100000 of 4165996 .... 74.41% complete
Processed 3200000 of 4165996 .... 76.81% complete
Processed 3300000 of 4165996 .... 79.21% complete
Processed 3400000 of 4165996 .... 81.61% complete
Processed 3500000 of 4165996 .... 84.01% complete
Processed 3600000 of 4165996 .... 86.41% complete
Processed

ClientError: Cannot merge node using null property value for uid