# 初始化设置与工具函数

In [12]:
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.edge.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException
import time
import random
import pandas as pd
import os
import socket
import subprocess  # 新增：用于检查进程

# 配置Edge浏览器（强化反检测版本）
options = webdriver.EdgeOptions()

# 核心反检测配置
options.add_argument('--disable-blink-features=AutomationControlled')  # 禁用自动化控制标识
options.add_argument('--disable-dev-shm-usage')  # 解决内存不足问题
options.add_argument('--no-sandbox')  # 非沙箱模式，增强兼容性
options.add_argument('--disable-extensions')  # 禁用扩展，减少特征
options.add_argument('--disable-gpu')  # 禁用GPU加速，避免某些环境问题
options.add_argument('--disable-infobars')  # 禁用信息栏
options.add_argument('--start-maximized')  # 最大化窗口，模拟真实使用

# 隐藏自动化开关
options.add_experimental_option('excludeSwitches', ['enable-automation', 'enable-logging'])
options.add_experimental_option('useAutomationExtension', False)

# 优化User-Agent（使用Edge真实UA，避免与浏览器类型冲突）
options.add_argument('user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36 Edg/114.0.1823.67')

# 配置用户数据目录（确保复用已登录状态）
user_data_path = r"C:\Users\ZhaoQY\AppData\Local\Microsoft\Edge\User Data"
options.add_argument(f"user-data-dir={user_data_path}")

# 清理残留进程（避免用户数据目录冲突）
def clean_edge_processes():
    try:
        # 结束所有Edge相关进程
        subprocess.run(["taskkill", "/F", "/IM", "msedge.exe"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("已清理残留的Edge进程")
    except Exception as e:
        print(f"清理进程时出错（非致命）：{e}")

# 检查用户数据目录是否存在
if not os.path.exists(user_data_path):
    raise FileNotFoundError(f"用户数据目录不存在：{user_data_path}，请确认路径正确性")

# 本地EdgeDriver路径配置
edgedriver_path = r'D:\edgedriver_win64\msedgedriver.exe'
if not os.path.exists(edgedriver_path):
    raise FileNotFoundError(f"未找到EdgeDriver: {edgedriver_path}")

# 目标URL
base_url = "https://xueqiu.com/hq/detail?market=CN&first_name=0&second_name=0&type=sh_sz"

# 增强版网络连接检查
def check_internet_connection():
    test_ips = [("8.8.8.8", 53), ("114.114.114.114", 53), ("223.5.5.5", 53)]  # 多节点检测
    for ip, port in test_ips:
        try:
            socket.create_connection((ip, port), timeout=3)
            print(f"网络连接正常（通过{ip}验证）")
            return True
        except OSError:
            continue
    print("网络连接失败，请检查网络设置")
    return False

# 增强版URL验证
def verify_url_accessible(url):
    try:
        # 解析域名
        if url.startswith(('http://', 'https://')):
            domain = url.split('/')[2]
        else:
            domain = url.split('/')[0]
        
        # 尝试解析IP
        ip_address = socket.gethostbyname(domain)
        print(f"域名 {domain} 解析正常，IP: {ip_address}")
        
        # 尝试建立TCP连接（模拟浏览器预热）
        socket.create_connection((ip_address, 80), timeout=5)
        print(f"成功与 {domain} 建立连接")
        return True
    except Exception as e:
        print(f"域名验证失败: {str(e)}")
        return False

# 增强版随机等待（模拟人类不规则等待）
def random_sleep(min_seconds=2, max_seconds=5, human_like=True):
    if human_like:
        # 增加人类行为特征：偶尔等待更长时间
        if random.random() < 0.1:  # 10%概率延长等待
            sleep_time = random.uniform(max_seconds, max_seconds * 2)
        else:
            sleep_time = random.uniform(min_seconds, max_seconds)
    else:
        sleep_time = random.uniform(min_seconds, max_seconds)
    
    time.sleep(sleep_time)
    return sleep_time

# 初始化浏览器（新增：集中管理浏览器启动）
def init_browser():
    # 启动前清理残留进程
    clean_edge_processes()
    
    # 初始化驱动
    service = Service(edgedriver_path)
    driver = webdriver.Edge(service=service, options=options)
    
    # 隐藏webdriver属性（关键反检测步骤）
    driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
        "source": """
            // 覆盖navigator.webdriver
            Object.defineProperty(navigator, 'webdriver', {
                get: () => undefined,
                configurable: true
            });
            
            // 覆盖其他自动化特征
            Object.defineProperty(navigator, 'plugins', {
                get: () => [1, 2, 3], // 模拟插件数量
                configurable: true
            });
            
            // 模拟真实窗口尺寸变化
            window.addEventListener('resize', () => {
                window.innerWidth = window.outerWidth;
                window.innerHeight = window.outerHeight;
            });
        """
    })
    
    return driver

# 股票信息提取函数

In [13]:
# 第二段代码：仅保留函数声明和框架，移除重复内容
# 提取股票基本信息（仅声明，具体实现见第三段）
def extract_stock_basic_info(driver):
    pass  # 实际实现由第三段代码提供

# 提取股票评论（仅声明，具体实现见第三段）
def extract_all_comments(driver, stock_name, stock_code):
    pass  # 实际实现由第三段代码提供

# 保存数据到Excel（仅声明，具体实现见第三段）
def save_to_excel(data, filename="股票数据.xlsx"):
    pass  # 实际实现由第三段代码提供

# 主函数（仅做前置检查，不执行浏览器初始化，避免与第三段重复）
def main():
    all_stock_data = []
    driver = None
    
    try:
        # 仅保留基础检查，复用第一段的工具函数
        if not check_internet_connection():
            raise ConnectionError("网络连接失败")
        if not verify_url_accessible(base_url):
            raise ConnectionError("目标URL无法访问")
        
        # 不进行浏览器初始化，避免与第三段重复
        print("第二段代码检查通过，等待后续执行...")
        
    except Exception as e:
        print(f"第二段代码检查出错：{e}")
    finally:
        # 此时未初始化浏览器，无需关闭
        print("第二段代码执行完毕")

if __name__ == "__main__":
    main()

网络连接正常（通过8.8.8.8验证）
域名 xueqiu.com 解析正常，IP: 60.205.172.136
成功与 xueqiu.com 建立连接
第二段代码检查通过，等待后续执行...
第二段代码执行完毕


# 主爬虫程序

In [16]:
def main():
    all_stock_data = []
    driver = None
    # 配置Edge浏览器
    options = webdriver.EdgeOptions()
    options.add_argument("--start-maximized")
    options.add_experimental_option("excludeSwitches", ["enable-automation"])
    options.add_experimental_option("useAutomationExtension", False)
    options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.7204.185 Safari/537.36")  
    
    try:
        # 检查网络和URL可用性
        if not check_internet_connection():
            raise ConnectionError("网络连接失败")  
        if not verify_url_accessible(base_url):
            raise ConnectionError("目标URL无法访问")
        
        # 初始化浏览器
        service = Service(edgedriver_path)
        driver = webdriver.Edge(service=service, options=options)
        driver.maximize_window()
        driver.get(base_url)
        time.sleep(random.uniform(5, 8))
        
        # 处理验证
        print("请检查浏览器窗口，如有滑动验证请手动完成，完成后按Enter继续...")
        input("确认验证完成后按Enter继续...")
        time.sleep(random.uniform(3, 5))
        
        # 爬取参数
        total_stock_pages = 167
        stocks_per_page = 30
        start_stock_idx = 4
        max_stock_retries = 3
        max_comment_pages = 100
        
        # 爬取逻辑
        for stock_page in range(1, total_stock_pages + 1):
            print(f"\n===== 正在爬取股票列表第{stock_page}页 =====")
            
            try:
                if stock_page > 1:
                    try:
                        page_button = WebDriverWait(driver, 15).until(
                            EC.element_to_be_clickable((By.XPATH, f"//div[@class='hq-pager']//a[text()='{stock_page}']"))
                        )
                        page_button.click()
                    except:
                        print(f"⚠️ 直接定位第{stock_page}页失败，尝试通过下一页翻页")
                        next_page_btn = WebDriverWait(driver, 10).until(
                            EC.element_to_be_clickable((By.XPATH, "//div[@class='hq-pager']//a[contains(text(), '下一页')]"))
                        )
                        next_page_btn.click()
                    time.sleep(random.uniform(5, 8))
                    try:
                        WebDriverWait(driver, 10).until(
                            EC.text_to_be_present_in_element((By.XPATH, "//div[@class='hq-pager']//a[contains(@class, 'current')]"), 
                                                             str(stock_page))
                        )
                        print(f"✅ 已成功跳转至第{stock_page}页")
                    except:
                        print(f"⚠️ 第{stock_page}页跳转失败，跳过当前页")
                        continue
                
                current_start_idx = start_stock_idx if stock_page == 1 else 1
                for stock_idx in range(current_start_idx, stocks_per_page + 1):
                    stock_retry = 0
                    stock_success = False
                    
                    while stock_retry <= max_stock_retries and not stock_success:
                        try:
                            print(f"\n----- 正在爬取第{stock_page}页第{stock_idx}支股票（第{stock_retry + 1}次尝试） -----")
                            
                            if not driver.session_id or not driver.window_handles:
                                print("⚠️ 会话失效，重新初始化浏览器...")
                                driver.quit()
                                driver = webdriver.Edge(service=service, options=options)
                                driver.get(base_url)
                                time.sleep(5)
                                print("请重新完成登录/验证（30秒）...")
                                time.sleep(30)
                                driver.get(f"{base_url}&page={stock_page}")
                                time.sleep(random.uniform(5, 8))
                            
                            stock_link = WebDriverWait(driver, 20).until(
                                EC.element_to_be_clickable((By.XPATH,
                                    f"//*[@id='app']/div/div[2]/div/div/div[2]/div[2]/div[1]/div/table/tbody/tr[{stock_idx}]/td[2]/span/a"))
                            )
                            stock_name = stock_link.text
                            print(f"正在处理股票: {stock_name}")
                            
                            stock_link.click()
                            time.sleep(random.uniform(5, 8))
                            
                            if len(driver.window_handles) > 1:
                                driver.switch_to.window(driver.window_handles[-1])
                            else:
                                print("⚠️ 未打开新窗口，使用当前窗口")
                            
                            stock_basic = extract_stock_basic_info(driver)
                            if not stock_basic["股票名称"]:
                                stock_basic["股票名称"] = stock_name  
                            print(f"股票基本信息: {stock_basic}")
                            
                            stock_comments = extract_all_comments(
                                driver, 
                                stock_basic["股票名称"], 
                                stock_basic["股票代码"],
                                max_comment_pages
                            )
                            print(f"共提取到{len(stock_comments)}条评论（当前股票）")
                            
                            stock_data = {
                                "基本信息": stock_basic,
                                "评论": stock_comments
                            }
                            all_stock_data.append(stock_data)
                            stock_success = True
                            
                            if len(driver.window_handles) > 1:
                                driver.close()
                                driver.switch_to.window(driver.window_handles[0])
                            time.sleep(random.uniform(3, 5))
                            
                        except Exception as e:
                            stock_retry += 1
                            print(f"处理第{stock_page}页第{stock_idx}支股票时出错（第{stock_retry}次）: {e}")
                            
                            try:
                                if driver and len(driver.window_handles) > 1:
                                    driver.close()
                                    driver.switch_to.window(driver.window_handles[0])
                                time.sleep(3)
                            except:
                                pass
                            
                            if stock_retry > max_stock_retries:
                                print(f"⚠️ 已达最大重试次数，跳过当前股票")
        
            except Exception as e:
                print(f"爬取第{stock_page}页出错: {e}")
                driver.refresh()
                time.sleep(5)
                continue
        
        save_to_excel(all_stock_data, batch_size=1000)
        print("所有爬取任务已完成")
        
    except Exception as e:
        print(f"爬取过程出错：{e}")
        if all_stock_data:
            save_to_excel(all_stock_data, filename="股票数据_部分保存.xlsx")
            print("已保存部分爬取数据")
    finally:
        if driver:
            driver.quit()
            print("浏览器已关闭")


# 提取股票基本信息
def extract_stock_basic_info(driver):
    try:
        name_code_xpaths = [
            "//div[@class='stock-name']",
            "//h1[@class='name']",
            "//div[@class='stock-title']"
        ]
        
        name_code_text = ""
        for xpath in name_code_xpaths:
            try:
                element = WebDriverWait(driver, 8).until(
                    EC.presence_of_element_located((By.XPATH, xpath))
                )
                name_code_text = element.text.strip()
                break
            except:
                continue
        
        code_match = re.search(r'\((SH|SZ)\d{6}\)', name_code_text)
        stock_code = code_match.group().replace("(", "").replace(")", "") if code_match else ""
        pure_name = name_code_text.replace(f"({stock_code})", "").strip() if code_match else name_code_text
        
        return {
            "股票名称": pure_name,
            "股票代码": stock_code
        }
    
    except Exception as e:
        print(f"提取基本信息出错: {e}")
        return {"股票名称": "", "股票代码": ""}


# 提取股票评论
def extract_all_comments(driver, stock_name, stock_code, max_comment_pages):
    all_comments = []
    max_retry = 3
    retry_count = 0
    last_page = 1
    last_comment_idx = 0
    
    content_xpath_templates = [
        "{article}/div[2]/div[2]/div/div/div",
        "{article}/div[2]/div[2]/div[1]/div/div",
        "{article}/div[2]/div[2]/div[1]/div[1]/div",
        "{article}/div[2]/div[2]/div/div",
        "{article}/div[2]/div[2]/div"
    ]
    
    expand_xpath_templates = [
        "{article}/div[2]/div[2]/div/div/a",
        "{article}/div[2]/div[2]/div[1]/div/a",
        "{article}/div[2]/div[2]/div[1]/div[1]/a",
        "{article}/div[2]/div[2]/div/a"
    ]
    
    while retry_count < max_retry:
        try:
            if not driver.window_handles:
                raise NoSuchWindowException("页面已关闭")
            
            comment_container_xpath = "//*[@id='app']/div[2]/div[2]/div[8]/div[3]"
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.XPATH, comment_container_xpath))
            )
            print("✅ 评论区容器已加载")
            
            current_page = last_page
            
            if last_page > 1 or last_comment_idx > 0:
                print(f"⏳ 恢复进度：第{last_page}页，从第{last_comment_idx + 1}条开始")
                while current_page < last_page:
                    if current_page >= max_comment_pages:
                        print(f"⚠️ 已达评论最大页数（{max_comment_pages}页），停止爬取")
                        return all_comments
                    
                    next_button = WebDriverWait(driver, 10).until(
                        EC.element_to_be_clickable((By.XPATH, "//a[contains(text(), '下一页')]"))
                    )
                    driver.execute_script("arguments[0].click();", next_button)
                    WebDriverWait(driver, 15).until(
                        EC.presence_of_element_located((By.XPATH, comment_container_xpath))
                    )
                    current_page += 1
                    print(f"已跳转至第{current_page}页")
            
            while current_page <= max_comment_pages:
                print(f"\n===== 开始处理{stock_name}评论第{current_page}页 =====")
                page_comments = []
                
                start_idx = last_comment_idx + 1 if current_page == last_page else 1
                for comment_idx in range(start_idx, 11):
                    try:
                        article_xpath = f"//*[@id='app']/div[2]/div[2]/div[8]/div[3]/article[{comment_idx}]"
                        print(f"\n----- 尝试提取第{current_page}页第{comment_idx}条 -----")
                        
                        try:
                            WebDriverWait(driver, 15).until(
                                EC.presence_of_element_located((By.XPATH, article_xpath))
                            )
                        except TimeoutException:
                            print(f"ℹ️ 第{comment_idx}条评论未加载，跳过")
                            continue
                        
                        expand_button = None
                        for exp_template in expand_xpath_templates:
                            exp_xpath = exp_template.format(article=article_xpath)
                            buttons = driver.find_elements(By.XPATH, exp_xpath)
                            if buttons and buttons[0].is_displayed():
                                expand_button = buttons[0]
                                print(f"✅ 找到展开按钮")
                                break
                        
                        if expand_button:
                            try:
                                driver.execute_script("arguments[0].click();", expand_button)
                                time.sleep(random.uniform(1.5, 2.5))
                            except:
                                print("⚠️ 展开按钮点击失败，继续提取")
                        
                        comment_content = None
                        for cont_template in content_xpath_templates:
                            cont_xpath = cont_template.format(article=article_xpath)
                            elements = driver.find_elements(By.XPATH, cont_xpath)
                            if elements and elements[0].text.strip():
                                comment_content = elements[0].text.strip()
                                print(f"✅ 提取到内容")
                                break
                        
                        if comment_content:
                            page_comments.append({
                                "页码": current_page,
                                "评论序号": comment_idx,
                                "内容": comment_content
                            })
                            last_comment_idx = comment_idx
                            last_page = current_page
                        else:
                            print(f"ℹ️ 第{comment_idx}条未匹配到有效内容")
                    
                    except Exception as e:
                        print(f"❌ 第{comment_idx}条处理失败: {e}")
                        continue
                
                if page_comments:
                    all_comments.extend(page_comments)
                    print(f"✅ 第{current_page}页共提取 {len(page_comments)} 条评论")
                
                next_button_xpath = "//*[@id='app']/div[2]/div[2]/div[8]/div[4]/a[contains(text(), '下一页')]"
                try:
                    next_button = WebDriverWait(driver, 15).until(
                        EC.element_to_be_clickable((By.XPATH, next_button_xpath))
                    )
                except TimeoutException:
                    print("❌ 未找到“下一页”按钮，已到最后一页")
                    break
                
                is_disabled = False
                try:
                    btn_class = next_button.get_attribute("class")
                    if "disabled" in btn_class or "gray" in btn_class:
                        is_disabled = True
                        print("⚠️ 下一页按钮不可用")
                except:
                    pass
                if is_disabled:
                    break
                
                driver.execute_script("arguments[0].click();", next_button)
                print(f"✅ 已点击“下一页”按钮，前往第{current_page + 1}页")
                time.sleep(random.uniform(3, 5))
                
                try:
                    WebDriverWait(driver, 20).until(
                        EC.presence_of_element_located((By.XPATH, comment_container_xpath))
                    )
                    print(f"✅ 第{current_page + 1}页加载完成")
                except TimeoutException:
                    print(f"⚠️ 第{current_page + 1}页加载超时，重试点击下一页")
                    next_button.click()
                    time.sleep(5)
                
                current_page += 1
                last_comment_idx = 0
            
            if current_page > max_comment_pages:
                print(f"✅ 已爬取至评论最大页数（{max_comment_pages}页），停止提取")
                break
            
            if all_comments:
                break
            else:
                retry_count += 1
                print(f"⚠️ 未提取到评论，第{retry_count}次重试当前股票...")
                time.sleep(5)
        
        except Exception as e:
            retry_count += 1
            print(f"❌ 评论提取出错，第{retry_count}次重试: {e}")
            
            if "session" in str(e).lower() or "window" in str(e).lower():
                print(f"⚠️ 会话失效/页面关闭，尝试重新初始化浏览器...")
                driver.quit()
                options = webdriver.EdgeOptions()
                options.add_argument("--start-maximized")
                options.add_experimental_option("excludeSwitches", ["enable-automation"])
                options.add_experimental_option("useAutomationExtension", False)
                options.add_argument("user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.7204.185 Safari/537.36")
                driver = webdriver.Edge(options=options)
                driver.get(base_url)
                time.sleep(5)
                print("请重新完成登录/验证（30秒内）...")
                time.sleep(30)
                try:
                    stock_link = WebDriverWait(driver, 15).until(
                        EC.element_to_be_clickable((By.XPATH, f"//*[contains(text(), '{stock_name}')]/ancestor::a"))
                    )
                    stock_link.click()
                    time.sleep(3)
                    driver.switch_to.window(driver.window_handles[-1])
                except:
                    print(f"❌ 重新打开{stock_name}失败，跳过")
                    return all_comments
            else:
                driver.refresh()
                time.sleep(5)
    
    print(f"\n===== {stock_name}评论提取完成，共提取 {len(all_comments)} 条评论 =====")
    return all_comments


# 保存数据到Excel
def save_to_excel(all_stock_data, batch_size=1000, filename="股票数据.xlsx"):
    try:
        excel_data = []
        for i, stock in enumerate(all_stock_data):
            basic_info = stock["基本信息"]
            comments = stock["评论"]
            
            for comment in comments:
                row = {
                    "股票代码": basic_info.get("股票代码", ""),
                    "股票名称": basic_info.get("股票名称", ""),
                    "评论页码": comment.get("页码", ""),
                    "评论序号": comment.get("评论序号", ""),
                    "评论内容": comment.get("内容", "")
                }
                excel_data.append(row)
            
            if (i + 1) % batch_size == 0:
                df = pd.DataFrame(excel_data)
                df.to_excel(f"{filename}_part{i//batch_size + 1}.xlsx", index=False)  
                print(f"已保存第{i//batch_size + 1}批数据（共{len(excel_data)}条）")
                excel_data = []
        
        if excel_data:
            df = pd.DataFrame(excel_data)
            if len(all_stock_data) > batch_size:
                df.to_excel(f"{filename}_part{len(all_stock_data)//batch_size + 1}.xlsx", index=False)
            else:
                df.to_excel(filename, index=False)
            print(f"\n成功保存 {len(excel_data)} 条评论数据到 {filename}")
        
    except Exception as e:
        print(f"保存Excel失败: {e}")


if __name__ == "__main__":
    main()
    

网络连接正常（通过8.8.8.8验证）
域名 xueqiu.com 解析正常，IP: 60.205.172.136
成功与 xueqiu.com 建立连接
请检查浏览器窗口，如有滑动验证请手动完成，完成后按Enter继续...


确认验证完成后按Enter继续... 



===== 正在爬取股票列表第1页 =====

----- 正在爬取第1页第4支股票（第1次尝试） -----
正在处理股票: 海泰科
提取基本信息出错: name 're' is not defined
股票基本信息: {'股票名称': '海泰科', '股票代码': ''}
✅ 评论区容器已加载

===== 开始处理海泰科评论第1页 =====

----- 尝试提取第1页第1条 -----
ℹ️ 第1条评论未加载，跳过

----- 尝试提取第1页第2条 -----
浏览器已关闭


KeyboardInterrupt: 