# 高德地图 POI 抓取（矩形切分版）

> 目的：给定一个或多个矩形范围（左上角 + 右下角），从高德地图 Web 服务接口批量抓取 POI，并写入 CSV。

## 你需要准备什么
- 一个高德 Web 服务 Key（不要把 Key 写进代码/截图/公开仓库；若有学习交流需求，高级key可以通过闲鱼等平台自行购买）
- POI `types` 编码（例如：`060000` 餐饮、`110000` 景点等；具体以高德分类为准，参见：https://lbs.amap.com/api/webservice/download）
- 查询范围：矩形 `[min_lon, max_lat, max_lon, min_lat]`（经度在前，纬度在后）

## 输出
- 默认输出 CSV：`OUTPUT_CSV`（见下方“配置区”）
- 字段：`name, address, adname, lng, lat`（其中 `lng/lat` 为 WGS84；高德返回的是 GCJ-02，这里做了转换）

## 使用方式（推荐流程）
1. 先运行“配置区”单元格（填写 Key、types、范围、输出文件名）
2. 再运行“依赖与函数”单元格（只需运行一次）
3. 最后运行“执行抓取”单元格
4. 可选：运行“读取结果”单元格做检查

## 可自定义的地方
- `POI_TYPES`：你要抓取的 POI 类型编码
- `INPUT_RECTS`：一个或多个矩形范围
- `OUTPUT_CSV`：输出文件名/路径
- `COUNT_LIMIT`：超过该数量就继续切分矩形（高德接口单次 polygon 查询有数量上限）
- `REQUEST_TIMEOUT` / `REQUEST_SLEEP`：超时与降频参数

## 注意事项
- 频率限制：如果请求过快可能触发限流/失败；可以调大 `REQUEST_SLEEP`
- 写入方式：当前实现是“追加写入”（不会覆盖旧文件），重复运行会在同一 CSV 中产生重复记录。如需每次重新抓取，请先删除旧 CSV 或改成写入前清空文件
- 合规：请遵守高德服务条款与数据合规要求，仅用于学习/研究/授权场景

In [None]:
# =========================
# 配置区（只改这里就能跑）
# =========================
from __future__ import annotations

# 1) 高德 Key：请自行申请，并建议用环境变量注入（更安全）
# Windows PowerShell 示例： $env:AMAP_KEY = "你的key"
import os

# The key is read from the AMAP_KEY environment variable; a literal "xxxx" would
# also work here but is discouraged.
AMAP_KEY = os.environ.get("AMAP_KEY", "").strip()
if AMAP_KEY == "":
    print("提示：未检测到环境变量 AMAP_KEY。请先设置后再运行‘执行抓取’单元格。")

# 2) POI type (AMap category code), e.g. 060000 = dining
POI_TYPES = "060000"

# 3) Input rectangles, each given as [min_lon, max_lat, max_lon, min_lat]
INPUT_RECTS: list[list[float]] = [
    [117.210884, 34.671898, 117.269592, 34.639562],  # example: Weishan Island
]

# 4) Output CSV file name (relative to the current working directory)
OUTPUT_CSV = POI_TYPES + ".csv"

# 5) Crawl control parameters
COUNT_LIMIT = 800  # rectangles reporting more POIs than this are split further
REQUEST_TIMEOUT = 20  # per-request timeout in seconds
REQUEST_SLEEP = 0.0  # sleep interval in seconds used to throttle requests
WRITE_HEADER = True  # write the CSV header row when the file is first created

In [None]:
# =========================
# 依赖与函数（只需运行一次）
# =========================
from __future__ import annotations

import csv
import json
import time
from pathlib import Path
from typing import Any, Iterable

import requests

from Coordin_transformlat import gcj02towgs84

# Endpoint of the AMap polygon POI search web service.
AMAP_POLYGON_URL = "https://restapi.amap.com/v3/place/polygon"

# User-Agent header value sent with every request (desktop Chrome on Windows).
USER_AGENT = " ".join(
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/120.0 Safari/537.36",
    )
)

def _rect_to_polygon_param(rect: list[float]) -> str:
    """Format a rectangle as the value of the AMap ``polygon`` query parameter.

    Args:
        rect: ``[min_lon, max_lat, max_lon, min_lat]`` (top-left corner
            followed by bottom-right corner, longitude first).

    Returns:
        The string ``"min_lon,max_lat|max_lon,min_lat"``.

    Raises:
        ValueError: if ``rect`` does not contain exactly four values.
    """
    if len(rect) == 4:
        west, north, east, south = rect
        top_left = f"{west},{north}"
        bottom_right = f"{east},{south}"
        return "|".join((top_left, bottom_right))
    raise ValueError(f"rect 需要 4 个数值，实际为 {rect!r}")

def _amap_polygon_search(*, key: str, poi_types: str, rect: list[float], page: int, timeout: int) -> dict[str, Any]:
    """Request one page of the AMap polygon POI search and return the parsed JSON.

    Args:
        key: AMap web-service key, sent as the ``key`` query parameter.
        poi_types: POI category code(s), sent as ``types``.
        rect: ``[min_lon, max_lat, max_lon, min_lat]``; converted to the
            ``polygon`` parameter by ``_rect_to_polygon_param``.
        page: page number to request.
        timeout: request timeout in seconds, passed to ``requests.get``.

    Returns:
        The decoded JSON object of a successful response (``status == "1"``).

    Raises:
        ValueError: if ``rect`` does not contain exactly four values.
        RuntimeError: if the body is not JSON, is not a JSON object, or the
            service reports a failure status.
    """
    query = {
        "polygon": _rect_to_polygon_param(rect),
        "key": key,
        "types": poi_types,
        "page": page,
        "offset": 25,  # page size requested from the service
        "extensions": "base",
    }
    resp = requests.get(
        AMAP_POLYGON_URL,
        params=query,
        headers={"User-Agent": USER_AGENT},
        timeout=timeout,
    )
    resp.encoding = "utf-8"
    try:
        data = resp.json()
    except ValueError as e:
        # Catch ValueError rather than json.JSONDecodeError: depending on the
        # requests version and whether simplejson is installed, the decode
        # error may not derive from json.JSONDecodeError, but it is always a
        # ValueError subclass.
        raise RuntimeError(f"高德接口返回非 JSON（HTTP {resp.status_code}）") from e

    # A valid-JSON but non-object body (list, string, null) cannot carry the
    # status fields read below; report it instead of failing on `.get`.
    if not isinstance(data, dict):
        raise RuntimeError(f"高德接口返回了非预期的 JSON 结构（HTTP {resp.status_code}）")

    # AMap: status=1 means success; status=0 means failure, usually with info/infocode.
    if str(data.get("status")) != "1":
        info = data.get("info")
        infocode = data.get("infocode")
        raise RuntimeError(f"高德接口请求失败：info={info!r}, infocode={infocode!r}")
    return data

def _poi_to_row(poi: dict[str, Any]) -> list[Any]:
    """Turn one POI dict from the API response into a CSV row.

    Returns ``[name, address, adname, lng, lat]``. Falsy ``address`` /
    ``adname`` values become ``""``. When the POI has no ``location`` the
    two coordinate cells are empty strings.
    """
    raw_location = poi.get("location", "")  # "lng,lat" in GCJ-02

    # Coordinates: GCJ-02 -> WGS84. The raw location string is handed to the
    # project-local gcj02towgs84 helper, which is expected to yield a
    # (lng, lat) pair -- NOTE(review): confirm against Coordin_transformlat.
    lng, lat = gcj02towgs84(raw_location) if raw_location else ("", "")

    return [
        poi.get("name", ""),
        poi.get("address") or "",
        poi.get("adname") or "",
        lng,
        lat,
    ]

def _ensure_csv_header(csv_path: Path) -> None:
    """Write the header row to ``csv_path`` unless the file already has content.

    Missing parent directories are created. A file that exists with a
    non-zero size is left untouched.
    """
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    needs_header = not csv_path.exists() or csv_path.stat().st_size == 0
    if needs_header:
        with csv_path.open("a", newline="", encoding="utf-8") as handle:
            csv.writer(handle).writerow(["name", "address", "adname", "lng", "lat"])

def _append_rows(csv_path: Path, rows: Iterable[list[Any]]) -> None:
    """Append ``rows`` to the CSV at ``csv_path`` (UTF-8, append mode).

    Missing parent directories are created first; the file itself is
    created if it does not exist yet, even when ``rows`` is empty.
    """
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    with csv_path.open("a", newline="", encoding="utf-8") as handle:
        csv.writer(handle).writerows(rows)

def _split_rect_into_quadrants(rect: list[float]) -> list[list[float]]:
    """Cut a rectangle into four equal quadrants.

    ``rect`` and every returned quadrant use the layout
    ``[min_lon, max_lat, max_lon, min_lat]``. Quadrants are returned in the
    order top-left, top-right, bottom-left, bottom-right.
    """
    west, north, east, south = rect
    center_lon = (west + east) / 2
    center_lat = (south + north) / 2

    quadrants: list[list[float]] = []
    # Outer loop walks the rows (top, then bottom); inner loop walks the
    # columns (left, then right).
    for top, bottom in ((north, center_lat), (center_lat, south)):
        for left, right in ((west, center_lon), (center_lon, east)):
            quadrants.append([left, top, right, bottom])
    return quadrants

def run_poi_job(
    *,
    key: str,
    poi_types: str,
    rects: list[list[float]],
    output_csv: str,
    count_limit: int = 800,
    timeout: int = 20,
    sleep: float = 0.0,
    write_header: bool = True,
    max_split_depth: int = 12,
) -> Path:
    """Fetch the POIs inside every rectangle and append them to a CSV file.

    For each rectangle the first result page is requested. If the reported
    ``count`` exceeds ``count_limit`` the rectangle is split into four
    quadrants that are processed recursively; otherwise all pages are
    fetched and written until an empty page is returned.

    Args:
        key: AMap web-service key; must be non-empty.
        poi_types: POI category code(s) to query.
        rects: rectangles as ``[min_lon, max_lat, max_lon, min_lat]``; must
            contain at least one entry.
        output_csv: path of the CSV file; rows are appended to it.
        count_limit: a rectangle reporting more POIs than this is split.
        timeout: per-request timeout in seconds.
        sleep: seconds to pause after every request (``0`` disables it).
        write_header: write the header row if the file is missing or empty.
        max_split_depth: maximum recursion depth for splitting.

    Returns:
        The output path as a ``Path``.

    Raises:
        ValueError: if ``key`` or ``rects`` is empty.
        RuntimeError: if splitting goes deeper than ``max_split_depth``, or
            if a request fails (raised by ``_amap_polygon_search``).
    """
    if not key:
        raise ValueError("AMAP_KEY 为空：请先在配置区设置环境变量 AMAP_KEY")
    if not rects:
        raise ValueError("INPUT_RECTS 为空：请在配置区提供至少一个矩形范围")

    csv_path = Path(output_csv)
    if write_header:
        _ensure_csv_header(csv_path)

    def fetch_page(rect: list[float], page: int) -> dict[str, Any]:
        """Request one page, then pause so that every request is throttled."""
        data = _amap_polygon_search(key=key, poi_types=poi_types, rect=rect, page=page, timeout=timeout)
        # Throttle here rather than in the paging loop only: the page-1
        # requests issued while splitting must be rate-limited as well.
        if sleep > 0:
            time.sleep(sleep)
        return data

    def process_rect(rect: list[float], depth: int) -> None:
        """Write all POIs of ``rect``, splitting it while it is over the limit."""
        if depth > max_split_depth:
            raise RuntimeError(f"切分层级超过 {max_split_depth}，请检查输入范围是否异常：{rect!r}")

        # Page 1 provides both the total count (split decision) and the first rows.
        first = fetch_page(rect, 1)
        count = int(first.get("count") or 0)

        if count > count_limit:
            for sub in _split_rect_into_quadrants(rect):
                process_rect(sub, depth + 1)
            return

        # Within the limit: write page after page until one comes back empty.
        # An empty first page ends the loop without requesting page 2.
        pois = first.get("pois") or []
        page = 1
        while pois:
            _append_rows(csv_path, (_poi_to_row(p) for p in pois))
            page += 1
            pois = fetch_page(rect, page).get("pois") or []

    for idx, rect in enumerate(rects, start=1):
        print(f"[{idx}/{len(rects)}] 处理矩形：{rect}")
        process_rect(rect, depth=0)

    print(f"完成：已写入 {csv_path.resolve()}")
    return csv_path

In [None]:
# =========================
# Run the crawl (execute once the configuration has been checked)
# =========================
# Pass the constants from the configuration cell to run_poi_job; the value it
# returns is kept in csv_path.
csv_path = run_poi_job(
    key=AMAP_KEY,
    poi_types=POI_TYPES,
    rects=INPUT_RECTS,
    output_csv=OUTPUT_CSV,
    count_limit=COUNT_LIMIT,
    timeout=REQUEST_TIMEOUT,
    sleep=REQUEST_SLEEP,
    write_header=WRITE_HEADER,
 )

https://restapi.amap.com/v3/place/polygon?polygon=117.210884,34.671898|117.269592,34.639562&key=<AMAP_KEY>&types=060000&page=1
85
https://restapi.amap.com/v3/place/polygon?polygon=117.210884,34.671898|117.269592,34.639562&key=<AMAP_KEY>&types=060000&page=2
85
https://restapi.amap.com/v3/place/polygon?polygon=117.210884,34.671898|117.269592,34.639562&key=<AMAP_KEY>&types=060000&page=3
85
https://restapi.amap.com/v3/place/polygon?polygon=117.210884,34.671898|117.269592,34.639562&key=<AMAP_KEY>&types=060000&page=4
85
https://restapi.amap.com/v3/place/polygon?polygon=117.210884,34.671898|117.269592,34.639562&key=<AMAP_KEY>&types=060000&page=5
0
该区域poi数量小于800，正在写入数据


In [None]:
# Optional check: load the CSV named by OUTPUT_CSV and preview it.
import pandas as pd
from pathlib import Path

csv_path = Path(OUTPUT_CSV)
# Fail with a clear message when the output file does not exist yet.
if not csv_path.exists():
    raise FileNotFoundError(f"未找到输出文件：{csv_path.resolve()}（请先运行‘执行抓取’单元格）")

poi_df = pd.read_csv(csv_path, encoding="utf-8")
# Number of data rows in the file.
print(f"Total POIs: {len(poi_df)}")
# Final expression of the cell: the first 20 rows.
poi_df.head(20)

Total POIs: 63


Unnamed: 0,name,address,adname,lng,lat
0,微山岛抗战文化园,旅游大道微山湖旅游区-微山岛(西南角),微山县,117.236782,34.656546
1,微山湖旅游区-微子文化苑,微山岛微山湖风景名胜区内,微山县,117.239392,34.659898
2,民俗展区,微山岛微山湖风景名胜区内,微山县,117.239106,34.658017
3,汉魏石搨,微山岛微山湖风景名胜区内,微山县,117.238908,34.658721
4,汉画像石展厅,微山岛微山湖风景名胜区内,微山县,117.239141,34.658722
5,古瓷展厅,微山岛微山湖风景名胜区内,微山县,117.239141,34.65821
6,铁道游击队纪念园,微山岛镇,微山县,117.227314,34.658362
7,文昌殿,微山岛微山湖风景名胜区微子文化苑内,微山县,117.239947,34.659965
8,微山湖风景名胜区-台北荷苑,微山岛微山湖风景名胜区微子文化苑内,微山县,117.240765,34.658322
9,三仁殿,微山岛微山湖风景名胜区微子文化苑内,微山县,117.239616,34.660422
