Creates a new API instance for interacting with the platform. :
>>> api = API(creds)
Options labelled as
DEFAULT
will use the options set in the :pydefaults
configuration object.An :py
API
is comprised of services for querying the different domains on the platform:
flow
drop
track
group
identity
api_task
mqtt_task
rss_task
token
share
device
statistics
websocket
For documentation on these services, read
services
,authentication
,statistics
,aggregation
, andwebsockets
.
All :pyAPI
service requests return plain dictionaries of the request body. They may throw exceptions <exceptions>
in case of an error.
Note
The drop
service is slightly different in that it must first be parameterized by the Flow id.
>>> api.drop('<flow_id>').find(limit=10)
Service methods <services>
take additional keyword arguments that act as query parameters on the requests. These are not fixed in any way, so please refer to the platform documentation for the options.
Note
When a request is made with the refs
parameter set to True
, the return type becomes a tuple rather than a single dictionary:
>>> resp, refs = api.flow.find('<flow_id>', refs=True)
:pyService find methods <service.find_many>
understand a query DSL that lets you express filters using Python operations instead of manually splicing strings together. :
>>> api.flow.find(mem.displayName == 'foo', mem.path.re('^/foo', 'i'))
Additional platform filter operations are supported:
Boolean operations are supported on filters using AND
and OR
.:
>>> api.flow.find((mem.displayName == 'foo').OR(mem.displayName == 'bar'))
If you create your :pyAPI
using a master token, you can create and manage tokens and shares.
Both tokens and shares support find
and delete
methods like other services. They are, however, immutable and do not support updates.
Both filter
and rules
support filters
.:
>>> api.drop(flow_id).aggregate(['$avg:test'], rules={'test': mem.foo > 42})
:pyService update methods <service.update>
can also take an instance of a modification helper called :pyM
. It lets you gradually make updates to a model and then extract the diff and model with the changes applied.
When passed directly to an update method, only the changes will be sent to the server instead of the entire model.
Two workflows are supported for making asynchronous and parallel requests.
The :pyAPI.async
workflow is an imperative API where requests are queued internally. Once you've made all the requests you need, you can invoke the results()
method to wait. This can be useful when making large batches of similar requests:
paths = [...]
async_api = api.async()
for path in paths
async_api.flow.find(mem.path == path)
for flows in async_api.results():
# Do something with the flows
pass
If some of your requests might fail, and you want to know which ones, you may set the with_exceptions
keyword argument:
flows = [...]
async_api = api.async()
for flow in flows:
async_api.drop(flow['id']).find(limit=10)
for e, drops in async_api.results(with_exceptions=True):
if e:
# Do something if there was an error
pass
else:
# Do something with the drops
pass
The :pyAPI.lazy
worklow is useful when building complex compositions of dependent requests which can benefit from implicit parallelization. All requests are executed in parallel, but wait when you try to read the data. This works by requests returning a GreenThunk
, which is a MutableMapping
around a green thread. This object acts just like a regular dictionary or list, but waits on the green thread before performing any look-ups or mutations. :
lazy_api = api.lazy()
flow_a = lazy_api.flow.find(mem.path == '/path/to/flow_a')
flow_b = lazy_api.flow.find(mem.path == '/path/to/flow_b')
drops = lazy_api.drop(flow_a[0]['id']).find(limit=10)
In this example, the two requests for Flows are performed in parallel, while the requests for drops waits for the flow_a
request to complete first.
You can retrieve the pure data of a GreenThunk
by invoking its unwrap()
method.
Note
It is assumed the user has done the necessary green thread monkey-patching for their chosen library before importing the flowthings
package.
WebSockets are supported using the websocket-client
package. Here is a short example:
def on_open(ws):
ws.subscribe('<flow_id>')
def on_message(ws, resource, data):
print 'Got message:', resource, data
def on_close(ws):
print 'Closed'
def on_error(ws, e):
print 'Error:', e
ws = api.websocket.connect(on_open=on_open,
on_message=on_message,
on_close=on_close,
on_error=on_error)
ws.run()
from flowthings import API, Token, mem
creds = Token('<account_name>', '<token_string>')
api = API(creds)
# Get a Flow by id
api.flow.find('<flow_id>')
# Get a Flow by path
api.flow.find(mem.path == '<flow_path>')
# Get 10 recent Flows, with references
flows, refs = api.flow.find(limit=10, refs=True)
# Create a flow
api.flow.create({ 'path': '<flow_path' })
# Delete a flow
api.flow.delete('<flow_id>')
# Get drops in a flow
api.drop('<flow_id>').find()
# Filter drops in a flow
api.drop('<flow_id>').find(mem.elems.foo == 'value')