In [None]:
import pandas as pd
import os
import glob
from functools import lru_cache  # 缓存装饰器
import logging
import traceback
from pathlib import Path

In [None]:
import sys, pathlib
# 把 tools 的绝对路径插到最前面
sys.path.insert(0, str(pathlib.Path('__file__').resolve().parent.parent / 'Quant_base'))
import Drawpic
import CLbasement
import Basement

In [None]:
# 配置参数集中管理
CONFIG = {
    'sdate': '', #开始日期，如果为空则为数据最早日期
    'edate': '', #结束日期，如果为空则为数据最晚日期
    'data_path': r'F:\study\stock_related\2020_2025data\A股\天粒度', #数据路径
    'stock_list':['SZ.300750'],
    'initial_money': 100000, #初始资金
    'max_hold': 5, #最大持仓数量
}
logging.basicConfig(
    filename='backtest.log',      # 日志文件名
    level=logging.INFO,      # 只记录 INFO 及以上级别
    format='%(asctime)s [%(levelname)s] %(message)s'
)

In [None]:
#数据加载模块
class DataLoader:
    def __init__(self, CONFIG):
        self.data_path = CONFIG['data_path']
        self.stock_list = [p.stem for p in Path(self.data_path).glob('*.csv')] if not CONFIG['stock_list'] else CONFIG['stock_list']
        self.sdate = CONFIG['sdate']
        self.edate = CONFIG['edate']
        # 预加载所有文件路径
        self.files_map = {
            os.path.basename(f).split('.csv')[0]: f 
            for f in glob.glob(os.path.join(self.data_path, "*.csv"))
        }
        # 初始化时立即加载天粒度数据（仅加载一次）
        self.data, self.trade_cal = self._load_data()
        logging.info(f"DataLoader初始化完成: 加载天粒度数据 {self.sdate} 至 {self.edate}, 共 {len(self.trade_cal)} 个交易日")

    def _load_data(self):
        """加载并处理指定日期范围内的所有天粒度数据"""
        stockdf = pd.DataFrame()
        for code in self.stock_list:
            filepath = self.files_map.get(code)
            if not filepath:
                print(f'{code}无数据')
                continue  # 跳过无数据的股票
            try:
                #tmpdf = pd.read_csv(filepath)
                tmpdf = Basement.read_data_local(filepath)
                stockdf = pd.concat([stockdf, tmpdf], axis=0, ignore_index=True)
                stockdf.columns = [['code','datetime','open','close','low','high','volume','amount']]
            except Exception as e:
                logging.info(f"读取天粒度数据 {code} 出错: {str(e)}")

        if self.sdate != '':
            stockdf = stockdf[stockdf['datetime'] >= self.sdate]
        if self.edate != '':
            stockdf = stockdf[stockdf['datetime'] <= self.sdate]
        
        trade_cal = sorted(stockdf['datetime'].drop_duplicates().tolist())
        
        return stockdf,trade_cal


In [None]:
#仓位管理模块
class PositionManager:
    def __init__(self, CONFIG):
        self.cash = CONFIG['initial_money']  # 当前现金
        self.positions = 0
        logging.info('持仓初始化完成')
        
    def buy(self, price, volume):
        """执行买入"""
        price = round(price,2)
        cost = round(price * volume * 100,2)
        self.cash -= cost
        self.positions += volume
        logging.info(f'买入{volume}手，买入价格{price}')
        
    def sell(self, price, volume):
        """执行买入"""
        price = round(price,2)
        cost = round(price * volume * 100,2)
        self.cash += cost
        self.positions -= volume
        logging.info(f'卖出{volume}手，卖出价格{price}')

In [None]:
#策略模块
class Strategy:
    def __init__(self,position_manager):
        self.window_size = 50
        self.position_manager = position_manager
        self.high_price = 0
        logging.info('策略初始化完成')

    def calculate_moving_average(self, prices_list, x):
        """
        简单移动平均（SMA）
        :param prices_list: 按时间顺序的收盘价列表 [float]
        :param x: 周期
        :return: 与 prices_list 等长，前 x-1 个元素保持原值，之后为对应均线
        """
        if x <= 0:
            raise ValueError('周期 x 必须为正整数')
    
        n = len(prices_list)
        ma = prices_list.copy()          # 先复制一份，前 x-1 个位置保留原值
    
        for i in range(x, n + 1):        # 从第 x 个元素开始算
            window = prices_list[i - x:i]
            ma[i - 1] = round(sum(window) / x, 2)
        return ma
    
    def generate_singals(self, df):
        if len(df) < self.window_size:
            return None
        else:
            pricelist = df['close'].tolist()
            curprice = pricelist[-1]
            moving_average = self.calculate_moving_average(pricelist,20)
            A = pricelist[-5:]
            B = moving_average[-5:]

            #当前有持仓,需要判断出厂逻辑
            if self.position_manager.positions > 0:
                if curprice < self.high_price * 0.90:
                    logging.info(f'触发出场信号，价格{curprice},最高价{self.high_price}')
                    sellcount = self.position_manager.positions
                    return {'type':0,'hands':sellcount,'price':curprice}
                else:
                    self.high_price = max(curprice,self.high_price)
            
            #当前没有持仓
            elif self.position_manager.positions == 0:
                #入场条件满足
                if (all(a < b for a, b in zip(A[:-1], B[:-1]))) and (A[-1] > B[-1]):
                    logging.info(f'触发均线入场信号，价格{str(A)},均线{str(B)}')
                    #计算购买量
                    buycount = self.money_management(curprice)
                    if buycount > 0:
                        self.high_price = curprice #记录买入以来最高价格
                        #购买量大于0，买入
                        return {'type':1,'hands':buycount,'price':curprice}

            else:
                return None
            

                        

    def money_management(self,curprice):
        cost = curprice * 100
        if self.position_manager.cash >= cost:
            return 1
        else:
            return 0
        
        

In [None]:
class Backtester:
    def __init__(self, config, data_loader):
        """
        初始化回测器
        参数:
            config: 配置字典
            data_loader: 已初始化的DataLoader实例（外部传入，共享数据）
        """
        self.config = config
        self.data_loader = data_loader
        self.cashlist = []
        if not self.data_loader.trade_cal:
            logging.info("警告：无有效交易日数据，回测可能无法正常运行")

    def run(self):
        # 检查数据有效性
        if not self.data_loader.trade_cal:
            logging.info("无法执行回测：无有效交易日数据")
            return

        # 重置回测状态（便于多次运行）
        self.position_manager = PositionManager(self.config)
        self.strategy = Strategy(self.position_manager)
        self.windowsize = self.strategy.window_size
        self.buysell_point = []
        logging.info('初始化完成，开始回测')
        # 回测主循环
        for i in range(1, len(self.data_loader.trade_cal)+1):
            tmpdf = self.data_loader.data.iloc[max(0, i-self.windowsize):i]
            cur_close = float(tmpdf.tail(1)['close'].values[0])
            cur_date = tmpdf.tail(1)['datetime'].values[0]

            signal = self.strategy.generate_singals(tmpdf)
            if signal:
                #买入逻辑
                if signal['type'] == 1:
                    self.buysell_point.append([i-1,cur_close,1]) #画图中的买点
                    self.position_manager.buy(
                        price=signal['price'],
                        volume=signal['hands']
                    )
                #卖出逻辑
                elif signal['type'] == 0:
                    self.buysell_point.append([i-1,cur_close,0]) #画图中的买点
                    self.position_manager.sell(
                        price=signal['price'],
                        volume=signal['hands']
                    )
            self.cashlist.append(self.position_manager.cash)
            logging.info(f'日期{cur_date}，当前现金{self.position_manager.cash}，持仓{self.position_manager.positions}手')


In [None]:
dloader = DataLoader(CONFIG)

In [None]:
backtester = Backtester(CONFIG,dloader)
backtester.run()

In [None]:
grid = Drawpic.generate_pic_by_df(dloader.data,backtester.buysell_point)
grid.load_javascript()

In [None]:
grid.render_notebook()