In [None]:
import socket
import threading
from getData import get_data
from time import sleep, time
import utils

In [None]:
def data_getter_function():
    """Keep the module-level ``data`` refreshed from the local telemetry socket.

    Opens a TCP connection to 127.0.0.1:9000 and then loops forever, rebinding
    the global ``data`` to whatever ``get_data`` returns for that socket.
    The loop has no exit condition, so the function only ends if an
    exception is raised.
    """
    global data
    server_address = ("127.0.0.1", 9000)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(server_address)
        while True:
            data = get_data(sock)


# Run the socket reader in a daemon thread so it does not keep the process
# alive on its own.
data_getter_thread = threading.Thread(target=data_getter_function, daemon=True)
data_getter_thread.start()
# Brief pause after starting the thread — presumably to let a first value of
# `data` arrive before it is read; this is not a guarantee (TODO confirm).
sleep(0.1)

In [None]:
from PIL import Image
from screenshot import screenshot
from getLidar import getMesuresDistances

In [None]:
def getInfos():
    """Collect one observation: distance readings, current speed, finish flag.

    Returns:
        tuple: ``(distances, speed, finish)`` where ``distances`` is the value
        returned by ``getMesuresDistances`` for the current screenshot, and
        ``speed`` / ``finish`` are read from the latest telemetry dict.
    """
    # Bind the shared global once: the reader thread rebinds `data` on every
    # iteration, so two separate lookups could mix values from two packets.
    telemetry = data
    speed = telemetry['speed']
    finish = telemetry['finish']
    # Capture the current screen and measure the distances on that frame
    frame = Image.fromarray(screenshot())
    distances = getMesuresDistances(frame)
    return distances, speed, finish

In [None]:
from gym import Env
from gym.spaces import Discrete, Box, Tuple
import numpy as np

from driver import controlKeySmooth, reloadKey, releaseAllKeys, saveReplay, saveReplay2

In [None]:
# Number of distance values per observation (size of the environment's
# observation space and of its initial state vector).
nb_lidar = 11
# Number of steps an episode may last before it is cut off.
race_step = 400

In [None]:
class TMEnvDistances(Env):
    """Gym environment that drives the game from distance measurements.

    * Actions: one of 6 discrete codes, forwarded as-is to ``controlKeySmooth``.
    * Observations: the distances returned by ``getInfos``; the declared
      space is ``nb_lidar`` values in ``[0, 400]``.
    * Episode end: the finish flag is reported, or ``race_step`` steps elapsed.
    """

    def __init__(self):
        # Actions we can take (their meaning is defined by controlKeySmooth)
        self.action_space = Discrete(6)
        # One distance per measurement, each declared in [0, 400]
        self.observation_space = Box(low=0, high=400, shape=(nb_lidar,))
        # Placeholder observation used until the first real measurement
        self.state = [100]*nb_lidar
        self.speed = 0
        # Remaining number of steps before the episode is cut off
        self.race_length = race_step

    def step(self, action):
        """Apply ``action`` in the game and observe the result.

        Args:
            action: discrete action code passed to ``controlKeySmooth``.

        Returns:
            tuple: ``(state, reward, done, info)``. ``reward`` is the current
            speed, plus ``100 * remaining_steps`` when the finish flag is set.
            ``info`` is always an empty dict.
        """
        # Make ingame action
        controlKeySmooth(action)
        # Get data from game
        n_distances, n_speed, n_finish = getInfos()
        # One step of the episode budget has been consumed
        self.race_length -= 1
        # Calculate reward: the current speed
        reward = 0
        reward += n_speed
        # Save speed and the new observation
        self.speed = n_speed
        self.state = n_distances
        # Check if race is done
        if self.race_length <= 0:
            # Step budget exhausted
            done = True
            # saveReplay()
            # sleep(1)
        elif n_finish:
            # Finished: bonus proportional to the steps left
            reward += (self.race_length)*100
            done = True
            sleep(3)
            # saveReplay2()
            # sleep(0.5)
        else:
            done = False

        # Set placeholder for info
        info = {}
        # Fixed pause between two consecutive steps
        sleep(0.05)
        # Return step information
        return self.state, reward, done, info

    def render(self, mode='human'):
        """No-op: the game window itself is the visual representation.

        ``mode`` is accepted (and ignored) so that callers using the classic
        gym signature ``render(mode='human')`` do not raise a ``TypeError``.
        """
        pass

    def reset(self):
        """Restart the run and return the initial placeholder observation."""
        # Restart the game
        releaseAllKeys()
        reloadKey()
        sleep(1)
        # Reset observation and speed to their initial placeholder values
        self.state = [100]*nb_lidar
        self.speed = 0
        # Reset the step budget
        self.race_length = race_step
        return self.state

In [None]:
# Create the environment and fix the random seeds (numpy and the env itself).
env = TMEnvDistances()
np.random.seed(123)
env.seed(123)

# Sizes handed to the model builder: number of actions and observation shape.
actions = env.action_space.n
states = env.observation_space.shape
print("States shape", states)
print("Actions shape", actions)

In [None]:
# Build the network from the observation shape and the number of actions
# (architecture is defined in utils.build_model), then print its summary.
model = utils.build_model(states, actions)
model.summary()

In [None]:
from tensorflow.keras.optimizers import Adam
# Wrap the network in the agent produced by utils.build_agent.
dqn = utils.build_agent(model, actions)
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and
# rejected by newer Keras releases.
dqn.compile(Adam(learning_rate=1e-4), metrics=['mae'])
# Callback handed to fit() to log information per episode.
cb_ep_fit = utils.EpisodeLogger()
# dqn.get_config()

In [None]:
# Short pause before training starts.
sleep(2)
try:
    scores_fit = dqn.fit(env, nb_steps=100000, visualize=False,
                         verbose=2, callbacks=[cb_ep_fit])
finally:
    # Training can stop in the middle of an episode or on an error; make sure
    # no key stays held down afterwards, as the evaluation cell already does.
    releaseAllKeys()

In [None]:
import matplotlib.pyplot as plt

# Training curve: the per-episode reward recorded in the fit() history.
episode_rewards = scores_fit.history['episode_reward']
plt.plot(episode_rewards)
plt.xlabel("épisodes")
plt.ylabel("récompenses")

In [None]:
# Tag the weight file with the current timestamp; `t1` is reused later to
# load the same file back.
t1 = time()
weights_path = 'weights/w_distances_' + str(t1) + '.h5f'
dqn.save_weights(weights_path, overwrite=True)

In [None]:
# Drop the trained objects so the following cells rebuild them from scratch.
del model, dqn, env

In [None]:
# Recreate the environment, network and agent so that evaluation relies only
# on the weights loaded from disk.
env = TMEnvDistances()
actions = env.action_space.n
states = env.observation_space.shape
model = utils.build_model(states, actions)
dqn = utils.build_agent(model, actions)
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and
# rejected by newer Keras releases.
dqn.compile(Adam(learning_rate=1e-4), metrics=['mae'])

In [None]:
# Reload the weights written by the save cell; relies on `t1` still holding
# the timestamp set there (same kernel session).
dqn.load_weights('weights/w_distances_'+str(t1)+'.h5f')

In [None]:
# Short pause before the evaluation run starts.
sleep(2)
cb_ep = utils.EpisodeLoggerTest()
try:
    scores = dqn.test(env, nb_episodes=5, visualize=False, callbacks=[cb_ep])
    # Mean reward over the evaluated episodes
    print(np.mean(scores.history['episode_reward']))
finally:
    # Always let go of the keys, even if the evaluation fails part-way.
    releaseAllKeys()

In [None]:
# One curve per evaluated episode, built from utils.sumReward applied to the
# rewards logged for that episode.
legends = []
for ep, obs in enumerate(cb_ep.rewards.values(), start=1):
    plt.plot(list(utils.sumReward(obs)))
    legends.append("ep_" + str(ep))
plt.legend(legends)
plt.xlabel("étapes")
plt.ylabel("récompenses")