## Scraping the United Nations Digital Library

**Scraping script done on behalf of Oladoyin Okunoren @ Boston College**

By David J. Thomas

---

This notebook contains a series of scripts to scrape every item about Ebola from 2014-2016 in the United Nations Digital Library's database. It is part of the dissertation research of Oladoyin Okunoren at [Boston College](https://bc.edu).

---

## Installation

``` bash
pip install -r requirements.txt
jupyter lab
```
---

## Creating a Base Scraper Object

In [None]:
import os
import time
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from bs4 import BeautifulSoup


class BaseSeleniumScraper:
    """Base class for all Selenium Scraper objects.

    Constructing an instance runs the whole scrape: it starts a headless
    Chrome driver, sleeps ``scrape_delay`` seconds, loads ``url``, calls
    :meth:`post_load` (the hook child classes override to extract data) and
    then always quits the driver, even if loading or extraction fails.

    Args:
        url: Address of the page to load.
        scrape_delay (keyword, optional): Seconds to sleep before the page is
            fetched; also used as the ``WebDriverWait`` timeout. Must be a
            non-negative int or float.
        webdriver_path (keyword, optional): Path to the chromedriver
            executable. Must be a string pointing to an existing file.

    Raises:
        TypeError: If ``scrape_delay`` is not an int/float, or
            ``webdriver_path`` is not a string.
        ValueError: If ``scrape_delay`` is negative, or ``webdriver_path``
            does not point to a file.
    """
    url = ''
    scrape_delay = 5
    # Built per instance in __init__: a single class-level Options object
    # would collect another copy of every argument each time a scraper is made.
    options = None
    webdriver_path = '/usr/local/bin/chromedriver'
    service = None
    driver = None
    # Class-level default only; __init__ gives every instance its own dict.
    metadata = {}
    wait = None

    def __init__(self, url, *args, **kwargs):
        # store the target url
        self.url = url
        self.metadata = {}
        # store scrape delay, if provided and a valid number
        if 'scrape_delay' in kwargs:
            self.scrape_delay = self._validate_scrape_delay(kwargs['scrape_delay'])
        # store webdriver_path, if a string and pointing to a file
        if 'webdriver_path' in kwargs:
            self.webdriver_path = self._validate_webdriver_path(kwargs['webdriver_path'])
        self.options = Options()
        # comment out this line to run Chrome normally
        self.options.add_argument("--headless")
        self.options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36")
        self.service = ChromeService(executable_path=self.webdriver_path)
        self.driver = webdriver.Chrome(service=self.service, options=self.options)
        # make sure to shutdown the driver even if an error (or Ctrl-C) occurs;
        # the original exception propagates with its type and traceback intact
        try:
            self.wait = WebDriverWait(self.driver, self.scrape_delay)
            self.load()
            self.post_load()
        finally:
            self.shutdown()

    @staticmethod
    def _validate_scrape_delay(value):
        """Return ``value`` if it is usable as a sleep/wait duration, else raise."""
        # complex numbers are rejected: neither time.sleep nor WebDriverWait accept them
        if not isinstance(value, (int, float)):
            raise TypeError('Argument \'scrape_delay\' must be an integer or float')
        if value < 0:
            raise ValueError('Argument \'scrape_delay\' must not be negative')
        return value

    @staticmethod
    def _validate_webdriver_path(value):
        """Return ``value`` if it is a string naming an existing file, else raise."""
        if not isinstance(value, str):
            raise TypeError('Argument \'webdriver_path\' must be a string')
        if not os.path.isfile(os.path.abspath(value)):
            raise ValueError('Argument \'webdriver_path\' must point to a valid webdriver')
        return value

    def load(self, *args, **kwargs):
        """Enforces a per-page scraping delay, then performs the initial page load."""
        print('Sleeping for', self.scrape_delay)
        time.sleep(self.scrape_delay)
        # fetch the page data
        print('Getting page at ', self.url)
        self.driver.get(self.url)

    def post_load(self, *args, **kwargs):
        """Runs after the initial load of page data. Placeholder, SHOULD BE OVERWRITTEN by child classes to extract data"""
        pass

    def shutdown(self, *args, **kwargs):
        """Quits the browser driver, if one was started."""
        if self.driver is not None:
            self.driver.quit()

# Progress marker for the notebook: printed once this cell has finished running.
print('Class defined, PROCEED.')

## Creating an Object to Scrape Browse Pages

In [None]:
class UNDLBrowseScraper(BaseSeleniumScraper):
    """Scrapes one search-results page of the UN Digital Library and follows "next".

    After construction, ``data`` holds the article links found on this results
    page plus those of every following page reached through the "next" link.
    """
    data = []
    # Prefix added to the site-relative hrefs found in the page.
    _SITE_ROOT = 'https://digitallibrary.un.org'

    def post_load(self, *args, **kwargs):
        """Gathers the article links once the page has loaded and stores them in ``data``."""
        self.data = self.gather_links()

    @property
    def next_link(self):
        """Absolute URL of the next results page, or None when there is none.

        Waits (up to the scraper's wait timeout) for the navigation bar to
        become visible. Each access repeats that wait, so read it once and
        keep the value.
        """
        # imported here so the class works without touching the notebook's import cell
        from selenium.common.exceptions import TimeoutException

        try:
            navigation_container = self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'span.rec-navigation'))).get_attribute('innerHTML')
        # if element isn't found, results were only a single page, so return None as there is no next link
        except TimeoutException:
            return None
        souped_navigation_container = BeautifulSoup(navigation_container, 'html.parser')
        link = None
        for link_container in souped_navigation_container.find_all('a', class_='img'):
            image = link_container.img
            # anchors without an image, or without an href, cannot be the "next" button
            if image is not None and image.get('alt') == 'next':
                link = link_container.get('href')
        if link is None:
            return None
        return self._SITE_ROOT + link

    @property
    def links(self):
        """Gets the links to the article on the page as it currently exists"""
        links = []
        link_container = self.driver.find_element(By.CSS_SELECTOR, 'form.all-results').get_attribute('innerHTML')
        container_soup = BeautifulSoup(link_container, 'html.parser')
        results_table = container_soup.find('table')
        if results_table is None:
            return links
        for result_row in results_table.find_all('tr'):
            title = result_row.find('div', class_='result-title')
            # skip rows that carry no title link instead of crashing the whole scrape
            if title is None or title.a is None or not title.a.get('href'):
                continue
            links.append(self._SITE_ROOT + title.a['href'] + '&v=pdf')
        return links

    def gather_links(self):
        """Recursive function to gather links to all articles on this page, and subsequent pages.

        If not on the last page, returns the links on this page plus those
        gathered by another UNDLBrowseScraper created for the next page. If on
        the last page, just returns this page's links, ending the recursion.
        """
        # read next_link once: every access waits for the navigation bar again
        next_url = self.next_link
        page_links = self.links
        # if no next_link, just return the links on the page
        if not next_url:
            return page_links
        child_kwargs = {'scrape_delay': self.scrape_delay}
        # carry a custom driver path over to the next page's scraper
        if 'webdriver_path' in vars(self):
            child_kwargs['webdriver_path'] = self.webdriver_path
        return page_links + UNDLBrowseScraper(next_url, **child_kwargs).data

# Collect the article links from every search query below into one list.
article_links = []
# One search URL per query; the fct__3 parameter carries the year facet and
# fct__2 (where present) the UN body facet.
search_urls = [
    # 2016 articles
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2016&fti=1',
    # 2014 articles
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2014&fti=1',
    # 2015 articles were too numerous, had to break into several search queries by UN Body to reduce to < 500
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=General%20Assembly&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Security%20Council&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Economic%20and%20Social%20Council&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Human%20Rights%20Bodies&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Programmes%20and%20Funds&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Other%20UN%20Bodies%20and%20Entities&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Secretariat&fti=1',
    'https://digitallibrary.un.org/search?ln=en&p=ebola&f=&rm=&sf=&so=d&rg=50&c=Resource%20Type&c=UN%20Bodies&c=&of=hb&fti=1&fct__3=2015&fct__2=Economic%20Commissions&fti=1'
]
# Each scraper runs on construction and exposes the gathered links as .data;
# a 30 second delay is requested between page loads.
for search_url in search_urls:
    scraper = UNDLBrowseScraper(search_url, scrape_delay=30)
    article_links += scraper.data

# remove any duplicate links (note: converting through a set does not keep the original order)
article_links = list(set(article_links))

print('Success! Gathered', len(article_links), 'articles')

## Define a PDF Scraper Object

In [None]:
import requests
import fitz

class PDFScraper:
    """Separate scraper to handle fetching/converting the PDF into text. Will be used by the UNDLArticleScraper below.

    Constructing an instance downloads the PDF at ``url`` and stores its text
    in ``data`` (an empty string when the download fails).

    Args:
        url: Address of the PDF file.
        scrape_delay (keyword, optional): Seconds to sleep before every
            download attempt. Must be a non-negative int or float.
        max_tries (keyword, optional): Maximum number of download attempts.
            Must be an integer of at least 1.

    Raises:
        TypeError: If ``url`` is not a string, ``scrape_delay`` is not an
            int/float, or ``max_tries`` is not an integer.
        ValueError: If ``scrape_delay`` is negative or ``max_tries`` < 1.
    """
    url = ''
    scrape_delay = 5
    max_tries = 5
    # Seconds before a stalled request is abandoned (and counted as a failed try).
    request_timeout = 60
    headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:66.0) Gecko/20100101 Firefox/66.0",
            "Accept-Encoding": "*",
            "Connection": "keep-alive"
        }
    data = None

    def __init__(self, url, *args, **kwargs):
        if not isinstance(url, str):
            raise TypeError('URL must be a valid string')
        self.url = url
        # store scrape delay, if provided and a valid number
        if 'scrape_delay' in kwargs:
            self.scrape_delay = self._validate_scrape_delay(kwargs['scrape_delay'])
        # store maximum number of tries, if provided and a valid integer
        if 'max_tries' in kwargs:
            self.max_tries = self._validate_max_tries(kwargs['max_tries'])
        self.data = self.pdf

    @staticmethod
    def _validate_scrape_delay(value):
        """Return ``value`` if it is usable as a sleep duration, else raise."""
        # complex numbers are rejected: time.sleep cannot sleep for them
        if not isinstance(value, (int, float)):
            raise TypeError('Argument \'scrape_delay\' must be an integer or float')
        if value < 0:
            raise ValueError('Argument \'scrape_delay\' must not be negative')
        return value

    @staticmethod
    def _validate_max_tries(value):
        """Return ``value`` if it is a usable attempt count, else raise."""
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError('Argument \'max_tries\' must be an integer')
        if value < 1:
            raise ValueError('Argument \'max_tries\' must be at least 1')
        return value

    @staticmethod
    def _collapse_whitespace(text):
        """Return ``text`` with every run of whitespace (incl. newlines/tabs) replaced by one space."""
        return ' '.join(text.split())

    def get_pdf_data(self, tries_left=5):
        """Fetches the PDF and returns its raw bytes, or None if every attempt fails.

        Args:
            tries_left: Maximum number of download attempts. Anything that is
                not a positive integer falls back to ``self.max_tries``.
        """
        if isinstance(tries_left, bool) or not isinstance(tries_left, int) or tries_left < 1:
            tries_left = self.max_tries
        for attempt in range(1, tries_left + 1):
            # enforce scrape delay before every attempt
            time.sleep(self.scrape_delay)
            print('Getting PDF at ', self.url)
            try:
                response = requests.get(self.url, headers=self.headers, timeout=self.request_timeout)
                # treat HTTP error pages as failures so they are never parsed as a PDF
                response.raise_for_status()
            except requests.RequestException:
                print('Error getting', self.url)
                if attempt < tries_left:
                    print('Retrying...')
                continue
            return response.content
        print('Retry limit reached, ABORTING parse of', self.url)
        return None

    @property
    def pdf(self):
        """Returns the content of the PDF as text ('' when the PDF could not be downloaded).

        Every access downloads the file again; use ``data`` for the text
        fetched at construction.
        """
        raw_pdf = self.get_pdf_data(self.max_tries)
        # nothing to parse: download failed or the server sent an empty body
        if not raw_pdf:
            return ''
        pdf_document = fitz.open('pdf', raw_pdf)
        try:
            page_texts = [pdf_document[page_num].get_text() for page_num in range(pdf_document.page_count)]
        finally:
            pdf_document.close()
        # return text with redundant whitespace and all newlines/tabs replaced
        return self._collapse_whitespace(''.join(page_texts))

## Define a Scraper Object for Individual Articles

In [None]:


class UNDLArticleScraper(BaseSeleniumScraper):
    """Used to scrape the metadata & pdf link from a single article page. Uses the PDF scraper to get contents of the PDF"""

    # Label shown in a metadata row -> key it is stored under in the metadata dict.
    # 'Vote date' and 'Date' intentionally share the 'date' key.
    _LABEL_TO_FIELD = {
        'Title': 'title',
        'Authors': 'authors',
        'Agenda information': 'agenda',
        'Description': 'description',
        'Resolution / Decision': 'resolution',
        'Meeting record': 'meetingRecord',
        'Draft resolution': 'draftResolution',
        'Note': 'note',
        'Vote date': 'date',
        'Date': 'date',
        'Collections': 'collections',
    }

    def post_load(self, *args, **kwargs):
        """Extracts the metadata and the English PDF link from the loaded article page.

        The PDF link is best-effort: when the files list cannot be read the
        link is stored as '' rather than failing the article.
        """
        # imported here so the class works without touching the notebook's import cell
        from selenium.common.exceptions import WebDriverException

        metadata_container = self.wait.until(EC.visibility_of_element_located((By.CSS_SELECTOR, 'div.detailed-record-content'))).get_attribute('innerHTML')
        time.sleep(self.scrape_delay)
        metadata_soup = BeautifulSoup(metadata_container, 'html.parser')
        self.metadata = self.get_metadata(metadata_soup)
        try:
            files_container = self.wait.until(EC.visibility_of_element_located((By.ID, 'record-files-list'))).get_attribute('innerHTML')
        except WebDriverException:
            # no readable files list on this page (e.g. the wait timed out)
            self.metadata['enPdfLink'] = ''
            return
        files_soup = BeautifulSoup(files_container or '', 'html.parser')
        self.metadata['enPdfLink'] = self.get_en_pdf_link(files_soup)

    def get_metadata(self, souped_data, *args, **kwargs):
        """Extract metadata from page after load and return as a dictionary.

        Fields whose row is absent from the page keep an empty string;
        ``url`` is always this scraper's url.
        """
        extracted_data = {
            'title': '',
            'authors': '',
            'description': '',
            'agenda': '',
            'resolution': '',
            'meetingRecord': '',
            'draftResolution': '',
            'note': '',
            'date': '',
            'enPdfLink': '',
            'url': self.url,
            'collections': ''
        }
        details = souped_data.find('div', id='details-collapse')
        if details is None:
            return extracted_data
        for data_row in details.find_all('div', class_='metadata-row'):
            spans = data_row.find_all('span')
            # a usable row has a label span followed by a value span
            if len(spans) < 2:
                continue
            # check label of metadata for the kind of metadata in that row
            field = self._LABEL_TO_FIELD.get(spans[0].get_text().strip())
            if field is not None:
                extracted_data[field] = spans[1].get_text()
        return extracted_data

    def get_en_pdf_link(self, souped_data, *args, **kwargs):
        """Returns the download link of the English PDF in the files table, or '' if there is none.

        When several rows are marked English, the last one is returned.
        """
        link = ''
        # the first table row is skipped (treated as the header)
        for pdf_row in souped_data.find_all('tr')[1:]:
            cells = pdf_row.find_all('td')
            if len(cells) < 5 or cells[4].get_text().strip() != 'English':
                continue
            download = cells[0].find('tindui-app-file-download-link')
            if download is not None and download.get('url'):
                link = download['url']
        return link

    @property
    def pdf(self):
        """Text of the English PDF, or '' when the page offered no English PDF link.

        Every access downloads the PDF again through PDFScraper.
        """
        en_pdf_link = self.metadata.get('enPdfLink', '')
        if en_pdf_link != '':
            return PDFScraper(en_pdf_link, scrape_delay=self.scrape_delay).data
        return ''
    
# Scrape every gathered article link; keep only the records that have text.
story_data = []

for counter, article_link in enumerate(article_links, start=1):
    print('Getting article', counter, 'of', len(article_links))
    # the scraper does all of its page work on construction
    scraper = UNDLArticleScraper(article_link)
    story_datum = scraper.metadata
    story_datum['text'] = scraper.pdf
    # ignore blank stories (where there is no text, because the file is not online)
    if story_datum['text'] != '':
        story_data.append(story_datum)

# report how many articles actually yielded text, not just how many were visited
print('Scraped', len(story_data), 'of', len(article_links), 'articles successfully!')
print(story_data[0:5])


## Saving to File (CSV)

Now we need to output the data for text analysis. In this step we will output each record as a line in a .CSV (spreadsheet) file. That file will be stored in `output/un_digital_library_3.csv`.

In [None]:
import os
import csv

OUTPUT_CSV_FILENAME = 'un_digital_library_3.csv'
OUTPUT_CSV_FIELDNAMES = ['title', 'authors', 'description', 'agenda', 'resolution', 'meetingRecord', 'draftResolution', 'note', 'date', 'enPdfLink', 'url', 'collections', 'text']

output_directory = os.path.join(os.path.abspath(os.getcwd()), 'output')
output_filepath = os.path.join(output_directory, OUTPUT_CSV_FILENAME)

# ensure directory exists, if not, create it
os.makedirs(output_directory, exist_ok=True)

print('Writing CSV File ', output_filepath)
# newline='' lets the csv module control line endings itself; without it,
# extra blank rows appear on platforms that translate '\n' on write
with open(output_filepath, 'w', encoding='utf8', newline='') as csvfile:
    writer = csv.DictWriter(csvfile, fieldnames=OUTPUT_CSV_FIELDNAMES)
    writer.writeheader()
    writer.writerows(story_data)

print('Success writing CSV File!')

## Saving to File (TXT)

Finally, some text analysis packages use folders of .txt files instead of .csv files. So we will also output every record as a .txt file located at `output/un_digital_library/FILENAME.txt`, where FILENAME is determined by the English PDF link.

In [None]:
import os

OUTPUT_FOLDERNAME = 'un_digital_library'

output_folderpath = os.path.join(os.path.abspath(os.getcwd()), 'output', OUTPUT_FOLDERNAME)

# ensure directory exists, if not, create it
os.makedirs(output_folderpath, exist_ok=True)

print('Writing TXT Files ', output_folderpath)
for story_datum in story_data:
    # get the filename from the last segment of the PDF link, remove the 'en' suffix and the file type and replace with .txt
    output_filename = story_datum['enPdfLink'].split('/')[-1].replace('-EN.pdf', '.txt')
    # links that do not end in '-EN.pdf' still get a .txt extension
    if not output_filename.endswith('.txt'):
        output_filename = os.path.splitext(output_filename)[0] + '.txt'
    output_filepath = os.path.join(output_folderpath, output_filename)
    # the with-block closes the file even if the write fails
    with open(output_filepath, 'w', encoding='utf8') as txtfile:
        txtfile.write(story_datum['text'])

print('Success writing TXT Files!')