In [None]:
import pennylane as qml
import torch
import torch.nn as nn
import gymnasium as gym
import numpy as np
import time

# Prefer the "lightning.gpu" simulator and fall back to "default.qubit" when
# it cannot be created. Only Exception is caught so that KeyboardInterrupt
# and SystemExit still propagate.
try:
    dev = qml.device("lightning.gpu", wires=1)
except Exception:
    dev = qml.device("default.qubit", wires=1)

class QuantumPolicy(nn.Module):
    """Single-qubit policy that outputs a Bernoulli action probability.

    Only the first observation feature is used: it is clamped to [-1, 1],
    scaled by pi and applied as an RX rotation. A trainable RY rotation by
    ``theta`` follows, and the Pauli-Z expectation value is squashed with a
    sigmoid. Because that expectation value lies in [-1, 1], the resulting
    probability is confined to roughly [0.27, 0.73].
    """

    def __init__(self):
        super().__init__()
        self.theta = nn.Parameter(torch.tensor(0.01))

        def circuit(x_, theta_):
            qml.RX(x_, wires=0)
            qml.RY(theta_, wires=0)
            return qml.expval(qml.PauliZ(0))

        # Build the QNode once instead of on every forward pass; it is bound
        # to the module-level `dev` that exists when the policy is created.
        self._circuit = qml.QNode(circuit, dev, interface="torch")

    def forward(self, obs):
        """Return action probabilities for a batch of observations.

        Args:
            obs: 2-D tensor indexed as ``obs[:, 0]``; one row per sample.

        Returns:
            1-D tensor with one probability per row of ``obs``.
        """
        x = torch.clamp(obs[:, 0], -1.0, 1.0) * np.pi
        # The circuit is evaluated sample by sample and the scalars stacked.
        output = torch.stack([self._circuit(angle, self.theta) for angle in x])
        return torch.sigmoid(output)

env = gym.make("CartPole-v1", render_mode="human")
policy = QuantumPolicy()
optimizer = torch.optim.Adam(policy.parameters(), lr=0.01)

episodes = 100
gamma = 0.99

# REINFORCE: roll out one episode, then take a single policy-gradient step.
# try/finally guarantees the render window is closed even if training is
# interrupted or raises.
try:
    for episode in range(episodes):
        obs, _ = env.reset()
        done = False
        rewards = []
        log_probs = []

        while not done:
            obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            probs = policy(obs_tensor)

            m = torch.distributions.Bernoulli(probs)
            action = m.sample()
            log_prob = m.log_prob(action)

            obs, reward, terminated, truncated, _ = env.step(int(action.item()))
            done = terminated or truncated

            rewards.append(reward)
            log_probs.append(log_prob)

            time.sleep(0.02)

        # Discounted returns, accumulated back-to-front and then reversed
        # (avoids the quadratic cost of inserting at the head of a list).
        returns = []
        G = 0.0
        for r in reversed(rewards):
            G = r + gamma * G
            returns.append(G)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        # std() of a single element is NaN, so only normalise longer episodes.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-9)

        # Each stored log-prob has shape (1,), so stacking gives (T, 1).
        # Flatten to (T,) so the product with `returns` is element-wise
        # instead of broadcasting to a (T, T) matrix.
        flat_log_probs = torch.stack(log_probs).reshape(-1)
        loss = -torch.sum(flat_log_probs * returns)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f"Episode {episode+1:3d}: Total Reward = {sum(rewards):.0f}")
finally:
    env.close()

In [None]:
import pennylane as qml
import torch
import torch.nn as nn
import gymnasium as gym
import numpy as np
import time
import imageio

# Prefer the "lightning.gpu" simulator and fall back to "default.qubit" when
# it cannot be created. Only Exception is caught so that KeyboardInterrupt
# and SystemExit still propagate.
try:
    dev = qml.device("lightning.gpu", wires=1)
except Exception:
    dev = qml.device("default.qubit", wires=1)

class QuantumPolicy(nn.Module):
    """Single-qubit policy that outputs a Bernoulli action probability.

    Only the first observation feature is used: it is clamped to [-1, 1],
    scaled by pi and applied as an RX rotation. A trainable RY rotation by
    ``theta`` follows, and the Pauli-Z expectation value is squashed with a
    sigmoid. Because that expectation value lies in [-1, 1], the resulting
    probability is confined to roughly [0.27, 0.73].
    """

    def __init__(self):
        super().__init__()
        self.theta = nn.Parameter(torch.tensor(0.01))

        def circuit(x_, theta_):
            qml.RX(x_, wires=0)
            qml.RY(theta_, wires=0)
            return qml.expval(qml.PauliZ(0))

        # Build the QNode once instead of on every forward pass; it is bound
        # to the module-level `dev` that exists when the policy is created.
        self._circuit = qml.QNode(circuit, dev, interface="torch")

    def forward(self, obs):
        """Return action probabilities for a batch of observations.

        Args:
            obs: 2-D tensor indexed as ``obs[:, 0]``; one row per sample.

        Returns:
            1-D tensor with one probability per row of ``obs``.
        """
        x = torch.clamp(obs[:, 0], -1.0, 1.0) * np.pi
        # The circuit is evaluated sample by sample and the scalars stacked.
        output = torch.stack([self._circuit(angle, self.theta) for angle in x])
        return torch.sigmoid(output)

env = gym.make("CartPole-v1", render_mode="rgb_array")
policy = QuantumPolicy()
optimizer = torch.optim.Adam(policy.parameters(), lr=0.01)

episodes = 100
gamma = 0.99

# Every rendered frame of every episode is kept in memory for the GIF.
frames = []

# REINFORCE: roll out one episode, then take a single policy-gradient step.
# try/finally guarantees the environment is closed even if training is
# interrupted or raises.
try:
    for episode in range(episodes):
        obs, _ = env.reset()
        done = False
        rewards = []
        log_probs = []

        while not done:
            obs_tensor = torch.tensor(obs, dtype=torch.float32).unsqueeze(0)
            probs = policy(obs_tensor)

            m = torch.distributions.Bernoulli(probs)
            action = m.sample()
            log_prob = m.log_prob(action)

            obs, reward, terminated, truncated, _ = env.step(int(action.item()))
            done = terminated or truncated

            rewards.append(reward)
            log_probs.append(log_prob)

            # Frames are rendered off-screen, so no per-step delay is needed.
            frame = env.render()
            frames.append(frame)

        # Discounted returns, accumulated back-to-front and then reversed
        # (avoids the quadratic cost of inserting at the head of a list).
        returns = []
        G = 0.0
        for r in reversed(rewards):
            G = r + gamma * G
            returns.append(G)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        # std() of a single element is NaN, so only normalise longer episodes.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-9)

        # Each stored log-prob has shape (1,), so stacking gives (T, 1).
        # Flatten to (T,) so the product with `returns` is element-wise
        # instead of broadcasting to a (T, T) matrix.
        flat_log_probs = torch.stack(log_probs).reshape(-1)
        loss = -torch.sum(flat_log_probs * returns)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f"Episode {episode+1:3d}: Total Reward = {sum(rewards):.0f}")
finally:
    env.close()

gif_filename = "cartpole_training1.gif"
# NOTE(review): whether `duration` is read as seconds or milliseconds depends
# on the installed imageio version/plugin — confirm the intended frame time.
imageio.mimsave(gif_filename, frames, duration=0.15)
print(f"GIF saved as {gif_filename}")


In [1]:
import pennylane as qml
import torch
import torch.nn as nn
import numpy as np

class PQC(nn.Module):
    """Parameterised quantum circuit with raw and softmax read-outs.

    For each of ``n_depth`` repetitions every qubit receives a Hadamard
    followed by RZ and RY rotations by the matching input angle, then an RX
    rotation by the trainable ``theta``. The per-qubit Pauli-Z expectation
    values are returned both raw and as a softmax over their ``w``-weighted
    values.
    """

    def __init__(self, n_qubits, n_depth):
        super().__init__()
        self.n_qubits = n_qubits
        self.n_depth = n_depth
        # `s` and `beta` are registered but not read by forward() (which
        # takes its own `s` argument). They are kept so that parameters()
        # and state_dict() keep the same entries.
        self.s = nn.Parameter(torch.randn(n_qubits))
        self.theta = nn.Parameter(torch.randn(n_qubits))
        self.w = nn.Parameter(torch.randn(n_qubits))
        self.beta = nn.Parameter(torch.tensor(1.0))

    def forward(self, s):
        """Evaluate the circuit for one input vector.

        Args:
            s: sequence of rotation angles indexed ``s[0] .. s[n_qubits - 1]``.

        Returns:
            Tuple ``(raw, probs)``: the stacked Pauli-Z expectation values and
            the softmax of those values multiplied element-wise by ``w``.
        """
        # `dev` is looked up when forward() runs, so it may be defined after
        # this class. Only the tensors the circuit actually uses are passed
        # in; `w` is applied classically below.
        @qml.qnode(dev, interface="torch")
        def circuit(s, theta):
            for _ in range(self.n_depth):
                for i in range(self.n_qubits):
                    qml.Hadamard(wires=i)
                    qml.RZ(s[i], wires=i)
                    qml.RY(s[i], wires=i)
                for i in range(self.n_qubits):
                    qml.RX(theta[i], wires=i)
            return [qml.expval(qml.PauliZ(i)) for i in range(self.n_qubits)]

        raw_expectations = circuit(s, self.theta)

        raw_expectations_tensor = torch.stack(raw_expectations)
        weighted_expectations = raw_expectations_tensor * self.w

        softmax_output = torch.softmax(weighted_expectations, dim=0)

        return raw_expectations_tensor, softmax_output

n_qubits = 4
n_depth = 3
# Same device selection as the earlier cells: try "lightning.gpu" first and
# fall back to "default.qubit" if that device cannot be created.
try:
    dev = qml.device("lightning.gpu", wires=n_qubits)
except Exception:
    dev = qml.device("default.qubit", wires=n_qubits)

pqc = PQC(n_qubits=n_qubits, n_depth=n_depth)

# One input angle per qubit.
s = torch.tensor([0.5, -0.2, 0.1, -0.3], dtype=torch.float32)

raw_pqc_output, softmax_pqc_output = pqc(s)

print("RAW-PQC Output (Expectation of Pauli-Z operators):", raw_pqc_output)
print("SOFTMAX-PQC Output (Softmax of weighted Hermitian operators):", softmax_pqc_output)

RAW-PQC Output (Expectation of Pauli-Z operators): tensor([ 0.4432,  0.1305, -0.0617,  0.8447], grad_fn=<StackBackward0>)
SOFTMAX-PQC Output (Softmax of weighted Hermitian operators): tensor([0.3485, 0.2796, 0.2915, 0.0804], grad_fn=<SoftmaxBackward0>)


In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import pennylane as qml
from typing import List, Tuple

# Place tensors on the GPU when CUDA is usable, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

class QuantumCircuit(nn.Module):
    """Layered variational circuit whose weights are trainable torch parameters.

    This is an ``nn.Module`` so that ``params`` is registered with any parent
    module and therefore shows up in ``parameters()`` / reaches the optimizer.

    Attributes:
        n_qubits: number of wires.
        n_layers: number of rotation + CNOT layers.
        dev: the ``default.qubit`` device the QNode runs on.
        qnode: torch-interface QNode wrapping :meth:`circuit`.
        params: trainable tensor of shape ``(n_layers, n_qubits, 3)``.
    """

    def __init__(self, n_qubits: int, n_layers: int):
        super().__init__()
        self.n_qubits = n_qubits
        self.n_layers = n_layers
        self.dev = qml.device("default.qubit", wires=n_qubits)
        self.qnode = qml.QNode(self.circuit, self.dev, interface="torch")
        self.params = nn.Parameter(
            torch.randn(n_layers, n_qubits, 3, requires_grad=True, device=device)
        )

    def circuit(self, inputs, params):
        """Quantum function: RX-encode ``inputs``, then apply the layers.

        Each layer applies RX, RY and RZ rotations to every qubit followed by
        a chain of CNOTs between neighbouring wires. Returns one Pauli-Z
        expectation value per qubit.
        """
        for i in range(self.n_qubits):
            qml.RX(inputs[i], wires=i)
        for layer in range(self.n_layers):
            for qubit in range(self.n_qubits):
                qml.RX(params[layer, qubit, 0], wires=qubit)
                qml.RY(params[layer, qubit, 1], wires=qubit)
                qml.RZ(params[layer, qubit, 2], wires=qubit)
            for qubit in range(self.n_qubits - 1):
                qml.CNOT(wires=[qubit, qubit + 1])
        return [qml.expval(qml.PauliZ(i)) for i in range(self.n_qubits)]

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Run the QNode on ``inputs`` with the trainable ``params``.

        Tensors are passed straight through so gradients can flow to both
        ``inputs`` and ``params``. The per-qubit results are returned as a
        single 1-D tensor.
        """
        results = self.qnode(inputs, self.params)
        # The QNode may hand back a sequence of scalars rather than a tensor.
        if not isinstance(results, torch.Tensor):
            results = torch.stack(tuple(results))
        return results

class QuantumPolicyNetwork(nn.Module):
    """Hybrid policy: linear encoder -> quantum circuit -> linear head -> softmax."""

    def __init__(self, n_qubits: int, n_layers: int, input_dim: int, output_dim: int):
        super().__init__()
        self.n_qubits = n_qubits
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.pre_process = nn.Linear(input_dim, n_qubits)
        # Registered as a sub-module, so its params are trained with the rest.
        self.quantum_circuit = QuantumCircuit(n_qubits, n_layers)
        self.post_process = nn.Linear(n_qubits, output_dim)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a single 1-D observation to action probabilities.

        Args:
            x: tensor of length ``input_dim``.

        Returns:
            Tensor of length ``output_dim`` that sums to 1.
        """
        x = x.float()
        x = torch.tanh(self.pre_process(x))
        # Keep the autograd graph intact: no detach / NumPy round trip, so
        # both the encoder and the circuit parameters receive gradients.
        # NOTE(review): on CUDA this relies on default.qubit accepting GPU
        # torch tensors — confirm with the installed PennyLane version.
        q_out = self.quantum_circuit(x)
        # Align dtype/device with the classical layers.
        q_out = q_out.to(dtype=x.dtype, device=x.device)
        action_probs = F.softmax(self.post_process(q_out), dim=-1)
        return action_probs
    
class QuantumAdvantagePolicy:
    """REINFORCE trainer that pairs a Gymnasium env with a QuantumPolicyNetwork.

    Attributes:
        env: the environment created from ``env_name``.
        device: torch device used for states and returns.
        input_dim: flattened observation size (Box) or number of states (Discrete).
        output_dim: number of discrete actions.
        policy: the policy network.
        optimizer: Adam optimizer over ``policy.parameters()``.
    """

    def __init__(self, env_name: str, n_qubits: int = 4, n_layers: int = 2):
        """Create the environment, policy network and optimizer.

        Raises:
            ValueError: if the observation space is neither Box nor Discrete,
                or the action space is not Discrete.
        """
        self.env = gym.make(env_name)
        self.device = device
        # Discrete observations are one-hot encoded in select_action().
        self._discrete_obs = isinstance(self.env.observation_space, gym.spaces.Discrete)
        if isinstance(self.env.observation_space, gym.spaces.Box):
            # Cast to a plain int: np.prod returns a NumPy scalar.
            self.input_dim = int(np.prod(self.env.observation_space.shape))
        elif self._discrete_obs:
            self.input_dim = int(self.env.observation_space.n)
        else:
            raise ValueError(f"Unsupported observation space type: {type(self.env.observation_space)}")
        if not isinstance(self.env.action_space, gym.spaces.Discrete):
            raise ValueError(f"Unsupported action space type: {type(self.env.action_space)}")
        self.output_dim = int(self.env.action_space.n)
        self.policy = QuantumPolicyNetwork(
            n_qubits=n_qubits,
            n_layers=n_layers,
            input_dim=self.input_dim,
            output_dim=self.output_dim
        ).to(self.device)
        self.optimizer = torch.optim.Adam(self.policy.parameters(), lr=0.01)

    def close(self) -> None:
        """Release the underlying environment."""
        self.env.close()

    def select_action(self, state) -> Tuple[int, torch.Tensor]:
        """Sample an action for ``state``.

        Args:
            state: an observation from ``self.env`` (array-like for Box
                spaces, an integer index for Discrete spaces).

        Returns:
            ``(action, log_prob)``: the sampled action as an int and the
            log-probability tensor of that action.
        """
        if getattr(self, "_discrete_obs", False):
            # One-hot so the vector length matches input_dim.
            features = np.zeros(self.input_dim, dtype=np.float32)
            features[int(state)] = 1.0
        else:
            # np.asarray accepts arrays, lists and tuples alike.
            features = np.asarray(state, dtype=np.float32).reshape(-1)
        state_tensor = torch.as_tensor(features, dtype=torch.float32, device=self.device)
        probs = self.policy(state_tensor)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def train_episode(self) -> float:
        """Run one episode, apply one policy-gradient update, return the total reward."""
        state, _ = self.env.reset()
        episode_reward = 0
        episode_log_probs = []
        rewards = []
        terminated = False
        truncated = False
        while not (terminated or truncated):
            action, log_prob = self.select_action(state)
            state, reward, terminated, truncated, _ = self.env.step(action)
            episode_log_probs.append(log_prob)
            rewards.append(reward)
            episode_reward += reward
        returns = self._calculate_returns(rewards, gamma=0.99)
        self._update_policy(episode_log_probs, returns)
        return episode_reward

    def _calculate_returns(self, rewards: List[float], gamma: float) -> torch.Tensor:
        """Compute discounted returns, normalised when there are at least two.

        A one-element episode is returned un-normalised because the standard
        deviation of a single value is NaN.
        """
        discounted = []
        running = 0.0
        # Accumulate back-to-front, then reverse (linear instead of quadratic).
        for r in reversed(rewards):
            running = r + gamma * running
            discounted.append(running)
        discounted.reverse()
        returns = torch.tensor(discounted, dtype=torch.float32, device=self.device)
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        return returns

    def _update_policy(self, log_probs: List[torch.Tensor], returns: torch.Tensor) -> float:
        """Take one optimizer step on ``-sum(log_prob * return)`` and return the loss value."""
        policy_loss = torch.stack(
            [-log_prob * R for log_prob, R in zip(log_probs, returns)]
        ).sum()
        self.optimizer.zero_grad()
        policy_loss.backward()
        self.optimizer.step()
        return policy_loss.item()

def train_quantum_policy(env_name: str, episodes: int = 1000):
    """Train a QuantumAdvantagePolicy on ``env_name`` and log progress.

    Args:
        env_name: Gymnasium environment id.
        episodes: number of training episodes to run.

    Returns:
        List with the total reward of every completed episode.
    """
    quantum_policy = QuantumAdvantagePolicy(env_name)
    episode_rewards = []
    try:
        for episode in range(episodes):
            episode_reward = quantum_policy.train_episode()
            episode_rewards.append(episode_reward)
            # Report every tenth episode only, to keep the output short.
            if episode % 10 == 0:
                print(f"Episode {episode}, Reward: {episode_reward}")
    finally:
        # Always release the environment, even if training is interrupted.
        quantum_policy.env.close()
    return episode_rewards

# Entry point: train on CartPole-v1 with the default number of episodes.
if __name__ == "__main__":
    env_name = "CartPole-v1"
    train_quantum_policy(env_name)
