The Query object, created by calling aerospike.Client.query(), is used for executing queries over a secondary index of a specified set (which can be omitted or None). For queries, the None set contains those records which are not part of any named set.
The Query can optionally be assigned one of the aerospike.predicates (aerospike.predicates.between() or aerospike.predicates.equals()) using where(). A query without a predicate will match all the records in the given set, similar to an aerospike.Scan.
The query is invoked using either foreach() or results(). The bins returned can be filtered by using select().
Finally, a stream UDF may be applied with apply(). It will aggregate results out of the records streaming back from the query.
Queries and Managing Queries
select(bin1[, bin2[, bin3..]])
Set a filter on the record bins resulting from results() or foreach(). If a selected bin does not exist in a record, it will not appear in the bins portion of that record tuple.
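The bin-filtering behavior can be illustrated in plain Python, without a cluster. The select_bins() helper below is hypothetical; it only mimics what select() does to the bins portion of each record tuple:

```python
# Hypothetical stand-in for Query.select(): keep only the named bins,
# silently dropping any selected bin that a record does not contain.
def select_bins(bins, *selected):
    return {b: bins[b] for b in selected if b in bins}

record_bins = {'name': 'Ann', 'age': 40, 'city': 'Oslo'}
filtered = select_bins(record_bins, 'name', 'age', 'email')
print(filtered)  # 'email' is absent from the record, so it is omitted
```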
where(predicate)
Set a where predicate for the query, without which the query will behave similar to an aerospike.Scan. The predicate is produced by one of the aerospike.predicates methods equals() and between().
- param tuple predicate: the tuple produced by one of the aerospike.predicates methods.
Note
Currently, you can assign at most one predicate to the query.
results([policy[, options]]) -> list of (key, meta, bins)
Buffer the records resulting from the query, and return them as a list of records.
- param dict policy: optional aerospike_query_policies.
- param dict options: optional aerospike_query_options.
- return: a list of aerospike_record_tuple.
import aerospike
from aerospike import predicates as p
import pprint
config = { 'hosts': [ ('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()
pp = pprint.PrettyPrinter(indent=2)
query = client.query('test', 'demo')
query.select('name', 'age') # matched records return with the values of these bins
# assuming there is a secondary index on the 'age' bin of test.demo
query.where(p.equals('age', 40))
records = query.results({'total_timeout': 2000})
pp.pprint(records)
client.close()
Note
Queries require a secondary index to exist on the bin being queried.
foreach(callback[, policy[, options]])
Invoke the callback function for each of the records streaming back from the query.
- param callable callback: the function to invoke for each record.
- param dict policy: optional aerospike_query_policies.
- param dict options: optional aerospike_query_options.
Note
An aerospike_record_tuple is passed as the argument to the callback function.
import aerospike
from aerospike import predicates as p
import pprint
config = { 'hosts': [ ('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()
pp = pprint.PrettyPrinter(indent=2)
query = client.query('test', 'demo')
query.select('name', 'age') # matched records return with the values of these bins
# assuming there is a secondary index on the 'age' bin of test.demo
query.where(p.between('age', 20, 30))
names = []
def matched_names(record):
    key, metadata, bins = record
    pp.pprint(bins)
    names.append(bins['name'])
query.foreach(matched_names, {'total_timeout':2000})
pp.pprint(names)
client.close()
Note
To stop the stream return False
from the callback function.
from __future__ import print_function
import aerospike
from aerospike import predicates as p
config = { 'hosts': [ ('127.0.0.1',3000)]}
client = aerospike.client(config).connect()
def limit(lim, result):
    c = [0]  # integers are immutable, so a list (mutable) is used for the counter
    def key_add(record):
        key, metadata, bins = record
        if c[0] < lim:
            result.append(key)
            c[0] = c[0] + 1
        else:
            return False
    return key_add
query = client.query('test','user')
query.where(p.between('age', 20, 30))
keys = []
query.foreach(limit(100, keys))
print(len(keys)) # this will be 100 if the number of matching records > 100
client.close()
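The mutable-list counter trick used in limit() above is plain Python and can be demonstrated without a cluster. This sketch feeds the closure hypothetical record tuples and shows that it stops accepting keys once the limit is reached, just as foreach() stops the stream when the callback returns False:

```python
# Stand-alone version of the limit() closure pattern: a one-element list
# serves as a mutable counter shared between calls to the inner function.
def limit(lim, result):
    c = [0]
    def key_add(record):
        key, metadata, bins = record
        if c[0] < lim:
            result.append(key)
            c[0] += 1
        else:
            return False  # signals the stream to stop
    return key_add

keys = []
cb = limit(2, keys)
# Simulated record tuples, shaped like those foreach() would deliver
stream = [(('test', 'user', i), {}, {'age': 25}) for i in range(5)]
for rec in stream:
    if cb(rec) is False:
        break
print(len(keys))  # 2
```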
apply(module, function[, arguments])
Aggregate the results() using a stream UDF. If no predicate is attached to the Query, the stream UDF will aggregate over all the records in the specified set.
- param str module: the name of the Lua module.
- param str function: the name of the Lua function within the module.
- param list arguments: optional arguments to pass to the function.
- return: one of the supported types: int, str, float (double), list, dict (map), or bytearray (bytes).
Note
Assume we registered the following Lua module with the cluster as stream_udf.lua using aerospike.Client.udf_put().
local function having_ge_threshold(bin_having, ge_threshold)
    return function(rec)
        debug("group_count::thresh_filter: %s > %s ?", tostring(rec[bin_having]), tostring(ge_threshold))
        if rec[bin_having] < ge_threshold then
            return false
        end
        return true
    end
end

local function count(group_by_bin)
    return function(group, rec)
        if rec[group_by_bin] then
            local bin_name = rec[group_by_bin]
            group[bin_name] = (group[bin_name] or 0) + 1
            debug("group_count::count: bin %s has a count of %s", tostring(bin_name), tostring(group[bin_name]))
        end
        return group
    end
end

local function add_values(val1, val2)
    return val1 + val2
end

local function reduce_groups(a, b)
    return map.merge(a, b, add_values)
end

function group_count(stream, group_by_bin, bin_having, ge_threshold)
    if bin_having and ge_threshold then
        local myfilter = having_ge_threshold(bin_having, ge_threshold)
        return stream : filter(myfilter) : aggregate(map{}, count(group_by_bin)) : reduce(reduce_groups)
    else
        return stream : aggregate(map{}, count(group_by_bin)) : reduce(reduce_groups)
    end
end
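The aggregate-then-reduce flow of group_count can be mirrored in plain Python. This is an illustrative sketch, not the client API: each partial dict plays the role of one node's aggregation state, and the merge step corresponds to reduce_groups above:

```python
from functools import reduce

# Aggregate: fold a stream of records into per-name counts (one dict per node).
def count_into(group, rec, group_by_bin='first_name'):
    name = rec.get(group_by_bin)
    if name:
        group[name] = group.get(name, 0) + 1
    return group

# Reduce: merge two partial count maps by adding values, like map.merge above.
def reduce_groups(a, b):
    merged = dict(a)
    for k, v in b.items():
        merged[k] = merged.get(k, 0) + v
    return merged

node1 = reduce(count_into, [{'first_name': 'Ann'}, {'first_name': 'Bob'}], {})
node2 = reduce(count_into, [{'first_name': 'Ann'}], {})
print(reduce_groups(node1, node2))  # {'Ann': 2, 'Bob': 1}
```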
Find the first name distribution of users in their twenties using a query aggregation:
import aerospike
from aerospike import predicates as p
import pprint
config = {'hosts': [('127.0.0.1', 3000)],
'lua': {'system_path':'/usr/local/aerospike/lua/',
'user_path':'/usr/local/aerospike/usr-lua/'}}
client = aerospike.client(config).connect()
pp = pprint.PrettyPrinter(indent=2)
query = client.query('test', 'users')
query.where(p.between('age', 20, 29))
query.apply('stream_udf', 'group_count', [ 'first_name' ])
names = query.results()
# we expect a dict (map) whose keys are names, each with a count value
pp.pprint(names)
client.close()
With stream UDFs, the final reduce step (which ties together the results from the reducers of the cluster nodes) executes on the client side. Explicitly setting the Lua user_path in the config helps the client find the local copy of the module containing the stream UDF. The system_path is constructed when the Python package is installed, and contains system modules such as aerospike.lua, as.lua, and stream_ops.lua. The default value for the Lua system_path is /usr/local/aerospike/lua.
predexp(predicates)
Set the predicate expression filters to be used by this query.
- param list predicates: a list of predicates generated by the aerospike.predexp functions.
import aerospike
from aerospike import predexp

config = {'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()
query = client.query('test', 'demo')
predexps = [
predexp.rec_device_size(),
predexp.integer_value(65 * 1024),
predexp.integer_greater()
]
query.predexp(predexps)
big_records = query.results()
client.close()
execute_background([policy])
Execute a record UDF on records found by the query in the background. This method returns before the query has completed. A UDF must have been added to the query with Query.apply().
- param dict policy: optional aerospike_write_policies.
- return: a job ID that can be used with aerospike.Client.job_info() to track the status of the aerospike.JOB_QUERY as it runs in the background.
import aerospike

config = {'hosts': [('127.0.0.1', 3000)]}
client = aerospike.client(config).connect()
query = client.query('test', 'demo')
query.apply('myudfs', 'myfunction', ['a', 1])
# This id can be used to monitor the progress of the background query
query_id = query.execute_background()
policy
A dict of optional query policies which are applicable to Query.results() and Query.foreach(). See aerospike_policies.
- max_retries: An int. Maximum number of retries before aborting the current transaction. The initial attempt is not counted as a retry. If max_retries is exceeded, the transaction will return the error AEROSPIKE_ERR_TIMEOUT.
  WARNING: Database writes that are not idempotent (such as "add") should not be retried, because the write operation may be performed multiple times if the client timed out previous transaction attempts. It is important to use a distinct write policy for non-idempotent writes which sets max_retries = 0.
  Default: 0
- sleep_between_retries: An int. Milliseconds to sleep between retries. Enter zero to skip sleep. Default: 0
- socket_timeout: An int. Socket idle timeout in milliseconds when processing a database command. If socket_timeout is not zero and the socket has been idle for at least socket_timeout, both max_retries and total_timeout are checked. If max_retries and total_timeout are not exceeded, the transaction is retried. If both socket_timeout and total_timeout are non-zero and socket_timeout > total_timeout, then socket_timeout will be set to total_timeout. If socket_timeout is zero, there will be no socket idle limit. Default: 30000
- total_timeout: An int. Total transaction timeout in milliseconds. The total_timeout is tracked on the client and sent to the server along with the transaction in the wire protocol. The client will most likely timeout first, but the server also has the capability to timeout the transaction. If total_timeout is not zero and total_timeout is reached before the transaction completes, the transaction will return the error AEROSPIKE_ERR_TIMEOUT. If total_timeout is zero, there will be no total time limit. Default: 0
- deserialize: A bool. Whether raw bytes representing a list or map should be deserialized to a list or dictionary. Set to False for backup programs that just need access to raw bytes. Default: True
- fail_on_cluster_change: A bool. Terminate the query if the cluster is in a migration state. Default: False
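A policy dict combining the fields above might look like the following sketch. The values are examples only, and the effective_socket_timeout() helper is hypothetical; it merely restates the documented rule that a non-zero socket_timeout is capped at a non-zero total_timeout:

```python
# Example query policy combining the documented fields (values are illustrative).
policy = {
    'max_retries': 0,           # never retry (safe for non-idempotent writes)
    'sleep_between_retries': 0,
    'socket_timeout': 30000,    # per-attempt socket idle limit (ms)
    'total_timeout': 2000,      # overall transaction limit (ms)
    'deserialize': True,
    'fail_on_cluster_change': False,
}

# Hypothetical helper restating the documented clamping rule: if both
# timeouts are non-zero and socket_timeout > total_timeout, socket_timeout
# is reduced to total_timeout; a zero value means "no limit".
def effective_socket_timeout(socket_timeout, total_timeout):
    if socket_timeout and total_timeout and socket_timeout > total_timeout:
        return total_timeout
    return socket_timeout

print(effective_socket_timeout(policy['socket_timeout'], policy['total_timeout']))  # 2000
```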
options
A dict of optional query options which are applicable to Query.foreach() and Query.results().
- nobins: A bool. If True, the records returned omit the bins portion of the aerospike_record_tuple. Default: False.
New in version 3.0.0.