.. currentmodule:: aerospike

.. deprecated:: 7.0.0 :class:`aerospike.Query` should be used instead.

Overview
========

The Scan object is used to return all the records in a specified set (which can be omitted or :py:obj:`None`). A Scan with a :py:obj:`None` set returns all the records in the namespace.

The scan is invoked using :meth:`foreach`, :meth:`results`, or :meth:`execute_background`. The bins returned can be filtered using :meth:`select`.

.. seealso::
    `Scans <http://www.aerospike.com/docs/guide/scan.html>`_ and \
    `Managing Scans <http://www.aerospike.com/docs/operations/manage/scans/>`_.

Fields
======

ttl (:class:`int`)
    The time-to-live (expiration) of the record in seconds. Note that ttl is only used on background scan writes.

    If this is set to :data:`aerospike.TTL_CLIENT_DEFAULT`, the scan will use the client's default scan policy ttl.

    See :ref:`TTL_CONSTANTS` for special values that can be set in the record ttl.

    Default: 0 (no limit)

    .. note::

        Requires server version >= 6.0.0

Methods
=======

.. deprecated:: 7.0.0 :class:`aerospike.Query` should be used instead.

.. method:: select(bin1[, bin2[, bin3..]])

    Set a filter on the record bins resulting from :meth:`results` or \
    :meth:`foreach`. If a selected bin does not exist in a record it will \
    not appear in the *bins* portion of that record tuple.
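
    The filtering rule can be illustrated in plain Python. This is a sketch of the documented behavior only, not the client's implementation: ``select_bins`` is a hypothetical helper, and the record is a hand-built ``(key, meta, bins)`` tuple rather than one returned by a scan.

    .. code-block:: python

        def select_bins(record, *bin_names):
            """Mimic Scan.select() on one (key, meta, bins) record tuple.

            Hypothetical helper: selected bins that are missing from the
            record are simply absent from the returned bins dict.
            """
            key, meta, bins = record
            selected = {name: bins[name] for name in bin_names if name in bins}
            return key, meta, selected


        record = (("test", "test", "key2", None), {"gen": 1, "ttl": 100}, {"id": 2, "b": 2})

        # 'a' and 'zzz' do not exist in this record, so only 'id' is kept.
        print(select_bins(record, "id", "a", "zzz"))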


.. method:: apply(module, function[, arguments])

    Apply a record `UDF <http://www.aerospike.com/docs/guide/udf.html>`_ to each record found by the scan.

    :param str module: the name of the Lua module.
    :param str function: the name of the Lua function within the *module*.
    :param list arguments: optional arguments to pass to the *function*. NOTE: these arguments must be types supported by Aerospike. See: `supported data types <https://docs.aerospike.com/server/guide/data-types/overview>`_.
        If you need to use an unsupported type (e.g. set or tuple), you must use your own serializer.
    :return: one of the supported types, :class:`int`, :class:`str`, :class:`float` (double), :class:`list`, :class:`dict` (map), :class:`bytearray` (bytes), :class:`bool`.

    .. seealso:: `Developing Record UDFs <https://developer.aerospike.com/udf/developing_record_udfs>`_
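
    One possible serializer for the *arguments* list is sketched below. It assumes that converting tuples and sets into lists is acceptable for the UDF being called; ``to_udf_arg`` is a hypothetical helper, not part of the client.

    .. code-block:: python

        def to_udf_arg(value):
            """Convert values Aerospike cannot store into supported ones.

            Hypothetical serializer: tuples become lists and sets become sorted
            lists (assumes set elements are mutually comparable), recursing into
            lists, tuples, sets and dicts. Other values pass through unchanged.
            """
            if isinstance(value, (tuple, list)):
                return [to_udf_arg(v) for v in value]
            if isinstance(value, (set, frozenset)):
                return sorted(to_udf_arg(v) for v in value)
            if isinstance(value, dict):
                return {k: to_udf_arg(v) for k, v in value.items()}
            return value


        args = ["tags", {"blue", "red"}, (1, 2)]
        udf_args = [to_udf_arg(a) for a in args]
        print(udf_args)

    The converted ``udf_args`` list could then be passed as the *arguments* of :meth:`apply`. Sorting the set makes the argument order deterministic, which matters if the Lua function compares or stores the list.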


.. method:: add_ops(ops)

    Add a list of write ops to the scan.
    When used with :meth:`Scan.execute_background`, the scan will perform the write ops on any records found.
    If no predicate is attached to the scan, it will apply the ops to all the records in the specified set. See :mod:`aerospike_helpers` for available ops.

    :param list ops: a list of write operations generated by :mod:`aerospike_helpers`, e.g. ``list_operations``, ``map_operations``, etc.

    .. note::
        Requires server version >= 4.7.0.

    .. code-block:: python

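        import aerospike
        from aerospike_helpers.operations import list_operations
        from aerospike_helpers.operations import operations

        config = {'hosts': [('127.0.0.1', 3000)]}
        client = aerospike.client(config)

        # Placeholder names: a string bin and a list bin in the records being scanned.
        str_bin = 'str_bin'
        list_bin = 'list_bin'
        list_index_to_remove = 0

        scan = client.scan('test', 'demo')

        ops = [
            # String append needs a string bin; list ops need a list bin.
            operations.append(str_bin, 'val_to_append'),
            list_operations.list_remove_by_index(list_bin, list_index_to_remove, aerospike.LIST_RETURN_NONE)
        ]
        scan.add_ops(ops)

        # The ops run on the server; the returned job ID can be passed to client.job_info().
        job_id = scan.execute_background()
        client.close()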

    For a more comprehensive example, see using a list of write ops with :meth:`Query.execute_background`.


.. method:: results([policy[, nodename]]) -> list of (key, meta, bins)

    Buffer the records resulting from the scan, and return them as a \
    :class:`list` of records.

    :param dict policy: optional :ref:`aerospike_scan_policies`.
    :param str nodename: optional Node ID of node used to limit the scan to a single node.

    :return: a :class:`list` of :ref:`aerospike_record_tuple`.


    .. code-block:: python

        import aerospike
        import pprint

        pp = pprint.PrettyPrinter(indent=2)
        config = { 'hosts': [ ('127.0.0.1',3000)]}
        client = aerospike.client(config)

        client.put(('test','test','key1'), {'id':1,'a':1},
            policy={'key':aerospike.POLICY_KEY_SEND})
        client.put(('test','test','key2'), {'id':2,'b':2},
            policy={'key':aerospike.POLICY_KEY_SEND})

        scan = client.scan('test', 'test')
        scan.select('id','a','zzz')
        res = scan.results()
        pp.pprint(res)
        client.close()

    .. note::

        We expect to see:

        .. code-block:: python

            [ ( ( 'test',
                  'test',
                  u'key2',
                  bytearray(b'\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01')),
                { 'gen': 52, 'ttl': 2592000},
                { 'id': 2}),
              ( ( 'test',
                  'test',
                  u'key1',
                  bytearray(b'\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde')),
                { 'gen': 52, 'ttl': 2592000},
                { 'a': 1, 'id': 1})]

    .. note:: As of client 7.0.0 and with server >= 6.0, :meth:`results` can be combined with the
     ``partition_filter`` scan policy (see :ref:`aerospike_partition_objects`) to specify which
     partitions/records are scanned. See the example below.

     .. code-block:: python

        # This is an example of scanning partitions 1000 - 1003.
        import aerospike

        config = {'hosts': [('127.0.0.1', 3000)]}
        client = aerospike.client(config)

        scan = client.scan("test", "demo")

        policy = {
            "partition_filter": {
                "begin": 1000,
                "count": 4
            },
        }

        # results will contain only the records stored in partitions 1000 - 1003,
        # so it is empty if those partitions hold no records.
        results = scan.results(policy=policy)
        client.close()
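
    The partitions covered by a ``begin``/``count`` filter can be worked out ahead of time. The sketch below relies on Aerospike splitting each namespace into 4096 partitions (ids 0 to 4095); ``partition_ids`` and its bounds check are hypothetical and only handle the ``begin``/``count`` form of the filter, not the other forms described in :ref:`aerospike_partition_objects`.

    .. code-block:: python

        # Aerospike distributes every namespace over 4096 partitions (ids 0-4095).
        N_PARTITIONS = 4096


        def partition_ids(partition_filter):
            """List the partition ids covered by a {"begin": ..., "count": ...} filter.

            Hypothetical helper: an empty filter covers all partitions, matching the
            policy default. The bounds check is an assumption, not client validation.
            """
            begin = partition_filter.get("begin", 0)
            count = partition_filter.get("count", N_PARTITIONS)
            if begin < 0 or count < 1 or begin + count > N_PARTITIONS:
                raise ValueError("partition range must lie within 0-4095")
            return list(range(begin, begin + count))


        print(partition_ids({"begin": 1000, "count": 4}))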



.. method:: foreach(callback[, policy[, options[, nodename]]])

    Invoke the *callback* function for each of the records streaming back \
    from the scan.

    :param callable callback: the function to invoke for each record.
    :param dict policy: optional :ref:`aerospike_scan_policies`.
    :param dict options: the :ref:`aerospike_scan_options` that will apply to the scan.
    :param str nodename: optional Node ID of node used to limit the scan to a single node.

    .. note::
        A :ref:`aerospike_record_tuple` is passed as the argument to the callback function.
        If the scan is using the "partition_filter" scan policy, the callback will receive two arguments.
        The first is an :class:`int` representing the partition id; the second is the same :ref:`aerospike_record_tuple`
        as a normal callback.

    .. code-block:: python

        import aerospike
        import pprint

        pp = pprint.PrettyPrinter(indent=2)
        config = { 'hosts': [ ('127.0.0.1',3000)]}
        client = aerospike.client(config)

        client.put(('test','test','key1'), {'id':1,'a':1},
            policy={'key':aerospike.POLICY_KEY_SEND})
        client.put(('test','test','key2'), {'id':2,'b':2},
            policy={'key':aerospike.POLICY_KEY_SEND})

        def show_key(record):
            key, meta, bins = record
            print(key)

        scan = client.scan('test', 'test')
        scan_opts = {
          'concurrent': True,
          'nobins': True
        }
        scan.foreach(show_key, options=scan_opts)
        client.close()

    .. note::

        We expect to see:

        .. code-block:: python

            ('test', 'test', u'key2', bytearray(b'\xb2\x18\n\xd4\xce\xd8\xba:\x96s\xf5\x9ba\xf1j\xa7t\xeem\x01'))
            ('test', 'test', u'key1', bytearray(b'\x1cJ\xce\xa7\xd4Vj\xef+\xdf@W\xa5\xd8o\x8d:\xc9\xf4\xde'))

    .. note:: To stop the stream, return ``False`` from the callback function.

        .. code-block:: python

            import aerospike

            config = { 'hosts': [ ('127.0.0.1',3000)]}
            client = aerospike.client(config)

            def limit(lim, result):
                c = [0] # integers are immutable so a list (mutable) is used for the counter
                def key_add(record):
                    key, metadata, bins = record
                    if c[0] < lim:
                        result.append(key)
                        c[0] = c[0] + 1
                    else:
                        return False
                return key_add

            scan = client.scan('test','user')
            keys = []
            scan.foreach(limit(100, keys))
            print(len(keys)) # this will be 100 if the number of matching records > 100
            client.close()
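
    The stop-on-``False`` contract can be exercised without a server. In this sketch ``simulate_foreach`` is a hypothetical stand-in for :meth:`foreach` that feeds in-memory records to the callback, and ``limit`` is the closure from the example above rewritten with ``nonlocal`` instead of a one-item list.

    .. code-block:: python

        def simulate_foreach(records, callback):
            """Hypothetical stand-in for Scan.foreach() over in-memory records.

            In this sketch only an explicit False stops the stream; a callback
            that returns None (no return statement) keeps it going.
            """
            for record in records:
                if callback(record) is False:
                    break


        def limit(lim, result):
            count = 0

            def key_add(record):
                nonlocal count  # rebind the enclosing counter directly
                key, metadata, bins = record
                if count < lim:
                    result.append(key)
                    count += 1
                else:
                    return False

            return key_add


        fake_records = [(("test", "user", i), {"gen": 1}, {"id": i}) for i in range(10)]
        keys = []
        simulate_foreach(fake_records, limit(3, keys))
        print(len(keys))

    Using ``nonlocal`` lets the inner function rebind the counter; the list-based counter above achieves the same result by mutating the list in place.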

    .. note:: As of client 7.0.0 and with server >= 6.0, :meth:`foreach` can be combined with the
     ``partition_filter`` scan policy (see :ref:`aerospike_partition_objects`) to specify which
     partitions/records are scanned. See the example below.

     .. code-block:: python

        # This is an example of scanning partitions 1000 - 1003.
        import aerospike

        config = {'hosts': [('127.0.0.1', 3000)]}
        client = aerospike.client(config)

        # The callback runs once per record, so collect partition ids in a set.
        partitions = set()

        def callback(part_id, input_tuple):
            print(part_id)
            partitions.add(part_id)

        scan = client.scan("test", "demo")

        policy = {
            "partition_filter": {
                "begin": 1000,
                "count": 4
            },
        }

        scan.foreach(callback, policy)

        # Only partitions that hold records appear here, so this is at most 4.
        print(len(partitions))

        # At most [1000, 1001, 1002, 1003].
        print(sorted(partitions))
        client.close()

.. method:: execute_background([policy])

    Execute a record UDF, or the write ops added with :meth:`Scan.add_ops`, on records found by the scan in the background. This method returns before the scan has completed.
    A UDF can be added to the scan with :meth:`Scan.apply`.

    :param dict policy: optional :ref:`aerospike_write_policies`.

    :return: a job ID that can be used with :meth:`~aerospike.Client.job_info` to track the status of the ``aerospike.JOB_SCAN``, as it runs in the background.

    .. note::
        Python client version 3.10.0 implemented scan execute_background.

        .. code-block:: python

            import aerospike
            from aerospike import exception as ex
            import sys
            import time

            config = {"hosts": [("127.0.0.1", 3000)]}
            client = aerospike.client(config)

            # register udf
            try:
                client.udf_put("/path/to/my_udf.lua")
            except ex.AerospikeError as e:
                print("Error: {0} [{1}]".format(e.msg, e.code))
                client.close()
                sys.exit(1)

            # put records and apply udf
            try:
                keys = [("test", "demo", 1), ("test", "demo", 2), ("test", "demo", 3)]
                records = [{"number": 1}, {"number": 2}, {"number": 3}]
                for i in range(3):
                    client.put(keys[i], records[i])

                scan = client.scan("test", "demo")
                scan.apply("my_udf", "my_udf", ["number", 10])
                job_id = scan.execute_background()

                # wait for job to finish
                while True:
                    response = client.job_info(job_id, aerospike.JOB_SCAN)
                    if response["status"] != aerospike.JOB_STATUS_INPROGRESS:
                        break
                    time.sleep(0.25)

                records = client.get_many(keys)
                print(records)
            except ex.AerospikeError as e:
                print("Error: {0} [{1}]".format(e.msg, e.code))
                sys.exit(1)
            finally:
                client.close()
            # EXPECTED OUTPUT:
            # [
            #   (('test', 'demo', 1, bytearray(b'\xb7\xf4\xb88\x89\xe2\xdag\xdeh>\x1d\xf6\x91\x9a\x1e\xac\xc4F\xc8')), {'gen': 2, 'ttl': 2591999}, {'number': 11}),
            #   (('test', 'demo', 2, bytearray(b'\xaejQ_7\xdeJ\xda\xccD\x96\xe2\xda\x1f\xea\x84\x8c:\x92p')), {'gen': 12, 'ttl': 2591999}, {'number': 12}),
            #   (('test', 'demo', 3, bytearray(b'\xb1\xa5`g\xf6\xd4\xa8\xa4D9\xd3\xafb\xbf\xf8ha\x01\x94\xcd')), {'gen': 13, 'ttl': 2591999}, {'number': 13})
            # ]

        .. code-block:: lua

            # contents of my_udf.lua
            function my_udf(rec, bin, offset)
                info("my transform: %s", tostring(record.digest(rec)))
                rec[bin] = rec[bin] + offset
                aerospike:update(rec)
            end
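
    The wait loop in the example above can be factored into a reusable helper that also gives up after a deadline. The sketch below is hypothetical: ``wait_for_job`` is not part of the client, and a simulated status source stands in for :meth:`~aerospike.Client.job_info` so that it runs without a server.

    .. code-block:: python

        import time


        def wait_for_job(get_status, in_progress, timeout=30.0, interval=0.25):
            """Poll get_status() until it stops returning in_progress or timeout expires.

            Hypothetical helper: with a real client, get_status could be
            lambda: client.job_info(job_id, aerospike.JOB_SCAN)["status"] and
            in_progress would be aerospike.JOB_STATUS_INPROGRESS.
            """
            deadline = time.monotonic() + timeout
            while True:
                status = get_status()
                if status != in_progress:
                    return status
                if time.monotonic() >= deadline:
                    raise TimeoutError("background job still in progress")
                time.sleep(interval)


        # Simulated status source: in progress twice, then complete.
        statuses = iter(["INPROGRESS", "INPROGRESS", "COMPLETED"])
        final = wait_for_job(lambda: next(statuses), "INPROGRESS", timeout=5, interval=0.01)
        print(final)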

.. method:: paginate()

    Makes a scan instance a paginated scan.
    Call this if you are using the "max_records" scan policy and you need to scan data in pages.

    .. note::
        Calling :meth:`paginate` on a scan instance causes it to save its partition state.
        This state can be retrieved later using :meth:`get_partitions_status`. Partition tracking can also be
        done with the ``partition_filter`` policy.

    .. code-block:: python

        # Scan up to 3 pages of 1000 records each.
        import aerospike

        config = {'hosts': [('127.0.0.1', 3000)]}
        client = aerospike.client(config)

        pages = 3
        page_size = 1000
        policy = {"max_records": page_size}

        scan = client.scan('test', 'demo')

        scan.paginate()

        # NOTE: The number of pages queried and records returned per page can differ
        # if record counts are small or unbalanced across nodes.
        for page in range(pages):
            records = scan.results(policy=policy)

            print("got page: " + str(page) + " with " + str(len(records)) + " records")

            if scan.is_done():
                print("all done")
                break

        client.close()
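
    The shape of the pagination loop can be tried out against an in-memory stand-in. ``FakePaginatedScan`` below is hypothetical and only mimics the documented contract (``results`` returns at most ``max_records`` records per call, ``0`` meaning no limit, and ``is_done`` reports whether everything was returned); real pages can be smaller or uneven, as the note in the example above says.

    .. code-block:: python

        class FakePaginatedScan:
            """Hypothetical in-memory stand-in for a paginated Scan."""

            def __init__(self, records):
                self._records = list(records)
                self._pos = 0  # index of the next record to return

            def results(self, policy=None):
                max_records = (policy or {}).get("max_records", 0)
                # max_records == 0 means no limit, as in the scan policy.
                end = len(self._records) if max_records == 0 else self._pos + max_records
                page = self._records[self._pos:end]
                self._pos += len(page)
                return page

            def is_done(self):
                return self._pos >= len(self._records)


        scan = FakePaginatedScan(range(2500))
        policy = {"max_records": 1000}
        pages = []
        while True:
            page = scan.results(policy=policy)
            pages.append(page)
            if scan.is_done():
                break
        print([len(p) for p in pages])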

.. method:: is_done()

    Check whether the previous paginated or ``partition_filter`` scan using this scan instance returned all records.

    :return: A :class:`bool` signifying whether this paginated scan instance has returned all records.

    .. code-block:: python

        import aerospike

        config = {'hosts': [('127.0.0.1', 3000)]}
        client = aerospike.client(config)

        policy = {"max_records": 1000}

        scan = client.scan('test', 'demo')

        scan.paginate()

        records = scan.results(policy=policy)

        if scan.is_done():
            print("all done")

        client.close()

.. method:: get_partitions_status()

    Get this scan instance's partition status, that is, which partitions have been scanned and which have not.
    The returned value is a :class:`dict` with partition ids (:class:`int`) as keys and :class:`tuple` as values.
    If the scan instance is not tracking its partitions, the returned :class:`dict` will be empty.

    .. note::
        A scan instance must have had :meth:`paginate` called on it in order to retrieve its
        partition status. If :meth:`paginate` was not called, the scan instance will not save partition status.

    :return: a :class:`dict` whose values are :class:`tuple` of form (id: :class:`int`, init: :class:`bool`, done: :class:`bool`, digest: :class:`bytearray`).
        See :ref:`aerospike_partition_objects` for more information.

    .. code-block:: python

        # This is an example of resuming a scan using partition status.
        import aerospike

        config = {'hosts': [('127.0.0.1', 3000)]}
        client = aerospike.client(config)

        for i in range(15):
            key = ("test", "demo", i)
            bins = {"id": i}
            client.put(key, bins)

        records = []
        resumed_records = []

        def callback(input_tuple):
            key, _, _ = input_tuple

            # Returning False stops the stream after 5 records.
            if len(records) == 5:
                return False

            records.append(key)

        scan = client.scan("test", "demo")
        scan.paginate()

        scan.foreach(callback)

        # The first scan should stop after 5 records.
        assert len(records) == 5

        partition_status = scan.get_partitions_status()

        # With partition_filter, the callback receives the partition id first.
        def resume_callback(part_id, input_tuple):
            key, _, _ = input_tuple
            resumed_records.append(key)

        scan_resume = client.scan("test", "demo")

        policy = {
            "partition_filter": {
                "partition_status": partition_status
            },
        }

        scan_resume.foreach(resume_callback, policy)

        # should be 15
        total_records = len(records) + len(resumed_records)
        print(total_records)

        # cleanup
        for i in range(15):
            key = ("test", "demo", i)
            client.remove(key)

        client.close()
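
    Because the status is a plain :class:`dict`, it can be inspected before resuming. The hypothetical ``unfinished_partitions`` helper below assumes the tuple layout given in the return description above (id first, done third) and runs on a hand-built status dict rather than one returned by a live scan.

    .. code-block:: python

        def unfinished_partitions(partition_status):
            """Return the sorted ids of partitions whose scan has not finished.

            Hypothetical helper: reads only the id (index 0) and done (index 2)
            fields of each status tuple.
            """
            return sorted(part[0] for part in partition_status.values() if not part[2])


        # Hand-built status for three partitions; a real one comes from get_partitions_status().
        status = {
            0: (0, True, True, None),
            1: (1, True, False, None),
            2: (2, False, False, None),
        }
        print(unfinished_partitions(status))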

Policies
========

.. object:: policy

    A :class:`dict` of optional scan policies which are applicable to :meth:`Scan.results` and :meth:`Scan.foreach`. See :ref:`aerospike_policies`.

    .. hlist::
        :columns: 1

        * **max_retries** :class:`int`
            | Maximum number of retries before aborting the current transaction. The initial attempt is not counted as a retry.
            |
            | If max_retries is exceeded, the transaction will return error ``AEROSPIKE_ERR_TIMEOUT``.
            |
            | Default: ``0``

            .. warning::  Database writes that are not idempotent (such as "add") should not be retried because the write operation may be performed multiple times \
               if the client timed out previous transaction attempts. It's important to use a distinct write policy for non-idempotent writes, which sets ``max_retries`` to ``0``.

        * **sleep_between_retries** :class:`int`
            | Milliseconds to sleep between retries. Enter ``0`` to skip sleep.
            |
            | Default: ``0``
        * **socket_timeout** :class:`int`
            | Socket idle timeout in milliseconds when processing a database command.
            |
            | If socket_timeout is not ``0`` and the socket has been idle for at least socket_timeout, both max_retries and total_timeout are checked. \
              If max_retries and total_timeout are not exceeded, the transaction is retried.
            |
            | If both ``socket_timeout`` and ``total_timeout`` are non-zero and ``socket_timeout`` > ``total_timeout``, then ``socket_timeout`` will be set to \
             ``total_timeout``. If ``socket_timeout`` is ``0``, there will be no socket idle limit.
            |
            | Default: ``30000``.
        * **total_timeout** :class:`int`
            | Total transaction timeout in milliseconds.
            |
            | The total_timeout is tracked on the client and sent to the server along with the transaction in the wire protocol. The client will most likely \
              timeout first, but the server also has the capability to timeout the transaction.
            |
            | If ``total_timeout`` is not ``0`` and ``total_timeout`` is reached before the transaction completes, the transaction will return error \
             ``AEROSPIKE_ERR_TIMEOUT``. If ``total_timeout`` is ``0``, there will be no total time limit.
            |
            | Default: ``0``
        * **compress** (:class:`bool`)
            | Compress client requests and server responses.
            |
            | Use zlib compression on write or batch read commands when the command buffer size is greater than 128 bytes. In addition, tell the server to compress its response on read commands. The server response compression threshold is also 128 bytes.
            |
            | This option will increase cpu and memory usage (for extra compressed buffers), but decrease the size of data sent over the network.
            |
            | Default: ``False``
        * **durable_delete** :class:`bool`
            | Perform durable delete (requires Enterprise server version >= 3.10)
            | If the transaction results in a record deletion, leave a tombstone for the record.
            |
            | Default: ``False``
        * **records_per_second** :class:`int`
            | Limit the rate at which the scan processes records to ``records_per_second`` records per second.
            | Requires server version >= 4.7.0.
            |
            | Default: ``0`` (no limit).
        * **expressions** :class:`list`
            | Compiled Aerospike expressions (see :mod:`aerospike_helpers`) used for filtering records within a transaction.
            |
            | Default: ``None``

            .. note:: Requires Aerospike server version >= 5.2.
        * **max_records** :class:`int`
            | Approximate number of records to return to client.
            | This number is divided by the number of nodes involved in the scan.
            | The actual number of records returned may be less than max_records if node record counts are small and unbalanced across nodes.
            |
            | Default: ``0`` (No Limit).

            .. note:: Requires Aerospike server version >= 6.0
        * **partition_filter** :class:`dict`
            | A dictionary of partition information used by the client
            | to perform partition scans. Useful for resuming terminated scans and
            | scanning particular partitions/records.
            |
            |   See :ref:`aerospike_partition_objects` for more information.
            |
            | Default: ``{}`` (All partitions will be scanned).

        * **replica**
            | One of the :ref:`POLICY_REPLICA` values such as :data:`aerospike.POLICY_REPLICA_MASTER`
            |
            | Default: ``aerospike.POLICY_REPLICA_SEQUENCE``
        * **ttl** (:class:`int`)
            The default time-to-live (expiration) of the record in seconds. This field will only be used on
            background scan writes if :py:attr:`aerospike.Scan.ttl` is set to
            :data:`aerospike.TTL_CLIENT_DEFAULT`.

            There are also special values that can be set for this field. See :ref:`TTL_CONSTANTS`.
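
    The arithmetic behind two of these policies can be sketched in plain Python. ``per_node_limit`` and ``effective_socket_timeout`` are hypothetical helpers: the first assumes ``max_records`` is split across nodes by integer division (the client's exact rounding may differ), and the second restates the documented ``socket_timeout`` / ``total_timeout`` rule.

    .. code-block:: python

        def per_node_limit(max_records, node_count):
            """Split max_records across the nodes taking part in the scan.

            Hypothetical arithmetic: integer division, so the combined limit
            can come out slightly below max_records.
            """
            return max_records // node_count


        def effective_socket_timeout(socket_timeout, total_timeout):
            """A non-zero socket_timeout above a non-zero total_timeout is reduced to it."""
            if socket_timeout and total_timeout and socket_timeout > total_timeout:
                return total_timeout
            return socket_timeout


        # 1000 records over 3 nodes; the second and third nodes hold fewer records
        # than their share, so fewer than max_records come back in total.
        share = per_node_limit(1000, 3)
        node_record_counts = [5000, 120, 40]
        returned = sum(min(share, n) for n in node_record_counts)
        print(share, returned)
        print(effective_socket_timeout(30000, 1000), effective_socket_timeout(30000, 0))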

Options
=======

.. object:: options

    A :class:`dict` of optional scan options which are applicable to :meth:`Scan.foreach`.

    .. hlist::
        :columns: 1

        * **nobins** :class:`bool`
            | Whether to return the *bins* portion of the :ref:`aerospike_record_tuple`.
            |
            | Default ``False``.
        * **concurrent** :class:`bool`
            | Whether to run the scan concurrently on all nodes of the cluster.
            |
            | Default ``False``.
        * **percent** :class:`int`
            | Deprecated in version 6.0.0, will be removed in a coming release.
            | No longer available with server 5.6+.
            | Use scan policy max_records instead.
            | Percentage of records to return from the scan.
            |
            | Default ``100``.

    .. versionadded:: 1.0.39