In [1]:
import gymnasium
import CoppeliaSim_Gym # carpeta del entorno
import tensorflow as tf
import math
import numpy as np
import matplotlib.pyplot as plt

In [2]:
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Activation, Flatten, Input, Conv2D, MaxPooling2D, Permute, Dropout
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import plot_model # Graficar modelo CNN
from keras.initializers import random_normal

from rl.agents.dqn import DQNAgent
from rl.core import Processor
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory
from rl.callbacks import FileLogger, ModelIntervalCheckpoint

In [3]:
# VARIABLES
INPUT_SHAPE = (256, 256)
WINDOW_LENGTH = 4

In [4]:
# PREPARACION DEL ENTORNO 
ENV_NAME = "CoppeliaSim_Gym/GymCoppManR-v0"
env = gymnasium.make(ENV_NAME) 

Conectado al servidor API remoto


In [27]:
observation, info = env.reset(is_dynamic=True) # Empezar episodio realizando una observacion
#ent = (observation['c_image']/255).shape
ent = observation.shape
#print(ent)
actions = env.action_space.shape[0] # mostrar espacio accion de 7 valores
#width, higth, channel = observation['c_image'].shape[0], observation['c_image'].shape[1], observation['c_image'].shape[2]
#shape_img = (width, higth, channel)
#print(shape_img)
#print(info['info'])
#print(f'\info: \n{info}')
print(actions)
print(ent)

7
(256, 256, 1)


In [13]:
#o = env.step(1)
#print ('imagen:', observation)
#print(observation)
#print('Angulos de las articulaciones:',env.joint_angles())
#te = [0.0, 1.1, 0.0, -1.0, 0.0, 2., 0.0] #posicion inicial del el extremo de path
#print('CambioPosicion', env.set_posicion(te))
#print('Distancia desde el punto al objetivo:', env.distance_to_goal())
#env.set_posicion(te)
#print('Verificar colision', env.A_colision())

#print("The new observation is {}".format(observation))
#observation_next, reward, done, info = env.step(env.action_space.sample())
#print(f'observation: \n{observation} \nreward: \n{reward} \ndone: \n{done} \ninfo: \n{info}')
#print(f'observation space: \n{env.observation_space} \naction space: \n{env.action_space}')

#img=env.render(mode="rgb_array")
#plt.imshow(img)
#plt.show()

In [18]:
# PROCESAR DATOS DE OBSERVACION, ACCION Y RECOMPENSA
class CoppeliaProcessor(Processor):
    def process_observation(observation):
        assert observation.ndim == 3  # (altura, ancho, canal)
        img = Image.fromarray(observation)
        img = img.resize(INPUT_SHAPE).convert('L')  # resize and convert to grayscale
        processed_observation = np.array(img)
        assert processed_observation.shape == INPUT_SHAPE
        return processed_observation.astype('uint8')  # saves storage in experience memory

    def process_state_batch(batch):
        # We could perform this processing step in `process_observation`. In this case, however,
        # we would need to store a `float32` array instead, which is 4x more memory intensive than
        # an `uint8` array. This matters if we store 1M observations.
        processed_batch = batch.astype('float32')/255.
        return processed_batch

    def process_reward(self, reward):
        return np.clip(reward, -1., 1.)

In [31]:
# Modelo CNN.
input_shape = (WINDOW_LENGTH,) + INPUT_SHAPE
model = Sequential()
model.add(Permute((2, 3, 1), input_shape=input_shape))
model.add(Conv2D(32, (8, 8), strides=(4, 4)))
model.add(Activation('relu'))
model.add(Conv2D(64, (4, 4), strides=(2, 2)))
model.add(Activation('relu'))
model.add(Conv2D(64, (3, 3), strides=(1, 1)))
model.add(Activation('relu'))
model.add(Flatten())
model.add(Dense(512))
model.add(Activation('relu'))
model.add(Dense(actions))
model.add(Activation('linear'))
#print(model.summary())

In [30]:
activation = 'relu'
pic_input = Input(shape=(ent)) # shape=(255,255,1)

img_stack = Conv2D(16, (3, 3), name='capa1', padding='same', activation=activation)(pic_input)
img_stack = MaxPooling2D(pool_size=(2,2))(img_stack)
img_stack = Conv2D(32, (3, 3), activation=activation, padding='same', name='capa2')(img_stack)
img_stack = MaxPooling2D(pool_size=(2, 2))(img_stack)
img_stack = Conv2D(32, (3, 3), activation=activation, padding='same', name='capa3')(img_stack)
img_stack = MaxPooling2D(pool_size=(2, 2))(img_stack)
img_stack = Flatten()(img_stack)
img_stack = Dropout(0.2)(img_stack)

img_stack = Dense(128, name='rl_dense', kernel_initializer=random_normal(stddev=0.01))(img_stack)
img_stack=Dropout(0.2)(img_stack)
output = Dense(actions, name='rl_output', kernel_initializer=random_normal(stddev=0.01))(img_stack)

opt = Adam()
action_model = Model(inputs=[pic_input], outputs=output)

action_model.compile(optimizer=opt, loss='mean_squared_error')
#print(action_model.summary())
#plot_model(model, to_file='img_file.png', show_shapes=True) # Graficar modelo, conda install graphviz

In [14]:
def build_callbacks(env): # registro
    checkpoint_weights_filename = 'dqn_' + env + '_weights_{step}.h5f'
    log_filename = 'dqn_{}_log.json'.format(env)
    callbacks = [ModelIntervalCheckpoint(checkpoint_weights_filename, interval=5000)]
    callbacks += [FileLogger(log_filename, interval=100)]
    return callbacks

In [15]:
# Parametros para el agente DQN
policy = LinearAnnealedPolicy(EpsGreedyQPolicy(), attr='eps', value_max=1., value_min=1., value_rest=.05, nb_steps=10000)
processor = CoppeliaProcessor()
memory = SequentialMemory(limit=50000, window_length=WINDOW_LENGTH)   

In [16]:
dqn = DQNAgent(model=model, nb_actions=actions, policy=policy, memory=memory,
               processor=processor, nb_steps_warmup=50000, gamma=.99, target_model_update=10000,
               train_interval=4, delta_clip=1.)
dqn.compile(Adam(learning_rate=.0001), metrics=['mae'])
callbacks = build_callbacks('CoppeliaSim')

In [29]:
dqn.fit(env, nb_steps=50, visualize=False, verbose=2, callbacks=callbacks) # , callbacks=callbacks
scores = dqn.test(env, nb_episodes=100, visualize=True, log_interval=10000)
print(np.mean(scores.history['recompensa_episodio']))

Training for 50 steps ...


AttributeError: 'tuple' object has no attribute 'ndim'

In [11]:
pic = Input(shape=(255,255,1))
model = tf.keras.models.Sequential()
# Añadimos la primera capa
model.add(Conv2D(128,(4,4), activation = 'relu', input_shape = (ent)))
model.add(MaxPooling2D(pool_size = (2,2), strides=(2, 2)))
# Añadimos la segunda capa
model.add(Conv2D(64,(2,2), activation = 'relu'))
model.add(MaxPooling2D(pool_size = (2,2)))
# Hacemos un flatten para poder usar una red fully connected
model.add(Flatten())
model.add(Dense(32, activation='relu'))
model.add(Flatten())
# Añadimos una capa softmax para que podamos clasificar las imágenes
model.add(Dense(actions, activation='softmax'))

#print(model.summary())
