In [None]:
# pip install pymdptoolbox numpy
import numpy as np
import mdptoolbox

# ========== 参数 ==========
max_tries = 7
target_atk = 14
A_max = 30          # 攻击状态上限（防越界，取比目标稍大）
gamma = 1.0         # 最大化成功概率
reward_success = 1.0

# 四个动作：A5, A2, B5, B2 及其成功率、加成、失败时是否带消失风险
# 格式: (success_prob, delta_on_success, fail_destroy_prob)
# 失败不加成（+0）；若 fail_destroy_prob > 0，则失败后以该概率进入 GONE，否则留在 (t+1, atk)
ACTIONS = [
    ("A5", 0.11, 3, 0.0),  # 成功10% +5；失败不消失
    ("A2", 0.66, 2, 0.0),  # 成功60% +2；失败不消失
    ("B5", 0.33, 3, 0.5),  # 成功30% +5；失败时50%消失
    ("B2", 0.77, 2, 0.5),  # 成功70% +2；失败时50%消失
]
nA = len(ACTIONS)

# ========== 状态编码 ==========
id_map = {}
states = []
for t in range(max_tries + 1):
    for atk in range(A_max + 1):
        sid = len(id_map)
        id_map[(t, atk)] = sid
        states.append((t, atk))

S_goal = len(id_map); id_map["GOAL"] = S_goal
S_gone = S_goal + 1;  id_map["GONE"] = S_gone
S_dead = S_goal + 2;  id_map["DEAD"] = S_dead

nS = len(id_map)

def clamp_atk(a):
    return min(a, A_max)

# ========== 初始化 P, R ==========
P = [np.zeros((nS, nS)) for _ in range(nA)]
R = [np.zeros((nS, nS)) for _ in range(nA)]

def add_transition(a_idx, s_from, s_to, prob, reward):
    if prob <= 1e-15:  # 避免浮点噪声
        return
    P[a_idx][s_from, s_to] += prob
    # 仅在到达 GOAL 的转移上给奖励 1
    if reward != 0.0:
        R[a_idx][s_from, s_to] = reward

for t in range(max_tries + 1):
    for atk in range(A_max + 1):
        s = id_map[(t, atk)]

        # 若已达标，吸收到 GOAL
        if atk >= target_atk:
            for a_idx in range(nA):
                add_transition(a_idx, s, S_goal, 1.0, 0.0)
            continue

        # 若次数用尽且未达标，吸收到 DEAD
        if t >= max_tries:
            for a_idx in range(nA):
                add_transition(a_idx, s, S_dead, 1.0, 0.0)
            continue

        t_next = t + 1

        # 为每个动作构造转移
        for a_idx, (name, p_succ, delta, fail_destroy_prob) in enumerate(ACTIONS):
            p_fail = 1.0 - p_succ

            # 成功分支：atk 增加 delta；若达标则进入 GOAL，否则到 (t+1, atk+delta)
            atk_succ = clamp_atk(atk + delta)
            if atk_succ >= target_atk:
                add_transition(a_idx, s, S_goal, p_succ, reward_success)
            else:
                s_next = id_map[(t_next, atk_succ)]
                add_transition(a_idx, s, s_next, p_succ, 0.0)

            # 失败分支：+0
            if p_fail > 0:
                # 部分失败直接消失
                if fail_destroy_prob > 0:
                    add_transition(a_idx, s, S_gone, p_fail * fail_destroy_prob, 0.0)
                    p_fail_keep = p_fail * (1.0 - fail_destroy_prob)
                else:
                    p_fail_keep = p_fail

                # 失败但未消失：停留在攻击不变、次数+1
                if p_fail_keep > 0:
                    s_fail_keep = id_map[(t_next, clamp_atk(atk))]
                    add_transition(a_idx, s, s_fail_keep, p_fail_keep, 0.0)

# 终止态吸收
for a_idx in range(nA):
    P[a_idx][S_goal, S_goal] = 1.0
    P[a_idx][S_gone, S_gone] = 1.0
    P[a_idx][S_dead, S_dead] = 1.0
    # 终止奖励默认保持 0（除了到达 GOAL 的那一下已经给了 1）

In [4]:
# ========== 值迭代求解 ==========
vi = mdptoolbox.mdp.ValueIteration(P, R, discount=gamma)
vi.run()

V = vi.V            # 状态的最大成功概率
policy = vi.policy  # 最优动作：0=A5, 1=A2, 2=B5, 3=B2

# 初始状态的结果
s0 = id_map[(0, 0)]
print("Initial best action (0=A5,1=A2,2=B5,3=B2):", policy[s0])
print("Initial success probability:", V[s0])

# 查看某些中间状态的最优动作
def best_action_at(t, atk):
    return policy[id_map[(t, atk)]]

probes = [(0,0), (1,0), (2,2), (3,4), (4,6), (5,10), (6,12)]
for t_probe, atk_probe in probes:
    print(f"(t={t_probe}, atk={atk_probe}) best action:", best_action_at(t_probe, atk_probe))

Initial best action (0=A5,1=A2,2=B5,3=B2): 1
Initial success probability: 0.19977434999999996
(t=0, atk=0) best action: 1
(t=1, atk=0) best action: 2
(t=2, atk=2) best action: 2
(t=3, atk=4) best action: 2
(t=4, atk=6) best action: 2
(t=5, atk=10) best action: 1
(t=6, atk=12) best action: 3


In [8]:
2**7

128

In [None]:
num_slots = 3
all_possible_states = [3, 2, 0, -1]
states = [all_possible_states]
for slot_i in range(num_slots):
    for every_state in states:
        states.append([every_state, i for i in all_possible_states])



In [16]:
to_paths[2]

[[0, 2],
 [0, 2, 2],
 [0, 2, 2, 2],
 [0, 2, 2, 2, 2],
 [0, 2, 2, 2, 2, 2],
 [0, 2, 2, 2, 2, 2, 2],
 [0, 2, 2, 2, 2, 2, 2, 2],
 [0, 0, 2],
 [0, 0, 2, 2],
 [0, 0, 2, 2, 2],
 [0, 0, 2, 2, 2, 2],
 [0, 0, 2, 2, 2, 2, 2],
 [0, 0, 2, 2, 2, 2, 2, 2],
 [0, 0, 0, 2],
 [0, 0, 0, 2, 2],
 [0, 0, 0, 2, 2, 2],
 [0, 0, 0, 2, 2, 2, 2],
 [0, 0, 0, 2, 2, 2, 2, 2],
 [0, 0, 0, 0, 2],
 [0, 0, 0, 0, 2, 2],
 [0, 0, 0, 0, 2, 2, 2],
 [0, 0, 0, 0, 2, 2, 2, 2],
 [0, 0, 0, 0, 0, 2],
 [0, 0, 0, 0, 0, 2, 2],
 [0, 0, 0, 0, 0, 2, 2, 2],
 [0, 0, 0, 0, 0, 0, 2],
 [0, 0, 0, 0, 0, 0, 2, 2],
 [0, 0, 0, 0, 0, 0, 0, 2]]

In [None]:
SCROLL_A = Scroll("A_10p_+3", success_p=0.11, atk_value=3, destroy_on_fail_p=0.0)
SCROLL_B = Scroll("B_60p_+2", success_p=0.66, atk_value=2, destroy_on_fail_p=0.0)
SCROLL_C = Scroll("C_30p_+3_boom50", success_p=0.33, atk_value=3, destroy_on_fail_p=0.50)
SCROLL_D = Scroll("D_70p_+2_boom50", success_p=0.77, atk_value=2, destroy_on_fail_p=0.50)

SCROLL_SET = {"A": SCROLL_A, "B": SCROLL_B, "C": SCROLL_C, "D": SCROLL_D}

In [None]:
ACTIONS = SCROLL_SET

In [None]:
from collections import defaultdict
from dataclasses import dataclass

# --- 你的动作数据结构 ---
@dataclass
class Scroll:
    name: str
    success_p: float
    atk_value: int
    destroy_on_fail_p: float  # 失败时爆炸概率

SCROLL_A = Scroll("A_10p_+3", success_p=0.11, atk_value=3, destroy_on_fail_p=0.0)
SCROLL_B = Scroll("B_60p_+2", success_p=0.66, atk_value=2, destroy_on_fail_p=0.0)
SCROLL_C = Scroll("C_30p_+3_boom50", success_p=0.33, atk_value=3, destroy_on_fail_p=0.50)
SCROLL_D = Scroll("D_70p_+2_boom50", success_p=0.77, atk_value=2, destroy_on_fail_p=0.50)
SCROLL_SET = {"A": SCROLL_A, "B": SCROLL_B, "C": SCROLL_C, "D": SCROLL_D}

# --- 奖励设计（示例，可按你的业务修改） ---
GROVE_SCROLL_PRICE = {
    "A": 33, 
    "B": 597, 
    "C": 14959, 
    "D": 2325
}  # 每次卷轴成本
mxb_to_rmb = 56 # 56W mxb = 1rmb
GROVE_ATTACK_PRICE = {
    "10": 1500,
    "11": 2500,
    "12": 5000,
    "13": 14000,
    "14": 41000,
    "15": 2200*mxb_to_rmb,
    "16": 6000*mxb_to_rmb,
    "17": 12500*mxb_to_rmb
} # 攻击力的价值
BOOM_PENALTY = 10.0       # 爆炸惩罚
TARGET_ATK = None         # 若设为整数，达到或超过后给终止奖励
GOAL_BONUS = 0.0          # 目标奖励

# --- 状态空间：你可以枚举合理上限（例如 0..MAX_ATK）加上 -1 终止状态 ---
TERMINAL_BROKEN = -1

def is_terminal_state(atk: int, t: int) -> bool:
    # t == 0 表示没次数了；atk == -1 表示爆炸
    return atk == TERMINAL_BROKEN or t == 0

def reward_function(s: int, a: str, s_next: int) -> float:
    # s: current attack value
    # a: which scroll to use. "A", "B", "C", "D"
    # s_next: next possible attack value
    # scroll: scroll set
    
    # 基础：动作成本为负
    r = -GROVE_SCROLL_PRICE.get(a)
    # 成功增益：以“状态变化带来的攻击力提升”计价
    if s_next >= 10 and s_next > s: # only larger than 10 attack worth a price
        r += GROVE_ATTACK_PRICE.get(s_next) - GROVE_ATTACK_PRICE.get(s)
    # 爆炸惩罚
    if s_next == TERMINAL_BROKEN:
        r -= GROVE_ATTACK_PRICE.get(s)
    return r

# --- 从动作定义构造 MDP 的转移分布 P(s'|s,a) 与期望奖励 R(s,a) ---
def build_transition_and_reward(max_atk=MAX_ATK):
    S = set(range(0, max_atk + 1))
    S.add(TERMINAL_BROKEN)  # 爆炸终止

    # A[s] contains all possible actions on state s
    A = defaultdict(dict)
    # P[s][a] 是 dict: s' -> prob
    P = defaultdict(lambda: defaultdict(dict))
    # R[s][a] 期望一步奖励（按 s' 加权后的期望）
    R = defaultdict(lambda: defaultdict(dict))

    for s in S:

        # handle boom state
        if is_terminal(s):
            continue

        for a, scroll in SCROLL_SET.items():
            succ = scroll.success_p
            boom_on_fail = scroll.destroy_on_fail_p
            fail = 1.0 - succ

            # three possible s_prime
            s_succ = s + scroll.atk_value
            s_fail = s
            s_boom = TERMINAL_BROKEN
            if s_succ > max_atk: # impossible state
                continue

            P[s][a] = {
                s_succ: succ,                    # 成功
                s_fail: fail * (1-boom_on_fail), # 失败-未爆
                s_boom: fail * boom_on_fail      # 失败-爆炸
            }

            # expected reward
            exp_r = 0.0
            for s_prime, prob in trans.items():
                exp_r += prob * reward_function(s, a, s_prime)
            R[s][a] = exp_r

        # all possible actions
        A[s] = SCROLL_SET

    return S, A, P, R

In [None]:
# --- 产出四元组样本 (s,a,r,s')（如果你更喜欢样本而不是显式分布） ---
import random

def sample_transition(s: int, a: str, scroll: Scroll) -> int:
    u = random.random()
    succ = scroll.success_p
    fail = 1.0 - succ
    boom = fail * scroll.destroy_on_fail_p
    stay = fail * (1.0 - scroll.destroy_on_fail_p)

    if u < succ:
        return s + scroll.atk_value
    elif u < succ + stay:
        return s
    else:
        return TERMINAL_BROKEN

def generate_sars_episodes(num_episodes=1000, max_steps=50, start_state=0):
    episodes = []
    for _ in range(num_episodes):
        s = start_state
        traj = []
        for _ in range(max_steps):
            if is_terminal(s):
                break
            a = random.choice(list(SCROLL_SET.keys()))
            sp = sample_transition(s, a, SCROLL_SET[a])
            r = reward_function(s, a, sp, SCROLL_SET[a])
            traj.append((s, a, r, sp))
            s = sp
        episodes.append(traj)
    return episodes

# --- 值迭代（可用 P,R 直接求最优策略） ---
def value_iteration(S, A, P, R, gamma=0.95, theta=1e-8, max_iter=10000):
    V = {s: 0.0 for s in S}
    policy = {s: None for s in S}

    for _ in range(max_iter):
        delta = 0.0
        for s in S:
            if not A[s]:  # 终止
                continue
            best_q, best_a = float("-inf"), None
            for a in A[s]:
                q = R[s][a]
                for sp, prob in P[s][a].items():
                    q += gamma * prob * V[sp]
                if q > best_q:
                    best_q, best_a = q, a
            delta = max(delta, abs(best_q - V[s]))
            V[s], policy[s] = best_q, best_a
        if delta < theta:
            break
    return V, policy

if __name__ == "__main__":
    # 构建分布形式的 MDP
    S, A, P, R = build_transition_and_reward(max_atk=MAX_ATK)
    V, policy = value_iteration(S, A, P, R, gamma=0.95)

    # 展示若干状态的最优动作
    for s in [0, 1, 2, 3, 5, 10]:
        if s in policy and policy[s] is not None:
            print(f"s={s:2d}, best action={policy[s]}, V={V[s]:.3f}")

    # 若你想得到 (s,a,r,s') 样本形式：
    episodes = generate_sars_episodes(num_episodes=3, max_steps=10, start_state=0)
    for i, traj in enumerate(episodes, 1):
        print(f"\nEpisode {i}")
        for t, (s, a, r, sp) in enumerate(traj):
            print(f" t={t:2d}: (s={s}, a={a}) -> s'={sp}, r={r:.2f}")

In [None]:
import numpy as np
from collections import defaultdict

def states_to_paths(changes, start=0, steps=3):
    to_paths = defaultdict(list)

    def dfs(level, cur_path, cur_state):
        to_paths[cur_state].append(cur_path[:])
        if level == steps:
            return
        
        for d in changes:
            if d != -1:
                next_state = cur_state + d
            else:
                next_state = -1
            next_path = cur_path + [next_state]
            if d == -1:
                to_paths[next_state].append(next_path)
                continue
            dfs(level + 1, next_path, next_state)

    dfs(0, [start], start)
    return to_paths

# 7个slots，攻击力卷轴，所有可能状态
to_paths = states_to_paths([3, 2, 0, -1], start=0, steps=7)

# print("状态 14 的到达路径条数:", len(to_paths[14]))
# for p in to_paths[14]:
#     print(p)

状态 14 的到达路径条数: 261
[0, 3, 6, 9, 12, 14]
[0, 3, 6, 9, 12, 14, 14]
[0, 3, 6, 9, 12, 14, 14, 14]
[0, 3, 6, 9, 12, 12, 14]
[0, 3, 6, 9, 12, 12, 14, 14]
[0, 3, 6, 9, 12, 12, 12, 14]
[0, 3, 6, 9, 11, 14]
[0, 3, 6, 9, 11, 14, 14]
[0, 3, 6, 9, 11, 14, 14, 14]
[0, 3, 6, 9, 11, 11, 14]
[0, 3, 6, 9, 11, 11, 14, 14]
[0, 3, 6, 9, 11, 11, 11, 14]
[0, 3, 6, 9, 9, 12, 14]
[0, 3, 6, 9, 9, 12, 14, 14]
[0, 3, 6, 9, 9, 12, 12, 14]
[0, 3, 6, 9, 9, 11, 14]
[0, 3, 6, 9, 9, 11, 14, 14]
[0, 3, 6, 9, 9, 11, 11, 14]
[0, 3, 6, 9, 9, 9, 12, 14]
[0, 3, 6, 9, 9, 9, 11, 14]
[0, 3, 6, 8, 11, 14]
[0, 3, 6, 8, 11, 14, 14]
[0, 3, 6, 8, 11, 14, 14, 14]
[0, 3, 6, 8, 11, 11, 14]
[0, 3, 6, 8, 11, 11, 14, 14]
[0, 3, 6, 8, 11, 11, 11, 14]
[0, 3, 6, 8, 10, 12, 14]
[0, 3, 6, 8, 10, 12, 14, 14]
[0, 3, 6, 8, 10, 12, 12, 14]
[0, 3, 6, 8, 10, 10, 12, 14]
[0, 3, 6, 8, 8, 11, 14]
[0, 3, 6, 8, 8, 11, 14, 14]
[0, 3, 6, 8, 8, 11, 11, 14]
[0, 3, 6, 8, 8, 10, 12, 14]
[0, 3, 6, 8, 8, 8, 11, 14]
[0, 3, 6, 6, 9, 12, 14]
[0, 3, 6, 6, 9, 12, 14

In [None]:
# 每条路径的卷轴策略
scrolls_type_number = [np.unique(np.diff(i), return_counts=True) for i in to_paths[14]]
num_plus_three_scrolls = np.array([], dtype=np.int16)
for i in range(len(scrolls_type_number)):
    if 3 not in scrolls_type_number[i][0]:
        num_plus_three_scrolls = np.append(num_plus_three_scrolls, 0)
    else:
        temp = scrolls_type_number[i][1][scrolls_type_number[i][0]==3][0]
        num_plus_three_scrolls = np.append(num_plus_three_scrolls, temp)

[to_paths[14][i] for i,j in enumerate(num_plus_three_scrolls == 2) if j]

[[0, 3, 6, 8, 10, 12, 14],
 [0, 3, 6, 8, 10, 12, 14, 14],
 [0, 3, 6, 8, 10, 12, 12, 14],
 [0, 3, 6, 8, 10, 10, 12, 14],
 [0, 3, 6, 8, 8, 10, 12, 14],
 [0, 3, 6, 6, 8, 10, 12, 14],
 [0, 3, 5, 8, 10, 12, 14],
 [0, 3, 5, 8, 10, 12, 14, 14],
 [0, 3, 5, 8, 10, 12, 12, 14],
 [0, 3, 5, 8, 10, 10, 12, 14],
 [0, 3, 5, 8, 8, 10, 12, 14],
 [0, 3, 5, 7, 10, 12, 14],
 [0, 3, 5, 7, 10, 12, 14, 14],
 [0, 3, 5, 7, 10, 12, 12, 14],
 [0, 3, 5, 7, 10, 10, 12, 14],
 [0, 3, 5, 7, 9, 12, 14],
 [0, 3, 5, 7, 9, 12, 14, 14],
 [0, 3, 5, 7, 9, 12, 12, 14],
 [0, 3, 5, 7, 9, 11, 14],
 [0, 3, 5, 7, 9, 11, 14, 14],
 [0, 3, 5, 7, 9, 11, 11, 14],
 [0, 3, 5, 7, 9, 9, 12, 14],
 [0, 3, 5, 7, 9, 9, 11, 14],
 [0, 3, 5, 7, 7, 10, 12, 14],
 [0, 3, 5, 7, 7, 9, 12, 14],
 [0, 3, 5, 7, 7, 9, 11, 14],
 [0, 3, 5, 5, 8, 10, 12, 14],
 [0, 3, 5, 5, 7, 10, 12, 14],
 [0, 3, 5, 5, 7, 9, 12, 14],
 [0, 3, 5, 5, 7, 9, 11, 14],
 [0, 3, 3, 6, 8, 10, 12, 14],
 [0, 3, 3, 5, 8, 10, 12, 14],
 [0, 3, 3, 5, 7, 10, 12, 14],
 [0, 3, 3, 5, 7, 9, 12, 

## Example

In [13]:
P.shape

(2, 3, 3)

In [11]:
import mdptoolbox, mdptoolbox.example

P, R = mdptoolbox.example.forest()
fh = mdptoolbox.mdp.FiniteHorizon(P, R, 0.9, 3)
fh.setVerbose()
fh.run()

stage: 2, policy: [0, 1, 0]
stage: 1, policy: [0, 0, 0]
stage: 0, policy: [0, 0, 0]


In [6]:
fh.policy

array([[0, 0, 0],
       [0, 0, 1],
       [0, 0, 0]])

In [7]:
P

array([[[0.1, 0.9, 0. ],
        [0.1, 0. , 0.9],
        [0.1, 0. , 0.9]],

       [[1. , 0. , 0. ],
        [1. , 0. , 0. ],
        [1. , 0. , 0. ]]])

In [None]:
num_action= 4
num_steps = 7

num_state = 


In [3]:
R

array([[0., 0.],
       [0., 1.],
       [4., 2.]])

In [4]:
fh.V

array([[2.6973, 0.81  , 0.    , 0.    ],
       [5.9373, 3.24  , 1.    , 0.    ],
       [9.9373, 7.24  , 4.    , 0.    ]])