In [10]:
import pandas as pd
import random
import numpy as np
import copy
import networkx as nx
import matplotlib.pyplot as plt
import itertools
import collections
from collections import deque  # Add this import

!which python


/Users/berat/Desktop/quantum_entanglement/.venv/bin/python


In [1]:
class QuantumNetworksBaseClass():
    def __init__(self, initialEdges, pGen, pSwap, cutOffAge, maxLinks, users):
        self.initialEdges = copy.deepcopy(initialEdges)  # Don't really need a deep copu
        self.currentEdges = {} 
        self.pGen = pGen
        self.pSwap = pSwap
        self.cutOffAge = cutOffAge
        self.maxLinks = maxLinks
        self.users= users
    
    def reset(self) -> None:
        self.currentEdges = {}
    
    def getState(self) -> dict:
        return self.currentEdges
        
    def generateEntanglements(self, node1, node2): # Extend later to attemptGenerateEntanglements()
        edge = tuple(sorted([node1, node2]))
        if edge not in self.currentEdges:
            self.currentEdges[edge] = deque([0])
        else:
            if len(self.currentEdges[edge]) < self.maxLinks:
                self.currentEdges[edge].appendleft(0) # Retain order by age

    def probalisticallyGenerateEntanglements(self):
        for edge in self.initialEdges:
            if random.random() < self.pGen:
                self.generateEntanglements(*edge)
    
    def discardEntanglement(self, edge: tuple):
        # Should never be the case that these aren't the same
        if edge in self.currentEdges and len(self.currentEdges[edge]) > 0:
            self.currentEdges[edge].pop() # TODO: Assume we have information on the age of the entanglement
    
    def increaseGlobalEntanglementAge(self):
        for edge in self.currentEdges:
            newAges = [age + 1 for age in self.currentEdges[edge] if age < self.cutOffAge]
            self.currentEdges[edge] = deque(newAges) # Ensure deque! Python type hinting sucks

    def performSwapping(self, edge1: tuple, edge2: tuple): # Extend later to attemptSwapping()
        # Ensure valid swaps
        if edge1 not in self.currentEdges or edge2 not in self.currentEdges:
            print(f"Edge {edge1} or {edge2} not found in currentEdges")
        
        if len(self.currentEdges[edge1]) == 0 or len(self.currentEdges[edge2]) == 0:
            print(f"Edge {edge1} or {edge2} has no entanglement")
        
        # Swap entanglement with the new edge having ascending order ndoes
        combined_nodes = list(edge1) + list(edge2)
        unique_nodes = sorted([node for node in combined_nodes if combined_nodes.count(node) == 1])
        newLink = tuple(unique_nodes) 
        old_ages = self.currentEdges.get(edge1, deque([])) + self.currentEdges.get(edge2, deque([]))
        newAge = max(old_ages) if old_ages else 0
        # Timesteps should not occur during swapping
        if newLink in self.currentEdges:
            if len(self.currentEdges[newLink]) < self.maxLinks:
                self.currentEdges[newLink].append(newAge)
        else:
            self.currentEdges[newLink] = [newAge]
            
        self.discardEntanglement(edge1)
        self.discardEntanglement(edge2)          
        
    def getPossibleActionsTimeStepOne(self) -> list: # Only does T=1 steps
        active_edges = [edge for edge, ages in self.currentEdges.items() if len(ages) > 0]
        
        # Find all pairs of edges that share a node (potential swaps)
        possible_swaps = []
        for edge1, edge2 in itertools.combinations(active_edges, 2):
            # Check if edges share a node
            if set(edge1) & set(edge2):
                possible_swaps.append((edge1, edge2))
        
        # Add "no swap" action
        all_actions = [None]  # None represents "do nothing"
        all_actions.extend(possible_swaps)
        
        return all_actions
    
    def getPossibleActions(self) -> list:
        def get_possible_swap_sequences(edges_state, depth=0, max_depth=3):
            if depth >= max_depth:
                return [[]]
            
            # Get all possible single swaps in current state
            possible_swaps = []
            active_edges = [edge for edge, ages in edges_state.items() if len(ages) > 0]
            
            for edge1, edge2 in itertools.combinations(active_edges, 2):
                if set(edge1) & set(edge2):  # if edges share a node
                    possible_swaps.append((edge1, edge2))
            
            if not possible_swaps:
                return [[]]
                    
            # For each possible swap, simulate it and recurse
            all_sequences = [[]]  # Include empty sequence (no swaps)
            for swap in possible_swaps:
                # Simulate the swap
                edge1, edge2 = swap
                # Create new edge from the swap (nodes in ascending order)
                combined_nodes = list(edge1) + list(edge2)
                new_edge = tuple(sorted([n for n in combined_nodes if combined_nodes.count(n) == 1]))
                
                # Create new state after swap
                new_state = copy.deepcopy(edges_state)
                # Remove used edges
                if edge1 in new_state and len(new_state[edge1]) > 0:
                    new_state[edge1].pop()
                if edge2 in new_state and len(new_state[edge2]) > 0:
                    new_state[edge2].pop()
                
                # Clean up empty edges
                if edge1 in new_state and len(new_state[edge1]) == 0:
                    del new_state[edge1]
                if edge2 in new_state and len(new_state[edge2]) == 0:
                    del new_state[edge2]
                
                # Add new edge
                if new_edge not in new_state:
                    new_state[new_edge] = deque([0])  # Age of new entanglement
                elif len(new_state[new_edge]) < self.maxLinks:
                    new_state[new_edge].appendleft(0)
                    
                # Recurse to find subsequent swaps
                next_sequences = get_possible_swap_sequences(new_state, depth + 1, max_depth)
                # Add current swap to start of each subsequent sequence
                for seq in next_sequences:
                    all_sequences.append([swap] + seq)
            
            return all_sequences
        
        # Get all possible sequences of swaps
        all_sequences = get_possible_swap_sequences(self.currentEdges)
        return all_sequences
        

In [68]:
# Env Parameters
cutOffAge = 1
pSwap = 1
pGen= 0.8
maxLinks = 1 # Multiplexing
users = [((1,6), 1)] # (user, reward)
initialEdges = [(1,3), (2,3), (3,4), (4,5), (4,6)]
desiredEdge = [(1,3), (3,4), (4,5), (5,6)]
random.seed(1)
############################
network = QuantumNetworksBaseClass(initialEdges, pGen, pSwap, cutOffAge, maxLinks, users)
network.probalisticallyGenerateEntanglements()
network.probalisticallyGenerateEntanglements()
network.probalisticallyGenerateEntanglements()
network.probalisticallyGenerateEntanglements()
network.increaseGlobalEntanglementAge()
print(network.getState())
network.getPossibleActions()

{(1, 3): deque([1]), (3, 4): deque([1]), (4, 5): deque([1]), (4, 6): deque([1]), (2, 3): deque([1])}


[[],
 [((1, 3), (3, 4))],
 [((1, 3), (3, 4)), ((4, 5), (4, 6))],
 [((1, 3), (3, 4)), ((4, 5), (1, 4))],
 [((1, 3), (3, 4)), ((4, 6), (1, 4))],
 [((1, 3), (2, 3))],
 [((1, 3), (2, 3)), ((3, 4), (4, 5))],
 [((1, 3), (2, 3)), ((3, 4), (4, 6))],
 [((1, 3), (2, 3)), ((4, 5), (4, 6))],
 [((3, 4), (4, 5))],
 [((3, 4), (4, 5)), ((1, 3), (2, 3))],
 [((3, 4), (4, 5)), ((1, 3), (3, 5))],
 [((3, 4), (4, 5)), ((2, 3), (3, 5))],
 [((3, 4), (4, 6))],
 [((3, 4), (4, 6)), ((1, 3), (2, 3))],
 [((3, 4), (4, 6)), ((1, 3), (3, 6))],
 [((3, 4), (4, 6)), ((2, 3), (3, 6))],
 [((3, 4), (2, 3))],
 [((3, 4), (2, 3)), ((4, 5), (4, 6))],
 [((3, 4), (2, 3)), ((4, 5), (2, 4))],
 [((3, 4), (2, 3)), ((4, 6), (2, 4))],
 [((4, 5), (4, 6))],
 [((4, 5), (4, 6)), ((1, 3), (3, 4))],
 [((4, 5), (4, 6)), ((1, 3), (2, 3))],
 [((4, 5), (4, 6)), ((3, 4), (2, 3))]]

In [70]:
def epsilon_greedy(Q, state, possibleActions, epsilon):
    if np.random.random() < epsilon:
        # Choose a random action
        return np.random.choice(possibleActions)
    else:
        # Choose the action with the highest Q-value
        q_values = [Q[(state, action)] for action in possibleActions]
        return possibleActions[np.argmax(q_values)]

network = QuantumNetworksBaseClass(initialEdges, pGen, pSwap, cutOffAge, maxLinks, users)
numEpisodes = 5
step = 2
gamma = 0.99
epsilon = 0.1

def nStepLearning(network=network, step=step, numEpisodes=10, alpha=0.1, gamma=0.9, epsilon = 0.9):
    Q = collections.defaultdict() 
    
    for episode in range(1, numEpisodes + 1):
        print(f"Episode {episode}")

        T = int(1e5)  # Represents the time step at which the episode ends. 
        tau = 0  # Indicates the time step for the state-action pair to be updated.
        t = 0  # Current time step in the episode

        # Reset env for a new run and get initial states
        network.reset() # No entanglements, is it worth producing a entangelment?
        initialState = network.getState() 
        possibleActions = network.getPossibleActions(initialState[1]) 
        initialAction = epsilon_greedy(Q=Q, state = initialState, possibleActions=possibleActions, epsilon=epsilon)
        
        # Initialize state, action, and reward history for n-step 
        nStates, nAction, nReward = [initialState], [initialAction], [0]
        
        while True:
            if t < T:
                pass
            tau = t - step + 1
            if tau >= 0:
                pass
            if tau == T-1: # Can replace while loop later on!
                break

 
nStepLearning()

Episode 1


KeyError: 1