# HLS Web Scraping

In [4]:
# %load /home/gaetan/Desktop/geovpylib/templates/heading-admin.py
%load_ext autoreload
%autoreload 2

# Common imports
import os
import pandas as pd, numpy as np
import datetime
# import math
#import time
#import json
#import requests
#import duckdb
#import plotly.express as px
# from multiprocessing import Pool

# Geovpylib library
import geovpylib.analysis as a
import geovpylib.database as db
import geovpylib.decorators as d
import geovpylib.importer as i
import geovpylib.magics
import geovpylib.pks as pks
import geovpylib.queries as q
import geovpylib.record_linkage as rl
import geovpylib.sparql as sparql
import geovpylib.utils as u
eta = u.Eta()

# Specific imports
from selenium.webdriver import Chrome, ChromeOptions
from selenium.webdriver.common.by import By

# Global variables
# ...

# Connect to Geovistory('prod')

# Connect to Geovistory database for insert
# env = 'prod' # Database to query: "prod", "stag", "dev", "local"
# pk_project = pks.projects. # The project to query/insert: integer
# execute = False # Boolean to prevent to execute directly into databases
# metadata_str = '' # kebab-lower-case or snake-lower-case. 
# import_manner = 'one-shot' # 'one-shot' or 'batch'
# db.connect_geovistory(env, pk_project, execute)
# db.set_metadata({'import-id': datetime.datetime.today().strftime('%Y%m%d') + '-' + metadata_str})
# db.set_insert_manner(import_manner)

# Connect to an external (non-Geovistory) Postgres database.
# The connection URL is not hard-coded: it is read from the environment variable named below.
db_url_env_var_name = 'YELLOW_SWITZERLAND_AND_BEYOND'
# Forwarded to db.connect_external as its `execute` flag.
# NOTE(review): presumably False turns the database calls into a dry run - confirm in geovpylib.
execute = True
db.connect_external(os.environ.get(db_url_env_var_name), execute=execute)

# Connect to a SPARQL endpoint
# sparql.connect_external('url')


The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
[DB] Connecting to PGSQL Database ... Connected!


# Create tables

In [None]:
# The four scraped entity kinds share exactly the same table layout, so the DDL
# is generated from one template instead of being copy-pasted four times.
# IF NOT EXISTS keeps this cell re-runnable: without it, a second run fails on
# the first table that already exists. The `hls` schema itself must already exist.
db.execute("".join(f"""
CREATE TABLE IF NOT EXISTS hls.{table} (
	id serial primary key,
	url varchar,
	name varchar,
	notice varchar,
	uri_geov varchar
);""" for table in ('theme', 'person', 'family', 'place')) + "\n")

# Scrape all theme links

In [None]:
def extract_theme_links(browser, url):
    """Scrape one search-result page and store the theme links not yet in hls.theme.

    Args:
        browser: Selenium WebDriver used to load the page.
        url: URL of the search-result page to scrape.

    Returns:
        The href of the next result page, or False when the page carries the
        '.noNextPagination' marker (i.e. there is no further page).
    """
    browser.get(url)

    elements = browser.find_elements(By.CSS_SELECTOR, '.search-result')
    # URLs already stored, so that re-running the scraper does not duplicate rows.
    known_urls = set(db.query('select url from hls.theme')['url'])

    statements = []
    for elt in elements:
        link = elt.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')
        name = elt.find_element(By.CSS_SELECTOR, 'a > h2').text

        # The dates <span> is optional: find_elements returns an empty list
        # instead of raising when it is absent.
        date_spans = elt.find_elements(By.CSS_SELECTOR, 'a > h2 > span')
        dates = date_spans[0].text if date_spans else ''

        link = link[0:link[:-1].rfind('/')]  # drop the last path segment (the trailing date)
        if dates:
            name = name.replace(dates, '')  # the heading text embeds the dates; keep only the name
        name = name.strip()

        if link in known_urls:
            continue
        known_urls.add(link)  # also guards against the same link appearing twice on one page

        # Double the single quotes of BOTH values so none can break out of the SQL literal.
        safe_link = link.replace("'", "''")
        safe_name = name.replace("'", "''")
        statements.append(f"""
                insert into hls.theme 
                    (url, name)
                    values ('{safe_link}', '{safe_name}');
            """)

    # Nothing new on this page: do not send an empty statement to the database.
    if statements:
        db.execute("".join(statements))

    if browser.find_elements(By.CSS_SELECTOR, '.noNextPagination'):
        return False

    return browser.find_element(By.CSS_SELECTOR, '.nextPagination').get_attribute('href')



# Walk through the paginated theme search results, following the "next page"
# link returned by extract_theme_links until it reports there is none.
browser = Chrome()
url = "https://hls-dhs-dss.ch/fr/search/category?f_hls.lexicofacet_string=0%2F016900.&text=*&sort=score&sortOrder=desc&collapsed=true&r=1"
cpt = 0  # number of result pages visited so far

# 158 is the hard-coded total given to the progress tracker
# (presumably the number of result pages when the notebook was written).
eta.begin(158, 'Fetching themes urls')
try:
    while True:
        cpt += 1
        url = extract_theme_links(browser, url)
        if not url:
            break
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()


# Scrape all person links

In [None]:
def extract_person_links(browser, url):
    """Scrape one search-result page and store the person links not yet in hls.person.

    Args:
        browser: Selenium WebDriver used to load the page.
        url: URL of the search-result page to scrape.

    Returns:
        The href of the next result page, or False when the page carries the
        '.noNextPagination' marker (i.e. there is no further page).
    """
    browser.get(url)

    elements = browser.find_elements(By.CSS_SELECTOR, '.search-result')
    # URLs already stored, so that re-running the scraper does not duplicate rows.
    known_urls = set(db.query('select url from hls.person')['url'])

    statements = []
    for elt in elements:
        link = elt.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')
        name = elt.find_element(By.CSS_SELECTOR, 'a > h2').text

        # The dates <span> is optional: find_elements returns an empty list
        # instead of raising when it is absent.
        date_spans = elt.find_elements(By.CSS_SELECTOR, 'a > h2 > span')
        dates = date_spans[0].text if date_spans else ''

        link = link[0:link[:-1].rfind('/')]  # drop the last path segment (the trailing date)
        if dates:
            name = name.replace(dates, '')  # the heading text embeds the dates; keep only the name
        name = name.strip()

        if link in known_urls:
            continue
        known_urls.add(link)  # also guards against the same link appearing twice on one page

        # Double the single quotes of BOTH values so none can break out of the SQL literal.
        safe_link = link.replace("'", "''")
        safe_name = name.replace("'", "''")
        statements.append(f"""
                insert into hls.person 
                    (url, name)
                    values ('{safe_link}', '{safe_name}');
            """)

    # Nothing new on this page: do not send an empty statement to the database.
    if statements:
        db.execute("".join(statements))

    if browser.find_elements(By.CSS_SELECTOR, '.noNextPagination'):
        return False

    return browser.find_element(By.CSS_SELECTOR, '.nextPagination').get_attribute('href')



# Walk through the paginated person search results, following the "next page"
# link returned by extract_person_links until it reports there is none.
browser = Chrome()
url = "https://hls-dhs-dss.ch/fr/search/category?f_hls.lexicofacet_string=0%2F000100.&text=*&sort=score&sortOrder=desc&collapsed=true&r=1"
cpt = 0  # number of result pages visited so far

# 1268 is the hard-coded total given to the progress tracker
# (presumably the number of result pages when the notebook was written).
eta.begin(1268, 'Fetching persons urls')
try:
    while True:
        cpt += 1
        url = extract_person_links(browser, url)
        if not url:
            break
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()


# Scrape all family links

In [None]:
def extract_family_links(browser, url):
    """Scrape one search-result page and store the family links not yet in hls.family.

    Args:
        browser: Selenium WebDriver used to load the page.
        url: URL of the search-result page to scrape.

    Returns:
        The href of the next result page, or False when the page carries the
        '.noNextPagination' marker (i.e. there is no further page).
    """
    browser.get(url)

    elements = browser.find_elements(By.CSS_SELECTOR, '.search-result')
    # URLs already stored, so that re-running the scraper does not duplicate rows.
    known_urls = set(db.query('select url from hls.family')['url'])

    statements = []
    for elt in elements:
        link = elt.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')
        name = elt.find_element(By.CSS_SELECTOR, 'a > h2').text

        # The dates <span> is optional: find_elements returns an empty list
        # instead of raising when it is absent.
        date_spans = elt.find_elements(By.CSS_SELECTOR, 'a > h2 > span')
        dates = date_spans[0].text if date_spans else ''

        link = link[0:link[:-1].rfind('/')]  # drop the last path segment (the trailing date)
        if dates:
            name = name.replace(dates, '')  # the heading text embeds the dates; keep only the name
        name = name.strip()

        if link in known_urls:
            continue
        known_urls.add(link)  # also guards against the same link appearing twice on one page

        # Double the single quotes of BOTH values so none can break out of the SQL literal.
        safe_link = link.replace("'", "''")
        safe_name = name.replace("'", "''")
        statements.append(f"""
                insert into hls.family 
                    (url, name)
                    values ('{safe_link}', '{safe_name}');
            """)

    # Nothing new on this page: do not send an empty statement to the database.
    if statements:
        db.execute("".join(statements))

    if browser.find_elements(By.CSS_SELECTOR, '.noNextPagination'):
        return False

    return browser.find_element(By.CSS_SELECTOR, '.nextPagination').get_attribute('href')



# Walk through the paginated family search results, following the "next page"
# link returned by extract_family_links until it reports there is none.
browser = Chrome()
url = "https://hls-dhs-dss.ch/fr/search/category?f_hls.lexicofacet_string=0%2F000200.&text=*&sort=score&sortOrder=desc&collapsed=true&r=1"
cpt = 0  # number of result pages visited so far

# 128 is the hard-coded total given to the progress tracker
# (presumably the number of result pages when the notebook was written).
eta.begin(128, 'Fetching families urls')
try:
    while True:
        cpt += 1
        url = extract_family_links(browser, url)
        if not url:
            break
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()


# Scrape all place links

In [None]:
def extract_place_links(browser, url):
    """Scrape one search-result page and store the place links not yet in hls.place.

    Args:
        browser: Selenium WebDriver used to load the page.
        url: URL of the search-result page to scrape.

    Returns:
        The href of the next result page, or False when the page carries the
        '.noNextPagination' marker (i.e. there is no further page).
    """
    browser.get(url)

    elements = browser.find_elements(By.CSS_SELECTOR, '.search-result')
    # URLs already stored, so that re-running the scraper does not duplicate rows.
    known_urls = set(db.query('select url from hls.place')['url'])

    statements = []
    for elt in elements:
        link = elt.find_element(By.CSS_SELECTOR, 'a').get_attribute('href')
        name = elt.find_element(By.CSS_SELECTOR, 'a > h2').text

        # The dates <span> is optional: find_elements returns an empty list
        # instead of raising when it is absent.
        date_spans = elt.find_elements(By.CSS_SELECTOR, 'a > h2 > span')
        dates = date_spans[0].text if date_spans else ''

        link = link[0:link[:-1].rfind('/')]  # drop the last path segment (the trailing date)
        if dates:
            name = name.replace(dates, '')  # the heading text embeds the dates; keep only the name
        name = name.strip()

        if link in known_urls:
            continue
        known_urls.add(link)  # also guards against the same link appearing twice on one page

        # Double the single quotes of BOTH values so none can break out of the SQL literal.
        safe_link = link.replace("'", "''")
        safe_name = name.replace("'", "''")
        statements.append(f"""
                insert into hls.place 
                    (url, name)
                    values ('{safe_link}', '{safe_name}');
            """)

    # Nothing new on this page: do not send an empty statement to the database.
    if statements:
        db.execute("".join(statements))

    if browser.find_elements(By.CSS_SELECTOR, '.noNextPagination'):
        return False

    return browser.find_element(By.CSS_SELECTOR, '.nextPagination').get_attribute('href')



# Walk through the paginated place search results, following the "next page"
# link returned by extract_place_links until it reports there is none.
browser = Chrome()
url = "https://hls-dhs-dss.ch/fr/search/category?f_hls.lexicofacet_string=0%2F006800.&text=*&sort=score&sortOrder=desc&collapsed=true&r=1"
cpt = 0  # number of result pages visited so far

# 274 is the hard-coded total given to the progress tracker
# (presumably the number of result pages when the notebook was written).
eta.begin(274, 'Fetching places urls')
try:
    while True:
        cpt += 1
        url = extract_place_links(browser, url)
        if not url:
            break
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()

# Fetch notices for all themes

In [4]:
# Visit every theme that has no notice yet and store the text of the first
# paragraph matched by '.hls-article-text-unit > p' as its notice.
# Each row is updated right away, so an interrupted run can simply be restarted.
browser = Chrome()
try:
    # Only rows still lacking a notice are fetched; no per-row check is needed afterwards.
    themes = db.query('select * from hls.theme where notice is null')

    eta.begin(len(themes), 'Scraping themes notices')
    for _, theme in themes.iterrows():
        browser.get(theme['url'])
        notice = browser.find_element(By.CSS_SELECTOR, '.hls-article-text-unit > p').text

        # Single quotes are doubled so the text cannot terminate the SQL literal.
        db.execute(f"""
        update hls.theme
            set notice = '{notice.replace("'", "''")}'
        where id = {theme['id']};
    """)
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()

Scraping themes notices is done - Elapsed: [00h00m00s]                                                                     


# Fetch notices for all persons

In [6]:
# Visit every person that has no notice yet and store the text of the first
# paragraph matched by '.hls-article-text-unit > p' as its notice. The birth and
# death dates found in that paragraph are prefixed with French wording
# ('Naît le' / 'meurt le') so the stored notice reads as a sentence.
# Each row is updated right away, so an interrupted run can simply be restarted.
browser = Chrome()
try:
    # Only rows still lacking a notice are fetched; no per-row check is needed afterwards.
    persons = db.query('select * from hls.person where notice is null')

    eta.begin(len(persons), 'Scraping persons notices')
    for _, person in persons.iterrows():
        browser.get(person['url'])
        notice = browser.find_element(By.CSS_SELECTOR, '.hls-article-text-unit > p').text

        # Both date elements are optional. find_elements returns an empty list when
        # the element is absent, so no exception has to be swallowed.
        birth_elts = browser.find_elements(By.CSS_SELECTOR, '.hls-article-text-unit > p > .hls-dnais')
        birthdate = birth_elts[0].text if birth_elts else ''
        # An empty pattern must be skipped: str.replace('', x) inserts x between every character.
        if birthdate:
            notice = notice.replace(birthdate, 'Naît le ' + birthdate)

        death_elts = browser.find_elements(By.CSS_SELECTOR, '.hls-article-text-unit > p > .hls-ddec')
        deathdate = death_elts[0].text if death_elts else ''
        if deathdate:
            # The death date is prefixed with itself (not with the birth date).
            notice = notice.replace(deathdate, 'meurt le ' + deathdate)

        # Single quotes are doubled so the text cannot terminate the SQL literal.
        db.execute(f"""
        update hls.person
            set notice = '{notice.replace("'", "''")}'
        where id = {person['id']};
    """)
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()

Scraping persons notices is done - Elapsed: [00h31m12s]                                                                       


# Fetch notices for all families

In [5]:
# Visit every family that has no notice yet and store the text of the first
# paragraph matched by '.hls-article-text-unit > p' as its notice.
# Each row is updated right away, so an interrupted run can simply be restarted.
browser = Chrome()
try:
    # Only rows still lacking a notice are fetched; no per-row check is needed afterwards.
    families = db.query('select * from hls.family where notice is null')

    eta.begin(len(families), 'Scraping families notices')
    for _, family in families.iterrows():
        browser.get(family['url'])
        notice = browser.find_element(By.CSS_SELECTOR, '.hls-article-text-unit > p').text

        # Single quotes are doubled so the text cannot terminate the SQL literal.
        db.execute(f"""
        update hls.family
            set notice = '{notice.replace("'", "''")}'
        where id = {family['id']};
    """)
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()

Scraping families notices is done - Elapsed: [00h00m00s]                                                                   


# Fetch notices for all places

In [2]:
# Visit every place that has no notice yet and store the text of the first
# paragraph matched by '.hls-article-text-unit > p' as its notice.
# Each row is updated right away, so an interrupted run can simply be restarted.
browser = Chrome()
try:
    # Only rows still lacking a notice are fetched; no per-row check is needed afterwards.
    places = db.query('select * from hls.place where notice is null')

    eta.begin(len(places), 'Scraping places notices')
    for _, place in places.iterrows():
        browser.get(place['url'])
        notice = browser.find_element(By.CSS_SELECTOR, '.hls-article-text-unit > p').text

        # Single quotes are doubled so the text cannot terminate the SQL literal.
        db.execute(f"""
        update hls.place
            set notice = '{notice.replace("'", "''")}'
        where id = {place['id']};
    """)
        eta.iter()
    eta.end()
finally:
    # Always close the browser, even when scraping raises, so no Chrome process is leaked.
    browser.quit()

Scraping places notices is done - Elapsed: [00h26m47s]                                                                     
