## controllo quantistico di un qubit tramite Reinforcement Learning

**Obiettivo**: addestrare un agente di Reinforcement Learning affinché impari a manipolare un **qubit** utilizzando un **campo esterno 2D controllabile**.  
L'agente dovrà imparare una sequenza di impulsi che evolvano lo stato del qubit da uno stato iniziale \( $|\psi_0\rangle \ $) a uno stato target \( $|\psi_{\text{target}}\rangle \$), massimizzando l'**overlap finale**.

###  Modello fisico

Il qubit evolve secondo una Hamiltoniana del tipo:


$H(t) = u_x(t) \cdot \sigma_x + u_y(t) \cdot \sigma_y$


dove:
- \( $\sigma_x, \sigma_y $\) sono le matrici di Pauli
- \($ u_x(t), u_y(t) \$) sono i controlli (impulsi) sull'asse X e Y
- Il controllo è aggiornato passo dopo passo dall'agente RL

L'evoluzione temporale è descritta dalla dinamica unitaria:


|$\psi(t+\Delta t)\rangle = e^{-i H(t) \Delta t} \cdot |\psi(t)\rangle $






In [None]:
import numpy as np
from scipy.linalg import expm #to esponentiate matrix

#define Pauli matrix
sigma_x = np.array([[0, 1], [1, 0]], dtype=np.complex128)
sigma_y = np.array([[0, -1j], [1j, 0]], dtype=np.complex128)
#define hamiltonian
def H(ux,uy):
    return ux*sigma_x+uy*sigma_y
    
#define time evolution
def Evolve(state,ux,uy, deltat):
    U=expm(-1j*H(ux,uy)*deltat)
    new_state=U@state
    return new_state/np.linalg.norm(new_state)

#define fidelity (reward function) as the overlap |<psit|psi'>|^2 (to be maximized)
def Fidelity(target_state,state): 
    f= np.abs(np.vdot(target_state, state))**2
    return f


# Reinforcement learning

il reinforcement learning è un paradigma di deep learning in cui un **agente** interagisce con un **ambiente**, che fornisce **ricompense** numeriche in funzione delle **azioni**. L'agente impara una strategia otimale per **massimizzare la ricompensa ricevuta dall'ambiente**.

Un problema di Reinforcement Learning si modella come un **MDP** (Markov Decision Process),  
in cui lo stato dell'ambiente è caratterizzato da un set di parametri:

$$
Environment = (S, A, R, P, \gamma)
$$

- Spazio degli stati accessibili del sistema \(S\)  
- Spazio delle possibili azioni da compiere\(A\) 
- Spazio delle ricompense \(R(s,a)\)   
- Spazio delle probabilità di transizione, distribuzioni di probabilità di stati e azioni successivi dati i precedenti  
- Fattore di sconto \($\gamma$ $\in [0,1]$\)  

In breve, l'algoritmo è il seguente:
 1. l'agente seleziona un'azione $a_t$ secondo la sua **policy**
 2. l'ambiente conferisce una ricompensa $r_t=R(\cdot|s_t,a_t)$ e genera lo stato successivo $s_{t+1}=P(\cdot |s_t,a_t)$
 3. l'agente riceve $r_t$ ed $s_t$ e ripete

La policy è una funzione $\pi: S \rightarrow A$ che viene utilizzata per scegliere l'azione da compiere in ogni stato. L'obiettivo dell'agente è trovare la policy che massimizzi la **ricompensa cumulativa scontata** $\sum_{t}\gamma^t r_t$.
La miglior policy $\pi^*$ è quella che massimizza la ricompensa e che tiene conto della natura stocastica dell'ambiente e della policy stessa,ovvero quella che in media, produce la sequenza di azioni e transizioni che porta al massimo ritorno possibile nel lungo termine. Formalmente:
$$
\pi^* = \arg\max_{\pi} \; \mathbb{E} \left[ \sum_{t \ge 0} \gamma^t r_t \;\middle|\; \pi \right]
$$

Il problema del trovare la miglior policy può essere ridotto introducento la ** Q-value function Q(s,a)**, che misura l'efficacia di una coppia stato-azione dati i precedenti, mediando sulla ricompensa cumulativa seguendo la policy scelta:
$$
Q^\pi(s,a) \;=\; \mathbb{E} \!\left[ \sum_{t \ge 0} \gamma^t r_t 
\;\middle|\; s_0 = s,\, a_0 = a,\, \pi \right]
$$
Segue che la Q-value function **ottimale $Q^*(s,a)=max_{\pi} (Q^\pi)$** è quella che massimizza la ricompensa attesa $ r+\gamma Q^*(s',a')$, ovvero quella soddisfacente l'**equazione di Bellman**:
$$
Q^*(s,a) = \mathbb{E}_{s' \sim P(\cdot|s,a)} 
\!\left[ r + \gamma \max_{a'} Q^*(s',a') \;\middle|\; s,a \right]
$$

La **policy ottimale**, dunque, corrispondere a prendere **la miglior azione possibile in ogni stato, come specificato da $Q^*$**

Operativamente, il **value iteration algorithm** usa l'equazione di Bellman come un update iterativo, stimando la Q dell'iterazione corrente come valore di aspettazione della ricompensa immediata più la miglior Q dell'iterazione precedente, pesando sulla distribuzione degli stati successivi:
$$
Q_{i+1}(s,a) \;=\; \mathbb{E}_{s' \sim P(\cdot \mid s,a)} 
\Big[ \, r(s,a) + \gamma \max_{a'} Q_i(s',a') \,\Big]
$$
Ad ogni passo la stima di Q migliora, si può mostrare che:
$$
\lim_{i \to \infty} Q_i(s,a) = Q^*(s,a)
$$

Nel Q-learning classico (task semplici), è possibile tabulare tutti i valori di Q per ogni coppia azione-stato. Quando il problema diventa più complesso, come nel caso del qbit, si utilizza il paradigma **Deep Q-learning** in cui la Q viene **approssimata da una rete neurale profonda** $Q(s,a,\theta)=Q(s,a)$. Formalmente il problema del deep Q-learning si può formulare come segue:
$$L_i(\theta_i)
= \mathbb{E}_{(s,a)\sim \rho(\cdot)} \Big[\big(y_i - Q(s,a;\theta_i)\big)^2\Big],

$$
$$
y_i
= \mathbb{E}_{s' \sim \varepsilon}\!\left[\, r + \gamma \max_{a'} Q(s',a';\theta_{i-1}) \,\middle|\, s,a \right].
$$
dove si vuole minimizzare la loss-function L.

---
# Prima parte: spazio delle azioni discreto

Una prima realizzazione del progetto di controllo di qbit tramite reinforcement learning è stata realizzata nel caso più semplice, in cui l'agente può scegliere l'azione da compiere (scelta dell'impulso $(u_x,u_y)$) da un **set discreto di valori** nell'intervallo $[-1,1]$.

L'ambiente custom è definito utilizzando la libreria gymnasium. 

Lo stato iniziale è fissato come: $\psi_0=\frac{1}{\sqrt(2)}(|0\rangle + |1\rangle)$, mentre quello target è $\psi_T=|1\rangle$.

il metodo step, ad ogni azione, fornisce una ricompensa $r=|\langle \psi|\psi_T\rangle|^2$ e termina l'episodio se è già sufficientemente alta.

L'agente DQN in questo caso è una rete con 4 layer fully-connected, di cui 3 con attivazione relu e l'ultimo lineare, che fa da regressore.

La policy scelta in questo caso è la **epsilon-greedy policy**, che favorisce l'esplorazione, definita come:
$$
\pi(a \mid s) =
\begin{cases}
1 - \varepsilon + \dfrac{\varepsilon}{|A|}, & \text{se } a = \arg\max\limits_{a'} Q(s,a'), \\[10pt]
\dfrac{\varepsilon}{|A|}, & \text{altrimenti.}
\end{cases}
$$
con |A|= cardinalità dello spazio delle azioni. Il codice implementa il campionamento da questa distribuzione.


In [None]:
#define a gymnasium ambient
import gymnasium as gym
from gymnasium import spaces
import tensorflow as tf
import random
from collections import deque

class Qbitenvironment_discrete(gym.Env):

    def __init__(self):
        super().__init__()
    #define possible actions: discrete tuning of fields
        self.u_vals=np.linspace(-1.0, 1.0, 5)
        self.actions = [(ux, uy) for ux in self.u_vals for uy in self.u_vals]
        self.action_space = spaces.Discrete(len(self.actions)) #discrete space for now
     #   ⟨σx⟩, ⟨σy⟩, ⟨σz⟩
        self.observation_space = spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)
        #timestep
        self.dt = 0.15  
        self.sigma_x = np.array([[0, 1], [1, 0]], dtype=np.complex128)
        self.sigma_y = np.array([[0, -1j], [1j, 0]], dtype=np.complex128)
        self.sigma_z = np.array([[1, 0], [0, -1]], dtype=np.complex128)
        self.psi_target = np.array([0.0, 1.0], dtype=np.complex128)  # es. |+⟩
        self.max_steps = 50
        self.reset()
     
    def reset(self, seed=None, options=None):
        psi = np.random.randn(2) + 1j*np.random.randn(2) #random initial state
        psi/=np.linalg.norm(psi)        
        self.psi=psi
        #self.psi = np.array([1.0/np.sqrt(2), 1.0/np.sqrt(2)], dtype=np.complex128)  

        obs = self._get_observation()
        self.current_step = 0
        return obs, {}

    def step(self, action_idx):
        ux, uy = self.actions[action_idx]
        self.psi = Evolve(self.psi, ux, uy, self.dt)
        obs = self._get_observation()
        reward = Fidelity(self.psi_target, self.psi)
        self.current_step += 1
        truncated=False
        terminated=False

        if reward > 0.97:
            terminated=True

        if (self.current_step >= self.max_steps):
            truncated = True 
        return obs, reward, terminated, truncated, {}
    

    def _get_observation(self):
        sx = np.vdot(self.psi, self.sigma_x @ self.psi).real
        sy = np.vdot(self.psi, self.sigma_y @ self.psi).real
        sz = np.vdot(self.psi, self.sigma_z @ self.psi).real
        return np.array([sx, sy, sz], dtype=np.float32)




def build_agent(obs_shape, actions):
    model=tf.keras.models.Sequential([tf.keras.layers.Flatten(input_shape=obs_shape),
                                      tf.keras.layers.Dense(256, activation='relu'),
                                      tf.keras.layers.Dense(256, activation='relu'),

                                      tf.keras.layers.Dense(actions, activation='linear')])
    return model
    
#now define policy (it is a guideline on how the agent will behave during training). i choose
#epsilon-greedy, which performs a random action with probability epsilon, otherwise it performs
#the action which maximises the espected reward Q(s,a))
def epsilon_greedy_policy(state, actions, agent, epsilon=0.1):
    if(np.random.rand()<=epsilon):
        return np.random.choice(actions)#random action with Prob=epsilon
    else:
        q_values=agent.predict(state[np.newaxis], verbose=0)
        return np.argmax(q_values)

# Training loop
Nella prima versione, il training loop è stato definito senza l'ausilio di ulteriori librerie. Il training funzione come descritto sopra.


In [None]:
env = Qbitenvironment_discrete()
obs_shape = env.observation_space.shape
actions = env.action_space.n
from tensorflow.keras.models import load_model

#agent = build_agent(obs_shape, actions)               # online
agent = load_model("dqn_qubit_model_discrete_50steps_fixedrestartsovrapp.keras")   

target_agent = build_agent(obs_shape, actions)        # target 
agent.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3), loss='mse')
target_agent.set_weights(agent.get_weights())         

episodes = 1
update_target_every = 1
memory = deque(maxlen=20000)
batch_size = 10
gamma = 0.99

reward_history = []

for episode in range(episodes):
    state, info = env.reset()
    episode_reward = 0.0
    terminated = False
    truncated  = False

    while not (terminated or truncated):
        action = epsilon_greedy_policy(state, actions, agent)
        next_state, reward, terminated, truncated, _ = env.step(action)
        memory.append((state, action, reward, next_state, terminated, truncated))
        episode_reward += reward
        state = next_state

        if len(memory) >= batch_size:
            minibatch = random.sample(memory, batch_size)
            states_mb      = np.stack([mb[0] for mb in minibatch])
            actions_mb     = np.array([mb[1] for mb in minibatch])
            rewards_mb     = np.array([mb[2] for mb in minibatch])
            next_states_mb = np.stack([mb[3] for mb in minibatch])
            terminated_mb  = np.array([mb[4] for mb in minibatch], dtype=bool)
            truncated_mb   = np.array([mb[5] for mb in minibatch], dtype=bool)

            # current Q-values (online)
            q_current = agent.predict(states_mb, verbose=0)             # (B, A)

            # predicted Qs for next states (target)
            next_q_all = target_agent.predict(next_states_mb, verbose=0)  # (B, A)
            max_next_q = next_q_all.max(axis=1)                            # (B,)
            done = (terminated_mb | truncated_mb).astype(np.float32)

            # Bellman targets (reward is r if episode is finished, else it's predicted by best Q target)
            q_target_vec = rewards_mb + gamma * max_next_q * (1.0 - done)
            # update the q prediction for the selcted action only
            targets = q_current.copy()
            targets[np.arange(batch_size), actions_mb] = q_target_vec
            # train online network
            agent.fit(states_mb, targets, epochs=1, verbose=0)

    # after episode's end
    if (episode + 1) % update_target_every == 0:
        target_agent.set_weights(agent.get_weights())

    reward_history.append(episode_reward)
    if episode >= 10:
        print(f"Ep {episode:4d} | AvgR(10)={np.mean(reward_history[-10:]):.3f}")
    else:
        print(f"Ep {episode:4d} | R={episode_reward:.3f}")

agent.save("dqn_qubit_model_discrete_50steps_fixedrestartsovrapp.keras")


env.close()


In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tensorflow.keras.models import load_model


env = Qbitenvironment_discrete()
agent = load_model("dqn_qubit_model_discrete_30steps_restartrandom.keras")   


n_test_episodes = 100


all_actions_ux = []  
all_actions_uy = []   
rewards_per_step = [] 
episode_rewards = []  
test_steps=[]
trajs=[]
episode_final_rewards=[]

for ep in range(n_test_episodes):
    state, _ = env.reset()
    terminated = False
    truncated = False
    total_reward = 0.0

    ep_actions_ux = []
    ep_actions_uy = []
    ep_rewards = []
    ep_traj=[]
    step=0

    while not (terminated or truncated):
        # policy greedy (no epsilon in test)
        q_values = agent.predict(state[np.newaxis], verbose=0)
        action = int(np.argmax(q_values[0]))

        ux, uy = env.actions[action]
        next_state, reward, terminated, truncated, _ = env.step(action)

        # salva dati
        ep_actions_ux.append(ux)
        ep_actions_uy.append(uy)
        ep_rewards.append(reward)
        total_reward += reward
        step+=1
        state = next_state
        ep_traj.append(env._get_observation())
    # fine episodio: append alle liste globali
    all_actions_ux.append(ep_actions_ux)
    all_actions_uy.append(ep_actions_uy)
    rewards_per_step.append(ep_rewards)
    episode_rewards.append(total_reward)
    test_steps.append(step)
    trajs.append(ep_traj)
    episode_final_rewards.append(ep_rewards[-1])
    print(f"Ep {ep+1}/{n_test_episodes} | TotReward={total_reward:.3f} | final reward={ep_rewards[-1]:.3f}")

env.close()

def subplots(ax,x,y, title,xlabel,ylabel, label=None):
    ax.plot(x,y, '-o', label=label)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if label is not None:
        ax.legend()
    ax.grid(True)

def scatterplot(ax,x,y, title,xlabel,ylabel, label=None):
    ax.scatter(x,y, label=label, alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if label is not None:
        ax.legend()
    ax.grid(True)



fig, ax = plt.subplots(3,2, figsize=(30,20))
ax=ax.flatten()
N=np.arange(n_test_episodes)
scatterplot(ax[0],N, episode_rewards , "total reward on test episodes", "episodes", "r")
subplots(ax[1],N, test_steps, "steps on test episodes", "episodes", "nsteps")
subplots(ax[2], np.arange(test_steps[-1]), rewards_per_step[-1], "reward in last episode", "#steps", "r")
subplots(ax[3],np.arange(test_steps[-1]),ep_actions_ux, "agent walk on last test episode", "# steps", "field", "ux" )
subplots(ax[3],np.arange(test_steps[-1]),ep_actions_uy, "agent walk on last test episode", "# steps", "field", "uy" )

for ep in range(n_test_episodes):

    subplots(ax[4],all_actions_ux[ep],all_actions_uy[ep], "agent trajectories", "ux", "uy" )
ax[4].axis("equal")
scatterplot(ax[5], N, episode_final_rewards, "final reward per episode", "#episode", "r")
ax[5].set_ylim(0,1)






plt.show()



In [None]:
import numpy as np
import matplotlib.pyplot as plt

def plot_bloch_from_concatenated(traj, ax, show_points=True, add_labels=False):


    traj = np.asarray(traj)
    if traj.ndim != 3 or traj.shape[2] != 3:
        raise ValueError("traj deve avere shape (nepisodi, nsteps, 3)")

    nepisodi = traj.shape[0]
    cmap = plt.get_cmap("tab20")

    for i in range(nepisodi):
        seg = traj[i]
        sx, sy, sz = seg.T
        color = cmap(i % 20)
        label = f"Ep {i+1}" if add_labels else None
        ax.plot(sx, sy, sz, linewidth=2, color=color, label=label)
        if show_points:
            ax.scatter(sx, sy, sz, s=12, color=color)

# prepara la sfera una volta
fig = plt.figure(figsize=(7,6))
ax = fig.add_subplot(111, projection="3d")

u = np.linspace(0, 2*np.pi, 80)
v = np.linspace(0, np.pi, 40)
xs = np.outer(np.cos(u), np.sin(v))
ys = np.outer(np.sin(u), np.sin(v))
zs = np.outer(np.ones_like(u), np.cos(v))
ax.plot_wireframe(xs, ys, zs, linewidth=0.3, alpha=0.3)

ax.text(0, 0, 1.1, r"$|0\rangle$", fontsize=14, ha='center')
ax.text(0, 0, -1.2, r"$|1\rangle$", fontsize=14, ha='center')
ax.set_xlabel(r'$\langle \sigma_x \rangle$')
ax.set_ylabel(r'$\langle \sigma_y \rangle$')
ax.set_zlabel(r'$\langle \sigma_z \rangle$')
ax.set_xlim([-1,1]); ax.set_ylim([-1,1]); ax.set_zlim([-1,1])
ax.set_box_aspect([1,1,1])
ax.set_title("Trajectory on Bloch's sphere (test episodes)")


plot_bloch_from_concatenated(trajs[-10:], ax, show_points=True, add_labels=True)

ax.legend()
plt.show()


---
# Parte 2: Spazio delle azioni continuo

In questa versione più raffinata, l'agente può scegliere il valore del prossimo impulso da un set continui di valori in $[-1,1]$. Questo cambiamento introduce una maggiore complessità nel training, e il modello DQN precedentemente adottato non è più adeguato: avendo un set infinito di azioni, diventa impossibile scegliere la successiva stimano la Q(s,a) per ogni possibile scelta.

## Training con modello Soft Actor Critic (SAC)

L'idea fondamentale del modello SAC è quella di utilizzare una **policy parametrica** $\pi_\theta(a|s)$, detta **actor**, che restituisce una distribuzione da cui campionare le azioni. Le azioni campionate vengono valutate da 2 reti neurali, i **critic**, che ne calcolano le Q-functions.
L'obiettivo è **massimizzare la ricompensa attesa e l'entropia della policy**, in modo da garantire esplorazione in ogni fase. Formalmente, si massimizza:
$$
J(\pi) = \mathbb{E}\!\Bigg[ \sum_{t=0}^\infty \gamma^t \Big( r(s_t,a_t) + \alpha \,\mathcal{H}(\pi(\cdot|s_t)) \Big) \Bigg],
\qquad
\mathcal{H}(\pi(\cdot|s)) = -\mathbb{E}_{a\sim\pi}[ \log \pi(a|s) ].

$$
dove $\alpha$ è il parametro, aggiornato iterativamente durante il training, che regola l'entropia (quanto esplorare).

Durante il training, i critic calcolano il target di Bellman "soft":
$$
y(r,s') = r + \gamma \,\mathbb{E}_{a' \sim \pi_\theta(\cdot|s')} 
\Big[ \min_i Q_{\phi_i}(s',a') - \alpha \log \pi_\theta(a'|s') \Big].

$$
Dove il min(Q) serve per ridurre l'everestimation bias

e minimizzano MSE tra stima e target:
$$
L_Q(\phi_i) = \mathbb{E}_{(s,a)\sim \mathcal{D}}
\Big[ \big(Q_{\phi_i}(s,a) - y(r,s')\big)^2 \Big].
$$

Per quanto riguarda l'actor, deve produrre azioni con Q alto, ma che mantengano entropia. L'update della policy avviene come segue:
$$
J_\pi(\theta) = \mathbb{E}_{s \sim \mathcal{D},\, a \sim \pi_\theta}
\Big[ \alpha \log \pi_\theta(a|s) - Q_\phi(s,a) \Big].
$$
Dopo ogni update si aggiorna il parametro di entropia, minimizzando la sua loss risetto a un valore target $\mathcal{H}_{\text{target}}$:
$$
L(\alpha) = \mathbb{E}_{a \sim \pi_\theta}
\Big[ -\alpha \, (\log \pi_\theta(a|s) + \mathcal{H}_{\text{target}}) \Big].
$$

### Training Loop

1. la policy campiona un'azione dalla distribuzione (gaussiana) generata
2. l'ambiente formisce reward e stato successivo
3. i Critic si aggiornano minimizzando la MSE delle loro predizioni sullo stato corrente rispetto al target di Bellman soft
4. la policy viene aggiornata per minimizzare J
5. il parametro $\alpha$ viene adattato minimizzandone la differenza rispetto a un target di eplorazione fissato

La libreria stable-baselines3 possiede un implementazione completa ed efficiente dell'agente SAC, che è quella utilizzata di seguito.

In [None]:
#parte 2: passo a spazio continuo
#define a gymnasium ambient
import gymnasium as gym
import tensorflow as tf
import random
from collections import deque

class Qbitenvironment_continuum(gym.Env):

    def __init__(self):
        super().__init__()

    #define possible actions: continuous tuning of fields
        self.action_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)

     # Stato osservabile: ⟨σx⟩, ⟨σy⟩, ⟨σz⟩
        self.observation_space = gym.spaces.Box(low=-1.0, high=1.0, shape=(3,), dtype=np.float32)

        self.dt = 0.15  # passo temporale
        
        self.sigma_x = np.array([[0, 1], [1, 0]], dtype=np.complex128)
        self.sigma_y = np.array([[0, -1j], [1j, 0]], dtype=np.complex128)
        self.sigma_z = np.array([[1, 0], [0, -1]], dtype=np.complex128)

        self.psi_target = np.array([0.0, 1.0], dtype=np.complex128)  # es. |+⟩
        self.max_steps = 15
        self.reset()


        
    def reset(self, seed=None, options=None):
        #self.psi = np.array([1.0, 0.0], dtype=np.complex128)  # |0⟩
        #self.psi = np.array([1.0/np.sqrt(2), 1.0/np.sqrt(2.0)], dtype=np.complex128)  # quasi già in |+⟩
        psi = np.random.randn(2) + 1j*np.random.randn(2) #random initial state
        psi/=np.linalg.norm(psi)
        self.psi = psi
        obs = self._get_observation()
        self.current_step = 0
        return obs, {}

    def step(self, action):
        ux = np.clip(action[0], -1.0, 1.0)
        uy = np.clip(action[1], -1.0, 1.0)
        self.psi = Evolve(self.psi, ux, uy, self.dt)
        fidelity = Fidelity(self.psi_target, self.psi)
        obs = self._get_observation()
        self.current_step += 1
        #reward = -np.log(1 - fidelity + 1e-6)  # diverge quando fidelity → 1
        #reward=1/(1-fidelity+1e-6)
        reward=fidelity
        truncated=False
        terminated=False
       # print(f"norm_psi: {np.linalg.norm(self.psi):.4f}, reward: {reward:.4f}, psi={self.psi}")
        #print(ux,uy)
        if fidelity > 0.97:
            terminated=True
            
        if (self.current_step >= self.max_steps):
            truncated = True 
        return obs, reward, terminated, truncated, {}
    
    def _get_observation(self):
        sx = np.vdot(self.psi, self.sigma_x @ self.psi).real
        sy = np.vdot(self.psi, self.sigma_y @ self.psi).real
        sz = np.vdot(self.psi, self.sigma_z @ self.psi).real
        return np.array([sx, sy, sz], dtype=np.float32)




In [None]:
import tensorflow as tf
#sac by hand
env = Qbitenvironment_continuum()
obs_shape = env.observation_space.shape[0]
actions_shape = env.action_space.shape[0]
#define Q(s,a) networks
def build_Q(obs_shape, actions_shape):
     input_obs= tf.keras.layers.Input(shape=(obs_shape,))
     input_actions= tf.keras.layers.Input(shape=(actions_shape,))
     input=tf.keras.layers.Concatenate()([input_obs, input_actions])
     h1=tf.keras.layers.Dense(256, activation='relu')(input)
     h2=tf.keras.layers.Dense(256, activation='relu')(h1)
     Q=tf.keras.layers.Dense(1, activation='linear')(h2)
     model=tf.keras.Model(inputs=[input_obs, input_actions], outputs=Q)
     return model

#define policy network
def build_policy(obs_shape, actions_shape):
     inputs= tf.keras.layers.Input(shape=(obs_shape,))
     h1= tf.keras.layers.Dense(256, activation='relu')(inputs)
     h2=tf.keras.layers.Dense(256, activation='relu')(h1)
     mu=tf.keras.layers.Dense(actions_shape, activation='linear')(h2)#first output layer
     logsigma=tf.keras.layers.Dense(actions_shape, activation='linear')(h2)#second output layer
     model=tf.keras.Model(inputs=inputs, outputs=[mu,logsigma])
     return model

def sample_from_policy(mu, log_sigma):
    sigma = tf.exp(log_sigma)
    eps = tf.random.normal(shape=tf.shape(mu))
    a_prime = mu + sigma * eps
    a = tf.tanh(a_prime)
    #logπ(a)=logN(a′∣μ,σ2)−i∑​log(1−tanh2(ai′​)) (change of variables)
    # log N(a'|mu, sigma^2)
    log_prob_gauss = -0.5 * (
        tf.square((a_prime - mu) / (sigma + 1e-8)) +
        2.0 * log_sigma +
        tf.math.log(2.0 * np.pi)
    )
    log_prob_gauss = tf.reduce_sum(log_prob_gauss, axis=-1, keepdims=True)
    log_det_jac = tf.reduce_sum(
        tf.math.log(1.0 - tf.square(tf.tanh(a_prime)) + 1e-6),
        axis=-1, keepdims=True
    )

    log_pi = log_prob_gauss - log_det_jac
    return a, log_pi

#declaration of optimizers outside the function that calls them, otherwise they will reset their state whenever an opt step is called
optimizer_q1 = tf.keras.optimizers.Adam(learning_rate=3e-4)
optimizer_q2 = tf.keras.optimizers.Adam(learning_rate=3e-4)
optimizer_pi = tf.keras.optimizers.Adam(learning_rate=3e-4)
optimizer_alpha= tf.keras.optimizers.Adam(learning_rate=3e-4)
log_alpha = tf.Variable(0.0, trainable=True, dtype=tf.float32)#to guarantee alpha>=0
alpha = tf.exp(log_alpha)


def Train_Qs(Q1, Q2,state_mb, action_mb, targets):
    with tf.GradientTape(persistent=True) as tape:
     #calculate predictions
        predicted_q1s = Q1([state_mb, action_mb], training=True)
        predicted_q2s = Q2([state_mb, action_mb], training=True)
      #evaluate mse
        mse1= tf.reduce_mean(tf.square(predicted_q1s-targets))
        mse2= tf.reduce_mean(tf.square(predicted_q2s-targets))

      #evaluate gradients
    grad1= tape.gradient(mse1, Q1.trainable_variables)
    grad2= tape.gradient(mse2, Q2.trainable_variables)
      #perform one optimization step using adam
    optimizer_q1.apply_gradients(zip(grad1, Q1.trainable_variables))#apply_gradients want a[gradient, weights]->zip
    optimizer_q2.apply_gradients(zip(grad2, Q2.trainable_variables))
    del tape
    return mse1, mse2


def Train_policy(state_mb, Q1, Q2, policy, log_alpha):

    with tf.GradientTape() as tape:
        mu, logsigma = policy(state_mb, training=True)
        #evaluate action and logpi
        action, log_pi = sample_from_policy(mu, logsigma)
        #update temperature
        entropy = Update_alpha( log_alpha, log_pi)
        #evaluate best estimation of q for the sampled action
        q1=Q1([state_mb, action], training=False)
        q2=Q2([state_mb, action], training=False)
        q=tf.minimum(q1,q2)
        #evaluate loss
        loss=-(tf.reduce_mean(q-tf.exp(log_alpha)*log_pi))#negative loss, i want to maximize -loss
        #evaluate gradient
    grad=tape.gradient(loss, policy.trainable_variables)
        #perform optimization step
    optimizer_pi.apply_gradients(zip(grad, policy.trainable_variables))
    return loss, entropy
    
    
def update_target_network(Q, target_Q, rho=0.995):
    #rho is a parameter that handles the "velocity" of updating the target Qs with the main Qs parameters. 
    #e.g. rho=0-> copy all the parameters of Q into the target network. 
    #by doing so, the target network weights become a mobile average of main network's (stabilize bootstrap of Qs during training)
    main_weights=Q.get_weights()
    target_weights=target_Q.get_weights()
    updated_target_weights= [rho * tw + (1 - rho) * mw for mw, tw in zip(main_weights, target_weights)]
    target_Q.set_weights(updated_target_weights)


def Update_alpha(log_alpha, logpi, Htarget=-2): #from the paper: Htarget=-dim(action space)

    with tf.GradientTape() as tape:
        alpha=tf.exp(log_alpha)
        loss=-tf.reduce_mean(alpha*(logpi+Htarget))
    grad =tape.gradient(loss, [log_alpha])
    optimizer_alpha.apply_gradients(zip(grad, [log_alpha]))
    return loss












In [None]:
import random
import os

#training loop
#define two Q functions
Q1=build_Q(obs_shape, actions_shape)
Q2=build_Q(obs_shape, actions_shape)
#define target copies
target_Q1=build_Q(obs_shape, actions_shape)
target_Q2=build_Q(obs_shape, actions_shape)
target_Q1.set_weights(Q1.get_weights())
target_Q2.set_weights(Q2.get_weights())
#define policy
policy=build_policy(obs_shape, actions_shape)
#initialize replay buffer
"""
save_dir = "sac_checkpoints.keras"
os.makedirs(save_dir, exist_ok=True)
policy = tf.keras.models.load_model(os.path.join(save_dir, "policy_50steps_restartfixedto0.keras"))
Q1 = tf.keras.models.load_model(os.path.join(save_dir, "Q1_50steps_restartfixedto0.keras"))
Q2 = tf.keras.models.load_model(os.path.join(save_dir, "Q2_50steps_restartfixedto0.keras"))
target_Q1 = tf.keras.models.load_model(os.path.join(save_dir, "target_Q1_50steps_restartfixedto0.keras"))
target_Q2 = tf.keras.models.load_model(os.path.join(save_dir, "target_Q2_50steps_restartfixedto0.keras"))

# carica log_alpha
log_alpha = tf.Variable(np.load(os.path.join(save_dir, "log_alpha_50steps_restartfixedto0.npy")),
                        trainable=True, dtype=tf.float32)
                     """
replay_buffer = deque(maxlen=200000)
critic_losses1=[]
critic_losses2=[]
actor_losses=[]
entropy_losses=[]
reward_history=[]
#training loop
episodes = 8000
update_target_every = 1
batch_size = 128
gamma = 0.99


for episode in range(episodes):
    state, info= env.reset()
    terminated= False
    truncated = False
    episode_reward=0.0
    steps=0
    while not (terminated or truncated):
        mu, logsigma=policy(state[np.newaxis], training=False)
        action, _=sample_from_policy(mu,logsigma)
        next_state, reward, terminated, truncated, _=env.step(action.numpy()[0])
        done = terminated or truncated
        replay_buffer.append((state, action, reward, next_state, done) )
        state=next_state
        episode_reward+=reward
        steps+=1
        #training
        if(len(replay_buffer)>=batch_size):
            #random sample a batch of transitions from replay buffer
            minibatch= random.sample(replay_buffer, batch_size)
            #targets yi:
            state_mb, action_mb, reward_mb, next_state_mb, done_mb=map(np.array, zip(*minibatch))
            reward_mb = tf.reshape(tf.convert_to_tensor(reward_mb, dtype=tf.float32), (-1,1))
            done_mb   = tf.reshape(tf.convert_to_tensor(done_mb, dtype=tf.float32), (-1,1))
            state_mb  = tf.convert_to_tensor(state_mb, dtype=tf.float32)
            action_mb = tf.convert_to_tensor(action_mb, dtype=tf.float32)
            action_mb = tf.squeeze(action_mb, axis=1)  # (64,1,2) → (64,2)

            next_state_mb = tf.convert_to_tensor(next_state_mb, dtype=tf.float32)

            #evaluate target Qs predictions and take the min
            mutrial, logsigmatrial = policy(next_state_mb, training= False)
            trial_action, log_pi = sample_from_policy(mutrial, logsigmatrial)
            target_qs1=target_Q1([next_state_mb, trial_action], training= False)
            target_qs2=target_Q2([next_state_mb, trial_action], training=False)
            #evaluate targets
            y=reward_mb+gamma*(1-done_mb)*(tf.minimum(target_qs1, target_qs2)- tf.exp(log_alpha)*log_pi)
            #update Q networks
            mse1, mse2= Train_Qs(Q1, Q2, state_mb, action_mb, y)
            critic_losses1.append(mse1)
            critic_losses2.append(mse2)
            #update policy network (and temperature)
            actor_loss, entropy_loss=Train_policy(state_mb, Q1, Q2, policy, log_alpha)
            actor_losses.append(actor_loss)
            entropy_losses.append(entropy_loss)
            #update target networks
            if(steps)%update_target_every==0:
                update_target_network(Q1, target_Q1)
                update_target_network(Q2, target_Q2)
 

    reward_history.append(episode_reward)
    if episode >= 10:
        print(f"Ep {episode:4d} | R={episode_reward:.3f}| AvgR(10)={np.mean(reward_history[-10:]):.3f}| steps={steps}")
    else:
        print(f"Ep {episode:4d} | R={episode_reward:.3f}")





# salvataggio modelli
save_dir = "sac_checkpoints.keras"
os.makedirs(save_dir, exist_ok=True)
policy.save(os.path.join(save_dir, "policy_50steps_restartfixedto0_final.keras"))
Q1.save(os.path.join(save_dir, "Q1_50steps_restartfixedto0_final.keras"))
Q2.save(os.path.join(save_dir, "Q2_50steps_restartfixedto0_final.keras"))
target_Q1.save(os.path.join(save_dir, "target_Q1_50steps_restartfixedto0_final.keras"))
target_Q2.save(os.path.join(save_dir, "target_Q2_50steps_restartfixedto0_final.keras"))

# salvataggio log_alpha (variabile standalone)
np.save(os.path.join(save_dir, "log_alpha_50steps_restartfixedto0_final.npy"), log_alpha.numpy())





In [None]:
# carica modelli
import os
import matplotlib.pyplot as plt
save_dir = "sac_checkpoints.keras"
os.makedirs(save_dir, exist_ok=True)

policy = tf.keras.models.load_model(os.path.join(save_dir, "policy_15steps_restartrandom.keras"))



def sample_from_policy_eval(mu, log_sigma, eps=1e-6):
    sigma = tf.exp(tf.clip_by_value(log_sigma, -5.0, 2.0))
    epsn  = tf.random.normal(shape=tf.shape(mu))
    a_p   = mu + sigma * epsn
    a     = tf.tanh(a_p)
    return a


total_rewards = []
n_test_episodes = 50


all_actions_ux = []  
all_actions_uy = []   
rewards_per_step = []
episode_rewards = []  
test_steps=[]
trajs=[]
episode_final_rewards=[]
for ep in range(n_test_episodes):
    state, info = env.reset()
    done = False
    ep_reward = 0.0
    step = 0
    ep_actions_ux = []
    ep_actions_uy = []
    ep_rewards = []
    ep_traj=[]
    total_reward=0.0

    while not done:
        mu, logsigma = policy(state[np.newaxis], training=False)
        action = sample_from_policy_eval(mu, logsigma).numpy()[0]
        ux=action[0]
        uy=action[1]

        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        ep_actions_ux.append(ux)
        ep_actions_uy.append(uy)
        ep_rewards.append(reward)
        total_reward += reward
        step+=1
        state = next_state
        ep_traj.append(env._get_observation())


    all_actions_ux.append(ep_actions_ux)
    all_actions_uy.append(ep_actions_uy)
    rewards_per_step.append(ep_rewards)
    episode_rewards.append(total_reward)
    test_steps.append(step)
    trajs.append(ep_traj)
    episode_final_rewards.append(ep_rewards[-1])

    
    print(f"Ep {ep+1}/{n_test_episodes} | TotReward={total_reward:.3f}")
    total_rewards.append(ep_reward)
    print(f"[TEST] Ep {ep+1}/{n_test_episodes} | R={ep_reward:.3f} final reward={ep_rewards[-1]:.3f}| steps={step}")
avgR = np.mean(total_rewards)
print(f"[TEST] AvgR over {n_test_episodes} episodes = {avgR:.3f}")




def subplots(ax,x,y, title,xlabel,ylabel, label=None):
    ax.plot(x,y, '-o', label=label)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if label is not None:
        ax.legend()
    ax.grid(True)



def scatterplot(ax,x,y, title,xlabel,ylabel, label=None):
    ax.scatter(x,y, label=label, alpha=0.7)
    ax.set_title(title)
    ax.set_xlabel(xlabel)
    ax.set_ylabel(ylabel)
    if label is not None:
        ax.legend()
    ax.grid(True)



fig, ax = plt.subplots(3,4, figsize=(30,20))
ax=ax.flatten()
N=np.arange(n_test_episodes)
scatterplot(ax[0],N, episode_rewards , "total reward on test episodes", "episodes", "r")
subplots(ax[1],N, test_steps, "steps on test episodes", "episodes", "nsteps")
subplots(ax[2], np.arange(test_steps[-1]), rewards_per_step[-1], "reward in last episode", "#steps", "r")
subplots(ax[3],np.arange(test_steps[-1]),ep_actions_ux, "agent walk on last test episode", "# steps", "field", "ux" )
subplots(ax[3],np.arange(test_steps[-1]),ep_actions_uy, "agent walk on last test episode", "# steps", "field", "uy" )

for ep in range(n_test_episodes):

    subplots(ax[4],all_actions_ux[ep],all_actions_uy[ep], "agent trajectories", "ux", "uy" )
ax[4].axis("equal")
scatterplot(ax[5], N, episode_final_rewards, "final reward per episode", "#episode", "r")
ax[5].set_ylim(0,1)
subplots(ax[6], np.arange(len(actor_losses)), actor_losses, "actor loss during training", "episode", "Loss")
subplots(ax[7], np.arange(len(entropy_losses)), entropy_losses, "entropy loss during training", "episode", "Loss")
scatterplot(ax[8], np.arange(len(critic_losses1)), critic_losses1, "critic loss during training", "episode", "Loss", "Q1")
scatterplot(ax[8], np.arange(len(critic_losses2)), critic_losses2, "critic loss during training", "episode", "Loss", "Q2")
scatterplot(ax[10], np.arange(len(reward_history)), reward_history, "reward during training", "episode", "reward")





plt.show()



In [None]:
from stable_baselines3 import SAC
from stable_baselines3.common.env_checker import check_env

env = Qbitenvironment_continuum()
check_env(env)  # controlla correttezza spazi

policy_kwargs = dict(net_arch=[256, 256])
history = HistoryCallback()
model = SAC(
    "MlpPolicy",
    env,
    verbose=1,
    learning_starts=10_000,
    buffer_size=200_000,
    batch_size=128,
    learning_rate=3e-4,
    train_freq=1,
    gradient_steps=1,
    tau=0.005,
    ent_coef="auto",
    policy_kwargs=policy_kwargs,
    
)

model.learn(total_timesteps=2_000_000, callback=history)
model.save("sac_qubit_15steps_randominit")

env.close()
