Skip to content

Commit

Permalink
Merge pull request #9 from QSchulz/devantech-drivers
Browse files Browse the repository at this point in the history
Add drivers for Devantech relays
  • Loading branch information
mattface committed May 9, 2016
2 parents 4e79e63 + 6ab27e6 commit 6e97a35
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 0 deletions.
159 changes: 159 additions & 0 deletions lavapdu/drivers/devantech.py
@@ -0,0 +1,159 @@
#! /usr/bin/python

# Copyright 2016 Quentin Schulz <quentin.schulz@free-electrons.com>
#
# Based on PDUDriver:
# Copyright 2013 Linaro Limited
# Author Matt Hart <matthew.hart@linaro.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.

import logging
from lavapdu.drivers.driver import PDUDriver
import socket

log = logging.getLogger(__name__)

class DevantechBase(PDUDriver):
connection = None
port_count = 0

def __init__(self, hostname, settings):
self.hostname = hostname
log.debug(settings)
self.settings = settings
self.ip = settings["ip"]
self.port = settings.get("port", 17494)
log.debug("port: %d" % self.port)
self.password = settings.get("password")

self.connect()

super(DevantechBase, self).__init__()

def connect(self):
self.connection = socket.create_connection((self.ip, self.port))
if self.password:
log.debug("Attempting connection to %s with provided password." % self.hostname)
msg = '\x79' + self.password
ret = self.connection.sendall(msg)
if ret:
log.error("Failed to send message.")
raise RuntimeError("Failed to send message.")
ret = self.connection.recv(1)
if ret != '\x01':
log.error("Authentication failed.")
raise RuntimeError("Failed to authenticate. Verify your password.")

def port_interaction(self, command, port_number):
if port_number > self.port_count:
log.error("There are only %d ports. Provide a port number lesser than %d." % (self.port_count, self.port_count))
raise RuntimeError("There are only %d ports. Provide a port number lesser than %d." % (self.port_count, self.port_count))

if command == "on":
msg = '\x20'
elif command == "off":
msg = '\x21'
else:
log.error("Unknown command %s." % (command))
return
msg += chr(port_number)
msg += '\x00'
log.debug("Attempting control: %s port: %d hostname: %s." % (command, port_number, self.hostname))
ret = self.connection.sendall(msg)
if ret:
log.error("Failed to send message.")
raise RuntimeError("Failed to send message.")
ret = self.connection.recv(1)
if ret != '\x00':
log.error("Failed to send %s command on port %d of %s." % (command, port_number, self.hostname))
raise RuntimeError("Failed to send %s command on port %d of %s." % (command, port_number, self.hostname))

def _close_connection(self):
# Logout
log.debug("Closing connection.")
if self.password:
log.debug("Attempting to logout.")
ret = self.connection.sendall('\x7B')
if ret:
log.error("Failed to send message.")
raise RuntimeError("Failed to send message.")
ret = self.connection.recv(1)
if ret != '\x00':
log.error("Failed to logout of %s." % self.hostname)
raise RuntimeError("Failed to logout of %s." % self.hostname)
self.connection.close()

def _cleanup(self):
self._close_connection()

def _bombout(self):
self._close_connection()

@classmethod
def accepts(cls, drivername):
log.debug(drivername)
return False

class DevantechETH002(DevantechBase):
port_count = 2

@classmethod
def accepts(cls, drivername):
log.debug(drivername)
if drivername == "devantech_eth002":
return True
return False

class DevantechETH0621(DevantechBase):
port_count = 2

@classmethod
def accepts(cls, drivername):
log.debug(drivername)
if drivername == "devantech_eth0621":
return True
return False

class DevantechETH484(DevantechBase):
port_count = 4

@classmethod
def accepts(cls, drivername):
log.debug(drivername)
if drivername == "devantech_eth484":
return True
return False

class DevantechETH008(DevantechBase):
port_count = 8

@classmethod
def accepts(cls, drivername):
log.debug(drivername)
if drivername == "devantech_eth008":
return True
return False

class DevantechETH8020(DevantechBase):
port_count = 20

@classmethod
def accepts(cls, drivername):
log.debug(drivername)
if drivername == "devantech_eth8020":
return True
return False
10 changes: 10 additions & 0 deletions lavapdu/drivers/strategies.py
Expand Up @@ -29,6 +29,11 @@
from lavapdu.drivers.localcmdline import LocalCmdline
from lavapdu.drivers.ip9258 import IP9258
from lavapdu.drivers.sainsmart import Sainsmart
from lavapdu.drivers.devantech import DevantechETH002
from lavapdu.drivers.devantech import DevantechETH0621
from lavapdu.drivers.devantech import DevantechETH484
from lavapdu.drivers.devantech import DevantechETH008
from lavapdu.drivers.devantech import DevantechETH8020

assert ACME
assert APC7932
Expand All @@ -40,3 +45,8 @@
assert Ubiquity6Port
assert IP9258
assert Sainsmart
assert DevantechETH002
assert DevantechETH0621
assert DevantechETH484
assert DevantechETH008
assert DevantechETH8020

0 comments on commit 6e97a35

Please sign in to comment.