In [2]:
import gym
from gym import spaces
import numpy as np

class WarehouseOrientationEnv(gym.Env):
    """
    Magazyn NxN z orientacją robota:
      - Stan: [r_x, r_y, t_x, t_y, orientation_index]
      - Akcje: 0=obrót w lewo (alpha), 1=obrót w prawo (alpha), 2=ruch do przodu
    """
    def __init__(self, n=5, alpha=90, max_steps=100):
        super(WarehouseOrientationEnv, self).__init__()

        self.n = n
        self.alpha = alpha
        self.max_steps = max_steps

        # Ilu "kroków" orientacji mamy (alpha musi dzielić 360)
        assert 360 % alpha == 0, "Kąt alpha musi dzielić 360 (np. 90, 45, 60 itp.)"
        self.num_orientations = 360 // alpha

        # Przestrzeń akcji: 3 akcje
        #  0: obrót w lewo o alpha
        #  1: obrót w prawo o alpha
        #  2: ruch do przodu
        self.action_space = spaces.Discrete(3)

        # Przestrzeń stanów = Box, 5 wymiarów:
        #   r_x, r_y, t_x, t_y, orientation_index
        # r_x, r_y, t_x, t_y w [0, n-1]
        # orientation_index w [0, num_orientations - 1]
        low = np.array([0, 0, 0, 0, 0], dtype=np.float32)
        high = np.array([n-1, n-1, n-1, n-1, self.num_orientations - 1], dtype=np.float32)
        self.observation_space = spaces.Box(low, high, dtype=np.float32)

        # Pola do przechowywania stanu
        self.robot_x = 0
        self.robot_y = 0
        self.target_x = 0
        self.target_y = 0
        self.orientation = 0  # indeks 0..num_orientations-1
        self.current_step = 0
        self.reward=0
        self.terminated=False
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.current_step = 0
        self.terminated=False
        # Losujemy pozycję robota
        self.robot_x = np.random.randint(self.n)
        self.robot_y = np.random.randint(self.n)
        self.reward=0
        # Losujemy pozycję celu (inną niż robota)
        while True:
            self.target_x = np.random.randint(self.n)
            self.target_y = np.random.randint(self.n)
            if (self.target_x != self.robot_x) or (self.target_y != self.robot_y):
                break

        # Losujemy orientację (0..num_orientations-1)
        self.orientation = np.random.randint(self.num_orientations)

        return self._get_obs()

    def step(self, action):
        self.reward=0
        old_distance = np.linalg.norm([
            self.robot_x - self.target_x,
            self.robot_y - self.target_y
        ])

        # Wykonujemy akcję
        if action == 0:
            # Obrót w lewo
            reward=-0.
            self.orientation = (self.orientation - 1) % self.num_orientations
        elif action == 1:
            # Obrót w prawo
            reward=-0.
            self.orientation = (self.orientation + 1) % self.num_orientations
        elif action == 2:
            # Ruch do przodu
            self._move_forward()
        else:
            raise ValueError(f"Nieznana akcja: {action}")

        # Domyślna niewielka kara za ruch
        reward = -0.0001
        terminated = self.terminated

        new_distance = np.linalg.norm([
        self.robot_x - self.target_x,
        self.robot_y - self.target_y
        ])

        # Nagroda na podstawie odwrotności odległości
        if new_distance > 0:  # Unikamy dzielenia przez zero
          reward += 1/new_distance-1/old_distance
        else:
    # Maksymalna nagroda, jeśli osiągnięto cel (odległość = 0)
          reward += 0


        # Sprawdzamy, czy dotarliśmy do celu
        if (self.robot_x == self.target_x) and (self.robot_y == self.target_y):
            reward = 1.0

            terminated = True

        # Sprawdzamy, czy przekroczyliśmy limit kroków
        obs = self._get_obs()
        self.current_step += 1
        truncated = (self.current_step >= self.max_steps)


        return obs, reward+self.reward, terminated, truncated

    def render(self, mode='human'):
        """
        Prosta wizualizacja tekstowa w konsoli.
        'R' = robot
        'T' = cel
        '.' = puste
        """
        grid = [["." for _ in range(self.n)] for _ in range(self.n)]

        # robot
        grid[self.robot_y][self.robot_x] = "R"
        # cel
        grid[self.target_y][self.target_x] = "T"

        print("="*(2*self.n))
        for row in grid:
            print(" ".join(row))
        print("="*(2*self.n))
        print(f"Orientacja (indeks): {self.orientation} (kąt = {self.orientation * self.alpha} stopni)")

    def _get_obs(self):
        """
        Zwracamy krotkę (lub tablicę) [robot_x, robot_y, target_x, target_y, orientation].
        """
        return np.array([self.robot_x,
                         self.robot_y,
                         self.target_x,
                         self.target_y,
                         self.orientation])

    def _move_forward(self):
        """
        Ruch o 1 'kratkę' w kierunku orientation.
        orientation * alpha -> kąt w stopniach.
        """
        angle_deg = self.orientation * self.alpha
        # Konwersja na radians
        angle_rad = np.deg2rad(angle_deg)

        # Wektor ruchu
        dx = int(round(np.cos(angle_rad)))
        dy = int(round(np.sin(angle_rad)))

        # Sprawdzamy, czy wyjście w [0..n-1]
        new_x = self.robot_x + dx
        new_y = self.robot_y + dy

        if 0 <= new_x < self.n and 0 <= new_y < self.n:
            self.robot_x = new_x
            self.robot_y = new_y
        else:
          self.reward=-0.2
          self.terminated=True

In [None]:
import numpy as np

import tensorflow as tf
from tensorflow.keras import models, layers, optimizers
from collections import deque
import random

GAMMA = 0.99
EPSILON_START = 1.0
EPSILON_MIN = 0.0
EPSILON_DECAY = 0.5
BATCH_SIZE =4000
LEARNING_RATE = 0.1
MEMORY_SIZE = 20000000
EPISODES = 200000

# Przykładowe środowisko, które zwraca obs shape=(5,)
env = WarehouseOrientationEnv(n=64, max_steps=200)

###############################################################################
# 1) Funkcja transformująca stan 5D -> 6D:
#    Załóżmy, że obs[4] = {0,1,2,3} => kąty 0°, 90°, 180°, 270°
###############################################################################
def transform_state(obs):
    """
    obs ma postać [x0, x1, x2, x3, orientation].
    Zwracamy [x0, x1, x2, x3, sin(kąta), cos(kąta)],
    czyli wektor o rozmiarze 6.
    """
    obs = np.array(obs, dtype=float)  # Upewnij się, że float

    orientation = obs[4]   # np. 0,1,2,3
    angle_deg = orientation * 90.0
    angle_rad = np.deg2rad(angle_deg)
    sin_v = np.sin(angle_rad)
    cos_v = np.cos(angle_rad)

    # Sklejamy: 4 pierwsze elementy + sin + cos
    new_obs = np.concatenate([obs[:4], [sin_v, cos_v]])
    return new_obs

###############################################################################
# 2) Zmieniamy state_size na 6
###############################################################################
state_size = 6
action_size = env.action_space.n  # np. 3

def build_q_network(state_size, action_size):
    model = models.Sequential()
    model.add(layers.Dense(3, input_shape=(state_size,), activation='sigmoid'))
    model.add(layers.Dense(3, activation='relu'))
    model.add(layers.Dense(action_size, activation='linear'))
    model.compile(optimizer=optimizers.Adam(learning_rate=LEARNING_RATE), loss='mse')
    return model

def build_target_network(model):
    target = models.clone_model(model)
    target.set_weights(model.get_weights())
    return target

# q_network = build_q_network(state_size, action_size)
# target_network = build_target_network(q_network)

memory = deque(maxlen=MEMORY_SIZE)
epsilon = EPSILON_START
all_rewards = []
loss_value = 0.0  # Zmienna do przechowywania bieżącej wartości loss

###############################################################################
# 3) Główna pętla treningowa
###############################################################################
for episode in range(EPISODES):
    # Resetujemy środowisko -> obs ma wymiar (5,)
    obs = env.reset()
    # Zamieniamy go na wymiar (6,)
    obs = transform_state(obs)

    done = False
    total_reward = 0
    terminated =False
    truncated=False
    while not (terminated or truncated) :
        # Epsilon-greedy
        if np.random.rand() < epsilon or len(memory)<190000:
            action = np.random.randint(action_size)
        else:
            q_values = q_network.predict(obs[np.newaxis, :], verbose=0)
            action = np.argmax(q_values[0])

        # Step w środowisku => dostajemy next_obs (5D)
        next_obs, reward, terminated, truncated = env.step(action)
        # Zamieniamy na 6D
        next_obs = transform_state(next_obs)

        done = terminated
        total_reward += reward

        # Zapisujemy w pamięci
        memory.append((obs, action, reward, next_obs, done))

        # Przechodzimy do nowej obserwacji
        obs = next_obs

    # Uczenie sieci, jeśli mamy wystarczającą ilość danych w pamięci
    if  len(memory)>=190000:
        minibatch = random.sample(memory, BATCH_SIZE)
        #print(minibatch)
        states_mb      = np.array([m[0] for m in minibatch])  # shape=(BATCH_SIZE, 6)
        actions_mb     = np.array([m[1] for m in minibatch])
        rewards_mb     = np.array([m[2] for m in minibatch])
        next_states_mb = np.array([m[3] for m in minibatch])
        dones_mb       = np.array([m[4] for m in minibatch])

        q_next        = q_network.predict(next_states_mb, verbose=0)
        q_next_target = target_network.predict(next_states_mb, verbose=0)
        q_targets     = q_network.predict(states_mb, verbose=0)

        for i in range(BATCH_SIZE):
            if dones_mb[i]:
                q_targets[i, actions_mb[i]] = rewards_mb[i]
            else:
                a_max = np.argmax(q_next[i])
                q_targets[i, actions_mb[i]] = rewards_mb[i] + GAMMA * q_next_target[i, a_max]

        # Trenujemy Q-network i przechwytujemy wartość loss
        # Zapisujemy loss z pojedynczej epoki treningu
        history = q_network.fit(states_mb, q_targets, epochs=1, verbose=0)
        loss_value = history.history['loss'][0]



                # Zapisanie loss do TensorBoard
        #global_step += 1  # zwiększamy licznik kroków
        #with train_summary_writer.as_default():
         # tf.summary.scalar('loss', data=loss_value, step=global_step)
        q_weights = q_network.get_weights()
        target_weights = target_network.get_weights()
        #for i in range(len(target_weights)):
        #target_weights[i] = 0.05 * q_weights[i] + (1.0 - 0.01) * target_weights[i]
          #target_network.set_weights(target_weights)
    # Zmniejszamy epsilon
    if epsilon > EPSILON_MIN:
        epsilon *= EPSILON_DECAY

    all_rewards.append(total_reward)

    # Okresowe kopiowanie wag do sieci target
    if episode % 5 == 0:
       target_network.set_weights(q_network.get_weights())

    # Monitoring co 5 epizodów (przykładowo)
    if (episode + 1) % 1 == 0:
        avg_reward = np.mean(all_rewards[-10:])
        print(f"Epizod: {episode+1}/{EPISODES}, "
              f"średni reward z ostatnich 10 epizodów: {avg_reward:.3f}, "
              f"epsilon: {epsilon:.3f}, "
              f"loss: {loss_value:.5f}")

print("Trening zakończony!")

Epizod: 1/200000, średni reward z ostatnich 10 epizodów: -0.203, epsilon: 0.500, loss: 0.00006
Epizod: 2/200000, średni reward z ostatnich 10 epizodów: -0.202, epsilon: 0.250, loss: 0.00004
Epizod: 3/200000, średni reward z ostatnich 10 epizodów: -0.140, epsilon: 0.125, loss: 0.00007
Epizod: 4/200000, średni reward z ostatnich 10 epizodów: -0.111, epsilon: 0.062, loss: 0.00006
Epizod: 5/200000, średni reward z ostatnich 10 epizodów: -0.131, epsilon: 0.031, loss: 0.00002
Epizod: 6/200000, średni reward z ostatnich 10 epizodów: -0.112, epsilon: 0.016, loss: 0.00005
Epizod: 7/200000, średni reward z ostatnich 10 epizodów: -0.099, epsilon: 0.008, loss: 0.00004
Epizod: 8/200000, średni reward z ostatnich 10 epizodów: -0.089, epsilon: 0.004, loss: 0.00002
Epizod: 9/200000, średni reward z ostatnich 10 epizodów: -0.082, epsilon: 0.002, loss: 0.00008
Epizod: 10/200000, średni reward z ostatnich 10 epizodów: -0.075, epsilon: 0.001, loss: 0.00007
Epizod: 11/200000, średni reward z ostatnich 10 e

KeyboardInterrupt: 

In [8]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import load_model

# Wczytujemy nasz wytrenowany model
new_model = q_network
trained_model = new_model

def test_agent(env, model, n_episodes=5):
    """
    Wykonujemy n_episodes epizodów i zapisujemy trace w postaci:
    (obs, action, reward, next_obs) dla każdego kroku.
    """
    all_episode_rewards = []
    all_traces = []  # aby przechować szczegółowe info do ewentualnej wizualizacji

    for episode_i in range(n_episodes):
        obs_5d = env.reset()
        obs_6d = transform_state(obs_5d)  # Funkcja jak w Twoim kodzie

        episode_reward = 0
        episode_trace = []

        terminated = False
        truncated = False

        while not (terminated or truncated):
            # Wybór akcji - tutaj greedy, bo chcemy pokazać, co robi nauczony agent
            q_values = model.predict(obs_6d[np.newaxis, :], verbose=0)
            action = np.argmax(q_values[0])

            # Krok w środowisku
            next_obs_5d, reward, terminated, truncated = env.step(action)
            next_obs_6d = transform_state(next_obs_5d)

            episode_reward += reward

            # Zapis do trace
            episode_trace.append({
                'obs': obs_5d,
                'action': action,
                'reward': reward,
                'next_obs': next_obs_5d
            })

            # Przechodzimy do następnego stanu
            obs_5d = next_obs_5d
            obs_6d = next_obs_6d

        all_episode_rewards.append(episode_reward)
        all_traces.append(episode_trace)

    # Podsumowanie wyników
    avg_reward = np.mean(all_episode_rewards)
    print(f"Test: średni reward z {n_episodes} epizodów = {avg_reward:.2f}")

    return all_episode_rewards, all_traces

# Uruchamiamy testy
env = WarehouseOrientationEnv(n=64, max_steps=200)
test_rewards, traces = test_agent(env, trained_model, n_episodes=5)

Test: średni reward z 5 epizodów = -0.10


In [14]:
import matplotlib.pyplot as plt

def visualize_episode(episode_trace):
    """
    Wizualizuje pozycje agenta i celu w kolejnych krokach epizodu.
    Załóżmy, że `episode_trace` to lista słowników, gdzie:
        step['obs'] = [agent_x, agent_y, target_x, target_y, ...]
    """

    # Listy do przechowania kolejnych pozycji agenta i celu
    agent_positions = []
    target_positions = []
    a=print(episode_trace)
    for step in episode_trace:
        # Wyciągamy z obs: [agent_x, agent_y, target_x, target_y, ...]
      agent_x, agent_y   = step['obs'][0], step['obs'][1]
      target_x, target_y = step['obs'][2], step['obs'][3]

      agent_positions.append((agent_x, agent_y))
      target_positions.append((target_x, target_y))

    # Rozbijamy na osobne listy (x, y) — jeśli target się porusza, to zobaczymy jego ścieżkę
    agent_xs  = [pos[0] for pos in agent_positions]
    agent_ys  = [pos[1] for pos in agent_positions]
    target_xs = [pos[0] for pos in target_positions]
    target_ys = [pos[1] for pos in target_positions]

    # Rysujemy ścieżkę agenta
    plt.plot(agent_xs, agent_ys, 'bo-', label='Agent')

    # Rysujemy ścieżkę (lub pozycje) celu
    # Jeśli cel jest statyczny, zobaczysz same nakładające się punkty.
    plt.plot(target_xs, target_ys, 'rx--', label='Cel')

    plt.xlabel('X')
    plt.ylabel('Y')
    plt.title('Ścieżka agenta i pozycja celu')
    plt.legend()
    plt.grid(True)
    plt.show()

# Załóżmy, że mamy listę epizodów `traces`,
# gdzie np. traces[0] to trace z pierwszego epizodu
episode_trace = traces[5]  # bierzemy pierwszy epizod z listy

# Wywołujemy wizualizację
visualize_episode(episode_trace)

IndexError: list index out of range