# scrape from sitemap which includes older articles

In [20]:
# pip install requests beautifulsoup4 lxml python-dateutil pytz
import csv, re, requests, pytz
from bs4 import BeautifulSoup
from dateutil import parser as dtparser

MONTHS = ["01","02","03","04","05","06","07", "08", "09"]
SITEMAP_TMPL = "https://www.straitstimes.com/sitemap/2025/{m}/feeds.xml"
TZ_SGT = pytz.timezone("Asia/Singapore")

HEADERS = {"User-Agent":(
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/120.0 Safari/537.36"
)}

def get(url):
    r = requests.get(url, headers=HEADERS, timeout=30)
    if not r.encoding or r.encoding.lower()=="iso-8859-1":
        r.encoding = "utf-8"
    r.raise_for_status()
    return r.text

def parse_article(url):
    """Return dict(title,url,published_sgt) or None if parse fails."""
    html = get(url)
    soup = BeautifulSoup(html, "lxml")

    title = (soup.find("meta", {"property":"og:title"}) or {}).get("content")
    if not title:
        h1 = soup.find("h1")
        title = h1.get_text(" ", strip=True) if h1 else None

    pub = None
    for tag, attrs in [
        ("meta", {"property":"article:published_time"}),
        ("meta", {"name":"article:published_time"}),
        ("meta", {"itemprop":"datePublished"}),
        ("meta", {"name":"pubdate"}),
    ]:
        m = soup.find(tag, attrs=attrs)
        if m and m.get("content"):
            pub = m["content"].strip(); break
    if not pub:
        t = soup.find("time")
        if t and (t.get("datetime") or t.get("content")):
            pub = (t.get("datetime") or t.get("content")).strip()
    if not pub:
        m = re.search(r"Published\s+[A-Za-z]{3,9}\s+\d{1,2},\s+\d{4}(?:,\s+\d{1,2}:\d{2}\s*(?:AM|PM))?", soup.get_text("\n", strip=True))
        pub = m.group(0).replace("Published","").strip() if m else None

    if not title or not pub:
        return None

    dt = dtparser.parse(pub)
    dt = TZ_SGT.localize(dt) if dt.tzinfo is None else dt.astimezone(TZ_SGT)
    return {"title": title, "url": url, "published_sgt": dt.isoformat()}

def main(out_csv="Output/st_housing_2025_Jan_to_Sep_from_sitemaps.csv"):
    urls = []
    for m in MONTHS:
        xml = get(SITEMAP_TMPL.format(m=m))
        sx = BeautifulSoup(xml, "xml")
        for loc in sx.find_all("loc"):
            u = loc.get_text(strip=True)
            if "/singapore/housing/" in u and u.rstrip("/") != "https://www.straitstimes.com/singapore/housing":
                urls.append(u)

    # ---- FIXED SECTION ----
    seen = set()
    unique_urls = []
    for u in urls:
        if u not in seen:
            seen.add(u)
            unique_urls.append(u)
    urls = unique_urls
    # -----------------------

    print(f"Found {len(urls)} Housing URLs from Jan–Sep 2025 sitemaps.")

    rows = []
    for i, u in enumerate(urls, 1):
        try:
            art = parse_article(u)
            if art:
                rows.append(art)
                status = "OK"
            else:
                status = "SKIP"
        except Exception as e:
            status = f"ERR ({e})"
        print(f"[{i}/{len(urls)}] {status} - {u}")

    with open(out_csv, "w", newline="", encoding="utf-8-sig") as f:
        w = csv.DictWriter(f, fieldnames=["title","url","published_sgt"])
        w.writeheader()
        w.writerows(rows)

    print(f"Wrote {len(rows)} rows to {out_csv}")

if __name__ == "__main__":
    main()


Found 93 Housing URLs from Jan–Sep 2025 sitemaps.
[1/93] OK - https://www.straitstimes.com/singapore/housing/condo-sizes-shrank-in-past-15-years-as-developers-sought-to-keep-prices-manageable
[2/93] OK - https://www.straitstimes.com/singapore/housing/hdb-handing-over-long-delayed-bus-interchange-in-bidadari-to-lta
[3/93] OK - https://www.straitstimes.com/singapore/housing/rooftop-badminton-court-sheltered-exercise-spaces-among-upcoming-facilities-in-housing-estates
[4/93] OK - https://www.straitstimes.com/singapore/housing/site-of-former-driving-range-in-toa-payoh-east-being-prepared-for-new-homes
[5/93] OK - https://www.straitstimes.com/singapore/housing/private-home-prices-jumped-2-3-per-cent-in-q4-2024-on-novembers-new-sales
[6/93] OK - https://www.straitstimes.com/singapore/housing/hdb-resale-prices-up-9-7-in-2024-more-units-sold
[7/93] OK - https://www.straitstimes.com/singapore/housing/construction-demand-of-up-to-53-billion-expected-in-2025-bca
[8/93] OK - https://www.straitstim

# scrape article contents from output file

In [22]:
import re
import time
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dateutil import parser as dtparser
import pytz

In [23]:
# ========= USER CONFIG =========
INPUT_CSV   = "Output/st_housing_2025_Jan_to_Sep_from_sitemaps.csv"  # your CSV from previous step
OUTPUT_XLSX = "Output/articles_st_housing_sitemap.xlsx"
REQUEST_TIMEOUT = 30
SLEEP_BETWEEN = 0.3  # politeness delay (seconds)
STOP_PHRASES = [
    # Add any end-of-article phrases you want to cut off at, e.g.:
    # "Join ST's Telegram channel",
    # "Get unlimited access",
]
# =================================

In [24]:
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0 Safari/537.36"
    )
}

TZ_SGT = pytz.timezone("Asia/Singapore")

# Regex patterns for the header lines
PUBLISHED_RE = re.compile(
    r"^Published\s+[A-Za-z]{3,9}\s+\d{1,2},\s+\d{4}(?:,\s+\d{1,2}:\d{2}\s*(?:AM|PM))?\s*$",
    re.IGNORECASE
)
UPDATED_RE = re.compile(
    r"^Updated\s+[A-Za-z]{3,9}\s+\d{1,2},\s+\d{4}(?:,\s+\d{1,2}:\d{2}\s*(?:AM|PM))?\s*$",
    re.IGNORECASE
)

def get_html(url):
    r = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    if not r.encoding or r.encoding.lower() == "iso-8859-1":
        r.encoding = "utf-8"
    r.raise_for_status()
    return r.text

def clean_text(txt: str) -> str:
    txt = txt.replace("\u00a0", " ")
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    return txt.strip()

def cut_at_stop_phrases(text: str, stops) -> str:
    if not stops:
        return text
    lower = text.lower()
    cut_idx = None
    for phrase in stops:
        i = lower.find(phrase.lower())
        if i != -1:
            cut_idx = i if cut_idx is None else min(cut_idx, i)
    return text[:cut_idx].rstrip() if cut_idx is not None else text

def is_promo_line(t: str) -> bool:
    t_low = t.lower()
    return ("sign up now" in t_low and "newsletter" in t_low) or ("newsletters delivered to your inbox" in t_low)

def is_photo_credit(t: str) -> bool:
    t_strip = t.strip()
    return t_strip.upper().startswith("PHOTO:") or t_strip.upper().startswith("ST PHOTO:")

def is_byline(t: str) -> bool:
    # Common byline patterns: "By John Doe", "By JOHN DOE", or just a short proper-name line
    if t.lower().startswith("by "):
        return True
    # Heuristic: short line with 1–4 tokens, each capitalised or uppercase (e.g., "Isabelle Liew", "JOHN TAN")
    tokens = t.strip().split()
    if 1 <= len(tokens) <= 4:
        cap_like = 0
        for w in tokens:
            if re.match(r"^[A-Z][a-z]+(-[A-Z][a-z]+)?$", w) or re.match(r"^[A-Z]{2,}$", w):
                cap_like += 1
        if cap_like == len(tokens):
            return True
    return False

def find_content_paragraphs(soup: BeautifulSoup):
    """
    Heuristically locate the paragraphs that comprise the article body.
    Returns a list of <p> elements in document order.
    """
    # Remove obvious non-content elements
    for sel in ["script", "style", "noscript", "header", "footer", "form", "aside"]:
        for tag in soup.select(sel):
            tag.decompose()

    # Likely containers first
    selectors = [
        '[itemprop="articleBody"]',
        '[property="articleBody"]',
        'article',
        ".article-body",
        ".article__content",
        ".content-body",
        ".rich-text",
        ".field--name-body",
        ".c-article-content",
        ".article-content",
        ".story-content",
        ".field-item",
    ]
    candidates = []
    for sel in selectors:
        nodes = soup.select(sel)
        if nodes:
            candidates.extend(nodes)

    if not candidates:
        main = soup.find("main")
        candidates = [main] if main else [soup]

    # Choose the first candidate that has enough paragraphs; else fallback
    best_ps = []
    for node in candidates:
        ps = [p for p in node.find_all("p") if p.get_text(strip=True)]
        if len(ps) >= 3:
            best_ps = ps
            break
        elif ps and not best_ps:
            best_ps = ps

    # If nothing, last resort: all paragraphs on the page
    if not best_ps:
        best_ps = [p for p in soup.find_all("p") if p.get_text(strip=True)]

    return best_ps

def extract_article_content_with_lead(html: str) -> str:
    """
    Keep one 'lead line' between the promo ('sign up now ... newsletters') and the 'Published ...' header,
    skipping photo credits and bylines. Then include everything after Published (and optional Updated).
    """
    soup = BeautifulSoup(html, "lxml")
    ps = find_content_paragraphs(soup)
    if not ps:
        return ""

    # Normalized paragraph texts
    p_texts = [clean_text(p.get_text(" ", strip=True)) for p in ps]

    # Locate indices of interest
    promo_idx = None
    published_idx = None
    for i, t in enumerate(p_texts):
        if promo_idx is None and is_promo_line(t):
            promo_idx = i
        if published_idx is None and (PUBLISHED_RE.match(t) or t.startswith("Published ")):
            published_idx = i
            break

    # If no 'Published' header, fallback to entire body
    if published_idx is None:
        content = clean_text("\n\n".join(p_texts))
        return cut_at_stop_phrases(content, STOP_PHRASES)

    # Choose lead line: scan from (promo_idx+1 if found else max(published_idx-6,0))
    lead_line = ""
    scan_start = (promo_idx + 1) if promo_idx is not None else max(published_idx - 6, 0)
    for j in range(scan_start, published_idx):
        cand = p_texts[j]
        if not cand:
            continue
        if is_photo_credit(cand) or is_byline(cand) or is_promo_line(cand):
            continue
        # Prefer a substantive sentence (ends with .!? or length threshold)
        if len(cand) >= 40 or re.search(r"[.!?]$", cand):
            lead_line = cand
            break

    # Skip 'Published ...' and optional 'Updated ...'
    start_idx = published_idx + 1
    if start_idx < len(p_texts) and (UPDATED_RE.match(p_texts[start_idx]) or p_texts[start_idx].startswith("Updated ")):
        start_idx += 1

    # Join parts
    parts = []
    if lead_line:
        parts.append(lead_line)
    parts.extend(p_texts[start_idx:])

    content = clean_text("\n\n".join(parts))
    content = cut_at_stop_phrases(content, STOP_PHRASES)

    # Fallback if empty
    if not content:
        content = clean_text("\n\n".join(p_texts))
        content = cut_at_stop_phrases(content, STOP_PHRASES)

    return content

def main():
    df = pd.read_csv(INPUT_CSV, encoding="utf-8")
    if "url" not in df.columns:
        raise ValueError("Input CSV must contain a 'url' column.")
    urls = df["url"].dropna().astype(str).tolist()
    print(f"Loaded {len(urls)} URLs from CSV.")
    urls = list(dict.fromkeys(urls))  # preserve order, deduplicate
    print(f"Unique URLs to scrape: {len(urls)}")

    contents, lengths = [], []
    for i, url in enumerate(urls, 1):
        try:
            html = get_html(url)
            content = extract_article_content_with_lead(html)
        except Exception as e:
            print(f"[{i}/{len(urls)}] ERROR: {url} -> {e}")
            content = ""

        char_count = len(content)
        contents.append(content)
        lengths.append(char_count)
        print(f"[{i}/{len(urls)}] Scraped. chars={char_count} | {url}")
        time.sleep(SLEEP_BETWEEN)

    df_out = df.copy()
    df_out["content"] = contents
    df_out["char_count"] = lengths

    with pd.ExcelWriter(OUTPUT_XLSX, engine="openpyxl") as writer:
        df_out.to_excel(writer, sheet_name="articles", index=False)

    print(f"Done. Wrote {len(df_out)} rows to {OUTPUT_XLSX}")

if __name__ == "__main__":
    main()


Loaded 93 URLs from CSV.
Unique URLs to scrape: 93
[1/93] Scraped. chars=7308 | https://www.straitstimes.com/singapore/housing/condo-sizes-shrank-in-past-15-years-as-developers-sought-to-keep-prices-manageable
[2/93] Scraped. chars=2807 | https://www.straitstimes.com/singapore/housing/hdb-handing-over-long-delayed-bus-interchange-in-bidadari-to-lta
[3/93] Scraped. chars=5506 | https://www.straitstimes.com/singapore/housing/rooftop-badminton-court-sheltered-exercise-spaces-among-upcoming-facilities-in-housing-estates
[4/93] Scraped. chars=5367 | https://www.straitstimes.com/singapore/housing/site-of-former-driving-range-in-toa-payoh-east-being-prepared-for-new-homes
[5/93] Scraped. chars=5360 | https://www.straitstimes.com/singapore/housing/private-home-prices-jumped-2-3-per-cent-in-q4-2024-on-novembers-new-sales
[6/93] Scraped. chars=4056 | https://www.straitstimes.com/singapore/housing/hdb-resale-prices-up-9-7-in-2024-more-units-sold
[7/93] Scraped. chars=4990 | https://www.straitstim

# dropped: scrape list of ST articles on housing page

In [None]:
# requirements:
#   pip install selenium webdriver-manager beautifulsoup4 lxml python-dateutil pytz requests

import csv
import re
import time
from urllib.parse import urlparse

import pytz
import requests
from bs4 import BeautifulSoup
from dateutil import parser as dtparser

from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager

In [None]:
# configuration
BASE = "https://www.straitstimes.com/singapore/housing"

# Date window (inclusive) in Asia/Singapore
TZ_SGT = pytz.timezone("Asia/Singapore")
START_DATE_SGT = TZ_SGT.localize(dtparser.parse("2025-01-01 00:00:00"))
END_DATE_SGT   = TZ_SGT.localize(dtparser.parse("2025-09-30 23:59:59"))

In [10]:
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0 Safari/537.36"
    )
}

def setup_driver():
    opts = webdriver.ChromeOptions()
    opts.add_argument("--headless=new")  # remove to watch the browser
    opts.add_argument("--no-sandbox")
    opts.add_argument("--disable-gpu")
    opts.add_argument("--window-size=1400,900")
    opts.add_argument(f"--user-agent={HEADERS['User-Agent']}")
    # Mild stealth
    opts.add_experimental_option("excludeSwitches", ["enable-automation"])
    opts.add_experimental_option("useAutomationExtension", False)
    driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=opts)
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
    })
    return driver

def safe_click(driver, el):
    driver.execute_script("arguments[0].scrollIntoView({block:'center'});", el)
    time.sleep(0.25)
    try:
        el.click()
    except Exception:
        driver.execute_script("arguments[0].click();", el)

def dismiss_cookie_banner(driver):
    for xp in [
        "//button[contains(translate(., 'ACEPTLDOSR', 'aceptldosr'),'accept')]",
        "//button[contains(., 'Accept & Close')]",
        "//button[contains(., 'Got it')]",
        "//button[contains(., 'I Accept')]",
        "//button[contains(translate(., 'ALLOWOK', 'allowok'),'ok')]",
    ]:
        try:
            btn = WebDriverWait(driver, 3).until(EC.element_to_be_clickable((By.XPATH, xp)))
            safe_click(driver, btn)
            time.sleep(0.4)
            print("Cookie/consent banner dismissed.")
            break
        except Exception:
            pass

def extract_listing_urls(driver):
    """Return the set of article-like URLs currently present in the DOM."""
    urls = set()
    for a in driver.find_elements(By.CSS_SELECTOR, "a[href]"):
        href = (a.get_attribute("href") or "").split("?")[0]
        if not href:
            continue
        path = urlparse(href).path
        if path.startswith("/singapore/housing/") and path.rstrip("/") != "/singapore/housing":
            urls.add(href)
    return urls

def get_html(url):
    r = requests.get(url, headers=HEADERS, timeout=30)
    if not r.encoding or r.encoding.lower() == "iso-8859-1":
        r.encoding = "utf-8"
    r.raise_for_status()
    return r.text

def parse_article(url):
    """
    Return (title, dt_sgt, url) or None if cannot parse.
    """
    try:
        html = get_html(url)
    except Exception as e:
        print(f"  ! Fetch failed: {url} ({e})")
        return None

    soup = BeautifulSoup(html, "lxml")

    # Title
    title = (soup.find("meta", {"property": "og:title"}) or {}).get("content")
    if not title:
        h1 = soup.find("h1")
        if h1:
            title = h1.get_text(" ", strip=True)
    if not title:
        print(f"  ! No title: {url}")
        return None

    # Published time
    pub = None
    for tag, attrs in [
        ("meta", {"property": "article:published_time"}),
        ("meta", {"name": "article:published_time"}),
        ("meta", {"itemprop": "datePublished"}),
        ("meta", {"name": "pubdate"}),
    ]:
        m = soup.find(tag, attrs=attrs)
        if m and m.get("content"):
            pub = m["content"].strip()
            break
    if not pub:
        t = soup.find("time")
        if t and (t.get("datetime") or t.get("content")):
            pub = (t.get("datetime") or t.get("content")).strip()
    if not pub:
        text = soup.get_text("\n", strip=True)
        m = re.search(r"Published\s+([A-Za-z]{3,9}\s+\d{1,2},\s+\d{4}(?:,\s+\d{1,2}:\d{2}\s*(?:AM|PM))?)", text)
        if m:
            pub = m.group(1)
    if not pub:
        print(f"  ! No published date: {url}")
        return None

    try:
        dt = dtparser.parse(pub, dayfirst=False)
        if dt.tzinfo is None:
            dt = TZ_SGT.localize(dt)
        else:
            dt = dt.astimezone(TZ_SGT)
    except Exception as e:
        print(f"  ! Date parse failed: {url} ({e})")
        return None

    return title, dt, url

def collect_until_startdate(driver, max_clicks=300, wait_after_click=1.2):
    """
    Click 'Load more' repeatedly until ANY article date is older than START_DATE_SGT.
    Returns dict url -> (title, dt_sgt, url) for all parsed items seen.
    """
    driver.get(BASE)
    wait = WebDriverWait(driver, 10)
    dismiss_cookie_banner(driver)

    parsed = {}      # url -> (title, dt_sgt, url)
    seen_urls = set()

    def parse_new(urls):
        oldest = None
        added = 0
        for u in sorted(urls):
            if u in parsed:
                dt = parsed[u][1]
            else:
                p = parse_article(u)
                if not p:
                    continue
                parsed[u] = p
                dt = p[1]
                added += 1
            if oldest is None or dt < oldest:
                oldest = dt
        return oldest, added

    # First page
    current = extract_listing_urls(driver)
    seen_urls |= current
    print(f"Found {len(current)} candidate URLs on first page.")
    oldest_seen, added = parse_new(current)
    print(f"Parsed {added} new articles from first page. Oldest seen so far: {oldest_seen}")

    clicks = 0
    while clicks < max_clicks:
        # Stop once any article is older than the start date (we’ve reached before 2025-01-01)
        if oldest_seen and oldest_seen < START_DATE_SGT:
            print(f"Stop condition reached: oldest_seen {oldest_seen} < START_DATE {START_DATE_SGT}")
            break

        # Find and click Load more
        loadmore = None
        for xp in [
            "//button[normalize-space()='Load more']",
            "//button[contains(., 'Load more')]",
            "//a[contains(., 'Load more')]",
            "//button[contains(@class,'load') and contains(.,'more')]"
        ]:
            try:
                loadmore = wait.until(EC.element_to_be_clickable((By.XPATH, xp)))
                break
            except Exception:
                continue

        if not loadmore:
            print("No 'Load more' button found. Stopping.")
            break

        clicks += 1
        safe_click(driver, loadmore)
        time.sleep(wait_after_click)

        after = extract_listing_urls(driver)
        new_urls = after - seen_urls
        if not new_urls:
            # Nudge bottom for lazy load
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight);")
            time.sleep(0.8)
            after2 = extract_listing_urls(driver)
            new_urls = after2 - seen_urls

        print(f"[Click {clicks}] New URLs discovered: {len(new_urls)} (total candidates so far: {len(after)})")

        if not new_urls:
            print("No new URLs after click; stopping.")
            break

        seen_urls |= new_urls
        oldest_new, added = parse_new(new_urls)
        if oldest_seen is None or (oldest_new and oldest_new < oldest_seen):
            oldest_seen = oldest_new
        print(f"[Click {clicks}] Parsed {added} new articles. Oldest seen now: {oldest_seen}")

    print(f"Total parsed articles: {len(parsed)}")
    return parsed

def main(output_csv="Output/st_housing_2025-01-01_to_2025-09-30.csv"): # rename output file to desired
    print(f"Target window: {START_DATE_SGT} to {END_DATE_SGT} (inclusive)")
    driver = setup_driver()
    try:
        parsed = collect_until_startdate(driver)
    finally:
        driver.quit()

    # Keep only items within the inclusive window
    rows = []
    for (title, dt_sgt, url) in parsed.values():
        if START_DATE_SGT <= dt_sgt <= END_DATE_SGT:
            rows.append({
                "title": title,
                "url": url,
                "published_sgt": dt_sgt.isoformat()
            })

    # Dedup by URL
    dedup = {}
    for r in rows:
        dedup.setdefault(r["url"], r)

    print(f"Articles in window (after filtering & dedup): {len(dedup)}")
    with open(output_csv, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=["title", "url", "published_sgt"])
        writer.writeheader()
        writer.writerows(dedup.values())

    print(f"Wrote {len(dedup)} rows to {output_csv}")

if __name__ == "__main__":
    main()


Target window: 2025-01-01 00:00:00+08:00 to 2025-09-30 23:59:59+08:00 (inclusive)
Found 10 candidate URLs on first page.
Parsed 10 new articles from first page. Oldest seen so far: 2025-09-15 22:58:00+08:00
[Click 1] New URLs discovered: 10 (total candidates so far: 20)
[Click 1] Parsed 10 new articles. Oldest seen now: 2025-09-01 05:00:00+08:00
[Click 2] New URLs discovered: 10 (total candidates so far: 30)
[Click 2] Parsed 10 new articles. Oldest seen now: 2025-07-30 18:35:00+08:00
[Click 3] New URLs discovered: 6 (total candidates so far: 36)
[Click 3] Parsed 6 new articles. Oldest seen now: 2025-07-23 10:43:08+08:00
No 'Load more' button found. Stopping.
Total parsed articles: 36
Articles in window (after filtering & dedup): 30
Wrote 30 rows to Output/st_housing_2025-01-01_to_2025-09-30.csv


# dropped: scrape contents of articles from output file

In [15]:
import re
import time
import pandas as pd
import requests
from bs4 import BeautifulSoup
from dateutil import parser as dtparser
import pytz

In [16]:
# ========= USER CONFIG =========
INPUT_CSV   = "Output/st_housing_2025-01-01_to_2025-09-30.csv"  # your CSV from previous step
OUTPUT_XLSX = "Output/articles_st_housing.xlsx"
REQUEST_TIMEOUT = 30
SLEEP_BETWEEN = 0.3  # politeness delay (seconds)
STOP_PHRASES = [
    # Add any end-of-article phrases you want to cut off at, e.g.:
    # "Join ST's Telegram channel",
    # "Get unlimited access",
]
# =================================

In [17]:
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/120.0 Safari/537.36"
    )
}

TZ_SGT = pytz.timezone("Asia/Singapore")

# Regex patterns for the header lines
PUBLISHED_RE = re.compile(
    r"^Published\s+[A-Za-z]{3,9}\s+\d{1,2},\s+\d{4}(?:,\s+\d{1,2}:\d{2}\s*(?:AM|PM))?\s*$",
    re.IGNORECASE
)
UPDATED_RE = re.compile(
    r"^Updated\s+[A-Za-z]{3,9}\s+\d{1,2},\s+\d{4}(?:,\s+\d{1,2}:\d{2}\s*(?:AM|PM))?\s*$",
    re.IGNORECASE
)

def get_html(url):
    r = requests.get(url, headers=HEADERS, timeout=REQUEST_TIMEOUT)
    if not r.encoding or r.encoding.lower() == "iso-8859-1":
        r.encoding = "utf-8"
    r.raise_for_status()
    return r.text

def clean_text(txt: str) -> str:
    txt = txt.replace("\u00a0", " ")
    txt = re.sub(r"[ \t]+", " ", txt)
    txt = re.sub(r"\n{3,}", "\n\n", txt)
    return txt.strip()

def cut_at_stop_phrases(text: str, stops) -> str:
    if not stops:
        return text
    lower = text.lower()
    cut_idx = None
    for phrase in stops:
        i = lower.find(phrase.lower())
        if i != -1:
            cut_idx = i if cut_idx is None else min(cut_idx, i)
    return text[:cut_idx].rstrip() if cut_idx is not None else text

def is_promo_line(t: str) -> bool:
    t_low = t.lower()
    return ("sign up now" in t_low and "newsletter" in t_low) or ("newsletters delivered to your inbox" in t_low)

def is_photo_credit(t: str) -> bool:
    t_strip = t.strip()
    return t_strip.upper().startswith("PHOTO:") or t_strip.upper().startswith("ST PHOTO:")

def is_byline(t: str) -> bool:
    # Common byline patterns: "By John Doe", "By JOHN DOE", or just a short proper-name line
    if t.lower().startswith("by "):
        return True
    # Heuristic: short line with 1–4 tokens, each capitalised or uppercase (e.g., "Isabelle Liew", "JOHN TAN")
    tokens = t.strip().split()
    if 1 <= len(tokens) <= 4:
        cap_like = 0
        for w in tokens:
            if re.match(r"^[A-Z][a-z]+(-[A-Z][a-z]+)?$", w) or re.match(r"^[A-Z]{2,}$", w):
                cap_like += 1
        if cap_like == len(tokens):
            return True
    return False

def find_content_paragraphs(soup: BeautifulSoup):
    """
    Heuristically locate the paragraphs that comprise the article body.
    Returns a list of <p> elements in document order.
    """
    # Remove obvious non-content elements
    for sel in ["script", "style", "noscript", "header", "footer", "form", "aside"]:
        for tag in soup.select(sel):
            tag.decompose()

    # Likely containers first
    selectors = [
        '[itemprop="articleBody"]',
        '[property="articleBody"]',
        'article',
        ".article-body",
        ".article__content",
        ".content-body",
        ".rich-text",
        ".field--name-body",
        ".c-article-content",
        ".article-content",
        ".story-content",
        ".field-item",
    ]
    candidates = []
    for sel in selectors:
        nodes = soup.select(sel)
        if nodes:
            candidates.extend(nodes)

    if not candidates:
        main = soup.find("main")
        candidates = [main] if main else [soup]

    # Choose the first candidate that has enough paragraphs; else fallback
    best_ps = []
    for node in candidates:
        ps = [p for p in node.find_all("p") if p.get_text(strip=True)]
        if len(ps) >= 3:
            best_ps = ps
            break
        elif ps and not best_ps:
            best_ps = ps

    # If nothing, last resort: all paragraphs on the page
    if not best_ps:
        best_ps = [p for p in soup.find_all("p") if p.get_text(strip=True)]

    return best_ps

def extract_article_content_with_lead(html: str) -> str:
    """
    Keep one 'lead line' between the promo ('sign up now ... newsletters') and the 'Published ...' header,
    skipping photo credits and bylines. Then include everything after Published (and optional Updated).
    """
    soup = BeautifulSoup(html, "lxml")
    ps = find_content_paragraphs(soup)
    if not ps:
        return ""

    # Normalized paragraph texts
    p_texts = [clean_text(p.get_text(" ", strip=True)) for p in ps]

    # Locate indices of interest
    promo_idx = None
    published_idx = None
    for i, t in enumerate(p_texts):
        if promo_idx is None and is_promo_line(t):
            promo_idx = i
        if published_idx is None and (PUBLISHED_RE.match(t) or t.startswith("Published ")):
            published_idx = i
            break

    # If no 'Published' header, fallback to entire body
    if published_idx is None:
        content = clean_text("\n\n".join(p_texts))
        return cut_at_stop_phrases(content, STOP_PHRASES)

    # Choose lead line: scan from (promo_idx+1 if found else max(published_idx-6,0))
    lead_line = ""
    scan_start = (promo_idx + 1) if promo_idx is not None else max(published_idx - 6, 0)
    for j in range(scan_start, published_idx):
        cand = p_texts[j]
        if not cand:
            continue
        if is_photo_credit(cand) or is_byline(cand) or is_promo_line(cand):
            continue
        # Prefer a substantive sentence (ends with .!? or length threshold)
        if len(cand) >= 40 or re.search(r"[.!?]$", cand):
            lead_line = cand
            break

    # Skip 'Published ...' and optional 'Updated ...'
    start_idx = published_idx + 1
    if start_idx < len(p_texts) and (UPDATED_RE.match(p_texts[start_idx]) or p_texts[start_idx].startswith("Updated ")):
        start_idx += 1

    # Join parts
    parts = []
    if lead_line:
        parts.append(lead_line)
    parts.extend(p_texts[start_idx:])

    content = clean_text("\n\n".join(parts))
    content = cut_at_stop_phrases(content, STOP_PHRASES)

    # Fallback if empty
    if not content:
        content = clean_text("\n\n".join(p_texts))
        content = cut_at_stop_phrases(content, STOP_PHRASES)

    return content

def main():
    df = pd.read_csv(INPUT_CSV, encoding="utf-8")
    if "url" not in df.columns:
        raise ValueError("Input CSV must contain a 'url' column.")
    urls = df["url"].dropna().astype(str).tolist()
    print(f"Loaded {len(urls)} URLs from CSV.")
    urls = list(dict.fromkeys(urls))  # preserve order, deduplicate
    print(f"Unique URLs to scrape: {len(urls)}")

    contents, lengths = [], []
    for i, url in enumerate(urls, 1):
        try:
            html = get_html(url)
            content = extract_article_content_with_lead(html)
        except Exception as e:
            print(f"[{i}/{len(urls)}] ERROR: {url} -> {e}")
            content = ""

        char_count = len(content)
        contents.append(content)
        lengths.append(char_count)
        print(f"[{i}/{len(urls)}] Scraped. chars={char_count} | {url}")
        time.sleep(SLEEP_BETWEEN)

    df_out = df.copy()
    df_out["content"] = contents
    df_out["char_count"] = lengths

    with pd.ExcelWriter(OUTPUT_XLSX, engine="openpyxl") as writer:
        df_out.to_excel(writer, sheet_name="articles", index=False)

    print(f"Done. Wrote {len(df_out)} rows to {OUTPUT_XLSX}")

if __name__ == "__main__":
    main()


Loaded 30 URLs from CSV.
Unique URLs to scrape: 30
[1/30] Scraped. chars=2958 | https://www.straitstimes.com/singapore/housing/60-winners-picked-from-among-12600-photo-competition-entries-showcasing-hdb-heartlands
[2/30] Scraped. chars=7214 | https://www.straitstimes.com/singapore/housing/more-bto-flats-to-be-built-in-new-berlayar-estate-on-former-keppel-club-site
[3/30] Scraped. chars=4705 | https://www.straitstimes.com/singapore/housing/new-private-home-sales-lifted-by-flurry-of-launches-before-hungry-ghost-month-low-interest-rates
[4/30] Scraped. chars=6953 | https://www.straitstimes.com/singapore/housing/pasir-ris-to-get-more-homes-next-to-mrt-station-potential-new-neighbourhood
[5/30] Scraped. chars=4598 | https://www.straitstimes.com/singapore/housing/10-ex-staff-of-interior-design-firm-make-reports-over-unpaid-commissions-director-disputes-amounts
[6/30] Scraped. chars=4763 | https://www.straitstimes.com/singapore/housing/5-ways-mount-pleasants-old-police-academy-will-get-a-new-