Update/improve get deployment elastic (#276)
* Add some info about the changes:

- The aggregations query now uses the search method of the client, while plain queries use the scan helper. The latter is better when we receive a lot of information (more than 10K hits); see the sketch after this list.
- Add an explanation of the chunk division in the get_deployment method of the elastic source

* Add filtering field

* Delete unnecessary dvr_argument variable

* Show all jobs, even those that don't have information

* Fix PEP 8 "whitespace before" warning

* Add aggregation + query results to the tests
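As a rough illustration of the two query paths mentioned in the first bullet, here is a minimal sketch using the elasticsearch-py client and its scan helper; the endpoint, index name and query bodies are placeholders for the example, not taken from the cibyl source.

```python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch("http://localhost:9200")  # assumed endpoint

# Aggregation query: the client's search() returns the buckets directly,
# so a single call is enough and the main hits can be suppressed.
agg_query = {
    "size": 0,
    "aggs": {
        "group_by_job_name": {
            "terms": {"field": "job_name.keyword", "size": 10000}
        }
    }
}
response = es.search(index="logstash_jenkins", body=agg_query)
buckets = response["aggregations"]["group_by_job_name"]["buckets"]

# Plain query: the scan() helper pages through the results with the
# scroll API, so it keeps working past the 10K-hit response limit.
plain_query = {"query": {"match_all": {}}}
all_hits = [hit for hit in scan(es, index="logstash_jenkins",
                                query=plain_query, size=10000)]
```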
adrianfusco committed May 30, 2022
1 parent 24f74fc commit 84b8f7f
Showing 3 changed files with 185 additions and 102 deletions.
213 changes: 133 additions & 80 deletions cibyl/plugins/openstack/sources/elasticsearch.py
@@ -15,7 +15,6 @@
"""

import logging
import re

from cibyl.models.attribute import AttributeDictValue
from cibyl.models.ci.base.job import Job
@@ -40,117 +39,171 @@ def get_deployment(self, **kwargs):
jobs_found = self.get_jobs(**kwargs)

query_body = {
    # We don't want results in the main hits
    # size 0 don't show it. We will have the results
    # in the aggregations side
    "size": 0,
    "from": 0,
    "query": {
        "bool": {
            "must": [
                {
                    "bool": {
                        "should": []
                    }
                },
                {
                    "bool": {
                        "should": [],
                        "minimum_should_match": 1
                    }
                }
            ]
        }
    },
    # We should GROUP BY job names
    "aggs": {
        "group_by_job_name": {
            "terms": {
                "field": "job_name.keyword",
                "size": 10000
            },
            "aggs": {
                "last_build": {
                    # And take the first coincidence
                    "top_hits": {
                        "size": 1,
                        # Sorted by build_num to get the last info
                        "sort": [
                            {
                                "build_num": {
                                    "order": "asc"
                                }
                            }
                        ],
                        "_source": ['build_url']
                    }
                }
            }
        }
    }
}

results = []
hits = []
for job in jobs_found:
query_body['query']['bool']['must'][0]['bool']['must'] = {
"match": {
"job_name.keyword": f"{job}"
}
}
results = self.__query_get_hits(
query=query_body,
index='logstash_jenkins'
chunked_list_of_jobs = []
chunk_size_for_search = 400
# We can't send one giant query to Elasticsearch asking for every
# job's information, and one query per job would mean too many
# requests. Instead we build a list of job sublists and issue the
# calls chunk by chunk. chunk_size_for_search is the size of every
# sublist: with 2000 jobs and a chunk size of 400 we end up doing
# 2000 / 400 = 5 calls.
for chunk_max_value in range(
0,
len(list(jobs_found.keys())),
chunk_size_for_search
):
chunked_list_of_jobs.append(
list(
jobs_found.keys()
)[chunk_max_value:chunk_max_value + chunk_size_for_search]
)
if results:
hits.append(results[0])

if not results:
return jobs_found
# We will filter depending on the field we receive
# in the kwargs
def append_exists_field_to_query(field: str):
query_body['query']['bool']['must'][1]['bool']['should'].append(
{
"exists": {
"field": field
}
}
)

# We will select just the field we receive
# in the kwargs
def append_get_specific_field(field: str):
(query_body['aggs']['group_by_job_name']['aggs']
['last_build']['top_hits']['_source'].append(
f"{field}"
))

if 'topology' in kwargs:
append_exists_field_to_query('topology')
append_get_specific_field('topology')
ip_version_argument = None
if 'ip_version' in kwargs:
ip_version_argument = kwargs.get('ip_version').value
append_exists_field_to_query('ip_version')
append_get_specific_field('ip_version')
dvr_argument = None
if 'dvr' in kwargs:
dvr_argument = kwargs.get('dvr').value
append_exists_field_to_query('dvr')
append_get_specific_field('dvr')
release_argument = None
if 'release' in kwargs:
release_argument = kwargs.get('release').value
network_argument = None
if 'network_backend' in kwargs:
network_argument = kwargs.get('network_backend').value
append_exists_field_to_query('network_backend')
append_get_specific_field('network_backend')
storage_argument = None
if 'storage_backend' in kwargs:
storage_argument = kwargs.get('storage_backend').value
append_exists_field_to_query('storage_backend')
append_get_specific_field('storage_backend')
if 'osp_release' in kwargs:
storage_argument = kwargs.get('osp_release').value
append_exists_field_to_query('osp_release')
append_get_specific_field('osp_release')

hits_info = {}
for jobs_list in chunked_list_of_jobs:
for job in jobs_list:
query_body['query']['bool']['must'][0]['bool']['should'] \
.append(
{
"match": {
"job_name.keyword": f"{job}"
}
}
)

results = self.__query_get_hits(
query=query_body,
index='logstash_jenkins'
)
for result in results:
hits_info[
result['key']
] = result['last_build']['hits']['hits'][0]
query_body['query']['bool']['must'][0]['bool']['should'].clear()

job_objects = {}
for hit in hits:
job_name = hit['_source']['job_name']
job_url = re.compile(r"(.*)/\d").search(
hit['_source']['build_url']
).group(1)

# If the key exists assign the value otherwise assign unknown
topology = hit['_source'].get(
"topology", "unknown")
network_backend = hit['_source'].get(
"network_backend", "unknown")
ip_version = hit['_source'].get(
"ip_version", "unknown")
storage_backend = hit['_source'].get(
"storage_backend", "unknown")
dvr = hit['_source'].get(
"dvr", "unknown")
osp_release = hit['_source'].get(
"osp_release", "unknown")

if ip_version != 'unknown':
for job_name in jobs_found.keys():

job_source_data = {}
if job_name in hits_info:
job_source_data = hits_info[job_name]['_source']

job_url = jobs_found[job_name].url
# If the data does not exist in the source we
# don't want to display it
topology = job_source_data.get(
"topology", "")
network_backend = job_source_data.get(
"network_backend", "")
ip_version = job_source_data.get(
"ip_version", "")
storage_backend = job_source_data.get(
"storage_backend", "")
dvr = job_source_data.get(
"dvr", "")
osp_release = job_source_data.get(
"osp_release", "")

if ip_version != '':
matches = IP_PATTERN.search(ip_version)
ip_version = matches.group(1)

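The chunk division described in get_deployment above can be sketched in isolation. This is only an illustration of the arithmetic and of how each chunk becomes one bool/should clause, with made-up job names and a standalone helper, not the cibyl implementation itself.

```python
def chunk_job_names(job_names, chunk_size=400):
    """Split the job names into sublists of at most chunk_size items."""
    names = list(job_names)
    return [names[i:i + chunk_size] for i in range(0, len(names), chunk_size)]


# 2000 jobs with a chunk size of 400 -> 5 requests instead of 2000.
jobs = [f"job-{n}" for n in range(2000)]
for chunk in chunk_job_names(jobs):
    # Each chunk becomes one query whose bool/should matches every job in it.
    should_clause = [{"match": {"job_name.keyword": name}} for name in chunk]
    print(len(should_clause))  # 400, 400, 400, 400, 400
```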
18 changes: 16 additions & 2 deletions cibyl/sources/elasticsearch/api.py
@@ -113,7 +113,9 @@ def get_jobs(self: object, **kwargs: Argument) -> list:
job_objects[job_name] = Job(name=job_name, url=url)
return AttributeDictValue("jobs", attr_type=Job, value=job_objects)

def __query_get_hits(self: object, query: dict, index: str = '*') -> list:
def __query_get_hits(self: object,
query: dict,
index: str = '*') -> list:
"""Perform the search query to ElasticSearch
and return all the hits
@@ -126,17 +128,29 @@ def __query_get_hits(self: object, query: dict, index: str = '*') -> list:
try:
LOG.debug("Using the following query: {}"
.format(str(query).replace("'", '"')))
# https://github.com/elastic/elasticsearch-py/issues/91
# For aggregations we should use the search method of the client
if 'aggs' in query:
results = self.es_client.search(
index=index,
body=query,
size=10000,
)
aggregation_key = list(results['aggregations'].keys())[0]
buckets = results['aggregations'][aggregation_key]['buckets']
return buckets
# For normal queries we can use the scan helper
hits = [item for item in scan(
self.es_client,
index=index,
query=query,
size=10000
)]
return hits
except Exception as exception:
raise ElasticSearchError(
"Error getting the results."
) from exception
return hits

@speed_index({'base': 2})
def get_builds(self: object, **kwargs: Argument):
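For reference, a small sketch of how the aggregation buckets returned by the search branch above are consumed in get_deployment: each bucket carries the job name under key and the newest document under last_build.hits.hits[0]. The bucket values below are invented for the example.

```python
# Shape of one aggregation bucket as returned by the search() branch;
# the concrete values are made up for the example.
buckets = [
    {
        "key": "periodic-job-1",
        "last_build": {
            "hits": {
                "hits": [
                    {"_source": {"build_url": "http://jenkins/job/1/2",
                                 "ip_version": "ipv4",
                                 "topology": "controller:3,compute:2"}}
                ]
            }
        }
    }
]

# Index the newest document of every job by its job name.
hits_info = {bucket["key"]: bucket["last_build"]["hits"]["hits"][0]
             for bucket in buckets}
source = hits_info["periodic-job-1"]["_source"]
# Missing fields fall back to an empty string and are not displayed.
print(source.get("topology", ""), source.get("storage_backend", ""))
```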
56 changes: 36 additions & 20 deletions tests/unit/sources/elasticsearch/test_api.py
@@ -354,31 +354,47 @@ def setUp(self) -> None:
}
]

# These are aggregation + query results
self.tests_hits = [
    {
        '_source': {
            'job_name': 'test',
            'job_url': 'http://domain.tld/test/'
        },
        'key': 'test',
        'last_build': {
            'hits': {
                'hits': [
                    {
                        '_source': {
                            'job_name': 'test',
                            'job_url': 'http://domain.tld/test/',
                            'ip_version': 'ipv4',
                        }
                    }
                ]
            }
        }
    },
    {
        '_source': {
            'job_name': 'test2',
            'job_url': 'http://domain.tld/test2/'
        },
        'key': 'test',
        'last_build': {
            'hits': {
                'hits': [
                    {
                        '_source': {
                            'job_name': 'test2',
                            'job_url': 'http://domain.tld/test2/',
                            'ip_version': 'ipv4',
                        }
                    }
                ]
            }
        }
    }
]
@@ -388,7 +404,7 @@ def test_get_deployment(self: object, mock_query_hits: object) -> None:
"""Tests that the internal logic from
:meth:`ElasticSearch.get_deployment` is correct.
"""
mock_query_hits.return_value = self.build_hits
mock_query_hits.return_value = self.tests_hits

jobs_argument = Mock()
jobs_argument.value = ['test']
@@ -403,15 +419,15 @@ def test_get_deployment(self: object, mock_query_hits: object) -> None:
ip_version=ip_address_kwargs)
deployment = jobs['test'].deployment.value
self.assertEqual(deployment.ip_version.value, '4')
self.assertEqual(deployment.topology.value, 'unknown')
self.assertEqual(deployment.topology.value, '')

@patch.object(ElasticSearch, '_ElasticSearch__query_get_hits')
def test_deployment_filtering(self: object,
mock_query_hits: object) -> None:
"""Tests that the internal logic from :meth:`ElasticSearch.get_jobs`
is correct.
"""
mock_query_hits.return_value = self.build_hits
mock_query_hits.return_value = self.tests_hits

jobs_argument = Mock()
jobs_argument.value = ['test']
@@ -427,4 +443,4 @@ def test_deployment_filtering(self: object,

deployment = builds['test'].deployment.value
self.assertEqual(deployment.ip_version.value, '4')
self.assertEqual(deployment.topology.value, 'unknown')
self.assertEqual(deployment.topology.value, '')
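The tests patch the private query method through its name-mangled attribute (_ElasticSearch__query_get_hits). A self-contained sketch of that pattern, using a hypothetical stand-in class rather than the real ElasticSearch source:

```python
from unittest import TestCase
from unittest.mock import patch


class ElasticSearchLike:
    """Stand-in class; only here to show the name-mangling pattern."""

    def __query_get_hits(self, query, index='*'):
        raise RuntimeError("would hit a real cluster")

    def get_something(self):
        return self.__query_get_hits(query={}, index="logstash_jenkins")


class TestPatching(TestCase):
    # Double-underscore attributes are stored under the mangled name
    # _<ClassName>__<attribute>, so that is the name to patch.
    @patch.object(ElasticSearchLike, '_ElasticSearchLike__query_get_hits')
    def test_get_something(self, mock_hits):
        mock_hits.return_value = [{"key": "test"}]
        self.assertEqual(ElasticSearchLike().get_something(),
                         [{"key": "test"}])
```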
