# 爬虫

In [None]:
import json
import re
import typing
from dataclasses import dataclass
from datetime import datetime, date
from concurrent.futures import ThreadPoolExecutor

from DrissionPage import ChromiumOptions, ChromiumPage
from DrissionPage.common import Settings
from DrissionPage.errors import ElementNotFoundError, WaitTimeoutError
from rich import print


In [None]:
import pandas as pd

# Source workbook and the destination for the filtered control-group rows.
input_file = "data_china.xlsx"
output_file = "group_control_prim.xlsx"

# Read the first sheet; stock codes are read as strings so that codes with
# leading zeros are not turned into numbers.
df = pd.read_excel(input_file, sheet_name=0, dtype={"股票代码": str})

# Keep only the rows whose authenticity flag equals 0, restricted to the
# columns needed downstream. Selecting rows and columns in a single .loc call
# and taking an explicit copy makes filtered_df an independent frame, so the
# column assignment below no longer writes into a derived slice of df (which
# is what triggers pandas' SettingWithCopyWarning).
columns_to_keep = ["股票代码", "公司简称", "新闻发布时间"]
flag_is_zero = df["真实性（虚假1真实0）"] == 0
filtered_df = df.loc[flag_is_zero, columns_to_keep].copy()

# Reduce the publish timestamp to a plain calendar date.
filtered_df["新闻发布时间"] = pd.to_datetime(filtered_df["新闻发布时间"]).dt.date

# Save the filtered rows to the new workbook.
filtered_df.to_excel(output_file, index=False)

print(f"已成功保存筛选后的数据到 {output_file}")


未爬取的代码

In [None]:
import os
import pandas as pd

# Folder that holds the files produced by earlier crawls.
# Written as a raw string: the previous literal mixed escaped and unescaped
# backslashes and only worked because sequences such as "\m" and "\w" are not
# recognised escapes (Python reports them as invalid escape sequences).
# The resulting path is unchanged.
folder_path = r"D:\mycodelife\workshop\fake_finance\ready_crawler"

# Names of all entries in that folder.
files = os.listdir(folder_path)

# Treat each file name, minus its extension, as an already-crawled stock code.
crawled_codes = {os.path.splitext(file)[0] for file in files}

# Stock codes are read as strings so leading zeros are preserved.
df = pd.read_excel("group_control_prim.xlsx", dtype={"股票代码": str})
# Full column of stock codes (kept for interactive inspection; not used below).
all_codes = df["股票代码"]

# Rows whose stock code has no matching file in the folder.
uncrawled_df = df[~df["股票代码"].isin(crawled_codes)]

# Save the not-yet-crawled rows to a new workbook.
output_file = "uncrawled_codes.xlsx"
uncrawled_df.to_excel(output_file, index=False)


In [6]:
import os
import pandas as pd

# Folder that holds the CSV files of finished crawls.
# The previous literal was a raw string that *also* doubled its backslashes,
# so the path contained literal "\\" separators; a raw string needs single
# backslashes only.
folder_path = r"D:\mycodelife\workshop\fake_finance\codes_new\already_done"

# Names of all entries in that folder.
files = os.listdir(folder_path)

# File names are expected to look like "<stock code>_<publish date>.csv";
# the name without its extension is the composite key of a finished crawl.
crawled_codes = {os.path.splitext(file)[0] for file in files}

# Read the second sheet of the source workbook, keeping stock codes as strings.
# Raw string for the same reason as above: the previous literal relied on
# unrecognised escapes ("\m", "\w", "\d"). The resulting path is unchanged.
df = pd.read_excel(
    r"D:\mycodelife\workshop\fake_finance\faker_news\data_china.xlsx",
    sheet_name=1,
    dtype={"股票代码": str},
)
# Normalise to plain dates so the string form matches the date in file names.
df["新闻发布时间"] = pd.to_datetime(df["新闻发布时间"]).dt.date
# Composite key: "<stock code>_<publish date>".
df["主码"] = df["股票代码"] + "_" + df["新闻发布时间"].astype(str)

# Rows whose composite key has no matching file in the folder.
uncrawled_df = df[~df["主码"].isin(crawled_codes)]

# Save the stock code and publish date of the not-yet-crawled rows.
output_file = "uncrawled_codes_test.xlsx"
uncrawled_df[["股票代码", "新闻发布时间"]].to_excel(output_file, index=False)

print(f"未爬取的股票代码及新闻发布时间已保存到: {output_file}")


未爬取的股票代码及新闻发布时间已保存到: uncrawled_codes_test.xlsx


In [None]:
# Reload the saved workbook of uncrawled rows, reading stock codes as strings.
df = pd.read_excel("uncrawled_codes.xlsx",dtype={'股票代码':str})
# Bare expression: as the last line of a notebook cell this displays the frame.
df

In [7]:
import json
import re
import typing
from dataclasses import dataclass
from datetime import datetime, date
from concurrent.futures import ThreadPoolExecutor

from DrissionPage import ChromiumOptions, ChromiumPage
from DrissionPage.common import Settings
from DrissionPage.errors import ElementNotFoundError, WaitTimeoutError

# Read by get_chrome_options(): when False, the browser is configured as
# headless and given a fixed user agent.
DEBUG = False
# strptime format used to parse each article's "post_publish_time" value.
DT_FMT = "%Y-%m-%d %H:%M:%S"

# Presumably makes DrissionPage wait.* calls raise (WaitTimeoutError is caught
# by the main script) instead of returning a failure value — confirm against
# the DrissionPage Settings documentation.
Settings.raise_when_wait_failed = True


@dataclass
class Article:
    """One forum post as collected by Fetcher.

    Fetcher.get_article_list() fills title, url and published from the list
    page and leaves content as an empty string; Fetcher.get_article_detail()
    later overwrites content with the text of the post page.
    """

    # Taken from the "post_title" field of the list-page JSON.
    title: str
    # Post page URL built by Fetcher.get_article_url().
    url: str
    # Parsed from the "post_publish_time" field using DT_FMT.
    published: datetime
    # Post body text; "" until the detail page has been fetched.
    content: str


class PageNotFound(Exception):
    """Signals that a requested forum page is unavailable.

    Raised by Fetcher.get_total_page() when the browser ends up on the
    site's error URL.
    """


def get_chrome_options() -> ChromiumOptions:
    """Build the browser options used for crawling.

    "--no-sandbox" is always set. Unless DEBUG is on, the browser is also
    switched to headless mode and given a fixed desktop user agent.
    """
    options = ChromiumOptions()
    options.set_argument("--no-sandbox")

    # In debug mode keep a visible browser with its default user agent.
    if DEBUG:
        return options

    options.headless()
    user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
    options.set_user_agent(user_agent)
    return options


class Fetcher:
    """Collect the posts of one stock's guba.eastmoney.com board near a date.

    The board's list pages are binary-searched by the publish dates of their
    posts to locate the page for the target date; the posts of that page and
    of the page before it are then fetched in full.
    """

    def __init__(self, code: str, date: date, page: ChromiumPage):
        """Store the stock code, the target date and the shared browser page."""
        self.code = code
        self.start_url = f"https://guba.eastmoney.com/list,{code}"
        self.date = date
        self.page = page
        # -1 means "not fetched yet"; set by get_total_page().
        self.total_page = -1

    def get_url(self, pn: int) -> str:
        """Return the URL of list page number ``pn`` of this stock's board."""
        return f"{self.start_url}_{pn}.html"

    def get_article_url(self, post_id: int) -> str:
        """Return the URL of the post with the given id."""
        return f"https://guba.eastmoney.com/news,{self.code},{post_id}.html"

    def get_article_list(self, pn: int) -> typing.Generator[Article, None, None]:
        """Yield the posts embedded in list page ``pn`` (content left empty).

        The page is loaded lazily, on first iteration. The posts are read from
        the ``var article_list = {...};`` JavaScript assignment in the HTML.

        Raises:
            PageNotFound: the page contains no ``article_list`` assignment.
        """
        url = self.get_url(pn)
        self.page.get(url)
        match = re.search(r"var article_list\s*=\s*({.*?});", self.page.html)
        if match is None:
            # Previously this surfaced as an AttributeError on None.group().
            raise PageNotFound(f"No article list found on {url}")

        json_data = json.loads(match.group(1))
        # A missing or null "re" entry is treated as "no posts on this page".
        for article in json_data.get("re") or []:
            yield Article(
                title=article["post_title"],
                url=self.get_article_url(article["post_id"]),
                published=datetime.strptime(article["post_publish_time"], DT_FMT),
                content="",
            )

    def get_total_page(self):
        """Load list page 1 once and cache the page count in ``total_page``.

        The count is read from the second-to-last ``li`` of the paging ``ul``.

        Raises:
            PageNotFound: the browser ended up on the site's error URL.
        """
        if self.total_page == -1:
            print(self.get_url(1))
            self.page.get(self.get_url(1), retry=3)
            if self.page.url == "https://guba.eastmoney.com/error?type=1":
                raise PageNotFound("Page not found")
            self.page.wait.ele_displayed("t:ul@class:paging")
            pagers = self.page.ele("t:ul@class:paging").eles("t:li")
            # Presumably the last <li> is a "next" control, so the one before
            # it carries the highest page number — TODO confirm against the DOM.
            last_page = pagers[-2].text
            self.total_page = int(last_page)
            print(f"Total page: {self.total_page}")

    def get_article_detail(self, article: Article) -> Article:
        """Open the post in a new tab and fill in ``article.content``.

        Best effort: if the post cannot be loaded, the article is returned
        with its content unchanged. The tab is always closed.
        """
        new_tab = self.page.new_tab()
        try:
            new_tab.get(article.url)
            new_tab.wait.ele_displayed("@class:newstext")
            article.content = new_tab.ele("@class:newstext").text
        except Exception:
            # Deliberately ignored so one broken post does not abort the crawl.
            # Unlike a bare ``except:``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            pass
        finally:
            new_tab.close()
        return article

    @staticmethod
    def get_date_range(articles: typing.Iterable[Article]) -> tuple[date, date]:
        """Return the publish dates of the first and the last given article.

        Raises:
            PageNotFound: ``articles`` is empty, so there is no date to report.
        """
        articles = list(articles)
        if not articles:
            # Previously an IndexError that the main script does not catch.
            raise PageNotFound("No articles on page")

        return articles[0].published.date(), articles[-1].published.date()

    def get_articles_with_date(self) -> typing.Generator[Article, None, None]:
        """Yield fully fetched posts from the pages around the target date.

        Raises:
            PageNotFound: the board, or a page probed by the search, is
                unavailable or empty.
        """
        self.get_total_page()

        # Binary search over page numbers. The comparisons assume posts are
        # listed newest first (page 1 newest, first post of a page newest) —
        # TODO confirm against the site's sort order.
        start, end = 1, self.total_page

        while start <= end:
            mid = (start + end) // 2
            first_date, last_date = self.get_date_range(self.get_article_list(mid))

            if self.date > first_date:
                # Target is newer than everything on this page: look earlier.
                end = mid - 1
            elif self.date < last_date:
                # Target is older than everything on this page: look later.
                start = mid + 1
            else:
                # Target falls on this page; keep narrowing towards the
                # lowest-numbered page that still contains it.
                end = mid - 1

        print(f"目标页面: {start}")

        # NOTE(review): the original comment described this step as fetching
        # the articles within 3 days of the target, but no date filtering
        # happens here. The loop above always exits with end == start - 1, so
        # the window is pages start-1..start, clamped to [1, total_page].
        start_page = max(start - 1, 1)
        end_page = min(end + 1, self.total_page)

        # Not optimised: too many workers open too many tabs and exhaust memory.
        with ThreadPoolExecutor(max_workers=3) as executor:
            for page in range(start_page, end_page + 1):
                yield from executor.map(
                    self.get_article_detail, self.get_article_list(page)
                )


if __name__ == "__main__":
    # Crawl every (stock code, publish date) row of the "uncrawled" workbook
    # into its own CSV file named "<code>_<date>.csv".
    import csv
    from openpyxl import load_workbook

    wb = load_workbook("uncrawled_codes_test.xlsx")
    ws = wb.active
    # One browser instance is shared by all rows.
    chrome_page = ChromiumPage(get_chrome_options())
    try:
        # min_row=2 skips the header row.
        for row in ws.iter_rows(min_row=2, values_only=True):
            code, dt = row[:2]
            if code is None or dt is None:
                # Blank (e.g. trailing) row: nothing to crawl.
                continue
            # Date cells may come back as datetime or as a plain date; only
            # datetime needs reducing to its calendar date.
            if isinstance(dt, datetime):
                dt = dt.date()

            output = f"{code}_{dt}.csv"
            print(f"Fetching {code} on {dt}...")
            fetcher = Fetcher(code, dt, chrome_page)

            with open(output, "w", newline="", encoding="utf-8") as f:
                writer = csv.writer(f)
                writer.writerow(["标题", "链接", "发布时间", "内容"])
                try:
                    for article in fetcher.get_articles_with_date():
                        writer.writerow(
                            [
                                article.title,
                                article.url,
                                article.published,
                                article.content,
                            ]
                        )
                # A failure for one stock is reported and the loop moves on;
                # rows written before the failure stay in the CSV.
                except PageNotFound:
                    print(f"Page not found: {code}")
                except ElementNotFoundError:
                    print(f"Element not found: {code}")
                except WaitTimeoutError:
                    print(f"Wait timeout: {code}")
    finally:
        # Shut the browser down once, after all rows, even if a row raised;
        # previously the browser process was never closed.
        chrome_page.quit()
        wb.close()


Fetching 300587 on 2021-04-27...
https://guba.eastmoney.com/list,300587_1.html
Total page: 331
目标页面: 176
Fetching 002331 on 2021-04-28...
https://guba.eastmoney.com/list,002331_1.html
Total page: 780
目标页面: 262
Fetching 688101 on 2021-04-30...
https://guba.eastmoney.com/list,688101_1.html
Total page: 183
目标页面: 96
Fetching 600418 on 2021-07-27...
https://guba.eastmoney.com/list,600418_1.html
Total page: 4876
目标页面: 2289
Fetching 601012 on 2022-03-01...
https://guba.eastmoney.com/list,601012_1.html
Total page: 6586
目标页面: 3498
Fetching 601012 on 2022-04-21...
https://guba.eastmoney.com/list,601012_1.html
Total page: 6586
目标页面: 3498
Fetching 603023 on 2022-06-24...
https://guba.eastmoney.com/list,603023_1.html
Total page: 682
目标页面: 250
Fetching 000980 on 2022-11-04...
https://guba.eastmoney.com/list,000980_1.html
Total page: 11466
目标页面: 3796
