In [None]:
# download libraries
!pip install --upgrade pip
!pip install bs4
!pip install pandas
!pip install requests
!pip install numpy
!pip install urllib3
!pip install phonenumbers
!pip install py3-validate-email
!pip install fuzzywuzzy

from bs4 import BeautifulSoup
import pandas as pd
import requests
from urllib.parse import urlparse
import numpy as np
import phonenumbers
from validate_email import validate_email
from fuzzywuzzy.fuzz import partial_ratio

In [None]:
# scraping multiple timestamps of govt list of pcr providers (obtaining comany name, website, number & email)
urls = [ 
    'https://web.archive.org/web/20210519121145/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210520140910/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210526072031/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210529103556/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210531150239/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210604100539/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210618121419/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210626230824/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210629104343/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210701142411/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210708123655/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210713165043/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210729180818/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210808132243/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210811073050/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210812113842/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210817114915/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210818174329/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210823032256/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://web.archive.org/web/20210824065330/https://www.find-travel-test-provider.service.gov.uk/test-type/amber',
    'https://www.find-travel-test-provider.service.gov.uk/test-type/amber'
]

provider_details = pd.DataFrame(columns=['company_name','company_link', 'company_number', 'company_email', 'price'])

for url in urls:
    print(url)
    # scrape gvmt test provider site table
    page = requests.get(url)
    soup = BeautifulSoup(page.text, 'html.parser')
    providers_table = soup.find('table', {'class', 'govuk-table'}).find('tbody')

    url_provider_details = pd.DataFrame(columns=['company_name', 'company_link', 'company_number', 'company_email'])
    for row in providers_table.find_all('tr'):
        # test provider saved in cell with id 'provider'
        provider = row.find(id='provider').find('a')
        name = provider.get_text().rstrip().lower()
        link = provider['href']
    
        # test provider number and email saved in only cell(s) with no id
        number_email = row.find_all('td', id=None)
        
        # remove web archive prefix from wayback machine
        link = link[43:] if 'web.archive.org' in link else link

        # old format stores number and email in separate cells
        # new format stores number and email in same cell
        if len(number_email) == 1:
            number_email = number_email[0].find_all('a')
        number = str(number_email[0].get_text())
        email = number_email[1].get_text()
        
        # apply standard format to numbers, emails and links
        if number and len(number) >= 10:
            number = ' '.join(number.rstrip().split())
            number = phonenumbers.format_number(phonenumbers.parse(number, 'GB'), phonenumbers.PhoneNumberFormat.INTERNATIONAL)
        else:
            number = np.nan 

        if email:
            email = str(email).rstrip().replace('\n', '').lower()
        if link:
            link = urlparse(link.lower()).netloc

        price = float(row.find(id='priceAmber').get_text().replace('£', ''))

        url_provider_details = url_provider_details.append({
            'company_name': name,
            'company_link': link,
            'company_number': number,
            'company_email': email,
            'price': price
        }, ignore_index=True)

    provider_details = pd.merge(
        provider_details, 
        url_provider_details, 
        how="outer", 
        on=['company_name','company_link','company_number','company_email', 'price']
    )

provider_details.to_csv('./datasets/aggregated-details/provider_details.csv', index=False)

In [99]:
# group all numbers, emails, and links for each company together
grouped_details = pd.DataFrame(columns=['company_name'])
provider_details = pd.read_csv('./datasets/aggregated-details/provider_details.csv')

for category in ['company_number', 'company_email', 'company_link', 'price']:
    category_details = provider_details[['company_name', category]].drop_duplicates()
    df = (category_details.set_index(['company_name', category_details.groupby('company_name').cumcount()])[category]
            .unstack(fill_value='')
            .add_prefix(category+'_')
            .reset_index())

    grouped_details = pd.merge(grouped_details, df, how='outer',on=['company_name'])

grouped_details.to_csv('./datasets/aggregated-details/grouped_details.csv', index=False)

In [None]:
# validate scraped phone numbers
grouped_details = pd.read_csv('./datasets/aggregated-details/grouped_details.csv')
phone_cols = [x for x in grouped_details.columns if 'company_number' in x]

def check_number(x):
    results = []
    for cell in x:
        if pd.isna(cell) or len(str(cell)) < 9:
            results.append('')
        else:
            number = phonenumbers.parse(cell,'GB')
            valid = phonenumbers.is_valid_number(number)
            results.append(bool(valid))
    return results

phone_details = grouped_details[phone_cols]
phone_validated = phone_details.apply(check_number)
invalid_number = pd.DataFrame({
    'company_name': grouped_details['company_name'], 
    'phone_invalid': [False]*len(grouped_details['company_name'])
})

invalid_cols = phone_validated[phone_validated.eq(False).any(axis=1)].index
empty_cols = phone_validated[phone_validated.eq('').all(axis=1)].index

invalid_number['phone_invalid'].iloc[invalid_cols] = True
invalid_number['phone_invalid'].iloc[empty_cols] = True

invalid_number.to_csv('./datasets/indicators/phone_validation.csv', index=False)

In [190]:
# validate scraped email addresses
grouped_details = pd.read_csv('./datasets/aggregated-details/grouped_details.csv')
email_cols = [x for x in grouped_details.columns if 'company_email' in x]

def check_email(x):
    results = []
    for cell in x:
        print(str(cell).rstrip())
        if pd.isna(cell) or not cell:
            results.append('')
        else:
            valid = validate_email(email_address=str(cell).rstrip())
            results.append(bool(valid))
    return results
    

email_details = grouped_details[email_cols]
email_validated = email_details.apply(check_email)
invalid_email = pd.DataFrame({
    'company_name': grouped_details['company_name'], 
    'email_invalid': [False]*len(grouped_details['company_name'])
})


invalid_cols = email_validated[email_validated.eq(False).any(axis=1)].index
empty_cols = email_validated[email_validated.eq('').all(axis=1)].index

invalid_email['email_invalid'].iloc[invalid_cols] = True
invalid_email['email_invalid'].iloc[empty_cols] = True

invalid_email.to_csv('./datasets/indicators/email_validation.csv', index=False)


day2and8testing@nomadtravel.co.uk
covidtestbookings@everythinggeneticltd.co.uk
enquiries@expertcovidtestinguk.com
info@247traveltests.com
hello@0044covidtest.com
info@covidtravelpcr.com
covid@testingservices.online
enquiries@expertcovidtestinguk.com
enquiries@myexpresstesting.co.uk
info@4mdicaltesting.co.uk
support@expertdoctors.uk
support@medicalcovidtest.com
help@0covidclear.com
info@blindspotglobal.com
arrivals@coronatest.co.uk
hello@travel19.co.uk
info@harleymedic.co.uk
support@bookatraveltest.com
info@covidtravelpcr.com
covid@testingservices.online
covidtestbookings@everythinggeneticltd.co.uk
enquiries@001alphaexpresstesting.co.uk
contact@001covidtest.co.uk
info@pharmadiagnostics.co.uk
enquiries@expertcovidtestinguk.com
enquiries@myexpresstesting.co.uk
enquiries@expertcovidtestinguk.com
info@4medicaltesting.co.uk
hello@0044covidtest.com
help@0044healthtesting.co.uk
info@4medicaltesting.co.uk
help@00analysis1.co.uk
enquiries@expertcovidtestinguk.com
info@ct24.co.uk
help@01harleystr

In [None]:
# obtaining trustpilot links
# categorise
grouped_details = pd.read_csv('./datasets/aggregated-details/grouped_details.csv')
link_cols = [x for x in grouped_details.columns if 'company_link' in x]

def check_trustpilot(x):
    results = []
    for cell in x:
        if pd.isna(cell) or not cell:
            results.append(np.nan)
        else:
            page = requests.get("https://uk.trustpilot.com/review/"+cell)
            print(page)
            soup = BeautifulSoup(page.content, "html.parser")
            score = soup.find('p', {'class', 'header_trustscore'})
                
            results.append(float(score.get_text()) if score else np.nan)

    return results


link_details = grouped_details[link_cols]
trustpilot_scores = link_details.apply(check_trustpilot)
trustpilot_scores[trustpilot_scores.eq(0)] = np.nan 

trustpilot_scores['trustpilot_score'] = trustpilot_scores.mean(axis=1)

trustpilot_scores['score_category'] = ['']*len(trustpilot_scores)
trustpilot_scores['score_category'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].isna()].index] = "No Score"
trustpilot_scores['score_category'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].between(0, 1, inclusive="right")].index] = "0-1"
trustpilot_scores['score_category'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].between(1, 2, inclusive="right")].index] = "1-2"
trustpilot_scores['score_category'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].between(2, 3, inclusive="right")].index] = "2-3"
trustpilot_scores['score_category'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].between(3, 4, inclusive="right")].index] = "3-4"
trustpilot_scores['score_category'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].between(4, 5, inclusive="right")].index] = "4-5"

trustpilot_scores['is_reviewed'] = [True]*len(trustpilot_scores)
trustpilot_scores['is_reviewed'].iloc[trustpilot_scores[trustpilot_scores['trustpilot_score'].isna()].index] = False

trustpilot_scores['company_name'] = grouped_details['company_name']
trustpilot_scores[['trustpilot_score', 'score_category', 'is_reviewed', 'company_name']].to_csv('./datasets/indicators/trustpilot_scores.csv', index=False)
trustpilot_scores

In [None]:
# match registered CH companies
api_key = '3511f785-bc60-40b3-b697-36bedb63c848'

grouped_details = pd.read_csv('./datasets/aggregated-details/grouped_details.csv')
matched_companies = pd.DataFrame(columns=['company_name', 'registered_name'])
matched_companies['company_name'] = grouped_details['company_name']

matched = []
for company in grouped_details['company_name'][599:]:
    match = ''

    company = company.lower()
    response = requests.get(
        'https://api.company-information.service.gov.uk/search?items_per_page=1&q='+'+'.join(company.split()), 
        auth=(api_key, '')
    )
    print(response)

    if response:
        response_json = response.json()
        if response_json['total_results'] > 1:
            company_found = response.json()['items'][0]['title'].lower()
            
            if partial_ratio(company_found, company) >= 80:
                match = company_found
        
    matched.append(match)

matched_companies['registered_name'] = matched
matched_companies['is_registered'] = [True]*len(matched_companies)
matched_companies['is_registered'].iloc[matched_companies['registered_name'].eq('')] = False

matched_companies.to_csv('./datasets/indicators/registered.csv', index=False)