Skip to content

Commit

Permalink
Merge branch 'master' into release/5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
sveseli committed Aug 11, 2022
2 parents 718a90a + 49921f7 commit 3b99a26
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 22 deletions.
2 changes: 1 addition & 1 deletion documentation/RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## Release 5.0.0 (2022/08/10)
## Release 5.0.0 (2022/08/11)

- PvaMirrorServer enhancements:
- optimized structure copy on processing
Expand Down
8 changes: 4 additions & 4 deletions pvapy/cli/hpcCollector.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ def createProcessorConfig(self, collectorId, args):
processorConfig['inputChannel'] = inputChannel
if not 'processorId' in processorConfig:
processorConfig['processorId'] = collectorId
if not 'processFirstUpdate' in processorConfig:
processorConfig['processFirstUpdate'] = args.process_first_update
if not 'skipInitialUpdates' in processorConfig:
processorConfig['skipInitialUpdates'] = args.skip_initial_updates
if not 'objectIdField' in processorConfig:
processorConfig['objectIdField'] = args.oid_field
if not 'objectIdOffset' in processorConfig:
Expand Down Expand Up @@ -369,7 +369,7 @@ def main():
parser.add_argument('-ic', '--input-channel', dest='input_channel', required=True, help='Input PV channel name. It shuld contain the "*" character which will be replaced with <producerId>.')
parser.add_argument('-oc', '--output-channel', dest='output_channel', default=None, help='Output PVA channel name (default: None). If specified, this channel can be used for publishing processing results. The value of "_" indicates that the output channel name will be set to "pvapy:collector:<collectorId>:output", while the "*" character will be replaced with <collectorId>. Note that this parameter is ignored if processor arguments dictionary contains "outputChannel" key.')
parser.add_argument('-sc', '--status-channel', dest='status_channel', default=None, help='Status PVA channel name (default: None). If specified, this channel will provide collector status. The value of "_" indicates that the status channel name will be set to "pvapy:collector:<collectorId>:status", while the "*" character will be replaced with <collectorId>.')
parser.add_argument('-cc', '--control-channel', dest='control_channel', default=None, help='Control channel name (default: None). If specified, this channel can be used to control collector configuration and processing. The value of "_" indicates that the control channel name will be set to "pvapy:collector:<collectorId>:control", while the "*" character will be replaced with <collectorId>. The control channel object has two strings: command and args. The only allowed values for the command string are: "configure", "reset_stats", "get_stats" and "stop". The configure command is used to allow for runtime configuration changes; in this case the keyword arguments string should be in json format to allow data collector to convert it into python dictionary that contains new configuration. For example, sending configuration dictionary via pvput command might look like this: pvput pvapy:collector:1:control \'{"command" : "configure", "args" : "{\\"x\\":100}"}\'. Note that system parameters that can be modified at runtime are the following: "collectorCacheSize", "monitorQueueSize" (only if client monitor queues have been configured at the start), "processFirstUpdate" (affects processing behavior after resetting stats), and "objectIdOffset". The reset_stats command will cause collector to reset its statistics data, the get_stats will force statistics data update, and the stop command will result in collector process exiting; for all of these commands args string is not needed.')
parser.add_argument('-cc', '--control-channel', dest='control_channel', default=None, help='Control channel name (default: None). If specified, this channel can be used to control collector configuration and processing. The value of "_" indicates that the control channel name will be set to "pvapy:collector:<collectorId>:control", while the "*" character will be replaced with <collectorId>. The control channel object has two strings: command and args. The only allowed values for the command string are: "configure", "reset_stats", "get_stats" and "stop". The configure command is used to allow for runtime configuration changes; in this case the keyword arguments string should be in json format to allow data collector to convert it into python dictionary that contains new configuration. For example, sending configuration dictionary via pvput command might look like this: pvput pvapy:collector:1:control \'{"command" : "configure", "args" : "{\\"x\\":100}"}\'. Note that system parameters that can be modified at runtime are the following: "collectorCacheSize", "monitorQueueSize" (only if client monitor queues have been configured at the start), "skipInitialUpdates" (affects processing behavior after resetting stats), and "objectIdOffset". The reset_stats command will cause collector to reset its statistics data, the get_stats will force statistics data update, and the stop command will result in collector process exiting; for all of these commands args string is not needed.')
parser.add_argument('-sqs', '--server-queue-size', type=int, dest='server_queue_size', default=0, help='Server queue size (default: 0); this setting will increase memory usage on the server side, but may help prevent missed PV updates.')
parser.add_argument('-mqs', '--monitor-queue-size', type=int, dest='monitor_queue_size', default=-1, help='PVA channel monitor (client) queue size (default: -1); if < 0, PV updates will be processed immediately without copying them into PvObjectQueue; if >= 0, PvObjectQueue will be used for receving PV updates (value of zero indicates infinite queue size).')
parser.add_argument('-ccs', '--collector-cache-size', type=int, dest='collector_cache_size', default=-1, help='Collector cache size (default: -1). Collector puts all received PV updates into its cache; once the cache is full, PV updates are sorted by the objectIdField value, removed from the cache and further processed. If specified cache size is negative, or smaller than the minimum allowed value (nProducers*10), this option will be ignored.')
Expand All @@ -379,7 +379,7 @@ def main():
parser.add_argument('-of', '--oid-field', dest='oid_field', default='uniqueId', help='PV update id field used for calculating data processor statistics (default: uniqueId). This parameter is ignored if processor arguments dictionary contains "objectIdField" key.')
parser.add_argument('-oo', '--oid-offset', type=int, dest='oid_offset', default=1, help='This parameter determines by how much object id should change between the two PV updates, and is used for determining the number of missed PV updates (default: 1). This parameter is ignored if processor arguments dictionary contains "objectIdOffset" key.')
parser.add_argument('-fr', '--field-request', dest='field_request', default='', help='PV field request string (default: None). This parameter can be used to request only a subset of the data available in the input channel. The system will automatically append object id field to the specified request string. Note that this parameter is ignored when data distributor is used.')
parser.add_argument('-pfu', '--process-first-update', dest='process_first_update', default=False, action='store_true', help='Process first PV update (default: False). This parameter is ignored if processor arguments dictionary contains "processFirstUpdate" key.')
parser.add_argument('-siu', '--skip-initial-updates', type=int, dest='skip_initial_updates', default=1, help='Number of initial PV updates that should not be processed (default: 1). This parameter is ignored if processor arguments dictionary contains "skipInitialUpdates" key.')
parser.add_argument('-rt', '--runtime', type=float, dest='runtime', default=0, help='Server runtime in seconds; values <=0 indicate infinite runtime (default: infinite).')
parser.add_argument('-rp', '--report-period', type=float, dest='report_period', default=0, help='Statistics report period for the collector in seconds; values <=0 indicate no reporting (default: 0).')
parser.add_argument('-rs', '--report-stats', dest='report_stats', default='all', help='Comma-separated list of statistics subsets that should be reported (default: all); possible values: monitor, queue, processor, user, all.')
Expand Down
8 changes: 4 additions & 4 deletions pvapy/cli/hpcConsumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ def createProcessorConfig(self, consumerId, args):
processorConfig['inputChannel'] = inputChannel
if not 'processorId' in processorConfig:
processorConfig['processorId'] = consumerId
if not 'processFirstUpdate' in processorConfig:
processorConfig['processFirstUpdate'] = args.process_first_update
if not 'skipInitialUpdates' in processorConfig:
processorConfig['skipInitialUpdates'] = args.skip_initial_updates
if not 'objectIdField' in processorConfig:
processorConfig['objectIdField'] = args.oid_field
if not 'objectIdOffset' in processorConfig:
Expand Down Expand Up @@ -543,7 +543,7 @@ def main():
parser.add_argument('-ipt', '--input-provider-type', dest='input_provider_type', default='pva', help='Input PV channel provider type, it must be either "pva" or "ca" (default: pva).')
parser.add_argument('-oc', '--output-channel', dest='output_channel', default=None, help='Output PVA channel name (default: None). If specified, this channel can be used for publishing processing results. The value of "_" indicates that the output channel name will be set to "pvapy:consumer:<consumerId>:output", while the "*" character will be replaced with <consumerId>. Note that this parameter is ignored if processor arguments dictionary contains "outputChannel" key.')
parser.add_argument('-sc', '--status-channel', dest='status_channel', default=None, help='Status PVA channel name (default: None). If specified, this channel will provide consumer status. The value of "_" indicates that the status channel name will be set to "pvapy:consumer:<consumerId>:status", while the "*" character will be replaced with <consumerId>.')
parser.add_argument('-cc', '--control-channel', dest='control_channel', default=None, help='Control channel name (default: None). If specified, this channel can be used to control consumer configuration and processing. The value of "_" indicates that the control channel name will be set to "pvapy:consumer:<consumerId>:control", while the "*" character will be replaced with <consumerId>. The control channel object has two strings: command and args. The only allowed values for the command string are: "configure", "reset_stats", "get_stats" and "stop". The configure command is used to allow for runtime configuration changes; in this case the keyword arguments string should be in json format to allow data consumer to convert it into python dictionary that contains new configuration. For example, sending configuration dictionary via pvput command might look like this: pvput input_channel:consumer:2:control \'{"command" : "configure", "args" : "{\\"x\\":100}"}\'. Note that system parameters that can be modified at runtime are the following: "monitorQueueSize" (only if client monitor queue has been configured at the start), "processFirstUpdate" (affects processing behavior after resetting stats), and "objectIdOffset" (may be used to adjust offset if consumers have been added or removed from processing). The reset_stats command will cause consumer to reset its statistics data, the get_stats will force statistics data update, and the stop command will result in consumer process exiting; for all of these commands args string is not needed.')
parser.add_argument('-cc', '--control-channel', dest='control_channel', default=None, help='Control channel name (default: None). If specified, this channel can be used to control consumer configuration and processing. The value of "_" indicates that the control channel name will be set to "pvapy:consumer:<consumerId>:control", while the "*" character will be replaced with <consumerId>. The control channel object has two strings: command and args. The only allowed values for the command string are: "configure", "reset_stats", "get_stats" and "stop". The configure command is used to allow for runtime configuration changes; in this case the keyword arguments string should be in json format to allow data consumer to convert it into python dictionary that contains new configuration. For example, sending configuration dictionary via pvput command might look like this: pvput input_channel:consumer:2:control \'{"command" : "configure", "args" : "{\\"x\\":100}"}\'. Note that system parameters that can be modified at runtime are the following: "monitorQueueSize" (only if client monitor queue has been configured at the start), "skipInitialUpdates" (affects processing behavior after resetting stats), and "objectIdOffset" (may be used to adjust offset if consumers have been added or removed from processing). The reset_stats command will cause consumer to reset its statistics data, the get_stats will force statistics data update, and the stop command will result in consumer process exiting; for all of these commands args string is not needed.')
parser.add_argument('-sqs', '--server-queue-size', type=int, dest='server_queue_size', default=0, help='Server queue size (default: 0); this setting will increase memory usage on the server side, but may help prevent missed PV updates.')
parser.add_argument('-mqs', '--monitor-queue-size', type=int, dest='monitor_queue_size', default=-1, help='PVA channel monitor (client) queue size (default: -1); if < 0, PV updates will be processed immediately without copying them into PvObjectQueue; if >= 0, PvObjectQueue will be used for receving PV updates (value of zero indicates infinite queue size).')
parser.add_argument('-pf', '--processor-file', dest='processor_file', default=None, help='Full path to the python file containing user processor class. If this option is not used, the processor class should be specified using "<modulePath>.<className>" notation.')
Expand All @@ -552,7 +552,7 @@ def main():
parser.add_argument('-of', '--oid-field', dest='oid_field', default='uniqueId', help='PV update id field used for calculating data processor statistics (default: uniqueId). This parameter is ignored if processor argumentss dictionary contains "objectIdField" key.')
parser.add_argument('-oo', '--oid-offset', type=int, dest='oid_offset', default=0, help='This parameter determines by how much object id should change between the two PV updates, and is used for determining the number of missed PV updates (default: 0). This parameter is ignored if processor arguments dictionary contains "objectIdOffset" key, and should be modified only if data distributor plugin will be distributing data between multiple clients, in which case it should be set to "(<nConsumers>-1)*<nUpdates>+1" for a single client set, or to "(<nSets>-1)*<nUpdates>+1" for multiple client sets. Values <= 0 will be replaced with the appropriate value depending on the number of client sets specified. Note that this relies on using the same value for the --n-distributor-sets when multiple instances of this command are running separately.')
parser.add_argument('-fr', '--field-request', dest='field_request', default='', help='PV field request string (default: None). This parameter can be used to request only a subset of the data available in the input channel. The system will automatically append object id field to the specified request string. Note that this parameter is ignored when data distributor is used.')
parser.add_argument('-pfu', '--process-first-update', dest='process_first_update', default=False, action='store_true', help='Process first PV update (default: False). This parameter is ignored if processor arguments dictionary contains "processFirstUpdate" key.')
parser.add_argument('-siu', '--skip-initial-updates', type=int, dest='skip_initial_updates', default=1, help='Number of initial PV updates that should not be processed (default: 1). This parameter is ignored if processor arguments dictionary contains "skipInitialUpdates" key.')
parser.add_argument('-dpn', '--distributor-plugin-name', dest='distributor_plugin_name', default='pydistributor', help='Distributor plugin name (default: pydistributor).')
parser.add_argument('-dg', '--distributor-group', dest='distributor_group', default=None, help='Distributor client group that application belongs to (default: None). This parameter should be used only if data distributor plugin will be distributing data between multiple clients. Note that different distributor groups are completely independent of each other.')
parser.add_argument('-ds', '--distributor-set', dest='distributor_set', default=None, help='Distributor client set that application belongs to within its group (default: None). This parameter should be used only if data distributor plugin will be distributing data between multiple clients. Note that all clients belonging to the same set receive the same PV updates. If set id is not specified (i.e., if a group does not have multiple sets of clients), a PV update will be distributed to only one client.')
Expand Down
14 changes: 8 additions & 6 deletions pvapy/hpc/dataProcessingController.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, configDict={}, userDataProcessor=None):
# Assume NTND Arrays if object id field is not passed in
self.objectIdField = configDict.get('objectIdField', 'uniqueId')
# Do not process first object by default
self.processFirstUpdate = configDict.get('processFirstUpdate', False)
self.skipInitialUpdates = configDict.get('skipInitialUpdates', 1)
# Object id processing offset used for statistics calculation
self.objectIdOffset = int(configDict.get('objectIdOffset', 1))
# Output channel is used for publishing processed objects
Expand Down Expand Up @@ -81,9 +81,9 @@ def stop(self):

def configure(self, configDict):
if type(configDict) == dict:
if 'processFirstUpdate' in configDict:
self.processFirstUpdate = configDict.get('processFirstUpdate')
self.logger.debug(f'Resetting processing of first update to {self.processFirstUpdate}')
if 'skipInitialUpdates' in configDict:
self.skipInitialUpdates = int(configDict.get('skipInitialUpdates'))
self.logger.debug(f'Resetting processing of first update to {self.skipInitialUpdates}')
if 'objectIdOffset' in configDict:
self.objectIdOffset = int(configDict.get('objectIdOffset', 1))
self.logger.debug(f'Resetting object id offset to {self.objectIdOffset}')
Expand All @@ -97,8 +97,10 @@ def process(self, pvObject):
if self.lastObjectId is None:
self.lastObjectId = objectId
self.createOutputChannel(pvObject)
if not self.processFirstUpdate:
return None
if self.skipInitialUpdates > 0:
self.skipInitialUpdates -= 1
self.logger.debug(f'Skipping initial update, {self.skipInitialUpdates} remain to be skipped')
return None
if self.firstObjectId is None:
self.firstObjectId = objectId
self.firstObjectTime = now
Expand Down
20 changes: 13 additions & 7 deletions tools/conda/pvapy-conda/meta.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package:
version: {{ environ.get('PVAPY_VERSION', '0.0') }}

source:
#git_rev: {{ environ.get('PVAPY_VERSION', '0.0') }}
#git_url: https://github.com/epics-base/pvaPy
path: ../../..
git_rev: {{ environ.get('PVAPY_VERSION', '0.0') }}
git_url: https://github.com/epics-base/pvaPy
#path: ../../..

build:
number: {{ environ.get('BUILD_NUMBER', '1') }}
Expand All @@ -29,17 +29,23 @@ requirements:
- python
- sphinx
- sphinx_rtd_theme
#- numpy>=1.22
- numpy>=1.20,<1.21
- numpy>=1.22
#- numpy>=1.19,<1.21
- epics-base={{ environ.get('EPICS_BASE_VERSION', '0.0') }}
- pvapy-boost={{ environ.get('BOOST_VERSION', '0.0') }}
- pillow
- pycryptodome
- rsa

run:
- python
#- numpy>=1.22
- numpy>=1.20,<1.21
- numpy>=1.22
#- numpy>=1.19,<1.21
- epics-base={{ environ.get('EPICS_BASE_VERSION', '0.0') }}
- pvapy-boost={{ environ.get('BOOST_VERSION', '0.0') }}
- pillow
- pycryptodome
- rsa

about:
home: https://github.com/epics-base/pvaPy
Expand Down
3 changes: 3 additions & 0 deletions tools/pip/pvapy-pip/requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
numpy>=1.22
pillow
pycryptodome
rsa
3 changes: 3 additions & 0 deletions tools/pip/pvapy-pip/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ def build_extension(self, ext):
},
install_requires=[
'numpy>=1.22',
'pillow',
'pycryptodome',
'rsa'
],
entry_points = {
'console_scripts': [
Expand Down

0 comments on commit 3b99a26

Please sign in to comment.