# Flow API example

The sarracenia.flow class provides built-in accept/reject filtering for messages, supports downloading over several protocols, retries on failure, and allows the creation of callbacks to customize processing.

You need to provide a configuration as an argument when instantiating a subscriber.
The _sarracenia.config.no_file_config()_ function returns an empty configuration without consulting
the sr3 configuration file tree.

After making the needed modifications to the configuration, the subscriber is instantiated and run.

In [1]:
!mkdir -p /tmp/flow_demo


Make a directory for the files you are going to download.
The root of the download directory tree must already exist.
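If you prefer not to rely on the notebook's `!` shell escape, the same step can be done in plain Python. This is a minimal sketch using only the standard library; `exist_ok=True` makes it safe to re-run, much like `mkdir -p`.

```python
import os

# Root of the download tree used by the masks in the configuration cell.
download_dir = "/tmp/flow_demo"

# Create it (and any missing parents); no error if it already exists.
os.makedirs(download_dir, exist_ok=True)
print(os.path.isdir(download_dir))  # True
```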

In [1]:
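import re
import sarracenia.config
from sarracenia.flow.subscribe import Subscribe
import sarracenia.flowcb    # flow callback classes; not used in this minimal example

from urllib.parse import urlparse

# Start from an empty configuration: no sr3 configuration files are read.
cfg = sarracenia.config.no_file_config()

# Where to connect, and which messages to subscribe to.
cfg.broker = urlparse('amqps://anonymous:anonymous@hpfx.collab.science.gc.ca')
cfg.bindings = [('v02.post', 'xpublic', '*.WXO-DD.observations.swob-ml.#')]
cfg.queue_name = 'q_anonymous.subscriber_test2'

cfg.download = True          # download the file announced by each message
cfg.batch = 1                # retrieve one message per call to newMessages()
cfg.message_count_max = 5    # stop after 5 messages (demonstration only)

# Accept every relPath and put the files directly in /tmp/flow_demo (no mirroring).
pattern = ".*"
cfg.masks = [(pattern, "/tmp/flow_demo", None, re.compile(pattern), True, False, False, False, '/')]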




## Starters
The broker, bindings, and queue_name settings are explained in the moth notebook.
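The subtopic in the bindings uses AMQP topic wildcards: `*` matches exactly one dot-separated word and `#` matches zero or more words. The sketch below is a simplified, hypothetical matcher (not the broker's or sarracenia's code) that checks a topic copied from the `getNewMessage` log line against the binding. It assumes the first element of the binding tuple is the topic prefix, which the `v02.post` at the start of the logged topic suggests.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Simplified AMQP topic matching: '*' = one word, '#' = zero or more words."""
    p, t = pattern.split('.'), topic.split('.')

    def match(i: int, j: int) -> bool:
        if i == len(p):
            return j == len(t)          # pattern used up: topic must be too
        if p[i] == '#':
            # Let '#' absorb 0, 1, 2, ... of the remaining words.
            return any(match(i + 1, k) for k in range(j, len(t) + 1))
        if j == len(t):
            return False                # topic used up, pattern is not
        if p[i] in ('*', t[j]):
            return match(i + 1, j + 1)  # '*' or an identical word consumes one word
        return False

    return match(0, 0)

# Binding from the configuration cell (assumed order: topic prefix, exchange, subtopic).
topic_prefix, exchange, subtopic = ('v02.post', 'xpublic', '*.WXO-DD.observations.swob-ml.#')
binding_key = topic_prefix + '.' + subtopic

# Topic copied from the getNewMessage log line.
topic = 'v02.post.20210216.WXO-DD.observations.swob-ml.20210216.CEQI'
print(topic_matches(binding_key, topic))  # True
print(topic_matches(binding_key, 'v02.post.20210216.WXO-DD.observations.other.CEQI'))  # False
```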

## cfg.download

Whether the flow should download the files corresponding to the messages.
If True, the files are downloaded.

## cfg.batch

Messages are processed in batches. The number of messages retrieved per call to newMessages()
is limited by the _batch_ setting. We set it to 1 here so you can see each file being downloaded as soon as the corresponding message is received. If you leave it unset, it defaults to 25. The best value is a matter of taste and use case.
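To picture what _batch_ controls, here is an illustration only: a hypothetical `batches()` helper that cuts a queue of messages into groups of at most `batch` items. It is not how newMessages() is implemented; it just shows the effect of the limit.

```python
from itertools import islice

def batches(messages, batch=25):
    """Yield lists of at most `batch` messages (hypothetical helper)."""
    it = iter(messages)
    while True:
        chunk = list(islice(it, batch))
        if not chunk:
            return
        yield chunk

queued = [f"msg{i}" for i in range(5)]
print([len(b) for b in batches(queued, batch=2)])  # [2, 2, 1]
print([len(b) for b in batches(queued, batch=1)])  # [1, 1, 1, 1, 1]
```

With `batch=1`, every call hands over a single message, which is why each download shows up in the log right after its message.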

## cfg.message_count_max

Normally this setting is left at its default (0), which has no effect on processing.
For demonstration purposes, we use it here to limit the number of messages the subscriber will process:
after _message_count_max_ messages have been received, processing stops.
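The stopping rule can be sketched as a small loop. The `consume()` function is hypothetical and only mirrors the behaviour described above: 0 means no limit, any positive value stops processing once that many messages have been handled.

```python
def consume(messages, message_count_max=0):
    """Hypothetical loop: stop after message_count_max messages; 0 means no limit."""
    processed = []
    for msg in messages:
        processed.append(msg)
        if message_count_max and len(processed) >= message_count_max:
            break
    return processed

stream = (f"msg{i}" for i in range(100))
print(len(consume(stream, message_count_max=5)))  # 5
print(len(consume(range(8))))                     # 8
```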


## cfg.masks
Masks are a compiled form of accept/reject directives. A message's relPath is compared to the regex in each mask.
If the regex matches and accept is True, the message is accepted for further processing.
If the regex matches but accept is False, processing of the message stops (the message is rejected).
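A minimal sketch of that decision, assuming masks are tried in order, the first matching one decides, and an `accept_unmatch` default (a field visible in the configuration dump in the log below) applies when none match. The helpers are hypothetical; whether sarracenia uses `re.match` or `re.search` is also an assumption here.

```python
import re

def make_mask(pattern, accept, directory="/tmp/flow_demo"):
    # Same 9-field layout as cfg.masks in the configuration cell.
    return (pattern, directory, None, re.compile(pattern), accept, False, False, False, '/')

def is_accepted(relpath, masks, accept_unmatch=True):
    for mask in masks:
        regex, accept = mask[3], mask[4]
        if regex.match(relpath):
            return accept        # first matching mask decides
    return accept_unmatch        # no mask matched

masks = [make_mask(r".*\.tmp$", False), make_mask(r".*swob-ml.*", True)]
print(is_accepted("/20210216/WXO-DD/observations/swob-ml/20210216/CWUT/x.xml", masks))  # True
print(is_accepted("/a/b/partial.tmp", masks))                                          # False
print(is_accepted("/a/b/other.txt", masks, accept_unmatch=False))                      # False
```

Putting the reject mask first means temporary files are dropped even though a later, broader accept would also match them.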

Each mask is a tuple whose fields, in order, are listed below; their meaning can be looked up in the sr3(1) man page.

*  pattern_string,      the input regular expression string, to be compiled by re routines.
*  directory,           where to put the downloaded files (the root of the tree, when mirroring).
*  fn,                  transformation of the filename to apply. None is the 99% use case.
*  regex,               compiled version of pattern_string.
*  accept (True/False), if the pattern matches, accept the message for further processing.
*  mirror (True/False), when downloading, build a complete tree mirroring the source, or just dump the files in directory.
*  strip (True/False),  modify the relPath by stripping entries from the left.
*  pstrip (True/False), strip entries based on a pattern.
*  flatten (character), '/' means do not flatten.
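To connect the field names to positions, the sketch below wraps the demo mask in a hypothetical `Mask` namedtuple (not part of sarracenia) and derives a local file path from a relPath. It is a simplification that ignores `fn`, `strip` and `pstrip`, and the interaction of `mirror` with `flatten` is assumed. With the demo mask (mirror False, flatten '/') it gives the same kind of path as the "downloaded ok" log lines.

```python
import posixpath
import re
from collections import namedtuple

# Field order follows the list above; the namedtuple is a hypothetical helper.
Mask = namedtuple("Mask", ["pattern", "directory", "fn", "regex",
                           "accept", "mirror", "strip", "pstrip", "flatten"])

pattern = ".*"
demo = Mask(pattern, "/tmp/flow_demo", None, re.compile(pattern),
            True, False, False, False, '/')

def local_path(mask, relpath):
    """Simplified sketch: ignores fn, strip and pstrip."""
    relpath = relpath.lstrip('/')
    if mask.flatten != '/':
        # Flatten: replace the path separators with the flatten character.
        return posixpath.join(mask.directory, relpath.replace('/', mask.flatten))
    if mask.mirror:
        # Mirror: recreate the source tree under the directory.
        return posixpath.join(mask.directory, relpath)
    # Neither: drop the file directly into the directory.
    return posixpath.join(mask.directory, posixpath.basename(relpath))

relpath = "/20210216/WXO-DD/observations/swob-ml/20210216/CWUT/2021-02-16-2334-CWUT-AUTO-minute-swob.xml"
print(local_path(demo, relpath))  # /tmp/flow_demo/2021-02-16-2334-CWUT-AUTO-minute-swob.xml
print(local_path(demo._replace(mirror=True), relpath))
```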



In [2]:
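# Build the subscriber from the configuration and run it; with
# message_count_max = 5 it stops processing after five messages.
subscriber = Subscribe(cfg)

subscriber.run()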

2021-02-16 18:39:35,059 [INFO] sarracenia.flow loadCallbacks imports to load: []
2021-02-16 18:39:35,061 [DEBUG] sarracenia.flowcb.retry __init__ sr_retry __init__
2021-02-16 18:39:35,061 [DEBUG] sarracenia.flowcb.retry __init__ logLevel=info
2021-02-16 18:39:35,160 [DEBUG] amqp _on_start Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@hpfx2.collab.science.gc.ca', 'copyright': 'Copyright (C) 2007-2019 Pivotal Software, Inc.', 'information': 'Licensed under the MPL.  See http://www.rabbitmq.com/', 'platform': 'Erlang/OTP 21.3', 'product': 'RabbitMQ', 'version': '3.7.13'}, mechanisms: [b'PLAIN', b'AMQPLAIN'], locales: ['en_US']
2021-02-16 18:39:35,192 [DEBUG] amqp __init__ using channel_id: 

2021-02-16 18:39:35,404 [DEBUG] sarracenia.transfer.https get sr_http get 2021-02-16-2334-CMJN-AUTO-minute-swob.xml 2021-02-16-2334-CMJN-AUTO-minute-swob.xml 0
2021-02-16 18:39:35,404 [DEBUG] sarracenia.transfer.https get sr_http self.path //20210216/WXO-DD/observations/swob-ml/20210216/CMJN
2021-02-16 18:39:35,404 [DEBUG] sarracenia.transfer.https __open__ sr_http open


_Config__admin=None, _Config__broker=amqps://anonymous@hpfx.collab.science.gc.ca, _Config__post_broker=None, accel_threshold=0, accept_unmatch=True,
accept_unmatched=False, attempts=3, auto_delete=False, baseDir=None, batch=1, bind=True, bindings=[('v02.post', 'xpublic', '*.WXO-DD.observations.swob-ml.#')],
bufsize=1048576, bytes_per_second=None, bytes_ps=0, cfg_run_dir='.', chmod=0, chmod_dir=509, chmod_log=384, currentDir=None, debug=False, declare=True,
declared_exchanges=[], declared_users={}, delete=False, destfn_script=None, directory=None, documentRoot=None, download=True, durable=True, exchange=None, expire=300,
filename=None, fixed_headers={}, flatten='/', hostdir='fractal', hostname='fractal', housekeeping=30, imports=[], inflight=None, inline=False, inline_encoding='guess',
inline_max=4096, instances=1, logFormat='%(asctime)s [%(levelname)s] %(name)s %(funcName)s %(message)s', logLevel='info', log_reject=False, lr_backupCount=5, lr_interval=1,
lr_when='midnight', masks=[('.*

2021-02-16 18:39:35,495 [DEBUG] sarracenia.transfer read_write sr_proto read_write
2021-02-16 18:39:35,498 [INFO] sarracenia.flow do_download downloaded ok: /tmp/flow_demo/2021-02-16-2334-CMJN-AUTO-minute-swob.xml
2021-02-16 18:39:35,498 [INFO] sarracenia.flow.subscribe do processing 1 messages worked!
2021-02-16 18:39:35,499 [DEBUG] sarracenia.flow run worked too long to sleep!
2021-02-16 18:39:35,516 [DEBUG] sarracenia.moth.amqp getNewMessage new msg: {'sundew_extension': 'DMS:WXO_RENAMED_SWOB2:MSC:XML::20210216233442', 'from_cluster': 'DDSR.CMC', 'to_clusters': 'DDSR.CMC,DDI.CMC,CMC,SCIENCE,EDM', 'filename': 'msg_ddsr-WXO-DD_9c94b86a65155b4fdfc07feb0f0be3dd:DMS:WXO_RENAMED_SWOB2:MSC:XML::20210216233442', 'source': 'WXO-DD', 'mtime': '20210216T233444.462', 'atime': '20210216T233444.462', 'topic': 'v02.post.20210216.WXO-DD.observations.swob-ml.20210216.CEQI', '_deleteOnPost': ['delivery_tag', 'exchange', 'local_offset', 'topic'], 'pubTime': '20210216T233444.462', 'baseUrl': 'https://h

2021-02-16 18:39:35,745 [DEBUG] sarracenia.flow download Beginning fetch of https://hpfx.collab.science.gc.ca//20210216/WXO-DD/observations/swob-ml/20210216/CWUT/2021-02-16-2334-CWUT-AUTO-minute-swob.xml  into 2021-02-16-2334-CWUT-AUTO-minute-swob.xml 0-9850
2021-02-16 18:39:35,745 [DEBUG] sarracenia.transfer set_sumalgo sr_proto set_sumalgo md5
2021-02-16 18:39:35,745 [DEBUG] sarracenia.flow download hasattr=True, thresh=0, len=9851, remote_off=0, local_off=0 
2021-02-16 18:39:35,745 [DEBUG] sarracenia.transfer.https get sr_http get 2021-02-16-2334-CWUT-AUTO-minute-swob.xml 2021-02-16-2334-CWUT-AUTO-minute-swob.xml 0
2021-02-16 18:39:35,746 [DEBUG] sarracenia.transfer.https get sr_http self.path //20210216/WXO-DD/observations/swob-ml/20210216/CWUT
2021-02-16 18:39:35,746 [DEBUG] sarracenia.transfer.https __open__ sr_http open
2021-02-16 18:39:35,841 [DEBUG] sarracenia.transfer read_write sr_proto read_write
2021-02-16 18:39:35,844 [INFO] sarracenia.flow do_download downloaded ok: /tmp

# Conclusion:

The sarracenia.flow class supports an asynchronous mode of operation, and it can be customized using flowcb (flow callback) classes to introduce specific processing at specific times. It is just like invoking a single instance from the command line, except that all configuration is done within Python by setting cfg fields rather than by using the configuration language.

To use the full configuration language (slightly simpler than assigning values to the cfg object), to run multiple instances, and to get co-ordinated monitoring of those instances (restarts on failure, and a programmable number of subscribers started per configuration), use the command line interface (sr3)
with the configuration file tree.
