<a href="https://colab.research.google.com/github/OneFineStarstuff/OneFineStardust/blob/main/_CNN_QRL_Integration.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
pip install optuna pennylane

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from torchvision.transforms import functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import optuna
import random
import pennylane as qml
from pennylane import numpy as np

# Placeholder for loading dataset
# Synthetic stand-in data: Gaussian-noise "images" and uniformly drawn class ids.
train_images = torch.randn((100, 3, 64, 64))  # 100 images, 3 channels, 64x64 resolution
train_labels = torch.randint(low=0, high=5, size=(100,))  # 100 labels drawn from 5 classes
test_images = torch.randn((20, 3, 64, 64))  # 20 test images
test_labels = torch.randint(low=0, high=5, size=(20,))  # 20 test labels

# Custom transform function
def tensor_transform(image):
    """Augment one image tensor.

    Rotates by a fixed 30 degrees, applies a horizontal and then a vertical
    flip (each with probability 0.5), and finally crops the top-left 64x64
    region and resizes it to 64x64.
    """
    augmented = F.rotate(image, angle=30)
    # Each flip draws its own random number, horizontal first, then vertical.
    for flip in (F.hflip, F.vflip):
        if torch.rand(1) < 0.5:
            augmented = flip(augmented)
    return F.resized_crop(augmented, top=0, left=0, height=64, width=64, size=(64, 64))

# Custom Dataset class
class GalaxyDataset(Dataset):
    """Index-aligned (image, label) pairs with an optional per-image transform."""

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The dataset length is defined by the image container alone.
        return len(self.images)

    def __getitem__(self, idx):
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        # The transform is applied to the image only; the label passes through.
        return self.transform(sample), target

# Improved CNN with Pre-trained Model
class GalaxyCNN(nn.Module):
    """ImageNet-pretrained ResNet-50 with a new, trainable classification head.

    Args:
        num_classes: Number of output classes of the replacement ``fc`` layer.
            Defaults to 5, the value that used to be hard-coded.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Freeze every pretrained parameter first ...
        for param in self.resnet.parameters():
            param.requires_grad = False
        # ... then attach a fresh head; new nn.Linear parameters require grad by
        # default, so the head is the only trainable part of the network.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for batch ``x``."""
        return self.resnet(x)

# Quantum Policy for Reinforcement Learning
n_qubits = 4
n_layers = 4
dev = qml.device("default.qubit", wires=n_qubits)


@qml.qnode(dev, interface='torch')
def quantum_policy(state, weights):
    """Variational circuit: angle-embed ``state``, entangle, measure Pauli-Z.

    Args:
        state: Rotation angles embedded on the ``n_qubits`` wires.
        weights: Sequence of weight blocks; each block is passed to one
            ``BasicEntanglerLayers`` call.

    Returns:
        A list with the Pauli-Z expectation value of every wire.
    """
    qml.AngleEmbedding(state, wires=range(n_qubits))
    # Iterate over the blocks actually supplied instead of the global
    # ``n_layers`` so the depth always matches the caller's weight tensor.
    for block_weights in weights:
        qml.BasicEntanglerLayers(block_weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]

class ClassicalNN(nn.Module):
    """Linear -> ReLU -> LSTM over a length-1 sequence -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        hidden = self.fc1(x).relu()
        # The LSTM is batch_first, so add a sequence axis of length one.
        sequence_out, _state = self.lstm(torch.unsqueeze(hidden, 1))
        last_step = sequence_out[:, -1, :]
        return self.fc2(last_step)

class QuantumPolicy(nn.Module):
    """Hybrid actor: a classical network feeds angles into the quantum circuit.

    Args:
        n_qubits: Number of circuit wires; also the classical network's output size.
        n_layers: Number of weight blocks handed to the circuit.
    """

    def __init__(self, n_qubits, n_layers):
        super().__init__()
        # Small random initialisation of the circuit parameters.
        self.weights = nn.Parameter(0.01 * torch.randn(n_layers, n_qubits, n_qubits))
        self.classical_nn = ClassicalNN(input_dim=4, hidden_dim=128, output_dim=n_qubits)

    def forward(self, x):
        """Return one circuit expectation value per qubit for input ``x``."""
        classical_output = self.classical_nn(x)
        # Pass the tensor straight to the torch-interface QNode. Detaching to
        # NumPy and re-wrapping the result in torch.tensor() created a new leaf
        # and cut the graph, so no parameter of this module received gradients.
        quantum_output = quantum_policy(classical_output, self.weights)
        if isinstance(quantum_output, (list, tuple)):
            # One tensor per measured wire -> stack them on the last axis.
            quantum_output = torch.stack(tuple(quantum_output), dim=-1)
        return quantum_output.to(classical_output.dtype)

class CriticNN(nn.Module):
    """State-value estimator: Linear -> ReLU -> LSTM (length-1 sequence) -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        features = self.fc1(x).relu()
        # batch_first LSTM: insert a sequence axis of length one.
        lstm_out, _state = self.lstm(torch.unsqueeze(features, 1))
        final_step = lstm_out[:, -1, :]
        return self.fc2(final_step)

# Hyperparameter tuning with Optuna
def objective(trial):
    """Optuna objective: train actor and critic briefly, return the mean epoch loss.

    Samples ``actor_lr`` and ``critic_lr`` log-uniformly from [1e-5, 1e-2],
    runs 20 epochs of 10 steps each on random states with random placeholder
    rewards, and returns the average of (policy loss + value loss) per epoch.

    NOTE: ``actor`` and ``critic`` are the module-level instances, so their
    parameters carry over from one trial to the next.

    Args:
        trial: The Optuna trial used to sample the learning rates.

    Returns:
        float: Sum of policy and value loss, averaged over the epochs.
    """
    actor_lr = trial.suggest_float('actor_lr', 1e-5, 1e-2, log=True)
    critic_lr = trial.suggest_float('critic_lr', 1e-5, 1e-2, log=True)
    actor_optimizer = optim.Adam(actor.parameters(), lr=actor_lr)
    critic_optimizer = optim.Adam(critic.parameters(), lr=critic_lr)
    n_epochs = 20
    steps_per_epoch = 10
    gamma = 0.99
    total_loss = 0.0

    for _epoch in range(n_epochs):
        rewards, log_probs, state_values = [], [], []
        for _step in range(steps_per_epoch):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            action_prob = torch.softmax(actor(state), dim=-1)
            action = torch.multinomial(action_prob, num_samples=1).item()
            reward = random.uniform(-1, 1)
            state_values.append(critic(state).squeeze())
            log_probs.append(torch.log(action_prob.squeeze()[action]))
            rewards.append(reward)

        # Discounted returns: accumulate back-to-front, then restore time order.
        returns = []
        running = 0.0
        for r in reversed(rewards):
            running = r + gamma * running
            returns.append(running)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        values = torch.stack(state_values)
        # The advantage is a constant for the policy gradient, hence detach().
        advantages = returns - values.detach()
        policy_loss = -(torch.stack(log_probs) * advantages).sum()
        # Same shapes on both sides: no broadcasting and no tensor re-wrapping.
        value_loss = nn.functional.mse_loss(values, returns, reduction='sum')
        total_loss += policy_loss.item() + value_loss.item()

        # Clear gradients left over from the previous epoch; without this they
        # accumulate and every step uses the sum of all earlier gradients.
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        policy_loss.backward()
        value_loss.backward()
        actor_optimizer.step()
        critic_optimizer.step()

    return total_loss / n_epochs

# Module-level actor/critic: `objective` looks these names up globally, so all
# trials below keep updating the same two networks.
actor = QuantumPolicy(n_qubits, n_layers)
critic = CriticNN(input_dim=4, hidden_dim=128)
# Search both learning rates over 10 trials, minimising `objective`'s return value.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

print("Best Hyperparameters:", study.best_params)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from torchvision.transforms import functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import optuna
import random
import pennylane as qml
from pennylane import numpy as np

# Placeholder for loading dataset
# Synthetic stand-in data: Gaussian-noise "images" and uniformly drawn class ids.
train_images = torch.randn((100, 3, 64, 64))  # 100 images, 3 channels, 64x64 resolution
train_labels = torch.randint(low=0, high=5, size=(100,))  # 100 labels drawn from 5 classes
test_images = torch.randn((20, 3, 64, 64))  # 20 test images
test_labels = torch.randint(low=0, high=5, size=(20,))  # 20 test labels

# Custom transform function
def tensor_transform(image):
    """Augment one image tensor.

    Rotates by a fixed 30 degrees, applies a horizontal and then a vertical
    flip (each with probability 0.5), and finally crops the top-left 64x64
    region and resizes it to 64x64.
    """
    augmented = F.rotate(image, angle=30)
    # Each flip draws its own random number, horizontal first, then vertical.
    for flip in (F.hflip, F.vflip):
        if torch.rand(1) < 0.5:
            augmented = flip(augmented)
    return F.resized_crop(augmented, top=0, left=0, height=64, width=64, size=(64, 64))

# Custom Dataset class
class GalaxyDataset(Dataset):
    """Index-aligned (image, label) pairs with an optional per-image transform."""

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The dataset length is defined by the image container alone.
        return len(self.images)

    def __getitem__(self, idx):
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        # The transform is applied to the image only; the label passes through.
        return self.transform(sample), target

# Improved CNN with Pre-trained Model
class GalaxyCNN(nn.Module):
    """ImageNet-pretrained ResNet-50 with a new, trainable classification head.

    Args:
        num_classes: Number of output classes of the replacement ``fc`` layer.
            Defaults to 5, the value that used to be hard-coded.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Freeze every pretrained parameter first ...
        for param in self.resnet.parameters():
            param.requires_grad = False
        # ... then attach a fresh head; new nn.Linear parameters require grad by
        # default, so the head is the only trainable part of the network.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for batch ``x``."""
        return self.resnet(x)

# Quantum Policy for Reinforcement Learning
n_qubits = 4
n_layers = 4
dev = qml.device("default.qubit", wires=n_qubits)


@qml.qnode(dev, interface='torch')
def quantum_policy(state, weights):
    """Variational circuit: angle-embed ``state``, entangle, measure Pauli-Z.

    Args:
        state: Rotation angles embedded on the ``n_qubits`` wires.
        weights: Sequence of weight blocks; each block is passed to one
            ``BasicEntanglerLayers`` call.

    Returns:
        A list with the Pauli-Z expectation value of every wire.
    """
    qml.AngleEmbedding(state, wires=range(n_qubits))
    # Iterate over the blocks actually supplied instead of the global
    # ``n_layers`` so the depth always matches the caller's weight tensor.
    for block_weights in weights:
        qml.BasicEntanglerLayers(block_weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]

class ClassicalNN(nn.Module):
    """Linear -> ReLU -> LSTM over a length-1 sequence -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        hidden = self.fc1(x).relu()
        # The LSTM is batch_first, so add a sequence axis of length one.
        sequence_out, _state = self.lstm(torch.unsqueeze(hidden, 1))
        last_step = sequence_out[:, -1, :]
        return self.fc2(last_step)

class QuantumPolicy(nn.Module):
    """Hybrid actor: a classical network feeds angles into the quantum circuit.

    Args:
        n_qubits: Number of circuit wires; also the classical network's output size.
        n_layers: Number of weight blocks handed to the circuit.
    """

    def __init__(self, n_qubits, n_layers):
        super().__init__()
        # Small random initialisation of the circuit parameters.
        self.weights = nn.Parameter(0.01 * torch.randn(n_layers, n_qubits, n_qubits))
        self.classical_nn = ClassicalNN(input_dim=4, hidden_dim=128, output_dim=n_qubits)

    def forward(self, x):
        """Return one circuit expectation value per qubit for input ``x``."""
        classical_output = self.classical_nn(x)
        # Pass the tensor straight to the torch-interface QNode. Detaching to
        # NumPy and re-wrapping the result in torch.tensor() created a new leaf
        # and cut the graph, so no parameter of this module received gradients.
        quantum_output = quantum_policy(classical_output, self.weights)
        if isinstance(quantum_output, (list, tuple)):
            # One tensor per measured wire -> stack them on the last axis.
            quantum_output = torch.stack(tuple(quantum_output), dim=-1)
        return quantum_output.to(classical_output.dtype)

class CriticNN(nn.Module):
    """State-value estimator: Linear -> ReLU -> LSTM (length-1 sequence) -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        features = self.fc1(x).relu()
        # batch_first LSTM: insert a sequence axis of length one.
        lstm_out, _state = self.lstm(torch.unsqueeze(features, 1))
        final_step = lstm_out[:, -1, :]
        return self.fc2(final_step)

# Hyperparameter tuning with Optuna
def objective(trial):
    """Optuna objective: train actor and critic briefly, return the mean epoch loss.

    Samples ``actor_lr`` and ``critic_lr`` log-uniformly from [1e-5, 1e-2],
    runs 20 epochs of 10 steps each on random states with random placeholder
    rewards, and returns the average of (policy loss + value loss) per epoch.

    NOTE: ``actor`` and ``critic`` are the module-level instances, so their
    parameters carry over from one trial to the next.

    Args:
        trial: The Optuna trial used to sample the learning rates.

    Returns:
        float: Sum of policy and value loss, averaged over the epochs.
    """
    actor_lr = trial.suggest_float('actor_lr', 1e-5, 1e-2, log=True)
    critic_lr = trial.suggest_float('critic_lr', 1e-5, 1e-2, log=True)
    actor_optimizer = optim.Adam(actor.parameters(), lr=actor_lr)
    critic_optimizer = optim.Adam(critic.parameters(), lr=critic_lr)
    n_epochs = 20
    steps_per_epoch = 10
    gamma = 0.99
    total_loss = 0.0

    for _epoch in range(n_epochs):
        rewards, log_probs, state_values = [], [], []
        for _step in range(steps_per_epoch):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            action_prob = torch.softmax(actor(state), dim=-1)
            action = torch.multinomial(action_prob, num_samples=1).item()
            reward = random.uniform(-1, 1)
            state_values.append(critic(state).squeeze())
            log_probs.append(torch.log(action_prob.squeeze()[action]))
            rewards.append(reward)

        # Discounted returns: accumulate back-to-front, then restore time order.
        returns = []
        running = 0.0
        for r in reversed(rewards):
            running = r + gamma * running
            returns.append(running)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        values = torch.stack(state_values)
        # The advantage is a constant for the policy gradient, hence detach().
        advantages = returns - values.detach()
        policy_loss = -(torch.stack(log_probs) * advantages).sum()
        # Same shapes on both sides: no broadcasting and no tensor re-wrapping.
        value_loss = nn.functional.mse_loss(values, returns, reduction='sum')
        total_loss += policy_loss.item() + value_loss.item()

        # Clear gradients left over from the previous epoch; without this they
        # accumulate and every step uses the sum of all earlier gradients.
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        policy_loss.backward()
        value_loss.backward()
        actor_optimizer.step()
        critic_optimizer.step()

    return total_loss / n_epochs

# Module-level actor/critic: `objective` looks these names up globally, so all
# trials below keep updating the same two networks.
actor = QuantumPolicy(n_qubits, n_layers)
critic = CriticNN(input_dim=4, hidden_dim=128)
# Search both learning rates over 10 trials, minimising `objective`'s return value.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

print("Best Hyperparameters:", study.best_params)

# Use the best hyperparameters to train the final model
# (fresh Adam optimizers over the same actor/critic instances used during tuning).
best_actor_lr = study.best_params['actor_lr']
best_critic_lr = study.best_params['critic_lr']
actor_optimizer = optim.Adam(actor.parameters(), lr=best_actor_lr)
critic_optimizer = optim.Adam(critic.parameters(), lr=best_critic_lr)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from torchvision.transforms import functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import optuna
import random
import pennylane as qml
from pennylane import numpy as np

# Placeholder for loading dataset
# Synthetic stand-in data: Gaussian-noise "images" and uniformly drawn class ids.
train_images = torch.randn((100, 3, 64, 64))  # 100 images, 3 channels, 64x64 resolution
train_labels = torch.randint(low=0, high=5, size=(100,))  # 100 labels drawn from 5 classes
test_images = torch.randn((20, 3, 64, 64))  # 20 test images
test_labels = torch.randint(low=0, high=5, size=(20,))  # 20 test labels

# Custom transform function
def tensor_transform(image):
    """Augment one image tensor.

    Rotates by a fixed 30 degrees, applies a horizontal and then a vertical
    flip (each with probability 0.5), and finally crops the top-left 64x64
    region and resizes it to 64x64.
    """
    augmented = F.rotate(image, angle=30)
    # Each flip draws its own random number, horizontal first, then vertical.
    for flip in (F.hflip, F.vflip):
        if torch.rand(1) < 0.5:
            augmented = flip(augmented)
    return F.resized_crop(augmented, top=0, left=0, height=64, width=64, size=(64, 64))

# Custom Dataset class
class GalaxyDataset(Dataset):
    """Index-aligned (image, label) pairs with an optional per-image transform."""

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The dataset length is defined by the image container alone.
        return len(self.images)

    def __getitem__(self, idx):
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        # The transform is applied to the image only; the label passes through.
        return self.transform(sample), target

# Improved CNN with Pre-trained Model
class GalaxyCNN(nn.Module):
    """ImageNet-pretrained ResNet-50 with a new, trainable classification head.

    Args:
        num_classes: Number of output classes of the replacement ``fc`` layer.
            Defaults to 5, the value that used to be hard-coded.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Freeze every pretrained parameter first ...
        for param in self.resnet.parameters():
            param.requires_grad = False
        # ... then attach a fresh head; new nn.Linear parameters require grad by
        # default, so the head is the only trainable part of the network.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for batch ``x``."""
        return self.resnet(x)

# Quantum Policy for Reinforcement Learning
n_qubits = 4
n_layers = 4
dev = qml.device("default.qubit", wires=n_qubits)


@qml.qnode(dev, interface='torch')
def quantum_policy(state, weights):
    """Variational circuit: angle-embed ``state``, entangle, measure Pauli-Z.

    Args:
        state: Rotation angles embedded on the ``n_qubits`` wires.
        weights: Sequence of weight blocks; each block is passed to one
            ``BasicEntanglerLayers`` call.

    Returns:
        A list with the Pauli-Z expectation value of every wire.
    """
    qml.AngleEmbedding(state, wires=range(n_qubits))
    # Iterate over the blocks actually supplied instead of the global
    # ``n_layers`` so the depth always matches the caller's weight tensor.
    for block_weights in weights:
        qml.BasicEntanglerLayers(block_weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]

class ClassicalNN(nn.Module):
    """Linear -> ReLU -> LSTM over a length-1 sequence -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        hidden = self.fc1(x).relu()
        # The LSTM is batch_first, so add a sequence axis of length one.
        sequence_out, _state = self.lstm(torch.unsqueeze(hidden, 1))
        last_step = sequence_out[:, -1, :]
        return self.fc2(last_step)

class QuantumPolicy(nn.Module):
    """Hybrid actor: a classical network feeds angles into the quantum circuit.

    Args:
        n_qubits: Number of circuit wires; also the classical network's output size.
        n_layers: Number of weight blocks handed to the circuit.
    """

    def __init__(self, n_qubits, n_layers):
        super().__init__()
        # Small random initialisation of the circuit parameters.
        self.weights = nn.Parameter(0.01 * torch.randn(n_layers, n_qubits, n_qubits))
        self.classical_nn = ClassicalNN(input_dim=4, hidden_dim=128, output_dim=n_qubits)

    def forward(self, x):
        """Return one circuit expectation value per qubit for input ``x``."""
        classical_output = self.classical_nn(x)
        # Pass the tensor straight to the torch-interface QNode. Detaching to
        # NumPy and re-wrapping the result in torch.tensor() created a new leaf
        # and cut the graph, so no parameter of this module received gradients.
        quantum_output = quantum_policy(classical_output, self.weights)
        if isinstance(quantum_output, (list, tuple)):
            # One tensor per measured wire -> stack them on the last axis.
            quantum_output = torch.stack(tuple(quantum_output), dim=-1)
        return quantum_output.to(classical_output.dtype)

class CriticNN(nn.Module):
    """State-value estimator: Linear -> ReLU -> LSTM (length-1 sequence) -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        features = self.fc1(x).relu()
        # batch_first LSTM: insert a sequence axis of length one.
        lstm_out, _state = self.lstm(torch.unsqueeze(features, 1))
        final_step = lstm_out[:, -1, :]
        return self.fc2(final_step)

# Hyperparameter tuning with Optuna
def objective(trial):
    """Optuna objective: train actor and critic briefly, return the mean epoch loss.

    Samples ``actor_lr`` and ``critic_lr`` log-uniformly from [1e-5, 1e-2],
    runs 20 epochs of 10 steps each on random states with random placeholder
    rewards, and returns the average of (policy loss + value loss) per epoch.

    NOTE: ``actor`` and ``critic`` are the module-level instances, so their
    parameters carry over from one trial to the next.

    Args:
        trial: The Optuna trial used to sample the learning rates.

    Returns:
        float: Sum of policy and value loss, averaged over the epochs.
    """
    actor_lr = trial.suggest_float('actor_lr', 1e-5, 1e-2, log=True)
    critic_lr = trial.suggest_float('critic_lr', 1e-5, 1e-2, log=True)
    actor_optimizer = optim.Adam(actor.parameters(), lr=actor_lr)
    critic_optimizer = optim.Adam(critic.parameters(), lr=critic_lr)
    n_epochs = 20
    steps_per_epoch = 10
    gamma = 0.99
    total_loss = 0.0

    for _epoch in range(n_epochs):
        rewards, log_probs, state_values = [], [], []
        for _step in range(steps_per_epoch):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            action_prob = torch.softmax(actor(state), dim=-1)
            action = torch.multinomial(action_prob, num_samples=1).item()
            reward = random.uniform(-1, 1)
            state_values.append(critic(state).squeeze())
            log_probs.append(torch.log(action_prob.squeeze()[action]))
            rewards.append(reward)

        # Discounted returns: accumulate back-to-front, then restore time order.
        returns = []
        running = 0.0
        for r in reversed(rewards):
            running = r + gamma * running
            returns.append(running)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        values = torch.stack(state_values)
        # The advantage is a constant for the policy gradient, hence detach().
        advantages = returns - values.detach()
        policy_loss = -(torch.stack(log_probs) * advantages).sum()
        # Same shapes on both sides: no broadcasting and no tensor re-wrapping.
        value_loss = nn.functional.mse_loss(values, returns, reduction='sum')
        total_loss += policy_loss.item() + value_loss.item()

        # Clear gradients left over from the previous epoch; without this they
        # accumulate and every step uses the sum of all earlier gradients.
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        policy_loss.backward()
        value_loss.backward()
        actor_optimizer.step()
        critic_optimizer.step()

    return total_loss / n_epochs

# Module-level actor/critic: `objective` looks these names up globally, so all
# trials below keep updating the same two networks.
actor = QuantumPolicy(n_qubits, n_layers)
critic = CriticNN(input_dim=4, hidden_dim=128)
# Search both learning rates over 10 trials, minimising `objective`'s return value.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

print("Best Hyperparameters:", study.best_params)

# Use the best hyperparameters to train the final model
# (fresh Adam optimizers over the same actor/critic instances used during tuning).
best_actor_lr = study.best_params['actor_lr']
best_critic_lr = study.best_params['critic_lr']
actor_optimizer = optim.Adam(actor.parameters(), lr=best_actor_lr)
critic_optimizer = optim.Adam(critic.parameters(), lr=best_critic_lr)

# Further Training with Best Hyperparameters
n_epochs = 50  # Increase the number of epochs for final training
gamma = 0.99

# Final actor-critic training: each epoch collects 10 steps on random states
# with random placeholder rewards, then applies one update to each network.
for epoch in range(n_epochs):
    rewards, log_probs, state_values = [], [], []
    for _ in range(10):
        state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
        policy_output = actor(state)
        action_prob = torch.softmax(policy_output, dim=-1)
        action = torch.multinomial(action_prob, num_samples=1).item()
        reward = random.uniform(-1, 1)
        value = critic(state).squeeze()
        state_values.append(value)
        log_probs.append(torch.log(action_prob.squeeze()[action]))
        rewards.append(reward)

    # Discounted returns: accumulate back-to-front, then restore time order.
    discounted_rewards = []
    R = 0
    for r in reversed(rewards):
        R = r + gamma * R
        discounted_rewards.append(R)
    discounted_rewards.reverse()

    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-5)

    values = torch.stack(state_values)
    # The advantage is a constant for the policy gradient, hence detach().
    advantages = discounted_rewards - values.detach()
    policy_loss = -(torch.stack(log_probs) * advantages).sum()
    # Same shapes on both sides: no broadcasting and no tensor re-wrapping.
    value_loss = nn.functional.mse_loss(values, discounted_rewards, reduction='sum')

    # Clear gradients from the previous epoch; without this they accumulate
    # and every optimizer step uses the sum of all earlier gradients.
    actor_optimizer.zero_grad()
    critic_optimizer.zero_grad()
    policy_loss.backward()
    value_loss.backward()
    actor_optimizer.step()
    critic_optimizer.step()

    print(f"Epoch {epoch+1}/{n_epochs} - Policy Loss: {policy_loss.item()} - Value Loss: {value_loss.item()}")

# Evaluate the trained model
# Note: The following evaluation is a placeholder and should be replaced with an appropriate evaluation method
# based on the specific requirements of your use case. Below is an example evaluation using accuracy.

# Evaluate the trained model
def evaluate_model(model, test_images, test_labels):
    """Return the top-1 accuracy of ``model`` in percent.

    The model is switched to eval mode and each image is classified on its own
    (batch size 1) with gradients disabled.

    Args:
        model: Module mapping a batch of images to per-class scores.
        test_images: Iterable of image tensors without a batch dimension.
        test_labels: Iterable of scalar label tensors aligned with ``test_images``.

    Returns:
        float: Percentage of correct predictions; 0.0 when there are no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for image, label in zip(test_images, test_labels):
            logits = model(image.unsqueeze(0))  # Add batch dimension
            predicted = logits.argmax(dim=1)
            target = label.unsqueeze(0)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    # An empty test set used to raise ZeroDivisionError.
    if total == 0:
        return 0.0
    return 100 * correct / total

# Example evaluation
# NOTE(review): the model is constructed right here and evaluated without any
# training step in between, so this only demonstrates the evaluation call.
galaxy_cnn = GalaxyCNN()
# Assuming that you have trained your galaxy_cnn model
test_accuracy = evaluate_model(galaxy_cnn, test_images, test_labels)
print(f"Test Accuracy: {test_accuracy}%")

# Model evaluation for the Quantum Policy
def evaluate_quantum_policy(actor, critic, n_tests=10):
    """Return the average placeholder reward over ``n_tests`` random states.

    Both networks are switched to eval mode. For every test a random 4-value
    state is drawn, an action is sampled from the actor's softmax output, and a
    random reward in [-1, 1] is recorded. The reward is a placeholder: it does
    not depend on the sampled action.

    Args:
        actor: Policy module mapping a state to per-action scores.
        critic: Value module; only switched to eval mode here.
        n_tests: Number of random states to evaluate.

    Returns:
        float: Mean of the collected rewards; 0.0 when ``n_tests`` is not positive.
    """
    actor.eval()
    critic.eval()
    # Zero tests used to end in a ZeroDivisionError.
    if n_tests <= 0:
        return 0.0

    rewards = []
    with torch.no_grad():
        for _ in range(n_tests):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            action_prob = torch.softmax(actor(state), dim=-1)
            # Sampling still exercises the policy; the result is discarded
            # because the placeholder reward ignores the chosen action.
            torch.multinomial(action_prob, num_samples=1).item()
            rewards.append(random.uniform(-1, 1))

    return sum(rewards) / n_tests

# Example evaluation
# Evaluate the module-level actor/critic with the function's default n_tests.
avg_reward = evaluate_quantum_policy(actor, critic)
print(f"Average Reward: {avg_reward}")

# Combining everything
if __name__ == "__main__":
    # Data preparation
    # Augmentation (tensor_transform) is applied to the training set only.
    train_dataset = GalaxyDataset(train_images, train_labels, transform=tensor_transform)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # NOTE(review): test_loader is built but not used below; evaluation is done
    # on test_images/test_labels directly.
    test_dataset = GalaxyDataset(test_images, test_labels, transform=None)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Training and evaluation
    galaxy_cnn = GalaxyCNN()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(galaxy_cnn.parameters(), lr=0.001)

    # Train Galaxy CNN
    n_epochs = 10
    for epoch in range(n_epochs):
        galaxy_cnn.train()
        running_loss = 0.0
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = galaxy_cnn(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        # Report the mean batch loss of this epoch.
        print(f"Epoch {epoch+1}/{n_epochs}, Loss: {running_loss/len(train_loader)}")

    # Evaluate Galaxy CNN
    test_accuracy = evaluate_model(galaxy_cnn, test_images, test_labels)
    print(f"Test Accuracy: {test_accuracy}%")

    # Quantum Policy training with best hyperparameters (continued)
    # NOTE(review): these assignments rebind actor/critic to new instances, and
    # no training step follows before the evaluation below, so the optimizers
    # created here are never stepped and the evaluated policy is untrained.
    actor = QuantumPolicy(n_qubits, n_layers)
    critic = CriticNN(input_dim=4, hidden_dim=128)
    actor_optimizer = optim.Adam(actor.parameters(), lr=best_actor_lr)
    critic_optimizer = optim.Adam(critic.parameters(), lr=best_critic_lr)

    # Further training as described earlier
    # Evaluate Quantum Policy
    avg_reward = evaluate_quantum_policy(actor, critic)
    print(f"Average Reward: {avg_reward}")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import models, transforms
from torchvision.transforms import functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import optuna
import random
import pennylane as qml
from pennylane import numpy as np

# Placeholder for loading dataset
# Synthetic stand-in data: Gaussian-noise "images" and uniformly drawn class ids.
train_images = torch.randn((100, 3, 64, 64))  # 100 images, 3 channels, 64x64 resolution
train_labels = torch.randint(low=0, high=5, size=(100,))  # 100 labels drawn from 5 classes
test_images = torch.randn((20, 3, 64, 64))  # 20 test images
test_labels = torch.randint(low=0, high=5, size=(20,))  # 20 test labels

# Custom transform function
def tensor_transform(image):
    """Augment one image tensor.

    Rotates by a fixed 30 degrees, applies a horizontal and then a vertical
    flip (each with probability 0.5), and finally crops the top-left 64x64
    region and resizes it to 64x64.
    """
    augmented = F.rotate(image, angle=30)
    # Each flip draws its own random number, horizontal first, then vertical.
    for flip in (F.hflip, F.vflip):
        if torch.rand(1) < 0.5:
            augmented = flip(augmented)
    return F.resized_crop(augmented, top=0, left=0, height=64, width=64, size=(64, 64))

# Custom Dataset class
class GalaxyDataset(Dataset):
    """Index-aligned (image, label) pairs with an optional per-image transform."""

    def __init__(self, images, labels, transform=None):
        self.images, self.labels, self.transform = images, labels, transform

    def __len__(self):
        # The dataset length is defined by the image container alone.
        return len(self.images)

    def __getitem__(self, idx):
        sample, target = self.images[idx], self.labels[idx]
        if not self.transform:
            return sample, target
        # The transform is applied to the image only; the label passes through.
        return self.transform(sample), target

# Improved CNN with Pre-trained Model
class GalaxyCNN(nn.Module):
    """ImageNet-pretrained ResNet-50 with a new, trainable classification head.

    Args:
        num_classes: Number of output classes of the replacement ``fc`` layer.
            Defaults to 5, the value that used to be hard-coded.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        # Freeze every pretrained parameter first ...
        for param in self.resnet.parameters():
            param.requires_grad = False
        # ... then attach a fresh head; new nn.Linear parameters require grad by
        # default, so the head is the only trainable part of the network.
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)

    def forward(self, x):
        """Return the class logits produced by the wrapped ResNet for batch ``x``."""
        return self.resnet(x)

# Quantum Policy for Reinforcement Learning
n_qubits = 4
n_layers = 4
dev = qml.device("default.qubit", wires=n_qubits)


@qml.qnode(dev, interface='torch')
def quantum_policy(state, weights):
    """Variational circuit: angle-embed ``state``, entangle, measure Pauli-Z.

    Args:
        state: Rotation angles embedded on the ``n_qubits`` wires.
        weights: Sequence of weight blocks; each block is passed to one
            ``BasicEntanglerLayers`` call.

    Returns:
        A list with the Pauli-Z expectation value of every wire.
    """
    qml.AngleEmbedding(state, wires=range(n_qubits))
    # Iterate over the blocks actually supplied instead of the global
    # ``n_layers`` so the depth always matches the caller's weight tensor.
    for block_weights in weights:
        qml.BasicEntanglerLayers(block_weights, wires=range(n_qubits))
    return [qml.expval(qml.PauliZ(wire)) for wire in range(n_qubits)]

class ClassicalNN(nn.Module):
    """Linear -> ReLU -> LSTM over a length-1 sequence -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        hidden = self.fc1(x).relu()
        # The LSTM is batch_first, so add a sequence axis of length one.
        sequence_out, _state = self.lstm(torch.unsqueeze(hidden, 1))
        last_step = sequence_out[:, -1, :]
        return self.fc2(last_step)

class QuantumPolicy(nn.Module):
    """Hybrid actor: a classical network feeds angles into the quantum circuit.

    Args:
        n_qubits: Number of circuit wires; also the classical network's output size.
        n_layers: Number of weight blocks handed to the circuit.
    """

    def __init__(self, n_qubits, n_layers):
        super().__init__()
        # Small random initialisation of the circuit parameters.
        self.weights = nn.Parameter(0.01 * torch.randn(n_layers, n_qubits, n_qubits))
        self.classical_nn = ClassicalNN(input_dim=4, hidden_dim=128, output_dim=n_qubits)

    def forward(self, x):
        """Return one circuit expectation value per qubit for input ``x``."""
        classical_output = self.classical_nn(x)
        # Pass the tensor straight to the torch-interface QNode. Detaching to
        # NumPy and re-wrapping the result in torch.tensor() created a new leaf
        # and cut the graph, so no parameter of this module received gradients.
        quantum_output = quantum_policy(classical_output, self.weights)
        if isinstance(quantum_output, (list, tuple)):
            # One tensor per measured wire -> stack them on the last axis.
            quantum_output = torch.stack(tuple(quantum_output), dim=-1)
        return quantum_output.to(classical_output.dtype)

class CriticNN(nn.Module):
    """State-value estimator: Linear -> ReLU -> LSTM (length-1 sequence) -> Linear."""

    def __init__(self, input_dim, hidden_dim, output_dim=1):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.lstm = nn.LSTM(hidden_dim, hidden_dim, batch_first=True)
        self.fc2 = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        features = self.fc1(x).relu()
        # batch_first LSTM: insert a sequence axis of length one.
        lstm_out, _state = self.lstm(torch.unsqueeze(features, 1))
        final_step = lstm_out[:, -1, :]
        return self.fc2(final_step)

# Hyperparameter tuning with Optuna
def objective(trial):
    """Optuna objective: train actor and critic briefly, return the mean epoch loss.

    Samples ``actor_lr`` and ``critic_lr`` log-uniformly from [1e-5, 1e-2],
    runs 20 epochs of 10 steps each on random states with random placeholder
    rewards, and returns the average of (policy loss + value loss) per epoch.

    NOTE: ``actor`` and ``critic`` are the module-level instances, so their
    parameters carry over from one trial to the next.

    Args:
        trial: The Optuna trial used to sample the learning rates.

    Returns:
        float: Sum of policy and value loss, averaged over the epochs.
    """
    actor_lr = trial.suggest_float('actor_lr', 1e-5, 1e-2, log=True)
    critic_lr = trial.suggest_float('critic_lr', 1e-5, 1e-2, log=True)
    actor_optimizer = optim.Adam(actor.parameters(), lr=actor_lr)
    critic_optimizer = optim.Adam(critic.parameters(), lr=critic_lr)
    n_epochs = 20
    steps_per_epoch = 10
    gamma = 0.99
    total_loss = 0.0

    for _epoch in range(n_epochs):
        rewards, log_probs, state_values = [], [], []
        for _step in range(steps_per_epoch):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            action_prob = torch.softmax(actor(state), dim=-1)
            action = torch.multinomial(action_prob, num_samples=1).item()
            reward = random.uniform(-1, 1)
            state_values.append(critic(state).squeeze())
            log_probs.append(torch.log(action_prob.squeeze()[action]))
            rewards.append(reward)

        # Discounted returns: accumulate back-to-front, then restore time order.
        returns = []
        running = 0.0
        for r in reversed(rewards):
            running = r + gamma * running
            returns.append(running)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        returns = (returns - returns.mean()) / (returns.std() + 1e-5)

        values = torch.stack(state_values)
        # The advantage is a constant for the policy gradient, hence detach().
        advantages = returns - values.detach()
        policy_loss = -(torch.stack(log_probs) * advantages).sum()
        # Same shapes on both sides: no broadcasting and no tensor re-wrapping.
        value_loss = nn.functional.mse_loss(values, returns, reduction='sum')
        total_loss += policy_loss.item() + value_loss.item()

        # Clear gradients left over from the previous epoch; without this they
        # accumulate and every step uses the sum of all earlier gradients.
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        policy_loss.backward()
        value_loss.backward()
        actor_optimizer.step()
        critic_optimizer.step()

    return total_loss / n_epochs

# Module-level actor/critic: `objective` looks these names up globally, so all
# trials below keep updating the same two networks.
actor = QuantumPolicy(n_qubits, n_layers)
critic = CriticNN(input_dim=4, hidden_dim=128)
# Search both learning rates over 10 trials, minimising `objective`'s return value.
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=10)

print("Best Hyperparameters:", study.best_params)

# Use the best hyperparameters to train the final model
# (fresh Adam optimizers over the same actor/critic instances used during tuning).
best_actor_lr = study.best_params['actor_lr']
best_critic_lr = study.best_params['critic_lr']
actor_optimizer = optim.Adam(actor.parameters(), lr=best_actor_lr)
critic_optimizer = optim.Adam(critic.parameters(), lr=best_critic_lr)

# Further Training with Best Hyperparameters
n_epochs = 50  # Increase the number of epochs for final training
gamma = 0.99

# Final actor-critic training: each epoch collects 10 steps on random states
# with random placeholder rewards, then applies one update to each network.
for epoch in range(n_epochs):
    rewards, log_probs, state_values = [], [], []
    for _ in range(10):
        state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
        policy_output = actor(state)
        action_prob = torch.softmax(policy_output, dim=-1)
        action = torch.multinomial(action_prob, num_samples=1).item()
        reward = random.uniform(-1, 1)
        value = critic(state).squeeze()
        state_values.append(value)
        log_probs.append(torch.log(action_prob.squeeze()[action]))
        rewards.append(reward)

    # Discounted returns: accumulate back-to-front, then restore time order.
    discounted_rewards = []
    R = 0
    for r in reversed(rewards):
        R = r + gamma * R
        discounted_rewards.append(R)
    discounted_rewards.reverse()

    discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32)
    discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-5)

    values = torch.stack(state_values)
    # The advantage is a constant for the policy gradient, hence detach().
    advantages = discounted_rewards - values.detach()
    policy_loss = -(torch.stack(log_probs) * advantages).sum()
    # Same shapes on both sides: no broadcasting and no tensor re-wrapping.
    value_loss = nn.functional.mse_loss(values, discounted_rewards, reduction='sum')

    # Clear gradients from the previous epoch; without this they accumulate
    # and every optimizer step uses the sum of all earlier gradients.
    actor_optimizer.zero_grad()
    critic_optimizer.zero_grad()
    policy_loss.backward()
    value_loss.backward()
    actor_optimizer.step()
    critic_optimizer.step()

    print(f"Epoch {epoch+1}/{n_epochs} - Policy Loss: {policy_loss.item()} - Value Loss: {value_loss.item()}")

# Evaluate the trained model
# Note: The following evaluation is a placeholder and should be replaced with an appropriate evaluation method
# based on the specific requirements of your use case. Below is an example evaluation using accuracy.

# Evaluate Galaxy CNN model
def evaluate_model(model, test_images, test_labels):
    """Return the top-1 accuracy of ``model`` as a percentage in [0, 100].

    The model is switched to eval mode and every sample is scored on its own
    as a batch of size one, with gradient tracking disabled.

    Args:
        model: Module called as ``model(batch)``; its output must carry the
            class scores along dimension 1.
        test_images: Iterable of per-sample image tensors (a batch dimension
            is added here before the forward pass).
        test_labels: Iterable of per-sample label tensors, consumed in
            lockstep with ``test_images``.

    Returns:
        The percentage of samples whose arg-max prediction equals the label,
        or ``0.0`` when there are no samples to evaluate.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for image, label in zip(test_images, test_labels):
            outputs = model(image.unsqueeze(0))  # add batch dimension
            predicted = outputs.argmax(dim=1)
            total += 1
            correct += (predicted == label.unsqueeze(0)).sum().item()

    if total == 0:
        # An empty test set has no defined accuracy; report 0.0 instead of
        # failing with ZeroDivisionError.
        return 0.0
    return 100 * correct / total

# Demonstration: score a GalaxyCNN instance on the held-out test tensors.
# NOTE: the instance is constructed right here and no training step runs in
# between, so the printed figure is only meaningful once trained weights have
# been loaded into it.
galaxy_cnn = GalaxyCNN()
test_accuracy = evaluate_model(
    model=galaxy_cnn,
    test_images=test_images,
    test_labels=test_labels,
)
print(f"Test Accuracy: {test_accuracy}%")

# Model evaluation for the Quantum Policy
def evaluate_quantum_policy(actor, critic, n_tests=10):
    """Return the mean placeholder reward over ``n_tests`` random states.

    Both networks are switched to eval mode. For each test a random 4-feature
    state is drawn, the actor is run on it and an action is sampled from the
    resulting softmax distribution. The reward is a uniform random placeholder
    in [-1, 1] that does not depend on the sampled action, so the result does
    not yet measure policy quality.

    Args:
        actor: Policy module called as ``actor(state)`` with a ``(1, 4)``
            float32 tensor; its output is soft-maxed over the last dimension.
        critic: Value module; only put into eval mode here.
        n_tests: Number of random states to evaluate.

    Returns:
        The average reward, or ``0.0`` when ``n_tests`` is not positive.
    """
    actor.eval()
    critic.eval()
    if n_tests <= 0:
        # Nothing to average; avoid dividing by zero.
        return 0.0

    rewards = []
    with torch.no_grad():
        for _ in range(n_tests):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            policy_output = actor(state)
            action_prob = torch.softmax(policy_output, dim=-1)
            # Sampling exercises the policy end to end; the action itself is
            # not consumed by the placeholder reward below.
            _ = torch.multinomial(action_prob, num_samples=1)
            rewards.append(random.uniform(-1, 1))

    return sum(rewards) / n_tests

# Demonstration: report the mean reward obtained by the current actor/critic
# pair using the evaluation routine's default number of test states.
avg_reward = evaluate_quantum_policy(
    actor=actor,
    critic=critic,
)
print(f"Average Reward: {avg_reward}")

if __name__ == "__main__":
    # Data preparation
    train_dataset = GalaxyDataset(train_images, train_labels, transform=tensor_transform)
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # NOTE: test_loader is built but the evaluation below iterates the raw
    # test tensors directly.
    test_dataset = GalaxyDataset(test_images, test_labels, transform=None)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Training and evaluation
    galaxy_cnn = GalaxyCNN()
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(galaxy_cnn.parameters(), lr=0.001)

    # Train Galaxy CNN
    n_epochs = 10
    for epoch in range(n_epochs):
        galaxy_cnn.train()
        running_loss = 0.0
        for images, labels in train_loader:
            optimizer.zero_grad()
            outputs = galaxy_cnn(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        print(f"Epoch {epoch+1}/{n_epochs}, Loss: {running_loss/len(train_loader)}")

    # Evaluate Galaxy CNN
    test_accuracy = evaluate_model(galaxy_cnn, test_images, test_labels)
    print(f"Test Accuracy: {test_accuracy}%")

    # Quantum Policy training with best hyperparameters (continued)
    actor = QuantumPolicy(n_qubits, n_layers)
    critic = CriticNN(input_dim=4, hidden_dim=128)
    actor_optimizer = optim.Adam(actor.parameters(), lr=best_actor_lr)
    critic_optimizer = optim.Adam(critic.parameters(), lr=best_critic_lr)

    # Actor-critic training; reuses n_epochs from the CNN training above.
    for epoch in range(n_epochs):
        # Roll out one synthetic 10-step episode. States and rewards are
        # uniform random placeholders, so the reward does not depend on the
        # sampled action.
        rewards, log_probs, state_values = [], [], []
        for _ in range(10):
            state = torch.tensor([[random.uniform(-1, 1) for _ in range(4)]], dtype=torch.float32)
            policy_output = actor(state)
            action_prob = torch.softmax(policy_output, dim=-1)
            action = torch.multinomial(action_prob, num_samples=1).item()
            reward = random.uniform(-1, 1)
            value = critic(state).squeeze()
            state_values.append(value)
            log_probs.append(torch.log(action_prob.squeeze()[action]))
            rewards.append(reward)

        # Discounted returns, accumulated back-to-front. Appending and
        # reversing once avoids the O(n^2) cost of inserting at index 0.
        discounted_rewards = []
        R = 0.0
        for r in reversed(rewards):
            R = r + gamma * R
            discounted_rewards.append(R)
        discounted_rewards.reverse()

        # Normalise the returns; the small epsilon guards against a zero std.
        discounted_rewards = torch.tensor(discounted_rewards, dtype=torch.float32)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / (discounted_rewards.std() + 1e-5)

        values = torch.stack(state_values)
        # Detach the critic estimate so the policy gradient does not flow
        # into the critic.
        advantages = discounted_rewards - values.detach()
        policy_loss = -(torch.stack(log_probs) * advantages).sum()
        # Sum of per-step squared errors. Input and target now share one
        # shape, avoiding the broadcasting warning of a 0-dim input vs. a
        # (1,) target.
        value_loss = nn.functional.mse_loss(values, discounted_rewards, reduction="sum")

        # Clear gradients left over from the previous epoch; without this
        # each optimizer step would use the running sum of all past gradients.
        actor_optimizer.zero_grad()
        critic_optimizer.zero_grad()
        policy_loss.backward()
        value_loss.backward()
        actor_optimizer.step()
        critic_optimizer.step()

        print(f"Epoch {epoch+1}/{n_epochs} - Policy Loss: {policy_loss.item()} - Value Loss: {value_loss.item()}")

    # Evaluate Quantum Policy
    avg_reward = evaluate_quantum_policy(actor, critic)
    print(f"Average Reward: {avg_reward}")