From d3a2d0de72a896b01a1c74cec789bc166ef4a994 Mon Sep 17 00:00:00 2001 From: Josh Schindehette Date: Tue, 14 Nov 2017 09:43:28 -0500 Subject: [PATCH 1/2] Initial integration of bulkio_limiter functionality --- README.md | 3 +- rest/bulkio_handler.py | 141 ++++++++++++++-------- rest/bulkio_limiter.py | 257 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 349 insertions(+), 52 deletions(-) create mode 100644 rest/bulkio_limiter.py diff --git a/README.md b/README.md index c2cdfc1..4f087f3 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ REDHAWK REST Python is licensed under the GNU Lesser General Public License (LGP ## Running -For Development/Test environments there are scripts to automatically create a local environment and run the server. +For Development/Test environments there are scripts to automatically create a local environment and run the server. You will need to install the `virtualenv` python package before using the commands below. ./setup.sh install ./start.sh --port= @@ -39,4 +39,3 @@ Once running the REST Interface can be tested at `http://localhost: 0): + self._xMax = ctrl['value'] + logging.info('Bulkio packet size limited to {0} samples on the X axis'.format(ctrl['value'])) else: - self._outputWidth = None - logging.info('Decimation disabled') - + self._xMax = None + logging.info('Bulkio packet size limit removed from the X axis') + elif (ctrl['type'] == ControlEnum.xBegin): + self._xBegin = ctrl['value'] + logging.info('Bulkio packets limited with start index {0} on the X axis'.format(ctrl['value'])) + elif (ctrl['type'] == ControlEnum.xEnd): + self._xEnd = ctrl['value'] + logging.info('Bulkio packets limited with end index {0} on the X axis'.format(ctrl['value'])) + elif (ctrl['type'] == ControlEnum.xUseMean): + self._xUseMean = ctrl['value'] + logging.info('Bulkio packet down-sampling mean function enable is set to {0} on the X axis'.format(ctrl['value'])) + elif (ctrl['type'] == ControlEnum.yMax): + if (ctrl['value'] > 0): + self._yMax = ctrl['value'] + logging.info('Bulkio packet size limited to {0} samples on the Y axis'.format(ctrl['value'])) + else: + self._yMax = None + logging.info('Bulkio packet size limit removed from the Y axis') + elif (ctrl['type'] == ControlEnum.yBegin): + self._yBegin = ctrl['value'] + logging.info('Bulkio packets limited with start index {0} on the Y axis'.format(ctrl['value'])) + elif (ctrl['type'] == ControlEnum.yEnd): + self._yEnd = ctrl['value'] + logging.info('Bulkio packets limited with end index {0} on the Y axis'.format(ctrl['value'])) + elif (ctrl['type'] == ControlEnum.yUseMean): + self._yUseMean = ctrl['value'] + logging.info('Bulkio packet down-sampling mean function enable is set to {0} on the Y axis'.format(ctrl['value'])) + elif (ctrl['type'] == ControlEnum.zoomReset): + # We don't care what is in ctrl['value'] + self._xBegin = None + self._xEnd = None + self._yBegin = None + self._yEnd = None + logging.info('Bulkio packet start and end indexes were reset for both axes') elif (ctrl['type'] == ControlEnum.MaxPPS): logging.warning('Packets per second (PPS) not implemented yet.') + except Exception as e: self.write_message(dict(error='SystemError', message=str(e))) @@ -129,7 +174,7 @@ def on_close(self): self.port.ref.disconnectPort(self._connectionId) logging.info("Closed websocket to %s, %s", self.port, self._connectionId) except CORBA.TRANSIENT: - pass + pass except Exception, e: logging.exception('Error disconnecting port %s' % self._connectionId) @@ -143,27 +188,29 @@ def _getSRI(self, streamID): return self._SRIs.get(streamID, (None, True)) def _pushPacket(self, data, ts, EOS, stream_id): - data = numpy.array(data) - - if None == self._outputWidth: - self._outputWidth = data.size - - # Get SRI and modify if necessary (from decimation) - SRI, changed = self._getSRI(stream_id) - outSRI = copy_sri(SRI) - if 0 < data.size and data.size != self._outputWidth: - D, M = divmod(data.size, self._outputWidth) - if 0 == M and 1 < D: - # Mean decimate - data = data.reshape(-1, D).mean(axis=1) - outSRI.xdelta = SRI.xdelta * D - changed = True + # Retrieve SRI from stream_id + sri, sriChangedFromPacket = self._getSRI(stream_id) + + # Check if the bulkio limiter is enabled + if (self._limiterEnable): + # Check if any limiting parameter exists + if (self._hasLimitingParameter()): + # Call the limit function + outData, outSRI, xFactor, yFactor, sriChangedFromLimiter, message = bulkio_limiter.limit( + data, sri, + self._xMax, self._xBegin, self._xEnd, self._xUseMean, + self._yMax, self._yBegin, self._yEnd, self._yUseMean + ) + # Check for success + if (None == outData): + logging.warning('bulkio limiter failed: ' + message) else: - # Restore...invalid setting. - # TODO: Support interp+decimate rather than just - # neighbor-mean - logging.warning('Interpolate+decimate not supported. Restoring original output width.') - self._outputWidth = None + logging.warning('bulkio limiter is turned ON but there are no limiting parameters!') + else: + # Limiter is OFF + outData = data + outSRI = sri + sriChangedFromLimiter = False # Tack on SRI, Package, Deliver. outSRI.keywords = props_to_dict(outSRI.keywords) @@ -171,13 +218,23 @@ def _pushPacket(self, data, ts, EOS, stream_id): streamID = stream_id, T = ts.__dict__, EOS = EOS, - sriChanged = changed, + sriChanged = (sriChangedFromPacket or sriChangedFromLimiter), SRI = outSRI.__dict__, type = self.port._using.name, - dataBuffer = data.tolist() + dataBuffer = outData ) self._ioloop.add_callback(self.write_message, packet) + def _hasLimitingParameter(self): + # Check if any of the X axis parameters are not None + if (self._xMax or self._xBegin or self._xEnd): + return True + # Check if any of the Y axis parameters are not None + if (self._yMax or self._yBegin or self._yEnd): + return True + # Return false if all the parameters are None + return False + def write_message(self, *args, **ioargs): # hide WebSocketClosedError because it's very likely try: @@ -185,19 +242,3 @@ def write_message(self, *args, **ioargs): except websocket.WebSocketClosedError: logging.debug('Received WebSocketClosedError. Ignoring') self.close() - -def copy_sri(SRI): - copied = sri.create() - copied.hversion = SRI.hversion - copied.xstart = SRI.xstart - copied.xdelta = SRI.xdelta - copied.xunits = SRI.xunits - copied.subsize = SRI.subsize - copied.ystart = SRI.ystart - copied.ydelta = SRI.ydelta - copied.yunits = SRI.yunits - copied.mode = SRI.mode - copied.streamID = SRI.streamID - copied.blocking = SRI.blocking - copied.keywords = SRI.keywords[:] - return copied diff --git a/rest/bulkio_limiter.py b/rest/bulkio_limiter.py new file mode 100644 index 0000000..4985036 --- /dev/null +++ b/rest/bulkio_limiter.py @@ -0,0 +1,257 @@ +""" + Company: Geon Technologies, LLC + Project: geon/redhawk-ui/rest-python, ESRA/BulkioLimiter + Author: Josh Schindehette + Copyright: + (c) 2017 Geon Technologies, LLC. All rights reserved. + Dissemination of this information or reproduction of this + material is strictly prohibited unless prior written permission + is obtained from Geon Technologies, LLC +""" +import math +import numpy +from ossie.utils.bulkio import bulkio_helpers +from bulkio import sri + +def copy_sri(SRI): + """ + This function copies the fields of an SRI object into a new object. + """ + copied = sri.create() + copied.hversion = SRI.hversion + copied.xstart = SRI.xstart + copied.xdelta = SRI.xdelta + copied.xunits = SRI.xunits + copied.subsize = SRI.subsize + copied.ystart = SRI.ystart + copied.ydelta = SRI.ydelta + copied.yunits = SRI.yunits + copied.mode = SRI.mode + copied.streamID = SRI.streamID + copied.blocking = SRI.blocking + copied.keywords = SRI.keywords[:] + return copied + +def meanDownsampleX(dataMatrix, resampleFactor): + """ + This function down-samples the matrix using a mean function across the X dimension. + """ + # Initialize output matrix + outMatrix = dataMatrix[:,::resampleFactor] + # Calculate the dimensions to loop through + numRows = outMatrix.shape[0] + numCols = outMatrix.shape[1] + origCols = dataMatrix.shape[1] + # Loop through the rows + for row in range(0, numRows): + # Loop through the columns + for col in range(0, numCols): + # Check if there is an even (resampleFactor) amount of data to calculate a mean() on + if ((col*resampleFactor+resampleFactor) > origCols): + # Calculate the mean with whatever is left + outMatrix[row, col] = dataMatrix[row, col*resampleFactor:].mean() + else: + # Calculate the mean with (resampleFactor) samples + outMatrix[row, col] = dataMatrix[row, col*resampleFactor:col*resampleFactor+resampleFactor].mean() + # Return output matrix + return outMatrix + +def meanDownsampleY(dataMatrix, resampleFactor): + """ + This function down-samples the matrix using a mean function across the Y dimension. + """ + # Initialize output matrix + outMatrix = dataMatrix[::resampleFactor,:] + # Calculate the dimensions to loop through + numRows = outMatrix.shape[0] + numCols = outMatrix.shape[1] + origRows = dataMatrix.shape[0] + # Loop through the columns + for col in range(0, numCols): + # Loop through the rows + for row in range(0, numRows): + # Check if there is an even (resampleFactor) amount of data to calculate a mean() on + if ((row*resampleFactor+resampleFactor) > origRows): + # Calculate the mean with whatever is left + outMatrix[row, col] = dataMatrix[row*resampleFactor:, col].mean() + else: + # Calculate the mean with (resampleFactor) samples + outMatrix[row, col] = dataMatrix[row*resampleFactor:row*resampleFactor+resampleFactor, col].mean() + # Return output matrix + return outMatrix + +def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBegin=None, yEnd=None, yUseMean=True): + """ + This function limits the output size of a bulkio packet. + First, the data is sliced across two different dimensions (if applicable) with the indices (note the "inclusive" range definition): + [xBegin, xEnd] + [yBegin, yEnd] + These indices are in terms of samples rather than data words. This means that xBegin=2 ignores two samples which + is 4 words if the data is complex and 2 words if the data is scalar.. + Second, the data is down-sampled across two different dimensions (if applicable) to fit within the max sample sizes: + xMax + yMax + The down-sampling is performed across the X dimension first (this only matters if the mean function is used). + These max sample sizes are in terms of samples rather than data words. The down-sampling operation either drops + samples or it uses the mean function to include information from the dropped samples. + The following flags indicate which operation to use: + xUseMean + yUseMean + """ + #============================================ + # Initialize + #============================================ + # Copy SRI so it isn't modified in place + outSRI = copy_sri(sri) + sriChanged = False + + #============================================ + # Convert from a vector of words to a matrix of samples + #============================================ + # Check if complex and convert (0:Scalar, 1:Complex) + if (outSRI.mode == 1): + outData = bulkio_helpers.bulkioComplexToPythonComplexList(data) + # Divide by two since this parameter uses word indexing rather than sample indexing + frameSize = outSRI.subsize/2 + else: + outData = data + frameSize = outSRI.subsize + + # Check for dimension (0:1D, otherwise:2D) + if (outSRI.subsize == 0): + xLength = len(outData) + yLength = 1 + else: + xLength = frameSize + yLength = len(outData) / frameSize + # Check that the frame size evenly fits in the data length + if ((len(outData) % frameSize) > 0): + return (None, None, None, None, None, "Malformed input packet: Data length:" + str(len(outData)) + " is not a multiple of the frame length:" + str(frameSize)) + + # Convert to numpy array + outData = numpy.array(outData) + + # Reshape data into matrix of yLength rows and xLength columns + outData = outData.reshape((yLength, xLength)) + + #============================================ + # Slice the data if necessary + #============================================ + # Slice the end of the X axis first + if (xEnd): + # Check for valid xEnd + if ((xEnd+1) <= xLength): + # Slice (Use xEnd+1 since [xBegin, xEnd] range is inclusive) + outData = outData[:,:xEnd+1] + # Recalculate xLength - number of columns + xLength = outData.shape[1] + else: + return (None, None, None, None, None, "Invalid X axis indices: Requested end index:" + str(xEnd) + " for " + str(xLength) + " samples") + + # Slice the beginning of the X axis second + if (xBegin): + # Check for valid xBegin + if ((xBegin+1) <= xLength): + # Slice + outData = outData[:,xBegin:] + # Recalculate xLength - number of columns + xLength = outData.shape[1] + # Modify SRI + outSRI.xstart = outSRI.xstart + xBegin*outSRI.xunits + # Check if the SRI was updated here + if (sri.xstart != outSRI.xstart): + sriChanged = True + else: + return (None, None, None, None, None, "Invalid X axis indices: Requested beginning index:" + str(xBegin) + " for " + str(xLength) + " samples") + + # Slice the end of the Y axis first + if (yEnd): + # Check for valid yEnd + if ((yEnd+1) <= yLength): + # Slice (Use yEnd+1 since [yBegin, yEnd] range is inclusive) + outData = outData[:yEnd+1,:] + # Recalculate yLength - number of rows + yLength = outData.shape[0] + else: + return (None, None, None, None, None, "Invalid Y axis indices: Requested end index:" + str(yEnd) + " for " + str(yLength) + " frames") + + # Slice the beginning of the Y axis second + if (yBegin): + # Check for valid xBegin + if ((yBegin+1) <= yLength): + # Slice + outData = outData[yBegin:,:] + # Recalculate xLength - number of rows + yLength = outData.shape[0] + # Modify SRI + outSRI.ystart = outSRI.ystart + yBegin*outSRI.yunits + # Check if SRI was updated here + if (sri.ystart != outSRI.ystart): + sriChanged = True + else: + return (None, None, None, None, None, "Invalid Y axis indices: Requested beginning index:" + str(yBegin) + " for " + str(yLength) + " frames") + + #============================================ + # Down-sample the data if necessary + #============================================ + # Check for down-sampling along the x axis + xResampleFactor = 1 + if ((xMax) and (xMax < xLength)): + # Calculate integer re-sample factor + xResampleFactor = float(xLength) / float(xMax) + xResampleFactor = int(math.ceil(xResampleFactor)) + # Down-sample + if (xUseMean): + # Use the average of the dropped samples and retained sample + outData = meanDownsampleX(outData, xResampleFactor) + else: + # Drop samples when down-sampling + outData = outData[:,::xResampleFactor] + # Recalculate xLength - number of columns + xLength = outData.shape[1] + # Modify SRI + outSRI.xdelta = outSRI.xdelta * xResampleFactor + # The SRI was updated here since re-sampling was performed + sriChanged = True + + # Update the subsize if this is two dimensional data + if (outSRI.subsize > 0): + outSRI.subsize = xLength + # If this is complex data, use 2x multiplier since xLength is in + # terms of samples whereas the subsize is in terms of words + if (outSRI.mode == 1): + outSRI.subsize = outSRI.subsize * 2 + # Check if the SRI was updated here + if (sri.subsize != outSRI.subsize): + sriChanged = True + + # Check for down-sampling along the y axis + yResampleFactor = 1 + if ((yMax) and (yMax < yLength)): + # Calculate integer re-sample factor + yResampleFactor = float(yLength) / float(yMax) + yResampleFactor = int(math.ceil(yResampleFactor)) + # Down-sample + if (yUseMean): + # Use the average of the dropped samples and retained sample + outData = meanDownsampleY(outData, yResampleFactor) + else: + outData = outData[::yResampleFactor,:] + # Recalculate yLength - number of rows + yLength = outData.shape[0] + # Modify SRI + outSRI.ydelta = outSRI.ydelta * yResampleFactor + # The SRI was updated here since re-sampling was performed + sriChanged = True + + #============================================ + # Convert from a matrix of samples to a vector of words + #============================================ + # Convert the matrix back to a list + outData = outData.reshape((1, xLength*yLength)).squeeze().tolist() + + # Word-serialize the complex data + if (outSRI.mode == 1): + outData = bulkio_helpers.pythonComplexListToBulkioComplex(outData) + + return (outData, outSRI, xResampleFactor, yResampleFactor, sriChanged, "") From 2310652ea0003edaa5673b7a74b71e276de41fb6 Mon Sep 17 00:00:00 2001 From: Josh Schindehette Date: Thu, 16 Nov 2017 13:21:18 -0500 Subject: [PATCH 2/2] Added zoom and mean() down-sampling --- rest/bulkio_handler.py | 180 ++++++++++++++++++++++++++++------------- rest/bulkio_limiter.py | 21 ++--- 2 files changed, 138 insertions(+), 63 deletions(-) diff --git a/rest/bulkio_handler.py b/rest/bulkio_handler.py index f4c523d..1783ac3 100644 --- a/rest/bulkio_handler.py +++ b/rest/bulkio_handler.py @@ -43,7 +43,7 @@ def enum(**enums): # Control messages are a dictionary of type and value. The type should be # one of these enumerations. # E.g.: ControlMessage = { 'type': 0, 'value': 1024 } -ControlEnum = enum(useLimiter=0, xMax=1, xBegin=2, xEnd=3, xUseMean=4, yMax=5, yBegin=6, yEnd=7, yUseMean=8, zoomReset=0, MaxPPS=10) +ControlEnum = enum(xMax=0, xBegin=1, xEnd=2, xZoomIn=3, xZoomReset=4, yMax=5, yBegin=6, yEnd=7, yZoomIn=8, yZoomReset=9, MaxPPS=10) class BulkIOWebsocketHandler(CrossDomainSockets): @@ -54,16 +54,28 @@ def initialize(self, kind, redhawk=None, _ioloop=None): _ioloop = ioloop.IOLoop.current() self._ioloop = _ioloop - # For on-the-fly per-client down-sampling of bulkio packets - self._limiterEnable = False - self._xMax = None + # For on-the-fly per-client down-sampling + # Current options: 0=NoLimit, else=Limit + self._xMax = 1024 + self._yMax = 1024 + + # The ACTIVE on-the-fly per-client zooming parameters + # These are set from the STAGED values from a xZoomIn/yZoomIn command self._xBegin = None self._xEnd = None - self._xUseMean = True - self._yMax = None self._yBegin = None self._yEnd = None - self._yUseMean = True + + # The STAGED on-the-fly per-client zooming parameters + # These are set by commands from UI + self._xBeginStaged = None + self._xEndStaged = None + self._yBeginStaged = None + self._yEndStaged = None + + # The down-sampling ratios calculated from the last call to the bulio_limiter + self._xFactor = 1 + self._yFactor = 1 # Map of SRIs seen on this port. self._SRIs = dict() @@ -119,49 +131,106 @@ def on_message(self, message): # Parse a JSON string into a dictionary ctrl = json.loads(message) - # Parse the type enumeration to set properties - if (ctrl['type'] == ControlEnum.useLimiter): - self._limiterEnable = ctrl['value'] - logging.info('Bulkio packets limiter enable is set to {0}'.format(ctrl['value'])) - elif (ctrl['type'] == ControlEnum.xMax): - if (ctrl['value'] > 0): - self._xMax = ctrl['value'] - logging.info('Bulkio packet size limited to {0} samples on the X axis'.format(ctrl['value'])) + # Convert the value to integer + ctrlValueInt = int(ctrl['value']) + + # Set the maximum number of samples -------------------------------- + if (ctrl['type'] == ControlEnum.xMax): + if (ctrlValueInt > 0): + self._xMax = ctrlValueInt + logging.info('Bulkio packet size limited to {0} samples on the X axis'.format(ctrlValueInt)) else: self._xMax = None logging.info('Bulkio packet size limit removed from the X axis') + + # Set the STAGED zoom region --------------------------------------- elif (ctrl['type'] == ControlEnum.xBegin): - self._xBegin = ctrl['value'] - logging.info('Bulkio packets limited with start index {0} on the X axis'.format(ctrl['value'])) + # Calculate the index based on the original packet size since we + # slice and then downsample in the bulkio_limiter + self._xBeginStaged = ctrlValueInt * self._xFactor + # If there is already a start index, this means that we are + # zooming on a zoomed region and we need to adjust the indices + # to reflect the previously zoomed region. + if (self._xBegin): + self._xBeginStaged += self._xBegin + # Log the indices + logging.info('Bulkio packet zoom begin index set to {0} on the X axis'.format(self._xBeginStaged)) elif (ctrl['type'] == ControlEnum.xEnd): - self._xEnd = ctrl['value'] - logging.info('Bulkio packets limited with end index {0} on the X axis'.format(ctrl['value'])) - elif (ctrl['type'] == ControlEnum.xUseMean): - self._xUseMean = ctrl['value'] - logging.info('Bulkio packet down-sampling mean function enable is set to {0} on the X axis'.format(ctrl['value'])) + # Calculate the index based on the original packet size since we + # slice and then downsample in the bulkio_limiter + self._xEndStaged = ctrlValueInt * self._xFactor + # If there is already a start index, this means that we are + # zooming on a zoomed region and we need to adjust the indices + # to reflect the previously zoomed region. + if (self._xBegin): + self._xEndStaged += self._xBegin + # Log the indices + logging.info('Bulkio packet zoom end index set to {0} on the X axis'.format(self._xEndStaged)) + + # Zoom commands ---------------------------------------------------- + elif (ctrl['type'] == ControlEnum.xZoomIn): + # Make the staged values ACTIVE + self._xBegin = self._xBeginStaged + self._xEnd = self._xEndStaged + self._xBeginStaged = None + self._xEndStaged = None + logging.info('Zoom IN commanded for the X axis with indices: ['+str(self._xBegin)+','+str(self._xEnd)+']') + elif (ctrl['type'] == ControlEnum.xZoomReset): + self._xBegin = None + self._xEnd = None + self._xBeginStaged = None + self._xEndStaged = None + logging.info('Zoom RESET commanded for the X axis') + + # Set the maximum number of samples -------------------------------- elif (ctrl['type'] == ControlEnum.yMax): - if (ctrl['value'] > 0): - self._yMax = ctrl['value'] - logging.info('Bulkio packet size limited to {0} samples on the Y axis'.format(ctrl['value'])) + if (ctrlValueInt > 0): + self._yMax = ctrlValueInt + logging.info('Bulkio packet size limited to {0} samples on the Y axis'.format(ctrlValueInt)) else: self._yMax = None logging.info('Bulkio packet size limit removed from the Y axis') + + # Set the STAGED zoom region --------------------------------------- elif (ctrl['type'] == ControlEnum.yBegin): - self._yBegin = ctrl['value'] - logging.info('Bulkio packets limited with start index {0} on the Y axis'.format(ctrl['value'])) + # Calculate the index based on the original packet size since we + # slice and then downsample in the bulkio_limiter + self._yBeginStaged = ctrlValueInt * self._yFactor + # If there is already a start index, this means that we are + # zooming on a zoomed region and we need to adjust the indices + # to reflect the previously zoomed region. + if (self._yBegin): + self._yBeginStaged += self._yBegin + # Log the indices + logging.info('Bulkio packet zoom begin index set to {0} on the Y axis'.format(self._yBeginStaged)) elif (ctrl['type'] == ControlEnum.yEnd): - self._yEnd = ctrl['value'] - logging.info('Bulkio packets limited with end index {0} on the Y axis'.format(ctrl['value'])) - elif (ctrl['type'] == ControlEnum.yUseMean): - self._yUseMean = ctrl['value'] - logging.info('Bulkio packet down-sampling mean function enable is set to {0} on the Y axis'.format(ctrl['value'])) - elif (ctrl['type'] == ControlEnum.zoomReset): - # We don't care what is in ctrl['value'] - self._xBegin = None - self._xEnd = None + # Calculate the index based on the original packet size since we + # slice and then downsample in the bulkio_limiter + self._yEndStaged = ctrlValueInt * self._yFactor + # If there is already a start index, this means that we are + # zooming on a zoomed region and we need to adjust the indices + # to reflect the previously zoomed region. + if (self._yBegin): + self._yEndStaged += self._yBegin + # Log the indices + logging.info('Bulkio packet zoom end index set to {0} on the Y axis'.format(self._yEndStaged)) + + # Zoom commands ---------------------------------------------------- + elif (ctrl['type'] == ControlEnum.yZoomIn): + # Make the staged values ACTIVE + self._yBegin = self._yBeginStaged + self._yEnd = self._yEndStaged + self._yBeginStaged = None + self._yEndStaged = None + logging.info('Zoom IN commanded for the Y axis with indices: ['+str(self._yBegin)+','+str(self._yEnd)+']') + elif (ctrl['type'] == ControlEnum.yZoomReset): self._yBegin = None self._yEnd = None - logging.info('Bulkio packet start and end indexes were reset for both axes') + self._yBeginStaged = None + self._yEndStaged = None + logging.info('Zoom RESET commanded for the Y axis') + + # Set the max PPS -------------------------------------------------- elif (ctrl['type'] == ControlEnum.MaxPPS): logging.warning('Packets per second (PPS) not implemented yet.') @@ -191,25 +260,28 @@ def _pushPacket(self, data, ts, EOS, stream_id): # Retrieve SRI from stream_id sri, sriChangedFromPacket = self._getSRI(stream_id) - # Check if the bulkio limiter is enabled - if (self._limiterEnable): - # Check if any limiting parameter exists - if (self._hasLimitingParameter()): - # Call the limit function - outData, outSRI, xFactor, yFactor, sriChangedFromLimiter, message = bulkio_limiter.limit( - data, sri, - self._xMax, self._xBegin, self._xEnd, self._xUseMean, - self._yMax, self._yBegin, self._yEnd, self._yUseMean - ) - # Check for success - if (None == outData): - logging.warning('bulkio limiter failed: ' + message) - else: - logging.warning('bulkio limiter is turned ON but there are no limiting parameters!') + # Check if any limiting parameter exists + if (self._hasLimitingParameter()): + # Call the limit function (the "True" flags tell the function to use the mean() down-sampling) + outData, outSRI, self._xFactor, self._yFactor, sriChangedFromLimiter, warningMessage = bulkio_limiter.limit( + data, sri, self._xMax, self._xBegin, self._xEnd, True, self._yMax, self._yBegin, self._yEnd, True + ) + + # Logging for debug (comment out operationally) + #logging.info("connection_id: " + str(self._connectionId)) + #logging.info("bulkio_limiter info: \n" + + # " originalWordLength="+str(len(data))+", finalWordLength="+str(len(outData))+", originalSubsize="+str(sri.subsize)+", finalSubsize="+str(outSRI.subsize)+"\n"+ + # " xMax="+str(self._xMax)+", xBegin="+str(self._xBegin)+", xEnd="+str(self._xEnd)+", xDownSampFactor="+str(self._xFactor)+"\n"+ + # " yMax="+str(self._yMax)+", yBegin="+str(self._yBegin)+", yEnd="+str(self._yEnd)+", yDownSampFactor="+str(self._yFactor)+"\n"+ + # " sriChangedFromLimiter="+str(sriChangedFromLimiter)) + + # Print warnings if they exist + if (warningMessage): + logging.warning('bulkio_limiter.limit(): ' + warningMessage) else: - # Limiter is OFF + # Don't do anything if no limiting parameter exists outData = data - outSRI = sri + outSRI = bulkio_limiter.copy_sri(sri) sriChangedFromLimiter = False # Tack on SRI, Package, Deliver. diff --git a/rest/bulkio_limiter.py b/rest/bulkio_limiter.py index 4985036..abcbddb 100644 --- a/rest/bulkio_limiter.py +++ b/rest/bulkio_limiter.py @@ -1,6 +1,5 @@ """ Company: Geon Technologies, LLC - Project: geon/redhawk-ui/rest-python, ESRA/BulkioLimiter Author: Josh Schindehette Copyright: (c) 2017 Geon Technologies, LLC. All rights reserved. @@ -101,9 +100,9 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe #============================================ # Initialize #============================================ - # Copy SRI so it isn't modified in place - outSRI = copy_sri(sri) + outSRI = copy_sri(sri) # Copy SRI so it isn't modified in place sriChanged = False + warningMessage = "" #============================================ # Convert from a vector of words to a matrix of samples @@ -126,7 +125,11 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe yLength = len(outData) / frameSize # Check that the frame size evenly fits in the data length if ((len(outData) % frameSize) > 0): - return (None, None, None, None, None, "Malformed input packet: Data length:" + str(len(outData)) + " is not a multiple of the frame length:" + str(frameSize)) + # Warn and attempt to correct by slicing off the end of the packet + warningMessage += "Malformed input packet! Data with length=" + str(len(outData)) + " is not a multiple of the frame with length=" + str(frameSize) + "\n" + adjustedLength = (int(len(outData)) / int(frameSize)) * frameSize + outData = outData[:adjustedLength] + warningMessage += "Dropped data to fix malformed input packet! Data now has length=" + str(adjustedLength) + "\n" # Convert to numpy array outData = numpy.array(outData) @@ -146,7 +149,7 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe # Recalculate xLength - number of columns xLength = outData.shape[1] else: - return (None, None, None, None, None, "Invalid X axis indices: Requested end index:" + str(xEnd) + " for " + str(xLength) + " samples") + warningMessage += "Ignoring X axis end index! Index=" + str(xEnd) + " does not exist in samples with length=" + str(xLength) + "\n" # Slice the beginning of the X axis second if (xBegin): @@ -162,7 +165,7 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe if (sri.xstart != outSRI.xstart): sriChanged = True else: - return (None, None, None, None, None, "Invalid X axis indices: Requested beginning index:" + str(xBegin) + " for " + str(xLength) + " samples") + warningMessage += "Ignoring X axis beginning index! Index=" + str(xBegin) + " does not exist in samples with length=" + str(xLength) + "\n" # Slice the end of the Y axis first if (yEnd): @@ -173,7 +176,7 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe # Recalculate yLength - number of rows yLength = outData.shape[0] else: - return (None, None, None, None, None, "Invalid Y axis indices: Requested end index:" + str(yEnd) + " for " + str(yLength) + " frames") + warningMessage += "Ignoring Y axis end index! Index=" + str(yEnd) + " does not exist in samples with length=" + str(yLength) + "\n" # Slice the beginning of the Y axis second if (yBegin): @@ -189,7 +192,7 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe if (sri.ystart != outSRI.ystart): sriChanged = True else: - return (None, None, None, None, None, "Invalid Y axis indices: Requested beginning index:" + str(yBegin) + " for " + str(yLength) + " frames") + warningMessage += "Ignoring Y axis beginning index! Index=" + str(yBegin) + " does not exist in samples with length=" + str(yLength) + "\n" #============================================ # Down-sample the data if necessary @@ -254,4 +257,4 @@ def limit(data, sri, xMax, xBegin=None, xEnd=None, xUseMean=True, yMax=None, yBe if (outSRI.mode == 1): outData = bulkio_helpers.pythonComplexListToBulkioComplex(outData) - return (outData, outSRI, xResampleFactor, yResampleFactor, sriChanged, "") + return (outData, outSRI, xResampleFactor, yResampleFactor, sriChanged, warningMessage)