In [None]:
from bs4 import BeautifulSoup
from selenium import webdriver 
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options 
import os 
import re
from tqdm import tqdm
import time
import csv
import urllib.request
from datetime import datetime
import html

In [None]:

# 1. 基础配置
chrome_options = Options()
chrome_options.add_argument('--headless=new')
chrome_options.add_argument('--no-sandbox')
chrome_options.add_argument('--disable-dev-shm-usage')

# 2. 核心：指定容器内 Chromium 的二进制路径
chrome_options.binary_location = "/usr/bin/chromium"

# 3. 核心：指定容器内 ChromeDriver 的服务路径
# 注意：这一步是解决 "linux/aarch64" 报错的关键
service = Service(executable_path="/usr/bin/chromedriver")

# 4. 启动
driver = webdriver.Chrome(service=service, options=chrome_options)

In [98]:
MAIN_URL = "https://www.straitstimes.com/breaking-news"
BASE_URL = "https://www.straitstimes.com/"


In [99]:
response = urllib.request.urlopen(MAIN_URL)
responseData = response.read()
soup = BeautifulSoup(responseData, "html.parser")
article_url_list = []
article_info = soup.find_all("a", {"class": re.compile("flex select-none gap-x-04.*"), 'aria-label': 'link', 'href':re.compile("^/(?!multimedia)")})
for article in article_info:
    article_url_list.append(BASE_URL + article['href'][1:])
for url in article_url_list:
    print(url)
print(len(article_url_list))

https://www.straitstimes.com/singapore/van-tipped-on-its-side-after-accident-with-bus-outside-boon-lay-interchange-van-driver-hurt
https://www.straitstimes.com/singapore/courts-crime/probation-for-teen-who-entered-mrt-tracks-during-operational-hours-and-filmed-train
https://www.straitstimes.com/singapore/courts-crime/jail-for-man-who-misappropriated-over-2-5m-in-company-assets-while-he-was-coo-of-sgx-listed-firm
https://www.straitstimes.com/singapore/motorcyclist-dies-after-accident-with-car-on-sle
https://www.straitstimes.com/singapore/police-appeal-for-information-on-missing-man-42-last-seen-in-ubi
https://www.straitstimes.com/singapore/courts-crime/probation-for-ex-tanjong-pagar-united-youth-footballer-who-punched-2-players-from-opposing-team
https://www.straitstimes.com/singapore/jobs/how-are-singapore-youth-hashing-out-their-career-aspirations
https://www.straitstimes.com/singapore/courts-crime/ex-lovers-fight-over-money-after-30-year-affair-ends-high-court-allows-womans-578000-cl

In [100]:
def clean_text(text):
    import html
    if not text or not isinstance(text, str):
        return ""
    
    # 1. 还原 HTML 实体 (防止 &quot; 或 &apos; 残留)
    text = html.unescape(text)
    
    # 2. 定义要删除的“引号黑名单”
    # 包含：半角单双引号(" ')，全角单双引号(“ ” ‘ ’)，以及中文引号（﹃ ﹄ 等）
    # 甚至包括可能已经存在的反斜杠转义引号
    quotes_pattern = r'["\'“”‘’\'\\]' 
    
    # 3. 使用正则直接替换为空
    text = re.sub(quotes_pattern, '', text)
    
    # 4. 映射其他特殊符号 (如长破折号或特殊空格)
    punctuation_map = {
        '—': '-', 
        '\xa0': ' ',
        '\u200b': '' # 零宽空格
    }
    text = text.translate(str.maketrans(punctuation_map))
    
    # 5. 最后清理空白符：去掉换行并合并多余空格
    text = " ".join(text.split())
    
    return text.strip()

In [101]:
def extract_date(element):
    """
    通用日期提取函数：
    从元素中提取符合 'Jan 06, 2026, 06:50 AM' 格式的字符串
    """
    if not element:
        return ""
    # 使用 " " 分隔，防止文本粘连，并清理 HTML 实体
    text = element.get_text(" ", strip=True).replace("\xa0", " ")
    
    # 正则逻辑：匹配月份(3字母) + 日(1-2位) + 年(4位) + 后续所有时间信息
    date_pattern = r'[A-Z][a-z]{2}\s\d{1,2},\s\d{4}.*'
    match = re.search(date_pattern, text)
    
    return match.group(0).strip() if match else text.strip()

In [None]:
all_records = []
seen_urls = set()
for i in tqdm(range(len(article_url_list)), desc="Scraping Pages"):
#for i in tqdm(range(1), desc="Scraping Pages"):
    #url = "https://www.straitstimes.com/singapore/how-kwang-hwee-takes-over-as-police-commissioner"
    url = article_url_list[i]
    if url in seen_urls:
            continue
    seen_urls.add(url)
    
    title = ""
    publish_date = ""
    update_date = ""
    img_url = ""
    caption_text = ""
    tags_list = None
    full_article = ""

    driver.get(url)
    time.sleep(2)
                
    html = driver.page_source
    soup = BeautifulSoup(html, "html.parser")

    # 1. get title
    selector = '.container h1.font-display-xxl-semibold[data-testid="heading-test-id"]'

    title_element = soup.select_one(selector)

    if title_element:
        title = title_element.get_text(strip=True)
    else:
        title_tag = soup.select_one('h1[itemprop="headline"]')
        if title_tag:
            title = title_tag.get_text(strip=True)
    
    # 2. get publish time
    wrappers = soup.select('.social-timestamp-wrapper')

    if wrappers:
        # 逻辑 1: 处理 social-timestamp-wrapper 结构
        p_elements = wrappers[0].select('[data-testid="paragraph-test-id"]')
        if len(p_elements) > 0:
            publish_date = extract_date(p_elements[0])
        if len(p_elements) > 1:
            update_date = extract_date(p_elements[1])

    else:
        # 逻辑 2: 处理 group-story-timestamp 结构
        parent_container = soup.select_one('.group-story-timestamp')
        if parent_container:
            postdate_containers = parent_container.select('.group-story-postdate')
            
            if len(postdate_containers) > 0:
                # 提取第一个 postdate 下的日期
                target = postdate_containers[0].select_one('.story-postdate')
                publish_date = extract_date(target)
                
            if len(postdate_containers) > 1:
                print('0k3')
                # 提取第二个 postdate 下的日期
                target = postdate_containers[1].select_one('.story-postdate')
                update_date = extract_date(target)

    # 3. get picture
    hero_wrapper = soup.select_one('.hero-media-wrapper')

    if hero_wrapper:
        img_tag = hero_wrapper.select_one('img[data-testid="hero-media-content-test-id"]')
        img_url = img_tag['src'] if img_tag else ""
        caption_div = hero_wrapper.select_one('.hero-media-caption')
        caption_text = caption_div.get_text(strip=True) if caption_div else ""
    
    # 4. get tags
    tags_container = soup.select_one('div.flex.w-full.flex-wrap.gap-16')

    if not tags_container:
        # 备选方案：如果上面的没找到，直接找包含按钮的那个 header 后面的兄弟节点
        tags_container = soup.select_one('[data-testid="content-block-header"] + div')

    if tags_container:
        # 这里的选择器直接定位到文字所在的 span，更准确
        tag_spans = tags_container.select('button[data-testid="button-test-id"] span')
        
        # 提取文字
        raw_tags = [s.get_text(strip=True) for s in tag_spans]
        
        # 解决单引号报错：在大括号外部处理 replace
        # 这里用 .format() 避开 Python f-string 内部不能写反斜杠的限制
        formatted_items = ["'{}'".format(t.replace("'", "\\'")) for t in raw_tags]
        
        # 拼接成结果字符串
        tags_list = "[" + ", ".join(formatted_items) + "]"
    else:
        tags_list = "[]"
    
    # 5. get article
    article_container = soup.select_one('.storyline-wrapper')

    if article_container:
        paragraphs = article_container.select('p[data-testid="article-paragraph-annotation-test-id"]')
        if paragraphs:
            article_text_list = [p.get_text(strip=True) for p in paragraphs if p.get_text(strip=True)]
            raw_article = " ".join(article_text_list)
            full_article = raw_article.replace('\n', '').replace('\r', '')
            full_article = " ".join(full_article.split())
    else:
        article_body_container = soup.select_one('.text-formatted.field--name-field-paragraph-text')
        if article_body_container:
            paragraphs = article_body_container.select('p')
            if paragraphs:
                article_text_list = [p.get_text(strip=True) for p in paragraphs if p.get_text(strip=True)]
                raw_article = " ".join(article_text_list)
                full_article = raw_article.replace('\n', '').replace('\r', '')
                full_article = " ".join(full_article.split())
        
    all_records.append(
        {
            'title' : clean_text(title),
            'publish_date' : publish_date,
            'update_date' : update_date,
            "img_url" : img_url,
            'caption_text' : clean_text(caption_text),
            'tags_list' : tags_list,
            'full_article' : clean_text(full_article),           
            'url' : url
        }
    )

for record in all_records:
    print(record)

Scraping Pages: 100%|██████████| 70/70 [03:58<00:00,  3.41s/it]

{'title': 'Van tipped on its side after accident with bus outside Boon Lay interchange; van driver hurt', 'publish_date': 'Jan 06, 2026, 02:50 PM', 'update_date': 'Jan 06, 2026, 02:51 PM', 'img_url': 'https://cassette.sphdigital.com.sg/image/straitstimes/bca060aa81ce48c121a960bc67b3d596d5f4a7c8f7164b4060602e3c5370b029', 'caption_text': 'A 49-year-old male driver was taken to the hospital conscious after the accident on Jan 6 outside Boon Lay bus interchange.PHOTO: SCREENGRAB FROM CARGO/XIAOHONGSHU', 'tags_list': "['Accidents - traffic', 'Accidents', 'Police']", 'full_article': 'SINGAPORE – A49-year-oldmanwas taken to the hospitalon themorning of Jan 6afterthevanhe was drivingwas involved in an accident with an SBS Transit bus in Jurong West. The police said they were alerted to the accident inJurong West Central 3 towards Jurong West Street 64,which is outside Boon Lay bus interchange, atabout 7.40am. The van driver wasconscious when he wastaken totheNational University Hospitalby the 




In [None]:
import os
from datetime import datetime
import csv
from pathlib import Path

# 1. 动态获取路径：确保在 Airflow 容器中指向 include/data
# 如果是在 include/ 文件夹下的脚本运行，推荐这样写：
BASE_DATA_DIR = "/usr/local/airflow/include/data"

# 2. 自动创建目录（防止文件夹不存在报错）
Path(BASE_DATA_DIR).mkdir(parents=True, exist_ok=True)

# 3. 生成文件名
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
file_path = os.path.join(BASE_DATA_DIR, f'news_{current_time}.csv')

In [None]:
current_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

with open(file_path, 'w', newline='', encoding='utf-8') as f:
    writer = csv.writer(f)
        
        # Write CSV header row
    writer.writerow([
            'title', 'publish_date', 'update_date', 'img_url', 'caption_text', 'tags_list', 'full_article', 'url'
        ])
        
        # Write data rows (one row per book)
    for record in all_records:
        writer.writerow([
            record['title'], record['publish_date'], record['update_date'], record['img_url'], record['caption_text'], record['tags_list'], record['full_article'], record['url']
                    ])