In [42]:
#Imports
import time
import random

import numpy as np
from scipy.stats import norm
import gurobipy as gb
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

#Set seed for reproducibility of instances
np.random.seed(25)

print_selections = False

def instance_generator(n=50000):
    '''
    Function that generates hard Knapsack problem instances.
    Input:
        -n: desired size of set of items I, defaulted at 50,000 as we use this number in our study
    Returns:
        -v: array of values for all i items
        -w: array of weights of all i items
    ''' 
    v = np.round(norm.rvs(100, 10, size=n))
    w = np.zeros(n)
    for i in range(n):
        w[i] = round(norm.rvs(v[i], 5))
    return v, w

v, w = instance_generator()

# Problem size set-up
N = [10, 25, 50, 100, 250, 500, 750, 1000, 1500]

# Capacity constraint function based on the problem size
def W(n):
    return round(0.45*np.sum(w[0:n]))

# Binary problem
def binary_method(n, v=v, w=w):
    '''
    Function that runs the Gurobi-binary problem.
    Input:
        -n: Input problem size
    Output:
        -obj_val: Outcome of the optimizatoin
        -running_time: Time to run the algorithm on the problem size
    '''
    # Selecting relevant i for problem size
    W_gb = W(n)

    start_time = time.time()

    m = gb.Model("Binary model")
    x = m.addVars(n, vtype=gb.GRB.BINARY, name="x")

    m.setObjective(gb.quicksum(v[i]*x[i] for i in range(n)), gb.GRB.MAXIMIZE)

    m.addConstr(gb.quicksum(w[i]*x[i] for i in range(n)) <= W_gb)

    m.update()
    m.Params.LogToConsole = 0
    m.optimize()
    
    obj_val = m.objVal
    running_time = time.time() - start_time
    selected_items = [i for i in range(n) if x[i].X == 1] 
    return obj_val, running_time, selected_items

optimal_solution_binary = [binary_method(n)[0] for n in N]
running_time_binary = [binary_method(n)[1] for n in N]
print(f"For n = {N}:")
print(f"Gurobi model optimal solution: {optimal_solution_binary}")
print(f"Gurobi model running times: {running_time_binary}")

if print_selections: 
    print(f"Gurobi model selected items:{[binary_method(n)[2] for n in N]}")

# Dynamic programming
def dyn_prog_method(n, v=v, w=w):
    
    # Selecting relevant i for problem size, and calculate knapsack capacity
    W_dp = int(W(n))

    # Start runtime measurement
    start_time = time.perf_counter()

    # Create table for bottom up dynamic programming
    OPT_table = [[0 for i in range(W_dp+1)] for i in range(n+1)]
    
    # Dynamic programming algorithm
    for i in range(1, n+1):
        for j in range(1, W_dp+1):
            if w[i-1] <= j:
                OPT_table[i][j] = max(OPT_table[i-1][j], v[i-1]+ OPT_table[i-1][int(j-w[i-1])])
            else:
                OPT_table[i][j] = OPT_table[i-1][j]

    # Backtrack to find the selected items
    selected_items = []
    j = W_dp
    for i in range(n, 0, -1):
        if OPT_table[i][j] != OPT_table[i - 1][j]:  # Item i-1 is included
            selected_items.append(i - 1)  # Add the index of the item
            j -= int(w[i - 1])  # Reduce the remaining capacity
    selected_items.reverse()  # Reverse the list to get the correct order

    # End runtime measurement
    running_time = time.perf_counter() - start_time

    # Return the value in the knapsack and the running time
    return OPT_table[n][W_dp], running_time, selected_items

optimal_solution_dp, running_time_dp, selected_items_dp = [], [], []
print("Warning: total DP analysis can be lengthy (around 2 minutes)")
for n in N:
    DP_sol = dyn_prog_method(n)
    optimal_solution_dp.append(DP_sol[0])
    running_time_dp.append(DP_sol[1])
    selected_items_dp.append(DP_sol[2])
    print(f"DP case N={n} analysed!")

print(f"For n = {N}:")
print(f"DP optimal solution: {optimal_solution_dp}")
print(f"DP running times: {running_time_dp}")
if print_selections:
    print(f"DP selected items: {selected_items_dp}")

# Greedy Hueristic
def greedy_heuristic(n, v=v, w=w):
    
    # Selecting relevant i for problem size, and calculate knapsack capacity
    W_gy = W(n)

    # Start runtime measurement
    start_time = time.perf_counter()

    # Calculate ratios
    ratios = [(v[i]/w[i], v[i], w[i]) for i in range(n) ]
    ratios.sort(reverse=True)

    # initialize empty knapsack
    weight_in_knapsack = 0
    value_in_knapsack = 0

    # fill up iteratively over items in ratio order
    for _, value, weight in ratios:
        if weight_in_knapsack + weight <= W_gy:
            value_in_knapsack += value
            weight_in_knapsack += weight

    # End runtime measurement
    running_time = time.perf_counter() - start_time

    # Return the value in the knapsack and the running time
    return value_in_knapsack, running_time

optimal_solution_gh = [greedy_heuristic(n)[0] for n in N]
running_time_gh = [greedy_heuristic(n)[1] for n in N]
print(f"For n = {N}:")
print(f"Gurobi model optimal solution: {optimal_solution_gh}")
print(f"Gurobi model running times: {running_time_gh}")  

For n = [10, 25, 50, 100, 250, 500, 750, 1000, 1500]:
Gurobi model optimal solution: [453.0, 1173.0, 2337.0, 4679.0, 11673.0, 23493.0, 35247.0, 47151.0, 70656.0]
Gurobi model running times: [0.0009970664978027344, 0.001995563507080078, 0.0039882659912109375, 0.012969255447387695, 0.017532825469970703, 0.0348973274230957, 0.0518496036529541, 0.06946325302124023, 0.09635663032531738]
DP case N=10 analysed!
DP case N=25 analysed!
DP case N=50 analysed!
DP case N=100 analysed!
DP case N=250 analysed!


KeyboardInterrupt: 

In [None]:
# RL Agent

#Initial training values for capacity and size of item set
capacity, max_items = W(N[0]), 150

# Define the device (using GPU if available)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class NeuralNetworkAgent(nn.Module):
    def __init__(self, input_dim=5, hidden_dim=64, n_heads=4, n_layers=2, seq_len=1):
        super(NeuralNetworkAgent, self).__init__()

        # Fully connected layers for initial transformation
        self.fc1_1 = nn.Linear(input_dim, hidden_dim)
        self.fc1_2 = nn.Linear(hidden_dim, hidden_dim)

        # Transformer Encoder setup
        self.transformer_encoder_layer = nn.TransformerEncoderLayer(
            d_model=hidden_dim, nhead=n_heads, dim_feedforward=hidden_dim
        )
        self.transformer = nn.TransformerEncoder(self.transformer_encoder_layer, num_layers=n_layers)

        # Fully connected layers after transformer processing
        self.fc2_1 = nn.Linear(hidden_dim, hidden_dim)
        self.fc2_2 = nn.Linear(hidden_dim, 1)  # Output a single Q-value per row

        self.relu = nn.ReLU()

    def forward(self, x):
        # Initial transformation
        x = self.relu(self.fc1_1(x))
        x = self.relu(self.fc1_2(x))

        # Transformer expects input of shape (sequence length, batch size, embedding dimension)
        x = x.unsqueeze(1)  # Add sequence length dimension
        x = self.transformer(x)
        x = x.squeeze(1)  # Remove the sequence length dimension

        # Final transformation to output Q-values
        x = self.relu(self.fc2_1(x))
        qvalues = self.fc2_2(x).squeeze(-1)  # Output one Q-value per row

        return qvalues

# Knapsack environment created through class
class Knapsack_environment:
    def __init__(self, v=v, w=w, capacity=capacity, max_items=max_items):
        self.v = np.concatenate((v, np.zeros(max_items-len(v)))) # concatenate allows for later adaptation of amount of training set items
        self.w = np.concatenate((w, np.zeros(max_items-len(v))))
        self.free = capacity
        self.selected = np.zeros(len(v))
        self.reset(len(v))

    # Re-set knapsack to empty for new problem
    def reset(self, N_items, new_input=True):

        if new_input:
            v_new, w_new = instance_generator(N_items)
            W_new = W(N_items)
        else:
            v_new, w_new, W_new = self.v, self.w, self.free

        self.v = np.concatenate((v_new, np.zeros(max_items-len(v))))
        self.w = np.concatenate((w_new, np.ones(max_items-len(v))))
        self.ratios =  self.v / self.w
        self.free =  np.concatenate((np.full(N_items, W_new), np.zeros(max_items-len(v))))
        self.preselected =  np.concatenate((np.zeros(N_items), np.ones(max_items-len(v))))
        return self.get_state()
    
    def get_state(self):
        return np.vstack([self.v, self.w, self.ratios, self.free, self.preselected]).T

    def step(self, action):
        if np.all(self.preselected == 1):
            return self.get_state(), 0,  True
        
        if self.preselected[action] == 1: 
            return self.get_state(), -5, False # wanting to include a pre-selected items is penalized
        
        self.preselected[action] = 1
        reward = self.v[action] # add reward as value of addd item
        self.free = self.free - np.full(len(self.v), self.w[action]) # subtract item weight from leftover capacity
        self.preselected[self.w > self.free] = 1 # add item to list of pre-selected items
        
        done = np.all(self.preselected == 1) # termination upon having inspected all items
        return self.get_state(), reward, done

train = False # CHANGE IF YOU WANT TO RE-TRAIN MODEL
model_save_path = "trained_knapsack_agent.pth"

# Training method variables
if train:
    episodes = 100
    batch_size = 64
    gamma = 0.95  # Discount factor
    epsilon_start = 1.0
    epsilon_end = 0.01
    epsilon_decay = 10

    env = Knapsack_environment()
    agent = NeuralNetworkAgent().to(device)
    optimizer = optim.Adam(agent.parameters(), lr=1e-4)

    # Save trained agent
    future_agent = NeuralNetworkAgent().to(device)
    future_agent.load_state_dict(agent.state_dict())

    batch_index = 0
    history_state = []
    history_next_state = []
    history_action = []
    history_reward = []
    history_done = []

    # training episodes
    for episode in range(episodes):
        epsilon = epsilon_end + (epsilon_start - epsilon_end) * np.exp(-1. * episode / epsilon_decay)
        N_items = 100 # Amount of items included in training
        
        # Initalise termination status and zero reward
        ep_finished = False
        total_reward = 0

        # Get a new state
        state_matrix = env.reset(N_items)
        state = torch.FloatTensor(state_matrix).to(device)

        while not ep_finished: 
            # Epsilon greedy policy
            if random.random() > epsilon:
                with torch.no_grad():
                    q_values = agent.forward(state)
                    action = q_values.argmax().item()
            else:
                available_items = np.where(state_matrix[:,4] == 0)[0] #only consider selectable items
                action = random.choice(available_items)
            
            next_state, reward, ep_finished = env.step(action)
            next_state_matrix = next_state
            next_state = torch.FloatTensor(next_state_matrix).to(device)
            
            # Add step in episode to history
            history_state.append(state)
            history_next_state.append(next_state)
            history_action.append(action)
            history_reward.append(reward)
            history_done.append(float(ep_finished))
            
            # Update to inspect next step
            batch_index += 1
            state = next_state
            state_matrix = next_state_matrix

            if batch_index >= batch_size:
                q_valuestates = []
                q_values_next_states = []

                # Manually calculate q_values and next_q_values using your approach
                for q_index in range(batch_index - batch_size, batch_index):
                    q_value = agent(history_state[q_index])[history_action[q_index]]
                    next_q_value = future_agent(history_next_state[q_index]).max() # find optimal q-value
                    q_valuestates.append(q_value)
                    q_values_next_states.append(next_q_value)

                q_valuestates = torch.stack(q_valuestates).to(device)
                q_values_next_states = torch.stack(q_values_next_states).to(device)

                rewards = torch.tensor(history_reward[-batch_size:], dtype=torch.float32).to(device)
                dones = torch.tensor(history_done[-batch_size:], dtype=torch.float32).to(device)

                # Calculate the target using next_q_values
                with torch.no_grad():
                    targets = rewards + (1.0 - dones) * gamma * q_values_next_states # Bellman equation

                targets = targets.float()
                loss = nn.MSELoss()(q_valuestates, targets)

                # Update neural network parameters
                optimizer.zero_grad()
                loss.backward()
                torch.nn.utils.clip_grad_norm_(agent.parameters(), max_norm=0.1)
                optimizer.step()

                print(f"Episode {episode} completed. Loss: {loss.item()}")

    # Store agent locally so it won't need to be trained again
    torch.save(agent.state_dict(), model_save_path)
    print(f"Model saved to {model_save_path}")

else:
    env = Knapsack_environment()
    agent = NeuralNetworkAgent().to(device)
    agent.load_state_dict(torch.load(model_save_path))
    agent.eval()
    print(f"Model loaded from {model_save_path}")

In [15]:
# def sort_by_ratio(weights, values):
#     ratio = [(i, v/w) for i, (v,w) in enumerate(zip(values, weights))]
#     sorted_items = sorted(ratio, key=lambda x:x[1], reverse=True)
#     sorted_indices = [i for i, _ in sorted_items]

#     sorted_weights = [weights[i] for i in sorted_indices]
#     sorted_values = [values[i] for i in sorted_indices]
#     return sorted_weights, sorted_values

# class Knapsack:
#     '''
#     Class with all relevant functions to accurately portray a knapsack problem.
#     '''

#     #Initialization function
#     def __init__(self, weights, values, capacity):
#         self.weights = weights
#         self.values = values
#         self.capacity = capacity
#         self.n_items = len(weights)
#         self.memory = deque(maxlen=1000)
#         self.reset()
    
#     #Resetting the environment to an empty knapsack
#     def reset(self):
#         self.current_weight = 0
#         self.current_value = 0
#         self.done = False
#         self.selected_items = []
#         self.inspected_items = set()
#         return self.current_status()
    
#     # Function returns the current status of the knapsacks value and weight
#     def current_status(self):
#         return np.array([self.current_weight, self.current_value])
    
#     def step(self, action):
#         # Terminate when we have processed all items or have otherwise reached a stopping criterium
#         if action >= self.n_items or self.done:
#             return self.current_status(), 0, self.selected_items, self.done
        
#         self.inspected_items.add(action)

#         # Outrules the option to select already included items multiple times
#         if action in self.selected_items:
#             reward = 0
#         else:
#             # In case of adding an item to the knapsack
#             if self.current_weight + self.weights[action] <= self.capacity: # if item actually still fits
#                 self.current_weight += self.weights[action] # update weight/leftover capacity
#                 self.current_value += self.values[action] # update total item value in knapsack
#                 self.selected_items.append(action) # add item to list of included items, which we want to return ultimately
#                 reward = self.values[action] #reward associated to adding an item for the RL model is equal to the value the item represents in the regular problem

#             else:
#                 reward = 0
#                 self.done = True # If we do not add the item we do not give a reward. No negative rewards as it is not harmful to not include an item

#         if self.current_weight >= self.capacity or len(self.inspected_items) >= self.n_items: 
#             self.done = True # Stopping criterium if capacity is fulfilled -> update self.done

#         return self.current_status(), reward, self.selected_items, self.done
    
# class DQNAgent:
#     '''
#     Preparing a Q-learning RL agent
#     ''' 
#     def __init__(self, state_size, action_size):
#         self.state_size = state_size #defaults to 2 in other pieces of the code (because we include value and weight as input)
#         self.action_size = action_size #defaults to n_items 
#         self.memory = [] # to store previous action/responses
#         self.gamma = 0.95 # discount parameter in the Bellman equation
#         self.model = self.create_nn()

#     def create_nn(self):
#         nn = Sequential() # Make use of a very simple and basic NN-structure, COMPLETELY UNTWEAKED right now
#         nn.add(Dense(24, input_dim=self.state_size, activation="relu"))
#         nn.add(Dropout(0.25))
#         nn.add(Dense(24, activation="relu"))
#         nn.add(Dropout(0.25))
#         nn.add(Dense(self.action_size, activation="linear"))
#         nn.compile(loss="mse", optimizer=Adam(learning_rate=0.001))
#         return nn
    
#     # Store previous experiences
#     def remember(self, state, action, reward, next_state, done):
#         self.memory.append((state, action, reward, next_state, done)) 

#     # Make a decision on the best action to take
#     def act(self, state, epsilon, selected_items):
#         if np.random.rand() <= epsilon: # greedy epsilon policy. In 1% of the cases we perform a random action to explore instead of exploit previous knowledge. Necessary to avoid infinite loops.
#             available_actions = [i for i in range(self.action_size) if i not in selected_items]
#             if available_actions:
#                 return random.choice(available_actions) 
#             else:
#                 return 0 
#         Q_vals = self.model.predict(state, verbose=0) # create Q-learning table

#         for item in selected_items:
#             Q_vals[0][item] = -np.inf # Brute-transform Q-values for strategies that include selecting a previously-included item

#         return np.argmax(Q_vals[0]) # return best possible decision
    
#     # Train agent using earlier experiences
#     def replay(self, batch_size):
#         if len(self.memory) < batch_size: # when we have gathered enough earlier experiences, proceed to next lines
#             return 
        
#         # take random sample of previous experience which is used to update the Q-table
#         batch = random.sample(self.memory, batch_size)
#         for state, action, reward, next_state, done in batch:
#             Q = reward
#             if not done:
#                 Q += self.gamma * np.amax(self.model.predict(next_state, verbose=0)[0]) # Bellman equation
#             updated_Q = self.model.predict(state, verbose=0) #Use NN to approximate the new Q-table values.
#             updated_Q[0][action] = Q
#             self.model.fit(state, updated_Q, epochs=1, verbose=0) #

# # agent must be trained. Episode count is set to 100 by default.
# def train_agent(weights, values, capacity, episodes=1000, batch_size=16):
#     knapsack = Knapsack(weights, values, capacity) #initialise knapsack environment
#     q_agent = DQNAgent(state_size=2, action_size=len(weights)) #initialise DQN agent.
    
#     epsilon = 1.0  # Start with a high exploration rate
#     epsilon_decay = 0.995 # Over time epsilon will decrease, meaning more exploitation and less exploration through random steps
#     epsilon_min = 0.01
    
#     for ep in range(episodes):
#         state = np.reshape(knapsack.reset(), [1, 2])  # Reset the knapsack at the start of each episode
#         selected_items = []
#         print(f"starting {ep}")

#         for iteration in range(100):
#             # print(f"starting iter {iteration}")
#             action = q_agent.act(state, epsilon, selected_items)  # Pass epsilon for exploration
#             next_state, reward, selected_items, done = knapsack.step(action)
#             next_state = np.reshape(next_state, [1, 2])
#             q_agent.remember(state, action, reward, next_state, done) 
#             state = next_state

#             if done: # when we arrive at a stopping criterium:
#                 # print(f"Ep {ep}/{episodes}, score: {knapsack.current_value}, included items: {knapsack.selected_items}")
#                 print(f"Ep {ep}/{episodes}")
#                 break
        
#         q_agent.replay(batch_size)
        
#         # Decay epsilon
#         if epsilon > epsilon_min:
#             epsilon *= epsilon_decay

#     return q_agent

# trained_agent.model.save("dqn_knapsack_agent.h5")

# imported_trained_agent = DQNAgent(state_size=2, action_size=len(weights))
# imported_trained_agent.model = load_model("dqn_knapsack_agent.h5")

# weights, values = sort_by_ratio(w_train[:4], v_train)[:4]
# print(weights, values)
# capacity = W(4)
# print(capacity)

# def knapsackSolver(n_items, weights, values, capacity, epsilon=0.01, agent=trained_agent):
#     weights = weights[:n_items] # cut-off at n_items
#     values = values[:n_items] # cut-off at n_items
#     knapsack = Knapsack(weights, values, capacity)
#     state = np.reshape(knapsack.reset(), [1,2])
#     total_val = 0
#     selected_items = []

#     for item in range(100):
#         action = agent.act(state, epsilon, selected_items)
#         next_state, reward, selected_items, done = knapsack.step(action)
#         total_val += reward
#         state = np.reshape(next_state, [1,2])

#         if done:
#             break

#     return total_val, selected_items

# total_value, selected_items = knapsackSolver(4, weights, values, capacity)

# print(f"Total Value: {total_value}")
# print(f"Selected Items: {selected_items}")