QuantumLeap request pagination #47

Closed
msc-d1s7fa1eza opened this issue Oct 7, 2021 · 8 comments · Fixed by #73
Assignees
Labels
feature request Request a potential feature

Comments

@msc-d1s7fa1eza
Collaborator

msc-d1s7fa1eza commented Oct 7, 2021

Is your feature request related to a problem? Please describe.
The QuantumLeap API implements a pagination mechanism to help clients retrieve large sets of resources (https://quantumleap.readthedocs.io/en/latest/user/pagination/).
However, this mechanism has not yet been implemented in the QuantumLeap client (see https://github.com/RWTH-EBC/FiLiP/blob/development/filip/clients/ngsi_v2/quantumleap.py). Hence, a single request is limited to a maximum of 10000 results.
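For illustration, manual pagination against the QuantumLeap REST API with plain requests looks roughly like the following sketch (URL, tenant, and entity name are placeholders; 10000 is the documented per-request maximum):

import requests

QL_URL = "http://localhost:8668"  # placeholder QuantumLeap endpoint
HEADERS = {"fiware-service": "yourTenant",
           "fiware-servicepath": "/yourTenantPath"}

index = []  # collected timestamps across all pages
offset = 0
while True:
    res = requests.get(f"{QL_URL}/v2/entities/yourEntity",
                       headers=HEADERS,
                       params={"limit": 10000, "offset": offset})
    if res.status_code == 404:
        break  # no (more) records for this query
    res.raise_for_status()
    page = res.json()
    index.extend(page["index"])
    if len(page["index"]) < 10000:
        break  # last page reached
    offset += 10000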

Describe the solution you'd like
The pagination mechanism is similar to the one already implemented for the context broker client, compare https://github.com/RWTH-EBC/FiLiP/blob/development/filip/clients/ngsi_v2/cb.py (and also see https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html), so the solution should follow a similar approach.
However, the response structure is different, which requires a few changes: for instance, the JSON objects cannot simply be extended; instead, the resulting data lists (index and attribute values) have to be extended.
Please find a suggestion below:

def __pagination(*,
                method: PaginationMethod = PaginationMethod.GET,
                url: str,
                headers: Dict,
                limit: Union[PositiveInt, PositiveFloat] = None,
                offset: int = None,
                session: requests.Session = None,
                params: Dict = None,
                data: str = None) -> TimeSeries:
        """
        NGSIv2 implements a pagination mechanism in order to help clients to
        retrieve large sets of resources. This mechanism works for all listing
        operations in the API (e.g. GET /v2/entities, GET /v2/subscriptions,
        POST /v2/op/query, etc.). This function helps getting datasets that are
        larger than the limit for the different GET operations.
        https://fiware-orion.readthedocs.io/en/master/user/pagination/index.html
        Args:
            url: url of the request, obtained from the original function
            headers: headers from the original function
            params: query parameters from the original function
            limit: maximum total number of records to retrieve
            offset: offset from which records are retrieved
        Returns:
            TimeSeries
        """
        if limit is None:
            limit = inf
        if limit > 10000:
            params['limit'] = 10000  # maximum items per request
        else:
            params['limit'] = limit
        if offset is None:
            params['offset'] = 0
        else:
            params['offset'] = offset
        additional_offset = 0
        if not session:
            session = requests.Session()
        with session:
            res = session.request(method=method,
                                  url=url,
                                  params=params,
                                  headers=headers,
                                  data=data)
            if res.ok:
                items = res.json()
                # do pagination
                count = int(res.headers['Content-Length'])
                attributes = params['attrs']

                # fresh start
                items['index'] = []
                items['attributes'][0]['values'] = []

                while len(items['index']) < limit and len(items['index']) < count:
                    try:
                        # Establishing the offset from where entities are retrieved
                        for attr in attributes:
                            params['attrs'] = attr
                            params['offset'] = len(items['index']) + additional_offset
                            params['limit'] = min(10000, (limit - len(items['index'])))
                            res = session.request(method=method,
                                                  url=url,
                                                  params=params,
                                                  headers=headers,
                                                  data=data)
                            if res.ok:
                                # items.extend(res.json()) --> this does not work for the given response structure
                                if len(items['attributes']) == len(attributes): # update entries
                                    i = find(lst=items['attributes'], key=attr)
                                    items['attributes'][i]['values'].extend(res.json()['attributes'][0]['values'])
                                elif len(items['attributes']) < len(attributes):
                                    items['attributes'].extend(res.json()['attributes'])
                                else:
                                    logger.error('Inconsistent number of attributes %i', len(items['attributes']))
                            else:
                                res.raise_for_status()
                        items['index'].extend(res.json()['index'])
                    except Exception as e:
                        if '404 Client Error' in e.args[0]:
                            # no records found, skip period
                            limit -= 10000
                            additional_offset += 10000
                        else:
                            print(e)
                    if not (pytz.utc.localize(dateutil.parser.parse(params['toDate'])) >
                            dateutil.parser.parse(items['index'][-1]) + datetime.timedelta(seconds=1)):
                        break
                logger.debug('Received: %s', items)
                items['entity_id'] = params['entity_id']
                items['entity_type'] = params['entity_type']
                ts = parse_obj_as(TimeSeries, items)
                return ts
            res.raise_for_status()

Describe alternatives you've considered
The obvious alternative is to issue individual requests and post-process the results into the desired format.

Additional context
Besides the different response structure, when there are no records within a period the function gets stuck, since the request fails and neither the items nor the limit are updated. This exception has to be caught. Note that the example above also covers requests for several attributes.

@msc-d1s7fa1eza msc-d1s7fa1eza added the feature request Request a potential feature label Oct 7, 2021
@tstorek
Collaborator

tstorek commented Oct 10, 2021

@msc-d1s7fa1eza thanks for the issue. Indeed, we overlooked this mechanism when implementing the client. As you pointed out, the responses cannot be merged that easily. However, I would probably let the QueryBuilder return a list of JSON objects and then let each individual request function merge the resulting TimeSeries objects.

What do you think?

@msc-d1s7fa1eza
Collaborator Author

@tstorek Thank you for the quick reply. Regarding your suggestion, this would probably get around the issues I described above. Given the additional effort of the pagination mechanism, the individual-request approach might even perform better. I would suggest, though, running a performance test to see whether there is a significant difference between the two mechanisms.

@tstorek
Collaborator

tstorek commented Oct 11, 2021

@msc-d1s7fa1eza As a workaround, I suggest using a loop over the normal request functions that are already available. Simply increment your window of entries. If no entries are available for a request, you will receive an error that you need to catch; it comes with a descriptive error message.
The script would look something like this (depending on the client function that you are using):

from requests import RequestException
from typing import List
from filip.clients.ngsi_v2 import QuantumLeapClient
from filip.models import FiwareHeader
from filip.models.ngsi_v2.timeseries import TimeSeries

QL_URL = "UrlOfQuantumLeap"
FIWARE_SERVICE = "yourTenant"
FIWARE_SERVICEPATH = "/yourTenantPath"

fh = FiwareHeader(
    service=FIWARE_SERVICE,
    service_path=FIWARE_SERVICEPATH
)

client = QuantumLeapClient(url=QL_URL,
                           fiware_header=fh)
limit = 1000000 # Max number of entries to retrieve
max_per_request = 10000 # Max allowed entries
res: List[TimeSeries] = []
for i in range(0, limit, max_per_request):
    try:
        res.append(client.get_entity_by_id(
            entity_id="yourEntity",
            entity_type="yourEntityType",
            offset=i,
            limit=i+max_per_request))
    except RequestException as err:
        # QuantumLeap answers 404 'Not Found' when there are no (more) records
        if err.response is not None and err.response.status_code == 404 \
                and err.response.json().get('error') == 'Not Found':
            break  # all available entries have been retrieved
        raise

# merge time series objects to pandas
df = res[0].to_pandas()
for i in range(1, len(res)):
    df.append(res[i].to_pandas())

Hope that helps. Nevertheless, I would be happy about a pull request that solves the pagination.

@tstorek
Collaborator

tstorek commented Oct 11, 2021

@msc-d1s7fa1eza could you please test this with your use case? I do not have a dataset available right now.

@msc-d1s7fa1eza
Collaborator Author

msc-d1s7fa1eza commented Oct 11, 2021

@tstorek I tested the run times of both the pagination and the individual-request approach with the timeit module. Not only is the individual approach shorter and likely to cause fewer issues, it also runs faster (although the sample might not be representative, the difference is quite significant):

timeit.repeat(stmt="ts_with_pagination(data_limit=50000,fromDate='2021-10-04T05:30:00',toDate='2021-10-11T09:40:00')", setup="from __main__ import ts_with_pagination", repeat=3, number=3)

">>> [120.03673389999994, 121.31632300000001, 121.88975950000031]"

timeit.repeat(stmt="ts_with_individual_requests(data_limit=50000,fromDate='2021-10-04T05:30:00',toDate='2021-10-11T09:40:00')", setup="from __main__ import ts_with_individual_requests", repeat=3, number=3)

">>> [99.43117789999997, 97.69579180000028, 98.20416950000026]"

I had to change your last line to
df = df.append(res[i].to_pandas())
since it would not update the df otherwise (pandas.DataFrame.append returns a new frame instead of modifying it in place).

Do you think this can be integrated directly into the QL client functions? For instance:

    # /entities/{entityId}
    def get_entity_by_id(self,
                         entity_id: str,
                         *,
                         attrs: str = None,
                         entity_type: str = None,
                         aggr_method: Union[str, AggrMethod] = None,
                         aggr_period: Union[str, AggrPeriod] = None,
                         from_date: str = None,
                         to_date: str = None,
                         last_n: int = None,
                         limit: int = None,
                         offset: int = None,
                         georel: str = None,
                         geometry: str = None,
                         coords: str = None,
                         options: str = None
                         ) -> List[TimeSeries]:

        """
        History of N attributes of a given entity instance
        For example, query max water level of the central tank throughout the
        last year. Queries can get more
        sophisticated with the use of filters and query attributes.

        Args:
            entity_id (String): Entity id is required.
            attrs (String): Comma-separated list of attribute names
            entity_type (String): Comma-separated list of entity types whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified.  point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            List[TimeSeries]
        """
        url = urljoin(self.base_url, f'/v2/entities/{entity_id}')
        res: List[TimeSeries] = []
        for i in range(offset, limit, 10000):
            try:
                res.append(TimeSeries.parse_obj(
                    self.__query_builder(url=url,
                                         attrs=attrs,
                                         options=options,
                                         entity_type=entity_type,
                                         aggr_method=aggr_method,
                                         aggr_period=aggr_period,
                                         from_date=from_date,
                                         to_date=to_date,
                                         last_n=last_n,
                                         limit=i+10000,
                                         offset=i,
                                         georel=georel,
                                         geometry=geometry,
                                         coords=coords)))
            except RequestException as err:
                # QuantumLeap answers 404 'Not Found' when there are no (more) records
                if err.response is not None and err.response.status_code == 404 \
                        and err.response.json().get('error') == 'Not Found':
                    break  # all available entries have been retrieved
                raise
        return res

This would always return a list of TimeSeries objects, which has length 1 for requests with fewer than 10000 entries. What do you think about that?
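For downstream processing, the caller could then merge the chunks, e.g. as in this sketch (client setup and entity names are placeholders, analogous to your workaround above):

import pandas as pd

ts_chunks = client.get_entity_by_id(entity_id="yourEntity",
                                    entity_type="yourEntityType",
                                    limit=50000)
# all chunks share the same MultiIndex columns, so the frames stack row-wise
df = pd.concat([ts.to_pandas() for ts in ts_chunks])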

@tstorek
Collaborator

tstorek commented Oct 11, 2021

@msc-d1s7fa1eza Thanks for the suggestion.
I think a combination of both would be great. I tested adding an extend capability to TimeSeries.
Since it only uses lists, it should be very fast as well, and you directly end up with a concise TimeSeries object without changing the behavior of the current API :)

Could you please run another test merging the TimeSeries objects instead of the pandas dataframes? That would be quite interesting :)

from filip.models.ngsi_v2.timeseries import TimeSeries

class TimeSeriesExtend(TimeSeries):
    def extend(self, other: TimeSeries) -> None:
        assert self.entityId == other.entityId
        assert self.entityType == other.entityType
        assert self.index[-1] < other.index[0]
        for attr, other_attr in zip(self.attributes, other.attributes):
            assert attr.attrName == other_attr.attrName
            attr.values.extend(other_attr.values)
        self.index.extend(other.index)
        for attr in self.attributes:
            print(attr.json(indent=2))

if __name__ == '__main__':
    data1 = {
        "attributes": [
            {
                "attrName": "temperature",
                "values": [
                    24.1,
                    25.3,
                    26.7
                ]
            },
            {
                "attrName": "pressure",
                "values": [
                    1.01,
                    0.9,
                    1.02
                ]
            }
        ],
        "entityId": "Kitchen1",
        "index": [
            "2018-01-05T15:44:34",
            "2018-01-06T15:44:59",
            "2018-01-07T15:44:59"
        ]
    }
    data2 = {
        "attributes": [
            {
                "attrName": "temperature",
                "values": [
                    34.1,
                    35.3,
                    36.7
                ]
            },
            {
                "attrName": "pressure",
                "values": [
                    2.01,
                    1.9,
                    2.02
                ]
            }
        ],
        "entityId": "Kitchen1",
        "index": [
            "2018-01-08T15:44:34",
            "2018-01-09T15:44:59",
            "2018-01-10T15:44:59"
        ]
    }
    ts1 = TimeSeriesExtend.parse_obj(data1)
    print(f"Initial data set: \n {ts1.to_pandas()}")
    ts2 = TimeSeries.parse_obj(data2)
    ts1.extend(ts2)
    print(f"Extended data set: \n {ts1.to_pandas()}")

Output:

Initial data set: 
 entityId               Kitchen1         
entityType                  NaN         
attribute           temperature pressure
datetime                                
2018-01-05 15:44:34        24.1     1.01
2018-01-06 15:44:59        25.3     0.90
2018-01-07 15:44:59        26.7     1.02
{
  "values": [
    24.1,
    25.3,
    26.7,
    34.1,
    35.3,
    36.7
  ],
  "attrName": "temperature"
}
{
  "values": [
    1.01,
    0.9,
    1.02,
    2.01,
    1.9,
    2.02
  ],
  "attrName": "pressure"
}
Extended data set: 
 entityId               Kitchen1         
entityType                  NaN         
attribute           temperature pressure
datetime                                
2018-01-05 15:44:34        24.1     1.01
2018-01-06 15:44:59        25.3     0.90
2018-01-07 15:44:59        26.7     1.02
2018-01-08 15:44:34        34.1     2.01
2018-01-09 15:44:59        35.3     1.90
2018-01-10 15:44:59        36.7     2.02

@msc-d1s7fa1eza
Collaborator Author

msc-d1s7fa1eza commented Oct 12, 2021

Hi @tstorek - just to get this straight out of the way, here you can find the results of my latest test:

timeit.repeat(stmt="ts_with_pagination(data_limit=50000,fromDate='2021-10-04T05:30:00',toDate='2021-10-11T09:40:00')", setup="from __main__ import ts_with_pagination", repeat=3, number=3)

">>> [121.3780323, 121.93554929999999, 123.9959528]"

timeit.repeat(stmt="ts_with_individual_requests(data_limit=50000,fromDate='2021-10-04T05:30:00',toDate='2021-10-11T09:40:00')", setup="from __main__ import ts_with_individual_requests", repeat=3, number=3)

">>> [97.09694879999995, 98.87462059999996, 96.82192520000001]"

timeit.repeat(stmt="ts_with_timeseries_extend(data_limit=50000,fromDate='2021-10-04T05:30:00',toDate='2021-10-11T09:40:00')", setup="from __main__ import ts_with_timeseries_extend", repeat=3, number=3)

">>> [98.71543789999998, 98.38199939999993, 95.6458027000001]"

Again, my sample is not representative. Nevertheless, the run times are quite comparable for the TimeSeries extend and the DataFrame extend. However, the handling of the former approach seems much more convenient to me.

OK, so after a little more tuning, I would prefer the TimeSeries extending approach. Here is my latest implementation of the related functions:

#FiLiP/filip/models/ngsi_v2/timeseries.py
class TimeSeries(TimeSeriesHeader):
    """
    Model for time series data
    """
    attributes: List[AttributeValues] = None

    def to_pandas(self) -> pd.DataFrame:
        """
        Converts time series data to pandas dataframe
        Returns:
            pandas.DataFrame
        """
        index = pd.Index(data=self.index, name='datetime')
        attr_names = [attr.attrName for attr in self.attributes]
        values = np.array([attr.values for attr in self.attributes]).transpose()
        columns = pd.MultiIndex.from_product(
            [[self.entityId], [self.entityType], attr_names],
            names=['entityId', 'entityType', 'attribute'])

        return pd.DataFrame(data=values, index=index, columns=columns)

    def extend(self, other: "TimeSeries") -> None:
        """
        Extends time series data by another time series object
        """
        assert self.entityId == other.entityId
        assert self.entityType == other.entityType
        assert self.index[-1] < other.index[0]
        for attr, other_attr in zip(self.attributes, other.attributes):
            assert attr.attrName == other_attr.attrName
            attr.values.extend(other_attr.values)
        self.index.extend(other.index)


#FiLiP/filip/clients/ngsi_v2/quantumleap.py
    # QUERY API ENDPOINTS
    def __query_builder(self,
                        url,
                        *,
                        entity_id: str = None,
                        options: str = None,
                        entity_type: str = None,
                        aggr_method: Union[str, AggrMethod] = None,
                        aggr_period: Union[str, AggrPeriod] = None,
                        from_date: str = None,
                        to_date: str = None,
                        last_n: int = None,
                        limit: int = None,
                        offset: int = None,
                        georel: str = None,
                        geometry: str = None,
                        coords: str = None,
                        attrs: str = None,
                        aggr_scope: Union[str, AggrScope] = None
                        ) -> Tuple[Dict, int]:
        """
        Private Function to call respective API endpoints

        Args:
            url:
            entity_id:
            options:
            entity_type:
            aggr_method:
            aggr_period:
            from_date:
            to_date:
            last_n:
            limit:
            offset:
            georel:
            geometry:
            coords:
            attrs:
            aggr_scope:

        Returns:
            Tuple[Dict, int]: response body and Content-Length header value
        """
        params = {}
        headers = self.headers.copy()
        if options:
            params.update({'options': options})
        if entity_type:
            params.update({'type': entity_type})
        if aggr_method:
            aggr_method = AggrMethod(aggr_method)
            params.update({'aggrMethod': aggr_method.value})
        if aggr_period:
            aggr_period = AggrPeriod(aggr_period)
            params.update({'aggrPeriod': aggr_period.value})
        if from_date:
            params.update({'fromDate': from_date})
        if to_date:
            params.update({'toDate': to_date})
        if last_n:
            params.update({'lastN': last_n})
        if limit:
            params.update({'limit': limit})
        if offset:
            params.update({'offset': offset})
        if georel:
            params.update({'georel': georel})
        if coords:
            params.update({'coords': coords})
        if geometry:
            params.update({'geometry': geometry})
        if attrs:
            params.update({'attrs': attrs})
        if aggr_scope:
            aggr_scope = AggrScope(aggr_scope)
            params.update({'aggr_scope': aggr_scope.value})
        if entity_id:
            params.update({'id': entity_id})
        try:
            res = self.get(url=url, params=params, headers=headers)
            if res.ok:
                self.logger.info("Successfully received entity data")
                self.logger.debug('Received: %s', res.json())
                return res.json(), int(res.headers['Content-Length'])
            res.raise_for_status()
        except requests.exceptions.RequestException as err:
            msg = "Could not load entity data"
            self.log_error(err=err, msg=msg)
            raise

    # /entities/{entityId}
    def get_entity_by_id(self,
                         entity_id: str,
                         *,
                         attrs: str = None,
                         entity_type: str = None,
                         aggr_method: Union[str, AggrMethod] = None,
                         aggr_period: Union[str, AggrPeriod] = None,
                         from_date: str = None,
                         to_date: str = None,
                         last_n: int = None,
                         limit: int = None,
                         offset: int = None,
                         georel: str = None,
                         geometry: str = None,
                         coords: str = None,
                         options: str = None
                         ) -> TimeSeries:

        """
        History of N attributes of a given entity instance
        For example, query max water level of the central tank throughout the
        last year. Queries can get more
        sophisticated with the use of filters and query attributes.

        Args:
            entity_id (String): Entity id is required.
            attrs (String): Comma-separated list of attribute names
            entity_type (String): Comma-separated list of entity types whose
                data are to be included in the response.
            aggr_method (String): The function to apply to the raw data
                filtered. count, sum, avg, min, max
            aggr_period (String): year, month, day, hour, minute, second
            from_date (String): Starting date and time inclusive.
            to_date (String): Final date and time inclusive.
            last_n (int): Request only the last N values.
            limit (int): Maximum number of results to be retrieved.
                Default value : 10000
            offset (int): Offset for the results.
            georel (String): Geographical pattern
            geometry (String): Required if georel is specified.  point, line,
                polygon, box
            coords (String): Required if georel is specified.
                e.g. 40.714,-74.006
            options (String): Key value pair options.

        Returns:
            TimeSeries
        """
        url = urljoin(self.base_url, f'/v2/entities/{entity_id}')
        if limit is None:
            limit = inf
        if offset is None:
            offset = 0
        ts, content_length = self.__query_builder(url=url,
                                       attrs=attrs,
                                       options=options,
                                       entity_type=entity_type,
                                       aggr_method=aggr_method,
                                       aggr_period=aggr_period,
                                       from_date=from_date,
                                       to_date=to_date,
                                       last_n=last_n,
                                       limit=offset+min(limit, 10000),
                                       offset=offset,
                                       georel=georel,
                                       geometry=geometry,
                                       coords=coords)
        res = TimeSeries.parse_obj(ts)
        if limit > content_length:
            limit = content_length
        if limit > 10000:
            offset += 10000
            for i in range(offset, limit, 10000):
                max_requests = 10000 if limit - i > 10000 else limit - i
                try:
                    ts, content_length = self.__query_builder(url=url,
                                        attrs=attrs,
                                        options=options,
                                        entity_type=entity_type,
                                        aggr_method=aggr_method,
                                        aggr_period=aggr_period,
                                        from_date=from_date,
                                        to_date=to_date,
                                        last_n=last_n,
                                        limit=max_requests,
                                        offset=i,
                                        georel=georel,
                                        geometry=geometry,
                                        coords=coords)
                    res.extend(TimeSeries.parse_obj(ts))
                except requests.RequestException as err:
                    # QuantumLeap answers 404 'Not Found' when there are no (more) records
                    if err.response is not None and err.response.status_code == 404 \
                            and err.response.json().get('error') == 'Not Found':
                        break  # all available entries have been retrieved
                    raise
        return res

I added the content length to the query builder so that it does not try to run numerous empty requests if the given limit is much larger than the actual content length. I believe for the other API functions, the parse_obj function could just be fed with the additional arguments to make them work as well?
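With that in place, a large query would paginate transparently and still return a single merged TimeSeries, e.g. (sketch with placeholder URL, tenant, entity, and dates, client setup as in the workaround above):

client = QuantumLeapClient(url=QL_URL, fiware_header=fh)
ts = client.get_entity_by_id(entity_id="yourEntity",
                             entity_type="yourEntityType",
                             from_date="2021-10-04T05:30:00",
                             to_date="2021-10-11T09:40:00",
                             limit=50000)  # > 10000 triggers additional paginated requests
df = ts.to_pandas()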

msc-d1s7fa1eza added a commit that referenced this issue Oct 13, 2021
Changelog:
- Added time series extend function to TimeSeries class according to suggestion in issue #47
- Added using the extending function by default if the given limit is >
  10000 to the quantumleap function get_entity_values_by_id()
@github-actions

Branch 47-QuantumLeap-request-pagination created!

@tstorek tstorek assigned tstorek and unassigned msc-d1s7fa1eza Nov 9, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
@tstorek tstorek linked a pull request Nov 12, 2021 that will close this issue
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 12, 2021
tstorek added a commit that referenced this issue Nov 13, 2021
tstorek added a commit that referenced this issue Nov 13, 2021
feat: 47 quantum leap request pagination

closes #47
dnikolay-ebc pushed a commit to dnikolay-ebc/FiLiP that referenced this issue Dec 10, 2021
…velopment'

Fix bug

Closes RWTH-EBC#47

See merge request EBC/EBC_all/fiware/filip!44