Skip to content

Commit 34429be

Browse files
author
Josh Schindehette
authored
Integrated the BULKIO limiter function for server-side down-sampling and zooming (#1)
* Initial integration of bulkio_limiter functionality * Added zoom and mean() down-sampling
1 parent af660b3 commit 34429be

File tree

3 files changed

+425
-53
lines changed

3 files changed

+425
-53
lines changed

README.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ REDHAWK REST Python is licensed under the GNU Lesser General Public License (LGP
2020

2121
## Running
2222

23-
For Development/Test environments there are scripts to automatically create a local environment and run the server.
23+
For Development/Test environments there are scripts to automatically create a local environment and run the server. You will need to install the `virtualenv` python package before using the commands below.
2424

2525
./setup.sh install
2626
./start.sh --port=<desired_port>
@@ -39,4 +39,3 @@ Once running the REST Interface can be tested at `http://localhost:<desired_port
3939
## Deploying Applications
4040

4141
You can either install your application in `apps` for REST-Python to serve them, or deploy them with a separate server (e.g., NodeJS). REST-Python supports cross-domain responses to REST and Websocket requests to facilitate dual- or multi-server configurations to completely decouple the REDHAWK environment from the web application environment. (See [Docker-REDHAWK's](http://github.com/GeonTech/docker-redhawk) `geontech/redhawk-webserver` image.)
42-

rest/bulkio_handler.py

Lines changed: 164 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from tornado import websocket
3131

3232
import json
33-
import numpy
33+
import bulkio_limiter
3434

3535
from model.domain import Domain, ResourceNotFound
3636
from asyncport import AsyncPort
@@ -43,7 +43,7 @@ def enum(**enums):
4343
# Control messages are a dictionary of type and value. The type should be
4444
# one of these enumerations.
4545
# E.g.: ControlMessage = { 'type': 0, 'value': 1024 }
46-
ControlEnum = enum(MaxWidth=0, MaxPPS=1)
46+
ControlEnum = enum(xMax=0, xBegin=1, xEnd=2, xZoomIn=3, xZoomReset=4, yMax=5, yBegin=6, yEnd=7, yZoomIn=8, yZoomReset=9, MaxPPS=10)
4747

4848

4949
class BulkIOWebsocketHandler(CrossDomainSockets):
@@ -54,8 +54,28 @@ def initialize(self, kind, redhawk=None, _ioloop=None):
5454
_ioloop = ioloop.IOLoop.current()
5555
self._ioloop = _ioloop
5656

57-
# For on-the-fly per-client decimation
58-
self._outputWidth = None
57+
# For on-the-fly per-client down-sampling
58+
# Current options: 0=NoLimit, else=Limit
59+
self._xMax = 1024
60+
self._yMax = 1024
61+
62+
# The ACTIVE on-the-fly per-client zooming parameters
63+
# These are set from the STAGED values from a xZoomIn/yZoomIn command
64+
self._xBegin = None
65+
self._xEnd = None
66+
self._yBegin = None
67+
self._yEnd = None
68+
69+
# The STAGED on-the-fly per-client zooming parameters
70+
# These are set by commands from UI
71+
self._xBeginStaged = None
72+
self._xEndStaged = None
73+
self._yBeginStaged = None
74+
self._yEndStaged = None
75+
76+
# The down-sampling ratios calculated from the last call to the bulio_limiter
77+
self._xFactor = 1
78+
self._yFactor = 1
5979

6080
# Map of SRIs seen on this port.
6181
self._SRIs = dict()
@@ -108,18 +128,112 @@ def open(self, *args):
108128

109129
def on_message(self, message):
110130
try:
131+
# Parse a JSON string into a dictionary
111132
ctrl = json.loads(message)
112133

113-
if (ctrl['type'] == ControlEnum.MaxWidth):
114-
if 0 < ctrl['value']:
115-
self._outputWidth = ctrl['value']
116-
logging.info('Decimation requested to {0} samples'.format(ctrl['value']))
117-
else:
118-
self._outputWidth = None
119-
logging.info('Decimation disabled')
134+
# Convert the value to integer
135+
ctrlValueInt = int(ctrl['value'])
120136

137+
# Set the maximum number of samples --------------------------------
138+
if (ctrl['type'] == ControlEnum.xMax):
139+
if (ctrlValueInt > 0):
140+
self._xMax = ctrlValueInt
141+
logging.info('Bulkio packet size limited to {0} samples on the X axis'.format(ctrlValueInt))
142+
else:
143+
self._xMax = None
144+
logging.info('Bulkio packet size limit removed from the X axis')
145+
146+
# Set the STAGED zoom region ---------------------------------------
147+
elif (ctrl['type'] == ControlEnum.xBegin):
148+
# Calculate the index based on the original packet size since we
149+
# slice and then downsample in the bulkio_limiter
150+
self._xBeginStaged = ctrlValueInt * self._xFactor
151+
# If there is already a start index, this means that we are
152+
# zooming on a zoomed region and we need to adjust the indices
153+
# to reflect the previously zoomed region.
154+
if (self._xBegin):
155+
self._xBeginStaged += self._xBegin
156+
# Log the indices
157+
logging.info('Bulkio packet zoom begin index set to {0} on the X axis'.format(self._xBeginStaged))
158+
elif (ctrl['type'] == ControlEnum.xEnd):
159+
# Calculate the index based on the original packet size since we
160+
# slice and then downsample in the bulkio_limiter
161+
self._xEndStaged = ctrlValueInt * self._xFactor
162+
# If there is already a start index, this means that we are
163+
# zooming on a zoomed region and we need to adjust the indices
164+
# to reflect the previously zoomed region.
165+
if (self._xBegin):
166+
self._xEndStaged += self._xBegin
167+
# Log the indices
168+
logging.info('Bulkio packet zoom end index set to {0} on the X axis'.format(self._xEndStaged))
169+
170+
# Zoom commands ----------------------------------------------------
171+
elif (ctrl['type'] == ControlEnum.xZoomIn):
172+
# Make the staged values ACTIVE
173+
self._xBegin = self._xBeginStaged
174+
self._xEnd = self._xEndStaged
175+
self._xBeginStaged = None
176+
self._xEndStaged = None
177+
logging.info('Zoom IN commanded for the X axis with indices: ['+str(self._xBegin)+','+str(self._xEnd)+']')
178+
elif (ctrl['type'] == ControlEnum.xZoomReset):
179+
self._xBegin = None
180+
self._xEnd = None
181+
self._xBeginStaged = None
182+
self._xEndStaged = None
183+
logging.info('Zoom RESET commanded for the X axis')
184+
185+
# Set the maximum number of samples --------------------------------
186+
elif (ctrl['type'] == ControlEnum.yMax):
187+
if (ctrlValueInt > 0):
188+
self._yMax = ctrlValueInt
189+
logging.info('Bulkio packet size limited to {0} samples on the Y axis'.format(ctrlValueInt))
190+
else:
191+
self._yMax = None
192+
logging.info('Bulkio packet size limit removed from the Y axis')
193+
194+
# Set the STAGED zoom region ---------------------------------------
195+
elif (ctrl['type'] == ControlEnum.yBegin):
196+
# Calculate the index based on the original packet size since we
197+
# slice and then downsample in the bulkio_limiter
198+
self._yBeginStaged = ctrlValueInt * self._yFactor
199+
# If there is already a start index, this means that we are
200+
# zooming on a zoomed region and we need to adjust the indices
201+
# to reflect the previously zoomed region.
202+
if (self._yBegin):
203+
self._yBeginStaged += self._yBegin
204+
# Log the indices
205+
logging.info('Bulkio packet zoom begin index set to {0} on the Y axis'.format(self._yBeginStaged))
206+
elif (ctrl['type'] == ControlEnum.yEnd):
207+
# Calculate the index based on the original packet size since we
208+
# slice and then downsample in the bulkio_limiter
209+
self._yEndStaged = ctrlValueInt * self._yFactor
210+
# If there is already a start index, this means that we are
211+
# zooming on a zoomed region and we need to adjust the indices
212+
# to reflect the previously zoomed region.
213+
if (self._yBegin):
214+
self._yEndStaged += self._yBegin
215+
# Log the indices
216+
logging.info('Bulkio packet zoom end index set to {0} on the Y axis'.format(self._yEndStaged))
217+
218+
# Zoom commands ----------------------------------------------------
219+
elif (ctrl['type'] == ControlEnum.yZoomIn):
220+
# Make the staged values ACTIVE
221+
self._yBegin = self._yBeginStaged
222+
self._yEnd = self._yEndStaged
223+
self._yBeginStaged = None
224+
self._yEndStaged = None
225+
logging.info('Zoom IN commanded for the Y axis with indices: ['+str(self._yBegin)+','+str(self._yEnd)+']')
226+
elif (ctrl['type'] == ControlEnum.yZoomReset):
227+
self._yBegin = None
228+
self._yEnd = None
229+
self._yBeginStaged = None
230+
self._yEndStaged = None
231+
logging.info('Zoom RESET commanded for the Y axis')
232+
233+
# Set the max PPS --------------------------------------------------
121234
elif (ctrl['type'] == ControlEnum.MaxPPS):
122235
logging.warning('Packets per second (PPS) not implemented yet.')
236+
123237
except Exception as e:
124238
self.write_message(dict(error='SystemError', message=str(e)))
125239

@@ -129,7 +243,7 @@ def on_close(self):
129243
self.port.ref.disconnectPort(self._connectionId)
130244
logging.info("Closed websocket to %s, %s", self.port, self._connectionId)
131245
except CORBA.TRANSIENT:
132-
pass
246+
pass
133247
except Exception, e:
134248
logging.exception('Error disconnecting port %s' % self._connectionId)
135249

@@ -143,61 +257,60 @@ def _getSRI(self, streamID):
143257
return self._SRIs.get(streamID, (None, True))
144258

145259
def _pushPacket(self, data, ts, EOS, stream_id):
146-
data = numpy.array(data)
147-
148-
if None == self._outputWidth:
149-
self._outputWidth = data.size
150-
151-
# Get SRI and modify if necessary (from decimation)
152-
SRI, changed = self._getSRI(stream_id)
153-
outSRI = copy_sri(SRI)
154-
if 0 < data.size and data.size != self._outputWidth:
155-
D, M = divmod(data.size, self._outputWidth)
156-
if 0 == M and 1 < D:
157-
# Mean decimate
158-
data = data.reshape(-1, D).mean(axis=1)
159-
outSRI.xdelta = SRI.xdelta * D
160-
changed = True
161-
else:
162-
# Restore...invalid setting.
163-
# TODO: Support interp+decimate rather than just
164-
# neighbor-mean
165-
logging.warning('Interpolate+decimate not supported. Restoring original output width.')
166-
self._outputWidth = None
260+
# Retrieve SRI from stream_id
261+
sri, sriChangedFromPacket = self._getSRI(stream_id)
262+
263+
# Check if any limiting parameter exists
264+
if (self._hasLimitingParameter()):
265+
# Call the limit function (the "True" flags tell the function to use the mean() down-sampling)
266+
outData, outSRI, self._xFactor, self._yFactor, sriChangedFromLimiter, warningMessage = bulkio_limiter.limit(
267+
data, sri, self._xMax, self._xBegin, self._xEnd, True, self._yMax, self._yBegin, self._yEnd, True
268+
)
269+
270+
# Logging for debug (comment out operationally)
271+
#logging.info("connection_id: " + str(self._connectionId))
272+
#logging.info("bulkio_limiter info: \n" +
273+
# " originalWordLength="+str(len(data))+", finalWordLength="+str(len(outData))+", originalSubsize="+str(sri.subsize)+", finalSubsize="+str(outSRI.subsize)+"\n"+
274+
# " xMax="+str(self._xMax)+", xBegin="+str(self._xBegin)+", xEnd="+str(self._xEnd)+", xDownSampFactor="+str(self._xFactor)+"\n"+
275+
# " yMax="+str(self._yMax)+", yBegin="+str(self._yBegin)+", yEnd="+str(self._yEnd)+", yDownSampFactor="+str(self._yFactor)+"\n"+
276+
# " sriChangedFromLimiter="+str(sriChangedFromLimiter))
277+
278+
# Print warnings if they exist
279+
if (warningMessage):
280+
logging.warning('bulkio_limiter.limit(): ' + warningMessage)
281+
else:
282+
# Don't do anything if no limiting parameter exists
283+
outData = data
284+
outSRI = bulkio_limiter.copy_sri(sri)
285+
sriChangedFromLimiter = False
167286

168287
# Tack on SRI, Package, Deliver.
169288
outSRI.keywords = props_to_dict(outSRI.keywords)
170289
packet = dict(
171290
streamID = stream_id,
172291
T = ts.__dict__,
173292
EOS = EOS,
174-
sriChanged = changed,
293+
sriChanged = (sriChangedFromPacket or sriChangedFromLimiter),
175294
SRI = outSRI.__dict__,
176295
type = self.port._using.name,
177-
dataBuffer = data.tolist()
296+
dataBuffer = outData
178297
)
179298
self._ioloop.add_callback(self.write_message, packet)
180299

300+
def _hasLimitingParameter(self):
301+
# Check if any of the X axis parameters are not None
302+
if (self._xMax or self._xBegin or self._xEnd):
303+
return True
304+
# Check if any of the Y axis parameters are not None
305+
if (self._yMax or self._yBegin or self._yEnd):
306+
return True
307+
# Return false if all the parameters are None
308+
return False
309+
181310
def write_message(self, *args, **ioargs):
182311
# hide WebSocketClosedError because it's very likely
183312
try:
184313
super(BulkIOWebsocketHandler, self).write_message(*args, **ioargs)
185314
except websocket.WebSocketClosedError:
186315
logging.debug('Received WebSocketClosedError. Ignoring')
187316
self.close()
188-
189-
def copy_sri(SRI):
190-
copied = sri.create()
191-
copied.hversion = SRI.hversion
192-
copied.xstart = SRI.xstart
193-
copied.xdelta = SRI.xdelta
194-
copied.xunits = SRI.xunits
195-
copied.subsize = SRI.subsize
196-
copied.ystart = SRI.ystart
197-
copied.ydelta = SRI.ydelta
198-
copied.yunits = SRI.yunits
199-
copied.mode = SRI.mode
200-
copied.streamID = SRI.streamID
201-
copied.blocking = SRI.blocking
202-
copied.keywords = SRI.keywords[:]
203-
return copied

0 commit comments

Comments
 (0)