In [1]:
#imports
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import pandas as pd
import numpy as np
import random
import copy
from tqdm import tqdm
from collections import deque
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import typing
from numpy.random import default_rng
import argparse

  from .autonotebook import tqdm as notebook_tqdm


In [189]:
class Engine():
    """One engine that replays a single trajectory taken from ``dataset``.

    The trajectory is the ``health_indicator`` column of the rows whose
    ``engine_id`` matches a randomly drawn id. ``cycle`` is the current index
    into that trajectory and ``service`` turns False once the trajectory has
    been run to its end (failure).
    """

    # reward handed back by ``step`` when the trajectory runs out (failure)
    FAILURE_REWARD = -78
    # a replaced engine restarts at a random cycle drawn from [0, RESTART_CYCLE_MAX)
    RESTART_CYCLE_MAX = 50

    #initialize dataset, a random trajectory, and current cycle
    def __init__(self, dataset):
        """
        Parameters:
        -----------
        dataset: table with ``engine_id`` and ``health_indicator`` columns.
        """
        self.dataset = dataset
        self.service = True
        # ids are drawn from 1..78 inclusive -- assumes the dataset holds those ids (TODO confirm)
        self.episode = self.get_trajectory(np.random.randint(low=1, high=79, size=1))
        self.cycle = 0

    #get random trajectory
    def get_trajectory(self, engine_id):
        """Return the health-indicator values of ``engine_id`` as a numpy array.

        ``engine_id`` must expose ``.item()`` (e.g. a size-1 numpy array).
        """
        matches = self.dataset.engine_id == engine_id.item()
        return self.dataset[matches].health_indicator.to_numpy()

    #return current state
    def get_state(self):
        """Return ``(cycle, health)`` for the current cycle."""
        return self.cycle, self.episode[self.cycle]

    #take action
    def step(self, action):
        """Apply ``action`` and return ``(next_health, reward)``.

        action == 0 (hold): advance one cycle; the reward is the health
            indicator of the cycle being left. If the engine is already on the
            last cycle it fails instead: ``service`` becomes False and
            ``(None, FAILURE_REWARD)`` is returned.
        action == 1 (replace): jump to a random cycle of the same trajectory;
            the reward is the negated health indicator at that cycle.

        Raises:
        -------
        ValueError: if ``action`` is neither 0 nor 1.
        """
        if action == 0:
            #failure occurs
            if self.cycle+1 == self.episode.size:
                self.service = False
                return (None, self.FAILURE_REWARD)
            #continued operation: reward is the current health, then advance
            reward = self.episode[self.cycle]
            self.cycle += 1
            return (self.episode[self.cycle], reward)
        if action == 1:
            restart = int(np.random.uniform(0, self.RESTART_CYCLE_MAX))
            # clamp so a trajectory shorter than RESTART_CYCLE_MAX cannot be indexed out of range
            self.cycle = min(restart, self.episode.size - 1)
            health = self.episode[self.cycle]
            return (health, -health)
        # previously fell through to an unbound local; fail with a clear message instead
        raise ValueError("action must be 0 (hold) or 1 (replace), got {!r}".format(action))

In [222]:
#environment class
class Environment():
    """A fleet of engines that share one maintenance budget.

    Each call to ``take_action`` applies one action per engine (0 = hold,
    1 = replace) and reports the summed reward plus a terminal flag.
    """

    # reward deducted when a replacement is requested but cannot be paid for
    INSUFFICIENT_FUNDS_PENALTY = 10

    #load the dataset, build the fleet, and set up the budget
    def __init__(self, fleet_size=5, dataset=None, dataset_path='train.csv',
                 balance=100, repair_cost=25):
        """
        Parameters:
        -----------
        fleet_size (int): number of engines in the fleet.
        dataset: preloaded table handed to every Engine; when None it is read
            from ``dataset_path`` with ``pd.read_csv``.
        dataset_path (str): CSV file used when ``dataset`` is None.
        balance: starting budget.
        repair_cost: amount deducted from the budget per replacement.
        """
        self.dataset = pd.read_csv(dataset_path) if dataset is None else dataset
        self.fleet_size = fleet_size
        self.fleet = [Engine(self.dataset) for _ in range(fleet_size)]
        self.balance = balance
        self.repair_cost = repair_cost

    def get_state(self):
        """Return the flat vector ``[balance, cycle_1..cycle_n, health_1..health_n]``.

        Engines that are out of service contribute -1 for both cycle and health.
        """
        healths = []
        cycles = []
        for engine in self.fleet:
            if engine.service:
                cycle, health = engine.get_state()
            else:
                cycle, health = -1, -1
            cycles.append(cycle)
            healths.append(health)
        return np.array([self.balance]+cycles+healths).flatten()

    def take_action(self, actions):
        """Apply ``actions`` engine by engine and return ``(reward, terminal)``.

        ``actions`` is paired with the fleet via ``zip``, so extra entries on
        either side are ignored. Actions other than 0 and 1 are ignored.
        The episode is terminal once no engine is in service or the balance
        is <= 0.
        """
        #initialize reward as zero
        reward = 0
        #iterate over action-engine pair
        for action, engine in zip(actions, self.fleet):
            #hold
            if action == 0:
                if engine.service:
                    _, r = engine.step(action)
                    reward += r
            #replace
            elif action == 1:
                #perform replacement if enough money
                if self.balance >= self.repair_cost:
                    # NOTE(review): the cost is charged even when the engine is out of
                    # service and nothing is stepped -- confirm this is intended
                    self.balance -= self.repair_cost
                    if engine.service:
                        _, r = engine.step(action)
                        reward += r
                #penalize if not enough money
                else:
                    reward -= self.INSUFFICIENT_FUNDS_PENALTY

        #return reward and terminal flag
        fleet_down = not any(engine.service for engine in self.fleet)
        return reward, bool(fleet_down or self.balance <= 0)

In [228]:
#transition class for replay memory
class Transition():
    """Plain record of one environment step, as stored in replay memory.

    Attributes: ``state``, ``action``, ``state_new``, ``reward``, ``term``
    -- each is stored exactly as passed to the constructor.
    """

    def __init__(self, state, action, state_new, reward, term ):
        # single tuple assignment; targets are bound left to right
        (self.state,
         self.action,
         self.state_new,
         self.reward,
         self.term) = (state, action, state_new, reward, term)

In [229]:
#prioritized replay memory class
class PrioritizedReplayMemory:
    """Fixed-size buffer of (priority, transition) records with priority-weighted sampling."""

    def __init__(self,
                 batch_size: int,
                 buffer_size: int,
                 alpha: float = 0.0,
                 random_state: typing.Optional[np.random.RandomState] = None) -> None:
        """
        Initialize a PrioritizedReplayMemory object.

        Parameters:
        -----------
        buffer_size (int): maximum size of buffer
        batch_size (int): size of each training batch
        alpha (float): Strength of prioritized sampling. Default to 0.0 (i.e., uniform sampling).
        random_state (np.random.RandomState): random number generator; a fresh one is created when None.
        
        """
        self._batch_size = batch_size
        self._buffer_size = buffer_size
        self._buffer_length = 0 # current number of prioritized experience tuples in buffer
        # transitions are arbitrary Python objects, so the field is an explicit object
        # field (numpy maps any non-numeric class to object anyway)
        self._buffer = np.empty(self._buffer_size, dtype=[("priority", np.float32), ("transition", object)])
        self._alpha = alpha
        self._random_state = np.random.RandomState() if random_state is None else random_state
        
    def __len__(self) -> int:
        """Current number of prioritized experience tuple stored in buffer."""
        return self._buffer_length

    @property
    def alpha(self):
        """Strength of prioritized sampling."""
        return self._alpha

    @property
    def batch_size(self) -> int:
        """Number of experience samples per training batch."""
        return self._batch_size
    
    @property
    def buffer_size(self) -> int:
        """Maximum number of prioritized experience tuples stored in buffer."""
        return self._buffer_size

    def add(self, transition: "Transition") -> None:
        """
        Add a new experience to memory.

        The new experience receives the highest priority currently stored
        (1.0 when the buffer is empty). When the buffer is full it overwrites
        the lowest-priority entry, provided its priority is strictly greater.
        """
        # only look at slots that have actually been written; the rest of the
        # np.empty allocation must not influence the priority of new entries
        stored = self._buffer["priority"][:self._buffer_length]
        priority = 1.0 if self.is_empty() else stored.max()
        if self.is_full():
            if priority > stored.min():
                idx = stored.argmin()
                self._buffer[idx] = (priority, transition)
            else:
                # NOTE(review): reached only when every stored priority is equal,
                # in which case the new experience is dropped -- confirm intended
                pass # low priority experiences should not be included in buffer
        else:
            self._buffer[self._buffer_length] = (priority, transition)
            self._buffer_length += 1

    def is_empty(self) -> bool:
        """True if the buffer is empty; False otherwise."""
        return self._buffer_length == 0
    
    def is_full(self) -> bool:
        """True if the buffer is full; False otherwise."""
        return self._buffer_length == self._buffer_size
    
    def sample(self, beta: float) -> typing.Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """
        Sample a batch of experiences from memory.

        Parameters:
        -----------
        beta (float): exponent applied to the importance-sampling weights.

        Returns:
        --------
        (idxs, transitions, normalized_weights): buffer positions drawn (with
        replacement) proportionally to priority**alpha, the transitions stored
        there, and weights (N * p)**-beta scaled so the largest is 1.

        Raises:
        -------
        ValueError: if the buffer is empty.
        """
        if self.is_empty():
            raise ValueError("cannot sample from an empty replay memory")

        # use sampling scheme to determine which experiences to use for learning
        ps = self._buffer["priority"][:self._buffer_length]
        scaled = ps**self._alpha
        sampling_probs = scaled / np.sum(scaled)
        idxs = self._random_state.choice(np.arange(ps.size),
                                         size=self._batch_size,
                                         replace=True,
                                         p=sampling_probs)
        
        # select the experiences and compute sampling weights
        transitions = self._buffer["transition"][idxs]        
        weights = (self._buffer_length * sampling_probs[idxs])**-beta
        normalized_weights = weights / weights.max()
        
        return idxs, transitions, normalized_weights

    def update_priorities(self, idxs: np.ndarray, priorities: np.ndarray) -> None:
        """Update the priorities associated with particular experiences."""
        self._buffer["priority"][idxs] = priorities

In [230]:
#dqn model class
class DQN(nn.Module):
    """Fully connected Q-network: input_dim -> 32 -> 64 -> 32 -> output_dim.

    Dropout (0.4, 0.3, 0.2) follows each hidden layer; it is only active
    while the module is in training mode.
    """

    def __init__(self, input_dim, output_dim):
        """
        Parameters:
        -----------
        input_dim (int): length of the state vector fed to the network.
        output_dim (int): number of Q-values produced.
        """
        # the sizes are constructor arguments; super() itself takes none of them
        super().__init__()
        self.lin1 = nn.Linear(input_dim,32)
        self.lin2 = nn.Linear(32,64)
        self.lin3 = nn.Linear(64,32)
        self.lin4 = nn.Linear(32,output_dim)
        self.dropout1 = nn.Dropout(0.4)
        self.dropout2 = nn.Dropout(0.3)
        self.dropout3 = nn.Dropout(0.2)

    def forward(self, x):
        """Return the Q-values for state tensor ``x`` (last dimension = input_dim)."""
        x = F.relu(self.lin1(x))
        x = self.dropout1(x)
        x = F.relu(self.lin2(x))
        x = self.dropout2(x)
        x = F.relu(self.lin3(x))
        x = self.dropout3(x)
        # output layer stays linear: a ReLU here would clamp Q-values at zero
        # and make negative returns unrepresentable
        return self.lin4(x)

In [249]:
#get action given net, state, and probability of random action
def get_action(net, state, epsilon, size):
    """Pick one action (0 = hold, 1 = replace) per engine, epsilon-greedily.

    With probability ``1 - epsilon`` the greedy actions are taken: the network
    output is reshaped to rows of two values and the index of the larger value
    in each row is the action. Otherwise ``size`` random actions are drawn,
    each being 1 with probability ``epsilon`` and 0 otherwise.

    Parameters:
    -----------
    net: callable mapping a float32 state tensor to a tensor of Q-values.
    state: array-like state vector.
    epsilon (float): probability of acting randomly, in [0, 1].
    size (int): number of random actions to draw.

    Returns:
    --------
    numpy array of 0/1 actions.
    """
    greedy = np.random.choice([True, False], p=[1-epsilon, epsilon])
    if not greedy:
        return np.random.choice([1,0], size=size, p=[epsilon, 1-epsilon])
    with torch.no_grad():
        state = torch.as_tensor(state, dtype=torch.float32)
        q_values = net(state).reshape(-1,2)
        # hand back a numpy array so both branches return the same type
        return torch.argmax(q_values, dim=1).cpu().numpy()