Skip to content

Commit

Permalink
Sync queues of tmm
Browse files Browse the repository at this point in the history
Fixes #924
  • Loading branch information
1-alex98 committed Oct 31, 2022
1 parent 66e3a06 commit ae7f047
Show file tree
Hide file tree
Showing 15 changed files with 556 additions and 1,112 deletions.
161 changes: 115 additions & 46 deletions server/ladder_service/ladder_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,21 @@
MapPool,
MatchmakerQueue,
OnMatchedCallback,
PopTimer,
Search
)
from server.matchmaker.algorithm.team_matchmaker import TeamMatchMaker
from server.matchmaker.search import Match, are_searches_disjoint
from server.metrics import MatchLaunch
from server.players import Player, PlayerState
from server.types import GameLaunchOptions, Map, NeroxisGeneratedMap


def has_no_overlap(match: Match, matches_tmm_searches: set[Search]):
searches_in_match = set(search for team in match for search in team.get_original_searches())
return are_searches_disjoint(searches_in_match, matches_tmm_searches)


@with_logger
class LadderService(Service):
"""
Expand All @@ -56,22 +64,83 @@ class LadderService(Service):
"""

def __init__(
self,
database: FAFDatabase,
game_service: GameService,
violation_service: ViolationService,
self,
database: FAFDatabase,
game_service: GameService,
violation_service: ViolationService,
):
self._is_running = True
self._db = database
self._informed_players: set[Player] = set()
self.game_service = game_service
self.queues = {}
self.violation_service = violation_service

self._searches: dict[Player, dict[str, Search]] = defaultdict(dict)
self.timer = None
self.matchmaker = TeamMatchMaker()
self.timer = PopTimer()

async def initialize(self) -> None:
await self.update_data()
self._update_cron = aiocron.crontab("*/10 * * * *", func=self.update_data)
await self._initialize_pop_timer()

async def _initialize_pop_timer(self) -> None:
self.timer.queues = list(self.queues.values())
asyncio.create_task(self._queue_pop_timer())

async def _queue_pop_timer(self) -> None:
""" Periodically tries to match all Searches in the queue. The amount
of time until next queue 'pop' is determined by the number of players
in the queue.
"""
self._logger.debug("MatchmakerQueue pop timer initialized")
while self._is_running:
try:
await self.timer.next_pop()
await self._queue_pop_iteration()

except asyncio.CancelledError:
break
except Exception:
self._logger.exception(
"Unexpected error during queue pop timer loop!"
)
# To avoid potential busy loops
await asyncio.sleep(1)
self._logger.error("popping queues stopped")

async def _queue_pop_iteration(self):
possible_games = list()
for queue in self._queues_without1v1():
possible_games += await queue.find_matches()
matches_tmm = self.matchmaker.pick_noncolliding_games(possible_games)
matches_tmm = await self._add_matches_from1v1(matches_tmm)
await self._found_matches(matches_tmm)

async def _add_matches_from1v1(self, matches_tmm):
for queue in self._1v1queues():
matches_1v1 = await queue.find_matches1v1()
self._logger.debug("Suggested the following matches %s", matches_1v1)
matches_tmm_searches = set(search for match in matches_tmm
for team in match
for search in team.get_original_searches())
matches_1v1 = [match for match in matches_1v1 if has_no_overlap(match, matches_tmm_searches)]
self._logger.debug("Found the following 1v1 matches %s", matches_1v1)
matches_tmm += matches_1v1
return matches_tmm

def _queues_without1v1(self) -> list[MatchmakerQueue]:
return [queue for queue in self.queues.values() if queue.team_size != 1]

def _1v1queues(self) -> list[MatchmakerQueue]:
return [queue for queue in self.queues.values() if queue.team_size == 1]

async def _found_matches(self, matches: list[Match]):
for queue in self.queues.values():
await queue.found_matches([match for match in matches if match[0].queue == queue])
self.game_service.mark_dirty(queue)

async def update_data(self) -> None:
async with self._db.acquire() as conn:
Expand All @@ -81,8 +150,10 @@ async def update_data(self) -> None:
for name, info in db_queues.items():
if name not in self.queues:
queue = MatchmakerQueue(
self.game_service,
self.on_match_found,
game_service=self.game_service,
on_match_found=self.on_match_found,
timer=self.timer,
matchmaker=self.matchmaker,
name=name,
queue_id=info["id"],
featured_mod=info["mod"],
Expand All @@ -91,7 +162,6 @@ async def update_data(self) -> None:
params=info.get("params")
)
self.queues[name] = queue
queue.initialize()
else:
queue = self.queues[name]
queue.featured_mod = info["mod"]
Expand All @@ -115,7 +185,6 @@ async def update_data(self) -> None:
# Remove queues that don't exist anymore
for queue_name in list(self.queues.keys()):
if queue_name not in db_queues:
self.queues[queue_name].shutdown()
del self.queues[queue_name]

async def fetch_map_pools(self, conn) -> dict[int, tuple[str, list[Map]]]:
Expand Down Expand Up @@ -222,10 +291,10 @@ async def fetch_matchmaker_queues(self, conn):
return matchmaker_queues

def start_search(
self,
players: list[Player],
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
self,
players: list[Player],
queue_name: str,
on_matched: OnMatchedCallback = lambda _1, _2: None
):
timeouts = self.violation_service.get_violations(players)
if timeouts:
Expand Down Expand Up @@ -276,9 +345,10 @@ def start_search(

queue = self.queues[queue_name]
search = Search(
players,
players=players,
rating_type=queue.rating_type,
on_matched=on_matched
on_matched=on_matched,
queue=queue
)

for player in players:
Expand All @@ -299,9 +369,9 @@ def start_search(
asyncio.create_task(queue.search(search))

def cancel_search(
self,
initiator: Player,
queue_name: Optional[str] = None
self,
initiator: Player,
queue_name: Optional[str] = None
) -> None:
if queue_name is None:
queue_names = list(self._searches[initiator].keys())
Expand Down Expand Up @@ -333,18 +403,18 @@ def _cancel_search(self, initiator: Player, queue_name: str) -> None:
"state": "stop"
})
if (
not self._searches[player]
and player.state == PlayerState.SEARCHING_LADDER
not self._searches[player]
and player.state == PlayerState.SEARCHING_LADDER
):
player.state = PlayerState.IDLE
self._logger.info(
"%s stopped searching for %s", cancelled_search, queue_name
)

def _clear_search(
self,
initiator: Player,
queue_name: str
self,
initiator: Player,
queue_name: str
) -> Optional[Search]:
"""
Remove a search from the searches dictionary.
Expand Down Expand Up @@ -393,10 +463,10 @@ def write_rating_progress(self, player: Player, rating_type: str) -> None:
})

def on_match_found(
self,
s1: Search,
s2: Search,
queue: MatchmakerQueue
self,
s1: Search,
s2: Search,
queue: MatchmakerQueue
) -> None:
"""
Callback for when a match is generated by a matchmaker queue.
Expand Down Expand Up @@ -429,10 +499,10 @@ def on_match_found(
)

def start_game(
self,
team1: list[Player],
team2: list[Player],
queue: MatchmakerQueue
self,
team1: list[Player],
team2: list[Player],
queue: MatchmakerQueue
) -> Awaitable[None]:
# We want assertion errors to trigger when the caller attempts to
# create the async function, not when the function starts executing.
Expand All @@ -441,10 +511,10 @@ def start_game(
return self._start_game(team1, team2, queue)

async def _start_game(
self,
team1: list[Player],
team2: list[Player],
queue: MatchmakerQueue
self,
team1: list[Player],
team2: list[Player],
queue: MatchmakerQueue
) -> None:
self._logger.debug(
"Starting %s game between %s and %s",
Expand Down Expand Up @@ -496,7 +566,7 @@ def get_player_mean(player: Player) -> float:
random.shuffle(zipped_teams)

for i, player in enumerate(
player for pair in zipped_teams for player in pair
player for pair in zipped_teams for player in pair
):
# FA uses lua and lua arrays are 1-indexed
slot = i + 1
Expand Down Expand Up @@ -577,11 +647,11 @@ def make_game_options(player: Player) -> GameLaunchOptions:
self.violation_service.register_violations(abandoning_players)

async def launch_match(
self,
game: LadderGame,
host: Player,
guests: list[Player],
make_game_options: Callable[[Player], GameLaunchOptions]
self,
game: LadderGame,
host: Player,
guests: list[Player],
make_game_options: Callable[[Player], GameLaunchOptions]
):
# Launch the host
if host.lobby_connection is None:
Expand Down Expand Up @@ -630,10 +700,10 @@ async def launch_match(
])

async def get_game_history(
self,
players: list[Player],
queue_id: int,
limit: int = 3
self,
players: list[Player],
queue_id: int,
limit: int = 3
) -> list[int]:
async with self._db.acquire() as conn:
result = []
Expand Down Expand Up @@ -676,8 +746,7 @@ def on_connection_lost(self, conn: "LobbyConnection") -> None:
self._informed_players.remove(player)

async def shutdown(self):
for queue in self.queues.values():
queue.shutdown()
self._is_running = False


class NotConnectedError(asyncio.TimeoutError):
Expand Down
Loading

0 comments on commit ae7f047

Please sign in to comment.