In [None]:
# 使用 BeautifulSoup、requests 來擷取 Wayback Machine 上的網頁內容並存成

## 簡介

這支程式的目的是協助使用者擷取特定年月、類別的文章列表，
透過 Wayback CDX API 與`requests`、`BeautifulSoup`取得文章 url、Wayback 快取的網址、文章 id，能夠初步查看總共有哪些文章。

為了加快速度，使用了多執行緒來加快程式運作。

```{tip}
以下皆是在Python3中執行。
在開始之前，建議先開一個虛擬環境，避免衝突。
```

     python3 -m venv .venv  # 建立虛擬環境 (資料夾名稱可自訂，一般用 .venv 或 venv)
    source .venv/bin/activate # 啟動虛擬環境 macOS / Linux
    .venv\Scripts\activate # Windows

```

```

````{note}
這支程式會使用 `requests`、`Beautiful Soup`這個套件與 Wayback Machine 做互動。
在開始之前，請先安裝：
    ```
        pip install requests BeautifulSoup
    ```
````

```python
import os
import time
import json
import zipfile
import requests
import pandas as pd
from bs4 import BeautifulSoup
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse
```

```python
# settings
INPUT_CSV = "example.csv"  # Input file with article URLs
URL_FIELD = "uri"
ID_FIELD = "id"
THREADS = 5
N_LIMIT = 10                    # None = all rows
OUTPUT_PREFIX = "output_"
SKIP_FILE_NAME = "skipped_urls.txt"
```

```python
# Utility functions
def _only_date(s: str) -> str:
    """
    Normalize different datetime formats into YYYY-MM-DD.
    Supported inputs include:
    - '20240207123456' (CDX timestamp)
    - '2024-02-07T12:34:56+08:00'
    - '2024-02-07 12:34:56'
    - '2024-02-07'
    Returns empty string if parsing fails.
    """
    if not s:
        return ""
    s = s.strip()
    if s.isdigit() and len(s) == 14:  # CDX format
        try:
            return datetime.strptime(s, "%Y%m%d%H%M%S").strftime("%Y-%m-%d")
        except Exception:
            pass
    for fmt in ("%Y-%m-%dT%H:%M:%S%z",
                "%Y-%m-%dT%H:%M:%S",
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%d"):
        try:
            dt = datetime.strptime(s.replace("Z", "+0000"), fmt)
            return dt.strftime("%Y-%m-%d")
        except Exception:
            continue
    if len(s) >= 10 and s[4] == "-" and s[7] == "-":
        return s[:10]
    return ""


def _first_path_segment(raw_url: str) -> str:
    """
    Return the first path segment of a URL.
    Example: /local/20200729/... -> 'local'
    """
    try:
        p = urlparse(raw_url)
        parts = [seg for seg in p.path.split("/") if seg]
        return parts[0] if parts else ""
    except Exception:
        return ""


def _ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)
```

```python
# Wayback Scraper
class WaybackScraper:
    def __init__(self, img_root_dir):
        self.session = requests.Session()
        self.session.headers.update({'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'})
        self.img_root_dir = img_root_dir
        _ensure_dir(self.img_root_dir)

    def get_latest_snapshot(self, url):
        """
        Query the CDX API for the latest snapshot of a given URL.
        Returns timestamp, original URL, and constructed Wayback URL.
        """
        api = "https://web.archive.org/cdx/search/cdx"
        params = {
            'url': url,
            'output': 'json',
            'filter': 'statuscode:200',
            'sort': 'reverse',
            'limit': 1
        }
        try:
            r = self.session.get(api, params=params, timeout=20)
            r.raise_for_status()
            data = r.json()
            if len(data) > 1:
                row = data[1]
                return {
                    'timestamp': row[1],
                    'original_url': row[2],
                    'wayback_url': f"https://web.archive.org/web/{row[1]}/{row[2]}"
                }
        except Exception as e:
            print(f"[CDX error] {url} → {e}, params={params}")
        return None

    def _extract_published_date(self, soup, wayback_ts: str) -> str:
        """
        Try to extract a published date from <meta property='article:published_time'>
        or <time datetime>. Fall back to Wayback timestamp if none are found.
        """
        meta = soup.find("meta", attrs={"property": "article:published_time"})
        if meta and meta.get("content"):
            d = _only_date(meta.get("content"))
            if d:
                return d
        t = soup.find("time")
        if t:
            d = _only_date(t.get("datetime") or t.get_text(strip=True))
            if d:
                return d
        return _only_date(wayback_ts)

    def scrape_article(self, wayback_url: str, raw_url: str, wayback_ts: str):
        """
        Parse a Wayback snapshot page to extract:
        - title
        - body text (concatenated <p> tags)
        - publication date
        - category (from URL path)
        - image URLs (first = cover, rest = others)
        """
        try:
            resp = self.session.get(wayback_url, timeout=20)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.content, 'html.parser')

            # Title
            title_tag = soup.find("h1") or soup.find("title")
            title = title_tag.get_text(strip=True) if title_tag else ""

            # Body text
            bodies = "\n".join(
                p.get_text().strip()
                for p in soup.find_all("p")
                if p.get_text().strip()
            )

            # Dates
            firstcreated = self._extract_published_date(soup, wayback_ts)
            contentcreated = firstcreated
            versioncreated = _only_date(wayback_ts)

            # Collect images
            all_img_urls = []
            for img in soup.find_all("img"):
                src = img.get("src")
                if not src:
                    continue
                if src.startswith("//"):
                    src = "https:" + src
                elif not src.startswith("http"):
                    src = f"https://web.archive.org{src}"
                if src not in all_img_urls:
                    all_img_urls.append(src)

            subject = _first_path_segment(raw_url)

            return {
                "title": title,
                "bodies": bodies,
                "firstcreated": firstcreated,
                "versioncreated": versioncreated,
                "contentcreated": contentcreated,
                "subject": subject,
                "all_img_urls": all_img_urls
            }
        except Exception as e:
            print(f"[Parse error] {wayback_url} (raw={raw_url}) → {e}")
            return None

    def download_images(self, urls, item_id):
        """
        Download all images for an article.
        Saved to images/{item_id}/img/
        Filenames: {item_id}_cover_1.jpg (first), {item_id}_other_2.jpg (rest).
        Returns a list of filenames aligned with input URLs.
        """
        saved_files = []
        item_img_dir = os.path.join(self.img_root_dir, item_id, "img")
        _ensure_dir(item_img_dir)

        for i, url in enumerate(urls, 1):
            try:
                ext = os.path.splitext(url)[1].split("?")[0].lower()
                if ext not in ['.jpg', '.jpeg', '.png', '.gif', '.webp']:
                    ext = '.jpg'
                label = "cover" if i == 1 else "other"
                fname = f"{item_id}_{label}_{i}{ext}"
                fpath = os.path.join(item_img_dir, fname)

                r = self.session.get(url, timeout=20)
                if r.status_code == 200:
                    with open(fpath, "wb") as f:
                        f.write(r.content)
                    saved_files.append(fname)
                else:
                    print(f"[Download failed {r.status_code}] {url}")
                    saved_files.append("")
            except Exception as e:
                print(f"[Download error] {url} → {e}")
                saved_files.append("")
        return saved_files
```


開始時間： 2025-08-20 11:40:50
共讀取 50 個 URL，開始處理…
[1] v https://www.appledaily.com.tw/local/20220101/23YFPLCSYZCZPANVI4E2LTW5L4/
[2] v https://www.appledaily.com.tw/local/20220101/4C4Q3YRHTZGGBN2YAHLJH4CSQI/
[3] v https://www.appledaily.com.tw/local/20220101/C4AZKGYVYZDVTPWZ7RPWARUHDY/
[4] v https://www.appledaily.com.tw/local/20220101/C5GNIZT4DNA4DAFUVA5NHKJYDI/
[5] v https://www.appledaily.com.tw/local/20220101/DUS6LUBV2FEHBHZ5R47BG6YCCY/
[6] v https://www.appledaily.com.tw/local/20220101/E4KZSUGK2JFJ7ODIJ6MZKAGZBE/
[7] v https://www.appledaily.com.tw/local/20220101/HZEOOEE2DRE3XP3YZ4QFRL7UXI/
[8] v https://www.appledaily.com.tw/local/20220101/J34SI2C7PFA7VEIBA3BLINE4IU/
[9] v https://www.appledaily.com.tw/local/20220101/K54XGASEKBBEVCFVQ35NP6CWOE/
[10] v https://www.appledaily.com.tw/local/20220101/KB7OQRBQ75CIDCHR7KVMLAFGBM/
[11] v https://www.appledaily.com.tw/local/20220101/LDYGE3C3QZA6HF6ZIGH64EWNTA/
[12] v https://www.appledaily.com.tw/local/20220101/M535PM4OKVDQHMICHJQZIMD5WA/
[13]

In [20]:
#多執行緒
import requests
import json
import pandas as pd
from bs4 import BeautifulSoup
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

class WaybackScraper:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    def get_latest_snapshot(self, url):
        params = {
            'url': url,
            'output': 'json',
            'filter': 'statuscode:200',
            'sort': 'reverse',
            'limit': 1
        }
        try:
            r = self.session.get("https://web.archive.org/cdx/search/cdx", params=params, timeout=20)
            r.raise_for_status()
            data = r.json()
            if len(data) > 1:
                row = data[1]
                return {
                    'timestamp': row[1],
                    'original_url': row[2],
                    'wayback_url': f"https://web.archive.org/web/{row[1]}/{row[2]}"
                }
        except Exception as e:
            print(f"[CDX錯誤] {url} → {e}")
        return None

    def scrape_webpage_content(self, wayback_url):
        try:
            resp = self.session.get(wayback_url, timeout=10)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.content, 'html.parser')
            content = "\n".join([p.get_text().strip() for p in soup.find_all('p') if p.get_text().strip()])
            return content
        except Exception as e:
            print(f"[爬取錯誤] {wayback_url} → {e}")
            return None

    def format_timestamp(self, ts):
        try:
            return datetime.strptime(ts, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
        except:
            return ts

def process_one_url(row):
    """一個網址的處理流程（for thread pool）"""
    raw_url = row[url_field]
    snap = scraper.get_latest_snapshot(raw_url)
    if not snap:
        return None

    content = scraper.scrape_webpage_content(snap['wayback_url'])
    if content:
        return {
            id_field: row[id_field],
            url_field: raw_url,
            "bodies": content,
            "timestamp": snap['timestamp'],
            "formatted_date": scraper.format_timestamp(snap['timestamp']),
            "wayback_url": snap['wayback_url']
        }
    return None


# === 主程式 ===
if __name__ == "__main__":
    import time
    from datetime import datetime
    
    import logging
    logging.basicConfig(
        filename='wayback.log',
        filemode='a',
        level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
    )
    
    start_time = time.time()
    start_dt = datetime.now()
    print("開始時間：", start_dt.strftime("%Y-%m-%d %H:%M:%S"))

    csv_path = "only_in_b.csv"
    url_field = "uri"
    id_field = "id"
    N_LIMIT = None
    THREADS = 5   # 要開幾條執行緒

    df = pd.read_csv(csv_path)
    if N_LIMIT:
        df = df.head(N_LIMIT)

    scraper = WaybackScraper()
    results = []

    with ThreadPoolExecutor(max_workers=THREADS) as executor:
        future_to_row = {executor.submit(process_one_url, row): row for _, row in df.iterrows()}

        for i, future in enumerate(as_completed(future_to_row), 1):
            data = future.result()
            if data:
                results.append(data)
                print(f"[{i}] v {data[url_field]}")
            else:
                print(f"[{i}] x 失敗")

    # === 輸出 JSON & CSV ===
    if results:
        with open("wayback_threads.json", "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print("v JSON 已輸出：wayback_threads.json")

        pd.DataFrame(results).to_csv("wayback_threads.csv", index=False, encoding="utf-8-sig")
        print("v CSV 已輸出：wayback_threads.csv")
    else:
        print("全部失敗 ")

    end_time = time.time()
    end_dt = datetime.now()
    print("結束時間：", end_dt.strftime("%Y-%m-%d %H:%M:%S"))
    print(f"總耗時：{end_time - start_time:.2f} 秒")
    logging.info(f"結束時間：{end_dt.strftime('%Y-%m-%d %H:%M:%S')}")
    logging.info(f"總耗時：{end_time - start_time:.2f} 秒")
    logging.info("== 結束 ==\n")


開始時間： 2025-08-20 12:04:29
[1] v https://www.appledaily.com.tw/local/20220101/C5GNIZT4DNA4DAFUVA5NHKJYDI/
[2] v https://www.appledaily.com.tw/local/20220101/4C4Q3YRHTZGGBN2YAHLJH4CSQI/
[3] v https://www.appledaily.com.tw/local/20220101/C4AZKGYVYZDVTPWZ7RPWARUHDY/
[4] v https://www.appledaily.com.tw/local/20220101/E4KZSUGK2JFJ7ODIJ6MZKAGZBE/
[5] v https://www.appledaily.com.tw/local/20220101/HZEOOEE2DRE3XP3YZ4QFRL7UXI/
[6] v https://www.appledaily.com.tw/local/20220101/J34SI2C7PFA7VEIBA3BLINE4IU/
[7] v https://www.appledaily.com.tw/local/20220101/K54XGASEKBBEVCFVQ35NP6CWOE/
[8] v https://www.appledaily.com.tw/local/20220101/KB7OQRBQ75CIDCHR7KVMLAFGBM/
[9] v https://www.appledaily.com.tw/local/20220101/MBTKRW5L2NEB3D4LNLK4HWK3RU/
[10] v https://www.appledaily.com.tw/local/20220101/N5QZTAKJBBBKZA7EPSKMCWAY2A/
[11] v https://www.appledaily.com.tw/local/20220101/23YFPLCSYZCZPANVI4E2LTW5L4/
[12] v https://www.appledaily.com.tw/local/20220101/O24ZJITVMNBBDIZLV5KGHGX5FM/
[13] v https://www.appl

In [13]:
import requests
import json
import pandas as pd
from bs4 import BeautifulSoup
import time
from datetime import datetime

class WaybackScraper:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    def get_latest_snapshot(self, url):
        params = {
            'url': url,
            'output': 'json',
            'filter': 'statuscode:200',
            'sort': 'reverse',
            'limit': 1
        }
        try:
            r = self.session.get("https://web.archive.org/cdx/search/cdx", params=params)
            r.raise_for_status()
            data = r.json()
            if len(data) > 1:
                row = data[1]
                return {
                    'timestamp': row[1],
                    'original_url': row[2],
                    'wayback_url': f"https://web.archive.org/web/{row[1]}/{row[2]}"
                }
        except Exception as e:
            print(f"[CDX錯誤] {url} → {e}")
        return None

    def scrape_webpage_content(self, wayback_url):
        try:
            resp = self.session.get(wayback_url, timeout=10)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.content, 'html.parser')
            content = "\n".join([p.get_text().strip() for p in soup.find_all('p') if p.get_text().strip()])
            return content
        except Exception as e:
            print(f"[爬取錯誤] {wayback_url} → {e}")
            return None

    def format_timestamp(self, ts):
        try:
            return datetime.strptime(ts, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
        except:
            return ts

def main():
    # === 設定 ===
    csv_path = "only_in_b.csv"
    url_field = "uri"      #for wayback
    id_field  = "id"
    N_LIMIT = 1           # 如果只想抓前10筆，設10；全部抓改成 None

    # === 讀原始 CSV ===
    df = pd.read_csv(csv_path)
    if N_LIMIT:
        df = df.head(N_LIMIT)

    scraper = WaybackScraper()
    results = []

    for idx, row in df.iterrows():
        raw_url = row[url_field]
        print(f"[{idx+1}/{len(df)}] {raw_url}")
        snap = scraper.get_latest_snapshot(raw_url)
        if not snap:
            print("  x 沒找到快照\n")
            continue

        print("  → 快照 URL:", snap['wayback_url'])
        content = scraper.scrape_webpage_content(snap['wayback_url'])
        if content:
            result_row = {
                id_field: row[id_field],
                url_field: raw_url,
                "bodies": content,
                "timestamp": snap['timestamp'],
                "formatted_date": scraper.format_timestamp(snap['timestamp']),
                "wayback_url": snap['wayback_url']
            }
            results.append(result_row)
            print("  v 抓取成功\n")
        else:
            print("  x 內容讀取失敗\n")

        time.sleep(1)

    # === 匯出 CSV ===
        # --- 存檔 ---
    if results:
        # JSON 全量
        with open("wayback_latest_with_bodies.json", "w", encoding="utf-8") as f:
            json.dump(all_data, f, ensure_ascii=False, indent=2)
        print("[v] 已輸出 wayback_latest_with_bodies.json")
    if results:
        out_df = pd.DataFrame(results)
        out_df.to_csv("wayback_latest_with_bodies.csv", index=False, encoding="utf-8-sig")
        print(" v 已輸出 → wayback_latest_with_bodies.csv")
    else:
        print("全部失敗 ")

if __name__ == "__main__":
    main()


[1/1] https://www.appledaily.com.tw/local/20220101/23YFPLCSYZCZPANVI4E2LTW5L4/
  → 快照 URL: https://web.archive.org/web/20220904084040/https://www.appledaily.com.tw/local/20220101/23YFPLCSYZCZPANVI4E2LTW5L4/
  v 抓取成功



NameError: name 'all_data' is not defined

In [14]:
import requests
import json
import pandas as pd
from bs4 import BeautifulSoup
import time
from datetime import datetime

class WaybackScraper:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    def get_latest_snapshot(self, url):
        params = {
            'url': url,
            'output': 'json',
            'filter': 'statuscode:200',
            'sort': 'reverse',
            'limit': 1
        }
        try:
            r = self.session.get("https://web.archive.org/cdx/search/cdx", params=params)
            r.raise_for_status()
            data = r.json()
            if len(data) > 1:
                row = data[1]
                return {
                    'timestamp': row[1],
                    'original_url': row[2],
                    'wayback_url': f"https://web.archive.org/web/{row[1]}/{row[2]}"
                }
        except Exception as e:
            print(f"[CDX錯誤] {url} → {e}")
        return None

    def scrape_webpage_content(self, wayback_url):
        try:
            resp = self.session.get(wayback_url, timeout=10)
            resp.raise_for_status()
            soup = BeautifulSoup(resp.content, 'html.parser')
            content = "\n".join([p.get_text().strip() for p in soup.find_all('p') if p.get_text().strip()])
            return content
        except Exception as e:
            print(f"[爬取錯誤] {wayback_url} → {e}")
            return None

    def format_timestamp(self, ts):
        try:
            return datetime.strptime(ts, "%Y%m%d%H%M%S").strftime("%Y-%m-%d %H:%M:%S")
        except:
            return ts

def main():
    csv_path = "only_in_b.csv"
    url_field = "uri"
    id_field  = "id"
    N_LIMIT = 50
    
    df = pd.read_csv(csv_path)
    if N_LIMIT:
        df = df.head(N_LIMIT)

    scraper = WaybackScraper()
    results = []

    for idx, row in df.iterrows():
        raw_url = row[url_field]
        print(f"[{idx+1}/{len(df)}] {raw_url}")
        snap = scraper.get_latest_snapshot(raw_url)
        if not snap:
            print("  x 沒找到快取\n")
            continue

        print("  → 快取 URL:", snap['wayback_url'])
        content = scraper.scrape_webpage_content(snap['wayback_url'])
        if content:
            result_row = {
                id_field: row[id_field],
                url_field: raw_url,
                "bodies": content,
                "timestamp": snap['timestamp'],
                "formatted_date": scraper.format_timestamp(snap['timestamp']),
                "wayback_url": snap['wayback_url']
            }
            results.append(result_row)
            print("  v 成功\n")
        else:
            print("  x 內容讀取失敗\n")

        time.sleep(1)

    if results:
        # --- JSON 全量輸出 ---
        with open("wayback_latest_with_bodies.json", "w", encoding="utf-8") as f:
            json.dump(results, f, ensure_ascii=False, indent=2)
        print("v 已輸出 wayback_latest_with_bodies.json")

        # --- CSV 摘要輸出 ---
        out_df = pd.DataFrame(results)
        out_df.to_csv("wayback_latest_with_bodies.csv", index=False, encoding="utf-8-sig")
        print("v 已輸出 wayback_latest_with_bodies.csv")
    else:
        print("全部失敗 :(")

if __name__ == "__main__":
    main()


[1/5] https://www.appledaily.com.tw/local/20220101/23YFPLCSYZCZPANVI4E2LTW5L4/
  → 快照 URL: https://web.archive.org/web/20220904084040/https://www.appledaily.com.tw/local/20220101/23YFPLCSYZCZPANVI4E2LTW5L4/
  v 成功

[2/5] https://www.appledaily.com.tw/local/20220101/4C4Q3YRHTZGGBN2YAHLJH4CSQI/
  → 快照 URL: https://web.archive.org/web/20220906014810/https://www.appledaily.com.tw/local/20220101/4C4Q3YRHTZGGBN2YAHLJH4CSQI/
  v 成功

[3/5] https://www.appledaily.com.tw/local/20220101/C4AZKGYVYZDVTPWZ7RPWARUHDY/
  → 快照 URL: https://web.archive.org/web/20220903180022/https://www.appledaily.com.tw/local/20220101/C4AZKGYVYZDVTPWZ7RPWARUHDY/
  v 成功

[4/5] https://www.appledaily.com.tw/local/20220101/C5GNIZT4DNA4DAFUVA5NHKJYDI/
  → 快照 URL: https://web.archive.org/web/20220908050404/https://www.appledaily.com.tw/local/20220101/C5GNIZT4DNA4DAFUVA5NHKJYDI/
  v 成功

[5/5] https://www.appledaily.com.tw/local/20220101/DUS6LUBV2FEHBHZ5R47BG6YCCY/
  → 快照 URL: https://web.archive.org/web/20220903033433/https: