In [1]:
def keras_experiment():
    from hops import hdfs
    from tensorflow.keras.preprocessing import sequence
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import LSTM, Embedding, TimeDistributed, Dense, RepeatVector,\
                             Activation, Flatten, Reshape, concatenate, Dropout, BatchNormalization, Conv2D, MaxPooling2D
    from tensorflow.keras.optimizers import Adam, RMSprop
    from tensorflow.keras.applications.inception_v3 import InceptionV3
    from tensorflow.keras.preprocessing import image
    from tensorflow.keras.models import Model
    from tensorflow.keras import Input, layers, optimizers
    from tensorflow.keras.initializers import Constant
    from tensorflow.keras.applications.inception_v3 import preprocess_input
    from tensorflow.keras.preprocessing.text import Tokenizer
    from tensorflow.keras.preprocessing.sequence import pad_sequences
    from tensorflow.keras.regularizers import l1, l2
    from tensorflow.keras.utils import to_categorical
    from tensorflow.keras.losses import MSE
    import tensorflow as tf
    import numpy as np
    import random
    
    
    class Environment:
        """Minimal environment interface; concrete environments override both methods."""

        def reset(self):
            """Start a new episode. The base implementation does nothing and returns None."""
            return None

        def step(self, action_type, parameters):
            """Apply one action. The base implementation does nothing and returns None."""
            return None
    
    class Env0(Environment):
        """Tile-merging puzzle on a grid_size x grid_size board.

        Cell (0, 0) is the storehouse: it holds one set-aside tile and is never part
        of a merge. Placing a tile that joins 3 or more orthogonally connected equal
        tiles merges them into the next tile type (see triple_merge_mapping), and the
        result may chain-merge again.
        """

        def __init__(self):
            self.grid_size = 6
            self.grid = np.zeros((self.grid_size, self.grid_size), dtype=int)
            # Tile type produced when 3+ connected tiles of the key type merge.
            self.triple_merge_mapping = {1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7, 7: 8}
            # Reward for creating a tile type by merging (or for placing it when no merge happens).
            self.points = {1: 1, 2: 2, 3: 4, 4: 8, 5: 16, 6: 32, 7: 64, 8: 128}
            # Tile types plus the empty value 0, i.e. the number of one-hot classes.
            self.num_tiles = len(self.points) + 1
            self.cum_points = 0
            self.current_tile_type = 0

        def is_grid_full(self):
            """Return True when no cell (storehouse included) is empty."""
            return np.count_nonzero(self.grid) == self.grid_size ** 2

        def reset(self):
            """Clear the board, draw a new current tile and return (grid, current_tile_type)."""
            self.grid = np.zeros((self.grid_size, self.grid_size), dtype=int)
            self.cum_points = 0
            self.generate_tile()
            return self.grid, self.current_tile_type

        def get_mapping(self, num_tiles, tile_type):
            """Return the tile produced by merging ``tile_type``; ``num_tiles`` is unused."""
            return self.triple_merge_mapping[tile_type]

        def generate_tile(self):
            """Draw the next tile to place and store it in ``current_tile_type``.

            0 -> nothing, 1 -> grass, 2 -> bush, 3 -> tree, 4 -> hut, 5 -> house, 6 -> mansion,
            7 -> castle, 8 -> floating mansion, 9 -> triple castle. Only types 1-4 are drawn.
            """
            self.current_tile_type = np.random.choice([1, 2, 3, 4], p=[0.6, 0.25, 0.10, 0.05])

        def dfs(self, tile, x, y, visited, cleaning=False):
            """Count cells equal to ``tile`` orthogonally connected to (x, y).

            The storehouse cell (0, 0) is never counted. ``visited`` is a grid-shaped
            array of 0/1 flags updated in place. When ``cleaning`` is True, every
            counted cell is reset to 0.
            """
            if (x == 0 and y == 0) or tile == 0:
                return 0
            if not (0 <= x < self.grid_size and 0 <= y < self.grid_size) or visited[x][y] == 1:
                return 0
            visited[x][y] = 1
            if self.grid[x][y] != tile:
                return 0
            if cleaning:
                self.grid[x][y] = 0
            neighbours = ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1))
            return 1 + sum(self.dfs(tile, nx, ny, visited, cleaning) for nx, ny in neighbours)

        def check_merge(self, x, y, tile_type):
            """Return True if the tile at (x, y) is part of a mergeable group of 3+ ``tile_type`` tiles.

            The flood fill starts from the cell's current content, so ``tile_type`` must
            already be on the board at (x, y); an empty cell always yields False.
            """
            connected = self.dfs(tile_type, x, y, np.zeros(self.grid.shape))
            # Tiles with no entry in the merge table (the top tier) can never merge.
            return connected >= 3 and tile_type in self.triple_merge_mapping

        def place_in_grid(self, tile, x, y):
            """Place ``tile`` at (x, y) and resolve chained merges.

            :return: None if the cell is occupied; otherwise the summed points of every
                     tile created by merging, or the points of ``tile`` itself when no
                     merge happened.
            """
            if self.grid[x][y] != 0:
                return None
            tile_type = tile
            tot_points = 0
            self.grid[x][y] = tile_type
            while self.check_merge(x, y, tile_type):
                num_tiles = self.dfs(tile_type, x, y, np.zeros(self.grid.shape), True)
                tile_type = self.get_mapping(num_tiles, tile_type)
                tot_points += self.points[tile_type]
                # The upgraded tile lands on the placement cell and may merge again.
                self.grid[x][y] = tile_type

            if tot_points == 0:
                # Reward the tile that was actually placed: it differs from
                # current_tile_type when the tile comes from the storehouse.
                return self.points[tile]
            return tot_points

        def place_in_storehouse(self):
            """Store the current tile in cell (0, 0); return None if it is occupied.

            The returned reward is tiny but positive, presumably so it is not confused
            with a rejected move (reward 0).
            """
            if self.grid[0][0] != 0:
                return None
            self.grid[0][0] = self.current_tile_type
            return 0.0001

        def place_from_storehouse(self, x, y):
            """Move the stored tile to (x, y); return None if the storehouse or the target is unusable."""
            if self.grid[0][0] == 0:
                return None
            res = self.place_in_grid(self.grid[0][0], x, y)
            if res is None:
                return None
            self.grid[0][0] = 0
            return res

        def step(self, action_type, parameters):
            """Apply one action.

            :param action_type: "place_current" or "place_from_storehouse"; placing the
                                current tile at (0, 0) stores it in the storehouse
            :param parameters: parameters[0] = x and parameters[1] = y
            :return: (reward, [grid, current_tile_type]). The reward is -1 when the grid
                     was already full (nothing applied, tile reported as 0), and 0 when the
                     action was rejected (board and current tile unchanged).
            """
            if self.is_grid_full():
                return -1, [self.grid, 0]

            res = None
            if action_type == "place_current":
                if parameters[0] == 0 and parameters[1] == 0:
                    res = self.place_in_storehouse()
                else:
                    res = self.place_in_grid(self.current_tile_type, parameters[0], parameters[1])
            elif action_type == "place_from_storehouse":
                res = self.place_from_storehouse(parameters[0], parameters[1])

            if res is None:
                return 0, [self.grid, self.current_tile_type]
            # NOTE(review): a new tile is drawn even after place_from_storehouse, which
            # does not consume the current tile -- confirm this is the intended rule.
            self.generate_tile()
            return res, [self.grid, self.current_tile_type]
            
    class Learner:
        """Base agent that keeps a view of the environment's grid and current tile."""

        def __init__(self, env):
            self.env = env
            initial_grid, initial_tile = env.reset()
            self.grid = initial_grid
            self.current_tile = initial_tile
            self.cum_reward = 0

        def print_grid(self):
            """Print the current tile, then each grid row followed by a blank line."""
            print("Current tile: " + str(self.current_tile))
            for row in self.grid:
                for cell in row:
                    print(cell, end = ' ')
                print("\n")

        def select_action(self):
            """Choose the next action; the base agent has no policy and returns None."""
            return None
        
    class RandomLearner(Learner):
        """Baseline agent that plays a uniformly random empty cell."""

        def __init__(self, env):
            super().__init__(env)

        def select_action(self):
            """Return a random action index targeting an empty cell.

            Actions are encoded as ``x * n_cols + y`` for placing the current tile
            (90% of the time) and the same index offset by the number of cells for
            placing the storehouse tile (10%). When no cell is empty (where a
            rejection loop would never terminate) an arbitrary cell is returned; the
            environment reports a full grid for any action in that case.
            """
            n_rows, n_cols = self.grid.shape
            n_cells = n_rows * n_cols
            empty_cells = np.argwhere(self.grid == 0)
            if len(empty_cells) == 0:
                x, y = np.random.randint(0, n_rows), np.random.randint(0, n_cols)
            else:
                # Uniform over empty cells: same distribution as rejection sampling.
                x, y = empty_cells[np.random.randint(len(empty_cells))]
            cell = int(x) * n_cols + int(y)
            if np.random.uniform() > 0.1:
                return cell
            return cell + n_cells
            
            
    class GreedyLearner(RandomLearner):
        """Heuristic agent: complete a merge if possible, else build next to a matching tile."""

        def __init__(self, env):
            super().__init__(env)

        def _would_merge(self, tile, x, y):
            """Return True if putting ``tile`` on the empty, non-storehouse cell (x, y) triggers a merge.

            ``check_merge`` flood-fills from the tile already in the cell, so the tile
            is placed temporarily and the cell is restored afterwards.
            """
            grid = self.env.grid
            if tile == 0 or (x == 0 and y == 0) or grid[x][y] != 0:
                return False
            grid[x][y] = tile
            try:
                return self.env.check_merge(x, y, tile)
            finally:
                grid[x][y] = 0

        def select_action(self):
            """Pick an action with a fixed priority, visiting cells in random order.

            1. place the current tile where it completes a merge;
            2. place the storehouse tile where it completes a merge;
            3. place the current tile next to an equal tile (never on the storehouse);
            4. otherwise fall back to a random action.
            """
            n_rows, n_cols = self.grid.shape
            n_cells = n_rows * n_cols
            shuffle_x = np.arange(n_rows)
            shuffle_y = np.arange(n_cols)
            np.random.shuffle(shuffle_x)
            np.random.shuffle(shuffle_y)
            cells = [(int(x), int(y)) for x in shuffle_x for y in shuffle_y]

            for x, y in cells:
                if self._would_merge(self.current_tile, x, y):
                    return x * n_cols + y

            stored_tile = self.grid[0][0]
            if stored_tile != 0:
                for x, y in cells:
                    if self._would_merge(stored_tile, x, y):
                        return x * n_cols + y + n_cells

            for x, y in cells:
                if (x, y) == (0, 0) or self.grid[x][y] != self.current_tile:
                    continue
                for nx, ny in ((x - 1, y), (x + 1, y), (x, y - 1), (x, y + 1)):
                    inside = 0 <= nx < n_rows and 0 <= ny < n_cols
                    if inside and (nx, ny) != (0, 0) and self.grid[nx][ny] == 0:
                        return nx * n_cols + ny

            return super().select_action()
            
    
        
    class ReplayBuffer:
        """Bounded FIFO store of (state, action, reward, next_state) experiences."""

        def __init__(self, buffer_size):
            from collections import deque
            self.buffer_size = buffer_size
            # deque(maxlen=...) evicts the oldest experience in O(1), unlike list.pop(0).
            self.buffer = deque(maxlen=max(buffer_size, 0))

        def add(self, s, a, r, s2):
            """Append one experience: state s, action a, reward r, next state s2."""
            self.buffer.append((s, a, r, s2))

        def size(self):
            return len(self.buffer)

        def sample(self, batch_size):
            """Sample up to ``batch_size`` experiences without replacement.

            :return: four 1-D object arrays (states, actions, rewards, next states)
                     of equal length; empty arrays when the buffer is empty.
            """
            batch = random.sample(self.buffer, min(batch_size, self.size()))
            # Built element by element: states are lists of differently shaped
            # arrays, and np.array() on such ragged records fails on NumPy >= 1.24.
            columns = [np.empty(len(batch), dtype=object) for _ in range(4)]
            for i, experience in enumerate(batch):
                for column, value in zip(columns, experience):
                    column[i] = value
            s_batch, a_batch, r_batch, s2_batch = columns
            return s_batch, a_batch, r_batch, s2_batch

        def clear(self):
            self.buffer.clear()
        
    # Target-network update coefficient (target = TAU*online + (1-TAU)*target); 1 means a plain weight copy.
    TAU = 1
    # Discount factor applied to the bootstrapped Q-value of the next state.
    DECAY_RATE = 0.9
    NUM_ACTIONS = 72 # action a < 36 places the current tile at cell (a // 6, a % 6); a >= 36 places the storehouse tile at cell ((a-36) // 6, (a-36) % 6)

    class DQLNetwork():
        """Online Q-network plus a target copy used to build Double-DQN targets.

        Inputs: a one-hot board of shape (6, 6, num_tiles) and a one-hot current tile
        of shape (num_tiles,). Output: NUM_ACTIONS Q-values reshaped to
        (NUM_ACTIONS, 1) so that 'temporal' sample weights can weight each action.
        """

        def __init__(self, num_tiles, checkpoint_path="dql.weights.h5"):
            self.num_tiles = num_tiles
            # File used by save_network / load_network.
            self.checkpoint_path = checkpoint_path
            self.construct_q_network()

        def define_model(self):
            """Build and compile one Q-network (two inputs, NUM_ACTIONS outputs)."""
            input_layer1 = Input(shape=(6, 6, self.num_tiles))  # one-hot board
            input_layer2 = Input(shape=(self.num_tiles,))       # one-hot current tile
            dense0 = Dense(288, activation="relu")(input_layer2)
            # Dense on a 4-D tensor acts on the last axis: a per-cell embedding of the one-hot tile.
            dense1 = Dense(8, activation="relu")(input_layer1)
            flattened1 = Flatten()(dense1)
            conv1 = Conv2D(filters=16, kernel_size=(2, 2), padding="same", activation="relu", kernel_regularizer=l1(0.0001))(input_layer1)
            conv2 = Conv2D(filters=16, kernel_size=(3, 3), padding="same", activation="relu", kernel_regularizer=l1(0.0001))(input_layer1)
            pool1 = MaxPooling2D(pool_size=2, strides=1, padding='same')(conv1)
            pool2 = MaxPooling2D(pool_size=2, strides=1, padding='same')(conv2)
            flattened2 = Flatten()(pool1)
            flattened3 = Flatten()(pool2)
            conc = concatenate([flattened1, flattened2, flattened3, dense0])
            dense2 = Dense(1024, activation="relu", kernel_regularizer=l2(0.0001))(conc)
            dense3 = Dense(256, activation="sigmoid", kernel_regularizer=l2(0.0001))(dense2)
            output = Dense(NUM_ACTIONS)(dense3)
            # Trailing unit axis so (batch, NUM_ACTIONS) weights work with sample_weight_mode='temporal'.
            reshape = Reshape((NUM_ACTIONS, 1))(output)
            model = Model([input_layer1, input_layer2], reshape)
            optimizer = Adam(0.0002)
            model.compile(loss='mse', optimizer=optimizer, sample_weight_mode='temporal')
            return model

        def construct_q_network(self):
            """Create the online model and a target model initialised with the same weights."""
            self.model = self.define_model()
            # Copy of the main model; every C iterations it is synchronised via target_train().
            self.target_model = self.define_model()
            self.target_model.set_weights(self.model.get_weights())

            print("Successfully constructed networks.")

        def predict_movement(self, data, randomChoice=False, greedy=False, action=-1):
            """Choose an action for state ``data`` and return it with its predicted Q-value.

            By default the arg-max of the online network's Q-values is returned.
            randomChoice=True picks a random action instead (10% storehouse actions
            36..NUM_ACTIONS-1, 90% current-tile actions 0..35); greedy=True returns
            ``action`` unchanged. Exploration policy (epsilon) is decided by the caller.

            :return: (action index, online-network Q-value of that action)
            """
            q_actions = self.model.predict(data)
            opt_policy = np.argmax(q_actions[0, :, 0])
            if randomChoice:
                if np.random.random() < 0.1:
                    opt_policy = np.random.randint(36, NUM_ACTIONS)
                else:
                    opt_policy = np.random.randint(0, 36)
            if greedy:
                opt_policy = action
            return opt_policy, q_actions[0, opt_policy, 0]

        def train(self, s_batch, a_batch, r_batch, s2_batch, epochs=1, batch_size=50):
            """Fit the online network on one batch of experiences (Double-DQN targets).

            For each sample, the targets of the non-taken actions are the target
            network's own predictions. The taken action's target is the reward, plus
            DECAY_RATE * Q_target(s', argmax_a Q_online(s', a)) when the reward is
            positive. A negative reward (Env0 returns -1 only for a full grid) zeroes
            every target of that sample. The taken action gets extra sample weight.
            See https://www.analyticsvidhya.com/blog/2019/04/introduction-deep-q-learning-python/

            :param s_batch, s2_batch: sequences of states, each [board (1, ...), tile (1, ...)]
            :param batch_size: ignored; the whole batch is fitted at once
            :return: final training loss, or None for an empty batch
            """
            batch_size = s_batch.shape[0]
            if batch_size == 0:
                return None
            boards = np.stack([state[0][0] for state in s_batch])
            tiles = np.stack([state[1][0] for state in s_batch])
            next_boards = np.stack([state[0][0] for state in s2_batch])
            next_tiles = np.stack([state[1][0] for state in s2_batch])
            actions = [int(a) for a in a_batch]

            # One batched predict per network instead of three per sample.
            targets = np.array(self.target_model.predict([boards, tiles]), dtype=float)
            next_online_q = self.model.predict([next_boards, next_tiles])[:, :, 0]
            next_target_q = self.target_model.predict([next_boards, next_tiles])[:, :, 0]
            best_next_actions = np.argmax(next_online_q, axis=1)

            sample_weights = np.ones((batch_size, NUM_ACTIONS)) / (2 * NUM_ACTIONS)
            for i, action in enumerate(actions):
                reward = r_batch[i]
                bootstrap = next_target_q[i, best_next_actions[i]]
                targets[i, action, 0] = reward
                sample_weights[i, action] += 0.5
                if i == 0:
                    q_before = self.model.predict([boards[:1], tiles[:1]])[0, action, 0]
                    print("Initial: " + str(q_before) + " " + str(reward) + " " + str(bootstrap))
                if reward > 0:
                    targets[i, action, 0] += DECAY_RATE * bootstrap
                if reward < 0:
                    targets[i] = 0
            history = self.model.fit([boards, tiles], targets, epochs=epochs, batch_size=batch_size, sample_weight=sample_weights, verbose=0)
            q_after = self.model.predict([boards[:1], tiles[:1]])[0, actions[0], 0]
            print("Finale: " + str(q_after))
            return history.history["loss"][-1]

        def save_network(self, path=None):
            """Save the online model's weights to ``path`` (default: self.checkpoint_path)."""
            self.model.save_weights(path or self.checkpoint_path)

        def load_network(self, path=None):
            """Load online weights from ``path`` (default: self.checkpoint_path) and sync the target model."""
            self.model.load_weights(path or self.checkpoint_path)
            self.target_model.set_weights(self.model.get_weights())

        def target_train(self):
            """Move target_model towards model: a plain copy when TAU == 1, else a soft update."""
            model_weights = self.model.get_weights()
            if TAU == 1:
                self.target_model.set_weights(model_weights)
                return
            target_model_weights = self.target_model.get_weights()
            blended = [TAU * w + (1 - TAU) * tw for w, tw in zip(model_weights, target_model_weights)]
            self.target_model.set_weights(blended)
        
    BUFFER_SIZE = 400000 # maximum number of experiences kept in the replay buffer
    MINIBATCH_SIZE = 50 # experiences sampled from the replay buffer for each training step
    EPSILON_DECAY = 5000000 # number of steps over which epsilon decreases linearly to FINAL_EPSILON
    FINAL_EPSILON = 0.1 # lower bound of the exploration rate
    INITIAL_EPSILON = 0.40 # exploration rate at the start of training
    C = 1024 # every C iterations target_train() synchronises target_model with model

    class DQLearner(GreedyLearner):
        """Double-DQN agent; exploration mixes random moves and the greedy heuristic."""

        def __init__(self, env):
            super().__init__(env)
            self.replay_buffer = ReplayBuffer(BUFFER_SIZE)
            self.network = DQLNetwork(env.num_tiles)
            self.num_tiles = env.num_tiles
            self.epsilon = INITIAL_EPSILON
            # Reward of the previous step; 0 means the environment rejected the move.
            self.last_reward = -1

        def _encode_state(self):
            """Return the network input: [one-hot grid, one-hot current tile], each a batch of one."""
            grid_state = np.array([to_categorical(self.grid, num_classes=self.num_tiles)])
            tile_state = np.array([to_categorical(self.current_tile, num_classes=self.num_tiles).reshape(self.num_tiles)])
            return [grid_state, tile_state]

        @staticmethod
        def _decode_action(action):
            """Map an action index to the (action_type, [x, y]) pair expected by env.step."""
            position = int(action)
            action_type = "place_current"
            if position >= 36:
                action_type = "place_from_storehouse"
                position -= 36
            return action_type, [position // 6, position % 6]

        @staticmethod
        def _to_columns(experiences):
            """Split [s, a, r, s2] records into four 1-D object arrays, one per field.

            Built element by element because the records are ragged (states are lists
            of differently shaped arrays), which np.array() rejects on NumPy >= 1.24.
            """
            columns = [np.empty(len(experiences), dtype=object) for _ in range(4)]
            for i, record in enumerate(experiences):
                for column, value in zip(columns, record):
                    column[i] = value
            return columns

        def select_action(self):
            """Take one step in the environment and return the resulting experience.

            Action choice: random when the grid is full; with probability epsilon,
            random (50%) or greedy heuristic (50%); random if the previous move was
            rejected; otherwise the network's arg-max action.

            :return: (state, action, reward, next_state), states as built by _encode_state
            """
            curr_state = self._encode_state()
            if self.env.is_grid_full():
                predict_movement, predict_q_value = self.network.predict_movement(curr_state, randomChoice=True)
            elif np.random.random() < self.epsilon:
                if np.random.random() < 0.5:
                    predict_movement, predict_q_value = self.network.predict_movement(curr_state, randomChoice=True)
                else:
                    predict_movement, predict_q_value = self.network.predict_movement(
                        curr_state, greedy=True, action=super().select_action())
            elif self.last_reward == 0:
                # A rejected move leaves the state unchanged, so the arg-max would repeat it.
                predict_movement, predict_q_value = self.network.predict_movement(curr_state, randomChoice=True)
            else:
                predict_movement, predict_q_value = self.network.predict_movement(curr_state)
            print("Q_value :" + str(predict_q_value))

            action_type, position = self._decode_action(predict_movement)
            reward, [self.grid, self.current_tile] = self.env.step(action_type, position)
            new_curr_state = self._encode_state()
            return curr_state, predict_movement, reward, new_curr_state

        def select_action_2(self):
            """Play the highest-Q action the environment accepts and return its reward.

            Actions are tried in decreasing order of predicted Q-value until one
            yields a non-zero reward; 0 is returned only if every action is rejected.
            """
            curr_state = self._encode_state()
            q_actions = self.network.model.predict(curr_state)[0].reshape(-1)
            reward = 0
            for action in np.argsort(q_actions)[::-1]:
                action_type, position = self._decode_action(action)
                reward, [self.grid, self.current_tile] = self.env.step(action_type, position)
                if reward != 0:
                    break
            return reward

        def simulate(self, num_experiments=200):
            """Play ``num_experiments`` full games with select_action_2 and print the mean reward."""
            tot_reward = 0
            for _ in range(num_experiments):
                self.grid, self.current_tile = self.env.reset()
                while not self.env.is_grid_full():
                    tot_reward += self.select_action_2()
            self.grid, self.current_tile = self.env.reset()
            print("Simulated reward: " + str(tot_reward / num_experiments))

        def train(self, epochs):
            """Play ``epochs`` episodes, training from the replay buffer every 4 steps.

            Every 10 episodes a greedy evaluation (simulate) is run; every C steps the
            target network is synchronised; after each episode the network is also
            fitted on that episode's experiences in reverse order.
            """
            iterations = 0

            for epoch in range(epochs):
                if epoch % 10 == 0:
                    self.simulate()
                experiences = []
                alive = True
                total_reward = 0
                while alive:
                    iterations += 1

                    # Linearly decay the exploration rate epsilon down to FINAL_EPSILON.
                    if self.epsilon > FINAL_EPSILON:
                        self.epsilon -= (INITIAL_EPSILON - FINAL_EPSILON) / EPSILON_DECAY

                    initial_state, predicted_movement, reward, new_state = self.select_action()
                    self.last_reward = reward
                    total_reward += reward
                    print("Taken action: " + str(predicted_movement) + " tot_reward: " + str(total_reward))

                    if self.env.is_grid_full() and reward == -1:
                        print("Earned a total of reward equal to ", total_reward)
                        print(self.grid)
                        self.grid, self.current_tile = self.env.reset()
                        alive = False
                        total_reward = 0
                        self.last_reward = -1

                    experiences.append([initial_state, predicted_movement, reward, new_state])
                    self.replay_buffer.add(initial_state, predicted_movement, reward, new_state)
                    if iterations % 4 == 0:
                        s_batch, a_batch, r_batch, s2_batch = self.replay_buffer.sample(MINIBATCH_SIZE)
                        self.network.train(s_batch, a_batch, r_batch, s2_batch, epochs=1)

                    if iterations % C == 0:
                        self.network.target_train()
                        print("epoch "+str(epoch)+" eps "+str(self.epsilon)+" reward "+str(total_reward))
                s_batch, a_batch, r_batch, s2_batch = self._to_columns(experiences[::-1])
                self.network.train(s_batch, a_batch, r_batch, s2_batch, epochs=1, batch_size=len(experiences))



    
    # Build the tile-merging environment and train the Double-DQN agent on it for 50000 episodes.
    environment = Env0()
    learner = DQLearner(environment)
    learner.train(50000)
    

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1906,application_1573690044319_0875,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [None]:
from hops import experiment

# Run keras_experiment through the hops experiment API, keeping its logs in a local directory.
experiment.launch(
    keras_experiment,
    name='Keras classifier',
    local_logdir=True,
)