<a href="https://colab.research.google.com/github/dawi118/dk_kurser_repo/blob/main/dk_kurser_total_scrape_aarhus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Extract text from multiple courses (Aarhus)

Interactive Python script for extracting course data from the University of Aarhus. Note that this codebase is a proof of concept and requires refinement (scalability, quality of outputs).

Transferability of this web scraping tool to other universities is limited, depending on formatting of destination html.

In [1]:
# Optional: Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [9]:
# --- Packages ---
!pip install langdetect
!pip install bs4
!pip install chromium
!pip install playwright # Playwright required to render each web page with javascript in order to extract course information - need to run last

# --- Libraries ---
import re, requests, time, html, unicodedata, json, logging, random, sys, binascii, asyncio, nest_asyncio, os # asyncio allows for playwright to scrape content rendered in js as we go
import pandas as pd
from bs4 import BeautifulSoup, Tag
from urllib.parse import urljoin, urlparse, urldefrag
from langdetect import detect, DetectorFactory
from playwright.async_api import async_playwright, TimeoutError as PWTimeout
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
import chromium
import playwright



**1. Scrape Course URLs before retrieving content**

In [16]:
# ---- Config ----
BASE        = "https://kursuskatalog.au.dk/en"
CAL_YEARS   = [str(y) for y in range(2012, 2028)]      # 2012 to 2027 inclusive, as string
YEAR_CHAIN  = "-".join(CAL_YEARS) # How URL formats course titles and years
START_PAGE  = 1 # Can change for sampling approach
MAX_PAGES   = None              # Optional: None = run until no 'cards' left (page renders course cards using js)
PAUSE       = (0.10, 0.20)      # Delay between scrapes (reduce server load)

OUT_TXT     = Path("au_course_urls.txt")
CHK_SIZE    = 1000              # checkpoint size - set to cache url download into manageable chunks
CHK_PREFIX  = "/content/drive/MyDrive/Thesis/web_scraping/aarhus/aarhus_"         # Optional: for aggregating files across checkpoints once course URLs are identified, if in same file (assumed name convention: arrhus_0_1000, arrhus_1000_2000, ...)
CHK_SUFFIX  = ""                # leave empty

SELECTOR_CARDS        = "li.card"
SELECTOR_CARD_ANCHOR  = "h3.card_title--large a[href], a[href*='/en/course/']"

# --- Helpers ---
# absolutise - converts partial or relative URLs into full ones where necessary
def absolutise(base_url: str, href: str) -> str | None:
    if not href or href.startswith(("mailto:", "tel:", "javascript:")):
        return None
    return urldefrag(urljoin(base_url, href))[0]

# discover_all_urls_async - main function to identify urls from Aarhus and save periodically using playwright
async def discover_all_urls_async():
    all_urls, seen = [], set()
    last_chk_end = 0  # end index of the last chunk file written

    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        ctx = await browser.new_context(
            user_agent=("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                        "AppleWebKit/537.36 (KHTML, like Gecko) "
                        "Chrome/124.0.0.0 Safari/537.36"),
            locale="en-GB",
            extra_http_headers={"Accept-Language": "en-GB,en;q=0.9,*;q=0.5"},
            viewport={"width": 1400, "height": 900},
        )
        page = await ctx.new_page()

        pg = START_PAGE
        while True:
            if MAX_PAGES and pg > MAX_PAGES:
                break

            url = f"{BASE}?year={YEAR_CHAIN}&page={pg}"
            try:
                await page.goto(url, wait_until="networkidle", timeout=45000)
            except PWTimeout:
                break

            # Wait for cards to populate
            try:
                await page.wait_for_selector(SELECTOR_CARDS, timeout=6000)
            except PWTimeout:
                pass

            card_l = page.locator(SELECTOR_CARDS)
            card_count = await card_l.count()
            if card_count == 0:
                break

            # Extract one course link per card
            added = 0
            for i in range(card_count):
                card = card_l.nth(i)
                a = card.locator(SELECTOR_CARD_ANCHOR).first
                href = await a.get_attribute("href")
                abs_url = absolutise(url, (href or "").strip())
                if abs_url and "/en/course/" in abs_url and abs_url not in seen:
                    seen.add(abs_url)
                    all_urls.append(abs_url)
                    added += 1

            # Quiet progress line per page
            print(f"Page {pg} complete — added {added} (total {len(all_urls)})")

            # Write any full 1000-link chunks accumulated since last checkpoint
            while last_chk_end + CHK_SIZE <= len(all_urls):
                start = last_chk_end
                end   = last_chk_end + CHK_SIZE
                fname = f"{CHK_PREFIX}{start}_{end}{CHK_SUFFIX}"
                Path(fname).write_text("\n".join(all_urls[start:end]), encoding="utf-8")
                print(f"Saved checkpoint: {fname}")
                last_chk_end = end

            pg += 1
            time.sleep(random.uniform(*PAUSE))

        await browser.close()

    # Produce final consolidated file
    OUT_TXT.write_text("\n".join(all_urls), encoding="utf-8")
    print(f"Saved final list: {OUT_TXT} ({len(all_urls)} URLs)")

    # If the run ended mid-chunk, save the tail to file for re-running
    if last_chk_end < len(all_urls):
        tail_name = f"{CHK_PREFIX}{last_chk_end}_{len(all_urls)}{CHK_SUFFIX}"
        Path(tail_name).write_text("\n".join(all_urls[last_chk_end:]), encoding="utf-8")
        print(f"Saved tail checkpoint: {tail_name}")

    return all_urls

# --- Main ---
nest_asyncio.apply()
urls = await discover_all_urls_async()
asyncio.run(discover_all_urls_async())

Error: BrowserType.launch: Executable doesn't exist at /root/.cache/ms-playwright/chromium_headless_shell-1187/chrome-linux/headless_shell
╔════════════════════════════════════════════════════════════╗
║ Looks like Playwright was just installed or updated.       ║
║ Please run the following command to download new browsers: ║
║                                                            ║
║     playwright install                                     ║
║                                                            ║
║ <3 Playwright Team                                         ║
╚════════════════════════════════════════════════════════════╝

1.1. Rename and aggregate checkpoint files (optional)

In [17]:
# --- Config ---
# Define folder_path - replace with appropriate file name
folder_path = "/content/drive/MyDrive/Thesis/web_scraping/aarhus"

# --- Main ----
# List all files in the folder
files = os.listdir(folder_path)

# Iterate through each file and rename it by adding '.csv' extension
for file_name in files:
    # Construct the full path for the original and new file
    old_path = os.path.join(folder_path, file_name)
    new_path = os.path.join(folder_path, file_name + ".csv")

    # Rename the file using the mv shell command
    !mv "{old_path}" "{new_path}"
    print(f"Renamed '{file_name}' to '{file_name}.csv'")

# List renamed 'csv' files located at folder_path
all_files = [os.path.join(folder_path, f) for f in os.listdir(folder_path) if f.endswith('.csv')]

# Read each CSV file into a DataFrame and store in a list
df_list = []
for f in all_files:
    try:
        # Assuming the CSVs have no header, or you want to treat the first row as data
        df = pd.read_csv(f, header=None)
        # Prune double apostrophes from each cell
        df = df.applymap(lambda x: x.replace('"', '') if isinstance(x, str) else x)
        df_list.append(df)
    except Exception as e:
        print(f"Error reading {f}: {e}")

# Concatenate all DataFrames into a single DataFrame
if df_list:
    combined_df = pd.concat(df_list, ignore_index=True)

    # Define the output file path
    output_file = os.path.join(folder_path, "combined_aarhus_urls.csv")

    # Save the combined DataFrame to a new CSV file
    combined_df.to_csv(output_file, index=False, header=False) # Set header=False if no header in original files
    print(f"Successfully combined {len(all_files)} files into {output_file}")
else:
    print("No CSV files found to combine.")

NameError: name 'os' is not defined

**2. Execute Scraping against URLs (takes very long time)**

In [None]:
# --- Packages (assueme not installed earlier) ---
!pip -q install -U playwright bs4 pandas langdetect nest_asyncio

# Issues using package - try pulling Chromium and any missing OS deps
!playwright install --with-deps chromium

# --- Libraries ---
import re
import json
import html
import time
import random
import logging
import nest_asyncio, asyncio
nest_asyncio.apply()
from pathlib import Path
from urllib.parse import urljoin

import pandas as pd
from bs4 import BeautifulSoup
from langdetect import detect
from playwright.async_api import async_playwright, TimeoutError as PWTimeout

# --- Config ---
URL_CSV      = "/content/drive/MyDrive/Thesis/web_scraping/aarhus/combined_aarhus_urls.csv" # Switch out!
OUT_JSON     = "aarhus_courses_from_csv.json" # Destination file name
FAILED_TXT   = "aarhus_failed_urls.txt" # Destination (failed extractions)
CHECKPOINT_EVERY = 100     # write partial file every x courses (adjust based on speed)
CONCURRENCY = 3       # number of pages viewable simultaneously (with playwright)

HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/124.0.0.0 Safari/537.36"
    ),
    "Accept-Language": "en,en-GB;q=0.9,*;q=0.5",
} # configuring a 'human-like' agent for web scraping

# Logging output settings
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s"
)
log = logging.getLogger("au")

# --- Helpers ---
# norm_text - normalise html text for extraction
def norm_text(s: str | None) -> str | None:
    if not s: return None
    t = html.unescape(s).replace("\xa0", " ")
    t = re.sub(r"\s+", " ", t).strip()
    return t or None

# load_course_urls - extract course_urls downloaded earlier from csv_path
def load_course_urls(csv_path: str) -> list[str]:
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
    urls = []
    for col in df.columns:
        for val in df[col].astype(str):
            s = (val or "").strip()
            if s.startswith("http") and "kursuskatalog.au.dk" in s and "/course/" in s:
                urls.append(s)
    # de-duplicate (preserving order)
    seen, out = set(), []
    for u in urls:
        if u not in seen:
            seen.add(u); out.append(u)
    return out

# section_text_after - extracts relevant blocks of content following specific headings
def section_text_after(bs_heading) -> str:
    parts = []
    for sib in bs_heading.next_siblings:
        name = getattr(sib, "name", None)
        if name in ("h1","h2","h3","h4","h5","h6"):
            break
        if hasattr(sib, "get_text"):
            t = sib.get_text(" ", strip=True)
        else:
            t = str(sib).strip() if isinstance(sib, str) else ""
        if t:
            parts.append(t)
    return norm_text(" ".join(parts)) or ""

# extract_title - extracts course title from page 'h1'
def extract_title(soup: BeautifulSoup) -> str | None:
    root = soup.find("main", id="content") or soup
    h1 = (root.select_one("div.page__content__block h1.list-navigator__header")
          or root.find("h1"))
    if not h1:
        return None
    raw = norm_text(h1.get_text(" ", strip=True))
    if not raw:
        return None
    # remove leading code like "5647: Title"
    m = re.match(r"^\s*[A-Za-z0-9._-]+\s*:\s*(.+)$", raw)
    return m.group(1) if m else raw

# extract_placement - extracts placement variable from page
def extract_placement(soup: BeautifulSoup) -> str | None:
    root = soup.find("main", id="content") or soup
    h2 = root.select_one("div.page__content__block h2.list-navigator__subtitle")
    return norm_text(h2.get_text(" ", strip=True)) if h2 else None

# extract_placement - extracts 'stads uva code' variable from page
def extract_stads_code(soup: BeautifulSoup) -> str | None:
    sp = soup.find("span", class_="copy-to-clipboard__this")
    code = norm_text(sp.get_text(strip=True)) if sp else None
    return code if code and re.search(r"[A-Za-z0-9]", code) else None

# extract_fact_list - extracts credit, level and language variables from normalised fact list on webpage
def extract_fact_list(soup: BeautifulSoup) -> dict:
    out = {}
    for lab in soup.select(".fact-list__item__label"):
        label = norm_text(lab.get_text(" ", strip=True) or "")
        valnode = lab.find_next_sibling(class_="fact-list__item__value") or lab.parent.find(class_="fact-list__item__value")
        value = norm_text(valnode.get_text(" ", strip=True) if valnode else "")
        if not label or not value:
            continue
        lbl = label.lower()
        if "ects" in lbl:
            out["credit"] = value
        elif "level" in lbl:
            out["level"] = value
        elif "language" in lbl:
            out["language"] = value
    return out

# extract_department - extracts department variable from beautiful soup text
def extract_department(soup: BeautifulSoup) -> str | None:
    for dt in soup.find_all("dt"):
        label = norm_text(dt.get_text(" ", strip=True) or "")
        if label and label.lower().strip() == "department":
            dd = dt.find_next_sibling("dd")
            if dd:
                return norm_text(dd.get_text(" ", strip=True))
    return None

# extract_content_and_lo - extract content and learning outcome text from course description
def extract_content_and_lo(soup: BeautifulSoup) -> tuple[str | None, str | None]:
    cols = soup.select("div.course-details__column")
    content, outcomes = None, None

    for col in cols:
        for h in col.find_all(["h2","h3"]):
            t = norm_text(h.get_text(" ", strip=True) or "")
            if not t: continue
            tl = t.lower()
            if (("course content" in tl or tl == "content") and not content):
                content = section_text_after(h)
            elif (("description of qualifications" in tl) or ("learning outcomes" in tl)) and not outcomes:
                outcomes = section_text_after(h)
    if not content and cols:
        content = norm_text(" ".join([norm_text(c.get_text(" ", strip=True) or "") or "" for c in cols]))
    return content, outcomes

# extract_coordinators_and_profile_links - attempt to click through coordinator text to extract course coordinator details (i.e. name and email)
def extract_coordinators_and_profile_links(soup: BeautifulSoup) -> tuple[list[str], list[str]]:
    names = []
    profile_links = []

    dd = None
    for dt in soup.find_all("dt"):
        label = norm_text(dt.get_text(" ", strip=True) or "")
        if label and label.lower().startswith("course coordinator"):
            dd = dt.find_next_sibling("dd")
            break
    root = dd or soup

    for div in root.select(".profile-image__label__title"):
        n = norm_text(div.get_text(" ", strip=True))
        if n: names.append(n)
    for img in root.select("img.profile-image[alt]"):
        n = norm_text(img.get("alt"))
        if n: names.append(n)
    for tr in root.select(".truncator__content"):
        t = norm_text(tr.get_text(" ", strip=True))
        if t: names.append(t)

    for a in root.find_all("a", href=True):
        href = a["href"].strip()
        if href.startswith("mailto:"):
            continue
        if "au.dk" in href:
            profile_links.append(urljoin("https://kursuskatalog.au.dk", href))

    # de-duplicate names
    names = list(dict.fromkeys(names))
    profile_links = list(dict.fromkeys(profile_links))
    return names, profile_links

# ---  Main ---
# render_and_parse (async) - attempt to render webpage using Playwright to obtain details
async def render_and_parse(context, url: str) -> BeautifulSoup | None:
    page = await context.new_page()
    try:
        await page.route("**/*", lambda route: (
            route.continue_() if route.request.resource_type in {"document","script","xhr","fetch","stylesheet"} else route.abort()
        ))
    except Exception:
        pass

    try:
        await page.goto(url, wait_until="domcontentloaded", timeout=30000)
        # Wait for either the title or the content root; keep going if timeout
        try:
            await page.wait_for_selector("h1.list-navigator__header, main#content", timeout=10000)
        except PWTimeout:
            pass
        html_str = await page.content()
        return BeautifulSoup(html_str, "html.parser")
    except Exception as e:
        log.warning("Nav failed: %s (%s)", url, e)
        return None
    finally:
        await page.close()

# fetch_profile_emails (async) - attempt to use profile_url to extract coordinator details on a separate page
async def fetch_profile_emails(context, profile_url: str) -> list[str]:
    page = await context.new_page()
    emails = []
    try:
        await page.goto(profile_url, wait_until="domcontentloaded", timeout=25000)
        # try both immediately and after a short wait (some profiles populate slightly later)
        sel = "span.contact-list__item__content__full-label"
        try:
            await page.wait_for_selector(sel, timeout=6000)
        except PWTimeout:
            pass
        nodes = await page.locator(sel).all_inner_texts()
        for t in nodes:
            t = norm_text(t)
            if t and "@" in t:
                emails.append(t)
    except Exception:
        pass
    finally:
        await page.close()
    return list(dict.fromkeys(emails))


# scrape_one - attempt to scrape required information from a single course page
async def scrape_one(context, url: str) -> dict | None:
    soup = await render_and_parse(context, url)
    if not soup:
        return None

    data = {
        "source_url": url,
        "year": None,
        "course_code": None,
        "course_title": None,
        "credit": None,
        "placement": None,
        "contracting_department": None,
        "coordinators": None,
        "coordinator_emails": None,
        "content": None,
        "learning_outcomes": None,
        "knowledge": None,
        "skills": None,
        "competencies": None,
        "language": None,
        "level": None,
        "web_lang": None
    }

    # Title & Placement
    data["course_title"] = extract_title(soup)
    data["placement"]    = extract_placement(soup)
    if data["placement"]:
        m = re.search(r"(\d{4})", data["placement"])
        if m: data["year"] = m.group(1)

    # STADS code
    data["course_code"] = extract_stads_code(soup)

    # Facts
    facts = extract_fact_list(soup)
    for k, v in facts.items():
        data[k] = v

    # Department
    data["contracting_department"] = extract_department(soup)

    # Content & Learning outcomes
    content, outcomes = extract_content_and_lo(soup)
    data["content"]           = content
    data["learning_outcomes"] = outcomes

    # Coordinators + emails
    names, profiles = extract_coordinators_and_profile_links(soup)
    data["coordinators"] = names or None

    emails = []
    for prof in profiles[:3]:
        emails.extend(await fetch_profile_emails(context, prof))
    data["coordinator_emails"] = list(dict.fromkeys(emails)) or None

    # Language detection fallback
    sample = data["content"] or data["learning_outcomes"] or data["course_title"] or ""
    try:
        data["web_lang"] = detect(sample) if sample else "unknown"
    except Exception:
        data["web_lang"] = "unknown"

    return data

# main - script to run all functions (initiate below)
async def main():
    # Load URLs from step 1
    urls = load_course_urls(URL_CSV)
    print(f"Loaded {len(urls)} course URLs from CSV")

    records, failed = [], []

    # Define concurrency (default = 3)
    sem = asyncio.Semaphore(CONCURRENCY)

    # Launch playwright through chromium browser and commence scraping
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        context = await browser.new_context(user_agent=HEADERS["User-Agent"], locale="en-GB")

        # Cycle through pages and append records
        async def runner(u: str, idx: int):
            async with sem:
                rec = await scrape_one(context, u)
                if rec: records.append(rec)
                else:   failed.append(u)

                # sample print every 50 (For testing!)
                if idx % 10 == 0:
                    print(f"\n=== SAMPLE @ {idx}/{len(urls)} ===")
                    print(json.dumps(records[-1] if records else {"note":"no records yet"}, ensure_ascii=False, indent=2))
                    print("---------------------")

                # checkpoint
                if idx % CHECKPOINT_EVERY == 100: # every 100 scrapes, create checkpoint and save as JSON
                    ck = f"aarhus_courses_ck_{idx-CHECKPOINT_EVERY+1}_{idx}.json"
                    Path(ck).write_text(json.dumps(records, ensure_ascii=False, indent=2), encoding="utf-8")
                    log.info("Checkpoint saved: %s (records so far: %d)", ck, len(records))

                # Delay between tasks - randomise when scraping
                await asyncio.sleep(random.uniform(0.05, 0.12))

        # Run an event look using tasks (with concurrency to allow for multiple scrapes at the same time)
        tasks = []
        for i, u in enumerate(urls, start=1):
            tasks.append(asyncio.create_task(runner(u, i)))

            # limit queue of tasks to avoid huge task list in memory
            if len(tasks) >= 2000:
                await asyncio.gather(*tasks)
                tasks = []
        if tasks:
            await asyncio.gather(*tasks)

        # close browser after success
        await context.close()
        await browser.close()

    # write outputs to OUT_JSON
    Path(OUT_JSON).write_text(json.dumps(records, ensure_ascii=False, indent=2), encoding="utf-8")
    print(f"\nSaved {len(records)} records → {OUT_JSON}")

    if failed:
        Path(FAILED_TXT).write_text("\n".join(failed), encoding="utf-8")
        print(f"Saved {len(failed)} failed URLs → {FAILED_TXT}")

# --- Run ---
if __name__ == "__main__":
    import asyncio, sys
    try:
        # If there's already a running loop, run main() inside this
        asyncio.get_running_loop()
        nest_asyncio.apply()
        asyncio.get_event_loop().run_until_complete(main())

    except RuntimeError:
        # No running loop (i.e. normal Python script on first run) - safe to use asyncio.run()
        asyncio.run(main())

Loaded 61705 course URLs from CSV

=== SAMPLE @ 10/61705 ===
{
  "source_url": "https://kursuskatalog.au.dk/en/course/58688/10-ECTS-Life-histories-and-biographical-research",
  "year": null,
  "course_code": null,
  "course_title": null,
  "credit": null,
  "placement": null,
  "contracting_department": null,
  "coordinators": null,
  "coordinator_emails": null,
  "content": null,
  "learning_outcomes": null,
  "knowledge": null,
  "skills": null,
  "competencies": null,
  "language": null,
  "level": null,
  "web_lang": "unknown"
}
---------------------

=== SAMPLE @ 20/61705 ===
{
  "source_url": "https://kursuskatalog.au.dk/en/course/65434/10-ECTS-Quantitative-Research-Methods",
  "year": "2016",
  "course_code": "171162U003",
  "course_title": "(10 ECTS) Quantitative Research Methods",
  "credit": "10",
  "placement": "Autumn semester 2016",
  "contracting_department": "Danish School of Education",
  "coordinators": [
    "Not specified",
    "ECTS 10 Level Master Language of instr

KeyboardInterrupt: 

# QA & Viz