### Group Controller

- Load in a roster of Spheros
- Connect to each one and calibrate their position
- Perform Orchestration



In [None]:
# General Python Helper Functions
def readJsonFile(filename):
    '''
        Read a JSON file into a python dict 
    '''
    with open(filename) as data_file:    
        data = json.load(data_file)
    return data

In [None]:
# Sphero imports
import sphero

# Controller Functions
import time
import os
import json
from pprint import pprint

In [None]:
# Use sphero app to get sphero names and MAC addresses, and use those address to populate 'roster.json'
PROJ_ROOT = os.getcwd()
ROSTER = os.path.join(PROJ_ROOT, "roster.json")
roster = readJsonFile(ROSTER)
pprint(roster)

In [None]:
# Class for managing multiple spheros
manager = sphero.SpheroManager()

In [None]:
# Initialize Sphero manager with addresses of local spheros
manager._name_cache = roster

# Number of spheros to connect to
NBOTS = len(roster.keys())
print(NBOTS)

In [None]:
# This searches the system cache for all of the robots. 
# It doesn't need the spheros to be turned on yet.
def on_new_sphero(device, NBOTS=NBOTS):
    """
        NBOTS = number of robots in the flock
        Note that this presently refers to a global "manager" object. This method should be moved inside the native
        library later.
    """
    print "Found " + device.bt_name
    
    # Terminate search when all expected bots are found
    if len(manager._spheros.keys()) == NBOTS:
        print "Found all {} spheros".format(NBOTS)
        manager.stop_auto_search()
                 
# Callback: what to do when a new sphero is found
manager.set_sphero_found_cb(on_new_sphero)

# Construct list of devices in system bluetooth collection
# Your bluetooth network cannot contain anything with name of "Sphero-" prefix
devices = []
manager.start_auto_search()

# Wait for all spheros to be found before running the next cell

In [None]:
devices = []
# Verify that these are the robots we are looking for
for name, device in manager._spheros.iteritems():
    print "{}: {}".format(name, device.bt_addr)
    devices.append(device) 

In [None]:
# Modify this list based on which spheros you actually have available at the moment.

activeBotNames = [
    "Sphero-RYR",
    "Sphero-GRY",
    "Sphero-YPR",
    "Sphero-RWR",
    "Sphero-ORG",
    "Sphero-RPB",
    "Sphero-YYP"
]

The following were written between 4/15 - 4/22 to simplify multiple robot control.



In [None]:
# Bluetooth Initial / Closing communication
    
def connect_team(bots):
    for i, bot in enumerate(bots):  
        bot.disconnect()
        bot.connect()
        
def disconnect_team(bots):
    for i, bot in enumerate(bots):  
        bot.disconnect()

# Light control
def set_team_back_led(bots, status):
    # Bright if true, dim if false
    status = 0xaa if status else 0x00
    
    for bot in bots:  
        bot.set_back_led_output(status)
        
def set_team_colors(bots, colors):
    for i, bot in enumerate(bots):  
        colorTriple = colors[i]
        bot.set_rgb(colorTriple[0], colorTriple[1], colorTriple[2])
    

def highlight_bot(bots, iBot):
    for i, bot in enumerate(bots):
        if i == iBot:
            bot.set_rgb(255, 0, 0)
        else:
            bot.set_rgb(0, 0, 0)
            
def highlight_team(bots, duration=1):
    for i, bot in enumerate(bots):
        highlight_bot(bots, i)
        time.sleep(duration)
    
# Diagnostics
def print_team_status(bots):
    for bot in bots:
        response = bot.get_power_state()
        print "{} {} | {}".format(bot.bt_name, response.power_state, response.bat_voltage)
    
# MOVEMENT   
def roll_sphero(bot, speed, heading, offset):
    """
        Roll robot in in proper direction at a given speed
    """
    bot.roll(speed, normalize_angle(heading + offset))
    
def set_team_timeout(bots, motionTimeout=2000):
    """
        How long robot should apply motor force for.
    """
    for bot in bots:
        bot.set_motion_timeout(motionTimeout)

        
def roll_sphero_team_synchronized(bots, speed, heading, offsets,motionTimeout=2000):
    """
        Move all robots in same direction at shared speed
    """
    assert(len(bots) == len(offsets))
    
    tStart = time.time()
    for i, bot in enumerate(bots):
        roll_sphero(bot, speed, heading, offsets[i])
    tEnd = time.time()
    
    print("Dispatch Time {}".format(tEnd - tStart) )
    time.sleep(motionTimeout / 1000)  # wait for bots to finish rolling
    
# Math Functions
def normalize_angle(angle):
    if angle < 0:
        return 360 + angle
    elif angle > 359:
        return angle - 360
    else:
        return angle

Now, provide a demo application of the above team control methods.


In [None]:
# Test notes 4/22
bots = [ manager._spheros[name] for name in activeBotNames ]

In [None]:
# Connect team to computer
connect_team(bots)

In [None]:
# Make sure everyone's battery is above 7 volts. Below that they turn off.
# Get roughly 2-3 hours of ontime (not continuous)
print_team_status(bots)

In [None]:
# Manage sphero colors consistently
colors = [
    [255, 0 , 0], # R
    [0, 255 , 0], # G
    [0, 0 , 255], # B
    
    [255, 0, 255],   # Purple
    [255, 255, 0],   # Yellow
    [255, 133, 0],   # Orange
    [255, 192, 203], # pink
]

# Colors to cut down
offCols = [ [0, 0, 0] for bot in bots]

POWER_SAVE = False


# Control Brightness
if POWER_SAVE:
    for color in colors:
        for i, val in enumerate(color):
            color[i] /= 2

# Initialization for any team
set_team_back_led(bots, True)

In [None]:
# illuminate robots from 1 to 7, one at a time so we know which one will receive commands first
highlight_team(bots, 1) # keyword = duration to wait in between pulses

In [None]:
# Turn colors off to save power
set_team_colors(bots, offCols)

Now, we must calibrate each robot's sense of direction


In [None]:
# Use this to calculate offsets. Repeatedly run this cell and the next cell to populate "botAngleOffsets"
# appropriately in 2 cells from here. Using the offsets may or may not be preferable to using the offsets to tweak
# "set heading" each time.
highlight_bot(bots, 1)

In [None]:
bots[0].roll(70, 60)

In [None]:
# This process needs to be tuned by hand each time the robots fall asleep / get turned on!
# It would be nice to calibrate with the camera's help, but it's also ok if there isn't time.
# Eventually these offsets could be moved to properties of the sphero object.
# Recommend setting the 0 direction to be parallel to one of the walls.

botAngleOffsets = [
    115, 
    115, 
    95, 
    112, 
    125, 
    73, 
    118   
]

# 2-robot demo
botAngleOffsets = [
    240, 240    
]

nullOffsets = [0 for bot in bots]
set_team_colors(bots, colors)

In [None]:
highlight_team(bots)

In [None]:
# Demo Reel: show basic polygon

corners = 3
angles = [ i * (360 / corners) for i in range(corners)]

# angles = [180]

TIMEOUT = 1500

set_team_timeout(bots, TIMEOUT)
for angle in angles:
    roll_sphero_team_synchronized(bots, 70, angle, botAngleOffsets, TIMEOUT)   
    

In [None]:
# Demo Reel: Laps Back and Forth
laps = 2
corners = 2
angles = [ i * (360 / corners) for i in range(corners)]

# angles = [180]

TIMEOUT = 2000

set_team_timeout(bots, TIMEOUT)
for lap in range(laps):
    for angle in angles:
        roll_sphero_team_synchronized(bots, 100, angle, botAngleOffsets, TIMEOUT)   

In [None]:
# Demo Reel: Basic Push
# Strategy: rewind and repush each time?

angles = [180]
TIMEOUT = 4000 # milliseconds of pushing

set_team_timeout(bots, TIMEOUT)
for angle in angles:
    roll_sphero_team_synchronized(bots, 90, angle, botAngleOffsets, TIMEOUT)   
    

In [None]:
# Demo Reel: Advanced Blind Push
# Strategy: rewind and repush each time?

angles   = [0, 180]
TIMEOUTS = [2700, 1500] # milliseconds of pushing: spend more time pushing forward than backwards

for i in range(3): # number of cycles to repush
    for timeout in TIMEOUTS:
        set_team_timeout(bots, timeout)
        for angle in angles:
            roll_sphero_team_synchronized(bots, 80, angle, botAngleOffsets, TIMEOUT)        