# Collects company ids and names from the Company House API

In [67]:
import requests
import json
import os
import dotenv
from urllib.parse import quote_plus
from time import sleep
import base64

dotenv.load_dotenv()

# You'll need to get an API key from Companies House
# Register at: https://developer.companieshouse.gov.uk/developer/applications
API_KEY = os.getenv('COMPANIES_HOUSE_API_KEY')

def search_company(company_name, max_results=10):
    """
    Search Companies House API for companies matching the provided name.

    Parameters
    ----------
    company_name : str
        Name of company to search for.
    max_results : int, optional
        Maximum number of results to return (default is 10).

    Returns
    -------
    list of dict
        List of dictionaries containing company information.
    """
    base_url = "https://api.company-information.service.gov.uk"
    search_url = f"{base_url}/search/companies"
    
    # Set up authentication and headers
    auth = (API_KEY, '')
    headers = {'Content-Type': 'application/json'}
    
    # Set up parameters
    params = {
        'q': company_name,
        'items_per_page': max_results
    }
    
    try:
        response = requests.get(
            search_url,
            auth=auth,
            headers=headers,
            params=params
        )
        response.raise_for_status()
        
        results = response.json()
        
        # Extract relevant information
        companies = []
        for item in results.get('items', []):
            company_info = {
                'company_name': item.get('title'),
                'company_number': item.get('company_number'),
                'company_status': item.get('company_status'),
                'date_of_creation': item.get('date_of_creation'),
                'address': item.get('address_snippet')
            }
            companies.append(company_info)
            
        return companies
        
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        if e.response is not None and e.response.status_code == 429:
            return "Rate limit exceeded, please try again later."
        return []


def get_company_profile(company_number):
    """
    Get company profile from Companies House API using company number.

    Parameters
    ----------
    company_number : str
        Company number to look up.

    Returns
    -------
    dict
        Dictionary containing company profile information.
    """
    base_url = "https://api.company-information.service.gov.uk"
    profile_url = f"{base_url}/company/{company_number}"
    
    # Set up authentication and headers
    auth = (API_KEY, '')
    headers = {'Content-Type': 'application/json'}
    
    try:
        response = requests.get(
            profile_url,
            auth=auth,
            headers=headers
        )
        response.raise_for_status()
        
        profile = response.json()
            
        return profile
        
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        if e.response is not None and e.response.status_code == 429:
            return "Rate limit exceeded, please try again later."
        return {}
    

def get_company_officers(company_number):
    """
    Get company officers from Companies House API using company number.

    Parameters
    ----------
    company_number : str
        Company number to look up.

    Returns
    -------
    list
        List of dictionaries containing company officer information.
    """
    base_url = "https://api.company-information.service.gov.uk"
    officers_url = f"{base_url}/company/{company_number}/officers"
    
    # Set up authentication and headers
    auth = (API_KEY, '')
    headers = {'Content-Type': 'application/json'}
    
    try:
        response = requests.get(
            officers_url,
            auth=auth,
            headers=headers
        )
        response.raise_for_status()
        
        officers = response.json()
        return officers
        
    except requests.exceptions.RequestException as e:
        print(f"Error occurred: {e}")
        if e.response is not None and e.response.status_code == 429:
            return "Rate limit exceeded, please try again later."
        return []


# The thing that runs the search

In [68]:
import pandas as pd
from pydantic import BaseModel
import time

class Address(BaseModel):
    address_line_1: str | None = None
    locality: str | None = None
    postal_code: str | None = None
    premises: str | None = None
    region: str | None = None

class DateOfBirth(BaseModel):
    month: int
    year: int

class OfficerData(BaseModel):
    name: str | None = None
    appointed_on: str | None = None
    resigned_on: str | None = None
    officer_role: str | None = None
    country_of_residence: str | None = None
    date_of_birth: DateOfBirth | None = None
    nationality: str | None = None
    occupation: str | None = None
    person_number: str | None = None
    address: Address | None = None

class CompanyData(BaseModel):
    company_name: str | None = None
    company_number: str | None = None
    company_status: str | None = None
    company_created_date: str | None = None
    company_address: str | None = None
    active_officers: int | None = None
    resigned_officers: int | None = None
    inactive_officers: int | None = None
    officers: list[OfficerData] | None = None


df = pd.read_csv("company_data.csv")
company_names = df["Provider Name"].unique()

company_data = []

window_start = time.time()
window_requests = 0

def dump_data(data):
    dumpable_data = [d.model_dump() for d in data]
    with open("company_data.json", "w") as f:
        json.dump(dumpable_data, f, indent=4)

for i, name in enumerate(company_names):
    time.sleep(0.3)
    print(f"Processing {i} of {len(company_names)}: {name}")
    print("window_requests:", window_requests)
    print("elapsed:", time.time() - window_start)
    if window_requests > 596:
        dump_data(company_data)
        print("Saved the company data.")
        now = time.time()
        time_to_exceed = now - window_start
        print(f"Window requests exceeded, in {time_to_exceed} seconds")

        if time_to_exceed > 300:
            print("Resetting window")
            window_start = time.time()
            window_requests = 0
        else:
            print(f"Sleeping for {300 - time_to_exceed} seconds")
            time.sleep(300 - time_to_exceed)
            window_start = time.time()
            window_requests = 0

    print("Searching for company:", name)
    name_lower = name.lower()
    
    results = search_company(name, max_results=10)
    window_requests += 1

    if results == "Rate limit exceeded, please try again later.":
        dump_data(company_data)
        print("Saved the company data.")
        print("sleeping for 300 seconds")
        time.sleep(300)
        window_start = time.time()
        window_requests = 0
        results = search_company(name, max_results=10)

    correct_result = None

    for result in results:
        res_name = result["company_name"].lower()
        if name_lower == res_name:
            correct_result = result
            print("Correct result found for:", name)
            break

    if correct_result is None:
        print("No correct result found for:", name)
        continue

    company_officers = get_company_officers(correct_result["company_number"])
    window_requests += 1

    if company_officers == "Rate limit exceeded, please try again later.":
        dump_data(company_data)
        print("Saved the company data.")
        print("sleeping for 300 seconds")
        time.sleep(300)
        window_start = time.time()
        window_requests = 1
        company_officers = get_company_officers(correct_result["company_number"])


    officer_models = []
    for o in company_officers["items"]:
        try:
            o_data = OfficerData(**o)
            officer_models.append(o_data)
        except Exception as e:
            print(f"Error occurred: {e}")
            print(json.dumps(o, indent=4))
            raise e

    company_data.append(CompanyData(
        company_name=name,
        company_number=correct_result["company_number"],
        company_status=correct_result["company_status"],
        company_created_date=correct_result["date_of_creation"],
        company_address=correct_result["address"],
        active_officers=company_officers["active_count"],
        resigned_officers=company_officers["resigned_count"],
        inactive_officers=company_officers["inactive_count"],
        officers=officer_models
    ))




Processing 0 of 9818: PHC Home Care Limited
window_requests: 0
elapsed: 0.3006155490875244
Searching for company: PHC Home Care Limited
Correct result found for: PHC Home Care Limited
Processing 1 of 9818: Quality & Compassionate Care Ltd
window_requests: 2
elapsed: 0.8869662284851074
Searching for company: Quality & Compassionate Care Ltd
Correct result found for: Quality & Compassionate Care Ltd
Processing 2 of 9818: Astoria Homecare Limited
window_requests: 4
elapsed: 1.5512363910675049
Searching for company: Astoria Homecare Limited
Correct result found for: Astoria Homecare Limited
Processing 3 of 9818: Filcare Ltd
window_requests: 6
elapsed: 2.000763177871704
Searching for company: Filcare Ltd
Correct result found for: Filcare Ltd
Processing 4 of 9818: Kindlycare Ltd
window_requests: 8
elapsed: 2.4487102031707764
Searching for company: Kindlycare Ltd
Correct result found for: Kindlycare Ltd
Processing 5 of 9818: IBEF Care Services Limited
window_requests: 10
elapsed: 3.0839772224

In [48]:
print(json.dumps(company_officers, indent=4))
for o in company_officers["items"]:
    o_data = OfficerData(**o)


{
    "active_count": 2,
    "etag": "5f713e83f4bc638469713850e78213c7b62e9ebe",
    "items": [
        {
            "address": {
                "address_line_1": "Imperial Drive",
                "locality": "Harrow",
                "postal_code": "HA2 7HJ",
                "premises": "Systems House 246",
                "region": "Middlesex"
            },
            "appointed_on": "2020-06-01",
            "is_pre_1992_appointment": false,
            "country_of_residence": "England",
            "date_of_birth": {
                "month": 2,
                "year": 1959
            },
            "links": {
                "self": "/company/09671527/appointments/rOzgMLLG0dcEyCxWMRMrfuZKOrc",
                "officer": {
                    "appointments": "/officers/AYdWGUyZps9nq8bt9gFC-aK8BrA/appointments"
                }
            },
            "name": "A-KUM, Eric Theo",
            "nationality": "British",
            "occupation": "Director",
            "officer_

In [13]:
import json
import pandas as pd
from datetime import datetime


with open("company_data.json", "r") as f:
    data = json.loads(f.read())


# # Create a mapping of officer names to company names
# officer_to_company = {}
# for company in data:
#     for officer in company["officers"]:
#         officer_name = officer["name"]
#         # Store both the original name and a normalized version for matching
#         officer_to_company[officer_name] = company.get("company_name", name)
#         # Also store without spaces and lowercase for fuzzy matching
#         normalized_name = officer_name.lower().replace(" ", "")
#         officer_to_company[normalized_name] = company.get("company_name", name)

# # Try to find matching company name for each record
# for company in data:
#     # First try exact match
#     if company.get("company_name") is None:
#         for officer in company["officers"]:
#             officer_name = officer["name"]
#             if officer_name in officer_to_company:
#                 company["company_name"] = officer_to_company[officer_name]
#                 break
            
#             # Try normalized match if exact match fails
#             normalized_name = officer_name.lower().replace(" ", "")
#             if normalized_name in officer_to_company:
#                 company["company_name"] = officer_to_company[normalized_name]
#                 break

print(len(data))

df = pd.read_csv("company_data.csv")

rows = []

# most_recent_idx = 0

for comp in data:

    lower_replace = lambda x: x.lower().replace(" ", "").replace(",", "")

    all_names = df.loc[most_recent_idx:]["Provider NI Name"].apply(lower_replace)

    comp_name = None
    comp_provider_id = None

    for o in comp["officers"]:
        name = " ".join(o["name"].split(" ")[:-1])
        lowered_name = lower_replace(name)
        name_mask = all_names == lowered_name
        match_idx = name_mask.idxmax() if name_mask.any() else None
        print("Matching company index:", match_idx)
        if match_idx is not None:
            print("Found company name:", df.loc[match_idx]["Provider Name"])
            comp_name = df.loc[match_idx]["Provider Name"]
            comp_provider_id = df.loc[match_idx]["Provider ID"]

            # Update the most recent index to the match index ensures no double counting
            # most_recent_idx = match_idx
            # print("Most recent index:", most_recent_idx)
            break
    
    for o in comp["officers"]:
        if o["resigned_on"] is not None:
            continue
        
        if o["date_of_birth"] is not None:
            dob = datetime(o["date_of_birth"]["year"], o["date_of_birth"]["month"], 1)
            age = datetime.now().year - dob.year
        else:
            age = None

        if o["appointed_on"] is not None:
            appointed = datetime.strptime(o["appointed_on"], "%Y-%m-%d")
            time_in_role = datetime.now().year - appointed.year
        else:
            time_in_role = None
        
        data_dict = {
            "officer_name": o["name"],
            "officer_role": o["officer_role"],
            "officer_appointed_date": o["appointed_on"],
            "officer_time_in_role_(years)": time_in_role,
            "officer_nationality": o["nationality"],
            "officer_country_of_residence": o["country_of_residence"],
            "officer_age_(in_2025)": age,
            "officer_occupation": o["occupation"],
            "officer_person_number": o["person_number"],

            "company_name": comp_name,
            "company_provider_id": comp_provider_id,
            "company_number": comp["company_number"],
            "company_status": comp["company_status"],
            "company_created_date": comp["company_created_date"],
            "company_address": comp["company_address"],
        }

        rows.append(data_dict)

df = pd.DataFrame(rows)
df.to_csv("active_company_officers_with_company_names.csv", index=False)


8961
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching company index: None
Matching 

# Re-adding company names

In [4]:
import json
import pandas as pd
from datetime import datetime


with open("company_data.json", "r") as f:
    data = json.loads(f.read())

df = pd.read_csv("company_data.csv")

print(df.head())

# Get unique company names from the dataframe
unique_companies = df['Provider Name'].unique()



  Provider NI Name    Provider ID                     Provider Name
0      A-Kum, Eric   1-2607514533             PHC Home Care Limited
1       Aabe, Nura  1-11051847936  Quality & Compassionate Care Ltd
2   Aashuri, Sahil   1-3000560424          Astoria Homecare Limited
3     Abad, Girlee  1-12835456013                       Filcare Ltd
4     Abade, Warda  1-14885789859                    Kindlycare Ltd


# Adding data back in to the json

In [85]:
import re

with open("function_output.txt", "r") as f:
    data = f.read()

pattern = re.compile(r"(\d+) of (\d+): (.+)\n")
splits = data.split("Processing ")[1:]

pulled_names = []

for split in splits:
    found = "Correct result found for:" in split
    
    if not found:
        continue
    
    match = pattern.match(split)
    if match:
        print(match.groups())
        pulled_names.append(match.groups()[2])

print(len(pulled_names))

('0', '9818', 'PHC Home Care Limited')
('1', '9818', 'Quality & Compassionate Care Ltd')
('2', '9818', 'Astoria Homecare Limited')
('3', '9818', 'Filcare Ltd')
('4', '9818', 'Kindlycare Ltd')
('5', '9818', 'IBEF Care Services Limited')
('6', '9818', 'Sam2Sam Deaf Care Service Ltd')
('7', '9818', 'Support By Heart Ltd')
('8', '9818', 'Like Care Limited')
('9', '9818', 'Infiniti Care Ltd')
('10', '9818', 'Delicate Hands Care Limited')
('11', '9818', 'Authentic & Care Services Ltd')
('12', '9818', 'Sabir Care UK Ltd')
('13', '9818', 'Trio Care Agency Limited')
('14', '9818', 'Jamacare Ltd')
('15', '9818', 'Dream Home Care Ltd')
('16', '9818', 'London Homecarers Ltd')
('17', '9818', 'Raising Care Ltd')
('18', '9818', 'Marigold Home Care Ltd')
('19', '9818', 'Nation Care Agency Ltd')
('20', '9818', 'First Necessity Care Ltd')
('22', '9818', 'Sahal Care Limited')
('23', '9818', 'Open Heart Care Ltd')
('25', '9818', 'Gentle Hands Care And Services Limited')
('26', '9818', 'Community Care Expe