In [1]:
# 数据获取类
import pandas as pd
from pytdx.hq import TdxHq_API
import time as time_module
from datetime import datetime, time, timedelta
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class TDXDataProvider:
    """
    通达信数据提供器类
    封装所有与通达信数据获取相关的功能
    """
    
    def __init__(self, servers=None):
        """
        初始化数据提供器
        servers: 服务器列表，格式为 [(host, port), ...]
        """
        # 默认服务器列表
        ''' IP和端口：
            ‌上海电信主站‌：180.153.18.170:7709、180.153.18.171:7709。
‌            北京联通主站‌：202.108.253.130:7709、202.108.253.131:7709。
‌            杭州电信主站‌：60.191.117.167:7709、115.238.56.198:7709。
‌            深圳电信主站‌：14.17.75.71:7709。‌
        '''
        self.default_servers = [
            ('180.153.18.170', 7709),
            ('180.153.18.171', 7709),
            ('202.108.253.130', 7709),
            ('202.108.253.131', 7709),
            ('60.191.117.167', 7709),
            ('115.238.56.198', 7709),
            ('14.17.75.71', 7709),
        ]
        
        self.servers = servers or self.default_servers
        self.api = TdxHq_API()
        
        # K线周期映射
        '''
            api.get_security_bars(period, market, stock_code, start, count) 
            参数说明：
            period:  0->5分钟线 1->15分钟线 2->30分钟线 3->1小时 4->日线 5->周线 6->月线 7-> 1分钟线
            start: 从最新的K线开始向历史数据方向的偏移量。0 表示从最新的一根K线开始取。
            count：希望获取的K线根数。请注意，该函数单次请求有最大值限制，通常为 800 条
        '''
        self.period_map = {
            '1min': 7,      # 1分钟
            '5min': 0,      # 5分钟
            '15min': 1,     # 15分钟
            '30min': 2,     # 30分钟
            '1hour': 3,     # 1小时
            'daily': 4,     # 日线
            'weekly': 5,    # 周线
            'monthly': 6,   # 月线
        }
    
    def connect(self):
        """连接到通达信服务器"""
        for host, port in self.servers:
            try:
              #  logger.info(f"尝试连接服务器: {host}:{port}")
                if self.api.connect(host, port):
             #       logger.info("连接成功")
                    return True
            except Exception as e:
             #   logger.warning(f"连接 {host}:{port} 失败: {e}")
                continue
        
        logger.error("所有服务器连接失败")
        return False
    
    def disconnect(self):
        """断开连接"""
        try:
            self.api.disconnect()
      #      logger.info("已断开连接")
        except Exception as e:
            logger.warning(f"断开连接时出错: {e}")
    
    def get_market_code(self, stock_code):
        """根据股票代码返回市场代码"""
        if stock_code.startswith('6'):
            return 1  # 上海
        else:
            return 0  # 深圳
    
    def get_kline_data(self, stock_code, period='daily', count=100):
        """
        获取K线数据
        
        参数:
        stock_code: 股票代码，如 '000001'
        period: K线周期，可选 '1min', '5min', '15min', '30min', '1hour', 'daily', 'weekly', 'monthly'
        count: 获取的K线数量
        
        返回:
        DataFrame: 包含K线数据的DataFrame，列包括: datetime, open, high, low, close, volume, amount
        """
        if not self.connect():
            return None
        
        try:
            market = self.get_market_code(stock_code)
            period_code = self.period_map.get(period, 9)  # 默认为日线
            
            # 获取K线数据
            kline_data = self.api.get_security_bars(period_code, market, stock_code, 0, count)
            
            if kline_data:
                # 转换为DataFrame
                df = self.api.to_df(kline_data)
                
                # 确保datetime列是日期时间类型
                df['datetime'] = pd.to_datetime(df['datetime'])
                
                # 按日期排序
                df = df.sort_values('datetime').reset_index(drop=True)
                
         #       logger.info(f"成功获取 {stock_code} 的 {period} K线数据，共 {len(df)} 条")
                return df
            else:
                logger.warning(f"未获取到 {stock_code} 的K线数据")
                return None
                
        except Exception as e:
            logger.error(f"获取K线数据时出错: {e}")
            return None
        finally:
            self.disconnect()
    
    def get_realtime_quote(self, stock_code):
        """
        获取实时行情数据
        
        返回:
        dict: 包含实时行情数据的字典，包括 price, open, high, low, close, volume 等
        """
        if not self.connect():
            return None
    
        try:
            market = self.get_market_code(stock_code)
            stock_info = self.api.get_security_quotes([(market, stock_code)])
            
            if not stock_info:
                logger.warning(f"未获取到 {stock_code} 的实时行情")
                return None
        
            quote = stock_info[0]
            
            # 处理 OrderedDict
            if isinstance(quote, dict):
                # 已经是字典类型，直接使用
                quote_dict = dict(quote)  # 转换为普通字典
                
                # 重命名一些字段，使其更易读
                field_mapping = {
                    'market': 'market',
                    'code': 'code',
                    'price': 'price',
                    'last_close': 'prev_close',
                    'open': 'open',
                    'high': 'high',
                    'low': 'low',
                    'vol': 'volume',
                    'amount': 'amount',
                    'servertime': 'server_time',
                    'bid1': 'bid1',
                    'ask1': 'ask1',
                    'bid_vol1': 'bid_volume1',
                    'ask_vol1': 'ask_volume1',
                    # 可以继续添加其他需要的字段映射
                }
                
                # 创建新的字典，只保留我们关心的字段
                filtered_dict = {}
                for source_key, target_key in field_mapping.items():
                    if source_key in quote_dict:
                        filtered_dict[target_key] = quote_dict[source_key]
                
       #         logger.info(f"成功获取 {stock_code} 的实时行情: {filtered_dict.get('price', 'N/A')}")
                return filtered_dict
            else:
                logger.warning(f"未知的数据类型: {type(quote)}")
                return None
            
        except Exception as e:
            logger.error(f"获取实时行情时出错: {e}")
            return None
        finally:
            self.disconnect()
    
    def get_realtime_price(self, stock_code):
        """获取实时价格（简化版）"""
        quote = self.get_realtime_quote(stock_code)
        if quote and 'price' in quote:
            return quote['price']
        return None

    def get_detailed_quote(self, stock_code):
        """
        获取详细的行情信息，包括五档行情
        """
        quote_dict = self.get_realtime_quote_fixed(stock_code)
        
        if not quote_dict:
            return None
        
        # 提取五档行情
        bid_ask_data = {}
        for i in range(1, 6):
            bid_key = f'bid{i}'
            ask_key = f'ask{i}'
            bid_vol_key = f'bid_vol{i}'
            ask_vol_key = f'ask_vol{i}'
            
            if bid_key in quote_dict:
                bid_ask_data[f'bid_price_{i}'] = quote_dict[bid_key]
            if ask_key in quote_dict:
                bid_ask_data[f'ask_price_{i}'] = quote_dict[ask_key]
            if bid_vol_key in quote_dict:
                bid_ask_data[f'bid_volume_{i}'] = quote_dict[bid_vol_key]
            if ask_vol_key in quote_dict:
                bid_ask_data[f'ask_volume_{i}'] = quote_dict[ask_vol_key]
        
        # 合并基本信息和五档行情
        detailed_quote = {
            'basic': {
                'code': quote_dict.get('code'),
                'price': quote_dict.get('price'),
                'prev_close': quote_dict.get('prev_close'),
                'open': quote_dict.get('open'),
                'high': quote_dict.get('high'),
                'low': quote_dict.get('low'),
                'volume': quote_dict.get('volume'),
                'amount': quote_dict.get('amount'),
                'server_time': quote_dict.get('server_time')
            },
            'bid_ask': bid_ask_data
        }
        
        return detailed_quote
    
    def get_multiple_realtime_prices(self, stock_codes):
        """
        批量获取多个股票的实时价格
        
        返回:
        dict: {股票代码: 价格}
        """
        if not self.connect():
            return {}
        
        try:
            # 按市场分组股票代码
            sh_codes = [(1, code) for code in stock_codes if code.startswith('6')]
            sz_codes = [(0, code) for code in stock_codes if not code.startswith('6')]
            
            all_codes = sh_codes + sz_codes
            if not all_codes:
                return {}
            
            # 批量获取行情
            stock_info = self.api.get_security_quotes(all_codes)
            
            prices = {}
            for quote in stock_info:
                if  quote and 'price' in quote:
                    prices[quote['code']] = quote['price']
            
            logger.info(f"成功获取 {len(prices)} 只股票的实时价格")
            return prices
            
        except Exception as e:
            logger.error(f"批量获取实时价格时出错: {e}")
            return {}
        finally:
            self.disconnect()
    def get_historical_data_range(self, stock_code, start_date, end_date, period='daily'):
        """
        获取指定日期范围内的历史数据
        
        注意: 这个方法需要先有本地历史数据或使用其他数据源
        这里简化实现，实际应用中可能需要更复杂的数据获取逻辑
        """
        # 这里简化实现，实际应用中可能需要从数据库或其他数据源获取
        # 我们可以通过获取足够多的数据然后过滤来实现
        days_needed = (pd.to_datetime(end_date) - pd.to_datetime(start_date)).days + 100
        df = self.get_kline_data(stock_code, period, count=days_needed)
        
        if df is not None:
            mask = (df['datetime'] >= pd.to_datetime(start_date)) & (df['datetime'] <= pd.to_datetime(end_date))
            filtered_df = df[mask].reset_index(drop=True)
            return filtered_df
        
        return None
    
  

# 使用示例
if __name__ == "__main__":
    # 创建数据提供器实例
    data_provider = TDXDataProvider()
    
    # 测试获取K线数据
    kline_data = data_provider.get_kline_data('000001', period='daily', count=10)
    if kline_data is not None:
        print("K线数据:")
        print(kline_data[['datetime', 'open', 'high', 'low', 'close']].tail())
    
    # 测试获取实时价格
    realtime_price = data_provider.get_realtime_price('000001')
    if realtime_price is not None:
        print(f"\n实时价格: {realtime_price}")
    
    # 测试批量获取
    prices = data_provider.get_multiple_realtime_prices(['000001', '600036'])
    print(f"\n批量价格: {prices}")

K线数据:
             datetime   open   high    low  close
5 2025-09-23 15:00:00  11.37  11.58  11.32  11.52
6 2025-09-24 15:00:00  11.51  11.62  11.43  11.46
7 2025-09-25 15:00:00  11.43  11.47  11.36  11.40
8 2025-09-26 15:00:00  11.39  11.44  11.32  11.40
9 2025-09-29 15:00:00  11.40  11.48  11.27  11.37

实时价格: 11.37


2025-09-29 21:50:30,350 - INFO - 成功获取 2 只股票的实时价格



批量价格: {'600036': 40.68, '000001': 11.37}


In [2]:
from abc import ABC, abstractmethod
import pandas as pd

class BaseStrategy(ABC):
    """
    策略基类
    所有具体策略都应该继承这个类
    """
    
    def __init__(self, data_provider, capital=100000, name="未命名策略"):
        """
        初始化策略
        
        参数:
        data_provider: 数据提供器实例
        capital: 初始资金
        name: 策略名称
        """
        self.data_provider = data_provider
        self.initial_capital = capital
        self.capital = capital
        self.name = name
        
        # 持仓和交易记录
        self.positions = {}  # {股票代码: 持仓信息}
        self.trade_log = []  # 交易记录列表
        self.performance_log = []  # 绩效记录
        
        logger.info(f"初始化策略: {name}, 初始资金: {capital}")
    
    @abstractmethod
    def calculate_signals(self, stock_code, data):
        """
        计算交易信号 - 子类必须实现这个方法
        
        参数:
        stock_code: 股票代码
        data: K线数据或其他所需数据
        
        返回:
        dict: 包含交易信号的字典，例如 {'action': 'BUY', 'price': 10.5, 'shares': 100}
        """
        pass
    
    @abstractmethod
    def should_enter(self, stock_code, signals):
        """
        判断是否应该建仓 - 子类必须实现这个方法
        
        返回:
        bool: 是否建仓
        """
        pass
    
    @abstractmethod
    def should_exit(self, stock_code, signals):
        """
        判断是否应该平仓 - 子类必须实现这个方法
        
        返回:
        bool: 是否平仓
        """
        pass
    
    @abstractmethod
    def should_add_position(self, stock_code, signals):
        """
        判断是否应该加仓 - 子类必须实现这个方法
        
        返回:
        bool: 是否加仓
        """
        pass
    
    def get_position_size(self, price, risk_per_trade=0.02):
        """
        计算仓位大小
        
        参数:
        price: 股票价格
        risk_per_trade: 每笔交易风险比例
        
        返回:
        int: 股票数量
        """
        risk_amount = self.capital * risk_per_trade
        shares = int(risk_amount / price)
        return max(1, shares)  # 至少1股
    
    def enter_position(self, stock_code, price, shares, reason="建仓"):
        """
        建立仓位
        
        返回:
        bool: 是否成功建仓
        """
        cost = price * shares
        
        if cost > self.capital:
            logger.warning(f"资金不足，无法建仓 {stock_code}")
            return False
        
        # 记录持仓
        self.positions[stock_code] = {
            'entry_price': price,
            'shares': shares,
            'entry_date': datetime.now(),
            'entry_reason': reason
        }
        
        # 更新资金
        self.capital -= cost
        
        # 记录交易
        trade_record = {
            'date': datetime.now(),
            'stock': stock_code,
            'action': 'BUY',
            'price': price,
            'shares': shares,
            'value': cost,
            'reason': reason,
            'capital': self.capital
        }
        self.trade_log.append(trade_record)
        
        logger.info(f"{reason} {stock_code}: {shares}股 @ {price:.2f}, 成本: {cost:.2f}")
        return True
    
    def exit_position(self, stock_code, price, reason="平仓"):
        """
        平仓
        
        返回:
        bool: 是否成功平仓
        """
        if stock_code not in self.positions:
            logger.warning(f"尝试平仓但未持有 {stock_code}")
            return False
        
        position = self.positions[stock_code]
        shares = position['shares']
        exit_value = shares * price
        entry_cost = position['entry_price'] * shares
        
        # 计算盈亏
        pnl = exit_value - entry_cost
        pnl_percent = (pnl / entry_cost) * 100
        
        # 更新资金
        self.capital += exit_value
        
        # 记录交易
        trade_record = {
            'date': datetime.now(),
            'stock': stock_code,
            'action': 'SELL',
            'price': price,
            'shares': shares,
            'value': exit_value,
            'pnl': pnl,
            'pnl_percent': pnl_percent,
            'reason': reason,
            'capital': self.capital
        }
        self.trade_log.append(trade_record)
        
        # 移除持仓
        del self.positions[stock_code]
        
        action_type = "盈利" if pnl > 0 else "亏损"
        logger.info(f"{reason} {stock_code}: {shares}股 @ {price:.2f}, "
                   f"{action_type}: {pnl:.2f}({pnl_percent:.2f}%)")
        
        return True
    
    def add_to_position(self, stock_code, price, shares, reason="加仓"):
        """
        加仓
        
        返回:
        bool: 是否成功加仓
        """
        if stock_code not in self.positions:
            logger.warning(f"尝试加仓但未持有 {stock_code}")
            return False
        
        cost = price * shares
        
        if cost > self.capital:
            logger.warning(f"资金不足，无法加仓 {stock_code}")
            return False
        
        # 更新持仓（计算平均成本）
        position = self.positions[stock_code]
        total_shares = position['shares'] + shares
        total_cost = (position['entry_price'] * position['shares']) + cost
        new_avg_price = total_cost / total_shares
        
        position['shares'] = total_shares
        position['entry_price'] = new_avg_price
        position['add_position_date'] = datetime.now()
        
        # 更新资金
        self.capital -= cost
        
        # 记录交易
        trade_record = {
            'date': datetime.now(),
            'stock': stock_code,
            'action': 'ADD',
            'price': price,
            'shares': shares,
            'value': cost,
            'reason': reason,
            'capital': self.capital
        }
        self.trade_log.append(trade_record)
        
        logger.info(f"{reason} {stock_code}: {shares}股 @ {price:.2f}, 平均成本: {new_avg_price:.2f}")
        return True
    
    def run_on_stock(self, stock_code, data=None, period='daily', count=10):
        """
        在单个股票上运行策略
        
        返回:
        bool: 是否有交易发生
        """
        # 获取数据（如果未提供）
        if data is None:
            data = self.data_provider.get_kline_data(stock_code, period, count)
        
        if data is None or len(data) < 3:
            logger.warning(f"数据不足，无法分析 {stock_code}")
            return False
        
        # 计算信号
        signals = self.calculate_signals(stock_code, data)

        # 先计算各个条件的值，避免重复计算
        should_enter = self.should_enter(stock_code, signals)
        should_exit = self.should_exit(stock_code, signals)
        should_add = self.should_add_position(stock_code, signals)

        # 如果是建仓信号，记录建仓前一天的最低点
        if should_enter and not signals.get('in_position', False):
            self.entry_day0_low = signals.get('day0_low', 0)
    #    print(signals)
        # 检查是否需要平仓
        if stock_code in self.positions and should_exit:
            # 使用最新价格平仓
            current_price = signals.get('current_price', data.iloc[-1]['close'])
            self.exit_position(stock_code, current_price, signals.get('exit_reason', '策略平仓'))
            #测试用
       #     print('策略平仓','日期=',signals['date'],'股票=',stock_code, '价格：', current_price)
            return True
        
        # 检查是否需要加仓
        if stock_code in self.positions and should_add:
            current_price = signals.get('current_price', data.iloc[-1]['close'])
            add_shares = self.get_position_size(current_price, 0.01)  # 加仓使用较小风险比例
            self.add_to_position(stock_code, current_price, add_shares, signals.get('add_reason', '策略加仓'))
      #      print('策略加仓','日期=',signals['date'],'股票=',stock_code, '加仓前日最高价：', signals['day0_high'],
       #           '加仓日最高价：', signals['day1_high'],)
            return True
        
        # 检查是否需要建仓
        if stock_code not in self.positions and should_enter:
            current_price = signals.get('current_price', data.iloc[-1]['close'])
            shares = self.get_position_size(current_price)
            self.enter_position(stock_code, current_price, shares, signals.get('enter_reason', '策略建仓'))
            #  print('策略建仓','日期=',signals['date'],'股票=',stock_code, '建仓前日最高价：', signals['day0_high'],'建仓前日最低价：',signals['day0_low'],
            #       '建仓日最高价：', signals['day1_high'],'建仓日最低价：',signals['day1_low'])
            return True
        
        return False
    
    def run_on_multiple_stocks(self, stock_codes, period='daily', count=10):
        """
        在多个股票上运行策略
        """
        trades_count = 0
        for stock_code in stock_codes:
            if self.run_on_stock(stock_code, period=period, count=count):
                trades_count += 1
        
        logger.info(f"在 {len(stock_codes)} 只股票上运行策略，发生 {trades_count} 笔交易")
        return trades_count
    
    def get_portfolio_value(self, current_prices=None):
        """
        计算投资组合当前价值
        
        参数:
        current_prices: {股票代码: 当前价格}，如果为None则尝试获取实时价格
        
        返回:
        float: 投资组合总价值
        """
        # 获取当前价格
        if current_prices is None and self.positions:
            stock_codes = list(self.positions.keys())
            current_prices = self.data_provider.get_multiple_realtime_prices(stock_codes)
        
        portfolio_value = self.capital
        
        for stock_code, position in self.positions.items():
            if current_prices and stock_code in current_prices:
                current_price = current_prices[stock_code]
            else:
                # 如果没有实时价格，使用持仓成本
                current_price = position['entry_price']
            
            position_value = position['shares'] * current_price
            portfolio_value += position_value
        
        return portfolio_value
    
    def get_performance_summary(self):
        """
        获取策略绩效摘要
        
        返回:
        dict: 包含各种绩效指标的字典
        """
        current_value = self.get_portfolio_value()
        total_return = (current_value - self.initial_capital) / self.initial_capital * 100
        
        # 计算胜率
        if self.trade_log:
            exit_trades = [t for t in self.trade_log if t['action'] == 'SELL']
            if exit_trades:
                winning_trades = [t for t in exit_trades if t.get('pnl', 0) > 0]
                win_rate = len(winning_trades) / len(exit_trades) * 100
            else:
                win_rate = 0
        else:
            win_rate = 0
        
        summary = {
            '策略名称': self.name,
            '初始资金': self.initial_capital,
            '当前价值': current_value,
            '总收益率(%)': round(total_return, 2),
            '持仓数量': len(self.positions),
            '交易次数': len(self.trade_log),
            '胜率(%)': round(win_rate, 2) if win_rate else 0,
            '当前资金': self.capital
        }
        
        return summary
    
    def print_summary(self):
        """打印策略摘要"""
        summary = self.get_performance_summary()
        
        print(f"\n=== {summary['策略名称']} 策略摘要 ===")
        for key, value in summary.items():
            print(f"{key}: {value}")
        
        if self.positions:
            print("\n当前持仓:")
            for stock_code, position in self.positions.items():
                print(f"  {stock_code}: {position['shares']}股 @ {position['entry_price']:.2f}")
        
        if self.trade_log:
            print(f"\n最近5笔交易:")
            for trade in self.trade_log[-5:]:
                action = f"{trade['action']} {trade['stock']}"
                pnl_info = f", 盈亏: {trade.get('pnl', 0):.2f}" if 'pnl' in trade else ""
                print(f"  {trade['date'].strftime('%Y-%m-%d %H:%M')} {action} "
                      f"{trade['shares']}股 @ {trade['price']:.2f}{pnl_info}")

In [3]:
import logging
import sys
from datetime import datetime

class TDXFormatter(logging.Formatter):
    """通达信风格的日志格式化器"""
    
    def format(self, record):
        # 只保留消息内容，去掉时间戳和日志级别
        return record.getMessage()

class TDXLogger:
    """通达信风格的日志输出"""
    
    def __init__(self):
        # 创建logger
        self.logger = logging.getLogger('TDXStrategy')
        self.logger.setLevel(logging.INFO)
        
        # 移除所有现有的handler
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)
        
        # 创建通达信格式的handler
        tdx_handler = logging.StreamHandler(sys.stdout)
        tdx_handler.setLevel(logging.INFO)
        tdx_handler.setFormatter(TDXFormatter())
        
        self.logger.addHandler(tdx_handler)
        
        # 创建文件handler（可选）
        file_handler = logging.FileHandler('strategy_signals.log', encoding='gbk')
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(TDXFormatter())
        
        self.logger.addHandler(file_handler)
    
    def info(self, msg):
        self.logger.info(msg)



In [3]:
import pandas as pd
import numpy as np
from datetime import datetime, time, timedelta
import logging

# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class KLineMerger:
    """K线合并器，用于处理价格容差和K线合并"""
    
    def __init__(self, tolerance=0.001):  # 默认千分之一的容差
        self.tolerance = tolerance
    
    def should_merge(self, kline1, kline2):
        """判断两根K线是否应该合并（修复逻辑）"""
        high1, high2 = kline1['high'], kline2['high']
        low1, low2 = kline1['low'], kline2['low']
        
        # 计算最高价差值与基准价格的比率
        high_diff = abs(high2 - high1)
        high_base = max(high1, high2)
        high_ratio = high_diff / high_base if high_base > 0 else 0
        
        # 计算最低价差值与基准价格的比率
        low_diff = abs(low2 - low1)
        low_base = max(low1, low2)
        low_ratio = low_diff / low_base if low_base > 0 else 0
        
        # 只要最高价或最低价差值不超过千分之一，就合并
        return high_ratio <= self.tolerance or low_ratio <= self.tolerance
    
    def merge_klines(self, kline1, kline2):
        """合并两根K线（增加跳空处理）"""
        # 取最高点中的最大值作为合并后的最高点
        merged_high = max(kline1['high'], kline2['high'])
        # 取最低点中的最小值作为合并后的最低点
        merged_low = min(kline1['low'], kline2['low'])
        
        # 处理跳空情况
        close1 = kline1['close']
        open2 = kline2['open']
        
        # 如果当日K线的最低点比前一日的收盘价高（向上跳空）
        if open2 > close1:
            # 基准线最低点就是前一日的收盘价
            merged_low = min(merged_low, close1)
        
        # 如果当日K线的最高点比前日收盘价低（向下跳空）
        if open2 < close1:
            # 基准线最高点就是前一日的收盘价
            merged_high = max(merged_high, close1)
        
        # 开盘价取第一根K线的开盘价
        merged_open = kline1['open']
        # 收盘价取第二根K线的收盘价
        merged_close = kline2['close']
        
     #   print(kline1)

        
        # 成交量合并
        merged_volume = kline1['vol'] + kline2['vol']
        
        # 日期范围（开始日期到结束日期）
        start_date = kline1['date'] if 'date' in kline1 else kline1.name
        end_date = kline2['date'] if 'date' in kline2 else kline2.name
        
        merged_kline = {
            'open': merged_open,
            'high': merged_high,
            'low': merged_low,
            'close': merged_close,
            'volume': merged_volume,
            'start_date': start_date,
            'end_date': end_date,
            'merged': True,  # 标记为合并K线
            'merged_count': kline1.get('merged_count', 0) + 1,  # 记录合并次数
            'gap_up': open2 > close1,  # 标记向上跳空
            'gap_down': open2 < close1   # 标记向下跳空
        }
        
        return merged_kline
    
    def process_data(self, data):
        """处理整个数据集的K线合并（修复合并逻辑）"""
        if len(data) < 2:
            return data
        
        # 转换为列表形式处理
        if isinstance(data, pd.DataFrame):
            data_list = []
            for i, row in data.iterrows():
                row_dict = row.to_dict()
                row_dict['date'] = i
                data_list.append(row_dict)
        else:
            data_list = data.copy()
        
        merged_data = []
        i = 0
        
        while i < len(data_list):
            current_kline = data_list[i]
            
            # 如果是最后一根K线，直接添加
            if i == len(data_list) - 1:
                current_kline['merged'] = False
                current_kline['merged_count'] = 0
                current_kline['gap_up'] = False
                current_kline['gap_down'] = False
                merged_data.append(current_kline)
                break
            
            next_kline = data_list[i+1]
            
            # 判断是否应该合并
            if self.should_merge(current_kline, next_kline):
                # 合并K线
                merged_kline = self.merge_klines(current_kline, next_kline)
                merged_data.append(merged_kline)
                i += 2  # 跳过下一根K线，因为已经合并
            else:
                # 不合并，直接添加当前K线
                current_kline['merged'] = False
                current_kline['merged_count'] = 0
                current_kline['gap_up'] = False
                current_kline['gap_down'] = False
                merged_data.append(current_kline)
                i += 1
        
        # 转换回DataFrame
        if isinstance(data, pd.DataFrame):
            df = pd.DataFrame(merged_data)
            if 'date' in df.columns:
                df.set_index('date', inplace=True)
            return df
        
        return merged_data

In [5]:
import pandas as pd
import numpy as np
from datetime import datetime, time, timedelta
#from strategy_base import BaseStrategy, TDXDataProvider
import logging


# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# # 创建通达信日志器
# tdx_logger = TDXLogger()
class BreakoutStrategy(BaseStrategy):
    """
    基于BaseStrategy的突破策略实现（修正版）
    建仓条件在收盘前5分钟判断当日最高价是否未超过前一日最高价
    """
    
    
    def __init__(self, data_provider, capital=100000, name="突破策略-尾盘判断", 
                 tolerance=0, max_additions=5, add_position_ratio=0.5,
                 close_time_judgment=True, judgment_time="14:55:00",enable_kline_merge=True):
        """
        初始化突破策略
        
        参数:
        data_provider: 数据提供器实例
        capital: 初始资金
        name: 策略名称
        tolerance: 价格容差百分比
        max_additions: 最大加仓次数
        add_position_ratio: 加仓比例（相对于初始仓位）
        close_time_judgment: 是否在收盘前判断
        judgment_time: 判断时间（默认14:55:00）
        """
        super().__init__(data_provider, capital, name)
        
        self.tolerance = tolerance# 价格容差百分比（仅用于K线合并）
        self.max_additions = max_additions
        self.add_position_ratio = add_position_ratio
        self.close_time_judgment = close_time_judgment
        self.judgment_time = judgment_time
        self.enable_kline_merge = enable_kline_merge

         # K线合并器
        self.kline_merger = KLineMerger(tolerance)
        
        # 策略特定状态
        self.benchmark_high = None  # 基准最高价（用于加仓判定）
       # self.reference_low = None   # 参考最低价（第0天最低价）
        self.addition_count = 0     # 加仓次数计数
        self.today_high = None      # 当日最高价（用于尾盘判断）
        self.today_low = None       # 当日最低价（用于尾盘判断）
        self.entry_day0_low = None  # 建仓前一天的最低点（用于止损）
        self.entry_day0_high = None # 建仓前一天的最高点
        self.entry_date = None      # 记录建仓日期

         # 添加last_signals存储
        self.last_signals = {}      # 存储每个股票的最后信号
        self.merged_data = {}       # 存储合并后的K线数据
        
        logger.info(f"初始化突破策略: 容差={tolerance*100}%, 最大加仓={max_additions}次, 尾盘判断={close_time_judgment}")

    def reset_daily_data(self):
        """重置每日数据"""
        self.today_high = None
        self.today_low = None

    def preprocess_data(self, stock_code, data):
        """预处理数据：应用K线合并（修改：合并的K线不做买入判断）"""
        if not self.enable_kline_merge:
            # 为每根K线添加合并标记
            if isinstance(data, pd.DataFrame):
                data_copy = data.copy()
                data_copy['merged'] = False
                data_copy['merged_count'] = 0
                data_copy['gap_up'] = False
                data_copy['gap_down'] = False
                return data_copy
            else:
                # 如果是列表形式，为每个字典添加合并标记
                for item in data:
                    item['merged'] = False
                    item['merged_count'] = 0
                    item['gap_up'] = False
                    item['gap_down'] = False
                return data
        
        # 检查是否已经处理过该股票的数据
        if stock_code in self.merged_data:
            return self.merged_data[stock_code]
        
        # 处理K线合并
        processed_data = self.kline_merger.process_data(data)
        
         # 转换为DataFrame以便后续处理
        if processed_data is not None and len(processed_data) > 0 and isinstance(processed_data[0] if not isinstance(processed_data, pd.DataFrame) else processed_data.iloc[0], dict):
            if not isinstance(processed_data, pd.DataFrame):
                df = pd.DataFrame(processed_data)
                # 设置日期索引
                if 'date' in df.columns:
                    df.set_index('date', inplace=True)
                elif 'start_date' in df.columns:
                    df.set_index('start_date', inplace=True)
            else:
                df = processed_data
            
            self.merged_data[stock_code] = df
            return df
        
        return data
    
    def calculate_signals(self, stock_code, data):
        """
        计算突破策略的信号（修正版：支持尾盘判断）
        
        参数:
        stock_code: 股票代码
        data: K线数据DataFrame
        
        返回:
        dict: 包含交易信号的字典
        """
          # 预处理数据：应用K线合并
        processed_data = self.preprocess_data(stock_code, data)
        
        if len(data) < 2:  # 至少需要2天数据
            return {}

         # 重置每日数据（新的一天开始）
        self.reset_daily_data()
         # 检查当前K线是否是合并的K线
        current_kline_merged = processed_data.iloc[-1].get('merged', False) if hasattr(processed_data.iloc[-1], 'get') else False
        current_kline_merged_count = processed_data.iloc[-1].get('merged_count', 0) if hasattr(processed_data.iloc[-1], 'get') else 0
        
        # 获取当前时间（如果是实时数据）
        current_time = datetime.now().time() if hasattr(data, 'real_time') else None
        
        # 提取关键价格（使用合并后的数据）
        # 第0天数据（前一天）
        day0_high = processed_data.iloc[-2]['high'] if len(processed_data) >= 2 else processed_data.iloc[-1]['high']
        day0_low = processed_data.iloc[-2]['low'] if len(processed_data) >= 2 else processed_data.iloc[-1]['low']
        day0_close = processed_data.iloc[-2]['close'] if len(processed_data) >= 2 else processed_data.iloc[-1]['close']
        # 第1天数据（当天）
        day1_high = processed_data.iloc[-1]['high']
        day1_low = processed_data.iloc[-1]['low']
        day1_close = processed_data.iloc[-1]['close']
        day1_open = processed_data.iloc[-1]['open']
         # 处理跳空情况
        # 如果当日K线的最低点比前一日的收盘价高（向上跳空）
        if day1_open > day0_close:
            # 基准线最低点就是前一日的收盘价
            adjusted_day0_low = min(day0_low, day0_close)
       #     logger.info(f"向上跳空: {stock_code}, 前日收盘价={day0_close:.2f}, 当日开盘价={day1_open:.2f}, 调整前日最低价={day0_low:.2f}->{adjusted_day0_low:.2f}")
            day0_low = adjusted_day0_low
        
        # 如果当日K线的最高点比前日收盘价低（向下跳空）
        if day1_open < day0_close:
            # 基准线最高点就是前一日的收盘价
            adjusted_day0_high = max(day0_high, day0_close)
       #     logger.info(f"向下跳空: {stock_code}, 前日收盘价={day0_close:.2f}, 当日开盘价={day1_open:.2f}, 调整前日最高价={day0_high:.2f}->{adjusted_day0_high:.2f}")
            day0_high = adjusted_day0_high
        
        # 获取实时价格（如果可用）
        current_price = self.data_provider.get_realtime_price(stock_code) or day1_close

        #以下为实时监控的条件
        # 检查是否在持仓中
        in_position = stock_code in self.positions
        
        # 更新当日最高最低价（用于尾盘判断）
        if current_price is not None:
            if self.today_high is None or current_price > self.today_high:
                self.today_high = current_price
            if self.today_low is None or current_price < self.today_low:
                self.today_low = current_price
        
        # 判断是否在收盘前5分钟
        is_close_time = False
        if current_time and self.close_time_judgment:
            judgment_time_obj = datetime.strptime(self.judgment_time, "%H:%M:%S").time()
            # 判断是否在收盘前5分钟内
            close_time = datetime.strptime("15:00:00", "%H:%M:%S").time()
            is_close_time = (current_time >= judgment_time_obj and current_time < close_time)
        
        # 计算建仓条件
        # 使用尾盘判断时的最高价或当日整体最高价
        judgment_high = self.today_high if (is_close_time and self.today_high is not None) else day1_high
        judgment_low = self.today_low if (is_close_time and self.today_low is not None) else day1_low
       # print(judgment_high,judgment_low,day0_low * (1 - self.tolerance))

           # 应用价格容差（滑点概念）
   #     tolerance_factor = 1 + self.tolerance
        #考虑容差
        # entry_condition1 = judgment_low > day0_low * (1 - self.tolerance)  # 当日最低价 > 第0天最低价
        # entry_condition2 = judgment_high < day0_high * tolerance_factor   # 当日最高价 < 第0天最高价
        
        # 修改：建仓条件不考虑误差（去掉容差）
        entry_condition1 = judgment_low > day0_low  # 当日最低价 > 第0天最低价
        entry_condition2 = judgment_high <= day0_high  # 当日最高价 ≤ 第0天最高价
        
        # 只有在收盘前5分钟或非尾盘判断模式下才允许建仓
        entry_condition_time = not self.close_time_judgment or is_close_time

        # 修改：合并的K线当日不做买入判断
        if current_kline_merged:
            entry_condition1 = False
            entry_condition2 = False
            logger.info(f"跳过建仓判断: {stock_code} 当前K线为合并K线 (合并了{current_kline_merged_count}根K线)")

          #调试信息：记录建仓日期
              
        # logger.info(f"建仓条件: 日期={data.iloc[-1]['year'] },{data.iloc[-1]['month'] }, {data.iloc[-1]['day']},股票={stock_code},"
        #             f"条件1={entry_condition1}, 条件2={entry_condition2}, 时间条件={entry_condition_time}")
        
        # 计算加仓条件
        if in_position:
            # 获取当前基准最高价
            position_info = self.positions[stock_code]
            current_benchmark_high  = position_info.get('benchmark_high', 0)
            entry_date = position_info.get('entry_date', None)
            
           # add_condition = judgment_high > current_benchmark_high  * tolerance_factor  # 当日最高价 > 基准最高价
             # 修改：加仓条件不考虑误差
            add_condition = judgment_high > current_benchmark_high
            # print(type(judgment_high))
            # print(judgment_high - current_benchmark * (1 - self.tolerance))
            # 加仓也需要在尾盘进行
            add_condition_time = not self.close_time_judgment or is_close_time

            #  # 调试信息
            # if add_condition:
            #     logger.info(f"加仓条件满足: {stock_code}, "
            #                 f"日期:[{data.iloc[-1]['year'] },{data.iloc[-1]['month'] }, {data.iloc[-1]['day']}]," 
            #               f"当日最高价={judgment_high:.2f}, "
            #               f"基准最高价={current_benchmark_high:.2f}, "
            #               f"容差阈值={current_benchmark_high * (1 + self.tolerance):.2f}, "
            #               f"价格差值={(judgment_high - current_benchmark_high * (1 + self.tolerance)):.4f}")
            # else:
            #     logger.debug(f"加仓条件不满足: {stock_code}, "
            #                f"当日最高价={judgment_high:.2f}, "
            #                f"基准最高价={current_benchmark_high:.2f}, "
            #                f"容差阈值={current_benchmark_high * (1 + self.tolerance):.2f}, "
            #                f"价格差值={(judgment_high - current_benchmark_high * (1 + self.tolerance)):.4f}")
        else:
            add_condition = False
            add_condition_time = False

       

        # #调试信息：记录加仓日期
              
        # logger.info(f"加仓条件: 日期={data.iloc[-1]['year'] },{data.iloc[-1]['month'] }, {data.iloc[-1]['day']},股票={stock_code},"
        #             f"条件={add_condition}, 时间条件={add_condition_time}")
        
        # 计算止损条件（盘中实时判断）
        if in_position:
            # 从持仓信息中获取建仓前一天的最低点
            position_info = self.positions[stock_code]
            entry_day0_low = position_info.get('entry_day0_low', day0_low)
            # 止损条件：当日最低价 < 建仓前一天的最低点（考虑容差）
          #  stop_condition = day1_low < entry_day0_low * (1 - self.tolerance)
            # 修改：止损条件不考虑误差
            stop_condition = judgment_low < entry_day0_low 
        else:
            stop_condition = False
        # #调试信息：记录止损日期
        # logger.info(f"加仓条件: 日期={data.iloc[-1]['year'] },{data.iloc[-1]['month'] }, {data.iloc[-1]['day']},股票={stock_code},"
        #             f"条件={stop_condition}")

                
        # 构建信号字典
        signals = {
            'day0_high': day0_high,
            'day0_low': day0_low,
            'day1_high': day1_high,
            'day1_low': day1_low,
            'day1_close': day1_close,
            'judgment_high': judgment_high,
            'judgment_low': judgment_low,
            'current_price': current_price,
            'entry_condition1': entry_condition1,
            'entry_condition2': entry_condition2,
            'entry_condition_time': entry_condition_time,
            'add_condition': add_condition,
            'add_condition_time': add_condition_time,
            'stop_condition': stop_condition,
            'in_position': in_position,
            'addition_count': self.addition_count if in_position else 0,
            'is_close_time': is_close_time,
            'today_high': self.today_high,
            'today_low': self.today_low,
            #回测用
            'date':[processed_data.iloc[-1]['year'], processed_data.iloc[-1]['month'], processed_data.iloc[-1]['day']] if 'year' in processed_data.columns else [2025, 8, 1],
            'entry_day0_low': self.entry_day0_low if in_position else day0_low,
            'current_benchmark_high': self.benchmark_high if in_position else 0,
        #    'tolerance_factor': tolerance_factor
            'kline_merged': current_kline_merged,
            'merged_count': current_kline_merged_count,
            'gap_up': day1_open > day0_close,  # 向上跳空
            'gap_down': day1_open < day0_close  # 向下跳空            
        }
        # # 调试信息：记录止损条件详情
        # if in_position and stop_condition:
        #     logger.info(f"止损条件触发: {stock_code}, "
        #                f"当前最低价={day1_low:.2f}, "
        #                f"建仓前一日最低价={signals['entry_day0_low']:.2f}")

         # 保存信号到last_signals
        self.last_signals[stock_code] = signals
        
        return signals
    
    def should_enter(self, stock_code, signals):
        """
        判断是否应该建仓（修正版：考虑尾盘判断）
        
        返回:
        bool: 是否建仓
        """
        # 合并的K线当日不做买入判断
        if signals.get('kline_merged', False):
            logger.info(f"跳过建仓: {stock_code} 当前为合并K线")
            return False
        
        # 建仓条件：满足价格条件、时间条件且不在持仓中
        entry_signal = (signals.get('entry_condition1', False) and 
                signals.get('entry_condition2', False) and 
                signals.get('entry_condition_time', False) and 
                not signals.get('in_position', False))
        if entry_signal:
            date_str = f"{signals.get('date')[0]}-{signals.get('date')[1]:02d}-{signals.get('date')[2]:02d}"
            logger.info(f"建仓信号触发: {stock_code}, "
                        f"日期={date_str}，" 
                       f"前一日最低价={signals.get('day0_low')}, "
                       f"前一日最高价={signals.get('day0_high')},"
                       f"建仓日最低价={signals.get('day1_low')}, "
                       f"建仓日最高价={signals.get('day1_high')},"
                       f"跳空情况: 上={signals.get('gap_up', False)}, 下={signals.get('gap_down', False)}")
            
        return entry_signal
    
    def should_exit(self, stock_code, signals):
        """
        判断是否应该平仓（止损）- 盘中实时判断
        
        返回:
        bool: 是否平仓
        """
        # 止损条件：触发止损条件且在持仓中（盘中实时判断，不限于尾盘）
        stop_signal = signals.get('stop_condition', False) and signals.get('in_position', False)

        if stop_signal:
            date_str = f"{signals.get('date')[0]}-{signals.get('date')[1]:02d}-{signals.get('date')[2]:02d}"
            logger.info(f"止损信号触发: {stock_code}, "
                        f"日期={date_str},"
                       f"当前最低价={signals.get('day1_low', 0):.2f}, "
                       f"建仓前一日最低价={signals.get('entry_day0_low', 0):.2f}")
        
        return stop_signal
    
    def should_add_position(self, stock_code, signals):
        """
        判断是否应该加仓（修正版：考虑尾盘判断、基于基准线最高点的连续加仓）
        
        返回:
        bool: 是否加仓
        """
        # 加仓条件：满足加仓条件、时间条件、在持仓中且未超过最大加仓次数
        in_position = signals.get('in_position', False)
        add_condition = signals.get('add_condition', False)
        add_condition_time = signals.get('add_condition_time', False)
        addition_count = signals.get('addition_count', 0)
        current_benchmark_high = signals.get('current_benchmark_high', 0)
        judgment_high = signals.get('judgment_high', 0)
  #      tolerance_factor = signals.get('tolerance_factor', 1)
  #      print(tolerance_factor)

        add_signal = (in_position and add_condition and add_condition_time and 
                addition_count < self.max_additions)
        if add_signal:
            date_str = f"{signals.get('date')[0]}-{signals.get('date')[1]:02d}-{signals.get('date')[2]:02d}"
            logger.info(f"加仓信号触发: {stock_code}, 加仓次数={addition_count + 1}, "
                       f"日期={date_str}, "
                       f"基准最高价={current_benchmark_high:.2f}, "
                       f"当日最高价={judgment_high:.2f}" )
             #          f"容差阈值={current_benchmark_high * tolerance_factor:.2f}")
        
        
        return add_signal
    
    def get_position_size(self, price, risk_per_trade=0.02):
        """
        计算仓位大小（重写基类方法）
        
        参数:
        price: 股票价格
        risk_per_trade: 每笔交易风险比例
        
        返回:
        int: 股票数量
        """
        # 对于突破策略，我们使用固定的仓位比例而不是基于风险的计算
        position_value = self.capital * 0.2  # 固定使用20%资金建仓
        shares = int(position_value / price)
        return max(1, shares)  # 至少1股
    
    def enter_position(self, stock_code, price, shares, reason="建仓"):
        """
        建立仓位（重写基类方法以添加策略特定逻辑）
        
        返回:
        bool: 是否成功建仓
        """

        # 从last_signals获取信号数据
        signals = self.last_signals.get(stock_code, {})
        
        # 使用收盘价作为建仓价格，而不是传入的price参数
        # 因为策略是在收盘时建仓
        close_price = signals.get('day1_close', price)
       # print(close_price)
        # 应用滑点（价格容差）
        slippage = close_price * self.tolerance
        adjusted_price = close_price + slippage  # 买入时加滑点
        
        # 调用基类方法建立仓位
        success = super().enter_position(stock_code, adjusted_price, shares, reason)
        
        if success:
             
            # 设置策略特定状态
           # 基准最高价设置为建仓前日的最高价（不是建仓价格）
            self.benchmark_high = signals.get('day0_high', adjusted_price)
            self.entry_day0_low = signals.get('day0_low', adjusted_price)  # 记录建仓前一天的最低点
            self.entry_day0_high = signals.get('day0_high', adjusted_price)  # 记录建仓前一天的最高点
            self.addition_count = 0      # 重置加仓计数
            
            # 重置当日最高最低价
            self.today_high = None
            self.today_low = None
            
            # 更新持仓信息
            if stock_code in self.positions:
                self.positions[stock_code]['benchmark_high'] = self.benchmark_high
                self.positions[stock_code]['entry_day0_low'] = self.entry_day0_low
                self.positions[stock_code]['addition_count'] = self.addition_count
                 # 记录建仓前一天的最低点
                self.positions[stock_code]['entry_day0_low'] = self.entry_day0_low
                
            logger.info(f"建仓成功: {stock_code}, "
                       f"基准最高价={self.benchmark_high:.2f}, "
                       f"建仓价格={adjusted_price:.2f} (收盘价{close_price:.2f} + 滑点{slippage:.4f}), "
                       f"止损价={self.entry_day0_low:.2f}")
        
        return success
    
    def add_to_position(self, stock_code, price, shares, reason="加仓"):
        """
        加仓（重写基类方法以添加策略特定逻辑、基于基准线最高点的连续加仓）
        
        返回:
        bool: 是否成功加仓
        """
        # 从last_signals获取信号数据
        signals = self.last_signals.get(stock_code, {})
        
        # 使用收盘价作为加仓价格
        close_price = signals.get('day1_close', price)
        
        # 应用滑点（价格容差）
        # slippage = close_price * self.tolerance
        # adjusted_price = close_price + slippage  # 买入时加滑点
        
        # 计算加仓数量（使用递减比例）
        if stock_code in self.positions:
            current_shares = self.positions[stock_code]['shares']
            add_shares = int(current_shares * self.add_position_ratio)
            add_shares = max(1, add_shares)
        else:
            add_shares = shares
            
        # 调用基类方法加仓
        success = super().add_to_position(stock_code, close_price, add_shares, reason)
        
        if success:
            # 从last_signals获取信号数据
            signals = self.last_signals.get(stock_code, {})
            
            # 更新策略状态
            self.addition_count += 1

            # # 更新基准最高价为加仓当日的最高价
           
            # new_high = signals.get('judgment_high', close_price)
            # self.benchmark_high = new_high  # 直接更新为当日最高价          
                  
            
            # 重置当日最高最低价
            self.today_high = None
            self.today_low = None
            
            # 更新持仓信息
            if stock_code in self.positions:
                self.positions[stock_code]['benchmark_high'] = self.benchmark_high
                self.positions[stock_code]['addition_count'] = self.addition_count

            logger.info(f"加仓成功: {stock_code}, 第{self.addition_count}次加仓, "
                       f"基准最高价={self.benchmark_high:.2f}, "
                       f"加仓价格={close_price:.2f} ,收盘价{close_price:.2f} ")
        
        return success
    
    def exit_position(self, stock_code, price, reason="平仓"):
        """
        平仓（重写基类方法以添加策略特定逻辑）
        
        返回:
        bool: 是否成功平仓
        """
         # 从last_signals获取信号数据
        signals = self.last_signals.get(stock_code, {})
        
        # 使用收盘价作为平仓价格
        close_price = signals.get('day1_close', price)
        
        # 应用滑点（价格容差）
        # slippage = close_price * self.tolerance
        # adjusted_price = close_price - slippage  # 卖出时减滑点
        
        # 调用基类方法平仓
        success = super().exit_position(stock_code, price, reason)
        
        if success:
            # 重置策略状态
            self.benchmark_high = None
            self.entry_day0_low = None
            self.entry_day0_high = None
            self.addition_count = 0
            self.today_high = None
            self.reset_daily_data()

            # 从last_signals中移除该股票
            if stock_code in self.last_signals:
                del self.last_signals[stock_code]
            
            logger.info(f"平仓成功: {stock_code}, 原因={reason}, "
                       f"平仓价格={close_price:.2f} ,收盘价={close_price:.2f} ")
        
        return success
    
    def run_strategy(self, stock_codes, period='daily', count=20):
        """
        运行策略（重写基类方法以添加策略特定逻辑）
        
        参数:
        stock_codes: 股票代码列表
        period: K线周期
        count: 获取的K线数量
        
        返回:
        int: 交易次数
        """
        logger.info(f"开始运行突破策略，监控{len(stock_codes)}只股票")
        
        trades_count = 0
        for stock_code in stock_codes:
            # 获取K线数据
            klines = self.data_provider.get_kline_data(stock_code, period, count)
            
            if klines is None or len(klines) < 2:
                logger.warning(f"无法获取{stock_code}的K线数据或数据不足")
                continue
            
            # 运行策略
            if self.run_on_stock(stock_code, data=klines):
                trades_count += 1
        
        # 记录策略状态
        self.log_strategy_status()
        
        return trades_count
    
    def run_strategy_realtime(self, stock_codes, check_interval=60):
        """
        实时运行策略（支持尾盘判断）
        
        参数:
        stock_codes: 股票代码列表
        check_interval: 检查间隔（秒）
        """
        logger.info(f"开始实时运行突破策略，监控{len(stock_codes)}只股票，检查间隔{check_interval}秒")
        
        try:
            while True:
                current_time = datetime.now().time()
                judgment_time_obj = datetime.strptime(self.judgment_time, "%H:%M:%S").time()
                close_time = datetime.strptime("15:00:00", "%H:%M:%S").time()
                
                # 检查是否在交易时间内
                market_open = datetime.strptime("09:30:00", "%H:%M:%S").time()
                market_close = datetime.strptime("15:00:00", "%H:%M:%S").time()
                
                if market_open <= current_time <= market_close:
                    trades_count = 0
                    
                    for stock_code in stock_codes:
                        # 获取实时数据（包含当日最高最低价）
                        realtime_data = self.get_realtime_data(stock_code)
                        
                        if realtime_data:
                            # 运行策略
                            if self.run_on_stock(stock_code, data=realtime_data):
                                trades_count += 1
                    
                    if trades_count > 0:
                        logger.info(f"本轮检查完成，发生{trades_count}笔交易")
                    
                    # 如果是尾盘时间，记录判断结果
                    if self.close_time_judgment and judgment_time_obj <= current_time < close_time:
                        self.log_close_time_judgment(stock_codes)
                
                # 等待下一次检查
                import time
                time.sleep(check_interval)
                
        except KeyboardInterrupt:
            logger.info("实时策略被用户中断")
        except Exception as e:
            logger.error(f"实时策略运行过程中出错: {e}")
    
    def get_realtime_data(self, stock_code):
        """
        获取实时数据（包含当日最高最低价）
        
        返回:
        DataFrame: 包含实时数据的DataFrame
        """
        # 获取日线数据（作为基础）
        klines = self.data_provider.get_kline_data(stock_code, 'daily', 10)
        
        if klines is None or len(klines) < 2:
            return None
        
        # 获取实时行情
        quote = self.data_provider.get_realtime_quote(stock_code)
        
        if quote:
            # 更新最新数据
            latest_data = klines.iloc[-1].copy()
            latest_data['close'] = quote.get('price', latest_data['close'])
            
            # 更新当日最高最低价
            if 'high' in quote and quote['high'] > latest_data['high']:
                latest_data['high'] = quote['high']
            if 'low' in quote and quote['low'] < latest_data['low']:
                latest_data['low'] = quote['low']
            
            # 替换最后一行数据
            klines.iloc[-1] = latest_data
        
        return klines
    
    def log_close_time_judgment(self, stock_codes):
        """记录尾盘判断结果"""
        judgment_time = datetime.now().strftime("%H:%M:%S")
        logger.info(f"尾盘判断时间: {judgment_time}")
        
        for stock_code in stock_codes:
            # 获取实时数据
            realtime_data = self.get_realtime_data(stock_code)
            
            if realtime_data is None or len(realtime_data) < 2:
                continue
            
            # 计算信号
            signals = self.calculate_signals(stock_code, realtime_data)
            
            # 记录判断结果
            in_position = signals.get('in_position', False)
            entry_condition1 = signals.get('entry_condition1', False)
            entry_condition2 = signals.get('entry_condition2', False)
            entry_condition_time = signals.get('entry_condition_time', False)
            
            judgment_high = signals.get('judgment_high', 0)
            day0_high = signals.get('day0_high', 0)
            
            status = "持仓中" if in_position else "空仓"
            judgment = "符合" if (entry_condition1 and entry_condition2 and entry_condition_time) else "不符合"
            
            logger.info(f"{stock_code} 状态: {status}, 建仓条件: {judgment}, "
                       f"当日最高: {judgment_high:.2f}, 前日最高: {day0_high:.2f}")
    
    def log_strategy_status(self):
        """记录策略状态"""
        status_info = self.get_performance_summary()
        
        logger.info(f"策略状态: {status_info['策略名称']}")
        logger.info(f"当前资金: {status_info['当前资金']:.2f}")
        logger.info(f"持仓数量: {status_info['持仓数量']}")
        logger.info(f"交易次数: {status_info['交易次数']}")
        
        if self.positions:
            for stock, position in self.positions.items():
                logger.info(f"持仓: {stock}, 成本: {position['entry_price']:.2f}, "
                           f"股数: {position['shares']}, 加仓次数: {position.get('addition_count', 0)}")



In [11]:
# 使用完全兼容的导出器
def test_tdx_v771_export():
    """测试通达信v7.71兼容导出"""
    print("正在生成通达信v7.71兼容文件...")
    
    # 创建导出器实例
    exporter = TDXCompatibleExporter(None)
    
    # 添加测试信号（基于您提供的数据）
    test_signals = [
        {'stock_code': '000001', 'date': datetime(2025, 7, 25), 'type': 'buy', 'price': 12.36, 'desc': '建仓'},
        {'stock_code': '000001', 'date': datetime(2025, 7, 26), 'type': 'add', 'price': 12.50, 'desc': '第一次加仓'},
        {'stock_code': '000001', 'date': datetime(2025, 7, 27), 'type': 'add', 'price': 12.60, 'desc': '第二次加仓'},
        {'stock_code': '000001', 'date': datetime(2025, 7, 28), 'type': 'sell', 'price': 12.30, 'desc': '止损平仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 4), 'type': 'buy', 'price': 44.94, 'desc': '建仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 5), 'type': 'add', 'price': 45.30, 'desc': '第一次加仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 6), 'type': 'add', 'price': 45.17, 'desc': '第二次加仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 11), 'type': 'sell', 'price': 44.06, 'desc': '止损平仓'},
    ]
    
    # 记录信号
    for signal in test_signals:
        exporter.record_signal(
            signal['stock_code'],
            signal['date'], 
            signal['type'],
            signal['price'],
            signal['desc']
        )
    
    # 导出文件
    result_files = exporter.export_for_tdx_v771("TDX_v771_Compatible")
    
    # 创建自动安装脚本
    auto_script = exporter.create_auto_install_script("TDX_v771_Compatible")
    
    print("生成完成！文件列表：")
    for file_type, file_path in result_files.items():
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path)
            print(f"  {file_type}: {os.path.basename(file_path)} ({file_size} 字节)")
    
    print(f"  自动安装脚本: {os.path.basename(auto_script)}")
    
    # 显示技术指标公式内容
    print("\n技术指标公式内容预览:")
    print("=" * 50)
    try:
        with open(result_files['indicator'], 'r', encoding='gbk') as f:
            content = f.read()
            print(content)
    except Exception as e:
        print(f"读取文件失败: {e}")
    
    return result_files

if __name__ == "__main__":
    test_tdx_v771_export()

2025-09-28 17:43:26,247 - INFO - 通达信公式(TNE格式)已导出: TDX_v771_Compatible\突破策略信号.TNE
2025-09-28 17:43:26,371 - INFO - 自定义数据文件已导出: TDX_v771_Compatible\策略信号数据.TXT
2025-09-28 17:43:26,374 - INFO - 技术指标公式已导出: TDX_v771_Compatible\突破策略信号_技术指标.TXT
2025-09-28 17:43:26,377 - INFO - 所有文件已导出到目录: TDX_v771_Compatible


正在生成通达信v7.71兼容文件...
生成完成！文件列表：
  tne_formula: 突破策略信号.TNE (785 字节)
  custom_data: 策略信号数据.TXT (330 字节)
  indicator: 突破策略信号_技术指标.TXT (1439 字节)
  instructions: 详细使用说明.TXT (1208 字节)
  自动安装脚本: 自动安装.vbs

技术指标公式内容预览:
{:=突破策略信号标记=}
{原理:根据前一天最高最低价基准，在满足条件时标记信号}
{建仓:当日最低价>前一日最低价，且收盘前未超过前一日最高价}
{加仓:建仓后最高价突破基准高点}
{止损:盘中价格低于建仓前一日低点}

{信号定义}
BUY_CONDITION:=0;
ADD_CONDITION:=0;
SELL_CONDITION:=0;

{基于日期的信号标记 - 这里需要根据实际信号日期修改}
BUY_CONDITION:=BUY_CONDITION OR (DATE=1250725); {2025-07-25}
BUY_CONDITION:=BUY_CONDITION OR (DATE=1250804); {2025-08-04}

ADD_CONDITION:=ADD_CONDITION OR (DATE=1250726); {2025-07-26}
ADD_CONDITION:=ADD_CONDITION OR (DATE=1250727); {2025-07-27}
ADD_CONDITION:=ADD_CONDITION OR (DATE=1250805); {2025-08-05}
ADD_CONDITION:=ADD_CONDITION OR (DATE=1250806); {2025-08-06}

SELL_CONDITION:=SELL_CONDITION OR (DATE=1250728); {2025-07-28}
SELL_CONDITION:=SELL_CONDITION OR (DATE=1250811); {2025-08-11}

{在K线上画信号}
DRAWICON(BUY_CONDITION, LOW*0.98, 1);
DRAWICON(ADD_CONDITION, HIGH*1.02, 23);
DRA

In [9]:
import pandas as pd
import os
from datetime import datetime

class TDXCompatibleExporter:
    """
    完全兼容通达信v7.71的导出器
    解决版本错误和K线标记问题
    """
    
    def __init__(self, strategy):
        self.strategy = strategy
        self.signals_data = {}
    
    def record_signal(self, stock_code, date, signal_type, price, description=""):
        """记录交易信号"""
        if stock_code not in self.signals_data:
            self.signals_data[stock_code] = []
        
        self.signals_data[stock_code].append({
            'date': date,
            'signal_type': signal_type,
            'price': price,
            'description': description
        })
    
    def export_for_tdx_v771(self, export_dir="TDX_v771_Export"):
        """
        为通达信v7.71导出完全兼容的文件
        """
        if not os.path.exists(export_dir):
            os.makedirs(export_dir)
        
        # 1. 导出通达信公式（TNE格式）
        tne_file = self._export_tne_formula(export_dir)
        
        # 2. 导出自定义数据文件
        data_file = self._export_custom_data(export_dir)
        
        # 3. 导出标记公式（直接可用的技术指标）
        indicator_file = self._export_indicator_formula(export_dir)
        
        # 4. 生成详细使用说明
        instruction_file = self._generate_detailed_instructions(export_dir)
        
        logger.info(f"所有文件已导出到目录: {export_dir}")
        
        return {
            'tne_formula': tne_file,
            'custom_data': data_file,
            'indicator': indicator_file,
            'instructions': instruction_file
        }
    
    def _export_tne_formula(self, export_dir):
        """
        导出TNE格式公式文件（兼容v7.71）
        """
        formula_content = """[Formula]
Name=突破策略信号
Description=基于母子组合策略的信号标记
Type=1
Author=策略系统
ParamCount=0
VarCount=0
LineCount=4
DllFlag=0
Source=
BUY_SIGNAL:=0;
ADD_SIGNAL:=0;
SELL_SIGNAL:=0;
{signals_code}
DRAWICON(BUY_SIGNAL,L*0.98,1);
DRAWICON(ADD_SIGNAL,H*1.02,23);
DRAWICON(SELL_SIGNAL,L*0.98,2);
DRAWTEXT(BUY_SIGNAL,L*0.95,'建仓'),COLORRED;
DRAWTEXT(ADD_SIGNAL,H*1.05,'加仓'),COLORBLUE;
DRAWTEXT(SELL_SIGNAL,L*0.95,'止损'),COLORGREEN;
"""
        
        # 生成信号代码
        signals_code = self._generate_tdx_signals_code()
        
        final_content = formula_content.format(signals_code=signals_code)
        
        tne_file = os.path.join(export_dir, "突破策略信号.TNE")
        with open(tne_file, 'w', encoding='gbk') as f:
            f.write(final_content)
        
        logger.info(f"通达信公式(TNE格式)已导出: {tne_file}")
        return tne_file
    
    def _export_indicator_formula(self, export_dir):
        """
        导出可直接使用的技术指标公式
        """
        indicator_content = """{:=突破策略信号标记=}
{原理:根据前一天最高最低价基准，在满足条件时标记信号}
{建仓:当日最低价>前一日最低价，且收盘前未超过前一日最高价}
{加仓:建仓后最高价突破基准高点}
{止损:盘中价格低于建仓前一日低点}

{信号定义}
BUY_CONDITION:=0;
ADD_CONDITION:=0;
SELL_CONDITION:=0;

{基于日期的信号标记 - 这里需要根据实际信号日期修改}
BUY_CONDITION:=BUY_CONDITION OR (DATE=1250725); {2025-07-25}
BUY_CONDITION:=BUY_CONDITION OR (DATE=1250804); {2025-08-04}

ADD_CONDITION:=ADD_CONDITION OR (DATE=1250726); {2025-07-26}
ADD_CONDITION:=ADD_CONDITION OR (DATE=1250727); {2025-07-27}
ADD_CONDITION:=ADD_CONDITION OR (DATE=1250805); {2025-08-05}
ADD_CONDITION:=ADD_CONDITION OR (DATE=1250806); {2025-08-06}

SELL_CONDITION:=SELL_CONDITION OR (DATE=1250728); {2025-07-28}
SELL_CONDITION:=SELL_CONDITION OR (DATE=1250811); {2025-08-11}

{在K线上画信号}
DRAWICON(BUY_CONDITION, LOW*0.98, 1);
DRAWICON(ADD_CONDITION, HIGH*1.02, 23);
DRAWICON(SELL_CONDITION, LOW*0.98, 2);

DRAWTEXT(BUY_CONDITION, LOW*0.95, '建仓'), COLORRED;
DRAWTEXT(ADD_CONDITION, HIGH*1.05, '加仓'), COLORBLUE;
DRAWTEXT(SELL_CONDITION, LOW*0.95, '止损'), COLORYELLOW;

{可选:画出信号线}
STICKLINE(BUY_CONDITION, HIGH, LOW, 0.8, 0), COLORRED;
STICKLINE(ADD_CONDITION, HIGH, LOW, 0.8, 0), COLORBLUE;
STICKLINE(SELL_CONDITION, HIGH, LOW, 0.8, 0), COLORYELLOW;

{输出到副图}
信号强度:(BUY_CONDITION*1 + ADD_CONDITION*0.5 + SELL_CONDITION*(-1)), COLORSTICK;
"""
        
        indicator_file = os.path.join(export_dir, "突破策略信号_技术指标.TXT")
        with open(indicator_file, 'w', encoding='gbk') as f:
            f.write(indicator_content)
        
        logger.info(f"技术指标公式已导出: {indicator_file}")
        return indicator_file
    
    def _export_custom_data(self, export_dir):
        """
        导出自定义数据文件（用于在K线上标记）
        """
        # 创建适合通达信自定义数据管理的格式
        data_content = """股票代码\t日期\t标记类型\t价格\t说明
000001\t20250725\t建仓\t12.36\t建仓
000001\t20250726\t加仓\t12.50\t第一次加仓
000001\t20250727\t加仓\t12.60\t第二次加仓
000001\t20250728\t止损\t12.30\t止损平仓
600036\t20250804\t建仓\t44.94\t建仓
600036\t20250805\t加仓\t45.30\t第一次加仓
600036\t20250806\t加仓\t45.17\t第二次加仓
600036\t20250811\t止损\t44.06\t止损平仓
"""
        
        data_file = os.path.join(export_dir, "策略信号数据.TXT")
        with open(data_file, 'w', encoding='gbk') as f:
            f.write(data_content)
        
        logger.info(f"自定义数据文件已导出: {data_file}")
        return data_file
    
    def _generate_tdx_signals_code(self):
        """生成通达信信号代码"""
        signals_code = ""
        
        for stock_code, signals in self.signals_data.items():
            for signal in signals:
                date_str = signal['date'].strftime('%y%m%d') if isinstance(signal['date'], datetime) else str(signal['date'])[2:]  # YYMMDD格式
                
                if signal['signal_type'] == 'buy':
                    signals_code += f"BUY_SIGNAL:=BUY_SIGNAL OR (DATE={date_str});\n"
                elif signal['signal_type'] == 'add':
                    signals_code += f"ADD_SIGNAL:=ADD_SIGNAL OR (DATE={date_str});\n"
                elif signal['signal_type'] == 'sell':
                    signals_code += f"SELL_SIGNAL:=SELL_SIGNAL OR (DATE={date_str});\n"
        
        return signals_code if signals_code else "// 无信号数据\n"
    
    def _generate_detailed_instructions(self, export_dir):
        """生成详细使用说明"""
        instructions = """通达信v7.71策略信号标记使用指南
生成时间: {timestamp}

一、文件说明
1. 突破策略信号.TNE - 公式文件，可通过公式管理器导入
2. 突破策略信号_技术指标.TXT - 可直接复制的技术指标公式
3. 策略信号数据.TXT - 自定义数据文件

二、使用方法

方法一：直接使用技术指标（推荐）
1. 打开通达信，按Ctrl+F打开公式管理器
2. 选择"技术指标公式" -> "其他类型"
3. 点击"新建"，输入公式名称"突破策略信号"
4. 复制"突破策略信号_技术指标.TXT"中的所有内容到公式编辑区
5. 点击"确定"保存公式
6. 在K线图中输入"突破策略信号"回车即可显示信号

方法二：导入公式文件
1. 打开公式管理器(Ctrl+F)
2. 点击"导入"按钮
3. 选择"突破策略信号.TNE"文件
4. 确认导入

方法三：使用自定义数据标记
1. 进入"功能" -> "公式系统" -> "自定义数据管理"
2. 点击"新建"，创建名为"策略信号"的数据
3. 点击"导入"，选择"策略信号数据.TXT"
4. 在公式中通过SELFDATA函数引用

三、信号说明
- 红色标记: 建仓信号
- 蓝色标记: 加仓信号  
- 黄色标记: 止损信号

四、注意事项
1. 确保文件编码为GBK，否则可能出现乱码
2. 如导入失败，请使用方法一（直接复制公式）
3. 信号基于历史数据生成，仅供参考
4. 不同版本通达信界面可能略有差异

五、技术支持
如遇问题，请检查：
1. 通达信版本是否为v7.71或更高
2. 公式语法是否正确
3. 是否在交易时间段外测试
""".format(timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        
        instruction_file = os.path.join(export_dir, "详细使用说明.TXT")
        with open(instruction_file, 'w', encoding='gbk') as f:
            f.write(instructions)
        
        return instruction_file

    def create_auto_install_script(self, export_dir):
        """创建自动安装脚本"""
        vbs_script = """' 通达信策略信号自动安装脚本
' 适用于通达信金融终端v7.71
' 生成时间: {timestamp}

Set WshShell = CreateObject("WScript.Shell")
Set fso = CreateObject("Scripting.FileSystemObject")

' 获取通达信安装路径
tdxPath = ""
On Error Resume Next

' 尝试多个可能的通达信安装路径
possiblePaths = Array(
    "C:\\new_tdx",
    "D:\\new_tdx", 
    "E:\\new_tdx",
    "C:\\国信证券专业版",
    "D:\\国信证券专业版",
    WshShell.SpecialFolders("Desktop") & "\\国信证券专业版"
)

For Each path In possiblePaths
    If fso.FolderExists(path) Then
        tdxPath = path
        Exit For
    End If
Next

If tdxPath = "" Then
    MsgBox "未找到通达信安装目录，请手动安装。", vbInformation
    WScript.Quit
End If

' 复制公式文件
formulaPath = tdxPath & "\\T0002\\pri_formula\\"
If Not fso.FolderExists(formulaPath) Then
    formulaPath = tdxPath & "\\T0002\\formula\\"
End If

If fso.FolderExists(formulaPath) Then
    fso.CopyFile "突破策略信号_技术指标.TXT", formulaPath & "突破策略信号.TXT"
    MsgBox "公式文件已复制到通达信公式目录。" & vbCrLf & _
           "请在公式管理器中新建技术指标公式，名称输入'突破策略信号'。" & vbCrLf & _
           "然后复制文件内容到公式编辑器。", vbInformation
Else
    MsgBox "未找到公式目录，请手动安装公式。", vbExclamation
End If

MsgBox "安装完成！请在通达信中输入'突破策略信号'查看效果。", vbInformation
""".format(timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        
        script_file = os.path.join(export_dir, "自动安装.vbs")
        with open(script_file, 'w', encoding='gbk') as f:
            f.write(vbs_script)
        
        return script_file

In [12]:
def create_tdx_better_signal_file():
    """
    创建更兼容的通达信信号数据文件
    """
    # 使用更简单的格式，每行一个信号
    content = """000001	20250725	建仓
000001	20250726	加仓
000001	20250727	加仓
000001	20250728	止损
600036	20250804	建仓
600036	20250805	加仓
600036	20250806	加仓
600036	20250811	止损
"""
    
    with open("TDX_Signals_Simple.txt", 'w', encoding='gbk') as f:
        f.write(content)
    
    print("新的信号文件已生成: TDX_Signals_Simple.txt")
    return "TDX_Signals_Simple.txt"

create_tdx_better_signal_file()

新的信号文件已生成: TDX_Signals_Simple.txt


'TDX_Signals_Simple.txt'

In [10]:
# 修改策略类，集成信号记录功能
class BreakoutStrategyWithTDXExport:
    """
    带通达信导出功能的突破策略（适配TN6格式）
    """
    
    def __init__(self, data_provider, capital=100000, name="突破策略", 
                 tolerance=0.001, max_additions=5, add_position_ratio=0.5,
                 close_time_judgment=True, judgment_time="14:55:00",
                 enable_kline_merge=True, export_dir="TDX_Export"):
        # 这里简化了初始化，实际使用时需要继承BaseStrategy
        self.data_provider = data_provider
        self.capital = capital
        self.name = name
        self.tolerance = tolerance
        self.max_additions = max_additions
        self.add_position_ratio = add_position_ratio
        self.close_time_judgment = close_time_judgment
        self.judgment_time = judgment_time
        self.enable_kline_merge = enable_kline_merge
        
        self.export_dir = export_dir
        self.exporter = TDXMarkerExporter(self)
        self.last_signals = {}
        self.positions = {}
        self.addition_count = 0
        self.benchmark_high = None
        self.entry_day0_low = None
        
        # 创建导出目录
        if not os.path.exists(export_dir):
            os.makedirs(export_dir)
        
        logger.info(f"初始化带通达信导出功能的突破策略，导出目录: {export_dir}")
    
    def enter_position(self, stock_code, price, shares, reason="建仓"):
        """建立仓位（记录信号）"""
        signals = self.last_signals.get(stock_code, {})
        
        # 使用收盘价作为建仓价格
        close_price = signals.get('day1_close', price)
        
        # 应用滑点（价格容差）
        slippage = close_price * self.tolerance
        adjusted_price = close_price + slippage  # 买入时加滑点
        
        # 模拟建仓成功
        success = True
        if success:
            # 记录持仓信息
            self.positions[stock_code] = {
                'shares': shares,
                'entry_price': adjusted_price,
                'benchmark_high': signals.get('judgment_high', adjusted_price),
                'entry_day0_low': signals.get('day0_low', adjusted_price),
                'addition_count': 0
            }
            
            # 记录建仓信号
            date = self._get_current_date(signals)
            self.exporter.record_signal(
                stock_code, date, 'buy', adjusted_price, 
                f"建仓 {shares}股 @ {adjusted_price:.2f}"
            )
            
            logger.info(f"建仓成功: {stock_code}, 价格: {adjusted_price:.2f}, 股数: {shares}")
        
        return success
    
    def add_to_position(self, stock_code, price, shares, reason="加仓"):
        """加仓（记录信号）"""
        signals = self.last_signals.get(stock_code, {})
        
        # 使用收盘价作为加仓价格
        close_price = signals.get('day1_close', price)
        
        # 应用滑点（价格容差）
        slippage = close_price * self.tolerance
        adjusted_price = close_price + slippage  # 买入时加滑点
        
        # 计算加仓数量
        if stock_code in self.positions:
            current_shares = self.positions[stock_code]['shares']
            add_shares = int(current_shares * self.add_position_ratio)
            add_shares = max(1, add_shares)  # 至少1股
        else:
            add_shares = shares
        
        # 模拟加仓成功
        success = True
        if success:
            # 更新持仓信息
            self.positions[stock_code]['shares'] += add_shares
            self.positions[stock_code]['addition_count'] += 1
            self.addition_count = self.positions[stock_code]['addition_count']
            
            # 更新基准最高价
            new_high = signals.get('judgment_high', adjusted_price)
            self.benchmark_high = new_high
            self.positions[stock_code]['benchmark_high'] = new_high
            
            # 记录加仓信号
            date = self._get_current_date(signals)
            self.exporter.record_signal(
                stock_code, date, 'add', adjusted_price, 
                f"第{self.addition_count}次加仓 {add_shares}股 @ {adjusted_price:.2f}"
            )
            
            logger.info(f"加仓成功: {stock_code}, 第{self.addition_count}次加仓, 价格: {adjusted_price:.2f}")
        
        return success
    
    def exit_position(self, stock_code, price, reason="平仓"):
        """平仓（记录信号）"""
        signals = self.last_signals.get(stock_code, {})
        
        # 使用收盘价作为平仓价格
        close_price = signals.get('day1_close', price)
        
        # 应用滑点（价格容差）
        slippage = close_price * self.tolerance
        adjusted_price = close_price - slippage  # 卖出时减滑点
        
        # 模拟平仓成功
        success = True
        if success and stock_code in self.positions:
            # 记录止损信号
            date = self._get_current_date(signals)
            self.exporter.record_signal(
                stock_code, date, 'sell', adjusted_price, 
                f"止损平仓 @ {adjusted_price:.2f}"
            )
            
            # 移除持仓
            del self.positions[stock_code]
            
            logger.info(f"平仓成功: {stock_code}, 原因: {reason}, 价格: {adjusted_price:.2f}")
        
        return success
    
    def _get_current_date(self, signals):
        """从信号中获取当前日期"""
        date_info = signals.get('date', [2025, 1, 1])
        if isinstance(date_info, list) and len(date_info) >= 3:
            year, month, day = date_info[0], date_info[1], date_info[2]
            return datetime(year, month, day)
        return datetime.now()
    
    def export_signals(self):
        """导出所有信号到通达信格式（适配TN6）"""
        logger.info("开始导出信号到通达信格式...")
        
        # 导出各种格式
        formula_file = os.path.join(self.export_dir, "突破策略信号.TN6")
        text_file = os.path.join(self.export_dir, "策略信号标记.TXT")
        data_file = os.path.join(self.export_dir, "自定义信号.TXT")
        report_file = os.path.join(self.export_dir, "策略信号报告.TXT")
        
        self.exporter.export_to_tdx_formula(formula_file)
        self.exporter.export_to_tdx_text(text_file)
        self.exporter.export_to_tdx_data(data_file)
        self.exporter.export_simple_report(report_file)
        
        # 生成使用说明
        self._generate_instruction_file()
        
        logger.info(f"所有信号已导出到目录: {self.export_dir}")
        
        return {
            'formula': formula_file,
            'text': text_file,
            'data': data_file,
            'report': report_file
        }
    
    def _generate_instruction_file(self):
        """生成使用说明文件（适配通达信实际使用方式）"""
        instruction = """通达信信号标记使用说明（适配最新版本）

本策略回测结果已导出为多种格式，您可以根据需要选择使用：

1. 公式文件 (突破策略信号.TN6)
   - 复制到通达信公式目录（通常是 T0002\\pri_csv 或 T0002\\formula）
   - 在通达信中按 Ctrl+F 打开公式管理器
   - 点击"导入公式"按钮，选择.TN6文件导入
   - 导入后在K线图中应用公式查看信号标记

2. 文本标记文件 (策略信号标记.TXT)
   - 可通过通达信的"系统"->"自定义数据管理"功能导入
   - 如果没有导入功能，可以尝试以下方法：
     a) 将文件复制到通达信安装目录下
     b) 在公式中通过引用外部文件的方式使用

3. 自定义数据文件 (自定义信号.TXT)
   - 可通过通达信的"系统"->"自定义数据管理"功能导入
   - 导入后可在指标中引用这些数据

4. 文本报告 (策略信号报告.TXT)
   - 可直接在通达信中打开查看
   - 通过"文件"->"打开"菜单选择该文件

在通达信中的实际操作步骤：

1. 导入公式：
   - 打开通达信软件
   - 按 Ctrl+F 打开公式管理器
   - 选择"技术指标公式"或"条件选股公式"
   - 点击"导入公式"按钮
   - 选择"突破策略信号.TN6"文件
   - 点击"打开"完成导入

2. 应用公式：
   - 在K线图界面
   - 输入"突破策略信号"（或公式名称）
   - 按回车键应用公式
   - 信号标记将显示在K线上

信号说明：
- 建仓信号(红色)：满足建仓条件的买入点
- 加仓信号(蓝色)：满足加仓条件的加仓点
- 止损信号(黄色)：触发止损条件的卖出点

注意事项：
1. 确保文件编码为GBK，否则可能出现乱码
2. 导入前请备份原有数据
3. 不同版本的通达信可能有细微差异，如遇问题可尝试修改文件扩展名
"""
        
        instruction_file = os.path.join(self.export_dir, "使用说明.TXT")
        with open(instruction_file, 'w', encoding='gbk') as f:
            f.write(instruction)

# 测试函数
def test_tdx_export_tn6():
    """测试通达信TN6格式导出功能"""
    print("测试通达信TN6格式导出功能...")
    
    # 创建模拟策略
    class MockDataProvider:
        def get_realtime_price(self, stock_code):
            return None
    
    mock_provider = MockDataProvider()
    
    # 创建带导出功能的策略
    strategy = BreakoutStrategyWithTDXExport(
        mock_provider, 
        capital=100000,
        tolerance=0.001,
        max_additions=3,
        export_dir="TDX_TN6_Export"
    )
    
    # 模拟一些交易信号（基于用户提供的实际回测数据）
    test_signals = [
        {'stock_code': '000001', 'date': datetime(2025, 7, 25), 'type': 'buy', 'price': 12.36, 'desc': '建仓'},
        {'stock_code': '000001', 'date': datetime(2025, 7, 26), 'type': 'add', 'price': 12.50, 'desc': '第一次加仓'},
        {'stock_code': '000001', 'date': datetime(2025, 7, 27), 'type': 'add', 'price': 12.60, 'desc': '第二次加仓'},
        {'stock_code': '000001', 'date': datetime(2025, 7, 28), 'type': 'sell', 'price': 12.30, 'desc': '止损平仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 4), 'type': 'buy', 'price': 44.94, 'desc': '建仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 5), 'type': 'add', 'price': 45.30, 'desc': '第一次加仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 6), 'type': 'add', 'price': 45.17, 'desc': '第二次加仓'},
        {'stock_code': '600036', 'date': datetime(2025, 8, 11), 'type': 'sell', 'price': 44.06, 'desc': '止损平仓'},
    ]
    
    # 记录测试信号
    for signal in test_signals:
        strategy.exporter.record_signal(
            signal['stock_code'],
            signal['date'],
            signal['type'],
            signal['price'],
            signal['desc']
        )
    
    # 导出信号
    result_files = strategy.export_signals()
    
    print("导出完成！生成的文件：")
    for file_type, file_path in result_files.items():
        file_size = os.path.getsize(file_path)
        print(f"  {file_type}: {file_path} ({file_size} 字节)")
    
    # 显示部分文件内容
    print("\nTN6公式文件内容预览:")
    print("=" * 50)
    try:
        with open(result_files['formula'], 'r', encoding='gbk') as f:
            for i, line in enumerate(f):
                if i < 15:  # 显示前15行
                    print(line.rstrip())
                else:
                    break
    except Exception as e:
        print(f"读取文件失败: {e}")
    
    print("\n文本报告内容预览:")
    print("=" * 50)
    try:
        with open(result_files['report'], 'r', encoding='gbk') as f:
            for i, line in enumerate(f):
                if i < 20:  # 显示前20行
                    print(line.rstrip())
                else:
                    break
    except Exception as e:
        print(f"读取文件失败: {e}")
    
    return result_files

# 通达信公式导入辅助函数
def generate_tdx_import_script():
    """生成通达信公式导入辅助脚本"""
    script_content = """' 通达信公式导入辅助脚本（VBScript）
' 使用方法：将此脚本保存为 .vbs 文件，双击运行
' 注意：需要提前将公式文件放在指定目录

Option Explicit

Dim fso, shell, formulaPath, tdxPath

' 创建文件系统对象
Set fso = CreateObject("Scripting.FileSystemObject")
Set shell = CreateObject("WScript.Shell")

' 设置路径（请根据实际情况修改）
formulaPath = "突破策略信号.TN6"  ' 公式文件路径
tdxPath = "C:\new_tdx"  ' 通达信安装路径

' 检查文件是否存在
If Not fso.FileExists(formulaPath) Then
    MsgBox "公式文件不存在：" & formulaPath, vbCritical
    WScript.Quit
End If

' 复制公式文件到通达信公式目录
On Error Resume Next
fso.CopyFile formulaPath, tdxPath & "\T0002\pri_formula\"
If Err.Number <> 0 Then
    fso.CopyFile formulaPath, tdxPath & "\T0002\formula\"
End If
On Error Goto 0

MsgBox "公式文件已复制到通达信公式目录，请在通达信中导入公式。", vbInformation

' 清理对象
Set fso = Nothing
Set shell = Nothing
"""
    
    with open("导入通达信公式.vbs", "w", encoding="gbk") as f:
        f.write(script_content)
    
    print("已生成通达信公式导入辅助脚本：导入通达信公式.vbs")

if __name__ == "__main__":
    # 测试通达信TN6格式导出功能
    test_tdx_export_tn6()
    
    # 生成导入辅助脚本
    generate_tdx_import_script()
    
    print("\n使用说明：")
    print("1. 将生成的.TN6文件复制到通达信公式目录")
    print("2. 在通达信中按Ctrl+F打开公式管理器")
    print("3. 点击'导入公式'按钮，选择.TN6文件")
    print("4. 或者运行'导入通达信公式.vbs'脚本自动复制文件")

测试通达信TN6格式导出功能...


NameError: name 'TDXMarkerExporter' is not defined

In [None]:
# 策略回测类（修正版）
class BreakoutBacktest:
    """
    突破策略回测类（支持尾盘判断模拟）
    """
    
    def __init__(self, data_provider, initial_capital=100000):
        self.data_provider = data_provider
        self.initial_capital = initial_capital
    
    def run_backtest(self, stock_code, start_date, end_date, close_time_judgment=True, **strategy_params):
        """
        运行回测（支持尾盘判断模拟）
        """
        logger.info(f"开始回测 {stock_code}: {start_date} 到 {end_date}, 尾盘判断: {close_time_judgment}")
        
        # 创建策略实例
        strategy = BreakoutStrategy(
            self.data_provider, 
            self.initial_capital, 
            close_time_judgment=close_time_judgment,
            **strategy_params
        )
        
        # 获取历史数据
        historical_data = self.data_provider.get_historical_data_range(
            stock_code, start_date, end_date)
        
        if historical_data is None or len(historical_data) < 10:
            logger.warning(f"无法获取足够的历史数据: {stock_code}")
            return None, None
        
        # 按日期模拟交易
        dates = historical_data.index.tolist()
        #print(len(historical_data))
        for i in range(2, len(historical_data)):  # 至少需要2天数据
            current_date = dates[i]
            data_to_date = historical_data.iloc[:i+1]  # 截止到当前日期的数据
      #      print(data_to_date)
            # 模拟尾盘判断：使用当日数据但标记为尾盘时间
            if close_time_judgment:
                # 创建一个模拟的实时数据副本
                simulated_data = data_to_date.copy()
                # 标记最后一行数据为尾盘判断时刻
                simulated_data.iloc[-1] = simulated_data.iloc[-1].copy()
                # 这里可以添加更多模拟逻辑，如模拟当日最高价未超过前日最高价等情况
            
            # 运行策略
            strategy.run_on_stock(stock_code, data=data_to_date)
        
        # 计算绩效
        performance = strategy.get_performance_summary()
        
        return strategy, performance
    
    def run_multiple_backtest(self, stock_codes, start_date, end_date, close_time_judgment=True, **strategy_params):
        """
        多股票回测
        """
        results = {}
        
        for stock_code in stock_codes:
            strategy, performance = self.run_backtest(
                stock_code, start_date, end_date, close_time_judgment, **strategy_params)
            
            if strategy and performance:
                results[stock_code] = (strategy, performance)
        
        return results

# 使用示例
if __name__ == "__main__":
    # 创建数据提供器
    data_provider = TDXDataProvider()
    
    # 创建策略实例（启用尾盘判断）
    # strategy = BreakoutStrategy(
    #     data_provider, 
    #     capital=100000,
    #     name="突破策略-尾盘判断",
    #     tolerance=0,      # 0.0%容差
    #     max_additions=5,      # 最大加仓5次
    #     add_position_ratio=0.5, # 加仓比例为50%
    #     close_time_judgment=True,  # 启用尾盘判断
    #     judgment_time="14:55:00"   # 判断时间为14:55
    # )
    
    # # 监控的股票列表
    stock_list = ['000001', '600036', '000858']
    
    # # 运行策略（批量模式）
    # print("=== 突破策略运行（批量模式）===")
    # trades_count = strategy.run_strategy(stock_list)
    # print(f"完成策略运行，发生 {trades_count} 笔交易")
    
    # # 打印策略摘要
    # strategy.print_summary()
    
    # # 运行回测示例
    # print("\n=== 突破策略回测 ===")
    # backtester = BreakoutBacktest(data_provider)
    
    # # 单股票回测（启用尾盘判断）
    # strategy, performance = backtester.run_backtest(
    #     '600036', 
    #     '2025-07-20', 
    #     '2025-08-28',
    #     close_time_judgment=False,  # 不用尾盘判断
    #     tolerance=0.001,
    #     max_additions=1,
    #     add_position_ratio=0.5
    # )
    
    # if strategy and performance:
    #     print("回测结果:")
    #     for key, value in performance.items():
    #         print(f"{key}: {value}")
    
    # 实时监控示例（取消注释以运行）
    print("\n=== 突破策略实时监控 ===")
    # 注意：实时监控需要在实际交易时间内运行
    strategy.run_strategy_realtime(stock_list, check_interval=300)  # 每5分钟检查一次

2025-09-28 22:20:49,383 - INFO - 开始实时运行突破策略，监控3只股票，检查间隔300秒



=== 突破策略实时监控 ===


In [67]:
# # 回测和可视化
# class BacktestEngine:
#     """
#     回测引擎
#     """
    
#     def __init__(self, data_provider):
#         self.data_provider = data_provider
    
#     def run_single_backtest(self, stock_code, start_date, end_date, **strategy_params):
#         """
#         单只股票回测
#         """
#         print(f"开始回测 {stock_code}：{start_date} 到 {end_date}")
        
#         # 获取数据
#         df = self.data_provider.get_kline_data(stock_code, start_date, end_date)
#         if df is None or df.empty:
#             print(f"无法获取 {stock_code} 的数据")
#             return None, None
        
#         # 运行策略
#         strategy = BreakoutStrategy(**strategy_params)
#         signals = strategy.run_backtest(df)
        
#         # 计算绩效
#         performance = strategy.get_performance(signals)
        
#         return strategy, signals, performance
    
#     def run_multiple_backtest(self, stock_codes, start_date, end_date, **strategy_params):
#         """
#         多只股票回测
#         """
#         results = {}
        
#         for stock_code in stock_codes:
#             strategy, signals, performance = self.run_single_backtest(
#                 stock_code, start_date, end_date, **strategy_params)
            
#             if strategy and signals is not None:
#                 results[stock_code] = {
#                     'strategy': strategy,
#                     'signals': signals,
#                     'performance': performance
#                 }
        
#         return results
    
#     def plot_results(self, stock_code, signals, strategy):
#         """
#         绘制回测结果
#         """
#         fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))
        
#         # 绘制价格和信号
#         ax1.plot(signals.index, signals['price'], label='价格', linewidth=1)
#         ax1.plot(signals.index, signals['ref_high'], label='参考高价', linestyle='--', color='red', alpha=0.7)
#         ax1.plot(signals.index, signals['ref_low'], label='参考低价', linestyle='--', color='green', alpha=0.7)
        
#         # 标记交易点
#         buy_dates = [t['date'] for t in strategy.trades if t['action'] == '建仓']
#         buy_prices = [t['price'] for t in strategy.trades if t['action'] == '建仓']
#         ax1.scatter(buy_dates, buy_prices, color='red', marker='^', label='建仓', s=100)
        
#         add_dates = [t['date'] for t in strategy.trades if t['action'] == '加仓']
#         add_prices = [t['price'] for t in strategy.trades if t['action'] == '加仓']
#         ax1.scatter(add_dates, add_prices, color='orange', marker='^', label='加仓', s=100)
        
#         sell_dates = [t['date'] for t in strategy.trades if t['action'] == '止损']
#         sell_prices = [t['price'] for t in strategy.trades if t['action'] == '止损']
#         ax1.scatter(sell_dates, sell_prices, color='green', marker='v', label='止损', s=100)
        
#         ax1.set_title(f'{stock_code} - 突破策略')
#         ax1.set_ylabel('价格')
#         ax1.legend()
#         ax1.grid(True)
        
#         # 绘制资金曲线
#         ax2.plot(signals.index, signals['portfolio_value'], label='资金曲线', color='blue')
#         ax2.axhline(y=strategy.initial_capital, color='red', linestyle='--', label='初始资金')
#         ax2.set_title('资金曲线')
#         ax2.set_ylabel('资金')
#         ax2.legend()
#         ax2.grid(True)
        
#         plt.tight_layout()
#         plt.show()
    
#     def export_to_tdx(self, signals, stock_code, export_file='tdx_signals.csv'):
#         """
#         导出信号到CSV，供通达信使用
#         """
#         # 创建适合通达信导入的格式
#         tdx_data = pd.DataFrame(index=signals.index)
#         tdx_data['股票代码'] = stock_code
#         tdx_data['信号类型'] = '无'
        
#         # 标记建仓信号
#         entry_signals = signals[signals['entry_signal'] == True]
#         if not entry_signals.empty:
#             tdx_data.loc[entry_signals.index, '信号类型'] = '建仓'
        
#         # 标记加仓信号
#         add_signals = signals[signals['add_signal'] == True]
#         if not add_signals.empty:
#             tdx_data.loc[add_signals.index, '信号类型'] = '加仓'
        
#         # 标记止损信号
#         stop_signals = signals[signals['stop_signal'] == True]
#         if not stop_signals.empty:
#             tdx_data.loc[stop_signals.index, '信号类型'] = '止损'
        
#         # 添加价格信息
#         tdx_data['收盘价'] = signals['price']
#         tdx_data['参考高价'] = signals['ref_high']
#         tdx_data['参考低价'] = signals['ref_low']
        
#         # 保存到CSV
#         tdx_data.to_csv(export_file, encoding='gbk')  # 使用GBK编码兼容通达信
#         print(f"信号已导出到 {export_file}")
        
#         return tdx_data

# # 使用示例
# if __name__ == "__main__":
#     # 创建数据提供器
#     data_provider = TDXDataProvider()
    
#     # 创建回测引擎
#     backtest_engine = BacktestEngine(data_provider)
    
#     # 回测参数
#     stock_codes = ['000001', '600036']  # 测试股票
#     start_date = '2023-01-01'
#     end_date = '2023-12-31'
    
#     # 运行回测
#     results = backtest_engine.run_multiple_backtest(
#         stock_codes, start_date, end_date,
#         capital=100000, max_positions=5, tolerance=0.005
#     )
    
#     # 输出结果
#     for stock_code, result in results.items():
#         strategy = result['strategy']
#         signals = result['signals']
#         performance = result['performance']
        
#         print(f"\n=== {stock_code} 回测结果 ===")
#         for key, value in performance.items():
#             print(f"{key}: {value}")
        
#         # 绘制图表
#         backtest_engine.plot_results(stock_code, signals, strategy)
        
#         # 导出到通达信格式
#         backtest_engine.export_to_tdx(signals, stock_code, f'{stock_code}_signals.csv')



开始回测 000001：2023-01-01 到 2023-12-31
无法获取 000001 的数据


ValueError: not enough values to unpack (expected 3, got 2)