# In [None]:
# 浏览器驱动配置模块
from selenium import webdriver
from selenium.webdriver.edge.options import Options as EdgeOptions
import sys

def initialize_browser(headless_mode=True):
    """Create and return a configured Edge browser instance.

    Args:
        headless_mode (bool): Run the browser without a visible window.
            Enabled by default.

    Returns:
        The Edge WebDriver instance, with a 10-second implicit wait applied.

    Raises:
        RuntimeError: If the driver cannot be started or configured. The
            original exception is chained as ``__cause__``.
    """
    browser_config = EdgeOptions()
    # Kept from the original code; presumably only meaningful for older
    # Selenium/Edge tooling -- TODO confirm it is still needed.
    browser_config.use_chromium = True
    if headless_mode:
        browser_config.add_argument("--headless=new")
        browser_config.add_argument("--disable-gpu")

    common_params = (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--remote-allow-origins=*",  # accept remote connections from any origin
        "--log-level=3",  # reduce browser log output
    )
    for param in common_params:
        browser_config.add_argument(param)

    def _wrap(error):
        # One place for the message format so both failure paths agree.
        return RuntimeError(
            f"浏览器初始化失败 || 错误类型: {type(error).__name__} || 详细信息: {error}"
        )

    try:
        driver_instance = webdriver.Edge(options=browser_config)
    except Exception as error:
        raise _wrap(error) from error

    try:
        driver_instance.implicitly_wait(10)
    except Exception as error:
        # The browser process is already running at this point; shut it down
        # so a configuration failure does not leak it.
        try:
            driver_instance.quit()
        except Exception:
            pass  # best effort: the original error is the one worth reporting
        raise _wrap(error) from error

    return driver_instance

# In [None]:
import time
import random
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import pandas as pd

# Browser initialization for the crawler cell.
# NOTE: when run in the same session as an earlier definition of
# initialize_browser, this one replaces it.
def initialize_browser(headless_mode=True):
    """Configure and return an Edge browser instance for crawling.

    No implicit wait is set here: the crawler relies on explicit
    ``WebDriverWait`` conditions instead.

    Args:
        headless_mode (bool): Run the browser without a visible window.
            Enabled by default, matching the previous hard-coded behaviour.

    Returns:
        The Edge WebDriver instance.
    """
    options = webdriver.EdgeOptions()
    if headless_mode:
        options.add_argument("--headless=new")
        options.add_argument("--disable-gpu")
    for flag in (
        "--no-sandbox",
        "--disable-dev-shm-usage",
        "--remote-allow-origins=*",
    ):
        options.add_argument(flag)
    return webdriver.Edge(options=options)

# Area configuration: each entry names a main area (matched against link text
# on the site) and the sub-areas that are typed into the search box.
TARGET_AREAS = [
    {
        "main_area": "北京周边",
        "sub_areas": [
            "通州",
            "大厂",
            "燕郊",
            "马驹桥",
            "亦庄",
        ],
    },
]

# 核心爬取函数
def _parse_listing(item, area_label):
    """Build one listing row from a result element, or None if a field is missing."""
    try:
        return {
            "标题": item.find_element(By.CSS_SELECTOR, ".text a").text.strip(),
            "价格": item.find_element(By.CSS_SELECTOR, ".priceInfo .red").text,
            "详情": item.find_element(By.CSS_SELECTOR, ".text p").text,
            "区域": area_label,
        }
    except NoSuchElementException:
        return None


def _go_to_next_page(driver, retries=3, retry_delay=5):
    """Click the "next page" link; return True only if a click was issued.

    Only the pager lookup/click is retried. The caller's page scrape is never
    repeated, so a flaky pager cannot duplicate rows or inflate the page count.
    """
    attempts_left = retries
    while True:
        try:
            next_btn = driver.find_element(By.CSS_SELECTOR, ".page_al a:last-child")
            if "下一页" not in next_btn.text:
                return False  # last pager link is not "next": final page
            next_btn.click()
        except Exception:
            # Broad on purpose: any lookup/click failure is treated as
            # transient until the retry budget is spent.
            if attempts_left <= 0:
                print("翻页超时")
                return False
            attempts_left -= 1
            time.sleep(retry_delay)
        else:
            time.sleep(random.uniform(3, 5))  # randomized pause before the next page
            return True


def crawl_properties(driver, main_area, sub_area):
    """Crawl second-hand listings for one sub-area and save them to Excel.

    Opens https://esf.fang.com/, clicks the link whose text contains
    ``main_area``, searches for ``sub_area``, then walks the result pages
    collecting title, price, details and an area label for each listing.

    Args:
        driver: Selenium WebDriver used for navigation.
        main_area (str): Text of the main-area link to click.
        sub_area (str): Keyword typed into the search box.

    Returns:
        list[dict]: One dict per listing with keys ``标题``, ``价格``, ``详情``
        and ``区域``. Empty if area selection or search failed, or if nothing
        was found. In those cases no file is written.

    Side effects:
        Writes ``{main_area}_{sub_area}_前{N}页房源.xlsx`` to the working
        directory when at least one listing was collected.
    """
    driver.get("https://esf.fang.com/")

    # Select the main area.
    try:
        main_selector = WebDriverWait(driver, 15).until(
            EC.element_to_be_clickable((By.XPATH, f"//a[contains(text(), '{main_area}')]"))
        )
        main_selector.click()
    except TimeoutException:
        print("主区域选择超时")
        return []
    # Report success only after the click actually happened.
    print(f"\n已点击主区域: {main_area}")
    time.sleep(random.uniform(1, 2))

    # Search for the sub-area.
    try:
        search_box = WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, "#keyword"))
        )
        search_box.clear()
        search_box.send_keys(sub_area)
        driver.find_element(By.CSS_SELECTOR, "#searchBtn").click()
    except (TimeoutException, NoSuchElementException):
        # A missing search button is handled like a missing search box
        # instead of crashing the whole crawl.
        print("搜索框加载失败")
        return []
    print(f"已点击子区域: {sub_area}")
    time.sleep(random.uniform(2, 3))

    # Collect listings page by page.
    area_label = f"{main_area}-{sub_area}"
    all_data = []
    page_count = 0

    while True:
        try:
            items = WebDriverWait(driver, 20).until(
                EC.presence_of_all_elements_located((By.CSS_SELECTOR, ".shop_list li"))
            )
        except TimeoutException:
            print("房源列表加载超时")
            break
        print("房源列表加载完成")
        page_count += 1

        for item in items:
            row = _parse_listing(item, area_label)
            if row is not None:
                all_data.append(row)

        if not _go_to_next_page(driver):
            break

    if not all_data:
        # An empty workbook would have no columns and only get in the way of
        # later processing, so skip the write.
        print("未爬取到任何数据，跳过保存")
        return all_data

    filename = f"{main_area}_{sub_area}_前{page_count}页房源.xlsx"
    pd.DataFrame(all_data).to_excel(filename, index=False)
    print(f"数据已保存至 {filename}")
    print(f"共爬取 {len(all_data)} 条数据")
    return all_data

# 主程序
if __name__ == "__main__":
    # Crawl every configured (main area, sub-area) pair with one shared
    # browser, and always shut the browser down afterwards.
    browser = initialize_browser()
    try:
        crawl_jobs = (
            (area_config, sub)
            for area_config in TARGET_AREAS
            for sub in area_config["sub_areas"]
        )
        for area_config, sub in crawl_jobs:
            crawl_properties(browser, area_config["main_area"], sub)
    finally:
        browser.quit()

# In [None]:
import pandas as pd
import numpy as np
import re
import matplotlib.pyplot as plt
import seaborn as sns
import statsmodels.formula.api as smf
from pathlib import Path

# --------------------------
# 数据预处理模块
# --------------------------

def preprocess_data():
    """Merge all crawled listing workbooks and derive per-square-metre metrics.

    Reads every ``*房源.xlsx`` file in the current working directory, extracts
    the floor area from the ``详情`` text and the leading number from the
    ``价格`` text, computes per-square-metre values, and removes rows that are
    IQR outliers (beyond 1.5 * IQR from the quartiles) on either metric.

    Returns:
        pandas.DataFrame: The combined listings with the added columns
        ``面积``, ``price_per_m2`` and ``rent_per_m2``. Rows without a
        parseable positive area or a parseable price are dropped.

    Raises:
        ValueError: If no workbook containing the ``价格`` and ``详情``
            columns is found.
    """
    required_columns = {'价格', '详情'}
    data_dir = Path('./')

    frames = []
    for workbook in sorted(data_dir.glob('*房源.xlsx')):
        frame = pd.read_excel(workbook)
        # Workbooks written from an empty crawl have no columns; skip them
        # instead of failing later on a missing column.
        if required_columns.issubset(frame.columns):
            frames.append(frame)
    if not frames:
        raise ValueError("未找到包含 价格/详情 列的 *房源.xlsx 数据文件")
    combined = pd.concat(frames, ignore_index=True)

    # The number may carry a decimal part (e.g. "89.5㎡"); matching only
    # integer digits would pick up just the fractional digits.
    area_pattern = re.compile(r'(\d+(?:\.\d+)?)㎡|建筑面积[:：](\d+(?:\.\d+)?)')

    def extract_area(text):
        """Return the floor area found in *text* as a float, or NaN."""
        if not isinstance(text, str):  # empty Excel cells come back as NaN
            return np.nan
        match = area_pattern.search(text)
        if match is None:
            return np.nan
        return float(match.group(1) or match.group(2))

    combined['面积'] = combined['详情'].apply(extract_area)

    # Keep only rows with a usable area. ``.copy()`` gives an independent
    # frame so the column assignments below do not hit a view of the original.
    combined = combined[combined['面积'] > 0].copy()

    # ``expand=False`` yields a Series, so the division aligns row by row.
    price_value = pd.to_numeric(
        combined['价格'].astype(str).str.extract(r'(\d+(?:\.\d+)?)', expand=False),
        errors='coerce',
    )
    combined['price_per_m2'] = price_value / combined['面积']
    # NOTE(review): no separate rent field is available, so the rent metric
    # is derived from the same price column (as in the original code). Any
    # price/rent ratio built from it is therefore 1 -- replace once real rent
    # data exists.
    combined['rent_per_m2'] = price_value / combined['面积']
    combined = combined.dropna(subset=['price_per_m2', 'rent_per_m2'])

    # Drop IQR outliers on either metric.
    metric_cols = ['price_per_m2', 'rent_per_m2']
    metrics = combined[metric_cols]
    q1 = metrics.quantile(0.25)
    q3 = metrics.quantile(0.75)
    iqr = q3 - q1
    is_outlier = ((metrics < (q1 - 1.5 * iqr)) | (metrics > (q3 + 1.5 * iqr))).any(axis=1)
    combined = combined[~is_outlier]

    return combined

# --------------------------
# 可视化模块
# --------------------------

def plot_median_ratio(df):
    """Plot the median price-to-rent ratio per area and save it as figureA.png.

    Groups ``df`` by ``区域``, takes the median of ``price_per_m2`` and
    ``rent_per_m2``, and draws their ratio as a bar chart with a red dashed
    reference line at 200.
    """
    plt.figure(figsize=(12, 6))

    medians = df.groupby('区域')[['price_per_m2', 'rent_per_m2']].median()
    medians['price_rent_ratio'] = medians['price_per_m2'] / medians['rent_per_m2']
    sns.barplot(data=medians, x=medians.index, y='price_rent_ratio')

    # Reference line and labelling.
    plt.axhline(200, color='red', linestyle='--', label='Global Fair Value')
    plt.title('Median Price to Rent Ratio by Block')
    plt.xticks(rotation=45)
    plt.ylabel('Ratio')
    plt.legend()

    plt.tight_layout()
    plt.savefig('figureA.png', dpi=300)
    plt.close()

# --------------------------
# 建模预测模块
# --------------------------

def run_models(df):
    """Fit price and rent OLS models and predict for 50 and 100 m2 homes.

    Both models regress the per-square-metre metric on floor area, location,
    and their interaction. Predictions are made for every distinct ``区域``
    value at 50 and 100 square metres.

    Returns:
        pandas.DataFrame: Columns ``m2``, ``location``, ``pred_price``,
        ``pred_rent`` and ``pred_ratio`` (predicted price / predicted rent).
    """
    # Formula-friendly column names, shared by both models.
    model_frame = df.rename(columns={'面积': 'm2', '区域': 'location'})

    model_price = smf.ols(
        'price_per_m2 ~ m2 + C(location) + m2:C(location)',
        data=model_frame,
    ).fit()
    model_rent = smf.ols(
        'rent_per_m2 ~ m2 + C(location) + m2:C(location)',
        data=model_frame,
    ).fit()

    # Two rows per location: one at 50 m2, one at 100 m2.
    locations = df['区域'].unique()
    predict_data = pd.DataFrame({
        'm2': [50, 100] * len(locations),
        'location': np.repeat(locations, 2),
    })

    price_hat = model_price.predict(predict_data)
    predict_data['pred_price'] = price_hat
    rent_hat = model_rent.predict(predict_data)
    predict_data['pred_rent'] = rent_hat
    predict_data['pred_ratio'] = predict_data['pred_price'] / predict_data['pred_rent']

    return predict_data

def plot_predicted_ratios(predict_data):
    """Save one bar chart of predicted price-rent ratios per home size.

    The 50 m2 chart is written to figure_B.png and the 100 m2 chart to
    figure_C.png, each with a red dashed reference line at 200.
    """
    figure_tags = ((50, "B"), (100, "C"))
    for size, tag in figure_tags:
        plt.figure(figsize=(12, 6))
        rows_for_size = predict_data[predict_data['m2'] == size]
        sns.barplot(x='location', y='pred_ratio', data=rows_for_size)
        plt.title(f'Predicted Price-Rent Ratio at {size}m2')
        plt.axhline(200, color='red', linestyle='--')
        plt.xticks(rotation=45)
        plt.ylabel('Ratio')
        plt.tight_layout()
        plt.savefig(f'figure_{tag}.png', dpi=300)
        plt.close()

# --------------------------
# 主程序
# --------------------------

if __name__ == "__main__":
    # Load and clean the crawled listings
    df = preprocess_data()
    print(f"有效数据量: {len(df)}")
    
    # Draw the median price-to-rent chart
    plot_median_ratio(df)
    
    # Fit the regression models and build predictions
    predicted = run_models(df)
    
    # Draw the prediction charts
    plot_predicted_ratios(predicted)
    
    # Save the predictions to an Excel workbook
    predicted.to_excel('model_predictions.xlsx', index=False)