In [2]:
import gym
import pandas as pd
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from collections import namedtuple
from time import sleep

# Two-element one-hot command vectors (not referenced by the visible code below)
LEFT_CMD, RIGHT_CMD = [1, 0], [0, 1]

# Reward config: how many of the top-scoring random games are kept for training
BEST_GAMES_TO_EVOLVE = 10

# Game commands: row i is the one-hot encoding of movement i (3 movements)
GAME_ACTIONS_MAPPING_TO_ARRAY = [
    [int(column == movement) for column in range(3)]
    for movement in range(3)
]

# Per-episode record: the accumulated reward plus the rows recorded during play
GameData = namedtuple('GameData', ['reward', 'data'])

# Single shared game environment used by the functions in this file
env = gym.make('MountainCar-v0')


def compute_reward(position):
    """
    Map a position to a shaped reward.

    The position is compared against a descending ladder of thresholds and
    the reward of the first threshold it reaches is returned.
    :param position: position value to score
    :return: reward between 1 and 6 when position is at least -0.2, else -1
    """
    # (minimum position, reward) pairs, highest threshold first
    reward_ladder = (
        (-0.1000000, 6),
        (-0.1100000, 5),
        (-0.1300000, 4),
        (-0.1500000, 3),
        (-0.1700000, 2),
        (-0.2000000, 1),
    )

    for minimum_position, reward in reward_ladder:
        if position >= minimum_position:
            return reward

    # Below every threshold: penalise the step
    return -1


def play_random_games(games=100):
    """
    Play random games and collect the steps of the best ones as training data.

    Every episode is driven by actions sampled from ``env.action_space``.
    Each step records the observation returned by ``env.step`` together with
    the one-hot encoding of the action sampled for the following step. An
    episode is kept only when its accumulated shaped reward is above -199.0;
    the reward of the terminating step is not added to that total. Of the
    kept episodes, at most ``BEST_GAMES_TO_EVOLVE`` with the highest reward
    contribute rows to the result.
    :param games: number of random episodes to play
    :return: DataFrame with columns position, velocity, action_0, action_1,
             action_2 (empty when no episode qualified)
    """

    # Storage for all qualifying games
    all_movements = []

    for _ in range(games):

        # Reset per-game accumulators
        episode_reward = 0
        current_game_data = []

        # Reset game environment
        env.reset()

        # First random movement
        action = env.action_space.sample()

        while True:

            # Play; observation = (position, velocity)
            observation, _, done, _ = env.step(action)

            # Replace the environment reward with the position-based one
            reward = compute_reward(observation[0])

            # Sample the action that will be played from this observation
            action = env.action_space.sample()

            # Store observation data and the action chosen for it
            current_game_data.append(
                np.hstack((observation, GAME_ACTIONS_MAPPING_TO_ARRAY[action]))
            )

            if done:
                break

            episode_reward += reward

        # Keep only games that scored above the cut-off
        if episode_reward > -199.0:
            print(f'Reward={episode_reward}')
            all_movements.append(GameData(episode_reward, current_game_data))

    # Best games first
    all_movements.sort(key=lambda item: item.reward, reverse=True)

    # Keep the best N games. This must be a slice: indexing with
    # [BEST_GAMES_TO_EVOLVE] would pick one single game (the N+1-th) and
    # break the iteration below. Slicing is also safe for shorter lists.
    best_games = all_movements[:BEST_GAMES_TO_EVOLVE]

    # Flatten the recorded rows of the selected games
    movements_only = [row for game in best_games for row in game.data]

    return pd.DataFrame(
        movements_only,
        columns=['position', 'velocity', 'action_0', 'action_1', 'action_2']
    )


def generate_ml(dataframe):
    """
    Build and train the neural network that maps an observation to an action.

    The network takes (position, velocity) as input and outputs one score per
    movement. Training targets are the one-hot action columns.
    :param dataframe: DataFrame with columns position, velocity, action_0,
                      action_1, action_2
    :return: the fitted Keras Sequential model
    """

    # Define neural network topology
    model = Sequential()
    model.add(Dense(64, input_dim=2, activation='relu'))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(32, activation='relu'))
    # The targets are one-hot (exactly one action per row), so the output
    # layer uses softmax: it yields a probability distribution over the
    # three actions, which is what categorical_crossentropy expects.
    model.add(Dense(3, activation='softmax'))

    # Compile neural network
    model.compile(optimizer='adam', loss='categorical_crossentropy')

    # Fit model with data
    model.fit(
        dataframe[['position', 'velocity']],
        dataframe[['action_0', 'action_1', 'action_2']],
        epochs=80
    )

    return model


def play_game(ml_model, games=100):
    """
    Play the game, choosing every movement with the trained model.

    Each episode is rendered; at every step the model scores the current
    observation and the highest-scoring movement is played. When the episode
    ends, its number, its step count and its accumulated shaped reward are
    printed.
    :param ml_model: model whose ``predict`` accepts an array of shape (1, 2)
                     and returns one score per movement
    :param games: number of episodes to play
    :return: None
    """

    for i_episode in range(games):

        # Per-episode accumulators
        episode_reward = 0
        steps = 0

        # Reset env for the game
        observation = env.reset()

        done = False
        while not done:
            env.render()
            # Short pause after each frame
            sleep(0.01)

            # Predict next movement
            current_action_pred = ml_model.predict(observation.reshape(1, 2))[0]

            # Pick the highest-scoring movement
            current_action = np.argmax(current_action_pred)

            # Make movement
            observation, _, done, _ = env.step(current_action)
            steps += 1

            # Update reward value from the new position
            episode_reward += compute_reward(observation[0])

        # Report the real step count; the episode index is not a step count
        print(f"Episode {i_episode + 1} finished after {steps} steps", end='')
        print(f" Score = {episode_reward}")


# Pipeline: gather random-play data, train the model, then let it play.
# The finally clause releases the environment (and its render window) even
# when one of the stages raises.
try:
    print("[+] Playing Random Games")
    df = play_random_games(games=1000)

    print("[+] Training NN Model")
    ml_model = generate_ml(df)

    print("[+] Playing Games with NN")
    play_game(ml_model=ml_model, games=30)
finally:
    env.close()

[+] Playing Random Games
Reward=-153
Reward=-148
Reward=-193
Reward=-197
Reward=-191
Reward=-165
Reward=-183
Reward=-179
[+] Training NN Model
Epoch 1/80
Epoch 2/80
Epoch 3/80
Epoch 4/80
Epoch 5/80
Epoch 6/80
Epoch 7/80
Epoch 8/80
Epoch 9/80
Epoch 10/80
Epoch 11/80
Epoch 12/80
Epoch 13/80
Epoch 14/80
Epoch 15/80
Epoch 16/80
Epoch 17/80
Epoch 18/80
Epoch 19/80
Epoch 20/80
Epoch 21/80
Epoch 22/80
Epoch 23/80
Epoch 24/80
Epoch 25/80
Epoch 26/80
Epoch 27/80
Epoch 28/80
Epoch 29/80
Epoch 30/80
Epoch 31/80
Epoch 32/80
Epoch 33/80
Epoch 34/80
Epoch 35/80
Epoch 36/80
Epoch 37/80
Epoch 38/80
Epoch 39/80
Epoch 40/80
Epoch 41/80
Epoch 42/80
Epoch 43/80
Epoch 44/80
Epoch 45/80
Epoch 46/80
Epoch 47/80
Epoch 48/80
Epoch 49/80
Epoch 50/80
Epoch 51/80
Epoch 52/80
Epoch 53/80
Epoch 54/80
Epoch 55/80
Epoch 56/80
Epoch 57/80
Epoch 58/80
Epoch 59/80
Epoch 60/80
Epoch 61/80
Epoch 62/80
Epoch 63/80
Epoch 64/80
Epoch 65/80
Epoch 66/80
Epoch 67/80
Epoch 68/80
Epoch 69/80
Epoch 70/80
Epoch 71/80
Epoch 72/80
Ep

TypeError: item 1 in _argtypes_ passes a union by value, which is unsupported.