In [1]:
import csv
import sys
import time
import json
import random
import psutil
import subprocess
import dataclasses
from pathlib import Path
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import (
    TimeoutException,
    NoSuchElementException,
    ElementClickInterceptedException,
)

In [None]:
# Refuse to continue if an Edge process is already running: the browser
# launched below is started with --remote-debugging-port, so this cell
# insists on a fresh instance.
for proc in psutil.process_iter():
    try:
        name, pid = proc.name(), proc.pid
    except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
        # Processes can exit, be protected, or be zombies while we iterate;
        # none of those can be inspected, so skip them instead of aborting.
        continue
    if sys.platform.startswith('darwin'):
        assert "Microsoft Edge" not in name, f"edge进程已经存在, pid={pid}"
    else:
        assert "msedge.exe" not in name, f"edge进程已经存在, pid={pid}"


# NOTE(review): this is the macOS install location; the check above also
# handles non-macOS platforms, but this path would have to be changed there.
EDGE_PATH = "/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"
# Start Edge with the DevTools remote-debugging endpoint on port 20000.
p = subprocess.Popen([EDGE_PATH, "--remote-debugging-port=20000"])

In [3]:
# Connect Selenium to the browser's remote-debugging endpoint
# (same host:port that Edge was started with via --remote-debugging-port).
DEBUGGER_ADDRESS = "127.0.0.1:20000"

options = webdriver.EdgeOptions()
options.add_experimental_option("debuggerAddress", DEBUGGER_ADDRESS)

driver = webdriver.Edge(options=options)
driver.maximize_window()

In [None]:
# Where the session cookies are cached between runs.
COOKIE_FILE = Path("./cookie.json")

if not COOKIE_FILE.exists():
    # No cached cookies: open the login page and give the user 120 seconds
    # to sign in by hand. Success is detected by the browser landing on the
    # account home URL.
    driver.get("https://passport.ctrip.com/user/login")
    print("请在120s内手动完成登录...")
    login_wait = WebDriverWait(driver, 120)
    try:
        login_wait.until(EC.url_to_be("https://my.ctrip.com/myinfo/home"))
    except TimeoutException:
        print("登录超时，请重试")
    else:
        print("登录成功，保存cookies...")
        cookies = driver.get_cookies()
        COOKIE_FILE.write_text(json.dumps(cookies, indent=2))
        driver.get("https://www.ctrip.com/")
else:
    # Cached cookies exist: load the site first (cookies are added to the
    # currently open page), inject every saved cookie, then reload.
    driver.get("https://www.ctrip.com/")
    cookies = json.loads(COOKIE_FILE.read_text())
    for cookie in cookies:
        driver.add_cookie(cookie)
    driver.refresh()

In [None]:
# Type the query into the site-wide search box and submit it.
SEARCH_BOX_LOCATOR = (By.ID, "_allSearchKeyword")

try:
    search = WebDriverWait(driver, 10).until(
        EC.presence_of_element_located(SEARCH_BOX_LOCATOR)
    )
except TimeoutException:
    print("页面加载超时，无法定位到搜索框")
else:
    # Send the keyword, then RETURN, pausing one second after each.
    for keys in ("香港酒店", Keys.RETURN):
        search.send_keys(keys)
        time.sleep(1)
    # Switch to the most recently opened window handle.
    driver.switch_to.window(driver.window_handles[-1])

In [None]:
# Click the "view more" entry under the result list and follow it to the
# most recently opened window.
VIEW_MORE_LOCATOR = (
    By.XPATH,
    "//div[@class='list-btn-more']/h3[@class='btn-box']",
)

try:
    btn = WebDriverWait(driver, 3).until(
        EC.presence_of_element_located(VIEW_MORE_LOCATOR)
    )
except TimeoutException:
    print("无法定位到[查看更多按钮]")
else:
    btn.click()
    time.sleep(1)
    driver.switch_to.window(driver.window_handles[-1])

In [None]:
@dataclasses.dataclass
class Hotel:
    """One scraped hotel card.

    Only ``name`` is required; the remaining text fields default to an empty
    string and ``tags`` defaults to a fresh empty list per instance.
    """

    name: str
    condition: str = ""
    price: str = ""
    address: str = ""
    score: str = ""
    comment: str = ""
    tags: list = dataclasses.field(default_factory=list)

    def __str__(self):
        """Return all fields joined by ``" - "``, with ``tags`` shown as a list repr."""
        text_parts = [
            getattr(self, attr)
            for attr in ("name", "condition", "price", "address", "score", "comment")
        ]
        text_parts.append(str(self.tags))
        return " - ".join(text_parts)

In [None]:
# Scrape loop: scroll, harvest every hotel card not seen yet, then press the
# "search more hotels" button to load the next batch. The loop ends after
# more than MAX_FAIL consecutive passes in which no new batch could be loaded.
SCROLL_Y = 1000        # pixels scrolled down at the start of every pass
MAX_FAIL = 20          # consecutive unproductive passes tolerated
MAX_CLICK_RETRY = 5    # click attempts per pass before giving up on the pass
MAX_ADJUST = 10        # scroll corrections per click attempt
BTN_MARGIN = 150       # min distance (px) kept between button and viewport edges

MORE_BTN_XPATH = (
    "//div[@class='list-btn-more']/div[@class='btn-box']/span[text()='搜索更多酒店']"
)
# Hotel field name -> XPath relative to one result card.
FIELD_XPATHS = {
    "name": ".//div[@class='info']//div[@class='list-card-title']/span",
    "condition": ".//div[@class='sold-out-btn']/span",
    "price": ".//div[@class='list-card-price']//span[contains(@class, 'real-price')]",
    "address": ".//div[@class='list-card-transport']//span[@class='ads']",
    "score": ".//div[@class='list-card-comment']//div[@class='score']/span",
    "comment": ".//div[@class='list-card-comment']//p[@class='count']/a",
}
TAG_XPATH = ".//div[@class='list-card-tag']/span"

counter = 0
data_list = []
s = set()  # WebElement ids of cards that were already recorded
continous_fail = 0

while True:
    driver.execute_script(f"window.scrollBy(0, {SCROLL_Y})")
    time.sleep(random.uniform(1, 2))

    # Harvest BEFORE looking for the button: once the last batch is loaded
    # the button may no longer be found, and harvesting only after a
    # successful lookup would silently drop that final batch.
    for li in driver.find_elements(By.CLASS_NAME, "list-item-target"):
        if li.id in s:
            continue

        d = {}
        for k, xpath in FIELD_XPATHS.items():
            try:
                d[k] = li.find_element(By.XPATH, xpath).text
            except NoSuchElementException:
                # Optional piece of the card is absent; keep the default.
                pass

        if "name" not in d:
            # Hotel.name has no default, so Hotel(**d) would raise TypeError.
            # Leave the card unmarked so a later pass can retry it.
            continue
        s.add(li.id)

        d["tags"] = [tag.text for tag in li.find_elements(By.XPATH, TAG_XPATH)]

        hotel = Hotel(**d)
        data_list.append(hotel)
        counter += 1
        print(f"第{counter}条数据：{hotel}")

    try:
        btn = driver.find_element(By.XPATH, MORE_BTN_XPATH)
    except NoSuchElementException:
        continous_fail += 1
        if continous_fail > MAX_FAIL:
            print("多次未找到[更多]按钮，程序停止")
            break

        time.sleep(2)
        continue

    clicked = False
    for _attempt in range(MAX_CLICK_RETRY):
        # Nudge the page until the button sits at least BTN_MARGIN px inside
        # the viewport. Bounded, because near the page edges the browser
        # cannot scroll any further and the position would never change.
        for _adjust in range(MAX_ADJUST):
            btn_y = driver.execute_script(
                "return arguments[0].getBoundingClientRect();", btn
            )["y"]
            inner_height = driver.execute_script("return window.innerHeight;")
            if btn_y < BTN_MARGIN:
                # Button too close to (or above) the top edge: scroll up.
                driver.execute_script(
                    "window.scrollBy(0, arguments[0])", -(BTN_MARGIN - btn_y)
                )
            elif btn_y > inner_height - BTN_MARGIN:
                # Button too close to (or below) the bottom edge: scroll down.
                driver.execute_script(
                    "window.scrollBy(0, arguments[0])",
                    btn_y - inner_height + BTN_MARGIN,
                )
            else:
                break

        try:
            WebDriverWait(driver, 10).until(EC.element_to_be_clickable(btn))
        except TimeoutException:
            print("无法定位到[查看更多按钮]")
            continue
        try:
            btn.click()
        except ElementClickInterceptedException:
            print("点击[查看更多按钮]失败")
            continue

        # Give the next batch of cards time to load.
        time.sleep(3)
        clicked = True
        break

    if clicked:
        continous_fail = 0
    else:
        # A button that can never be clicked counts as an unproductive pass,
        # so the outer loop still terminates.
        continous_fail += 1
        if continous_fail > MAX_FAIL:
            print("多次点击[查看更多按钮]失败，程序停止")
            break

In [9]:
# Dump every collected hotel to CSV: one header row with the dataclass field
# names, then one row per hotel (the tags list is written as its str() form).
CSV_FILE = Path("./hotel.csv")
# newline="" is required by the csv module; without it, platforms that
# translate "\n" (e.g. Windows) end up with a blank line after every row.
with CSV_FILE.open("w", encoding="utf-8", newline="") as f:
    writer = csv.writer(f)
    writer.writerow([field.name for field in dataclasses.fields(Hotel)])
    writer.writerows(dataclasses.astuple(hotel) for hotel in data_list)