# 推特最小流水线（Notebook 版）

流程：配置 → 获取 → 解析 → 存储+媒体预览 → AI分析 → 一键运行

约束：
- 仅使用函数与变量，不写类
- 每个可运行代码单元尽量不超过 20 行
- 请求间隔固定 5 秒，不做重试
- 媒体支持 entities.media 与 includes.media
- 使用 Poe(OpenAI兼容) API 调用 gpt-5

说明：
- 所有可修改信息均以“变量”形式集中在“配置”单元
- 尽量保持与脚本版逻辑一致，仅改为 Notebook 组织与展示
- 图片在 Notebook 内渲染，视频以超链接展示


In [None]:
{
 "nbformat": 4,
 "nbformat_minor": 5,
 "metadata": {
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3 (ipykernel)",
   "language": "python"
  },
  "language_info": {
   "name": "python",
   "version": "3.10",
   "mimetype": "text/x-python",
   "file_extension": ".py",
   "pygments_lexer": "ipython3",
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "nbconvert_exporter": "python"
  }
 },
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 推特最小流水线（Notebook 版，逐步执行）\n",
    "\n",
    "按步骤运行，每个阶段独立输出：\n",
    "1) 配置 → 2) 获取 → 3) 等待5秒 → 4) 解析 → 5) 存储(JSON/CSV) → 6) 媒体预览 → 7) AI 分析\n",
    "\n",
    "约束：函数与变量、单元尽量≤20行、固定5秒、不重试、媒体支持 entities.media 与 includes.media、Poe(OpenAI兼容) gpt-5。\n",
    "\n",
    "若看到 JSON 文本，请使用 VSCode 右上角 “Open in Notebook Editor”。"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "# 1) 配置（集中变量，可直接修改）\n",
    "API_KEY = \"new1_58fe956453e744e4844728c68ba187d4\"\n",
    "API_URL = \"https://api.twitterapi.io/twitter/user/last_tweets\"\n",
    "TARGET_USER = \"cz_binance\"\n",
    "TWEET_LIMIT = 1\n",
    "REQUEST_INTERVAL_SEC = 5\n",
    "MEDIA_DIR = \"./推特抢跑/twitter_media\"\n",
    "OUTPUT_DIR = \"./推特抢跑\"\n",
    "AI_API_KEY = \"lUOtczZXbp6emUFgvqfZC7odtwGEhBdwmIAdTlpLHzs\"\n",
    "AI_BASE_URL = \"https://api.poe.com/v1\"\n",
    "AI_MODEL = \"gpt-5\"\n",
    "IMAGE_PREVIEW_LIMIT = 3\n",
    "AI_SUMMARY_TOPK = 5\n",
    "print(\"[INIT] 配置完成\")"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "# 依赖与显示设置\n",
    "import os, json, time, csv\n",
    "from datetime import datetime\n",
    "from typing import List, Dict\n",
    "import requests\n",
    "import matplotlib, matplotlib.pyplot as plt, matplotlib.image as mpimg\n",
    "from io import BytesIO\n",
    "matplotlib.rcParams.update({\"figure.figsize\":(6,4),\"figure.dpi\":120,\"savefig.dpi\":120,\"axes.unicode_minus\":False})\n",
    "for _f in [\"SimHei\",\"Microsoft YaHei\",\"Arial Unicode MS\"]:\n",
    "    try:\n",
    "        matplotlib.rcParams[\"font.sans-serif\"]=[_f]; break\n",
    "    except Exception: pass\n",
    "print(\"[INIT] 依赖导入完成\")"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "# 公共函数（不写类）\n",
    "def ensure_dir(path: str) -> str:\n",
    "    p = os.path.abspath(path); os.makedirs(p, exist_ok=True); return p\n",
    "\n",
    "def fetch_last_tweets(username: str, count: int) -> List[Dict]:\n",
    "    params={\"userName\":username.lstrip(\"@\"),\"count\":count}; headers={\"X-API-Key\":API_KEY}\n",
    "    try:\n",
    "        resp=requests.get(API_URL,params=params,headers=headers,timeout=30)\n",
    "    except Exception as e:\n",
    "        print(\"[ERR ] 请求异常:\",e); return []\n",
    "    if resp.status_code!=200:\n",
    "        print(\"[ERR ] 请求失败:\",resp.status_code,resp.text[:200]); return []\n",
    "    try:\n",
    "        data=resp.json(); return data.get(\"data\") or data.get(\"tweets\") or data.get(\"results\") or []\n",
    "    except Exception as e:\n",
    "        print(\"[ERR ] JSON解析失败:\",e); return []"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "def parse_tweets(raw: List[Dict]) -> List[Dict]:\n",
    "    out=[]\n",
    "    for t in raw:\n",
    "        media=[]; ents=t.get(\"entities\",{})\n",
    "        if isinstance(ents.get(\"media\"),list):\n",
    "            for m in ents[\"media\"]:\n",
    "                url=m.get(\"media_url\") or m.get(\"url\") or m.get(\"media_url_https\")\n",
    "                if url: media.append({\"id\":m.get(\"id\"),\"type\":m.get(\"type\"),\"url\":url})\n",
    "        inc=t.get(\"includes\",{})\n",
    "        if isinstance(inc.get(\"media\"),list):\n",
    "            for m in inc[\"media\"]:\n",
    "                url=m.get(\"url\") or m.get(\"preview_image_url\")\n",
    "                if url: media.append({\"id\":m.get(\"media_key\") or m.get(\"id\"),\"type\":m.get(\"type\"),\"url\":url})\n",
    "        out.append({\"tweet_id\":t.get(\"id\"),\"created_at\":t.get(\"created_at\"),\"text\":t.get(\"text\",\"\"),\n",
    "                    \"author\":t.get(\"author_username\") or t.get(\"author_id\"),\n",
    "                    \"permalink\":t.get(\"url\") or t.get(\"permalink\"),\"media\":media})\n",
    "    return out"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "def save_json(path: str, data: List[Dict]) -> None:\n",
    "    with open(path,\"w\",encoding=\"utf-8\") as f: json.dump(data,f,ensure_ascii=False,indent=2)\n",
    "\n",
    "def save_csv(path: str, rows: List[Dict]) -> None:\n",
    "    with open(path,\"w\",newline=\"\",encoding=\"utf-8\") as f:\n",
    "        w=csv.writer(f); w.writerow([\"tweet_id\",\"created_at\",\"author\",\"text\",\"permalink\",\"media_count\"])\n",
    "        for r in rows: w.writerow([r[\"tweet_id\"],r[\"created_at\"],r[\"author\"],r[\"text\"].replace(\"\\n\",\" \"),r[\"permalink\"],len(r[\"media\"])])"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "def preview_media(rows: List[Dict], limit: int=3) -> None:\n",
    "    shown=0\n",
    "    for r in rows:\n",
    "        for m in r.get(\"media\",[]):\n",
    "            url=m.get(\"url\"); mtype=(m.get(\"type\") or \"\").lower()\n",
    "            if not url: continue\n",
    "            if mtype in [\"photo\",\"image\"] and shown<limit:\n",
    "                try:\n",
    "                    resp=requests.get(url,timeout=15); resp.raise_for_status()\n",
    "                    img=mpimg.imread(BytesIO(resp.content)); plt.figure(figsize=(4,4))\n",
    "                    plt.imshow(img); plt.axis(\"off\"); plt.tight_layout(); plt.show(); shown+=1\n",
    "                except Exception as e: print(\"[WARN] 图片预览失败:\",e)\n",
    "            else:\n",
    "                print(f\"[video/media] {mtype or 'media'} -> {url}\")"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "def ai_analyze_text(text: str, hint: str=\"\") -> str:\n",
    "    try:\n",
    "        import openai\n",
    "        client=openai.OpenAI(api_key=AI_API_KEY, base_url=AI_BASE_URL)\n",
    "        prompt=hint or (\"请基于以下推文文本做交易相关性与情绪的简要分析，并给出要点：\\n\"+text)\n",
    "        chat=client.chat.completions.create(model=AI_MODEL, messages=[{\"role\":\"user\",\"content\":prompt}])\n",
    "        return chat.choices[0].message.content\n",
    "    except Exception as e:\n",
    "        print(\"[WARN] AI 分析失败或依赖缺失，返回占位文本。原因:\",e)\n",
    "        return \"AI 分析暂不可用（依赖缺失或请求失败）。\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 2) 获取（只运行此格执行“获取”）"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "raw = fetch_last_tweets(TARGET_USER, TWEET_LIMIT)\n",
    "print(\"[INFO] 获取条数:\", len(raw))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3) 固定等待 5 秒（节流）"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "time.sleep(REQUEST_INTERVAL_SEC)\n",
    "print(\"[SLEEP] 已等待\", REQUEST_INTERVAL_SEC, \"秒\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4) 解析（只运行此格执行“解析”）"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "rows = parse_tweets(raw)\n",
    "print(\"[INFO] 解析完成，记录数:\", len(rows), \"含媒体条数:\", sum(1 for r in rows if r[\"media\"]))\n",
    "print(\"[SAMPLE] 首条文本:\\n\", (rows[0][\"text\"] if rows else \"(empty)\"))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 5) 存储（JSON 与 CSV）"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "ensure_dir(OUTPUT_DIR)\n",
    "ts = datetime.now().strftime(\"%Y%m%d_%H%M%S\")\n",
    "json_path = os.path.join(OUTPUT_DIR, f\"tweets_{TARGET_USER}_{ts}.json\")\n",
    "csv_path  = os.path.join(OUTPUT_DIR, f\"tweets_{TARGET_USER}_{ts}.csv\")\n",
    "save_json(json_path, rows); save_csv(csv_path, rows)\n",
    "print(\"[SAVE]\", json_path, \"|\", csv_path)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 6) 媒体预览（仅图片渲染，视频打印链接）"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "ensure_dir(MEDIA_DIR)\n",
    "preview_media(rows, limit=IMAGE_PREVIEW_LIMIT)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 7) AI 分析（Poe 兼容 gpt-5）"
   ]
  },
  {
   "cell_type": "code",
   "metadata": {},
   "execution_count": null,
   "outputs": [],
   "source": [
    "merged_text = \"\\n\\n\".join(r[\"text\"] for r in rows[:AI_SUMMARY_TOPK])\n",
    "summary = ai_analyze_text(merged_text)\n",
    "print(\"[AI ] 摘要前800字:\\n\", (summary or \"\")[:800])"
   ]
  }
 ]
}


[INIT] 依赖导入完成


In [2]:
# Utility helpers: plain functions and variables only, no classes
def ensure_dir(path: str) -> str:
    """Make sure the directory ``path`` exists and return its absolute form.

    Args:
        path: Directory path, relative or absolute.

    Returns:
        The absolute path of the directory; it exists once this returns.
    """
    absolute = os.path.abspath(path)
    # exist_ok=True keeps repeated calls on the same directory harmless.
    os.makedirs(absolute, exist_ok=True)
    return absolute

def download_file(url: str, local_path: str, timeout: int = 30) -> bool:
    """Stream the resource at ``url`` into the file ``local_path``.

    This is best-effort: any failure (network error, HTTP error status, file
    system error) is printed as a warning and reported through the return
    value instead of being raised. A partially written file may remain on
    disk when the transfer fails midway.

    Args:
        url: Address of the resource to download.
        local_path: Destination file path; an existing file is overwritten.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        True when the whole body was written, False on any failure.
    """
    try:
        # A streamed response keeps its connection checked out until it is
        # closed; the context manager releases it even if raise_for_status()
        # or the file write fails.
        with requests.get(url, timeout=timeout, stream=True) as resp:
            resp.raise_for_status()
            with open(local_path, "wb") as f:
                for chunk in resp.iter_content(8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        return True
    except Exception as e:
        print("[WARN] 媒体下载失败:", e)
        return False

In [3]:
# Fetch: one request per call, no retries (this function itself never sleeps)
def _extract_tweets(payload: Dict) -> List[Dict]:
    """Return the first non-empty tweet list found under the known payload keys."""
    for key in ("data", "tweets", "results"):
        value = payload.get(key)
        if isinstance(value, list) and value:
            return value
        if isinstance(value, dict):
            # Some responses wrap the list one level down, e.g.
            # {"data": {"tweets": [...]}}; look inside instead of returning the dict.
            nested = _extract_tweets(value)
            if nested:
                return nested
    return []


def fetch_last_tweets(username: str, count: int) -> List[Dict]:
    """Request the most recent tweets of ``username`` from ``API_URL``.

    Failures are printed and reported as an empty list; nothing is raised
    for network errors, non-200 responses or undecodable bodies.

    Args:
        username: Account name; a leading "@" is stripped.
        count: Value sent as the ``count`` query parameter.

    Returns:
        The tweet list found in the response under "data", "tweets" or
        "results" (directly or one object deeper), or [] when none is found
        or the request failed. The result is always a list.
    """
    params = {"userName": username.lstrip("@"), "count": count}
    headers = {"X-API-Key": API_KEY}
    try:
        resp = requests.get(API_URL, params=params, headers=headers, timeout=30)
    except Exception as e:
        print("[ERR ] 请求异常:", e)
        return []
    if resp.status_code != 200:
        print("[ERR ] 请求失败:", resp.status_code, resp.text[:200])
        return []
    try:
        payload = resp.json()
    except Exception as e:
        print("[ERR ] JSON解析失败:", e)
        return []
    if not isinstance(payload, dict):
        # A JSON array/scalar at the top level has none of the expected keys.
        print("[ERR ] JSON解析失败:", f"unexpected payload type {type(payload).__name__}")
        return []
    return _extract_tweets(payload)

In [4]:
# Parsing: supports both entities.media and includes.media
def _first_value(mapping: Dict, keys) -> object:
    """Return the first truthy ``mapping[key]`` for ``keys``, else the last looked-up value."""
    value = None
    for key in keys:
        value = mapping.get(key)
        if value:
            return value
    return value


def _collect_media(container, url_keys, id_keys) -> List[Dict]:
    """Build {"id", "type", "url"} records from ``container["media"]``.

    Missing, null or malformed containers/items are skipped instead of raising.
    """
    if not isinstance(container, dict):
        return []
    items = container.get("media")
    if not isinstance(items, list):
        return []
    records = []
    for item in items:
        if not isinstance(item, dict):
            continue
        url = _first_value(item, url_keys)
        if url:
            records.append({
                "id": _first_value(item, id_keys),
                "type": item.get("type"),
                "url": url,
            })
    return records


def parse_tweets(raw: List[Dict]) -> List[Dict]:
    """Normalize raw tweet dicts into flat records with a merged media list.

    Args:
        raw: Tweet dicts as returned by the API; None or non-dict entries
            are ignored.

    Returns:
        One dict per tweet with the keys "tweet_id", "created_at", "text",
        "author", "permalink" and "media". "text" is always a string ("" when
        absent or null); "media" lists entries from ``entities.media`` first,
        then ``includes.media``.
    """
    out: List[Dict] = []
    for t in raw or []:
        if not isinstance(t, dict):
            continue
        # Direct media URLs are tried before the generic "url" field, which in
        # classic Twitter media entities is the shortened t.co link rather
        # than the image file itself.
        media = _collect_media(
            t.get("entities"), ("media_url_https", "media_url", "url"), ("id",)
        )
        media += _collect_media(
            t.get("includes"), ("url", "preview_image_url"), ("media_key", "id")
        )
        out.append({
            "tweet_id": t.get("id"),
            "created_at": t.get("created_at"),
            # A key that is present but null would otherwise yield None.
            "text": t.get("text") or "",
            "author": t.get("author_username") or t.get("author_id"),
            "permalink": t.get("url") or t.get("permalink"),
            "media": media,
        })
    return out

In [5]:
# Storage: JSON and CSV (split into two functions to stay under 20 lines per cell)
def save_json(path: str, data: List[Dict]) -> None:
    """Write ``data`` to ``path`` as UTF-8 JSON, overwriting any existing file.

    Non-ASCII characters are written as-is rather than escaped, and nested
    structures are indented by two spaces.

    Args:
        path: Destination file path.
        data: JSON-serializable records.
    """
    dump_options = {"ensure_ascii": False, "indent": 2}
    with open(path, mode="w", encoding="utf-8") as out_file:
        json.dump(data, out_file, **dump_options)

def save_csv(path: str, rows: List[Dict]) -> None:
    """Write parsed tweet records to ``path`` as a UTF-8 CSV file.

    The columns are tweet_id, created_at, author, text, permalink and
    media_count. Line breaks inside the tweet text are replaced by single
    spaces so every record stays on one line; a missing or null text is
    written as an empty cell and missing or null media counts as 0.

    Args:
        path: Destination file path; an existing file is overwritten.
        rows: Records carrying the keys "tweet_id", "created_at", "author",
            "text", "permalink" and "media".
    """
    header = ["tweet_id", "created_at", "author", "text", "permalink", "media_count"]
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        w.writerow(header)
        for r in rows:
            text = r.get("text") or ""
            # Handle "\r\n" first so a Windows line break becomes one space,
            # then any lone "\n" or "\r".
            flat_text = text.replace("\r\n", " ").replace("\n", " ").replace("\r", " ")
            w.writerow([
                r["tweet_id"], r["created_at"], r["author"],
                flat_text, r["permalink"], len(r.get("media") or []),
            ])

In [6]:
# Media preview (notebook): images are rendered inline, everything else is shown as a link
def preview_media(rows: List[Dict], limit: int = 3) -> None:
    """Render up to ``limit`` images inline and list all other media as links.

    Entries typed "photo" or "image" are downloaded and displayed until
    ``limit`` images have been shown; remaining images and every other media
    type are collected into one HTML block of links displayed at the end.
    A failed image download or decode is printed as a warning and skipped.

    NOTE(review): relies on ``Image``, ``display`` and ``HTML`` being available
    in the notebook session; no import for them is visible next to this cell
    - confirm they are imported (presumably from PIL and IPython.display).

    Args:
        rows: Parsed tweet records; each may carry a "media" list of dicts
            with "url" and "type". Missing or null "media" is treated as empty.
        limit: Maximum number of images rendered inline.
    """
    from html import escape  # stdlib; imported here so the cell stays self-contained

    shown = 0
    html_parts = []
    for r in rows:
        for m in r.get("media") or []:
            url = m.get("url")
            mtype = (m.get("type") or "").lower()
            if not url:
                continue
            if mtype in ("photo", "image") and shown < limit:
                try:
                    resp = requests.get(url, timeout=15)
                    resp.raise_for_status()
                    img = Image.open(BytesIO(resp.content))
                    display(img)
                    shown += 1
                except Exception as e:
                    print("[WARN] 图片预览失败:", e)
            else:
                # Both values come from the remote API; escape them so a quote
                # or angle bracket cannot break out of the attribute/markup.
                label = escape(mtype or "media")
                href = escape(str(url), quote=True)
                html_parts.append(f"<p>[{label}] <a href='{href}' target='_blank'>链接</a></p>")
    if html_parts:
        display(HTML("\n".join(html_parts)))

In [7]:
# AI analysis: Poe (OpenAI-compatible) endpoint; returns placeholder text if the dependency is missing or the call fails
def ai_analyze_text(text: str, hint: str = "") -> str:
    """Ask the configured chat model to analyze ``text`` and return its reply.

    The prompt is an instruction line followed by ``text``. ``hint`` replaces
    the default instruction when given; the tweet text is always appended, so
    a custom hint no longer causes the text to be dropped from the request.

    Args:
        text: Tweet text to analyze.
        hint: Optional custom instruction used instead of the default one.

    Returns:
        The model's reply ("" if the reply has no content), or a fixed
        placeholder sentence when ``openai`` cannot be imported or the
        request fails. Never raises.
    """
    default_instruction = "请基于以下推文文本做交易相关性与情绪的简要分析，并给出要点："
    prompt = (hint or default_instruction) + "\n" + text
    try:
        # Imported lazily so the rest of the notebook works without the package.
        import openai
        client = openai.OpenAI(api_key=AI_API_KEY, base_url=AI_BASE_URL)
        chat = client.chat.completions.create(
            model=AI_MODEL,
            messages=[{"role": "user", "content": prompt}],
        )
        # The message content may be None; keep the declared str return type.
        return chat.choices[0].message.content or ""
    except Exception as e:
        print("[WARN] AI 分析失败或依赖缺失，返回占位文本。原因:", e)
        return "AI 分析暂不可用（依赖缺失或请求失败）。"

In [8]:
# One full pass: fetch -> wait -> parse -> store -> preview -> AI
def run_once(username: str, count: int) -> Dict:
    """Run the whole pipeline once for ``username``.

    Fetches the latest tweets, sleeps ``REQUEST_INTERVAL_SEC`` seconds, parses
    the result, writes timestamped JSON and CSV files into ``OUTPUT_DIR``,
    previews media and finally requests an AI summary of the first
    ``AI_SUMMARY_TOPK`` tweet texts.

    Args:
        username: Account to fetch; also embedded in the output file names.
        count: Number of tweets requested from the API.

    Returns:
        A dict with "rows" (parsed tweets), "json" and "csv" (output file
        paths) and "ai" (the summary text).
    """
    print("[RUN ] 开始获取:", username)
    fetched = fetch_last_tweets(username, count)
    print("[INFO] 获取条数:", len(fetched))
    time.sleep(REQUEST_INTERVAL_SEC)

    parsed = parse_tweets(fetched)
    with_media = sum(1 for row in parsed if row["media"])
    print("[INFO] 解析完成，含媒体条数:", with_media)

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    ensure_dir(OUTPUT_DIR)
    # Both output files share the same stem so they can be matched up later.
    stem = f"tweets_{username}_{stamp}"
    json_path = os.path.join(OUTPUT_DIR, stem + ".json")
    csv_path = os.path.join(OUTPUT_DIR, stem + ".csv")
    save_json(json_path, parsed)
    save_csv(csv_path, parsed)
    print("[SAVE] 已保存:", json_path, csv_path)

    ensure_dir(MEDIA_DIR)
    preview_media(parsed, limit=IMAGE_PREVIEW_LIMIT)

    top_texts = [row["text"] for row in parsed[:AI_SUMMARY_TOPK]]
    summary = ai_analyze_text("\n\n".join(top_texts))
    return {"rows": parsed, "json": json_path, "csv": csv_path, "ai": summary}

In [9]:
# One-click run (this cell can be executed directly).
# It reads TARGET_USER and TWEET_LIMIT, so the configuration cell must have
# been executed in this session first; otherwise this line raises NameError.
result = run_once(TARGET_USER, TWEET_LIMIT)
print("[DONE] 完成，推文数:", len(result["rows"]))

NameError: name 'TARGET_USER' is not defined

In [None]:
# Show the AI summary, truncated to its first 800 characters.
# `result` is the dict produced by the run_once cell; a missing or None "ai"
# entry falls back to an empty string before slicing.
ai_preview = (result.get("ai") or "")[:800]
print("[AI ] 摘要预览:\n", ai_preview)