In [1]:
pip install selenium webdriver-manager beautifulsoup4 lxml requests

Collecting selenium
  Downloading selenium-4.36.0-py3-none-any.whl.metadata (7.5 kB)
Collecting webdriver-manager
  Using cached webdriver_manager-4.0.2-py2.py3-none-any.whl.metadata (12 kB)
Collecting trio<1.0,>=0.30.0 (from selenium)
  Downloading trio-0.31.0-py3-none-any.whl.metadata (8.5 kB)
Collecting trio-websocket<1.0,>=0.12.2 (from selenium)
  Downloading trio_websocket-0.12.2-py3-none-any.whl.metadata (5.1 kB)
Collecting websocket-client<2.0,>=1.8.0 (from selenium)
  Downloading websocket_client-1.9.0-py3-none-any.whl.metadata (8.3 kB)
Collecting python-dotenv (from webdriver-manager)
  Using cached python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Collecting attrs>=23.2.0 (from trio<1.0,>=0.30.0->selenium)
  Downloading attrs-25.4.0-py3-none-any.whl.metadata (10 kB)
Collecting sortedcontainers (from trio<1.0,>=0.30.0->selenium)
  Downloading sortedcontainers-2.4.0-py2.py3-none-any.whl.metadata (10 kB)
Collecting outcome (from trio<1.0,>=0.30.0->selenium)
  Downloading out


[notice] A new release of pip is available: 24.2 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [1]:
import csv
import os
import re
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from selenium import webdriver
from selenium.webdriver.edge.options import Options as EdgeOptions
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.edge.service import Service as EdgeService
try:
    from webdriver_manager.microsoft import EdgeChromiumDriverManager
    WEBDRIVER_MANAGER_AVAILABLE = True
except Exception:
    WEBDRIVER_MANAGER_AVAILABLE = False


# Configuration for the second-hand (resale) housing crawl only
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
ESF_BASE = "https://bj.esf.fang.com/"  # base URL of the second-hand housing site
ESF_COMMUNITY_PATH = "house-a015277-b03115/"  # path of the Haidian / Shijicheng second-hand listing page
# Headers sent by the requests-based fetcher (get_soup)
HEADERS = {
    "User-Agent": USER_AGENT,
    "Referer": ESF_BASE,
    "Accept-Language": "zh-CN,zh;q=0.9",
}


# Data class for second-hand housing listings only
@dataclass
class SaleItem:
    """One second-hand listing: floor area, total price and unit price."""
    area_sqm: float  # floor area (square metres)
    total_price_wan: float  # total price (in units of 10,000 yuan)
    unit_price: float  # unit price (yuan per square metre)


def ensure_domain(url: str, allowed_domains: Iterable[str]) -> bool:
    """Return True when the URL's host is an allowed domain or a subdomain of one.

    Args:
        url: URL to check; a URL without a parsable host never matches.
        allowed_domains: Domain names that are acceptable.

    Returns:
        True if the host equals one of the domains or ends with "." + domain.
    """
    host = urlparse(url).hostname or ""
    for domain in allowed_domains:
        if host == domain:
            return True
        # Require the dot so "evilesf.fang.com" does not pass for "esf.fang.com".
        if host.endswith("." + domain):
            return True
    return False


def get_soup(url: str, session: Optional[requests.Session] = None) -> BeautifulSoup:
    """Fetch a page with requests and parse it into a BeautifulSoup tree.

    Args:
        url: Page URL; must be on esf.fang.com or one of its subdomains.
        session: Optional session to reuse. When omitted, a temporary session
            is created for this single request and closed before returning.

    Returns:
        The response text parsed with the lxml parser.

    Raises:
        ValueError: If the URL is outside the allowed second-hand housing domain.
        requests.HTTPError: If the server answers with an error status.
    """
    if not ensure_domain(url, ["esf.fang.com"]):
        raise ValueError("URL 不在允许的二手房域名中，拒绝请求：" + url)

    owns_session = session is None
    s = requests.Session() if owns_session else session
    try:
        resp = s.get(url, headers=HEADERS, timeout=15)
        resp.raise_for_status()
        return BeautifulSoup(resp.text, "lxml")
    finally:
        # Close only a session created here; a caller-supplied one stays open for reuse.
        if owns_session:
            s.close()


def create_driver(driver_path: Optional[str] = None) -> webdriver.Edge:
    """Create a headless Edge WebDriver for loading dynamically rendered pages.

    Args:
        driver_path: Path to the msedgedriver executable. When omitted, the
            EDGE_DRIVER_PATH environment variable is used if it is set;
            otherwise the original hard-coded location is used.

    Returns:
        An Edge driver with a 30-second page-load timeout.

    Raises:
        FileNotFoundError: If the resolved driver executable does not exist.
    """
    options = EdgeOptions()
    options.use_chromium = True
    for flag in (
        "--headless=new",  # run without a visible window
        "--disable-gpu",
        "--no-sandbox",
        f"--user-agent={USER_AGENT}",
        "--lang=zh-CN",
    ):
        options.add_argument(flag)

    # Resolve the driver location: explicit argument, then environment override,
    # then the machine-specific default kept for backward compatibility.
    default_dir = r"C:\Users\lenovo\Desktop\2025\KE\Quent\edgedriver_win64"
    driver_exe = (
        driver_path
        or os.environ.get("EDGE_DRIVER_PATH")
        or os.path.join(default_dir, "msedgedriver.exe")
    )
    if not os.path.exists(driver_exe):
        raise FileNotFoundError(
            f"未找到 Edge 驱动：{driver_exe}\n"
            "请确认已解压并提供正确路径（需要 msedgedriver.exe）"
        )

    service = EdgeService(executable_path=driver_exe)
    driver = webdriver.Edge(service=service, options=options)
    driver.set_page_load_timeout(30)
    return driver


def get_soup_selenium(url: str, driver: webdriver.Edge) -> BeautifulSoup:
    """Load a page in the browser, scroll to trigger lazy loading, and parse it.

    Args:
        url: Page URL; must be on esf.fang.com or one of its subdomains.
        driver: Browser driver used to load the page.

    Returns:
        The browser's page source parsed with the lxml parser.

    Raises:
        ValueError: If the URL is outside the allowed second-hand housing domain.

    Side effects:
        The first time a given driver object is used, the page source is dumped
        to "debug_esf_first_page.html" in the working directory for debugging.
    """
    allowed = ensure_domain(url, ["esf.fang.com"])
    if not allowed:
        raise ValueError("URL 不在允许的二手房域名中，拒绝请求：" + url)

    driver.get(url)

    def _document_ready(d) -> bool:
        return d.execute_script("return document.readyState") == "complete"

    # Give the document up to 8 seconds to finish loading; carry on regardless.
    try:
        WebDriverWait(driver, 8).until(_document_ready)
    except TimeoutException:
        pass

    # Try each known listing-container selector in turn (5 seconds each) and
    # stop at the first one that is present.
    listing_css = (
        "div.shop_list_4 dl",
        "div.shop_list",
        "div#houseList dl",
        "div.houseList dl",
        "ul.listCon li",
        "div#listBox dl",
        "ul#houseList li",
    )
    for css in listing_css:
        locator = (By.CSS_SELECTOR, css)
        try:
            WebDriverWait(driver, 5).until(EC.presence_of_element_located(locator))
        except TimeoutException:
            continue
        break

    # Scroll down in three steps so lazily loaded content gets requested.
    # Best effort: any failure simply abandons the remaining scroll steps.
    scroll_scripts = (
        "window.scrollTo(0, Math.min(800, document.body.scrollHeight));",
        "window.scrollTo(0, Math.min(2000, document.body.scrollHeight));",
        "window.scrollTo(0, document.body.scrollHeight);",
    )
    try:
        for script in scroll_scripts:
            driver.execute_script(script)
            time.sleep(0.6)
    except Exception:
        pass

    # A marker attribute on the driver records that the debug dump was written.
    if not hasattr(driver, "_saved_first_page"):
        with open("debug_esf_first_page.html", "w", encoding="utf-8") as dump:
            dump.write(driver.page_source)
        driver._saved_first_page = True

    return BeautifulSoup(driver.page_source, "lxml")


def extract_numbers(text: str, pattern: str) -> Optional[float]:
    """Return the first capture group of ``pattern`` in ``text`` as a float.

    Non-breaking spaces in ``text`` are treated as ordinary spaces before
    matching. Returns None when the pattern does not match, or when the first
    group is missing or cannot be converted to a float.
    """
    normalized = text.replace("\xa0", " ")
    match = re.search(pattern, normalized)
    if match is None:
        return None
    try:
        value = float(match.group(1))
    except Exception:
        return None
    return value


def parse_sale_list(soup: BeautifulSoup) -> Tuple[List[SaleItem], Optional[str]]:
    """Extract listings and the next-page link from a second-hand listing page.

    Candidate "cards" are found with broad selectors inside the known list
    containers (or the whole document when no container matches). A card is
    treated as a listing when its text yields a floor area plus a total price
    and/or a unit price; whichever price is missing is derived from the other.

    Because the card selectors overlap (a ``dl`` contains ``dd`` elements,
    wrappers such as ``section``/``li`` may contain whole listings) and the
    container selectors may match nested elements, each element is examined
    only once, and a card is dropped when one of its descendants is itself a
    complete listing. Only the innermost complete card is kept.

    Args:
        soup: Parsed listing page.

    Returns:
        A tuple ``(items, next_href)``. ``next_href`` is the ``href`` of the
        next-page link exactly as written in the page, or None if not found.
    """
    # Known list containers; fall back to the whole document.
    containers = soup.select(
        "div.shop_list, ul.shop_list, div#houseList, div.houseList, ul.list, div#list, div.listBox, ul#houseList"
    ) or [soup]

    # Pass 1: parse every distinct card element once, in first-seen order.
    seen_ids = set()
    candidates = []  # (card element, parsed SaleItem)
    for container in containers:
        for card in container.select("dl, dd, div.list, div.item, li, section"):
            if id(card) in seen_ids:
                continue  # already reached through an overlapping container
            seen_ids.add(id(card))

            text = card.get_text(" ", strip=True)
            if not text:
                continue

            area = extract_numbers(text, r"(\d+(?:\.\d+)?)\s*㎡")  # floor area
            total_price = extract_numbers(text, r"(\d+(?:\.\d+)?)\s*万")  # total price (10k yuan)
            unit_price = extract_numbers(text, r"(\d+(?:\.\d+)?)\s*元/㎡")  # unit price

            # Need a non-zero area (it is used as a divisor) and at least one price.
            if area and (total_price or unit_price):
                if total_price is None:
                    total_price = round((unit_price * area) / 10000.0, 2)
                if unit_price is None:
                    unit_price = round((total_price * 10000.0) / area, 2)
                candidates.append((card, SaleItem(area, total_price, unit_price)))

    # Pass 2: a candidate that contains another candidate is a wrapper whose
    # text merely includes the inner listing's numbers; keep the inner one only.
    candidate_ids = {id(card) for card, _ in candidates}
    wrapper_ids = set()
    for card, _ in candidates:
        for ancestor in card.parents:
            if id(ancestor) in candidate_ids:
                wrapper_ids.add(id(ancestor))
    items: List[SaleItem] = [
        item for card, item in candidates if id(card) not in wrapper_ids
    ]

    # Next-page link, if the page exposes one.
    next_href = None
    next_link = soup.select_one("a[rel=next], a.pageNext, a.next, a.next-page, a#PageControl1_hlk_next, a:-soup-contains('下一页')")
    if next_link and next_link.get("href"):
        next_href = next_link["href"]

    return items, next_href


def crawl_sales_pages(keyword: str, district: str, max_pages: int = 20, delay_sec: float = 1.0) -> Iterable[Tuple[int, List[SaleItem]]]:
    """Crawl consecutive listing pages and yield ``(page number, listings)``.

    The target community is fixed by ESF_BASE and ESF_COMMUNITY_PATH;
    ``keyword`` and ``district`` are accepted but not used to build the URL.

    Args:
        keyword: Community name (unused by this function).
        district: District name (unused by this function).
        max_pages: Number of pages to request, starting from page 1.
        delay_sec: Pause after each page, in seconds, to throttle requests.

    Yields:
        Tuples of the 1-based page number and the listings parsed from it.
    """
    browser = create_driver()
    try:
        for page_no in range(1, max_pages + 1):
            # Page 1 is the bare community path; later pages append "i3<page>/".
            suffix = "" if page_no <= 1 else f"i3{page_no}/"
            page_url = urljoin(ESF_BASE, ESF_COMMUNITY_PATH + suffix)

            page_soup = get_soup_selenium(page_url, browser)
            listings = parse_sale_list(page_soup)[0]  # the next-page link is not needed

            yield page_no, listings
            time.sleep(delay_sec)  # pause between pages to avoid anti-crawling measures
    finally:
        browser.quit()  # always shut the browser down


def write_sales_to_csv(items: List[SaleItem], filepath: str) -> None:
    """Write listings to a UTF-8 (with BOM) CSV file, overwriting any existing file.

    Each row holds the area, the total price converted to whole yuan
    (``total_price_wan * 10000``, rounded), and the unit price.

    NOTE(review): the second column header says "万元" but the values written
    are in yuan. Left unchanged so existing consumers of the file keep working;
    confirm which unit is intended.

    Args:
        items: Listings to write.
        filepath: Destination CSV path.
    """
    header = ["area_sqm（平方米）", "total_price_wan（万元）", "unit_price（元/平方米）"]
    with open(filepath, "w", newline="", encoding="utf-8-sig") as out:
        writer = csv.writer(out)
        writer.writerow(header)
        writer.writerows(
            [sale.area_sqm, int(round(sale.total_price_wan * 10000)), sale.unit_price]
            for sale in items
        )


def main() -> None:
    """Crawl the second-hand listings page by page and save them to a CSV file."""
    community = "世纪城"
    area_name = "海淀"
    page_limit = 20  # maximum number of pages to crawl
    csv_path = "shijicheng_esf.csv"  # output CSV file name

    print(f"开始抓取 {area_name}-{community} 二手房数据（来源：{ESF_BASE}）...")

    # Accumulate the listings of every page, reporting progress as we go.
    collected: List[SaleItem] = []
    for page_no, page_items in crawl_sales_pages(community, area_name, max_pages=page_limit):
        print(f"第{page_no}页：抓取到 {len(page_items)} 条房源")
        collected.extend(page_items)

    write_sales_to_csv(collected, csv_path)
    print(f"\n抓取完成！共获取 {len(collected)} 条二手房数据，已保存至 {csv_path}")


# Run the second-hand housing crawl when this cell/script is executed directly.
if __name__ == "__main__":
    main()

开始抓取 海淀-世纪城 二手房数据（来源：https://bj.esf.fang.com/）...
第1页：抓取到 61 条房源
第2页：抓取到 61 条房源
第3页：抓取到 60 条房源
第4页：抓取到 60 条房源
第5页：抓取到 60 条房源
第6页：抓取到 60 条房源
第7页：抓取到 60 条房源
第8页：抓取到 60 条房源
第9页：抓取到 60 条房源
第10页：抓取到 60 条房源
第11页：抓取到 60 条房源
第12页：抓取到 60 条房源
第13页：抓取到 60 条房源
第14页：抓取到 60 条房源
第15页：抓取到 60 条房源
第16页：抓取到 60 条房源
第17页：抓取到 60 条房源
第18页：抓取到 60 条房源
第19页：抓取到 60 条房源
第20页：抓取到 60 条房源

抓取完成！共获取 1202 条二手房数据，已保存至 shijicheng_esf.csv


In [9]:
import re, time, csv, random, os
from pathlib import Path

from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.common.exceptions import TimeoutException

# ----------------- Parameters -----------------
START_URL   = "https://zu.fang.com/house-a015277-b03115/"  # first rental listing page to open
MAX_PAGE    = 20  # number of listing pages to collect
CSV_FILE    = "hlg_fang_rent.csv"  # output CSV file name
# Use the same Edge driver path as the first code cell
DRIVER_DIR = r"C:\Users\lenovo\Desktop\2025\KE\Quent\edgedriver_win64"
DRIVER_PATH = os.path.join(DRIVER_DIR, "msedgedriver.exe")
# -------------------------------------------

# -------------- Browser initialisation --------------
def init_driver():
    """Create an Edge WebDriver configured to look less like an automated browser.

    Returns:
        An Edge driver (visible window by default) with a 30-second page-load
        timeout and a script that hides ``navigator.webdriver`` on new documents.

    Raises:
        FileNotFoundError: If DRIVER_PATH does not point to an existing file.
    """
    edge_options = webdriver.EdgeOptions()
    edge_options.use_chromium = True

    # Command-line switches. Add "--headless=new" to this tuple to run
    # without a visible browser window.
    for switch in (
        "--disable-blink-features=AutomationControlled",  # anti-detection
        "--disable-gpu",
        "--no-sandbox",
    ):
        edge_options.add_argument(switch)

    # Anti-detection: drop the automation switch and the automation extension.
    edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])
    edge_options.add_experimental_option('useAutomationExtension', False)

    # Make sure the driver executable is where we expect it.
    driver_missing = not os.path.exists(DRIVER_PATH)
    if driver_missing:
        raise FileNotFoundError(
            f"未找到 Edge 驱动：{DRIVER_PATH}\n"
            "请确认已解压并提供正确路径（需要 msedgedriver.exe）"
        )

    browser = webdriver.Edge(
        service=EdgeService(executable_path=DRIVER_PATH),
        options=edge_options,
    )

    # Hide the navigator.webdriver flag on every new document.
    hide_webdriver_js = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
    browser.execute_cdp_cmd(
        "Page.addScriptToEvaluateOnNewDocument",
        {"source": hide_webdriver_js},
    )

    browser.set_page_load_timeout(30)
    return browser

# -------------- Single-page parsing (XPath version) --------------
def parse_one_page(driver):
    """Collect area and monthly rent from the listing page currently loaded.

    Waits up to 15 seconds for the list container, then reads every ``dd``
    element under ``#listBox``. Rows that fail to parse are reported with a
    warning and skipped; a row with a rent but no recognisable area is kept
    with the area set to None.

    Args:
        driver: Browser driver positioned on a rental listing page.

    Returns:
        A list of dicts with the keys "面积(㎡)" and "月租金(元)".

    Raises:
        TimeoutException: If the list container does not appear in time.
    """
    # 1. Wait for the list container to appear.
    container_locator = (By.XPATH, '//*[@id="listBox"]//div[contains(@class,"houseList")]')
    WebDriverWait(driver, 15).until(EC.presence_of_element_located(container_locator))

    # 2. Walk over every listing row.
    records = []
    for row in driver.find_elements(By.XPATH, '//*[@id="listBox"]//dd'):
        try:
            # Monthly rent: first run of digits once thousands separators are removed.
            price_text = row.find_element(By.XPATH, './/span[contains(@class,"price")]').text
            rent = int(re.search(r'(\d+)', price_text.replace(',', '')).group(1))

            # Floor area: optional, taken from the room-info paragraph.
            info_text = row.find_element(By.XPATH, './/p[contains(@class,"font15") or contains(@class,"room")]').text
            area_match = re.search(r'([\d.]+)\s*㎡', info_text)
            if area_match:
                area = float(area_match.group(1))
            else:
                area = None
        except Exception as e:
            print("[WARN] 丢弃一行：", e)
            continue

        # Only the area and the monthly rent are kept.
        records.append({"面积(㎡)": area, "月租金(元)": rent})
    return records

# -------------- Main flow --------------
def main():
    """Crawl up to MAX_PAGE rental listing pages and save the records to CSV_FILE.

    Pages are visited by clicking the "next page" link. A timeout or any other
    error while parsing or paging ends the crawl early, and whatever was
    collected so far is still written out. The browser is always closed, even
    if loading the start page fails.
    """
    # Import up front so a missing pandas fails before the crawl, not after it.
    import pandas as pd

    driver = init_driver()
    all_records = []
    try:
        driver.get(START_URL)

        for page in range(1, MAX_PAGE + 1):
            print(f"正在采集第 {page:02d} 页 …")
            try:
                all_records += parse_one_page(driver)
                if page == MAX_PAGE:
                    break
                # Click the "next page" link.
                next_btn = driver.find_element(By.XPATH, '//a[contains(text(),"下一页")]')
                next_btn.click()
                time.sleep(random.uniform(1.2, 2.4))  # random delay to avoid anti-crawling measures
            except TimeoutException:
                print("[ERROR] 加载下一页超时，提前结束")
                break
            except Exception as e:
                print("[ERROR] 未知异常：", e)
                break
    finally:
        # Release the browser on every path, including a failed initial page load.
        driver.quit()

    # Save the collected records as CSV.
    pd.DataFrame(all_records).to_csv(CSV_FILE, index=False, encoding="utf-8-sig")
    print(f"全部完成，共采集 {len(all_records)} 条，已写入 {Path(CSV_FILE).absolute()}")
    print("生成的文件包含以下字段：面积(㎡)、月租金(元)")

# Run the rental crawl when this cell/script is executed directly.
if __name__ == "__main__":
    main()


正在采集第 01 页 …
正在采集第 02 页 …
正在采集第 03 页 …
正在采集第 04 页 …
正在采集第 05 页 …
正在采集第 06 页 …
正在采集第 07 页 …
正在采集第 08 页 …
正在采集第 09 页 …
正在采集第 10 页 …
正在采集第 11 页 …
正在采集第 12 页 …
正在采集第 13 页 …
正在采集第 14 页 …
正在采集第 15 页 …
正在采集第 16 页 …
正在采集第 17 页 …
正在采集第 18 页 …
正在采集第 19 页 …
正在采集第 20 页 …
全部完成，共采集 1200 条，已写入 c:\Users\lenovo\Desktop\2025\KE\Quent\Quant_RUC-main\Homework\hlg_fang_rent.csv
生成的文件包含以下字段：面积(㎡)、月租金(元)
