# 实验六：Python网络爬虫实战

1.	认真阅读三篇文献资料，了解互联网开源数据和证券市场的相关研究，重点关注其中的研究思路和分析方法。

2.	利用所给Data_600618中的数据文件，结合正则表达式提取出股票600618的日度数据（日期，开盘，最高，最低，收盘，成交量，成交金额），并输出至csv文件，在实验报告中给出数据截图。

3.	在东方股吧下载“长久物流吧”（`https://guba.eastmoney.com/list,603569.html`）所有发帖数据（建议以班级为单位组队分工爬取，每人爬十几页），提取出帖子作者，发帖时间，阅读量，评论数，帖子标题，帖子链接，并将结果输出至文本文件“data_guba_cjwl.txt”。

4.	利用新浪数据接口读取长久物流的历史日度数据，接口数据链接： 
`http://money.finance.sina.com.cn/quotes_service/api/json_v2.php/CN_MarketData.getKLineData?symbol=sh603569&scale=240&ma=no&datalen=10000`
从返回的数据中提取交易日、开盘、最高、最低、收盘、成交量数据，并将结果输出至文本文件“data_sina_cjwl.txt”。

5.	构建股吧信息量指标（根据发帖时间、阅读量、评论数、帖子标题自行设计指标），将该指标作为预测因子，检验其对长久物流的超额收益率是否具有可预测性。

## 正则表达式提取 600618 的日度数据

In [1]:
import os
import re
import csv
import json

In [2]:
def extract_data_2009_2013(html):
    """Parse daily quotes from the ``table_bg001 border_box limit_sale`` table.

    Args:
        html: Page source as a string.

    Returns:
        A list of dicts, one per parsed row, with the keys ``date``, ``open``,
        ``high``, ``low``, ``close``, ``change``, ``change_percent``,
        ``volume``, ``amount``, ``amplitude`` and ``turnover_rate``.
        An empty list is returned when the table is not present. Rows with
        fewer than 11 cells, or with a cell that is not numeric, are skipped.
    """
    table = re.search(r'<table class="table_bg001 border_box limit_sale">[\s\S]*?</table>', html)
    if not table:
        return []

    data = []
    rows = re.findall(r'<tr class=[\'"]?(?:dbrow|)[\'"]?><td>[\s\S]*?</tr>', table.group(0))
    for row in rows:
        cells = re.findall(r'<td[^>]*>([\s\S]*?)</td>', row)
        if len(cells) < 11:
            continue
        try:
            record = {
                'date': cells[0],
                'open': float(cells[1]),
                'high': float(cells[2]),
                'low': float(cells[3]),
                'close': float(cells[4]),
                'change': float(cells[5]),
                'change_percent': float(cells[6]),
                # Thousands separators are stripped before conversion.
                # NOTE(review): volume is stored as-is while amount is scaled
                # by 10000 - presumably the source columns use different
                # units; confirm against the other extractors.
                'volume': int(cells[7].replace(',', '')),
                'amount': float(cells[8].replace(',', '')) * 10000,
                'amplitude': float(cells[9]),
                'turnover_rate': float(cells[10])
            }
        except ValueError:
            # Best effort: a row holding a placeholder or other non-numeric
            # text is dropped instead of aborting the whole file.
            continue
        data.append(record)
    return data

def extract_data_before_2014(html):
    """Extract daily quotes from one of the pre-2014 HTML layouts.

    Pages containing the ``table_bg001 border_box limit_sale`` table are
    delegated to ``extract_data_2009_2013``. Otherwise one of three row
    regexes is chosen:

    * cells prefixed with the literal marker ``[考试顺利]`` and the page
      contains ``href=`` or ``target='_blank'`` (date may sit inside ``<a>``);
    * cells prefixed with the marker, no links;
    * no marker at all (plain ``<div align="center">`` cells).

    Args:
        html: Page source as a string.

    Returns:
        A list of dicts with keys ``date``, ``open``, ``high``, ``close``,
        ``low``, ``volume`` and ``amount``; the captured columns are mapped in
        that order.
    """
    if 'table_bg001 border_box limit_sale' in html:
        return extract_data_2009_2013(html)

    if "[考试顺利]" not in html:
        # Oldest layout: unmarked, centre-aligned cells.
        row_regex = (
            r'<tr\s+(?:class="tr_2"|)>\s*<td><div align="center">\s*([\d-]+)\s*</div></td>\s*'
            r'<td><div align="center">([\d.]+)</div></td>\s*'
            r'<td><div align="center">([\d.]+)</div></td>\s*'
            r'<td><div align="center">([\d.]+)</div></td>\s*'
            r'<td[^>]*><div align="center">([\d.]+)</div></td>\s*'
            r'<td[^>]*><div align="center">(\d+)</div></td>\s*'
            r'<td[^>]*><div align="center">(\d+)</div></td>'
        )
    elif "href=" in html or "target='_blank'" in html:
        # Marked cells; the date may be wrapped in an anchor.
        row_regex = (
            r'<tr[^>]*>\s*<td><div[^>]*>\[考试顺利\]\s*(?:<a[^>]*>)?\s*([\d-]+)\s*(?:</a>)?[^<]*</div></td>\s*'
            r'<td><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td[^>]*><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td[^>]*><div[^>]*>\[考试顺利\](\d+)</div></td>\s*'
            r'<td[^>]*><div[^>]*>\[考试顺利\](\d+)</div></td>'
        )
    else:
        # Marked cells with a bare date.
        row_regex = (
            r'<tr[^>]*>\s*<td><div[^>]*>\[考试顺利\]\s*([\d-]+)\s*</div></td>\s*'
            r'<td><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td[^>]*><div[^>]*>\[考试顺利\]([\d.]+)</div></td>\s*'
            r'<td[^>]*><div[^>]*>\[考试顺利\](\d+)</div></td>\s*'
            r'<td[^>]*><div[^>]*>\[考试顺利\](\d+)</div></td>'
        )

    records = []
    for day, opening, highest, closing, lowest, vol, amt in re.findall(row_regex, html, re.DOTALL):
        records.append({
            'date': day,
            'open': float(opening),
            'high': float(highest),
            'close': float(closing),
            'low': float(lowest),
            'volume': int(vol),
            'amount': int(amt)
        })
    return records

def extract_data_after_2014(js):
    """Extract daily quotes from a ``historySearchHandler(...)`` JSONP payload.

    The text between the parentheses is parsed as JSON. When it is a non-empty
    list, the ``hq`` entry of its first element supplies the rows; each row is
    read by position (0 date, 1 open, 2 close, 5 low, 6 high, 7 volume,
    8 amount).

    Args:
        js: Raw response text.

    Returns:
        A list of dicts with keys ``date``, ``open``, ``high``, ``low``,
        ``close``, ``volume`` (row value times 100) and ``amount`` (row value
        times 10000). An empty list is returned when the wrapper is missing,
        the JSON is invalid, or any row is malformed.
    """
    match = re.search(r'historySearchHandler\((.*)\)', js)
    if not match:
        return []
    try:
        json_data = json.loads(match.group(1))
        hq = json_data[0].get('hq', []) if json_data and isinstance(json_data, list) else []
        records = []
        for row in hq:
            amount = row[8]
            if isinstance(amount, str):
                # String amounts may carry thousands separators.
                amount = amount.replace(',', '')
            records.append({
                'date': row[0],
                'open': float(row[1]),
                'high': float(row[6]),
                'low': float(row[5]),
                'close': float(row[2]),
                'volume': int(row[7]) * 100,
                'amount': float(amount) * 10000
            })
        return records
    except (ValueError, TypeError, KeyError, IndexError, AttributeError):
        # Best effort: an unparsable payload yields no rows. Only data-shape
        # errors are caught so that KeyboardInterrupt/SystemExit propagate.
        return []

In [3]:
def process_file(file_path):
    """Read one data file and extract its daily quotes.

    The parser is chosen from the year in the file name: a ``Year_YYYY``
    component of the base name with ``YYYY >= 2014`` selects
    ``extract_data_after_2014``; earlier years select
    ``extract_data_before_2014``. When the base name has no ``Year_YYYY``
    component, the path is checked for the substrings ``2014`` .. ``2019``.

    Args:
        file_path: Path of the file to read (decoded as UTF-8, undecodable
            bytes ignored).

    Returns:
        The list of record dicts produced by the selected extractor.
    """
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
        content = f.read()

    # Look only at the file name so that digits in a parent directory
    # (e.g. ".../2019/...") cannot misroute an older file.
    year_match = re.search(r'Year_(\d{4})', os.path.basename(file_path))
    if year_match:
        is_json_era = int(year_match.group(1)) >= 2014
    else:
        # Fallback for unexpected file names: substring check on the path.
        is_json_era = any(
            year in file_path
            for year in ['2014', '2015', '2016', '2017', '2018', '2019']
        )

    if is_json_era:
        return extract_data_after_2014(content)
    return extract_data_before_2014(content)

def process_pipeline():
    """Collect every yearly data file, sort the records by date and write a CSV.

    Files in ``./assets/data/Data_600618`` whose names start with
    ``DataHTML_600618_Year_`` and end with ``.txt`` are passed to
    ``process_file``. The merged records are sorted on their ``date`` string
    and written, with a header row, to ``./assets/data/股票600618日度数据.csv``.
    Progress is reported with ``print``.
    """
    source_dir = './assets/data/Data_600618'

    records = []
    for entry in os.listdir(source_dir):
        wanted = entry.startswith('DataHTML_600618_Year_') and entry.endswith('.txt')
        if not wanted:
            continue
        records += process_file(os.path.join(source_dir, entry))

    print(f"总共提取了 {len(records)} 条记录")

    # Chronological order (dates are compared as strings).
    records.sort(key=lambda rec: rec['date'])

    output_file = './assets/data/股票600618日度数据.csv'
    columns = ('date', 'open', 'high', 'low', 'close', 'volume', 'amount')
    with open(output_file, 'w', newline='', encoding='utf-8') as out:
        sheet = csv.writer(out)
        sheet.writerow(['日期', '开盘价', '最高价', '最低价', '收盘价', '成交量(股)', '成交金额(元)'])
        sheet.writerows([rec[col] for col in columns] for rec in records)

    print(f"数据已成功提取并保存到 {output_file}")

In [4]:
process_pipeline()

总共提取了 4775 条记录
数据已成功提取并保存到 ./assets/data/股票600618日度数据.csv


## 长久物流吧 数据爬取

这里很容易被封 IP（被封后只能爬取到方正证券的数据），建议使用代理解决，快代理可以免费试用一天。

由于后续还要处理爬到的数据，为了处理方便，这里改txt为csv。

只有帖子详情页才显示带年份的发帖时间，列表页只有月份和日期。下面这份代码为了获取带年份的时间，对每个帖子的详情页都进行了抓取，因此运行较慢，实测约需 10 多个小时。

In [5]:
import os
import time
import random
import requests
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from concurrent.futures import ThreadPoolExecutor

In [None]:
# Crawl configuration
STOCK_ID = "603569"  # stock code of the board being crawled
STOCK_NAME = "长久物流"  # stock name, used to verify that a fetched page is the right board
BASE_URL = f"https://guba.eastmoney.com/list,{STOCK_ID},f_{{page}}.html"
OUTPUT_DIR = "./assets/data"
OUTPUT_FILE = os.path.join(OUTPUT_DIR, "data_guba_cjwl.csv")
MAX_PAGES = 1000  # upper bound large enough to reach every list page
MAX_THREADS = 5  # maximum worker threads for detail-page fetching

# Proxy settings.
# NOTE(review): these credentials were committed in plain text. They can now
# be overridden through environment variables; the literals remain only as
# backward-compatible defaults and should be rotated and removed.
PROXY_USER = os.environ.get("GUBA_PROXY_USER", "t14428474946629")
PROXY_PASS = os.environ.get("GUBA_PROXY_PASS", "k0fxm4s0")
PROXY_HOST = os.environ.get("GUBA_PROXY_HOST", "x368.kdltpspro.com:15818")

# Request counter (an IP switch is forced every 3 requests)
request_counter = 0
# Lock guarding CSV writes from worker threads; created in main()
file_lock = None

def get_proxies():
    """Return the ``proxies`` mapping for ``requests``.

    Both the ``http`` and ``https`` schemes are routed through the same
    authenticated HTTP proxy URL built from ``PROXY_USER``, ``PROXY_PASS``
    and ``PROXY_HOST``.
    """
    proxy_url = f"http://{PROXY_USER}:{PROXY_PASS}@{PROXY_HOST}"
    return {scheme: proxy_url for scheme in ("http", "https")}

def get_headers():
    """Build request headers with a randomised User-Agent.

    A User-Agent is taken from ``UserAgent().random``; if that fails for any
    reason, one of four built-in browser strings is picked at random.

    Returns:
        A dict with the ``User-Agent``, ``Accept``, ``Accept-Language``,
        ``Connection`` and ``Cache-Control`` headers.
    """
    try:
        user_agent = UserAgent().random
    except Exception:
        # Best-effort fallback. ``Exception`` (not a bare ``except``) so that
        # Ctrl-C during a long crawl is not swallowed here.
        fallback_agents = (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/15.0 Safari/605.1.15',
            'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36',
        )
        user_agent = random.choice(fallback_agents)

    return {
        'User-Agent': user_agent,
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
        # Ask the server not to keep the connection alive; the original
        # author relies on this so that a new request can get a new IP.
        'Connection': 'close',
        'Cache-Control': 'no-cache'
    }

def force_new_ip():
    """Reset the request counter and probe the proxy test endpoint.

    A GET request is sent through the proxy to the test URL; the original
    author uses this request to make the proxy hand out a different IP.

    Returns:
        ``True`` when the probe answers with HTTP 200, ``False`` for any other
        status or when the probe raises.
    """
    global request_counter
    request_counter = 0  # start a fresh counting window

    switched = False
    try:
        probe_url = "https://dev.kdlapi.com/testproxy"
        proxy_cfg = get_proxies()
        header_cfg = get_headers()

        print("正在切换IP...")
        reply = requests.get(probe_url, proxies=proxy_cfg, headers=header_cfg, timeout=10)
        if reply.status_code == 200:
            print("IP已切换")
            switched = True
    except Exception as e:
        print(f"切换IP时出错: {e}")

    return switched

def get_page(url, retry_limit=3):
    """Fetch a page through the proxy, retrying on failure.

    Every call increments the global request counter; once it reaches 3 an IP
    switch is forced first. Each attempt sleeps 3-7 seconds, then issues the
    request in a fresh session that is closed as soon as the body is read.
    A response that lacks ``STOCK_NAME`` but mentions "方正证券" or "验证码" is
    treated as a redirect/captcha page and retried after an IP switch.

    Args:
        url: Address to fetch.
        retry_limit: Maximum number of attempts.

    Returns:
        The response body decoded as UTF-8, or ``None`` when every attempt
        failed.
    """
    global request_counter

    # Force an IP switch every 3 requests
    request_counter += 1
    if request_counter >= 3:
        force_new_ip()

    proxies = get_proxies()
    headers = get_headers()

    for _ in range(retry_limit):
        try:
            # Random delay between requests
            time.sleep(random.uniform(3, 7))

            # One session per attempt so no connection is reused; the context
            # manager closes it, whereas it used to be left open.
            with requests.Session() as session:
                # NOTE(review): kept from the original; `keep_alive` does not
                # look like a documented requests.Session attribute - confirm
                # whether it has any effect.
                session.keep_alive = False
                response = session.get(url, proxies=proxies, headers=headers, timeout=15)
                response.encoding = 'utf-8'
                text = response.text
        except Exception as e:
            print(f"获取页面失败: {e}")
            force_new_ip()  # switch IP before the next attempt
            continue

        # Detect a redirect to another stock's board or a captcha page
        if STOCK_NAME not in text and ("方正证券" in text or "验证码" in text):
            print(f"检测到被重定向或需要验证码，尝试切换IP...")
            force_new_ip()
            continue

        return text

    return None

def parse_page(html, page_num):
    """Parse one board list page and return the posts found on it.

    Args:
        html: Source of a list page; a falsy value yields an empty list.
        page_num: Page number, used only in the progress message.

    Returns:
        A list of dicts with the keys ``title``, ``author``, ``post_time``,
        ``read_count``, ``comment_count`` and ``url``. Counts and time are the
        stripped strings found on the page, not converted. An empty list is
        returned when the page ``<title>`` does not contain ``STOCK_NAME``.
    """
    if not html:
        return []

    soup = BeautifulSoup(html, 'html.parser')
    items = []
    
    # Make sure this really is the target stock's page
    title = soup.title.text if soup.title else ""
    if STOCK_NAME not in title:
        print(f"警告: 页面标题不含目标股票名: {title}")
        return []
    
    # Locate the post entries: exact class first, then any class containing
    # "articleh"
    post_items = soup.select('div.articleh, div.articleh.odd')
    if not post_items:
        post_items = soup.select('div[class*="articleh"]')
    
    # Last resort: table rows that contain a link, skipping header rows
    if not post_items:
        post_items = []
        rows = soup.select('table tr')
        for row in rows:
            if row.select('th') or (row.get('class') and 'listhead' in row.get('class')):
                continue
            if row.select('a'):
                post_items.append(row)
    
    print(f"在第 {page_num} 页找到 {len(post_items)} 个帖子")
    
    # Parse each post entry
    for item in post_items:
        try:
            # The fields are read differently depending on the element type
            if item.name == 'div':  # div layout: span.l1 .. span.l5
                read_count = item.select_one('span.l1').text.strip() if item.select_one('span.l1') else "0"
                comment_count = item.select_one('span.l2').text.strip() if item.select_one('span.l2') else "0"
                
                title_elem = item.select_one('span.l3 > a')
                if not title_elem:
                    continue
                title = title_elem.text.strip()
                link = title_elem.get('href', '')
                
                author = item.select_one('span.l4 > a').text.strip() if item.select_one('span.l4 > a') else "未知作者"
                
                # The list page may show an incomplete time; keep it as a
                # provisional value that the detail page replaces later
                initial_time = item.select_one('span.l5').text.strip() if item.select_one('span.l5') else ""
                
            elif item.name == 'tr':  # table layout: cells read by position
                cells = item.select('td')
                if len(cells) < 4:
                    continue
                
                read_count = cells[0].text.strip()
                comment_count = cells[1].text.strip()
                
                title_link = cells[2].select_one('a')
                if not title_link:
                    continue
                title = title_link.text.strip()
                link = title_link.get('href', '')
                
                author = cells[3].text.strip() if len(cells) > 3 else "未知作者"
                initial_time = cells[4].text.strip() if len(cells) > 4 else ""
            else:
                continue
            
            # Drop post links that carry a different stock code
            if '/news,' in link and STOCK_ID not in link:
                print(f"警告: 发现其他股票代码的帖子链接: {link}")
                continue
                
            # Turn relative links into absolute ones
            if link.startswith('/'):
                link = f"https://guba.eastmoney.com{link}"
            elif not link.startswith('http'):
                link = f"https://guba.eastmoney.com/{link}"
            
            # Append to the result list
            items.append({
                'title': title,
                'author': author,
                'post_time': initial_time,  # provisional; the full time comes from the detail page
                'read_count': read_count,
                'comment_count': comment_count,
                'url': link
            })
            
        except Exception as e:
            # A single malformed entry is reported and skipped
            print(f"解析帖子时出错: {e}")
    
    return items

def get_detail_time(post_url):
    """Look up the full posting timestamp on a post's detail page.

    A list of CSS selectors is tried first, accepting only a
    ``YYYY-M-D H:M:S`` timestamp. If none matches, every ``div`` and ``span``
    is scanned for a timestamp whose seconds part is optional.

    Args:
        post_url: Address of the post's detail page.

    Returns:
        The matched timestamp text, or ``None`` when the page could not be
        fetched or holds no timestamp.
    """
    page_source = get_page(post_url)
    if not page_source:
        return None

    document = BeautifulSoup(page_source, 'html.parser')

    # Pass 1: known time containers, full timestamp required
    # (for example: 2016-07-29 20:56:39)
    strict_stamp = r'(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{1,2}:\d{1,2})'
    candidate_selectors = [
        'div.time',
        'div.author-info div.time',
        'div.author-info time',
        'div.zwfbtime',
        'div.zwfbtime span',
        'div[class*="time"]'
    ]
    for css in candidate_selectors:
        node = document.select_one(css)
        if not node:
            continue
        found = re.search(strict_stamp, node.text.strip())
        if found:
            return found.group(1)

    # Pass 2: any div/span text that looks like a timestamp
    loose_stamp = r'(\d{4}-\d{1,2}-\d{1,2}\s+\d{1,2}:\d{1,2}(?::\d{1,2})?)'
    for node in document.find_all(['div', 'span']):
        found = re.search(loose_stamp, node.text.strip())
        if found:
            return found.group(1)

    return None

def process_post_detail(post):
    """Fill in a post's exact time and append the post to the CSV file.

    When the detail page yields a timestamp it replaces ``post['post_time']``;
    otherwise the provisional value is kept. The row is appended to
    ``OUTPUT_FILE`` while holding ``file_lock``.

    Args:
        post: Dict with the keys ``title``, ``author``, ``post_time``,
            ``read_count``, ``comment_count`` and ``url``; updated in place.

    Returns:
        The same ``post`` dict.
    """
    detail_url = post['url']
    print(f"正在获取详情: {post['title'][:10]}...")

    # Prefer the precise time from the detail page
    precise_time = get_detail_time(detail_url)
    if precise_time:
        post['post_time'] = precise_time

    # Serialise file access across worker threads
    with file_lock, open(OUTPUT_FILE, 'a', newline='', encoding='utf-8') as out:
        column_order = ('title', 'author', 'post_time', 'read_count', 'comment_count', 'url')
        csv.writer(out).writerow([post[column] for column in column_order])

    return post

def _drain_detail_futures(future_to_title):
    """Wait for every pending detail task and report the ones that failed.

    Args:
        future_to_title: Mapping of future -> post title; the title is used
            in the failure message.
    """
    for future, post_title in future_to_title.items():
        try:
            future.result()
        except Exception as e:
            print(f"处理帖子失败: {post_title[:15]}..., 错误: {e}")


def main():
    """Crawl the board list pages and write every post to ``OUTPUT_FILE``.

    The output file is recreated with a header row. List pages are then
    fetched one by one and each post found is handed to a thread pool, which
    fetches the detail page and appends the CSV row. Crawling stops after
    ``MAX_PAGES`` pages or after 3 consecutive pages that could not be fetched
    or yielded no posts. Any exception is printed with a traceback rather
    than propagated.
    """
    global file_lock
    from threading import Lock
    file_lock = Lock()

    try:
        # Make sure the output directory exists
        os.makedirs(OUTPUT_DIR, exist_ok=True)

        # Create the CSV file and write the header row
        with open(OUTPUT_FILE, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            writer.writerow(["帖子标题", "作者", "发帖时间", "阅读量", "评论数", "帖子链接"])

        print(f"开始爬取长久物流吧 (股票代码: {STOCK_ID})...")
        print(f"数据将保存到: {os.path.abspath(OUTPUT_FILE)}")

        total_items = 0
        page = 1
        empty_count = 0
        max_empty = 3

        # Force an IP switch before the first request
        force_new_ip()

        # The context manager shuts the pool down even when the crawl raises,
        # so worker threads are never left behind.
        with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
            future_to_post = {}

            while page <= MAX_PAGES and empty_count < max_empty:
                url = BASE_URL.format(page=page)
                print(f"正在爬取第 {page} 页...")

                html = get_page(url)

                if html is None:
                    print(f"无法获取第 {page} 页内容，跳过...")
                    empty_count += 1
                    if empty_count >= max_empty:
                        print("连续多页获取失败，停止爬取")
                        break
                    page += 1
                    continue

                items = parse_page(html, page)

                if items:
                    total_items += len(items)
                    print(f"第 {page} 页解析完成，找到 {len(items)} 条帖子，正在获取详情...")

                    # Queue one detail-page task per post
                    for post in items:
                        future = executor.submit(process_post_detail, post)
                        future_to_post[future] = post['title']

                    # Every 5 pages, wait for the queue to empty so the
                    # backlog of pending tasks stays bounded
                    if page % 5 == 0:
                        _drain_detail_futures(future_to_post)
                        future_to_post = {}

                    empty_count = 0  # data was found: reset the failure streak
                else:
                    print(f"第 {page} 页未解析到任何帖子")
                    empty_count += 1
                    if empty_count >= max_empty:
                        print("连续多页未能解析到帖子，停止爬取")
                        break

                page += 1

            # Wait for whatever is still queued
            print("等待所有详情页处理完成...")
            _drain_detail_futures(future_to_post)

        # Completion summary
        print("\n" + "="*50)
        print(f"爬虫完成! 共爬取 {total_items} 条帖子")
        print(f"数据已保存至: {os.path.abspath(OUTPUT_FILE)}")
        print("="*50)

    except Exception as e:
        print(f"\n错误: 爬虫运行时出错: {e}")
        import traceback
        traceback.print_exc()

# Start the crawl when this cell runs. It issues live network requests and
# recreates OUTPUT_FILE from scratch before appending rows.
main()

## 新浪数据接口爬取

In [6]:
def fetch_stock_data(stock_code, timeout=15):
    """Download historical K-line data for a stock from the Sina endpoint.

    Args:
        stock_code: Exchange-prefixed stock code, e.g. ``'sh603569'``.
        timeout: Seconds to wait for the server before giving up, so that a
            stalled connection cannot hang the caller indefinitely.

    Returns:
        The decoded list of records, or an empty list when the request fails,
        the body is not valid JSON, or the decoded value is not a list.
    """
    url = f"http://money.finance.sina.com.cn/quotes_service/api/json_v2.php/CN_MarketData.getKLineData?symbol={stock_code}&scale=240&ma=no&datalen=10000"

    try:
        response = requests.get(url, timeout=timeout)
        # Treat HTTP error statuses as failures
        response.raise_for_status()

        # The original author notes that the endpoint can return non-standard
        # JSON; single quotes are swapped for double quotes before parsing.
        data_str = response.text.replace("'", '"')
        stock_data = json.loads(data_str)
    except requests.exceptions.RequestException as e:
        print(f"获取数据时出错: {e}")
        return []
    except json.JSONDecodeError as e:
        print(f"解析JSON数据时出错: {e}")
        print(f"返回的数据: {response.text[:200]}...")
        return []

    # A valid JSON body that is not a list (e.g. ``null``) would otherwise
    # fail on len() below and break callers that iterate the result.
    if not isinstance(stock_data, list):
        print(f"接口返回的数据不是列表: {stock_data!r}")
        return []

    print(f"成功获取 {stock_code} 的历史数据，共 {len(stock_data)} 条记录")
    return stock_data

In [7]:
# Exchange-prefixed stock code to download
stock_code = "sh603569"
# Destination of the exported data
output_file = "./assets/data/data_sina_cjwl.txt"

# Download the full history
stock_data = fetch_stock_data(stock_code)

if stock_data:
    # Keep only the required fields, in the required order
    formatted_data = [
        {field: item[field] for field in ("day", "open", "high", "low", "close", "volume")}
        for item in stock_data
    ]

    os.makedirs(os.path.dirname(output_file), exist_ok=True)

    # The records are stored as indented JSON inside the .txt file
    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(formatted_data, f, ensure_ascii=False, indent=2)
    print(f"数据已成功保存至 {output_file}")

成功获取 sh603569 的历史数据，共 2104 条记录
数据已成功保存至 ./assets/data/data_sina_cjwl.txt


## 股吧信息量指标预测收益率

首先我们要找到长久物流的收益率，市场收益率选用上证指数。