In [1]:
from lxml import html
from lxml import etree
import requests
import ssl
#import datefinder
#from pdf_parser import create_folder
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

import os
import re
import time

import json


A slightly more complicated version of the code written by Sashi (https://doi.org/10.1007/s10113-020-01677-8).

Main changes:
- We keep a list of download links (instead of preventing duplicates purely by file name)
- Download for multiple doctypes
- Also download linked HTML pages if PDFs are unavailable, or, if both are unavailable, the text of the page itself
- Save more meta-data

For now, we still download the search results sequentially (with each search link saved to its own folder). We do not really care which search a document came from, so in the future we may want to rewrite this using multi-threading (the work is probably largely I/O-bound anyway). For the number of results we are getting now, however, it is fine to just run this in the background.

In [2]:
def get_page(url, returnPage=False):
    """Fetch a URL and parse the response body into an lxml HTML tree.

    url: string, URL of a website
    returnPage: if True, also return the raw ``requests`` response

    Returns the parsed HTML element, or the tuple
    ``(html element, response)`` when ``returnPage`` is True.
    The request uses a 30 second timeout.
    """
    response = requests.get(url, timeout = 30)
    tree = html.fromstring(response.content)
    return (tree, response) if returnPage else tree

In [3]:
def get_topics(web_source, returnAll = False):
    """Returns a list of topic links from the UK policies website

    web_source: html element, output of get_page(url)
    returnAll: if True, also return the set of every link found on the page
        (mainly for debugging/checking)

    Returns the filtered list of unique links in the order in which they
    first appear on the page, or ``(topic_links, all_links)`` when
    ``returnAll`` is True (``all_links`` is a set).
    """
    #iterlinks yields tuples whose third item is the link itself.
    #dict.fromkeys removes duplicates while keeping first-seen order, so the
    #result no longer depends on set iteration order (which varies per run).
    links = list(dict.fromkeys(tup[2] for tup in web_source.iterlinks()))

    #Filter - many irrelevant links on the page e.g. in header and footer
    #Here, still leaving out links with government/consultations, government/organisations and topics
    wanted = re.compile(
        r'publications|guidance|government\/news|government\/statistics|government\/case-studies|government\/collections|\.pdf',
        re.IGNORECASE,
    )
    topic_links = [
        l for l in links
        if wanted.search(l) and not (l.startswith(r"/search") or l.endswith("png"))
    ]

    if returnAll: #Mainly for debugging/checking
        return topic_links, set(links)
    return topic_links

In [4]:
def get_publish_date(web_source):
    """Returns publishing date of policy paper as string

    web_source: html element, output of get_page(url)

    Takes the first text chunk of the first element with class
    'app-c-published-dates' and returns its last three whitespace-separated
    tokens joined by single spaces. Returns "" when no such element exists
    or when that element contains no text.
    """
    top_level = web_source.find_class('app-c-published-dates')
    if len(top_level) == 0:
        #print("no date could be found on page")
        return ""
    #Only the first element is used, so only look at that one; an empty
    #element (no text at all) must not raise.
    first_text = next(iter(top_level[0].itertext()), None)
    if first_text is None:
        return ""
    return ' '.join(first_text.split()[-3:])

In [5]:
def get_department(web_source):
    """Returns department name(s) as string

    web_source: html element, output of get_page(url)

    Two page layouts are tried in order:
    - elements with class 'app-c-publisher-metadata__definition': the third
      text chunk of the first such element is returned;
    - elements with class 'gem-c-metadata__definition': the first text chunk
      of the first child of the first such element is returned.
    Returns "" whenever the expected structure is not found.
    """
    #Old style (?) pages
    top_level = web_source.find_class('app-c-publisher-metadata__definition')
    if len(top_level)>0:
        chunks = list(top_level[0].itertext())
        #Guard the index: a short element must not abort the whole scrape
        if len(chunks) > 2:
            return chunks[2]
        return ""

    #New style pages
    top_level = web_source.find_class('gem-c-metadata__definition')
    if len(top_level)>0:
        try:
            #Iterating the element walks its children
            department = [list(el.itertext())[0] for el in top_level[0]]
            return department[0]
        except Exception:
            #Best effort: missing children/text simply means "unknown".
            #(Exception, not a bare except, so Ctrl-C still works.)
            return ""

    #print(f"no deperatment could be found on page")
    return ""

In [6]:
def get_document_link(web_source,base_url):
    """Collect the not-yet-processed document links found on a page.

    web_source: html element, output of get_page(url)
    base_url: string prepended to relative '/government/publications/' links

    Returns a tuple of two lists:
        first, links ending in '.pdf' (as they appear on the page);
        second, absolute links to '/government/publications/' pages.
    Links already present in the global ``completedLinks`` are left out, and
    every returned link is appended to ``completedLinks``.
    """
    global completedLinks

    candidates = get_topics(web_source)

    #PDFs first; they are registered before the HTML links are filtered
    pdf_links = []
    for candidate in candidates:
        if candidate.endswith(".pdf") and candidate not in completedLinks:
            pdf_links.append(candidate)
    completedLinks.extend(pdf_links)

    #Publication pages, made absolute before checking against completedLinks
    html_links = []
    for candidate in candidates:
        if not candidate.startswith(r"/government/publications/"):
            continue
        absolute = base_url+candidate
        if absolute not in completedLinks:
            html_links.append(absolute)
    completedLinks.extend(html_links)

    return (pdf_links, html_links)

In [7]:
# from bs4 import BeautifulSoup as bs
# topic_url = "https://www.gov.uk/government/publications/intergovernmental-relations-review-annual-report-for-2022"
# web_source, web_page = get_page(topic_url, returnPage=True)
# soup = bs(web_page.content)
# for c in soup.find_all("section", {"class": "attachment embedded"}):
#     if c.find("ti 
#     #print(c.find("span", {"class": "page-length"}).text)
#     print(c)

In [8]:
def get_max_page(web_source):
    """Returns the number of result pages as an int

    web_source: html element, output of get_page(url)

    Reads the text of the first element with class
    'govuk-pagination__link-label' and converts whatever follows the last
    "of " to an integer. If that fails for any reason, a message is printed
    and 1 is returned.
    """
    try:
        text = web_source.find_class("govuk-pagination__link-label")[0].text
        return int(text.split("of ")[-1])
    except Exception:
        #Exception rather than a bare except, so that KeyboardInterrupt and
        #SystemExit are not mistaken for "no pagination".
        print("No pagination found - assuming only one page")
        return 1

In [9]:
def download_save_document(full_path,document_link):
    """Downloads a document and writes its bytes to a file

    full_path: path of the file to write (opened in binary mode)
    document_link: URL to download

    Raises whatever ``requests`` raises for a failed request, including
    ``requests.exceptions.HTTPError`` for a 4xx/5xx response, so that error
    pages are not saved to disk as if they were the document.
    """
    #NOTE(review): verify=False disables TLS certificate checking
    response = requests.get(document_link,verify = False, timeout=30)
    #Check the status before touching the file system, so a failed download
    #does not leave a bogus file behind
    response.raise_for_status()
    with open(full_path,'wb') as pdf:
        pdf.write(response.content)

In [10]:
def file_name_from_link(link, target_dir, suffix=".pdf"):
    """Returns a file path inside target_dir derived from a link

    link: URL (or path) the document was found at
    target_dir: folder in which the file will be stored
    suffix: file extension to enforce on the name

    The name is the last '/'-separated part of the link, with ``suffix``
    appended if missing and cut to 140 characters (plus suffix) if longer.
    If that path already exists, the prefixes '0_' ... '99_' are tried in
    order and the first free path is returned. If none is free, a warning is
    printed and the '99_' path is returned (the caller will overwrite it).
    """
    document_name = link.split('/')[-1]
    #NOTE(review): this keeps the text AFTER the last '?', i.e. the query
    #string rather than the name in front of it - confirm this is intended.
    document_name = document_name.split('?')[-1]
    if not document_name.endswith(suffix):
        document_name = document_name+suffix
    if len(document_name) > 140:
        document_name = document_name[:140]+suffix

    full_path = os.path.join(target_dir,document_name)
    if not os.path.exists(full_path):
        return full_path

    #if it already exists, add a number at the beginning
    for n in range(100):
        full_path = os.path.join(target_dir,f"{n}_{document_name}")
        if not os.path.exists(full_path):
            return full_path

    #Only reached when every numbered candidate is taken as well
    print(f"Cannot find new file location for {full_path}\nOverwriting!")
    return full_path

In [11]:
#Copied directly from the old code. TODO: consider pickle/JSON, or csv through pandas, instead of a str()/eval() round-trip.

def read_metadata(filename):
    """Read existing metadata dictionary in memory

    filename: path of the metadata text file

    Returns the object obtained by evaluating the file's contents, or a new
    empty dict (after printing a notice) when the file does not exist.
    """
    if not os.path.isfile(filename):
        print('Creating new metadata')
        return {}
    #The with-block makes sure the file handle is closed again
    with open(filename,'r') as handle:
        #NOTE(review): eval() executes whatever is in the file. Only use on
        #metadata files written by this notebook; a safer format (JSON) or
        #ast.literal_eval would be preferable.
        return eval(handle.read())

def add_metadata(document_name,publish_date,department, doctype, found_on, downloaded_as, store=None):
    """Adds metadata to dictionary if it does not already exist

    document_name: key under which the entry is stored
    publish_date, department, doctype, found_on, downloaded_as: stored, in
        this order, as a tuple
    store: dictionary to update; when omitted, the module-level ``metadata``
        dictionary is used (the original behaviour)

    An existing entry is never overwritten; a notice is printed instead.
    Returns the dictionary that was used.
    """
    #Fall back to the global so existing calls without `store` keep working
    target = metadata if store is None else store
    if document_name in target:
        print(f"meta data for {document_name} already included -- seems you still have duplicates")
    else:
        target[document_name] = (publish_date,department, doctype, found_on, downloaded_as)
    return target

def write_metadata(complete_metadata, filename='metadata.txt'):
    """Writes complete collection of scraped metadata for the files

    complete_metadata: the metadata object; its ``str()`` is written
    filename: file to (over)write; the default 'metadata.txt' is relative to
        the current working directory

    Returns ``complete_metadata`` unchanged.
    """
    #The with-block closes the file even if writing fails
    with open(filename,'w') as file:
        file.write(str(complete_metadata))
    return complete_metadata

In [12]:
def run_parser(target_dir, search_url, base_url,doctype,metadata, maxPage="auto", waitTime=False):
    """Mainloop. Downloads all documents found through one search for one doctype

    target_dir: folder in which downloaded files are stored
    search_url: URL of the search results; filter and page parameters are
        appended to it with '&'
    base_url: prefix for links that do not start with 'http'
    doctype: value for the 'content_purpose_supergroup[]' filter, or "all"
        to search without that filter
    metadata: metadata collected so far; written out at the end
    maxPage: number of result pages to visit, or "auto" to read it from the
        pagination of the first result page
    waitTime: seconds to sleep after each processed search result
        (falsy = no waiting)

    For every search result that is not yet in the global ``completedLinks``:
    download all linked PDFs; if there are none, save the linked publication
    pages; if there are none either, save the result page itself.
    Returns the value returned by write_metadata.
    """
    global completedLinks

    #start timer
    start_time = time.time()
    print(f"\n-----------\nStarting {doctype}\n-----------")
    #Counters: pdfs, linked html pages, pages themselves, failures
    doc_count = 0
    ht_count = 0
    pg_count = 0
    failed_count = 0

    #"all" means: no doctype filter on the search
    if doctype == "all":
        filtered_url = search_url
    else:
        filtered_url = f"{search_url}&content_purpose_supergroup[]={doctype}"

    if maxPage == "auto":
        maxPage = get_max_page(get_page(filtered_url))

    #loop through all pages for a doctype
    page = 0
    while page < maxPage:
        page += 1
        #Regular updates
        if page%10 == 0:
            print(f'Now at page {page} of {maxPage} after {int(time.time()-start_time)} seconds. \nDownloaded: {doc_count + pg_count + ht_count} Failed: {failed_count}')

        web_url = f"{filtered_url}&page={page}"
        try:
            web_source = get_page(web_url)
        except Exception:
            failed_count += 1
            print(f"Could not establish connection for {web_url}")
            continue
        topic_links = get_topics(web_source)

        #loops through all the links in the search results
        for link in topic_links:
            if link.startswith("http"):
                topic_url = link
            else:
                topic_url = base_url+link

            #Check if this has already been done - before fetching, so that
            #no request is spent on a page that will be skipped anyway
            if topic_url in completedLinks:
                continue

            try:
                topic_source, topic_page = get_page(topic_url, returnPage=True)
            except Exception:
                failed_count += 1
                print(f"Could not establish connection for {topic_url}")
                continue

            completedLinks.append(topic_url)

            #Note that get_document_link checks against completedLinks
            pdf_links, html_links = get_document_link(topic_source,base_url)
            publish_date = get_publish_date(topic_source)
            department = get_department(topic_source)

            #NOTE(review): add_metadata (called without a dictionary) updates
            #the module-level `metadata`, which is expected to be the same
            #object as the `metadata` argument of this function.

            #if there is a pdf document linked on the webpage, download it
            if len(pdf_links) >0:
                for plink in pdf_links:
                    full_path = file_name_from_link(plink, target_dir, suffix=".pdf")
                    try:
                        download_save_document(full_path,plink)
                        metadata = add_metadata(full_path,publish_date,department, doctype, topic_url, "linked_pdf")
                        doc_count += 1
                    except requests.exceptions.MissingSchema:
                        #Relative link: retry once with the base url in front
                        elink = base_url+plink
                        try:
                            download_save_document(full_path,elink)
                            metadata = add_metadata(full_path,publish_date,department, doctype, topic_url, "linked_pdf")
                            doc_count += 1
                        except Exception:
                            print("Missing schema error & no fixed link found; continuing")
                            failed_count += 1
                    except requests.exceptions.ConnectionError:
                        print("encountered a connection error, continuing")
                        failed_count += 1
                    except Exception as e:
                        print(f"Saving pdf failed for {plink} -- {e}")
                        failed_count += 1

            #If there are no PDFs but there are HTML links, download those (text only)
            elif len(html_links) > 0:
                for hlink in html_links:
                    full_path = file_name_from_link(hlink, target_dir, suffix=".txt")
                    try:
                        #The fetch is inside the try: one unreachable page
                        #must not stop the whole run
                        _, h_web_page = get_page(hlink, returnPage=True)
                        with open(full_path, 'w', encoding='utf-8') as file:
                            file.write(h_web_page.text)
                        metadata = add_metadata(full_path,publish_date,department, doctype, topic_url, "linked_html")
                        ht_count +=1
                    except Exception as e:
                        print(f"could not save linked HTML: {hlink} - exception: {e}")
                        failed_count += 1

            else: #If no pdf nor HTML link exists on the page, save the webpage itself (text only)
                full_path = file_name_from_link(topic_url, target_dir, suffix=".txt")
                try:
                    with open(full_path, 'w', encoding='utf-8') as file:
                        file.write(topic_page.text)
                    metadata = add_metadata(full_path,publish_date,department, doctype, topic_url, "page_itself")
                    pg_count +=1
                except Exception as e:
                    print(f"could not save page: {link} - exception: {e}")
                    failed_count += 1

            if waitTime:
                time.sleep(waitTime)

    #write complete metadata to a text file
    complete_metadata = write_metadata(metadata)
    print(
        'Scraping complete. Took {} seconds to retrieve {} pdf documents, {} linked html pages and {} pages themselves. Failed {}'
        .format(int(time.time()-start_time),doc_count, ht_count, pg_count, failed_count)
    )
    return complete_metadata

In [13]:
#Whether to load the links from old meta-data
loadOldLinks = True

#Folders of earlier runs; each is expected to contain a metadata.txt
prior_target_dirs = [
    r'C:\Users\siets009\OneDrive - Wageningen University & Research\Policy documents UK NL\Data\230524_GlobalWarming',
    r'C:\Users\siets009\OneDrive - Wageningen University & Research\Policy documents UK NL\Data\230524_ClimateChange'
]

#Collect in a set first, so that every link is kept only once
completedLinks = set()

if loadOldLinks:
    for prior_dir in prior_target_dirs:
        with open(os.path.join(prior_dir, 'metadata.txt'), 'r') as f:
            #NOTE(review): eval() executes the file's contents; only load
            #metadata files written by this notebook
            meta = eval(f.read())
        #The fourth item of every metadata entry is the page the file was found on
        completedLinks.update(entry[3] for entry in meta.values())

#The scraping functions append to/extend this, so it has to be a list
completedLinks = list(completedLinks)

print(len(completedLinks))

8124


In [None]:
#Folder(s) to store the documents in; paired one-to-one with search_urls below
target_dirs = [
    #r'C:\Users\ajsie\OneDrive - Wageningen University & Research\Policy documents UK NL\Data\230524_GlobalWarming',
    #r'C:\Users\ajsie\OneDrive - Wageningen University & Research\Policy documents UK NL\Data\230524_ClimateChange',
    r'C:\Users\siets009\OneDrive - Wageningen University & Research\Policy documents UK NL\Data\230524_Climate'
]
#Base url, and the urls of the searches for which we want to download all results
base_url = 'https://www.gov.uk'
search_urls = [
    #'https://www.gov.uk/search/all?keywords=%22global+warming%22&order=updated-newest',
    #'https://www.gov.uk/search/all?keywords=%22climate+change%22&order=updated-newest'
    'https://www.gov.uk/search/all?keywords=climate&order=updated-newest' 
]

#Document types we are interested in.
#News and communications comes near the end, as it sometimes links to other (non-news) docs as part of the news story.
#Including 'all' after that ensures that we also include uncategorised pages.
allowed_doctypes = ['guidance_and_regulation', 'research_and_statistics', 'policy_and_engagement','services', 'news_and_communications','all']

banner = "----------------\n"*2

for target_dir, search_url in zip(target_dirs, search_urls):
    print(banner)
    print(f"Starting {search_url}\n")
    print(banner)

    #Make the directory if needed
    if not os.path.exists(target_dir):
        os.makedirs(target_dir)

    #Load metadata if available; the same file is rewritten at the end
    metadata_path = os.path.join(target_dir, 'metadata.txt')
    metadata = read_metadata(metadata_path)

    #Start the scraping process, one doctype after the other
    for doctype in allowed_doctypes:
        metadata = run_parser(target_dir, search_url, base_url,doctype,metadata, maxPage = "auto")#, waitTime=1.8)

    #Write to the corresponding data folder
    with open(metadata_path, 'w') as f:
        f.write(str(metadata))
    print(f"COMPLETED --- nr of documents written: {len(metadata)}\n\n\n")

----------------
----------------

Starting https://www.gov.uk/search/all?keywords=climate&order=updated-newest

----------------
----------------

Creating new metadata

-----------
Starting guidance_and_regulation
-----------


In [None]:
# Number of entries in the metadata (one per saved file) after the scraping cell above
len(metadata)

In [None]:
# #to test
# from bs4 import BeautifulSoup as bs
# with open(os.path.join(target_dir, "assess-the-impact-of-air-emissions-on-global-warming.txt"), 'r', encoding='utf-8') as f:
#     soup = bs(f)
# print(soup.findAll("p")) 