Skip to content
Permalink
Browse files
Add controller_util: a tool for dumping sensor readings from a contro…
…ller.
  • Loading branch information
mbabinski-at-google committed Oct 5, 2017
1 parent 05be793 commit f986a98032762e478f9e03222e0cf1fd6df99b2b
Showing with 76 additions and 33 deletions.
  1. +46 −0 controller_util
  2. +1 −1 games/ffa.py
  3. +2 −1 piparty.py
  4. +27 −25 player.py
  5. +0 −6 setup.sh
@@ -0,0 +1,46 @@
#!/home/pi/JoustMania/venv/bin/python3.6

import asyncio
import psmove
import player
import piparty
import math


# Continually prints sensor readings from the first controller found.

def FormatVec(v, places=5):
fmt = '{:%d.2f}' % places
return ', '.join([ fmt.format(e) for e in v ])
def VecLen(v):
return math.sqrt(sum([ e*e for e in v ]))
def Normalize(v):
m = VecLen(v)
return tuple([ e / m for e in v ])

async def Loop(plr):
print("Acceleration Jerk Gyro")
while True:
for event in plr.get_events():
if event.type != player.EventType.SENSOR:
continue
print('\r|%s| = %+.02f |%s| = %+7.02f |%s| = %+2.02f' % (
FormatVec(event.acceleration),
event.acceleration_magnitude,
FormatVec(event.jerk, 7),
event.jerk_magnitude,
FormatVec(event.gyroscope),
VecLen(event.gyroscope)), end='')

await asyncio.sleep(1/30)


def Main():
piparty.Menu.enable_bt_scanning()
move = psmove.PSMove(0)
move.enable_orientation(True)
p1 = player.Player(move)
asyncio.get_event_loop().run_until_complete(Loop(p1))

if __name__ == '__main__':
Main()
@@ -57,7 +57,7 @@ def game_tick_(self):
Polls controllers for input, and issues warnings/deaths to players."""
# Make a copy of the active players, as we may modify it during iteration.
pace = self.pace_
for event in self.players.active_player_events(EventType.ACCELEROMETER):
for event in self.players.active_player_events(EventType.SENSOR):
if event.acceleration_magnitude > pace.death_threshold:
self.players.kill_player(event.player)

@@ -291,7 +291,8 @@ def check_for_new_moves(self):
#self.alive_count = len([move.get_serial() for move in self.moves if self.move_opts[move.get_serial()][Opts.alive.value] == Alive.on.value])


def enable_bt_scanning(self, on=True):
@staticmethod
def enable_bt_scanning(on=True):
scan_cmd = "hciconfig {0} {1}"
if on:
scan = "pscan"
@@ -5,46 +5,48 @@
import functools
import itertools
import math
import time
import typing

import psmove
import common
from numpy import linalg

NUM_WARNING_FLASHES=5
WARNING_FLASH_DURATION=0.1
RAINBOW_PHASE_DURATION=0.1

class EventType(enum.Flag):
ACCELEROMETER = enum.auto()
SENSOR = enum.auto()
BUTTON_DOWN = enum.auto()
BUTTON_UP = enum.auto()
# TODO: Add trigger events

class ControllerEvent(abc.ABC):
__slots__ = ['player']

@abc.abstractproperty
def type(self): raise NotImplemented()

class AccelerometerEvent(ControllerEvent):
class SensorEvent(ControllerEvent):
"""Base class for controller events."""
__slots__ = ['acceleration']

def __init__(self, acceleration):
def __init__(self, acceleration, jerk, gyroscope):
self.acceleration = acceleration
self.jerk = jerk
self.gyroscope = gyroscope

@property
def type(self):
return EventType.ACCELEROMETER
return EventType.SENSOR

@property
def acceleration_magnitude(self):
return math.sqrt(sum([ v*v for v in self.acceleration ]))
return linalg.norm(self.acceleration)

@property
def jerk_magnitude(self):
return linalg.norm(self.jerk)

class ButtonDownEvent(ControllerEvent):
"""Sent when a player first presses a button."""
__slots__ = ['button']

def __init__(self, button: common.Button):
self.button = button

@@ -54,8 +56,6 @@ def type(self):

class ButtonUpEvent(ControllerEvent):
"""Sent when a player first releases a button."""
__slots__ = ['button']

def __init__(self, player, button: common.Button):
super().__init__(player)
self.button = button
@@ -67,21 +67,23 @@ def type(self):

class ControllerState:
"""The state of inputs on a controller at one point in time."""
__slots__ = ['buttons', 'trigger', 'acceleration']

def __init__(self, move):
self.buttons = common.Button(move.get_buttons())
self.trigger = move.get_trigger()
self.acceleration = move.get_accelerometer_frame(psmove.Frame_SecondHalf)
self.acceleration = tuple(move.get_accelerometer_frame(psmove.Frame_SecondHalf))
self.gyroscope = tuple(move.get_gyroscope_frame(psmove.Frame_SecondHalf))
self.ts = time.time()

@property
def acceleration_magnitude(self):
return math.sqrt(sum([ v*v for v in self.acceleration ]))

def get_events_from_state_diff(self, prev_state):
# Is this ever false?
if prev_state.acceleration != self.acceleration:
yield AccelerometerEvent(self.acceleration)
acc_diff = map(lambda a, b: a - b, self.acceleration, prev_state.acceleration)
time_diff = self.ts - prev_state.ts
jerk = tuple([ e / time_diff for e in acc_diff ])
yield SensorEvent(self.acceleration, jerk, self.gyroscope)

new_buttons = self.buttons & (~prev_state.buttons)
for button in common.Button:
if button in new_buttons:
@@ -117,12 +119,12 @@ def flush_events_(self):

def get_events(self) -> typing.Iterator[ControllerEvent]:
"""Returns an iterator over events currently pending on the controller."""
while self.move_.poll():
state = ControllerState(self.move_)
for ev in state.get_events_from_state_diff(self.previous_state_):
ev.player = self
yield ev
self.previous_state_ = state
self.flush_events_()
state = ControllerState(self.move_)
for ev in state.get_events_from_state_diff(self.previous_state_):
ev.player = self
yield ev
self.previous_state_ = state
# TODO: The moves need to be occasionally prodded to keep their leds lit.
# If we make the piparty loop async, move this logic in there as a task.
self.move_.update_leds()
@@ -73,12 +73,6 @@ setup() {
#use aplay -l to check sound card number
#sudo cp /home/pi/JoustMania/asound.conf /etc/


#allows python path to be kept after sudo command
OLD='env_reset'
NEW='env_keep += "PYTHONPATH"'
sudo sed -i -e "s/$OLD/$NEW/g" /etc/sudoers

# Pause a second before rebooting so we can see all the output from this script.
(sleep 1; sudo reboot) &
}

0 comments on commit f986a98

Please sign in to comment.