In [None]:
## Potentially required installs: ##

# Install a pip package in the current Jupyter kernel
# Code taken from: https://jakevdp.github.io/blog/2017/12/05/installing-python-packages-from-jupyter/
import sys
!{sys.executable} -m pip install PyPDF2
import sys
!{sys.executable} -m pip install pymysql
import sys
!{sys.executable} -m pip install mysql-connector



In [None]:
## Import required packages: ##
import requests
import json
import math
import numpy
import shutil # For copying files
import time # For waiting x seconds
from datetime import date as dt # get todays date
import pandas as pd
from numpy import nan
#import PyPDF2             # For reading PDFs
#from pathlib import Path  # For writing/saving PDFs from requests
import pymysql
import mysql.connector
import os.path # Checking whether file exists in current directory

## Step 1 - Identify Companies Of Interest ##

**GetLocalActiveCompanies(api_key, location, currentRequestCount, maxCountLimit, waitPeriod, numberOfPages=None)**  
Gives basic identifying information on all active companies in a given
location.

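Input - api_key, location of interest, the rate-limit settings (currentRequestCount, maxCountLimit, waitPeriod) and numberOfPages of results.  
- currentRequestCount is the number of API requests already counted; once it reaches maxCountLimit the code pauses for waitPeriod seconds (plus a 30 second margin) and restarts the count.
- If numberOfPages=None then all available results will be provided.
- The current number of results per page is 20, so 2 pages give 40 results.  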


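Output - Four lists, indexed equally, followed by the updated request count.  
- listOfCompanyNumbers - Company number.  
- listOfCompanyNames - Company name.  
- listOfCompanySICCodes - All of the company's SIC codes.  
- listOfCompanyAddresses - Company's full registered office address.
- currentRequestCount - Number of API requests counted so far, to pass into the next call.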

In [None]:
def APIRateLimitHandler(currentCount, maxCountLimit, waitPeriod, safetyMargin=30):
    """Count one API request and pause once the request allowance is used up.

    Parameters
    ----------
    currentCount : int
        Number of requests counted so far.
    maxCountLimit : int
        Count at which the function pauses before letting the caller continue.
    waitPeriod : int or float
        Seconds to wait when the limit is reached.
    safetyMargin : int or float, optional
        Extra seconds added on top of ``waitPeriod``. Defaults to 30, the
        margin that was previously hard-coded.

    Returns
    -------
    int
        ``currentCount + 1``, or 0 if the function paused and restarted the count.
    """
    currentCount = currentCount + 1
    # ">=" rather than "==": a counter that is already past the limit
    # (e.g. the limit was lowered between calls) must still trigger the pause.
    if currentCount >= maxCountLimit:
        time.sleep(waitPeriod + safetyMargin)
        currentCount = 0
    return currentCount

In [None]:
def _FormatRegisteredOfficeAddress(addressAsDictionary):
    """Join the known address fields into a single comma-separated string.

    The first address line is written without a separator; every later field
    that is present is prefixed with ", ". Fields that are absent are skipped.
    """
    registeredOfficeAddress = ""
    if "address_line_1" in addressAsDictionary:
        registeredOfficeAddress = addressAsDictionary["address_line_1"]
    for field in ("address_line_2", "locality", "postal_code", "country"):
        if field in addressAsDictionary:
            registeredOfficeAddress = registeredOfficeAddress + ", " + addressAsDictionary[field]
    return registeredOfficeAddress


def GetLocalActiveCompanies(api_key, location, currentRequestCount, maxCountLimit, waitPeriod, numberOfPages=None):
    """Fetch basic details of the active companies registered in ``location``.

    Parameters
    ----------
    api_key : str
        API key, sent as the basic-auth user name with an empty password.
    location : str
        Location to search for; passed as the ``location`` query parameter.
    currentRequestCount : int
        Requests counted so far; updated through ``APIRateLimitHandler`` after
        every request made here.
    maxCountLimit, waitPeriod
        Forwarded unchanged to ``APIRateLimitHandler``.
    numberOfPages : int, optional
        Number of result pages (20 items each) to fetch. When ``None`` the
        page count is derived from the ``hits`` value of the first response.

    Returns
    -------
    tuple
        ``(listOfCompanyNumbers, listOfCompanyNames, listOfCompanySICCodes,
        listOfCompanyAddresses, currentRequestCount)``. The four lists are
        index-aligned; a field missing from a result is stored as ``None``.

    Raises
    ------
    requests.HTTPError
        If the API answers a request with an error status code.
    """
    url = "https://api.company-information.service.gov.uk/advanced-search/companies"
    itemsPerPage = 20
    requestTimeout = 30  # seconds; prevents a stalled connection from hanging the notebook

    listOfCompanyNumbers = []
    listOfCompanyNames = []
    listOfCompanyAddresses = []
    listOfCompanySICCodes = []

    page = 0
    # When numberOfPages is None the first page is fetched unconditionally and its
    # "hits" value fixes the page count, so no separate probing request is needed.
    while numberOfPages is None or page < numberOfPages:
        queryParameters = {
            "location": location,
            "company_status": "active",
            "start_index": page * itemsPerPage,
        }
        # Passing params= lets requests URL-encode the location (spaces, "&", ...).
        response = requests.get(url, params=queryParameters, auth=(api_key, ''), timeout=requestTimeout)
        # The request has been sent whatever its outcome, so count it before checking the status.
        currentRequestCount = APIRateLimitHandler(currentRequestCount, maxCountLimit, waitPeriod)
        response.raise_for_status()
        searchResult = response.json()

        if numberOfPages is None:
            numberOfPages = math.ceil(int(searchResult["hits"]) / itemsPerPage)

        for company in searchResult.get("items", []):
            listOfCompanyNumbers.append(company.get("company_number"))
            listOfCompanyNames.append(company.get("company_name"))
            listOfCompanySICCodes.append(company.get("sic_codes"))

            addressAsDictionary = company.get("registered_office_address")
            if addressAsDictionary is None:
                listOfCompanyAddresses.append(None)
            else:
                listOfCompanyAddresses.append(_FormatRegisteredOfficeAddress(addressAsDictionary))

        page += 1

    return listOfCompanyNumbers, listOfCompanyNames, listOfCompanySICCodes, listOfCompanyAddresses, currentRequestCount

**Using GetLocalActiveCompanies:**

In [None]:
# The API key is a credential, so it is read from the environment rather than
# being stored in the notebook (where it would be shared with every copy of the file).
api_key = os.environ.get("COMPANIES_HOUSE_API_KEY")
if not api_key:
    raise RuntimeError("Set the COMPANIES_HOUSE_API_KEY environment variable to your Companies House API key before running this cell.")

location = "Swansea"
currentRequestCount = 0
maxCountLimit = 599  # number of requests after which the rate-limit handler pauses
waitPeriod = 300  # seconds to pause once maxCountLimit is reached
numberOfPages = 2
listOfCompanyNumbers, listOfCompanyNames, listOfCompanySICCodes, listOfCompanyAddresses, currentRequestCount = GetLocalActiveCompanies(api_key, location, currentRequestCount, maxCountLimit, waitPeriod, numberOfPages)

print("\n Company numbers were: \n")
print(listOfCompanyNumbers)
print("\n Company names were: \n")
print(listOfCompanyNames)
print("\n Company SIC codes were: \n")
print(listOfCompanySICCodes)
print("\n Company addresses were: \n")
print(listOfCompanyAddresses)

print("\n The number of results was: \n")
print(len(listOfCompanyNumbers))



In [None]:
def FormatPostcode(postcode):
    """Normalise the whitespace in a postcode string.

    The text is split on whitespace. The first piece is kept, followed by a
    single space and all remaining pieces joined together, so
    ``" SA1  1 AA"`` becomes ``"SA1 1AA"``. Text with only one piece is
    returned as that piece with no trailing space, and blank text yields ``""``.

    Parameters
    ----------
    postcode : str
        Raw postcode text, possibly with leading, trailing or repeated spaces.

    Returns
    -------
    str
        The postcode with normalised spacing.
    """
    pieces = postcode.split()
    if not pieces:
        return ""
    if len(pieces) == 1:
        # Nothing follows the first piece, so no separator is added.
        return pieces[0]
    return pieces[0] + " " + "".join(pieces[1:])

In [None]:
def GetCompanyPostcodes(listOfCompanyAddresses):
    """Extract a postcode from each comma-separated address string.

    An address is split on commas; the first two parts are skipped and the
    last remaining part that contains a digit is taken as the postcode, then
    passed through ``FormatPostcode``.

    Parameters
    ----------
    listOfCompanyAddresses : list of (str or None)
        Addresses in "line 1, line 2, locality, postcode, country" style.

    Returns
    -------
    list of (str or None)
        One entry per input address, in the same order. The entry is ``None``
        when the address is ``None`` or no postcode candidate was found.
    """
    listOfCompanyPostcodes = []
    for address in listOfCompanyAddresses:
        # Reset for every company so a missing postcode can never inherit the
        # value found for the previous address.
        postcode = None
        if address is not None:
            for part in address.split(',')[2:]:
                if any(character.isdigit() for character in part):
                    postcode = part  # later matches win, so the last candidate is kept

        if postcode is None:
            listOfCompanyPostcodes.append(None)
        else:
            listOfCompanyPostcodes.append(FormatPostcode(postcode))

    return listOfCompanyPostcodes

In [None]:
def ProcessAddressSecondLine(streetAddress, location):
    """Decide whether a second address line looks like a street name.

    The line qualifies when it contains one of the street keywords ("road",
    "street", "park", "way", "lane") and is not simply the location name.
    Both checks ignore letter case and surrounding whitespace, so upper-case
    lines such as "12 HIGH STREET" are recognised and a line such as
    " Swansea" (leading space left over from splitting on commas) is still
    treated as equal to the location "Swansea".

    Parameters
    ----------
    streetAddress : str
        Candidate second line of an address.
    location : str
        Location name that should not be mistaken for a street.

    Returns
    -------
    bool
        True if the line should be kept as part of the street address.
    """
    streetKeywords = ('road', 'street', 'park', 'way', 'lane')
    normalisedAddress = streetAddress.strip().lower()
    if normalisedAddress == location.strip().lower():
        return False
    return any(keyword in normalisedAddress for keyword in streetKeywords)

In [None]:
def GetCompanyStreetAddress(listOfCompanyAddresses, location):
    """Build the street-level part of each comma-separated address.

    The first comma-separated part is always used. The second part is
    appended, separated by a single space, only when it contains a digit and
    ``ProcessAddressSecondLine`` accepts it as a street line.

    Parameters
    ----------
    listOfCompanyAddresses : list of (str or None)
        Addresses in "line 1, line 2, locality, postcode, country" style.
    location : str
        Location name, forwarded to ``ProcessAddressSecondLine``.

    Returns
    -------
    list of (str or None)
        One entry per input address, in the same order; ``None`` where the
        input address was ``None``.
    """
    listOfCompanyStreetAddress = []
    for address in listOfCompanyAddresses:
        if address is None:
            # Keep the output index-aligned with the input.
            listOfCompanyStreetAddress.append(None)
            continue

        addressParts = address.split(',')
        streetAddress = addressParts[0]
        if len(addressParts) > 1:
            # Splitting on "," leaves the leading space of ", "; strip it so the
            # joined result does not contain a double space.
            secondLine = addressParts[1].strip()
            if any(character.isdigit() for character in secondLine) and ProcessAddressSecondLine(secondLine, location):
                streetAddress = streetAddress + " " + secondLine

        listOfCompanyStreetAddress.append(streetAddress)

    return listOfCompanyStreetAddress

In [None]:
# Postcode of every company, followed by how many entries were produced.
listOfCompanyPostcodes = GetCompanyPostcodes(listOfCompanyAddresses)
print(listOfCompanyPostcodes, len(listOfCompanyPostcodes), sep="\n")

# Street-level address of every company, reported the same way.
listOfCompanyStreetAddress = GetCompanyStreetAddress(listOfCompanyAddresses, location)
print(listOfCompanyStreetAddress, len(listOfCompanyStreetAddress), sep="\n")