### Import libraries

In [1]:
import os
import numpy as np
import pandas as pd
import random
from collections import deque
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Input, LSTM, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, accuracy_score, precision_recall_fscore_support
import matplotlib.pyplot as plt

### Load and preprocess data

The function `read_alb_file` reads and loads .ALB files into a list of dictionaries named `all_data`

In [2]:
# Define the path to the data folders
base_path = os.path.expanduser(r'~\Downloads\DataSet SBF1')
data_folders = ['SBF1-0.25', 'SBF1-0.50', 'SBF1-0.75', 'SBF1-1.00']

# Function to read and load .ALB files
def read_alb_file(file_path):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    
    data = {}
    current_header = None
    
    for line in lines:
        line = line.strip()
        if line.startswith('<') and line.endswith('>'):
            current_header = line[1:-1]
            data[current_header] = []
        elif current_header:
            data[current_header].append(line)
    
    # Convert lists to appropriate data structures
    for key, value in data.items():
        if key in ['number of tasks', 'cycle time']:
            if value:
                data[key] = int(value[0].split('(')[0])
            else:
                data[key] = None
        elif key == 'task times':
            data[key] = {int(k.split()[0]): int(k.split()[1]) for k in value if ' ' in k}
        elif key in ['precedence relations']:
            data[key] = [tuple(map(int, k.split(','))) for k in value if ',' in k]
        elif key == 'setup times forward':
            data[key] = {tuple(map(int, k.split(':')[0].split(','))): int(k.split(':')[1]) for k in value if ':' in k}
        elif key == 'setup times backward':
            data[key] = {tuple(map(int, k.split(':')[0].split(','))): int(k.split(':')[1]) for k in value if ':' in k}
    
    return data

# Load all .ALB files into a list of dictionaries
all_data = []
for folder in data_folders:
    folder_path = os.path.join(base_path, folder)
    for file_name in os.listdir(folder_path):
        if file_name.endswith('.alb'):
            file_path = os.path.join(folder_path, file_name)
            data = read_alb_file(file_path)
            all_data.append(data)

### Deep Q-learning (DQN) model

This function is used to define, initialize and training the DQN agent

In [None]:
# Define the DQN agent
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)
        self.gamma = 0.95    # discount rate
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.model = self._build_model()

    def _build_model(self):
        # LSTM for Deep-Q learning Model
        model = Sequential()
        model.add(Input(shape=(self.state_size, 1)))
        model.add(LSTM(50, activation='tanh', return_sequences=True))
        model.add(Dropout(0.2))
        model.add(LSTM(50, activation='tanh'))
        model.add(Dropout(0.2))
        model.add(Dense(self.action_size, activation='linear'))
        model.compile(loss='mse', optimizer=Adam(learning_rate=self.learning_rate))
        return model

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        if np.random.rand() <= self.epsilon:
            return random.randrange(self.action_size)
        act_values = self.model.predict(state)
        return np.argmax(act_values[0])  # returns action

    def replay(self, batch_size):
        minibatch = random.sample(self.memory, batch_size)
        for state, action, reward, next_state, done in minibatch:
            target = reward
            if not done:
                target = (reward + self.gamma *
                          np.amax(self.model.predict(next_state)[0]))
            target_f = self.model.predict(state)
            target_f[0][action] = target
            self.model.fit(state, target_f, epochs=1, verbose=0)
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

# Define the environment (assembly line)
class AssemblyLineEnv:
    def __init__(self, action_size):
        self.action_size = action_size
        # Initialize the state
        self.state = [1, 0, 5, 2]  # Example initial state
    
    def reset(self):
        # Reset the state
        self.state = [1, 0, 5, 2]  # Example initial state
        return self.state
    
    def step(self, action):
        # Apply the action and update the state
        task_id = action // self.action_size
        workstation_id = action % self.action_size
        setup_time = 2  # Example setup time
        idle_time = 1  # Example idle time
        reward = -(setup_time + idle_time)
        done = False  # Example termination condition
        next_state = [task_id, workstation_id, 5, setup_time]
        return next_state, reward, done, {}

# Define the action size
action_size = 2  # Example action size

# Initialize the environment
env = AssemblyLineEnv(action_size)

# Initialize the DQN agent
state_size = 4  # Example state size
agent = DQNAgent(state_size, action_size)

# Training the DQN agent
episodes = 100
batch_size = 32

for e in range(episodes):
    state = np.reshape(env.reset(), [1, state_size, 1])
    for time in range(500):
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        reward = reward if not done else -10
        next_state = np.reshape(next_state, [1, state_size, 1])
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        if done:
            print(f"episode: {e}/{episodes}, score: {time}, e: {agent.epsilon:.2}")
            break
    if len(agent.memory) > batch_size:
        agent.replay(batch_size)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 527ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 23ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 32ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 24ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 30ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 42ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 36ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 19ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 34ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 40ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 38ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 29ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 37ms/step
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3