Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 355 lines (332 sloc) 9.39 KB
#!/usr/bin/python
#
# Cluster HAT control tool
#
# Compatible with v1.x and v2.x
#
# "clusterhat" tool usage
#
# Supported on v1.x and v2.x
#
# Turn on all Pi Zeros
# $ clusterhat on
#
# Turn on individual Pi Zeros
# $ clusterhat on p1 p3
#
# Turn on/off ALERT LED
# $ clusterhat alert on
# $ clusterhat alert off
#
# Get status
# $ clusterhat status
#
# Supported on v2.x ONLY
#
# Turn on/off USB HUB
# $ clusterhat hub on
# $ clusterhat hub off
#
# Turn HAT LED on/off
# $ clusterhat led on
# $ clusterhat led off
#
# Enable/Disable EEPROM Write Protect
# $ clusterhat wp on
# $ clusterhat wp off
#
# Physical Solder Jumpers
#
# PWR - Pi Zero Power Source (2 way solder jumper)
# By default RPI<>USB is joined; for the other options below, cut the track
# RPI<>USB - Pi Zeros Powered via Controller Pi
# PWR<>USB - Powered via USB
# PWR (no link) Powered via connector on underside of Cluster HAT
#
# POS - Power on state (1 way solder jumper)
#
# By default the POS jumper is linked; cut the track to disconnect it
# POS [LINK] - Initial Pi Zero state OFF
# POS [NO LINK] - Initial Pi Zero state ON
#
# WP - HAT EEPROM Write Protect [FORCE]
# By default is not linked
# WP [LINK ] - EEPROM is write protected
# WP [NO LINK] - EEPROM write protect can be enabled using this tool
#
import sys, time, os, os.path
# Number of seconds to wait between enabling power for each Pi Zero
# (passed to time.sleep() by the "on" commands further down)
delay = 2
# Version number of this tool (reported as "tool:" by the status command)
clusterhat_version = "20180707"
# HAT File locations
# Device-tree entries describing the attached HAT. All five must be readable
# files for the tool to run; product and product_ver are parsed at start-up,
# the remaining three are only echoed by the status command.
hat_product = "/proc/device-tree/hat/product"
hat_version = "/proc/device-tree/hat/product_ver"
hat_uuid = "/proc/device-tree/hat/uuid"
hat_vendor = "/proc/device-tree/hat/vendor"
hat_pid = "/proc/device-tree/hat/product_id"
# Path to external commands
# vcgencmd is optional: get_throttled() returns 'NA' when it is not executable
vcgencmdpath= "/usr/bin/vcgencmd"
# Do we have a HAT?
# Each device-tree entry must be a regular file that we are allowed to read;
# the first one that is missing or unreadable aborts the tool.
if not all(
    os.path.isfile(hat_file) and os.access(hat_file, os.R_OK)
    for hat_file in (hat_product, hat_uuid, hat_vendor, hat_pid, hat_version)
):
    print("ERROR: HAT not found?")
    sys.exit()
# Is it a Cluster HAT?
# Read the product string (NUL characters stripped from both ends) and
# compare it with the Cluster HAT identifier. The context manager closes the
# file on every path, including the early exit below.
with open(hat_product, 'r') as f:
    hat_product_name = f.read().strip('\x00')
if hat_product_name != 'ZC4:ClusterHAT':
    print("ERROR: Cluster HAT not found?")
    sys.exit()
# Are we running a v1.x or v2.x ?
# 0x001? = v1.x / 0x002? = v2.x
# The low nibble of product_ver becomes version_minor.
version = 0
with open(hat_version, 'r') as f:
    raw_version = f.read().strip('\x00')
try:
    tmp = int(raw_version, 16)
except ValueError:
    # Empty or non-hexadecimal content: report it as an unknown version
    # below instead of aborting with a traceback.
    tmp = -1
if 16 <= tmp <= 31:
    version = 1
    version_minor = tmp - 16
elif 32 <= tmp <= 47:
    version = 2
    version_minor = tmp - 32
else:
    print("ERROR: Unknown Cluster HAT version found?")
    sys.exit()
def get_throttled():
    """Return the value printed by ``vcgencmd get_throttled``.

    Returns:
        'NA' when ``vcgencmdpath`` is not an executable regular file.
        Otherwise the first line of the command's output with everything up
        to and including the first '=' removed and surrounding whitespace
        stripped (the whole stripped line if it contains no '=').
    """
    if not os.path.isfile(vcgencmdpath) or not os.access(vcgencmdpath, os.X_OK):
        return 'NA'
    pipe = os.popen(vcgencmdpath + ' get_throttled')
    try:
        first_line = pipe.readline()
    finally:
        # Always release the pipe; the original left it open.
        pipe.close()
    return first_line.split('=', 1)[-1].strip()
# Version specific setup
# v1.x boards are driven through RPi.GPIO; v2.x boards through an XRA1200
# I/O expander object per pin. On v2.x this section also performs the
# one-time initialisation after HAT power up.
if ( version == 1 ):
    import RPi.GPIO as GPIO
    GPIO.setwarnings(False)
    # Physical (BOARD numbering) pins: ports[0] is the ALERT LED and
    # ports[1]..ports[4] are Pi Zeros p1..p4 (see the alert/status commands).
    ports = [29, 31, 33, 35, 37]
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(ports, GPIO.OUT)
else: # v2.x
    # xra1200 is shipped with the tool rather than installed site-wide
    sys.path.append('/usr/share/clusterhat/python')
    import xra1200, smbus
    # wp_link: 1 = WP solder link detected, 0 = not detected,
    # -1 = set when the expander's pull-up register could not be read
    # (presumably meaning "unknown" -- TODO confirm)
    wp_link = 0
    bus = smbus.SMBus(1)
    # One handle for the whole expander (bus 1, address 0x20) ...
    hat = xra1200.Xra1200(bus=1, address=0x20)
    # ... and one handle per expander pin: 0-3 Pi Zero power, 4 HAT LED,
    # 5 USB hub, 6 ALERT LED, 7 EEPROM write protect
    p1 = xra1200.Xra1200(bus=1, address=0x20, port=0)
    p2 = xra1200.Xra1200(bus=1, address=0x20, port=1)
    p3 = xra1200.Xra1200(bus=1, address=0x20, port=2)
    p4 = xra1200.Xra1200(bus=1, address=0x20, port=3)
    led = xra1200.Xra1200(bus=1, address=0x20, port=4)
    hub = xra1200.Xra1200(bus=1, address=0x20, port=5)
    alert = xra1200.Xra1200(bus=1, address=0x20, port=6)
    wp = xra1200.Xra1200(bus=1, address=0x20, port=7)
    # Get status of I/O Extender
    dir = hat.get_dir() # I/O pin directions
    status = hat.read_byte() # Pin Status
    # Detect I/O Expander
    # get_pur() returning -1 is treated as "not the variant with a pull-up
    # register" (xra1200p False)
    xra1200p = True;
    pur = hat.get_pur()
    if pur == -1:
        xra1200p = False
    # If all pins are inputs this is the first run since HAT power up
    if ( dir == 255 ):
        # Detect if WP is being pulled high
        if(xra1200p):
            hat.set_pur(0x7F) # Disable pullup for EEPROM WP on I/O expander
            wp_link = (hat.read_byte()>>7) # 1 = soldered / 0 = open
            if( wp_link == 1 ):
                # Link present: re-enable the pull-up on every pin
                hat.set_pur(0xFF)
            else:
                wp.on()
        else:
            # No pull-up register to probe with: enable WP and flag the
            # link state with -1
            wp.on()
            wp_link = -1
        # 'status' was sampled above, before any pin was switched to output
        if ( ( status & 0xF ) == 0xF ): # Check POS [Power On State]
            # POS [NO LINK] set power ON (CUT)
            p1.on()
            p2.on()
            p3.on()
            p4.on()
        else:
            # POS [LINK] set power off (Default)
            p1.off()
            p2.off()
            p3.off()
            p4.off()
        # Set default state for other pins
        alert.off()
        led.on()
        # Same pin states that the "hub on" command applies: the hub pin
        # is driven the opposite way on minor versions other than 0
        if ( version_minor == 0 ):
            hub.on()
        else:
            hub.off()
        hat.set_dir(0x00) # Set all pins as outputs
    else:
        # Already initialised on an earlier run: only refresh wp_link.
        # NOTE(review): indentation was lost in this copy of the file; the
        # final 'else' is attached to the outer 'if' here, mirroring the
        # first-run branch above where a non-xra1200p expander gets
        # wp_link = -1 -- confirm against the upstream source.
        if(version == 2 and xra1200p==True):
            if (hat.get_pur()>>7):
                wp_link = 1
        else:
            wp_link = -1
args = len(sys.argv)
# Command line options
if args == 3 and sys.argv[1] == "alert" and sys.argv[2] in ("on", "off"):
    # Turn ALERT LED on/off
    alert_on = sys.argv[2] == "on"
    if version == 1:
        # v1.x: the ALERT LED sits on physical pin 29
        GPIO.output(29, 1 if alert_on else 0)
    elif alert_on:
        alert.on()
    else:
        alert.off()
elif ( args == 2 and sys.argv[1] == "on" ):
# Turn on all ports
if ( version == 1 ):
alertstatus = GPIO.input(ports[0])
if not alertstatus: GPIO.output(ports[0], 1)
for port in ports[:-1]:
GPIO.output(port, 1)
time.sleep(delay)
GPIO.output(ports[-1], 1)
if not alertstatus: GPIO.output(ports[0], 0)
else:
alertstatus = alert.get()
if not alertstatus: alert.on()
p1.on()
time.sleep(delay)
p2.on()
time.sleep(delay)
p3.on()
time.sleep(delay)
p4.on()
if not alertstatus: alert.off()
elif ( args == 2 and sys.argv[1] == "off" ):
# Turn off all ports
if ( version == 1 ):
alertstatus = GPIO.input(ports[0])
if not alertstatus: GPIO.output(ports[0], 1)
for port in ports:
GPIO.output(port, 0)
if not alertstatus: GPIO.output(ports[0], 0)
else:
alertstatus = alert.get()
if not alertstatus: alert.on()
p1.off()
p2.off()
p3.off()
p4.off()
if not alertstatus: alert.off()
elif ( args > 2 and ( sys.argv[1] == "on" or sys.argv[1] == "off" ) ):
if ( sys.argv[1] == "on"):
mode = 1
else:
mode = 0
zeros = []
# Parse command line options (2 chars, start with p and end with numeric 1 to 4)
for zero in sys.argv[2:]:
if ( len(zero) != 2 or zero[0] != "p" or ( int(zero[1]) < 1 or int(zero[1]) > 4 ) ):
print ( "ERROR: Valid options are p1/p2/p3/p4" )
sys.exit()
zeros.append( int(zero[1]) )
if ( version == 1 ):
alertstatus = GPIO.input(ports[0])
if not alertstatus: GPIO.output(ports[0], 1)
for zero in zeros[:-1]:
GPIO.output(ports[zero], mode)
if ( sys.argv[1] == "on" ): time.sleep(delay)
GPIO.output(ports[zeros[-1]], mode)
if not alertstatus: GPIO.output(ports[0], 0)
else:
alertstatus = alert.get()
if not alertstatus: alert.on()
for zero in zeros[:-1]:
bit = (1<<(zero-1))
if (mode == 1):
hat.write_byte(hat.read_byte() | bit)
else:
hat.write_byte(hat.read_byte() & (~bit | 0xF0))
time.sleep(delay)
bit = (1<<(zeros[-1]-1))
if (mode == 1):
hat.write_byte(hat.read_byte() | bit)
else:
hat.write_byte(hat.read_byte() & (~bit | 0xF0))
if not alertstatus: alert.off()
elif ( args == 3 and sys.argv[1] == 'hub' and (sys.argv[2] == 'on' or sys.argv[2] == 'off') ):
if ( version == 1 ):
print ( "ERROR: hub control not supported on Cluster HAT v1.x\n")
else:
if ( sys.argv[2] == 'on' ):
if ( version_minor == 0 ):
hub.on()
else:
hub.off()
else:
if ( version_minor == 0 ):
hub.off()
else:
hub.on()
elif ( args == 3 and sys.argv[1] == 'wp' and (sys.argv[2] == 'on' or sys.argv[2] == 'off') ):
if ( version == 1 ):
print ( "ERROR: write protect not supported on Cluster HAT v1.x\n")
else:
if ( sys.argv[2] == 'on' ):
wp.on()
else:
if ( wp_link ):
print("Unable to disable EEPROM WP (Solder link set)")
else:
wp.off()
elif ( args == 3 and sys.argv[1] == 'led' and (sys.argv[2] == 'on' or sys.argv[2] == 'off') ):
if ( version == 1 ):
print ( "ERROR: LED control not supported on Cluster HAT v1.x\n")
else:
if ( sys.argv[2] == 'on' ):
led.on()
else:
led.off()
elif ( args == 2 and sys.argv[1] == 'status' ):
print ( "tool:{}" ).format( clusterhat_version )
print ( "version:{}.{}" ).format ( version, version_minor )
print ( "version_major:{}" ).format( version )
print ( "version_minor:{}" ).format( version_minor )
f = open(hat_uuid, 'r')
print ( "uuid:{}" ).format( f.read().strip('\x00') )
f.close()
f = open(hat_vendor, 'r')
print ( "vendor:{}" ).format( f.read().strip('\x00') )
f.close()
f = open(hat_pid, 'r')
print ( "product_id:{}" ).format( f.read().strip('\x00') )
f.close()
if ( version == 1 ):
print ( "alert:{}" ).format(GPIO.input(ports[0]))
for p in range(1, 5):
print ( "p{}:{}" ).format( p, GPIO.input(ports[p]) )
else:
print ( "alert:{}" ).format( alert.get() )
if ( version_minor == 0 ):
print ( "hub:{:d}" ).format( hub.get() )
else:
print ( "hub:{:d}" ).format( not hub.get() )
print ( "wp:{}" ).format( wp.get() )
print ( "led:{}" ).format( led.get() )
print ( "wplink:{}").format( wp_link )
print ( "xra1200p:{}" ).format( xra1200p )
print ( "throttled:{}" ).format( get_throttled() )
status = hat.read_byte()
for p in range(1,5):
print ( "p{}:{:d}" ).format( p, (( status & (1<<(p-1)) )>0) )
elif ( args == 2 and sys.argv[1] == 'init' ):
# First run init is handled above this is just here to allow the command to succeed
pass
else:
print( "ERROR: Unknown command" )