In [None]:
#####################
### 导入部分 ###
#####################
import akshare as ak
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import ParameterGrid
from sklearn.metrics import mean_squared_error
from datetime import datetime, timedelta
from collections import Counter
from functools import lru_cache

# 显示所有列(参数设置为None代表显示所有行，也可以自行设置数字)
pd.set_option('display.max_columns', None)
# 显示所有行
pd.set_option('display.max_rows', None)
# 设置数据的显示长度，默认为50
pd.set_option('max_colwidth', 200)
# 禁止自动换行(设置为Flase不自动换行，True反之)
pd.set_option('expand_frame_repr', False)

KEYPOINT = {}

In [None]:
#####################
###  公司基本信息  ###
#####################
def get_company_info(code):
    # 获取市场前缀
    symbol = f"{code}"
    info_dict = {}

    try:
        # 基础信息
        base_info = ak.stock_individual_info_em(symbol=symbol)
        info_dict['股票简称'] = base_info.loc[base_info['item'] == '股票简称', 'value'].values[0]
        info_dict['行业'] = base_info.loc[base_info['item'] == '行业', 'value'].values[0]
        info_dict['上市时间'] = base_info.loc[base_info['item'] == '上市时间', 'value'].values[0]
    except Exception as e:
        print(f"基础信息获取失败: {str(e)}")
        info_dict.update({'股票简称': '未知', '行业': '未知', '上市时间': '未知'})

    try:
        # 发行信息
        stock_ipo_info_df = ak.stock_ipo_info(stock=symbol)
        if not stock_ipo_info_df.empty:
            info_dict['发行价'] = stock_ipo_info_df.loc[stock_ipo_info_df['item'] == '发行价(元)', 'value'].values[0]
        else:
            info_dict['发行价'] = '暂无数据'
    except Exception as e:
        print(f"发行价获取失败: {str(e)}")
        info_dict['发行价'] = '暂无数据'

    try:
        # 分红信息
        stock_history_dividend_df = ak.stock_history_dividend()
        dividend_info = stock_history_dividend_df[stock_history_dividend_df['代码'] == code]
        # 如果找到记录，获取分红次数列的值；如果没找到记录，则为0
        info_dict['分红次数'] = dividend_info['分红次数'].iloc[0] if not dividend_info.empty else 0
    except Exception as e:
        print(f"分红信息获取失败: {str(e)}")
        info_dict['分红次数'] = 0

    try:
        # 机构参与度
        jg_info = ak.stock_comment_detail_zlkp_jgcyd_em(symbol=symbol)
        info_dict['机构参与度'] = f"{jg_info['机构参与度'].values[0]}%"
    except Exception as e:
        print(f"机构参与度获取失败: {str(e)}")
        info_dict['机构参与度'] = '暂无数据'

    try:
        # 市场成本
        cost_info = ak.stock_comment_detail_scrd_cost_em(symbol=symbol)
        info_dict['市场成本'] = f"{cost_info['市场成本'].values[0]}元"
    except Exception as e:
        print(f"市场成本获取失败: {str(e)}")
        info_dict['市场成本'] = '暂无数据'

    # 格式化输出
    print(f"\n===== {code} 公司基本信息 =====")
    print(f"股票简称：{info_dict['股票简称']}")
    print(f"所属行业：{info_dict['行业']}")
    print(f"上市时间：{info_dict['上市时间']}")
    print(f"发行价格：{info_dict['发行价']}")
    print(f"分红次数：{info_dict['分红次数']}次")
    print(f"机构参与：{info_dict['机构参与度']}")
    print(f"成本均价：{info_dict['市场成本']}")

    return info_dict


stock_code = input("请输入6位股票代码: ")
company_info = get_company_info(stock_code)

In [None]:
#####################
###  获取历史数据   ###
#####################
@lru_cache(maxsize=1024)
def get_history_data(code):
    symbol = f"{code}"
    days = 365
    end_date = datetime.now().strftime("%Y%m%d")
    start_date = (datetime.now() - timedelta(days)).strftime("%Y%m%d")
    df = ak.stock_zh_a_hist(symbol=symbol, period="daily", start_date=start_date, end_date=end_date, adjust="qfq")
    print(f"历史{days}天数据获取完成，共获取{len(df)}条记录")
    return df


history_df = get_history_data(stock_code)

In [None]:
#####################
###分析交易量的均匀性###
#####################
def analyze_volume(data):
    if '成交量' not in data.columns:
        print("数据中没有成交量信息，请检查数据是否完整。")
        return

    volume_data = data['成交量'].astype(float)  # 确保成交量是数值类型
    volume_mean = volume_data.mean()
    volume_std = volume_data.std()

    print(f"\n交易量分析:")
    print(f"交易量均值: {volume_mean:.2f}")
    print(f"交易量标准差: {volume_std:.2f}")

    # 判断交易量的均匀性
    if volume_std < (0.5 * volume_mean):
        print("交易量分布较为均匀。")
        keypoint = "警惕"
    else:
        print("交易量分布不均匀，可能存在非量化资金的大量交易行为。")
        keypoint = "观望"

    return keypoint


keypoint = analyze_volume(history_df)
KEYPOINT.setdefault('交易量分布研判', keypoint)

In [None]:
#####################
###   分析价格波动  ###
#####################
def analyze_price_fluctuation(data):
    # 检查数据是否包含收盘价
    if '收盘' not in data.columns:
        print("数据中没有收盘价信息，请检查数据是否完整。")
        return

    # 计算日收益率
    data['日收益率'] = data['收盘'].pct_change()

    # 计算波动率（标准差）
    volatility = data['日收益率'].std()

    print(f"\n价格波动分析:")
    print(f"价格波动率（标准差）: {volatility:.2%}")

    # 判断价格波动是否存在规律
    if volatility < 0.015:  # 假设波动率小于1.5%为规律波动
        print("价格波动较为规律，可能存在量化资金的控制。")
        keypoint = "警惕"
    else:
        print("价格波动较大，可能存在非量化资金的大量交易行为。")
        keypoint = "观望"

    return keypoint


keypoint = analyze_price_fluctuation(history_df)
KEYPOINT.setdefault("价格波动研判", keypoint)

In [None]:
#####################
###   获取筹码分布  ###
#####################
def get_chip_distribution(code):
    symbol = f"{code}"
    df = ak.stock_cyq_em(symbol=symbol, adjust="qfq")
    latest_chip = df.iloc[-1].to_dict()
    print(
        f"最新交易日筹码分布：获利比例={latest_chip['获利比例'] * 100:.2f}% 70集中度={(latest_chip['70集中度'] * 100):.2f}%")

    if latest_chip['获利比例'] <= 0.2:
        keypoint = "√"
    elif 0.15 < latest_chip['获利比例'] <= 0.5:
        keypoint = "-"
    else:
        keypoint = "×"
    return latest_chip, keypoint, latest_chip['平均成本']


_, keypoint, mean_chip = get_chip_distribution(stock_code)
KEYPOINT.setdefault("长期筹码获利比例(√-×)", keypoint)

In [None]:
#####################
###  BBIBOLL计算   ###
#####################
def calculate_bbiboll(df):
    # 原有的BBIBOLL计算部分
    df['MA3'] = df['收盘'].rolling(3).mean()
    df['MA6'] = df['收盘'].rolling(6).mean()
    df['MA12'] = df['收盘'].rolling(12).mean()
    df['MA24'] = df['收盘'].rolling(24).mean()
    df['BBIBOLL'] = (df['MA3'] + df['MA6'] + df['MA12'] + df['MA24']) / 4
    df['UPPER'] = df['BBIBOLL'] + 2 * df['BBIBOLL'].rolling(11).std()
    df['LOWER'] = df['BBIBOLL'] - 2 * df['BBIBOLL'].rolling(11).std()

    # 获取最新数据
    latest = df.iloc[-1]
    latest_price = latest['收盘']
    bbiboll_data = {
        'BBIBOLL': latest['BBIBOLL'],
        'UPPER': latest['UPPER'],
        'LOWER': latest['LOWER'],
        'latest_price': latest_price
    }

    # 打印结果
    print(
        f"最新BBIBOLL值: mid={bbiboll_data['BBIBOLL']:.2f} upper={bbiboll_data['UPPER']:.2f} lower={bbiboll_data['LOWER']:.2f}")
    print(f"最新收盘价: {latest_price:.2f}")

    # 判断当前价格位置
    if latest_price > bbiboll_data['UPPER']:
        print("当前价格高于BBIBOLL上轨")
        keypoint = "×"
    elif latest_price < bbiboll_data['LOWER']:
        print("当前价格低于BBIBOLL下轨")
        keypoint = "√"
    else:
        print("当前价格在BBIBOLL通道内")
        keypoint = "-"

    return bbiboll_data, keypoint, latest_price


# 调用函数
_, keypoint, latest_price = calculate_bbiboll(history_df)
KEYPOINT.setdefault("中期BBIBOLL(√-×)", keypoint)

In [None]:
def calculate_rsi(data, window=14):
    """
    计算相对强弱指标（RSI），判断超买超卖状态。
    :param data: 包含收盘价的DataFrame
    :param window: RSI计算窗口，默认14天
    :return: 包含RSI值的DataFrame
    """
    # 计算涨跌幅
    delta = data['收盘'].diff()
    gain = delta.where(delta > 0, 0)  # 上涨部分
    loss = -delta.where(delta < 0, 0)  # 下跌部分

    # 计算平均涨幅和平均跌幅
    avg_gain = gain.rolling(window).mean()
    avg_loss = loss.rolling(window).mean()

    # 计算RS值
    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))

    # 添加RSI列到DataFrame
    data['RSI'] = rsi

    # 判断超买超卖状态
    latest_rsi = data['RSI'].iloc[-1]
    if latest_rsi > 70:
        print(f"当前RSI值为{latest_rsi:.2f}，处于超买状态，可能存在回调风险。")
        keypoint = "×"
    elif latest_rsi < 30:
        print(f"当前RSI值为{latest_rsi:.2f}，处于超卖状态，可能存在反弹机会。")
        keypoint = "√"
    else:
        print(f"当前RSI值为{latest_rsi:.2f}，市场处于正常状态。")
        keypoint = "-"

    return data, keypoint


# 调用RSI计算函数
_, keypoint = calculate_rsi(history_df)
KEYPOINT.setdefault("短期RSI(√-×)", keypoint)

In [None]:
def calculate_macd(data, short_window=12, long_window=26, signal_window=9):
    """
    计算MACD指标，判断趋势和买卖信号。
    :param data: 包含收盘价的DataFrame
    :param short_window: 短期EMA窗口，默认12天
    :param long_window: 长期EMA窗口，默认26天
    :param signal_window: 信号线窗口，默认9天
    :return: 包含MACD指标的DataFrame
    """
    # 计算短期和长期指数移动平均线
    data['EMA_short'] = data['收盘'].ewm(span=short_window, adjust=False).mean()
    data['EMA_long'] = data['收盘'].ewm(span=long_window, adjust=False).mean()

    # 计算DIF（差离值）
    data['DIF'] = data['EMA_short'] - data['EMA_long']

    # 计算DEA（信号线）
    data['DEA'] = data['DIF'].ewm(span=signal_window, adjust=False).mean()

    # 计算MACD柱状图（DIF - DEA）
    data['MACD'] = data['DIF'] - data['DEA']

    # 判断趋势和买卖信号
    latest_dif = data['DIF'].iloc[-1]
    latest_dea = data['DEA'].iloc[-1]

    if latest_dif > latest_dea:
        print("DIF > DEA，当前趋势为多头，可能存在买入信号。")
        keypoint = "√"
    else:
        print("DIF <= DEA，当前趋势为空头，可能存在卖出信号。")
        keypoint = "×"

    return data, keypoint


# 调用MACD计算函数
_, keypoint = calculate_macd(history_df)
KEYPOINT.setdefault("长期MACD(√×)", keypoint)

In [None]:
def calculate_kdj(data, window=9):
    """
    计算KDJ随机指标，判断买入信号。
    :param data: 包含收盘价、最高价、最低价的DataFrame
    :param window: KDJ计算窗口，默认9天
    :return: 包含KDJ指标的DataFrame
    """
    # 检查数据长度是否足够
    if len(data) < window:
        print(f"数据长度不足{window}天，无法计算KDJ指标。")
        return data

    # 检查数据是否完整
    required_columns = ['收盘', '最高', '最低']
    if not all(col in data.columns for col in required_columns):
        print("数据中缺少必要的列，请确保包含'收盘', '最高', '最低'列。")
        return data

    # 检查数据是否有缺失值
    if data[required_columns].isnull().values.any():
        print("数据中存在缺失值，正在填充...")
        data[required_columns] = data[required_columns].fillna(method='ffill')  # 用前值填充

    # 计算RSV值
    data['RSV'] = ((data['收盘'] - data['最低'].rolling(window).min()) /
                   (data['最高'].rolling(window).max() - data['最低'].rolling(window).min())) * 100

    # 避免除零错误
    data['RSV'] = data['RSV'].replace([np.inf, -np.inf], np.nan)  # 替换无穷值为NaN
    data['RSV'] = data['RSV'].fillna(0)  # 如果有NaN，填充为0

    # 初始化K和D值为浮点数
    data['K'] = 50.0  # 初始值为浮点数
    data['D'] = 50.0  # 初始值为浮点数

    # 计算K和D值
    for i in range(1, len(data)):
        data.loc[data.index[i], 'K'] = (2 / 3) * data.loc[data.index[i - 1], 'K'] + (1 / 3) * data.loc[
            data.index[i], 'RSV']
        data.loc[data.index[i], 'D'] = (2 / 3) * data.loc[data.index[i - 1], 'D'] + (1 / 3) * data.loc[
            data.index[i], 'K']

    # 计算J值
    data['J'] = 3 * data['K'] - 2 * data['D']

    # 判断买入条件：K > D 且两者从超卖区（<20）回升
    data['K_prev'] = data['K'].shift(1)
    data['D_prev'] = data['D'].shift(1)

    # 买入信号条件
    buy_condition = (
            (data['K'] > data['D']) &
            (data['K_prev'] < 20) &
            (data['D_prev'] < 20)
    )

    # 标记买入信号
    data['Buy_Signal'] = buy_condition

    # 检查是否存在NaN值
    if data[['K', 'D', 'J']].isnull().values.any():
        print("警告：KDJ指标计算结果中存在NaN值，请检查数据完整性。")

    # 打印最新KDJ值和买入信号
    latest_k = data['K'].iloc[-1]
    latest_d = data['D'].iloc[-1]
    latest_j = data['J'].iloc[-1]
    latest_buy_signal = data['Buy_Signal'].iloc[-1]

    print(f"最新KDJ值: K={latest_k:.2f}, D={latest_d:.2f}, J={latest_j:.2f}")
    if latest_buy_signal:
        print("当前存在买入信号：K > D 且两者从超卖区回升。")
        keypoint = "√"
    else:
        print("当前不存在买入信号。")
        keypoint = "-"

    return data, keypoint


# 调用KDJ计算函数
_, keypoint = calculate_kdj(history_df)
KEYPOINT.setdefault("短期KDJ(√-×)", keypoint)

In [None]:
def calculate_sar(data, af_start=0.02, af_step=0.02, af_max=0.2):
    """
    计算抛物线转向指标（SAR），判断趋势反转信号。
    :param data: 包含最高价和最低价的DataFrame
    :param af_start: 加速因子初始值，默认0.02
    :param af_step: 加速因子步进值，默认0.02
    :param af_max: 加速因子最大值，默认0.2
    :return: 包含SAR指标的DataFrame和趋势信号
    """
    # 检查数据是否包含必要的列
    if '最高' not in data.columns or '最低' not in data.columns:
        print("数据中缺少最高价或最低价信息，请检查数据是否完整。")
        return data, "×"

    # 初始化SAR列
    data['SAR'] = 0.0
    data['trend'] = 0  # 1表示上涨趋势，-1表示下跌趋势

    # 初始化变量
    trend = 1  # 假设初始趋势为上涨
    af = af_start  # 加速因子
    ep = data['最高'].iloc[0]  # 极值点
    sar = data['最低'].iloc[0]  # SAR初始值

    # 计算SAR值
    for i in range(1, len(data)):
        # 保存前一个SAR值
        prev_sar = sar

        # 根据趋势计算SAR
        if trend == 1:  # 上涨趋势
            sar = prev_sar + af * (ep - prev_sar)
            # 确保SAR不高于前两个周期的最低价
            sar = min(sar, data['最低'].iloc[max(0, i - 2):i].min())

            # 检查趋势是否改变
            if sar > data['最低'].iloc[i]:
                trend = -1  # 转为下跌趋势
                sar = ep  # SAR设为前期最高点
                ep = data['最低'].iloc[i]  # 新的极值点为当前最低价
                af = af_start  # 重置加速因子
            else:
                # 更新极值点和加速因子
                if data['最高'].iloc[i] > ep:
                    ep = data['最高'].iloc[i]
                    af = min(af + af_step, af_max)

        else:  # 下跌趋势
            sar = prev_sar - af * (prev_sar - ep)
            # 确保SAR不低于前两个周期的最高价
            sar = max(sar, data['最高'].iloc[max(0, i - 2):i].max())

            # 检查趋势是否改变
            if sar < data['最高'].iloc[i]:
                trend = 1  # 转为上涨趋势
                sar = ep  # SAR设为前期最低点
                ep = data['最高'].iloc[i]  # 新的极值点为当前最高价
                af = af_start  # 重置加速因子
            else:
                # 更新极值点和加速因子
                if data['最低'].iloc[i] < ep:
                    ep = data['最低'].iloc[i]
                    af = min(af + af_step, af_max)

        # 记录SAR值和趋势
        data.loc[data.index[i], 'SAR'] = sar
        data.loc[data.index[i], 'trend'] = trend

    # 获取最新趋势和SAR值
    latest_trend = data['trend'].iloc[-1]
    latest_sar = data['SAR'].iloc[-1]
    latest_price = data['收盘'].iloc[-1]

    print(f"\nSAR分析结果:")
    print(f"最新SAR值: {latest_sar:.2f}")
    print(f"当前收盘价: {latest_price:.2f}")

    # 判断趋势信号
    if latest_trend == 1 and latest_price > latest_sar:
        print("当前处于上涨趋势，SAR位于价格下方，建议持有或买入。")
        keypoint = "√"
    elif latest_trend == -1 and latest_price < latest_sar:
        print("当前处于下跌趋势，SAR位于价格上方，建议观望或卖出。")
        keypoint = "×"
    else:
        print("趋势不明确，建议观望。")
        keypoint = "-"

    return data, keypoint


# 调用SAR计算函数
_, keypoint = calculate_sar(history_df)
KEYPOINT.setdefault("短期SAR(√-×)", keypoint)

In [None]:
print(f"代码：{stock_code}")
print(company_info)
print(f"最新收盘价: {latest_price:.2f}")
print(f"筹码平均成本: {mean_chip:.2f}")
print(KEYPOINT)

# 计算 value_counts
value_counts = Counter(KEYPOINT.values())
# 对 value_counts 按 key 值进行排序
sorted_value_counts = dict(sorted(value_counts.items(), key=lambda item: item[0]))

print(sorted_value_counts)