Web -> Bedrock -> Translate -> Comprehend -> S3

这个 Jupyter 风格的 notebook（以 `# %%` 单元格分隔）展示了一个可复现的处理流水线：
1. 用 `requests` 抓取网页并用 BeautifulSoup 抽取正文
2. 可选：把正文发送给 Amazon Bedrock 做清洗/摘要（若配置了 Bedrock 模型）
3. 用 Amazon Translate 翻译为英文
4. 用 Amazon Comprehend 做情感分析
5. 将结果序列化并上传到 S3

假定你已经在机器上运行过 `aws configure` 或者已通过环境变量/角色方式配置好凭证。
请不要把凭证写到 notebook 中。

在运行前，请 `pip install -r requirements.txt`，或者运行下面的安装单元格。


In [None]:

# 安装（可选）
!pip install boto3 requests beautifulsoup4 readability-lxml python-dotenv


In [None]:
# 配置项 (请根据你自己的环境修改)
import os
import json
import time
import hashlib
import logging
from typing import List, Tuple, Dict, Any

import requests
from bs4 import BeautifulSoup
import boto3
from botocore.exceptions import ClientError, BotoCoreError


In [None]:

# logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("web-pipeline")

# 环境变量配置（可用 aws configure 或环境变量）
AWS_REGION = os.environ.get("AWS_REGION", "eu-west-1")
S3_BUCKET = os.environ.get("RESULT_S3_BUCKET", "my-result-bucket")
BEDROCK_MODEL_ID = os.environ.get("BEDROCK_MODEL_ID", "")  # 若不使用 Bedrock 可留空

# boto3 clients
s3 = boto3.client("s3", region_name=AWS_REGION)
translate = boto3.client("translate", region_name=AWS_REGION)
comprehend = boto3.client("comprehend", region_name=AWS_REGION)
# Bedrock 的 client 名称在 boto3 中为 'bedrock-runtime'（需要 SDK 支持）
bedrock = boto3.client("bedrock-runtime", region_name=AWS_REGION)


In [None]:

# 1) 抓取网页并抽取正文

def fetch_page_text(url: str, timeout: int = 10) -> str:
    """使用 requests + BeautifulSoup 做简单的正文抽取。

    策略：先尝试 <article> 标签；若无则合并所有 <p> 段落。
    返回纯文本（去掉多余空白）。
    """
    headers = {"User-Agent": "Mozilla/5.0 (compatible; web-pipeline/1.0)"}
    resp = requests.get(url, timeout=timeout, headers=headers)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")

    # 尝试常见正文容器
    candidates = []
    article = soup.find("article")
    if article:
        candidates.append(article.get_text(separator="\n", strip=True))

    # 常见类名（简化）
    for cls in ("main", "content", "post", "article-body", "entry-content"):
        el = soup.find(class_=cls)
        if el:
            candidates.append(el.get_text(separator="\n", strip=True))

    # fallback: 拼接所有 <p>
    if not candidates:
        paragraphs = soup.find_all("p")
        text = "\n\n".join(p.get_text(strip=True) for p in paragraphs if p.get_text(strip=True))
    else:
        # 选最长的候选者
        text = max(candidates, key=len)

    # 清理多余空白
    text = "\n".join(line.strip() for line in text.splitlines() if line.strip())
    return text


In [None]:

# 2) （可选）调用 Bedrock 对文本做清洗/去噪/结构化

def bedrock_clean_text(text: str, model_id: str = BEDROCK_MODEL_ID, timeout_seconds: int = 60) -> str:
    """如果配置了 BEDROCK_MODEL_ID，则把 `text` 发给 Bedrock 的模型，让模型返回清洗后的文本。

    如果未配置 model_id，则直接返回原文。

    注意：Bedrock 不会去抓 URL，它只处理你传入的字符串。
    返回值取决于你使用的模型与 prompt。示例使用最简单的 prompt。
    """
    if not model_id:
        logger.info("No Bedrock model configured, skipping Bedrock step.")
        return text

    prompt = (
        "You are a text-cleaning assistant.\n"
        "Given a noisy HTML-extracted article text, return a cleaned, readable plaintext article.\n\n"
        "ARTICLE:\n" + text + "\n\nCLEANED ARTICLE:\n"
    )

    try:
        # invoke_model 的参数与返回结构可能随 SDK 版本不同，请参考你当前环境的 boto3 文档。
        response = bedrock.invoke_model(
            modelId=model_id,
            contentType="text/plain; charset=utf-8",
            accept="application/json",
            body=prompt.encode("utf-8")
        )
        # response['body'] 是一个 StreamingBody
        body_bytes = response["body"].read()
        cleaned = body_bytes.decode("utf-8")
        # 有些模型会把整个 JSON 或带前缀输出，你可以在这里对 cleaned 做额外处理
        return cleaned
    except (ClientError, BotoCoreError) as e:
        logger.exception("Bedrock invocation failed, returning original text: %s", e)
        return text


In [None]:
# 3) 对长文本做分片（因为 Translate / Comprehend 有长度限制）

def split_text_chunks(text: str, max_chars: int = 4500) -> List[str]:
    """按段落分割并组合成不超过 max_chars 的块。
    4500 字符是一个经验值：Translate/Comprehend 的实际限制请查官方文档。
    """
    paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
    chunks = []
    cur = []
    cur_len = 0
    for p in paragraphs:
        if len(p) > max_chars:
            # 如果单个段落超长，再按句子或固定长度切分
            for i in range(0, len(p), max_chars):
                piece = p[i:i+max_chars]
                if cur:
                    chunks.append('\n\n'.join(cur))
                    cur = []
                    cur_len = 0
                chunks.append(piece)
        else:
            if cur_len + len(p) + 2 > max_chars:
                chunks.append('\n\n'.join(cur))
                cur = [p]
                cur_len = len(p)
            else:
                cur.append(p)
                cur_len += len(p) + 2
    if cur:
        chunks.append('\n\n'.join(cur))
    return chunks


In [None]:
# 4) 翻译（把每个 chunk 发给 Translate，然后拼回去）

def translate_chunks_to_english(chunks: List[str]) -> Tuple[str, str]:
    """将每个文本块翻译成英文并合并，返回 (full_translated_text, detected_source_language)
    detected_source_language 取第一个块的返回值作为代表。
    """
    translated_parts = []
    detected_lang = None
    for chunk in chunks:
        try:
            resp = translate.translate_text(Text=chunk, TargetLanguageCode="en")
            translated_parts.append(resp.get("TranslatedText", ""))
            if detected_lang is None:
                detected_lang = resp.get("SourceLanguageCode")
        except (ClientError, BotoCoreError) as e:
            logger.exception("Translate failed for a chunk, adding original chunk instead: %s", e)
            translated_parts.append(chunk)
    return "\n\n".join(translated_parts), (detected_lang or "unknown")


In [None]:

# 5) 情感分析（对已翻译的英文文本使用 Comprehend）

def analyze_sentiment_for_text(text: str, language_code: str = "en") -> Dict[str, Any]:
    """若文本过长，按块调用 detect_sentiment，然后合并得分。
    返回总体情感标签（majority 或基于分数合成）和每块的细节。
    """
    chunks = split_text_chunks(text, max_chars=4500)
    results = []
    # Comprehend detect_sentiment 每次处理一段文本
    for chunk in chunks:
        try:
            resp = comprehend.detect_sentiment(Text=chunk, LanguageCode=language_code)
            results.append(resp)
        except (ClientError, BotoCoreError) as e:
            logger.exception("Comprehend detect_sentiment failed for a chunk: %s", e)
            results.append({"Sentiment": "ERROR", "SentimentScore": {}})

    # 合并逻辑：对各情感分数求平均
    score_sum = {"Positive": 0.0, "Negative": 0.0, "Neutral": 0.0, "Mixed": 0.0}
    valid = 0
    for r in results:
        sc = r.get("SentimentScore") or {}
        if sc:
            valid += 1
            for k in score_sum.keys():
                score_sum[k] += float(sc.get(k, 0.0))
    if valid > 0:
        avg_score = {k: v / valid for k, v in score_sum.items()}
        # 选平均分最高的情感作为总体情感
        overall_sentiment = max(avg_score.items(), key=lambda x: x[1])[0]
    else:
        avg_score = {}
        overall_sentiment = "UNKNOWN"

    return {
        "overall_sentiment": overall_sentiment,
        "average_scores": avg_score,
        "per_chunk": results,
    }


In [None]:
# 6) 上传 JSON 到 S3

def upload_json_to_s3(obj: Dict[str, Any], key: str) -> str:
    body = json.dumps(obj, ensure_ascii=False, indent=None).encode("utf-8")
    try:
        s3.put_object(Bucket=S3_BUCKET, Key=key, Body=body, ContentType="application/json; charset=utf-8")
    except ClientError:
        logger.exception("Failed to upload result to S3")
        raise
    return f"s3://{S3_BUCKET}/{key}"


In [None]:
# 7) 把所有步骤串成一个 `process_url` 函数

def process_url(url: str, do_bedrock_clean: bool = True) -> Dict[str, Any]:
    start = time.time()
    logger.info("Processing URL: %s", url)
    raw_text = fetch_page_text(url)
    if not raw_text or len(raw_text.strip()) == 0:
        logger.warning("No textual content extracted from %s", url)
        return {"url": url, "status": "no_content"}

    cleaned_text = raw_text
    if do_bedrock_clean and BEDROCK_MODEL_ID:
        cleaned_text = bedrock_clean_text(raw_text, model_id=BEDROCK_MODEL_ID)

    chunks = split_text_chunks(cleaned_text)
    translated_text, src_lang = translate_chunks_to_english(chunks)

    # 使用英文情感分析（因为我们已翻译为英文）
    sentiment = analyze_sentiment_for_text(translated_text, language_code="en")

    result = {
        "source_url": url,
        "detected_source_language": src_lang,
        "raw_text_excerpt": raw_text[:2000],
        "cleaned_text_excerpt": cleaned_text[:2000],
        "translated_text_excerpt": translated_text[:2000],
        "sentiment": sentiment,
        "meta": {
            "char_counts": {
                "raw": len(raw_text),
                "cleaned": len(cleaned_text),
                "translated": len(translated_text),
            },
            "processing_time_sec": time.time() - start,
        }
    }

    # 使用 URL 的 sha1 + 时间戳 作为对象 key
    key = f"results/{hashlib.sha1(url.encode()).hexdigest()}_{int(time.time())}.json"
    s3_uri = upload_json_to_s3(result, key)
    result["s3_uri"] = s3_uri
    logger.info("Finished %s -> %s", url, s3_uri)
    return result


In [None]:
# 8) Demo: 处理单个或多个 URL

if __name__ == "__main__":
    # 交互式示例：把要处理的 URL 列表放到 urls 里
    urls = [
        # 请替换成你想处理的 URL
        "https://example.com/some-article",
    ]
    for u in urls:
        try:
            out = process_url(u, do_bedrock_clean=False)  # demo：不使用 Bedrock
            print(json.dumps(out, ensure_ascii=False, indent=2))
        except Exception as e:
            logger.exception("Error processing %s: %s", u, e)



9) 扩展建议与注意事项
- 不要在 notebook 中存储 AWS 密钥。
- 在处理大量 URL 时，建议将 URL 放入队列（SQS / Kafka / Redis），并用多 worker 并发处理。
- 本 notebook 的分片长度（4500）是经验值，Translate/Comprehend 的具体限制请以官方文档为准。
- Bedrock 调用方式会随 AWS SDK 版本变化，若 `bedrock-runtime` 客户端不可用，请参考 AWS 官方示例或更新 boto3。
- 为生产环境添加重试（指数回退）、指标监控（CloudWatch）和成本控制（预算告警）。
