In [1]:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import os
import keras
import tensorflow
import tensorflow.keras

In [2]:
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

In [3]:
torch.cuda.is_available()

True

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [5]:
import logging.config
import math
import pkg_resources
import random

In [6]:
%pip install cfg_load




In [7]:
from mininet.net import Mininet
from mininet.net import Mininet,CLI
from mininet.node import OVSKernelSwitch, Host
from mininet.link import TCLink,Link
from mininet.log import setLogLevel, info

import time, os

import numpy as np


class MininetBackEnd(object):

    def init_params(self, mu, sigma, link_bw, sla_bw):
        self.mu = float(mu)
        self.sigma = float(sigma)
        self.sla_bw = float(sla_bw)
        self.link_lat = float(0.0)
        self.link_bw = float(link_bw)
     
    def reset_links(self):
        self.current_link_failure = False
        self.previous_link_failure = False

        self.active_link = 0 # internet by default
        self.episode_over = False

        self.take_measurements()

    def __init__(self, mu, sigma, link_bw, sla_bw, seed):

        np.random.seed(seed)

        self.init_params(mu, sigma, link_bw, sla_bw)

        self.net = Mininet( topo=None, listenPort=6633, ipBase='10.0.0.0/8')

        self.h1 = self.net.addHost( 'host1', mac = '00:00:00:00:00:01', ip='10.0.0.1' )
        self.h2 = self.net.addHost( 'host2', mac = '00:00:00:00:00:02', ip='10.0.0.2' )
        self.h3 = self.net.addHost( 'noise1', mac = '00:00:00:00:00:03', ip='10.0.0.3' )
        self.h4 = self.net.addHost( 'noise4', mac = '00:00:00:00:00:04', ip='10.0.0.4' )
        self.s1 = self.net.addSwitch( 'edge1', cls=OVSKernelSwitch, protocols='OpenFlow13' )
        self.s2 = self.net.addSwitch( 'edge2', cls=OVSKernelSwitch, protocols='OpenFlow13' )
        self.s3 = self.net.addSwitch( 'core1', cls=OVSKernelSwitch, protocols='OpenFlow13' )
        self.s4 = self.net.addSwitch( 'core2', cls=OVSKernelSwitch, protocols='OpenFlow13' )
        self.net.addLink( self.h1, self.s1, cls=Link)
        self.net.addLink( self.h2, self.s2, cls=Link)
        self.net.addLink( self.h3, self.s1, cls=Link)
        self.net.addLink( self.h4, self.s2, cls=Link)
        self.net.addLink( self.s1, self.s3, cls=TCLink, bw=self.link_bw)
        self.net.addLink( self.s1, self.s4, cls=TCLink, bw=self.link_bw)
        self.net.addLink( self.s2, self.s3, cls=TCLink, bw=self.link_bw)
        self.net.addLink( self.s2, self.s4, cls=TCLink, bw=self.link_bw)

        self.net.start()


        # add flows

        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=20,ip,nw_dst=10.0.0.2,actions=output:4')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,ip,nw_dst=10.0.0.1,actions=output:1')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,ip,nw_dst=10.0.0.3,actions=output:2')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,arp,nw_dst=10.0.0.1,actions=output:1')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,arp,nw_dst=10.0.0.3,actions=output:2')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,arp,nw_dst=10.0.0.2,actions=normal')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,arp,nw_dst=10.0.0.4,actions=normal')
        self.s1.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=10,ip,nw_dst=10.0.0.4,actions=output:4')

        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=20,ip,nw_dst=10.0.0.1,actions=output:4')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,ip,nw_dst=10.0.0.2,actions=output:1')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,ip,nw_dst=10.0.0.4,actions=output:2')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,arp,nw_dst=10.0.0.2,actions=output:1')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,arp,nw_dst=10.0.0.4,actions=output:2')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,arp,nw_dst=10.0.0.1,actions=normal')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,arp,nw_dst=10.0.0.3,actions=normal')
        self.s2.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  edge2 priority=10,ip,nw_dst=10.0.0.3,actions=output:4')


        self.s3.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  core1 priority=10,in_port=1,actions=output:2')
        self.s3.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  core1 priority=10,in_port=2,actions=output:1')
        
        self.s4.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  core2 priority=10,in_port=1,actions=output:2')
        self.s4.cmd('ovs-ofctl --protocols=OpenFlow13 add-flow  core2 priority=10,in_port=2,actions=output:1')

        #CLI(self.net)

        # start udp  traffic receiver - simulate other traffic
        self.h4.cmd("iperf  -u -s -i 1 >& /tmp/udp_server.log &")

        # start host tcp traffic receiver - simulate main flow
        self.h2.cmd("iperf  -s -i 1  >& /tmp/tcp_server.log &")

        self.reset_links()


    def cleanup(self):
        self.net.stop()


    def take_measurements(self):
        """ Send udp traffic and then take bandwidth measurement """
        # send udp traffic  - simulate other flow
      
        #udp_bw = np.random.normal(self.mu, self.sigma)
        # cleanup /tmp for logs

        os.system("rm /tmp/*.log")

        # send udp traffic  - simulate other flow
        ip = self.h4.IP()
        bw = np.random.normal(self.mu, self.sigma) 
        cmd = "iperf -u -c {0} -b  {1}M -t 10  >& /tmp/udp_client.log &".format(ip, bw)
        cmd2 = "ping {0} -c 5 >& /tmp/udp_client_lat.log &".format(ip)
        #info(cmd)
        self.h3.cmd(cmd)
        self.h3.cmd(cmd2)


        ## we measure  internet link only, MPLS link => full BW 
        if self.active_link == 0:   
            # send tcp  traffic  - main flow
            ip = self.h2.IP()
            cmd = "iperf -c {0} -t 5 >& /tmp/tcp_client.log &".format(ip)
            cmd2 = "ping {0} -c 5 >& /tmp/tcp_client_lat.log &".format(ip)
            #info(cmd)
            self.h1.cmd(cmd)
            self.h1.cmd(cmd2)

        # wait for  traffic flow to settle
        # if you set this too low, output file will not be generated properly !!!!!!
        time.sleep(15)
        
        # always measure internet link available bw
        self.available_bw = float(self.link_bw) - float(self.read_udp_bw())

        if self.available_bw < 0.0:
            self.available_bw = 0.0

        ## we measure  internet link only, MPLS link => full BW 
        if self.active_link == 0:   
            self.current_bw = np.random.normal(self.available_bw, 3)
            self.link_lat = self.read_tcp_lat()
        else:
            self.current_bw = self.link_bw
            self.link_lat = 5.0
            
    def read_tcp_bw(self):
        bw = ['None']
        with open('/tmp/tcp_client.log') as f:
            for line in f:
                #print ('line = ', line)
                if 'bits/sec' in line:
                    line = line.replace('-', ' ')
                    fields = line.strip().split()
                    # Array indices start at 0 unlike AWK

                    if len(fields) > 7:
                        bw.append(fields[7])

        #print("TCP measure:",bw[-1])
        return(bw[-1])


    def read_udp_bw(self):
        bw = ['None']
        with open('/tmp/udp_client.log') as f:
            for line in f:
                #print ('line = ', line)
                if 'bits/sec' in line:
                    line = line.replace('-', ' ')
                    fields = line.strip().split()
                    # Array indices start at 0 unlike AWK
    
                    if len(fields) > 7:
                        bw.append(fields[7])

        #print("Udp measure:",bw[-1])
        return(bw[-1])     
            
    def read_tcp_lat(self):
        with open('/tmp/tcp_client_lat.log') as f:
            for line in f:
                #print ('line = ', line)
                if 'rtt' in line:
                    line = line.replace('-', ' ')
                    fields = line.strip().split()
                    #print(line)
                    #print(fields[3])
                    fields = fields[3].strip().split('/')
                    #print("Latency: ",fields[1])
                    return(fields[1])

    def switch_flows(self, action):
        if action == 0:
            channel = 4
        elif action == 1:
            channel = 3
        else:
            return

        cmd = "ovs-ofctl --protocols=OpenFlow13 add-flow  edge1 priority=20,ip,\
                    nw_dst=10.0.0.2,actions=output:{0}".format(channel)
        #info(cmd)
        self.s1.cmd(cmd)



    def switch_link(self, action):

        # if action specifies same link as before it is not a switch
        if action != self.active_link:
            self.switch_flows(action)
            

        ## action is 0 => link is internet
        ## action 1 => Fast link
        self.active_link  = action

        self.take_measurements()

        ## Here is the logic that checks  two subsequent SLA failures
        self.current_link_failure = False

        # if current bandwidth less than SLA it is a failure
        if self.active_link == 0:
            if float(self.current_bw) < float(self.sla_bw):
                info ('current link failure')
                self.current_link_failure = True

                # if it failed in previous tick also, mark it a link failure
                if  self.previous_link_failure == True:
                    info ('previous link also failure, episode over')
                    self.episode_over = True
            
        # copy current to previous
        self.previous_link_failure = self.current_link_failure 
        
        return self.episode_over 
        

In [8]:
import logging.config
import math
import pkg_resources
import random

# 3rd party modules
from gym import spaces
import cfg_load
import gym
import numpy as np




class SdwanEnv(gym.Env):
    """
    Define Sdwan environment.
    The environment defines  how links will be selected based on bandwidth
    availability 
    """

    def __init__(self, max_ticks=300):

        # General variables defining the environment

        self.LINK_BW = 10.0
        self.LINK_SELECT_ACTION_INTERNET = 0
        self.LINK_SELECT_ACTION_MPLS = 1
        self.MAX_TICKS = max_ticks

        self.backend = MininetBackEnd(mu=4, sigma=2, link_bw=self.LINK_BW, sla_bw=6, seed=100)

        # Define what the agent can do
        # Choose link1 or Link2 
        self.action_space = spaces.Discrete(2)

        # Observation 

        low = np.array([self.LINK_SELECT_ACTION_INTERNET,  # active link
                        0.0,  #current_bw
                        0.0,  #available bw
                        ])
        high = np.array([self.LINK_SELECT_ACTION_MPLS, self.LINK_BW, self.LINK_BW])

        self.observation_space = spaces.Box(low, high, dtype=np.float32)

        # episode over 
        self.episode_over = False
        self.info = {} 

        # Store what the agent tried
        self.curr_episode = -1
        self.action_episode_memory = []

    def step(self, action):
        """
        The agent takes a step in the environment.
        Parameters
        ----------
        action : int
        Returns
        -------
        ob, reward, episode_over, info : tuple
            ob (object) :
                an environment-specific object representing your observation of
                the environment.
            reward (float) :
                amount of reward achieved by the previous action. The scale
                varies between environments, but the goal is always to increase
                your total reward.
            episode_over (bool) :
                whether it's time to reset the environment again. Most (but not
                all) tasks are divided up into well-defined episodes, and done
                being True indicates the episode has terminated. (For example,
                perhaps the pole tipped too far, or you lost your last life.)
            info (dict) :
                 diagnostic information useful for debugging. It can sometimes
                 be useful for learning (for example, it might contain the raw
                 probabilities behind the environment's last state change).
                 However, official evaluations of your agent are not allowed to
                 use this for learning.
        """
        self.take_action(action)
        reward = self.get_reward()
        ob = self.get_state()
        return ob, reward, self.episode_over, self.info 

    def take_action(self, action):
        self.episode_over = self.backend.switch_link(action)
                
        self.ticks += 1

        # check if episode ended by ERROR, then mark it in 'info'
        if self.episode_over:
            logging.info ('Episode ended by ERROR')
            self.info['exit_status'] = 'ERROR'

        # else Stop if max ticks over
        elif self.ticks == self.MAX_TICKS:
            logging.info ('Max ticks over, ending episode')
            self.episode_over = True
            self.info['exit_status'] = 'NORMAL'

    def get_reward(self):

        logging.debug('current bw:{0}, sla bw:{1}'.format(self.backend.current_bw, self.backend.sla_bw))

        # maximum penalty for loosing the episode by ERROR
        if self.episode_over and self.info['exit_status'] == 'ERROR':
            return -5
	
        # otherwise, reward for surviving this 'tick'
        reward = 1

        # every time we use the MPLS link reward is deducted
        if self.backend.active_link == 1:
            reward -= 3

        # check bandwidth for internet link - if less than SLA then penalize
        elif float(self.backend.current_bw)  <   float(self.backend.sla_bw):
            logging.debug('BW is less than SLA')
            reward -= 2

        # everything fine - reward up
        else:
           reward += 2
        
        reward = reward - (float(self.backend.link_lat))/10.0
        print('Bw:{0}, Latency {1}, Chosen action:{2}, Action Reward:{3}'.format(self.backend.current_bw, self.backend.link_lat, self.backend.active_link, reward))
        
        return reward


    def reset(self):
        """
        Reset the state of the environment and returns an initial observation.
        Returns
        -------
        observation (object): the initial observation of the space.
        """
        self.curr_episode += 1
        self.ticks = 0
        self.action_episode_memory.append([])
        self.backend.reset_links()
        return self.get_state()

    def render(self, mode='human', close=False):
        return

    def get_state(self):
        """Get the observation.  it is a tuple """
        ob = (self.backend.active_link, self.backend.current_bw,  self.backend.available_bw)
        return ob

    def seed(self, seed):
        random.seed(seed)
        np.random.seed


    def cleanup(self):
        self.backend.cleanup()

In [9]:
env = SdwanEnv()
n_actions = env.action_space.n
n_actions

  logger.warn(f"Box bound precision lowered by casting to {self.dtype}")


2

In [10]:
import collections
import gym
import numpy as np
import statistics
import tensorflow as tf
import tqdm

from matplotlib import pyplot as plt
from tensorflow.keras import layers
from typing import Any, List, Sequence, Tuple


# Create the environment


# Set seed for experiment reproducibility
seed = 42
#env.seed(seed)
tf.random.set_seed(seed)
np.random.seed(seed)

# Small epsilon value for stabilizing division operations
eps = np.finfo(np.float32).eps.item()


In [11]:
%pip install tqdm

Note: you may need to restart the kernel to use updated packages.


In [12]:
class ActorCritic(tf.keras.Model):
  """Combined actor-critic network."""

  def __init__(
      self, 
      num_actions: int, 
      num_hidden_units: int):
    """Initialize."""
    super().__init__()

    self.common = layers.Dense(num_hidden_units, activation="relu")
    self.actor = layers.Dense(num_actions)
    self.critic = layers.Dense(1)

  def call(self, inputs: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor]:
    x = self.common(inputs)
    return self.actor(x), self.critic(x)


In [13]:
num_actions = n_actions  # 2
num_hidden_units = 128

model = ActorCritic(num_actions, num_hidden_units)


In [14]:
def env_step(action: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
  """Returns state, reward and done flag given an action."""

  state, reward, done, _ = env.step(action)
  d=np.array(state)
  return (d.astype(np.float32), 
          np.array(reward, np.int32), 
          np.array(done, np.int32))


def tf_env_step(action: tf.Tensor) -> List[tf.Tensor]:
  return tf.numpy_function(env_step, [action], 
                           [tf.float32, tf.int32, tf.int32])


In [15]:
def run_episode(
    initial_state: tf.Tensor,  
    model: tf.keras.Model, 
    max_steps: int) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
  """Runs a single episode to collect training data."""

  action_probs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  values = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
  rewards = tf.TensorArray(dtype=tf.int32, size=0, dynamic_size=True)

  initial_state_shape = initial_state.shape
  state = initial_state

  for t in tf.range(max_steps):
    # Convert state into a batched tensor (batch size = 1)
    state = tf.expand_dims(state, 0)

    # Run the model and to get action probabilities and critic value
    action_logits_t, value = model(state)

    # Sample next action from the action probability distribution
    action = tf.random.categorical(action_logits_t, 1)[0, 0]
    action_probs_t = tf.nn.softmax(action_logits_t)

    # Store critic values
    values = values.write(t, tf.squeeze(value))

    # Store log probability of the action chosen
    action_probs = action_probs.write(t, action_probs_t[0, action])

    # Apply action to the environment to get next state and reward
    state, reward, done = tf_env_step(action)
    state.set_shape(initial_state_shape)

    # Store reward
    rewards = rewards.write(t, reward)

    if tf.cast(done, tf.bool):
      break

  action_probs = action_probs.stack()
  values = values.stack()
  rewards = rewards.stack()

  return action_probs, values, rewards

In [16]:
def get_expected_return(
    rewards: tf.Tensor, 
    gamma: float, 
    standardize: bool = True) -> tf.Tensor:
  """Compute expected returns per timestep."""

  n = tf.shape(rewards)[0]
  returns = tf.TensorArray(dtype=tf.float32, size=n)

  # Start from the end of `rewards` and accumulate reward sums
  # into the `returns` array
  rewards = tf.cast(rewards[::-1], dtype=tf.float32)
  discounted_sum = tf.constant(0.0)
  discounted_sum_shape = discounted_sum.shape
  for i in tf.range(n):
    reward = rewards[i]
    discounted_sum = reward + gamma * discounted_sum
    discounted_sum.set_shape(discounted_sum_shape)
    returns = returns.write(i, discounted_sum)
  returns = returns.stack()[::-1]

  if standardize:
    returns = ((returns - tf.math.reduce_mean(returns)) / 
               (tf.math.reduce_std(returns) + eps))

  return returns

In [17]:
huber_loss = tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def compute_loss(
    action_probs: tf.Tensor,  
    values: tf.Tensor,  
    returns: tf.Tensor) -> tf.Tensor:
  """Computes the combined actor-critic loss."""

  advantage = returns - values

  action_log_probs = tf.math.log(action_probs)
  actor_loss = -tf.math.reduce_sum(action_log_probs * advantage)

  critic_loss = huber_loss(values, returns)

  return actor_loss + critic_loss

In [18]:
optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)


@tf.function
def train_step(
    initial_state: tf.Tensor, 
    model: tf.keras.Model, 
    optimizer: tf.keras.optimizers.Optimizer, 
    gamma: float, 
    max_steps_per_episode: int) -> tf.Tensor:
  """Runs a model training step."""

  with tf.GradientTape() as tape:

    # Run the model for one episode to collect training data
    action_probs, values, rewards = run_episode(
        initial_state, model, max_steps_per_episode) 

    # Calculate expected returns
    returns = get_expected_return(rewards, gamma)

    # Convert training data to appropriate TF tensor shapes
    action_probs, values, returns = [
        tf.expand_dims(x, 1) for x in [action_probs, values, returns]] 

    # Calculating loss values to update our network
    loss = compute_loss(action_probs, values, returns)

  # Compute the gradients from the loss
  grads = tape.gradient(loss, model.trainable_variables)

  # Apply the gradients to the model's parameters
  optimizer.apply_gradients(zip(grads, model.trainable_variables))

  episode_reward = tf.math.reduce_sum(rewards)

  return episode_reward

In [None]:
min_episodes_criterion = 1000
max_episodes = 10000
max_steps_per_episode = 30

# Cartpole-v0 is considered solved if average reward is >= 195 over 100 
# consecutive trials
reward_threshold = 30
running_reward = 0

# Discount factor for future rewards
gamma = 0.99

# Keep last episodes reward
episodes_reward: collections.deque = collections.deque(maxlen=min_episodes_criterion)

with tqdm.trange(max_episodes) as t:

    for i in t:
        z=env.reset()
        zz=np.array(z)
        initial_state = tf.constant(zz, dtype=tf.float32)
        episode_reward = int(train_step(
            initial_state, model, optimizer, gamma, max_steps_per_episode))

        episodes_reward.append(episode_reward)
        running_reward = statistics.mean(episodes_reward)

        t.set_description(f'Episode {i}')
        t.set_postfix(
        episode_reward=episode_reward, running_reward=running_reward)

        # Show average episode reward every 10 episodes
        if i % 1 == 0:
            print(f'Episode {i}: average reward: {episode_reward}')

        if i >= min_episodes_criterion:  
            break

print(f'\nSolved at episode {i}: average reward: {running_reward:.2f}!')
