## 导入库

In [6]:
import json
import requests
import time
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import rcParams
from datetime import datetime, timedelta
from matplotlib.dates import DateFormatter, HourLocator

## 1. 数据管理模块

In [30]:
# 安装：pip install Ashare
from Ashare import get_price

class DataManager:
    def load_data(self, symbol, start_date, end_date):
        # 自动兼容多种股票代码格式（如sh600519/sz000001）
        df = get_price(symbol, 
                      frequency='1d', 
                      count=1000,  # 自动计算所需天数
                      end_date=end_date)
        df = df.rename(columns={'date':'trade_date'})
        df.to_csv(f"{self.data_path}{symbol}.csv")
        return df

ModuleNotFoundError: No module named 'Ashare'

In [None]:
# 修改 DataManager 初始化（需先到 https://qos.hk 注册获取证书）
def __init__(self):
    self.license = "b3053f8ec33f11a965f60555a7b9bc59"  # 替换为申请的证书

def load_data(self, symbol, start_date, end_date):
    url = f"http://api.mairui.club/hszbl/fsjy/{symbol}/dn/{self.license}"
    params = {"start": start_date, "end": end_date}
    # 其他处理逻辑保持不变（需验证字段映射）

### 配置文字，参数，初始化数据

In [12]:
# 设置字体为支持中文的字体
rcParams['font.sans-serif'] = ['Microsoft YaHei']  # 或者使用其他支持中文的字体，如 SimHei
rcParams['axes.unicode_minus'] = False

# 配置参数
name_map = {
    #"HK:700": "腾讯控股",
    #"SH:600519": "贵州茅台",
    "SZ:002594": "比亚迪"
}

api_url = "https://api.qos.hk/snapshot?key=b3053f8ec33f11a965f60555a7b9bc59"
update_interval = 30  # 秒

# 初始化数据存储
history_data = {
    code: {'timestamps': [], 'prices': [], 'volumes': [], 'amounts': []}
    for code in name_map
}

### 获取数据

In [14]:
def fetch_data():
    """获取最新行情数据"""
    payload = json.dumps({"codes": list(name_map.keys())})
    headers = {'Content-Type': 'application/json'}
    
    try:
        response = requests.post(api_url, headers=headers, data=payload)
        return response.json()["data"]
    except Exception as e:
        print(f"数据获取失败：{str(e)}")
        return None

## 2. 策略开发模块
* 信号生成器：技术指标（如均线突破、RSI）
* 风险管理：止损/止盈、仓位控制
* 交易执行：订单类型（市价单、限价单）

In [15]:
class Strategy:
    def __init__(self, short_window=5, long_window=20):
        self.short_window = short_window
        self.long_window = long_window
        self.position = 0  # 持仓状态
    
    def generate_signal(self, data):
        data['short_ma'] = data['prices'].rolling(self.short_window).mean()
        data['long_ma'] = data['prices'].rolling(self.long_window).mean()
        
        # 金叉买入，死叉卖出
        if data['short_ma'].iloc[-1] > data['long_ma'].iloc[-1]:
            return 'BUY'
        elif data['short_ma'].iloc[-1] < data['long_ma'].iloc[-1]:
            return 'SELL'
        else:
            return 'HOLD'

## 3. 回测引擎模块
* 功能：模拟历史交易并评估策略表现
* __核心指标__：年化收益率、夏普比率、最大回撤、胜率

In [3]:
class Backtester:
    def __init__(self, initial_capital=100000):
        self.initial_capital = initial_capital
        self.portfolio = {'cash': initial_capital, 'shares': 0}
    
    def run_backtest(self, data, strategy):
        returns = []
        for i in range(len(data)):
            signal = strategy.generate_signal(data.iloc[:i+1])
            price = data.iloc[i]['close']
            if signal == 'BUY' and self.portfolio['cash'] > 0:
                self.portfolio['shares'] = self.portfolio['cash'] // price
                self.portfolio['cash'] -= self.portfolio['shares'] * price
            elif signal == 'SELL' and self.portfolio['shares'] > 0:
                self.portfolio['cash'] += self.portfolio['shares'] * price
                self.portfolio['shares'] = 0
            # 记录每日净值
            total_value = self.portfolio['cash'] + self.portfolio['shares'] * price
            returns.append(total_value / self.initial_capital - 1)
        return returns

## 4. 可视化模块
* __工具__ :Matplotlib/Seaborn 或交互式工具

In [16]:
def update_chart():
    """更新图表"""
    for ax in [ax1, ax2, ax3]:
        ax.clear()
    
    for code in name_map:
        timestamps = [datetime.fromtimestamp(ts) for ts in history_data[code]['timestamps']]
        
        # 价格走势（折线图）
        ax1.plot(timestamps, history_data[code]['prices'], 
                marker='o', label=name_map[code])
        
        # 成交量（柱状图）
        ax2.bar(timestamps, history_data[code]['volumes'],
               width=0.02, alpha=0.5, label=name_map[code])
        
        # 成交额（折线图）
        ax3.plot(timestamps, history_data[code]['amounts'],
                linestyle='--', label=name_map[code])

    # 设置坐标轴格式
    time_format = DateFormatter("%H:%M")
    for ax in [ax1, ax2, ax3]:
        ax.xaxis.set_major_formatter(time_format)
        ax.xaxis.set_major_locator(HourLocator())  # 设置X轴主刻度为每小时
        ax.grid(True)
        ax.legend()

    ax1.set_ylabel('Price (Yuan)')
    ax2.set_ylabel('Volume (in lots)')
    ax3.set_ylabel('Transaction Amount (in ten thousand yuan)')
    
    plt.tight_layout()  # 自动调整布局
    plt.draw()  # 更新绘图
    plt.pause(0.1)

## 5. 风险管理与绩效评估

In [16]:
import numpy as np

def calculate_performance(returns):
    annual_return = np.mean(returns) * 252
    volatility = np.std(returns) * np.sqrt(252)
    sharpe_ratio = annual_return / volatility
    max_drawdown = (np.maximum.accumulate(returns) - returns).max()
    return {
        "年化收益率": annual_return,
        "夏普比率": sharpe_ratio,
        "最大回撤": max_drawdown
    }

In [25]:
def validate_data(df):
    # **检查麦蕊数据完整性**
    required_columns = ['open', 'high', 'low', 'close', 'vol']
    missing = set(required_columns) - set(df.columns)
    if missing:
        raise ValueError(f"数据缺失关键字段: {missing}")
    
    # **检查异常值（如涨跌幅超过20%）**
    df['pct_chg'] = df['close'].pct_change()
    abnormal = df[abs(df['pct_chg']) > 0.2]
    if not abnormal.empty:
        print(f"警告：发现{len(abnormal)}条异常波动数据")

## 运行

In [None]:
def main():
    """主运行循环"""
    strategy = Strategy()  # 初始化策略

    while True:
        start_time = time.time()
        
        # 获取数据
        data = fetch_data()
        if data:
            for stock in data:
                code = stock["c"]
                if code in history_data:
                    # 记录当前时间戳
                    ts = datetime.now().timestamp()
                    try:
                        history_data[code]['timestamps'].append(ts)
                        history_data[code]['prices'].append(stock["lp"])
                        history_data[code]['volumes'].append(int(stock["v"]) / 100)  # 转换为手
                        history_data[code]['amounts'].append(float(stock["t"]) / 10000)  # 转换为万元
                    except KeyError as e:
                        print(f"数据解析失败：{stock}，错误：{e}")

                    # 转换数据为DataFrame
                    data_frame = pd.DataFrame(history_data[code])

                    # 生成交易信号
                    signal = strategy.generate_signal(data_frame)
                    print(f"股票 {name_map[code]} 当前信号：{signal}")
            update_chart()
        
        # 等待下一个周期
        elapsed = time.time() - start_time
        if elapsed < update_interval:
            time.sleep(update_interval - elapsed)

In [None]:
# 创建画布
plt.ion()  # 启用交互模式
fig, (ax1, ax2, ax3) = plt.subplots(3, 1, figsize=(12, 8))
fig.suptitle('Real-time Stock Market Monitoring')

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        plt.close()
        print("监控已停止")

<Figure size 1200x800 with 0 Axes>

<Figure size 640x480 with 0 Axes>