In [42]:
import pandas as pd
import numpy as np
import akshare as ak

stock_zh_a_tick_tx_js_df = ak.stock_zh_a_tick_tx_js(symbol="sz000031")
# print(stock_zh_a_tick_tx_js_df)
def calc_main_force_score_detail(df):
    df = df.copy()

    # ========= 基础处理 =========
    df["成交时间"] = pd.to_datetime(df["成交时间"])
    df = df.sort_values("成交时间").reset_index(drop=True)

    total_amount = df["成交金额"].sum()
    if total_amount == 0 or len(df) < 10:
        return None

    # ========= 大单定义 =========
    BIG_RATIO = 0.005   # 单笔 >= 当日成交额 0.5%
    big_threshold = total_amount * BIG_RATIO
    df["is_big"] = df["成交金额"] >= big_threshold
    big_df = df[df["is_big"]]

    if big_df.empty:
        return None

    # =====================================================
    # 1️⃣ 主动买大单占比（40 分）
    # =====================================================
    buy_big_amount = big_df.loc[big_df["性质"] == "买盘", "成交金额"].sum()
    buy_big_ratio = buy_big_amount / total_amount
    buy_big_score = min(buy_big_ratio / 0.2, 1) * 40

    # =====================================================
    # 2️⃣ 大单推价能力（30 分）
    # =====================================================
    def calc_push_ratio(df, look_ahead=3):
        success, total = 0, 0
        for idx in df[df["is_big"]].index:
            base_price = df.loc[idx, "成交价格"]
            future = df.loc[idx + 1: idx + look_ahead, "成交价格"]
            if len(future) == 0:
                continue
            total += 1
            if future.mean() > base_price:
                success += 1
        return success / total if total > 0 else 0

    push_ratio = calc_push_ratio(df)
    push_score = push_ratio * 30

    # =====================================================
    # 3️⃣ 大单集中度（20 分）
    # =====================================================
    big_amount_ratio = big_df["成交金额"].sum() / total_amount
    concentration_score = min(big_amount_ratio / 0.4, 1) * 20

    # =====================================================
    # 4️⃣ 中性盘对倒识别（只惩罚异常行为）
    # =====================================================
    df["is_suspect_duidui"] = False

    neutral = df[df["性质"] == "中性盘"].copy()
    if not neutral.empty and len(neutral) >= 3:

        neutral["time_diff"] = neutral["成交时间"].diff().dt.total_seconds()
        neutral["price_diff"] = neutral["成交价格"].diff()
        neutral["amount_diff_ratio"] = (
            neutral["成交金额"].diff().abs() / neutral["成交金额"]
        )
        # 修改为：3秒内、同价格、金额相近
        suspect_cond = (
            (neutral["time_diff"] <= 3) &              # 3秒内
            (neutral["price_diff"] == 0) &             # 同价
            (neutral["amount_diff_ratio"] < 0.2)       # 金额差 < 20%
        )

        # suspect_cond = (
        #     (neutral["time_diff"] == 0) &              # 同一秒
        #     (neutral["price_diff"] == 0) &             # 同价
        #     (neutral["amount_diff_ratio"] < 0.1)       # 金额相近
        # )

        suspect_idx = neutral[suspect_cond].index
        df.loc[suspect_idx, "is_suspect_duidui"] = True

    suspect_duidui_ratio = df["is_suspect_duidui"].mean()

    # ≥10% 可疑中性盘 → 扣满 20 分
    duidui_penalty = min(suspect_duidui_ratio / 0.1, 1) * 20

    # =====================================================
    # 总分 & 评级
    # =====================================================
    total_score = (
        buy_big_score +
        push_score +
        concentration_score -
        duidui_penalty
    )

    total_score = max(0, min(100, round(total_score, 2)))

    if total_score >= 70:
        level = "强主力主导"
    elif total_score >= 55:
        level = "疑似主力（可博弈）"
    elif total_score >= 40:
        level = "资金一般"
    else:
        level = "散户盘 / 诱多"

    return {
        "total_score": total_score,
        "level": level,
        "is_tradeable": total_score >= 60,

        "detail": {
            "大单主买占比（buy_big_ratio）": round(buy_big_ratio, 4),
            "大单主买评分（满分40）（buy_big_score）": round(buy_big_score, 2),

            "大单推价成功率（push_ratio）": round(push_ratio, 4),
            "推价能力换算成分数（满分30）push_score": round(push_score, 2),

            "大单集中度（满分20）（big_amount_ratio）": round(big_amount_ratio, 4),
            "大单集中度评分（concentration_score）": round(concentration_score, 2),

            "对倒比例（扣满 20 分）（duidui_ratio）": round(suspect_duidui_ratio, 4),
            "对倒比例评分（duidui_penalty）": round(duidui_penalty, 2)
        }
    }
res = calc_main_force_score_detail(stock_zh_a_tick_tx_js_df)

if res:
    print("总分:", res["total_score"])
    print("评级:", res["level"])
    print("是否可交易:", res["is_tradeable"])

    for k, v in res["detail"].items():
        print(f"{k}: {v}")




总分: 62.64
评级: 疑似主力（可博弈）
是否可交易: True
大单主买占比（buy_big_ratio）: 0.1642
大单主买评分（满分40）（buy_big_score）: 32.83
大单推价成功率（push_ratio）: 0.5
推价能力换算成分数（满分30）push_score: 15.0
大单集中度（满分20）（big_amount_ratio）: 0.2962
大单集中度评分（concentration_score）: 14.81
对倒比例（扣满 20 分）（duidui_ratio）: 0.0
对倒比例评分（duidui_penalty）: 0.0


  df["成交时间"] = pd.to_datetime(df["成交时间"])


In [None]:
import numpy as np
import pandas as pd


def realtime_entry_score(df, big_amt_quantile=0.9):
    score_detail = {}
    total_score = 0

    # =========================
    # 基础统计
    # =========================
    price = df["成交价格"]
    amount = df["成交金额"]

    price_range = (price.max() - price.min()) / price.min() * 100

    buy_amt = df[df["性质"] == "买盘"]["成交金额"].sum()
    sell_amt = df[df["性质"] == "卖盘"]["成交金额"].sum()
    neutral_amt = df[df["性质"] == "中性盘"]["成交金额"].sum()
    total_amt = amount.sum()

    # =========================
    # ① 方向一致性（30）
    # =========================
    direction_ratio = buy_amt / (buy_amt + sell_amt + 1e-6)
    direction_score = min(direction_ratio * 30, 30)

    score_detail["方向一致性"] = {
        "direction_ratio": round(direction_ratio, 3),
        "score": round(direction_score, 2)
    }
    total_score += direction_score

    # =========================
    # ② 推价有效性（25）
    # =========================
    price_diff = price.diff()
    push_up = (price_diff > 0).sum()
    push_down = (price_diff < 0).sum()

    push_ratio = push_up / (push_up + push_down + 1e-6)
    push_score = min(push_ratio * 25, 25)

    score_detail["推价有效性"] = {
        "push_ratio": round(push_ratio, 3),
        "score": round(push_score, 2)
    }
    total_score += push_score

    # =========================
    # ③ 成交压缩度（20）
    # =========================
    big_threshold = amount.quantile(big_amt_quantile)
    big_amt_ratio = (amount > big_threshold).mean()

    # 大单越少越好
    compression_score = max((1 - big_amt_ratio) * 20, 0)

    score_detail["成交压缩"] = {
        "big_amount_ratio": round(big_amt_ratio, 3),
        "score": round(compression_score, 2)
    }
    total_score += compression_score

    # =========================
    # ④ 中性盘地基（15）
    # =========================
    neutral_ratio = neutral_amt / total_amt

    if price_range < 1.5:
        neutral_score = min(neutral_ratio * 40, 15)
    else:
        neutral_score = 0

    score_detail["中性盘地基"] = {
        "neutral_ratio": round(neutral_ratio, 3),
        "score": round(neutral_score, 2)
    }
    total_score += neutral_score

    # =========================
    # ⑤ 风险扣分（-30 ~ 0）
    # =========================
    risk_penalty = 0

    # 高位放量
    if price_range > 3 and big_amt_ratio > 0.3:
        risk_penalty -= 15

    # 对倒但不涨
    if neutral_ratio > 0.4 and push_ratio < 0.5:
        risk_penalty -= 15

    score_detail["风险扣分"] = {
        "penalty": risk_penalty
    }
    total_score += risk_penalty

    # =========================
    # 状态判定
    # =========================
    if total_score >= 80:
        state = "已拉升"
    elif total_score >= 60:
        state = "可进"
    else:
        state = "观察"

    return {
        "total_score": round(total_score, 2),
        "state": state,
        "detail": score_detail
    }
res = realtime_entry_score(stock_zh_a_tick_tx_js_df)

if res:
    print("总分:", res["total_score"])
    print("是否可交易:", res["state"])
    # print("是否可交易:", res["is_tradeable"])

    for k, v in res["detail"].items():
        print(f"{k}: {v}")




In [44]:
res = realtime_entry_score(stock_zh_a_tick_tx_js_df)

if res:
    print("总分:", res["total_score"])
    print("是否可交易:", res["state"])
    # print("是否可交易:", res["is_tradeable"])

    for k, v in res["detail"].items():
        print(f"{k}: {v}")


总分: 47.13
是否可交易: 观察
方向一致性: {'direction_ratio': np.float64(0.56), 'score': np.float64(16.79)}
推价有效性: {'push_ratio': np.float64(0.494), 'score': np.float64(12.35)}
成交压缩: {'big_amount_ratio': np.float64(0.1), 'score': np.float64(17.99)}
中性盘地基: {'neutral_ratio': np.float64(0.025), 'score': 0}
风险扣分: {'penalty': 0}
