In [9]:
from server_comms import ServerComms, ServerMessageTypes
import time
import threading
import json
from IPython.display import clear_output
import numpy as np
import utils
import random
np.set_printoptions(suppress=True)

In [10]:
server_message_types = ServerMessageTypes()

In [11]:
class ApiRunner:
    def __init__(self):
        self.is_connected = True
        self.server = ServerComms("127.0.0.1", 8052)
            
    def send(self, type, payload=None):
        self.server.send_message(type, payload)
        
    def disconnect(self):
        self.is_connected = False
        self.server.server_socket.close()

In [12]:
api_runner = ApiRunner()

In [13]:
class TankRunner:
    def __init__(self, name, api_runner):
        self.name = name
        self.api_runner = api_runner
        self.start()
        self.setup_reward_variables()
        
    def setup_reward_variables(self):
        self.prev_closest_enemy_distance = None
        self.prev_health = None
        self.prev_num_of_enemies = None
        self.prev_heading_difference = None
        self.prev_fired = False
        
    def start(self):
        self.has_ended = False
        self.state = dict(me={}, enemies={}, items={})
        self.spawn()
        self.receive_thread = threading.Thread(target=self.read)
        self.receive_thread.daemon = True
        self.receive_thread.start()
        
    def spawn(self):
        self.send(server_message_types.CREATETANK, dict(Name=self.name))
        
    def despawn(self):
        self.send(server_message_types.DESPAWNTANK)
        
    def read(self):
        while not self.has_ended:
            message = self.api_runner.server.read_message()
            self.update_state(message)
     
    def update_state(self, message):
        if message == None: return
        if message['type'] == server_message_types.OBJECTUPDATE:
            data = message['data']
            if data['Type'] == 'Tank':
                if data['Name'] == self.name:
                    self.update_me(data)
                elif data['Name'] != self.name:
                    self.update_enemy(data)
            elif data['Type'] == 'AmmoPickup':
                self.update_item(data)
    
    def update_me(self, data):
        self.state['me'] = data
    
    def update_enemy(self, data):
        self.state['enemies'][data['Name']] = data
    
    def update_item(self, data):
        self.state['items'][data['Id']] = data
    
    def send(self, type, payload=None):
        self.api_runner.send(type, payload)
    
    def stop(self):
        self.has_ended = True
        self.despawn()
        
    def do_action(self, action_type):
        if action_type == 1:
            self.send(server_message_types.TOGGLEFORWARD)
        elif action_type == 2:
            self.send(server_message_types.TOGGLEREVERSE)
        elif action_type == 3:
            self.send(server_message_types.TOGGLELEFT)
        elif action_type == 4:
            self.send(server_message_types.TOGGLERIGHT)
        elif action_type == 5:
            self.send(server_message_types.TOGGLETURRETLEFT)
        elif action_type == 6:
            self.send(server_message_types.TOGGLETURRETRIGHT)
        elif action_type == 7:
            self.prev_fired = True
            self.send(server_message_types.FIRE)
        elif action_type == 8:
            self.send(server_message_types.STOPMOVE)
        elif action_type == 9:
            self.send(server_message_types.STOPTURN)
        elif action_type == 10:
            self.send(server_message_types.STOPTURRET)
        elif action_type == 11:
            self.send(server_message_types.STOPALL)
    
    def get_closest_goal(self):
        me_coords = (self.state['me']['X'], self.state['me']['Y'])
        left_goal_coords = (0, 100)
        right_goal_coords = (0, -100)
        left_goal_distance = utils.calculate_distance(me_coords, left_goal_coords)
        right_goal_distance = utils.calculate_distance(me_coords, right_goal_coords)
        if (left_goal_distance < right_goal_distance):
            return left_goal_coords
        return right_goal_coords
    
    def get_closest_enemy(self):
        min_coords = (-99999, -99999)
        min_distance = -99999
        if len(self.state['enemies']) == 0: 
            return min_coords, self.prev_closest_enemy_distance if self.prev_closest_enemy_distance != None else abs(min_distance)
        
        me_coords = (self.state['me']['X'], self.state['me']['Y'])
        for enemy in self.state['enemies'].values():
            enemy_coords = (enemy['X'], enemy['Y'])
            distance = utils.calculate_distance(me_coords, enemy_coords)
            if min_distance == -99999 or min_distance < distance:
                min_distance = distance
                min_coords = enemy_coords
        return min_coords, round(min_distance, 3)
    
    def get_number_of_enemies(self):
        return len(self.state['enemies'].keys())
    
    def calculate_reward(self):
        reward = 0
        if len(self.state['me'].keys()) == 0: 
            return reward
        
        # reward on previous distance is less than normal
        if self.prev_closest_enemy_distance == None:
            _, self.prev_closest_enemy_distance = self.get_closest_enemy()
        else:
            _, current_closest_enemy_distance = self.get_closest_enemy()
            reward += 5 if self.prev_closest_enemy_distance > current_closest_enemy_distance else 0
            reward += -5 if self.prev_closest_enemy_distance < current_closest_enemy_distance else 0
            self.prev_closest_enemy_distance = current_closest_enemy_distance
        
        # reward on health increase and vice versa
        if self.prev_health == None:
            self.prev_health = self.state['me']['Health']
        else:
            current_health = self.state['me']['Health']
            reward += 15 if self.prev_health > current_health else 0
            reward += -10 if self.prev_health < current_health else 0
            self.prev_health = current_health
        
        # reward if enemies are smaller
        if self.prev_num_of_enemies == None:
            self.prev_num_of_enemies = len(self.state['enemies'].keys())
        else:
            current_num_of_enemies = len(self.state['enemies'].keys())
            reward += 5 if self.prev_num_of_enemies < current_num_of_enemies else 0
            self.prev_num_of_enemies = current_num_of_enemies
            
        # reward on heading difference is less than normal heading difference
        if self.prev_heading_difference == None:
            me_coords = (self.state['me']['X'], self.state['me']['Y'])
            enemy_coords, _ = self.get_closest_enemy()
            self.prev_heading_difference = utils.calculate_heading(me_coords, enemy_coords)
        else:
            me_coords = (self.state['me']['X'], self.state['me']['Y'])
            enemy_coords, _ = self.get_closest_enemy()
            current_heading_difference = utils.calculate_heading(me_coords, enemy_coords)
            reward += 5 if self.prev_heading_difference < current_heading_difference else 0
            self.prev_heading_difference = current_heading_difference
            
        # reward on fire
        if self.prev_fired:
            reward += 10
            self.prev_fired = False
        
        # reward (high) if heading is close to the turret heading
        if abs(self.state['me']['Heading'] - self.state['me']['TurretHeading']) < 10:
            reward += 30
        
        return reward
    
    def get_state_details(self):
        if len(self.state['me'].keys()) == 0: 
            return np.array([0, 0, 0, 0, 0, 0, 0, 0, 0])
        
        me_coords = [round(self.state['me']['X'],3), round(self.state['me']['Y'],3)]
        closest_enemy_coords, closest_enemy_distance = self.get_closest_enemy()
        number_of_enemies = self.get_number_of_enemies()
        nearest_goal = self.get_closest_goal()
        heading = self.state['me']['Heading']
        turret_heading = self.state['me']['TurretHeading']
        health = self.state['me']['Health']
        return np.array([
                         round(me_coords[0], 3),
                         round(me_coords[1], 3),
                         round(heading, 3),
                         round(turret_heading, 3),
                         health,
                         round(closest_enemy_coords[0], 3),
                         round(closest_enemy_coords[1], 3),
                         round(number_of_enemies, 3),
                         round(nearest_goal[0], 3),
                         round(nearest_goal[1], 3),
                        ])
    
    def step(self, action):
        self.do_action(action)
        time.sleep(0.1)
        state = self.get_state_details()
        reward = self.calculate_reward()
        return state, reward

In [None]:
for i in range(10):
    tank_runner = TankRunner("bruh{0}".format(i), api_runner)
    for i in range(10):
        action = random.randint(1,12)
        print(tank_runner.step(action))
    time.sleep(5)
    tank_runner.stop()

(array([0, 0, 0, 0, 0, 0, 0, 0, 0]), 0)
(array([-24.161,  54.129,  79.   ,  69.486,  10.   ,  50.782,   7.083,
         1.   ,   0.   , 100.   ]), 30)
(array([-24.161,  54.129,  79.   ,  69.486,  10.   ,  46.483,  28.956,
         1.   ,   0.   , 100.   ]), 35)
(array([-24.161,  54.129,  79.   ,  69.486,  10.   ,  46.483,  28.956,
         1.   ,   0.   , 100.   ]), 30)
(array([-24.161,  54.129,  79.   ,  64.667,  10.   ,  46.483,  28.956,
         1.   ,   0.   , 100.   ]), 0)
(array([-24.161,  54.129,  79.   ,  64.667,  10.   ,  46.483,  28.956,
         1.   ,   0.   , 100.   ]), 0)
(array([-24.161,  54.129,  79.   ,  64.667,  10.   ,  46.483,  28.956,
         1.   ,   0.   , 100.   ]), 0)
(array([-24.161,  54.129,  79.   ,  64.667,  10.   , -48.012, -41.304,
         4.   ,   0.   , 100.   ]), 5)
(array([-24.072,  52.963,  92.335, 125.671,  10.   , -48.012, -41.304,
         4.   ,   0.   , 100.   ]), 10)
(array([-24.072,  52.963,  92.335, 125.671,  10.   , -48.012, -41.304,
     

Exception in thread Thread-14:
Traceback (most recent call last):
  File "/Users/hemangkandwal/miniconda3/lib/python3.7/threading.py", line 917, in _bootstrap_inner
    self.run()
  File "/Users/hemangkandwal/miniconda3/lib/python3.7/threading.py", line 865, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-13-0892a0299cb4>", line 31, in read
    message = self.api_runner.server.read_message()
  File "/Users/hemangkandwal/Documents/projects/MSTanks/ai/server_comms.py", line 100, in read_message
    message_type = struct.unpack('>B', message_typeRaw)[0]
struct.error: unpack requires a buffer of 1 bytes



(array([0, 0, 0, 0, 0, 0, 0, 0, 0]), 0)
(array([0, 0, 0, 0, 0, 0, 0, 0, 0]), 0)


In [None]:
tank_runner = TankRunner("bruh{0}".format(1), api_runner)

In [None]:
tank_runner.step(2)
tank_runner.step(3)

In [7]:
tank_runner.stop()