Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ REDHAWK REST Python is licensed under the GNU Lesser General Public License (LGP

## Running

For Development/Test environments there are scripts to automatically create a local environment and run the server.
For Development/Test environments there are scripts to automatically create a local environment and run the server. You will need to install the `virtualenv` python package before using the commands below.

./setup.sh install
./start.sh --port=<desired_port>
Expand All @@ -39,4 +39,3 @@ Once running the REST Interface can be tested at `http://localhost:<desired_port
## Deploying Applications

You can either install your application in `apps` for REST-Python to serve them, or deploy them with a separate server (e.g., NodeJS). REST-Python supports cross-domain responses to REST and Websocket requests to facilitate dual- or multi-server configurations to completely decouple the REDHAWK environment from the web application environment. (See [Docker-REDHAWK's](http://github.com/GeonTech/docker-redhawk) `geontech/redhawk-webserver` image.)

215 changes: 164 additions & 51 deletions rest/bulkio_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from tornado import websocket

import json
import numpy
import bulkio_limiter

from model.domain import Domain, ResourceNotFound
from asyncport import AsyncPort
Expand All @@ -43,7 +43,7 @@ def enum(**enums):
# Control messages are a dictionary of type and value. The type should be
# one of these enumerations.
# E.g.: ControlMessage = { 'type': 0, 'value': 1024 }
ControlEnum = enum(MaxWidth=0, MaxPPS=1)
ControlEnum = enum(xMax=0, xBegin=1, xEnd=2, xZoomIn=3, xZoomReset=4, yMax=5, yBegin=6, yEnd=7, yZoomIn=8, yZoomReset=9, MaxPPS=10)


class BulkIOWebsocketHandler(CrossDomainSockets):
Expand All @@ -54,8 +54,28 @@ def initialize(self, kind, redhawk=None, _ioloop=None):
_ioloop = ioloop.IOLoop.current()
self._ioloop = _ioloop

# For on-the-fly per-client decimation
self._outputWidth = None
# For on-the-fly per-client down-sampling
# Current options: 0=NoLimit, else=Limit
self._xMax = 1024
self._yMax = 1024

# The ACTIVE on-the-fly per-client zooming parameters
# These are set from the STAGED values from a xZoomIn/yZoomIn command
self._xBegin = None
self._xEnd = None
self._yBegin = None
self._yEnd = None

# The STAGED on-the-fly per-client zooming parameters
# These are set by commands from UI
self._xBeginStaged = None
self._xEndStaged = None
self._yBeginStaged = None
self._yEndStaged = None

# The down-sampling ratios calculated from the last call to the bulio_limiter
self._xFactor = 1
self._yFactor = 1

# Map of SRIs seen on this port.
self._SRIs = dict()
Expand Down Expand Up @@ -108,18 +128,112 @@ def open(self, *args):

def on_message(self, message):
try:
# Parse a JSON string into a dictionary
ctrl = json.loads(message)

if (ctrl['type'] == ControlEnum.MaxWidth):
if 0 < ctrl['value']:
self._outputWidth = ctrl['value']
logging.info('Decimation requested to {0} samples'.format(ctrl['value']))
else:
self._outputWidth = None
logging.info('Decimation disabled')
# Convert the value to integer
ctrlValueInt = int(ctrl['value'])

# Set the maximum number of samples --------------------------------
if (ctrl['type'] == ControlEnum.xMax):
if (ctrlValueInt > 0):
self._xMax = ctrlValueInt
logging.info('Bulkio packet size limited to {0} samples on the X axis'.format(ctrlValueInt))
else:
self._xMax = None
logging.info('Bulkio packet size limit removed from the X axis')

# Set the STAGED zoom region ---------------------------------------
elif (ctrl['type'] == ControlEnum.xBegin):
# Calculate the index based on the original packet size since we
# slice and then downsample in the bulkio_limiter
self._xBeginStaged = ctrlValueInt * self._xFactor
# If there is already a start index, this means that we are
# zooming on a zoomed region and we need to adjust the indices
# to reflect the previously zoomed region.
if (self._xBegin):
self._xBeginStaged += self._xBegin
# Log the indices
logging.info('Bulkio packet zoom begin index set to {0} on the X axis'.format(self._xBeginStaged))
elif (ctrl['type'] == ControlEnum.xEnd):
# Calculate the index based on the original packet size since we
# slice and then downsample in the bulkio_limiter
self._xEndStaged = ctrlValueInt * self._xFactor
# If there is already a start index, this means that we are
# zooming on a zoomed region and we need to adjust the indices
# to reflect the previously zoomed region.
if (self._xBegin):
self._xEndStaged += self._xBegin
# Log the indices
logging.info('Bulkio packet zoom end index set to {0} on the X axis'.format(self._xEndStaged))

# Zoom commands ----------------------------------------------------
elif (ctrl['type'] == ControlEnum.xZoomIn):
# Make the staged values ACTIVE
self._xBegin = self._xBeginStaged
self._xEnd = self._xEndStaged
self._xBeginStaged = None
self._xEndStaged = None
logging.info('Zoom IN commanded for the X axis with indices: ['+str(self._xBegin)+','+str(self._xEnd)+']')
elif (ctrl['type'] == ControlEnum.xZoomReset):
self._xBegin = None
self._xEnd = None
self._xBeginStaged = None
self._xEndStaged = None
logging.info('Zoom RESET commanded for the X axis')

# Set the maximum number of samples --------------------------------
elif (ctrl['type'] == ControlEnum.yMax):
if (ctrlValueInt > 0):
self._yMax = ctrlValueInt
logging.info('Bulkio packet size limited to {0} samples on the Y axis'.format(ctrlValueInt))
else:
self._yMax = None
logging.info('Bulkio packet size limit removed from the Y axis')

# Set the STAGED zoom region ---------------------------------------
elif (ctrl['type'] == ControlEnum.yBegin):
# Calculate the index based on the original packet size since we
# slice and then downsample in the bulkio_limiter
self._yBeginStaged = ctrlValueInt * self._yFactor
# If there is already a start index, this means that we are
# zooming on a zoomed region and we need to adjust the indices
# to reflect the previously zoomed region.
if (self._yBegin):
self._yBeginStaged += self._yBegin
# Log the indices
logging.info('Bulkio packet zoom begin index set to {0} on the Y axis'.format(self._yBeginStaged))
elif (ctrl['type'] == ControlEnum.yEnd):
# Calculate the index based on the original packet size since we
# slice and then downsample in the bulkio_limiter
self._yEndStaged = ctrlValueInt * self._yFactor
# If there is already a start index, this means that we are
# zooming on a zoomed region and we need to adjust the indices
# to reflect the previously zoomed region.
if (self._yBegin):
self._yEndStaged += self._yBegin
# Log the indices
logging.info('Bulkio packet zoom end index set to {0} on the Y axis'.format(self._yEndStaged))

# Zoom commands ----------------------------------------------------
elif (ctrl['type'] == ControlEnum.yZoomIn):
# Make the staged values ACTIVE
self._yBegin = self._yBeginStaged
self._yEnd = self._yEndStaged
self._yBeginStaged = None
self._yEndStaged = None
logging.info('Zoom IN commanded for the Y axis with indices: ['+str(self._yBegin)+','+str(self._yEnd)+']')
elif (ctrl['type'] == ControlEnum.yZoomReset):
self._yBegin = None
self._yEnd = None
self._yBeginStaged = None
self._yEndStaged = None
logging.info('Zoom RESET commanded for the Y axis')

# Set the max PPS --------------------------------------------------
elif (ctrl['type'] == ControlEnum.MaxPPS):
logging.warning('Packets per second (PPS) not implemented yet.')

except Exception as e:
self.write_message(dict(error='SystemError', message=str(e)))

Expand All @@ -129,7 +243,7 @@ def on_close(self):
self.port.ref.disconnectPort(self._connectionId)
logging.info("Closed websocket to %s, %s", self.port, self._connectionId)
except CORBA.TRANSIENT:
pass
pass
except Exception, e:
logging.exception('Error disconnecting port %s' % self._connectionId)

Expand All @@ -143,61 +257,60 @@ def _getSRI(self, streamID):
return self._SRIs.get(streamID, (None, True))

def _pushPacket(self, data, ts, EOS, stream_id):
data = numpy.array(data)

if None == self._outputWidth:
self._outputWidth = data.size

# Get SRI and modify if necessary (from decimation)
SRI, changed = self._getSRI(stream_id)
outSRI = copy_sri(SRI)
if 0 < data.size and data.size != self._outputWidth:
D, M = divmod(data.size, self._outputWidth)
if 0 == M and 1 < D:
# Mean decimate
data = data.reshape(-1, D).mean(axis=1)
outSRI.xdelta = SRI.xdelta * D
changed = True
else:
# Restore...invalid setting.
# TODO: Support interp+decimate rather than just
# neighbor-mean
logging.warning('Interpolate+decimate not supported. Restoring original output width.')
self._outputWidth = None
# Retrieve SRI from stream_id
sri, sriChangedFromPacket = self._getSRI(stream_id)

# Check if any limiting parameter exists
if (self._hasLimitingParameter()):
# Call the limit function (the "True" flags tell the function to use the mean() down-sampling)
outData, outSRI, self._xFactor, self._yFactor, sriChangedFromLimiter, warningMessage = bulkio_limiter.limit(
data, sri, self._xMax, self._xBegin, self._xEnd, True, self._yMax, self._yBegin, self._yEnd, True
)

# Logging for debug (comment out operationally)
#logging.info("connection_id: " + str(self._connectionId))
#logging.info("bulkio_limiter info: \n" +
# " originalWordLength="+str(len(data))+", finalWordLength="+str(len(outData))+", originalSubsize="+str(sri.subsize)+", finalSubsize="+str(outSRI.subsize)+"\n"+
# " xMax="+str(self._xMax)+", xBegin="+str(self._xBegin)+", xEnd="+str(self._xEnd)+", xDownSampFactor="+str(self._xFactor)+"\n"+
# " yMax="+str(self._yMax)+", yBegin="+str(self._yBegin)+", yEnd="+str(self._yEnd)+", yDownSampFactor="+str(self._yFactor)+"\n"+
# " sriChangedFromLimiter="+str(sriChangedFromLimiter))

# Print warnings if they exist
if (warningMessage):
logging.warning('bulkio_limiter.limit(): ' + warningMessage)
else:
# Don't do anything if no limiting parameter exists
outData = data
outSRI = bulkio_limiter.copy_sri(sri)
sriChangedFromLimiter = False

# Tack on SRI, Package, Deliver.
outSRI.keywords = props_to_dict(outSRI.keywords)
packet = dict(
streamID = stream_id,
T = ts.__dict__,
EOS = EOS,
sriChanged = changed,
sriChanged = (sriChangedFromPacket or sriChangedFromLimiter),
SRI = outSRI.__dict__,
type = self.port._using.name,
dataBuffer = data.tolist()
dataBuffer = outData
)
self._ioloop.add_callback(self.write_message, packet)

def _hasLimitingParameter(self):
# Check if any of the X axis parameters are not None
if (self._xMax or self._xBegin or self._xEnd):
return True
# Check if any of the Y axis parameters are not None
if (self._yMax or self._yBegin or self._yEnd):
return True
# Return false if all the parameters are None
return False

def write_message(self, *args, **ioargs):
# hide WebSocketClosedError because it's very likely
try:
super(BulkIOWebsocketHandler, self).write_message(*args, **ioargs)
except websocket.WebSocketClosedError:
logging.debug('Received WebSocketClosedError. Ignoring')
self.close()

def copy_sri(SRI):
copied = sri.create()
copied.hversion = SRI.hversion
copied.xstart = SRI.xstart
copied.xdelta = SRI.xdelta
copied.xunits = SRI.xunits
copied.subsize = SRI.subsize
copied.ystart = SRI.ystart
copied.ydelta = SRI.ydelta
copied.yunits = SRI.yunits
copied.mode = SRI.mode
copied.streamID = SRI.streamID
copied.blocking = SRI.blocking
copied.keywords = SRI.keywords[:]
return copied
Loading