Permalink
Browse files

Add a second, experimental implementation of Joust FFA.

We make heavy use of python3's async support, and run most game logic in one
process. Controllers are polled periodically (currently, at 30Hz), leading to
reduced CPU use. Currently unimplemented features:
  * Varying music tempo.
  * An Intro/Countdown.
  * Support for 9+ controllers.
  * Sound effects.
  * Sane values for warning/death thresholds.
  • Loading branch information...
mbabinski-at-google committed Sep 29, 2017
1 parent 7073f88 commit 64390548be1ecc2c774e24ff12cd3fb3976c9f0f
Showing with 351 additions and 4 deletions.
  1. +3 −0 .gitignore
  2. +3 −0 DEVELOPMENT_README.md
  3. +25 −2 common.py
  4. 0 games/__init__.py
  5. +72 −0 games/ffa.py
  6. +51 −0 games/ffa_test.py
  7. +10 −2 piparty.py
  8. +153 −0 player.py
  9. 0 testing/__init__.py
  10. +28 −0 testing/fakes.py
  11. +6 −0 tests
View
@@ -66,3 +66,6 @@ audio/Zombie/music/
*.ini
apfiles/ap_active
# Python virtualenv
venv
View
@@ -12,4 +12,7 @@ then run:
and the game should be running locally in your terminal!
You can run unit tests with the provided shell script:
`./tests`
Happy development!
View
@@ -1,7 +1,7 @@
import psmove
import colorsys
import time
import enum
import psmove
import time
color_range = 255
@@ -105,3 +105,26 @@ class Button(enum.Flag):
238: "Charging",
239: "Charged"
}
# Common colors lifted from https://xkcd.com/color/rgb/
# TODO: Add more colors -- probably need to have 14 player colors at least.
class Color(enum.Enum):
BLACK = 0x000000
WHITE = 0xffffff
RED = 0xff0000
GREEN = 0x00ff00
BLUE = 0x0000ff
YELLOW = 0xffff14
PURPLE = 0x7e1e9c
ORANGE = 0xf97306
PINK = 0xff81c0
TURQUOISE = 0x06c2ac
BROWN = 0x653700
def rgb_bytes(self):
v = self.value
return v >> 16, (v >> 8) & 0xff, v & 0xff
# Red is reserved for warnings/knockouts.
PLAYER_COLORS = [ c for c in Color if c not in (Color.RED, Color.WHITE, Color.BLACK) ]
View
No changes.
View
@@ -0,0 +1,72 @@
import enum
import asyncio
import time
import common
from player import Player, PlayerCollection
# Hertz
UPDATE_FREQUENCY=30
# TODO: These are placeholder values.
# We can't take the values from joust.py, since those are compared to the sum of the
# three accelerometer dimensions, whereas we compute the magnitude of the acceleration
# vector.
DEATH_THRESHOLD=7
WARN_THRESHOLD=2
class FreeForAll:
## Note, the "Player" objects should probably get created (and assigned colors) by the core game code, not here.
def __init__(self, controllers, music):
players = [ Player(move) for move in controllers ]
for player, color in zip(players, common.PLAYER_COLORS):
player.set_player_color(color)
self.players = PlayerCollection(players)
self.music = music
self.rainbow_duration_ = 6
def has_winner_(self):
if len(self.players.active_players) == 0:
raise ValueError("Can't have zero players!")
return len(self.players.active_players) == 1
def game_tick_(self):
"""Implements a game tick.
Polls controllers for input, and issues warnings/deaths to players."""
# Make a copy of the active players, as we may modify it during iteration.
for player, state in self.players.active_player_events():
if state.acceleration_magnitude > DEATH_THRESHOLD:
self.players.kill_player(player)
# Cut out early if we have a winner, so we don't accidentally kill all remaining players.
if self.has_winner_():
break
elif state.acceleration_magnitude > WARN_THRESHOLD:
player.warn()
async def run(self):
"""Main loop for this game."""
# TODO: Countdown/Intro.
self.music.start_audio_loop()
try:
while not self.has_winner_():
self.game_tick_()
await asyncio.sleep(1 / UPDATE_FREQUENCY)
# TODO: Play some kind of crash sound to let everyone know there is a winner.
self.music.stop_audio()
winner = list(self.players.active_players)[0]
await winner.show_rainbow(self.rainbow_duration_)
finally:
self.music.stop_audio()
self.players.cancel_effects()
# TODO: Ideally, the main game loop in piparty.py should handle setting up async.
def run_loop(self):
loop = asyncio.get_event_loop()
loop.set_debug(True)
loop.run_until_complete(self.run())
# TODO: We should make sure all other scheduled tasks have completed.
def set_rainbow_duration_for_testing(self, secs):
self.rainbow_duration_ = secs
View
@@ -0,0 +1,51 @@
import asyncio
import unittest
from games import ffa
from testing import fakes
import piaudio
class TestFFA(unittest.TestCase):
def test_one_winner(self):
controller1 = fakes.FakeMove()
controller2 = fakes.FakeMove()
loop = asyncio.get_event_loop()
game = ffa.FreeForAll([controller1, controller2], piaudio.DummyMusic())
game.set_rainbow_duration_for_testing(0.1)
game_task = asyncio.ensure_future(game.run())
loop.run_until_complete(asyncio.sleep(1))
self.assertFalse(game_task.done())
self.assertFalse(game_task.cancelled())
controller1.accel = (100, 100, 100)
# Shouldn't throw timeout.
loop.run_until_complete(asyncio.wait_for(game_task, timeout=3))
self.assertTrue(game.has_winner_())
def test_two_outs(self):
"""Tests that we don't lose all the players if they all simultaenously register high accel."""
controller1 = fakes.FakeMove()
controller2 = fakes.FakeMove()
loop = asyncio.get_event_loop()
game = ffa.FreeForAll([controller1, controller2], piaudio.DummyMusic())
game.set_rainbow_duration_for_testing(0.1)
game_task = asyncio.ensure_future(game.run())
loop.run_until_complete(asyncio.sleep(0.01))
self.assertFalse(game_task.done())
self.assertFalse(game_task.cancelled())
controller1.accel = (100, 100, 100)
controller2.accel = (100, 100, 100)
# Shouldn't throw timeout.
loop.run_until_complete(asyncio.wait_for(game_task, timeout=3))
self.assertTrue(game.has_winner_())
if __name__ == '__main__':
unittest.main()
View
@@ -6,6 +6,7 @@
from enum import Enum
from multiprocessing import Process, Value, Array, Queue, Manager
from webui import start_web
from games import ffa
TEAM_NUM = 6
TEAM_COLORS = common.generate_colors(TEAM_NUM)
@@ -218,6 +219,7 @@ def __init__(self,command_queue=Queue(), status_manager=Manager()):
config.read("joustconfig.ini")
self.audio_toggle = config.getboolean("GENERAL","audio")
self.sensitivity = int(config['GENERAL']['sensitivity'])
self.experimental = config.getboolean('GENERAL', 'experimental') if config.has_option('GENERAL', 'experimental') else False
self.instructions = config.getboolean("GENERAL","instructions")
self.con_games = []
for game in common.Games:
@@ -674,8 +676,14 @@ def start_game(self, random_mode=False):
tournament.Tournament(game_moves, self.sensitivity, self.command_queue, self.status_ns, self.audio_toggle, self.joust_music)
self.tracked_moves = {}
else:
#may need to put in moves that have selected to not be in the game
joust.Joust(self.game_mode, game_moves, self.teams, self.sensitivity, self.command_queue, self.status_ns, self.audio_toggle,self.joust_music)
if self.game_mode == common.Games.JoustFFA and self.experimental:
print("Playing EXPERIMENTAL FFA Mode.")
moves = [ common.get_move(serial, num) for num, serial in enumerate(game_moves) ]
game = ffa.FreeForAll(moves, self.joust_music)
game.run_loop()
else:
#may need to put in moves that have selected to not be in the game
joust.Joust(self.game_mode, game_moves, self.teams, self.sensitivity, self.command_queue, self.status_ns, self.audio_toggle,self.joust_music)
self.tracked_moves = {}
if random_mode:
self.game_mode = common.Games.Random
View
153 player.py
@@ -0,0 +1,153 @@
import asyncio
import collections
import functools
import itertools
import math
import typing
import psmove
import common
NUM_WARNING_FLASHES=5
WARNING_FLASH_DURATION=0.1
RAINBOW_PHASE_DURATION=0.1
class ControllerState:
"""The state of inputs on a controller at one point in time."""
__slots__ = ['buttons', 'trigger', 'acceleration']
def __init__(self, move):
self.buttons = common.Button(move.get_buttons())
self.trigger = move.get_trigger() / 100
self.acceleration = move.get_accelerometer_frame(psmove.Frame_SecondHalf)
@property
def acceleration_magnitude(self):
return math.sqrt(sum([ v*v for v in self.acceleration ]))
# TODO: Break this out into a util library if it seems useful.
def with_lock(lock):
"""Decorator that makes a coroutine hold a lock during execution"""
def decorator(func):
@functools.wraps(func)
async def wrapper(*args, **kwargs):
async with lock:
return await func(*args, **kwargs)
return wrapper
return decorator
class Player:
def __init__(self, move):
self.move_ = move
self.color_ = common.Color.WHITE
self.effect_lock_ = asyncio.Lock()
self.warn_ = None
self.effect_ = None
def get_events(self) -> typing.Iterator[ControllerState]:
"""Returns an iterator over events currently pending on the controller."""
while self.move_.poll():
yield ControllerState(self.move_)
# TODO: The moves need to be occasionally prodded to keep their leds lit.
# If we make the piparty loop async, move this logic in there as a task.
self.move_.update_leds()
def set_player_color(self, color: common.Color):
"""Set's the player's color -- this is the default color we return to during play."""
self.color_ = color
self.set_color_(color)
def set_color_(self, color: common.Color):
"""Changes the controller's indicator to the specified color."""
self.move_.set_leds(*color.rgb_bytes())
self.move_.update_leds()
def set_rumble(self, value):
self.move_.set_rumble(value)
# This is apparently needed to flush the instruction out.
self.move_.update_leds()
def set_effect_(self, future):
self.effect_ = asyncio.ensure_future(future)
return self.effect_
def cancel_effect(self):
if self.effect_ and not self.effect_.done():
self.effect_.cancel()
def warn(self):
"""Issues a warning to the player."""
if self.warn_:
return
@with_lock(self.effect_lock_)
async def run():
try:
for i in range(NUM_WARNING_FLASHES):
self.set_color_(common.Color.BLACK)
self.set_rumble(90)
await asyncio.sleep(WARNING_FLASH_DURATION)
self.set_color_(self.color_)
self.set_rumble(0)
await asyncio.sleep(WARNING_FLASH_DURATION)
finally:
self.set_color_(self.color_)
self.set_rumble(0)
self.warn_ = None
self.warn_ = self.set_effect_(run())
def show_rainbow(self, duration_seconds: float):
"""Shows the victory rainbow."""
if self.warn_:
self.warn_.cancel()
@with_lock(self.effect_lock_)
async def cycle_colors():
try:
for color in itertools.cycle(common.PLAYER_COLORS):
self.set_color_(color)
await asyncio.sleep(RAINBOW_PHASE_DURATION)
finally:
self.set_color_(self.color_)
async def run():
try:
await asyncio.wait_for(cycle_colors(), duration_seconds)
except asyncio.TimeoutError:
pass
return self.set_effect_(run())
def show_death(self):
"""Lets the player know they have died."""
if self.warn_:
self.warn_.cancel()
@with_lock(self.effect_lock_)
async def run():
try:
self.set_rumble(110)
self.set_color_(common.Color.RED)
await asyncio.sleep(3)
finally:
self.set_color_(common.Color.BLACK)
self.set_rumble(0)
self.set_effect_(run())
def __str__(self):
return '<Player %s %s>' % (self.move_, self.color_)
class PlayerCollection:
"""The set of players in a round of the game."""
def __init__(self, players):
self.players = players
self.active_players = set(players)
def kill_player(self, player: Player):
self.active_players.remove(player)
return player.show_death()
def active_player_events(self):
# consider randomizing this so players don't get an advantage by being first in the list.
for player in list(self.active_players):
for event in player.get_events():
yield player, event
def cancel_effects(self):
for player in self.players:
player.cancel_effect()
View
No changes.
View
@@ -0,0 +1,28 @@
class FakeMove:
def __init__(self):
self.accel = (0, 0, 0)
self.last_poll_ = False
# Alternate yes/no returns to simulate draining the move's event queue.
def poll(self):
self.last_poll_ = not self.last_poll_
return self.last_poll_
def get_buttons(self):
return 0
def get_trigger(self):
return 0
def get_accelerometer_frame(self, _):
return self.accel
def set_leds(self, r, g, b):
pass
def update_leds(self):
pass
def set_rumble(self, intensity):
pass
View
6 tests
@@ -0,0 +1,6 @@
#!/bin/bash
export HOME="/home/pi/JoustMania"
export PYTHONPATH="/home/pi/psmoveapi/build/"
exec $HOME/venv/bin/python3.6 -m unittest discover -s . -p '*_test.py'

0 comments on commit 6439054

Please sign in to comment.