Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 71 additions & 31 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,33 @@
import service_pb2 as pb2
from typing import Union
from multiprocessing import Manager, Lock
from utils.logger_utils import setup_logger
import logging
import grpc
import argparse
import datetime


logging.basicConfig(level=logging.DEBUG)
console_logging_level = logging.INFO
file_logging_level = logging.DEBUG

main_logger = None
log_dir = None


class GrpcAgent:
def __init__(self, agent_type, uniform_number) -> None:
def __init__(self, agent_type, uniform_number, logger) -> None:
self.agent_type: pb2.AgentType = agent_type
self.uniform_number: int = uniform_number
self.server_params: Union[pb2.ServerParam, None] = None
self.player_params: Union[pb2.PlayerParam, None] = None
self.player_types: dict[int, pb2.PlayerType] = {}
self.debug_mode: bool = False
self.logger: logging.Logger = logger

def GetAction(self, state: pb2.State):
self.logger.debug(f"================================= cycle={state.world_model.cycle}.{state.world_model.stoped_cycle} =================================")
# self.logger.debug(f"State: {state}")
if self.agent_type == pb2.AgentType.PlayerT:
return self.GetPlayerActions(state)
elif self.agent_type == pb2.AgentType.CoachT:
Expand Down Expand Up @@ -50,8 +60,19 @@ def GetPlayerActions(self, state: pb2.State):
actions.append(pb2.PlayerAction(helios_basic_move=pb2.HeliosBasicMove()))
else:
actions.append(pb2.PlayerAction(helios_set_play=pb2.HeliosSetPlay()))

self.logger.debug(f"Actions: {actions}")
return pb2.PlayerActions(actions=actions)

def GetBestPlannerAction(self, pairs: pb2.BestPlannerActionRequest):
self.logger.debug(f"GetBestPlannerAction cycle:{pairs.state.world_model.cycle} pairs:{len(pairs.pairs)} unum:{pairs.state.register_response.uniform_number}")
pairs_list: list[int, pb2.RpcActionState] = [(k, v) for k, v in pairs.pairs.items()]
pairs_list.sort(key=lambda x: x[0])
best_action = max(pairs_list, key=lambda x: -1000 if x[1].action.parent_index != -1 else x[1].predict_state.ball_position.x)
self.logger.debug(f"Best action: {best_action[0]} {best_action[1].action.description} to {best_action[1].action.target_unum} in ({round(best_action[1].action.target_point.x, 2)},{round(best_action[1].action.target_point.y, 2)}) e:{round(best_action[1].evaluation,2)}")
res = pb2.BestPlannerActionResponse(index=best_action[0])
return res

def GetCoachActions(self, state: pb2.State):
actions = []
actions.append(pb2.CoachAction(do_helios_substitute=pb2.DoHeliosSubstitute()))
Expand All @@ -74,6 +95,21 @@ def GetTrainerActions(self, state: pb2.State):
)
)
return pb2.TrainerActions(actions=actions)

def SetServerParams(self, server_params: pb2.ServerParam):
self.logger.debug(f"Server params received unum {server_params.register_response.uniform_number}")
# self.logger.debug(f"Server params: {server_params}")
self.server_params = server_params

def SetPlayerParams(self, player_params: pb2.PlayerParam):
self.logger.debug(f"Player params received unum {player_params.register_response.uniform_number}")
# self.logger.debug(f"Player params: {player_params}")
self.player_params = player_params

def SetPlayerType(self, player_type: pb2.PlayerType):
self.logger.debug(f"Player type received unum {player_type.register_response.uniform_number}")
# self.logger.debug(f"Player type: {player_type}")
self.player_types[player_type.id] = player_type

class GameHandler(pb2_grpc.GameServicer):
def __init__(self, shared_lock, shared_number_of_connections) -> None:
Expand All @@ -82,97 +118,101 @@ def __init__(self, shared_lock, shared_number_of_connections) -> None:
self.shared_number_of_connections = shared_number_of_connections

def GetPlayerActions(self, state: pb2.State, context):
logging.debug(f"GetPlayerActions unum {state.register_response.uniform_number} at {state.world_model.cycle}")
main_logger.debug(f"GetPlayerActions unum {state.register_response.uniform_number} at {state.world_model.cycle}")
res = self.agents[state.register_response.client_id].GetAction(state)
logging.debug(f"GetPlayerActions Done unum {res}")
return res

def GetCoachActions(self, state: pb2.State, context):
logging.debug(f"GetCoachActions coach at {state.world_model.cycle}")
main_logger.debug(f"GetCoachActions coach at {state.world_model.cycle}")
res = self.agents[state.register_response.client_id].GetAction(state)
return res

def GetTrainerActions(self, state: pb2.State, context):
logging.debug(f"GetTrainerActions trainer at {state.world_model.cycle}")
main_logger.debug(f"GetTrainerActions trainer at {state.world_model.cycle}")
res = self.agents[state.register_response.client_id].GetAction(state)
return res

def SendServerParams(self, serverParams: pb2.ServerParam, context):
logging.debug(f"Server params received unum {serverParams.register_response.uniform_number}")
self.agents[serverParams.register_response.client_id].server_params = serverParams
main_logger.debug(f"Server params received unum {serverParams.register_response.uniform_number}")
self.agents[serverParams.register_response.client_id].SetServerParams(serverParams)
res = pb2.Empty()
return res

def SendPlayerParams(self, playerParams: pb2.PlayerParam, context):
logging.debug(f"Player params received unum {playerParams.register_response.uniform_number}")
self.agents[playerParams.register_response.client_id].player_params = playerParams
main_logger.debug(f"Player params received unum {playerParams.register_response.uniform_number}")
self.agents[playerParams.register_response.client_id].SetPlayerParams(playerParams)
res = pb2.Empty()
return res

def SendPlayerType(self, playerType: pb2.PlayerType, context):
logging.debug(f"Player type received unum {playerType.register_response.uniform_number}")
self.agents[playerType.register_response.client_id].player_types[playerType.id] = playerType
main_logger.debug(f"Player type received unum {playerType.register_response.uniform_number}")
self.agents[playerType.register_response.client_id].SetPlayerType(playerType)
res = pb2.Empty()
return res

def SendInitMessage(self, initMessage: pb2.InitMessage, context):
logging.debug(f"Init message received unum {initMessage.register_response.uniform_number}")
main_logger.debug(f"Init message received unum {initMessage.register_response.uniform_number}")
self.agents[initMessage.register_response.client_id].debug_mode = initMessage.debug_mode
res = pb2.Empty()
return res

def Register(self, register_request: pb2.RegisterRequest, context):
logging.debug(f"received register request from team_name: {register_request.team_name} "
f"unum: {register_request.uniform_number} "
f"agent_type: {register_request.agent_type}")
with self.shared_lock:
main_logger.info(f"received register request from team_name: {register_request.team_name} "
f"unum: {register_request.uniform_number} "
f"agent_type: {register_request.agent_type}")
self.shared_number_of_connections.value += 1
logging.debug(f"Number of connections {self.shared_number_of_connections.value}")
main_logger.info(f"Number of connections {self.shared_number_of_connections.value}")
team_name = register_request.team_name
uniform_number = register_request.uniform_number
agent_type = register_request.agent_type
self.agents[self.shared_number_of_connections.value] = GrpcAgent(agent_type, uniform_number)
res = pb2.RegisterResponse(client_id=self.shared_number_of_connections.value,
register_response = pb2.RegisterResponse(client_id=self.shared_number_of_connections.value,
team_name=team_name,
uniform_number=uniform_number,
agent_type=agent_type)
return res
logger = setup_logger(f"agent{register_response.uniform_number}_{register_response.client_id}", log_dir)
self.agents[self.shared_number_of_connections.value] = GrpcAgent(agent_type, uniform_number, logger)
return register_response

def SendByeCommand(self, register_response: pb2.RegisterResponse, context):
logging.debug(f"Bye command received unum {register_response.uniform_number}")
main_logger.debug(f"Bye command received unum {register_response.uniform_number}")
# with shared_lock:
self.agents.pop(register_response.client_id)

res = pb2.Empty()
return res

def GetBestPlannerAction(self, pairs: pb2.BestPlannerActionRequest, context):
logging.debug(f"GetBestPlannerAction cycle:{pairs.state.world_model.cycle} pairs:{len(pairs.pairs)} unum:{pairs.state.register_response.uniform_number}")
pairs_list: list[int, pb2.RpcActionState] = [(k, v) for k, v in pairs.pairs.items()]
pairs_list.sort(key=lambda x: x[0])
best_action = max(pairs_list, key=lambda x: -1000 if x[1].action.parent_index != -1 else x[1].predict_state.ball_position.x)
logging.debug(f"Best action: {best_action[0]} {best_action[1].action.description} to {best_action[1].action.target_unum} in ({round(best_action[1].action.target_point.x, 2)},{round(best_action[1].action.target_point.y, 2)}) e:{round(best_action[1].evaluation,2)}")
res = pb2.BestPlannerActionResponse(index=best_action[0])
main_logger.debug(f"GetBestPlannerAction cycle:{pairs.state.world_model.cycle} pairs:{len(pairs.pairs)} unum:{pairs.state.register_response.uniform_number}")
res = self.agents[pairs.state.register_response.client_id].GetBestPlannerAction(pairs)
return res


def serve(port, shared_lock, shared_number_of_connections):
server = grpc.server(futures.ThreadPoolExecutor(max_workers=22))
game_service = GameHandler(shared_lock, shared_number_of_connections)
pb2_grpc.add_GameServicer_to_server(game_service, server)
server.add_insecure_port(f'[::]:{port}')
server.start()
logging.info(f"Starting server on port {port}")
main_logger.info(f"Starting server on port {port}")

server.wait_for_termination()


def main():
manager = Manager()
shared_lock = Lock() # Create a Lock for synchronization
shared_number_of_connections = manager.Value('i', 0)
global main_logger, log_dir
parser = argparse.ArgumentParser(description='Run play maker server')
parser.add_argument('-p', '--rpc-port', required=False, help='The port of the server', default=50051)
parser.add_argument('-l', '--log-dir', required=False, help='The directory of the log file',
default=f'logs/{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}')
args = parser.parse_args()
log_dir = args.log_dir
main_logger = setup_logger("pmservice", log_dir, console_level=console_logging_level, file_level=file_logging_level)
main_logger.info("Starting server")
manager = Manager()
shared_lock = Lock() # Create a Lock for synchronization
shared_number_of_connections = manager.Value('i', 0)

serve(args.rpc_port, shared_lock, shared_number_of_connections)

if __name__ == '__main__':
Expand Down
37 changes: 25 additions & 12 deletions start-team.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
import logging
import argparse
import check_requirements
from utils.logger_utils import setup_logger
import datetime


# Set up logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
log_dir = os.path.join(os.getcwd(), 'logs', datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S'))
start_team_logger = setup_logger('start-team', log_dir, console_level=logging.DEBUG, file_level=logging.DEBUG, console_format_str='%(message)s')


def run_server_script(args):
# Start the server.py script as a new process group
process = subprocess.Popen(
['python3', 'server.py', '--rpc-port', args.rpc_port],
['python3', 'server.py', '--rpc-port', args.rpc_port, '--log-dir', log_dir],
preexec_fn=os.setsid, # Create a new session and set the process group ID
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT # Capture stderr and redirect it to stdout
Expand All @@ -23,20 +27,28 @@ def run_server_script(args):
def run_start_script(args):
# Start the start.sh script in its own directory as a new process group
process = subprocess.Popen(
['bash', 'start.sh', '-t', args.team_name, '--rpc-port', args.rpc_port, '--rpc-type', 'grpc'],
['bash', 'start.sh' if not args.debug else 'start-debug.sh', '-t', args.team_name, '--rpc-port', args.rpc_port, '--rpc-type', 'grpc'],
cwd='scripts/proxy', # Corrected directory to where start.sh is located
preexec_fn=os.setsid, # Create a new session and set the process group ID
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT # Capture stderr and redirect it to stdout
)
return process

def stream_output(process, prefix):
def stream_output_to_console(process, prefix):
# Stream output from the process and log it with a prefix
for line in iter(process.stdout.readline, b''):
logging.debug(f'{prefix} {line.decode().strip()}')
start_team_logger.debug(f'{prefix} {line.decode().strip()}')
process.stdout.close()

def stream_output_to_file(process, prefix):
# Stream output from the process and log it with a prefix
logger = setup_logger(prefix, log_dir, console_level=None, file_level=logging.DEBUG)
for line in iter(process.stdout.readline, b''):
logger.info(line.decode().strip())
pass
process.stdout.close()

def kill_process_group(process):
try:
os.killpg(os.getpgid(process.pid), signal.SIGTERM) # Send SIGTERM to the process group
Expand All @@ -48,24 +60,25 @@ def kill_process_group(process):
parser = argparse.ArgumentParser(description='Run server and team scripts.')
parser.add_argument('-t', '--team_name', required=False, help='The name of the team', default='CLS')
parser.add_argument('--rpc-port', required=False, help='The port of the server', default='50051')
parser.add_argument('-d', '--debug', required=False, help='Enable debug mode', default=False, action='store_true')
args = parser.parse_args()

try:
# Check Python requirements
logging.debug("Checking Python requirements...")
start_team_logger.debug("Checking Python requirements...")
check_requirements.check_requirements()

# Run the server.py script first
server_process = run_server_script(args)
logging.debug(f"Started server.py process with PID: {server_process.pid}")
start_team_logger.debug(f"Started server.py process with PID: {server_process.pid}")

# Run the start.sh script after server.py with the given arguments
start_process = run_start_script(args)
logging.debug(f"Started start.sh process with PID: {start_process.pid} with team name {args=}")
start_team_logger.debug(f"Started start.sh process with PID: {start_process.pid} with team name {args=}")

# Monitor both processes and log their outputs
server_thread = threading.Thread(target=stream_output, args=(server_process, 'server:'))
start_thread = threading.Thread(target=stream_output, args=(start_process, 'team:'))
server_thread = threading.Thread(target=stream_output_to_console, args=(server_process, 'server'))
start_thread = threading.Thread(target=stream_output_to_file, args=(start_process, 'proxy'))

server_thread.start()
start_thread.start()
Expand All @@ -75,7 +88,7 @@ def kill_process_group(process):
start_thread.join()

except KeyboardInterrupt:
logging.debug("Interrupted! Killing all processes.")
start_team_logger.debug("Interrupted! Killing all processes.")
kill_process_group(server_process)
kill_process_group(start_process)

Expand Down
22 changes: 20 additions & 2 deletions start-team.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
#!/bin/bash

# create a log directory with the current date and time
log_dir="logs/$(date +'%Y-%m-%d_%H-%M-%S')"
if [ ! -d $log_dir ]; then
mkdir -p $log_dir
fi

abs_log_dir_path=$(realpath $log_dir)

# Ensure the script exits if any command fails
set -e
# check scripts/proxy directory does not exist, raise error
Expand All @@ -10,6 +18,7 @@ fi

team_name="CLS"
rpc_port=50051
debug=false

# help function
usage() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add -d command to usage function so can people find the -d command

Expand All @@ -31,6 +40,9 @@ do
rpc_port=$2
shift
;;
-d|--debug)
debug=true
;;
*)
echo 1>&2
echo "invalid option \"${1}\"." 1>&2
Expand All @@ -49,7 +61,7 @@ python3 check_requirements.py

# Start server.py in the background
echo "Starting server.py..."
python3 server.py --rpc-port $rpc_port &
python3 server.py --rpc-port $rpc_port --log-dir $abs_log_dir_path &
server_pid=$!

# Function to kill server and team processes on exit
Expand All @@ -67,8 +79,14 @@ sleep 2

# Start start.sh script in the correct directory with arguments
echo "Starting start.sh with team name: $team_name and ..."

start_log_path="$abs_log_dir_path/proxy.log"
cd scripts/proxy
bash start.sh -t "$team_name" --rpc-port $rpc_port --rpc-type grpc &
if [ "$debug" = true ]; then
bash start-debug.sh -t "$team_name" --rpc-port $rpc_port --rpc-type grpc >> $start_log_path 2>&1 &
else
bash start.sh -t "$team_name" --rpc-port $rpc_port --rpc-type grpc >> $start_log_path 2>&1 &
fi
start_pid=$!

# Wait for both background processes to finish
Expand Down
Loading