# Elasticsearch use case

Here we explore what Elasticsearch and its high-level Python SDK, elasticsearch_dsl, can do.

First, import the needed modules and create a connection.

Here we assume an instance of Elasticsearch is running on localhost, on the default port.

In [1]:
from datetime import datetime

from elasticsearch_dsl import (
    DocType, Keyword, Date, Float, Nested, InnerObjectWrapper, Search
)
from elasticsearch_dsl.aggs import A
from elasticsearch_dsl.connections import connections
from elasticsearch_dsl.query import Q


# Connect to a local ES instance
connections.create_connection(hosts=['localhost'], timeout=20)


<Elasticsearch([{'host': 'localhost'}])>

## Define the doc type

Next we define a DocType, BasicMetric, that tells ES how to represent our objects.

If no mapping is defined beforehand, ES infers one by itself the first time you send data to it.

We create it manually so we can define some fields as Keyword rather than Text where we
don't need full-text search. This makes the index faster, and prevents ES from inferring
a wrong mapping from malformed data.

In [2]:
class BasicMetric(DocType):
    """
    Basic metric mapping
    """
    metric_type = Keyword()
    user_id = Keyword()
    timestamp = Date()
    value = Float()
    metadata = Nested(
        properties={
            'type': Keyword(),
            'unique_id': Keyword(),
        }
    )

    class Meta:
        index = 'metrics'


## Initialize metric

We can now add one record by instantiating a BasicMetric object and calling the save() method (much like the Django ORM).

In [11]:
metric = BasicMetric(metric_type='time', user_id='prova id', timestamp=datetime.now(), value=1.245)
metric.metadata.append({'type': 'provatype', 'unique_id': 'very unique id'})
metric.save()
print("==> Metric: %s" % metric)


==> Metric: BasicMetric(index='metrics', id='AVwLPq0we79RYn3ceAqL')


## An example search

Here is how we can make a call to ES. Let's say we want to fetch all our records, limiting the fields that will be returned:

In [10]:
s = Search()
s = s.source(['user_id', 'value', 'metadata'])
s.to_dict()
response = s.execute()
print("==> Success:\t%s" % response.success())
print("==> Total hits:\t%s" % response.hits.total)

==> Success:	True
==> Total hits:	100002


## Some useful search queries

Elasticsearch DSL allows us to build queries with a Django ORM-like syntax.
Search objects have methods like .filter() and .exclude().

We can build aggregations using the A object, and we can build queries using Q objects
and combine them with logical operators.
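
As a rough illustration of what combining queries means, the sketch below composes raw query dictionaries into `bool` queries. The helpers `term`, `q_and`, `q_or` and `q_not` are hypothetical and not part of elasticsearch_dsl; they only mimic the kind of request body that combining Q objects with `&`, `|` and `~` is expected to yield.

```python
def term(field, value):
    """Exact-match query on a single field."""
    return {"term": {field: value}}


def q_and(*queries):
    """All sub-queries must match (what `&` expresses)."""
    return {"bool": {"must": list(queries)}}


def q_or(*queries):
    """At least one sub-query should match (what `|` expresses)."""
    return {"bool": {"should": list(queries)}}


def q_not(query):
    """The sub-query must not match (what `~` expresses)."""
    return {"bool": {"must_not": [query]}}


# Time metrics that do NOT belong to the placeholder user 'user-id'
query = q_and(
    term("metric_type", "time"),
    q_not(term("user_id", "user-id")),
)
print(query)
```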



### Get all records with a time metric value between 2000 and 30000


In [9]:
s = Search().filter('range', value={"gte": 2000, "lte": 30000})
response = s.execute()
print("==> Response:")
print(response)
print("==> First hit:")
print(response.hits[0])
print("==> First user id:")
print(response.hits[0].user_id)

==> Response:
<Response: [<Hit(metrics/basic_metric/16244): {'metric_type': 'time', 'user_id': '58fa8ad2-ca2a-4a56-b79e-...}>, <Hit(metrics/basic_metric/18379): {'metric_type': 'time', 'user_id': 'e4094361-70ad-46ce-aa2c-...}>, <Hit(metrics/basic_metric/18685): {'metric_type': 'time', 'user_id': '307362c7-cec4-4ebf-b52f-...}>, <Hit(metrics/basic_metric/25402): {'metric_type': 'time', 'user_id': '723b2cc8-52e3-411b-b9fc-...}>, <Hit(metrics/basic_metric/26768): {'metric_type': 'time', 'user_id': '67f3a7e0-86c1-4bab-9d8f-...}>, <Hit(metrics/basic_metric/31500): {'metric_type': 'time', 'user_id': '723b2cc8-52e3-411b-b9fc-...}>, <Hit(metrics/basic_metric/32725): {'metric_type': 'time', 'user_id': 'f3fb6328-d4b0-4b07-a753-...}>, <Hit(metrics/basic_metric/43264): {'metric_type': 'time', 'user_id': 'ce103d23-eada-4a98-8d7d-...}>, <Hit(metrics/basic_metric/46457): {'metric_type': 'time', 'user_id': 'ac6cfc73-4044-4cf1-9ad4-...}>, <Hit(metrics/basic_metric/61072): {'metric_type': 'time', 'user_i
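
The search above should translate into a request body along the lines of the sketch below, where the range clause sits in the `filter` part of a `bool` query and therefore does not affect scoring. The `range_filter_body` helper is a hypothetical name introduced here for illustration; it is not part of elasticsearch_dsl.

```python
import json


def range_filter_body(field, gte, lte):
    """Build a request body that keeps documents with gte <= field <= lte."""
    if gte > lte:
        raise ValueError("gte must not be greater than lte")
    return {
        "query": {
            "bool": {
                # Filter context: documents either match or not, no scoring
                "filter": [{"range": {field: {"gte": gte, "lte": lte}}}]
            }
        }
    }


body = range_filter_body("value", 2000, 30000)
print(json.dumps(body, indent=2))
```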

### Number of records per user

This is how we can build an aggregation.
We can also use the to_dict() method to inspect the query we just built. If we set the size of the Search object to 0, no hits are returned, only the aggregations.

In [12]:
# Define aggregation
agg = A('terms', field='user_id', include='user_id')
# init search
s = Search()
# Size 0
s = s[0:0]
# Add aggregation
s.aggs.bucket('group_by_user', agg)
# Dict representation
s.to_dict()

{'aggs': {'group_by_user': {'terms': {'field': 'user_id',
    'include': 'user_id'}}},
 'from': 0,
 'query': {'match_all': {}},
 'size': 0}
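
Once such a query is executed, the per-user counts come back as a list of buckets. The sketch below shows one way to turn them into a dictionary. The `sample_response` fragment is made up for illustration (it is not real output from this index), and `counts_per_user` is a hypothetical helper.

```python
# Made-up response fragment, shaped like the result of a terms aggregation.
sample_response = {
    "hits": {"total": 3, "hits": []},  # size 0: no hits are returned
    "aggregations": {
        "group_by_user": {
            "buckets": [
                {"key": "user-a", "doc_count": 2},
                {"key": "user-b", "doc_count": 1},
            ]
        }
    },
}


def counts_per_user(response, agg_name="group_by_user"):
    """Map each bucket key (a user id) to its document count."""
    buckets = response["aggregations"][agg_name]["buckets"]
    return {bucket["key"]: bucket["doc_count"] for bucket in buckets}


print(counts_per_user(sample_response))
```

Note that a terms aggregation returns only the top 10 buckets unless a larger `size` is set on the aggregation itself.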

### Average time by segment

Here we use a Q object to build the nested query.

In [16]:
s = Search()

# Set size to 0, we only want aggregations
s = s[0:0]

# Queries
s = s.filter('term', metric_type='time')
q = Q('term', metadata__resource_type='segment')
s = s.filter('nested', path='metadata', query=q)

# Aggregations (read them bottom-up)
a = A('avg', field='value')
a = A('reverse_nested', aggs={'avg_time': a})
a = A('terms', field='metadata.resource_id', aggs={'average': a})
a = A('nested', path='metadata', aggs={'segments': a})
s.aggs.bucket('by_segment_id', a)

s.to_dict()

resp = s.execute()
print("==> Buckets: %s" % resp.aggregations.by_segment_id.segments.buckets)
print("==> Doc count: %s" % resp.aggregations.by_segment_id.segments.buckets[0].doc_count)


==> Buckets: [{'key': '382d786a-b416-4df3-97e0-1cca4555b020', 'doc_count':...}, {'key': '696982c5-2baf-467c-9c69-734eb807e02a', 'doc_count':...}, {'key': '77f5e0b5-39a6-4569-851e-4d05030864d2', 'doc_count':...}, {'key': '99b7a52e-da4b-49bd-b9ce-3feae4307b19', 'doc_count':...}, {'key': 'bf24f051-9f36-40bc-aedf-7379d2b0d543', 'doc_count':...}, {'key': 'ece95a0e-f710-4409-b94c-407e7bf0ba0a', 'doc_count':...}, {'key': 'a9d3ca0d-4aa6-481b-94e5-d0f865d67240', 'doc_count':...}, {'key': '63a4a9a1-0775-4772-95db-8580801ded8f', 'doc_count':...}, {'key': '318e1d9b-3204-461a-a8c2-aa0672ab1983', 'doc_count':...}, {'key': '442038ca-efd5-4300-8b37-6cc3c41f054c', 'doc_count':...}]
==> Doc count: 1174
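
To make the bottom-up construction easier to follow, here is a sketch that builds what should be the equivalent raw aggregation body with plain dictionaries, in the same order as the A() calls. The `wrap` helper is a hypothetical name, not a library function. The `reverse_nested` step is there because `value` lives on the parent document, so the aggregation has to step back out of the nested `metadata` objects before averaging.

```python
def wrap(agg_type, params, sub_aggs=None):
    """Build one aggregation node, optionally with named sub-aggregations."""
    node = {agg_type: params}
    if sub_aggs:
        node["aggs"] = sub_aggs
    return node


# Same order as the A() calls: innermost aggregation first
a = wrap("avg", {"field": "value"})
a = wrap("reverse_nested", {}, {"avg_time": a})
a = wrap("terms", {"field": "metadata.resource_id"}, {"average": a})
a = wrap("nested", {"path": "metadata"}, {"segments": a})
aggs = {"by_segment_id": a}

print(aggs)
```

Reading the result from the outside in gives the execution order: enter `metadata`, group by `resource_id`, return to the parent documents, then average `value`.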


### Best time for a given segment

In [17]:
s = Search()

# We only want the first result, with just the user_id and value fields
s = s[0:1]
s = s.source(['user_id', 'value'])

# Sorting
s = s.sort('-value')

# Filter it
s = s.filter('term', metric_type='time')
q = Q('term', metadata__resource_type='segment')
q &= Q('term', metadata__resource_id='segment-id')
s = s.filter('nested', path='metadata', query=q)

s.to_dict()

{'_source': ['user_id', 'value'],
 'from': 0,
 'query': {'bool': {'filter': [{'term': {'metric_type': 'time'}},
    {'nested': {'path': 'metadata',
      'query': {'bool': {'must': [{'term': {'metadata.resource_type': 'segment'}},
         {'term': {'metadata.resource_id': 'segment-id'}}]}}}}]}},
 'size': 1,
 'sort': [{'value': {'order': 'desc'}}]}

In [18]:
s.execute() # No results if we don't set an existing segment_id

<Response: []>

### Best time in a given segment for a given user

In [22]:
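# Raw dict queries: use dotted field names, since the `__` shorthand
# may not be expanded for dicts, depending on the library version
q1 = {'term': {"metadata.resource_type": 'segment'}}
q2 = {'term': {"metadata.resource_id": 'segment-id'}}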
q = Q('bool', filter=[q1,q2])

s = Search()[0:1]\
        .source(['user_id', 'value'])\
        .sort('-value')\
        .filter('term', metric_type='time')\
        .filter('term', user_id='user-id')\
        .filter('nested', path='metadata', query=q)

s.to_dict()

{'_source': ['user_id', 'value'],
 'from': 0,
 'query': {'bool': {'filter': [{'term': {'metric_type': 'time'}},
    {'term': {'user_id': 'user-id'}},
    {'nested': {'path': 'metadata',
      'query': {'bool': {'filter': [{'term': {'metadata.resource_type': 'segment'}},
         {'term': {'metadata.resource_id': 'segment-id'}}]}}}}]}},
 'size': 1,
 'sort': [{'value': {'order': 'desc'}}]}