Update/improve get deployment elastic #276

Merged
213 changes: 133 additions & 80 deletions cibyl/plugins/openstack/sources/elasticsearch.py
@@ -15,7 +15,6 @@
"""

import logging

from cibyl.models.attribute import AttributeDictValue
from cibyl.models.ci.base.job import Job
@@ -40,117 +39,171 @@ def get_deployment(self, **kwargs):
jobs_found = self.get_jobs(**kwargs)

query_body = {
    # We don't want results in the main hits;
    # "size": 0 hides them. We will read the results
    # from the aggregations side instead.
    "size": 0,
    "from": 0,
    "query": {
        "bool": {
            "must": [
                {
                    "bool": {
                        "should": []
                    }
                },
                {
                    "bool": {
                        "should": [],
                        "minimum_should_match": 1
                    }
                }
            ]
        }
    },
    # We should GROUP BY job names
    "aggs": {
        "group_by_job_name": {
            "terms": {
                "field": "job_name.keyword",
                "size": 10000
            },
            "aggs": {
                "last_build": {
                    # And take the first match
                    "top_hits": {
                        "size": 1,
                        # Sorted by build_num to get the last info
                        "sort": [
                            {
                                "build_num": {
                                    "order": "asc"
                                }
                            }
                        ],
                        "_source": ['build_url']
                    }
                }
            }
        }
    }
}
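# For illustration only (sample values made up): elasticsearch is
# expected to answer this query with one bucket per job name under
# "aggregations" instead of plain hits, shaped roughly like:
#     {"aggregations": {"group_by_job_name": {"buckets": [
#         {"key": "some-job",
#          "last_build": {"hits": {"hits": [
#              {"_source": {"build_url": "..."}}]}}}]}}}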

chunked_list_of_jobs = []
chunk_size_for_search = 400
# We can't send one giant query to elasticsearch asking for the
# information of all the jobs at once. Instead of doing one query
# per job we split the job names into sublists and make one call
# per chunk. chunk_size_for_search is the size of every sublist.
# If we have 2000 jobs we will make the following number of calls:
# 2000 / 400 = 5 calls.
for chunk_max_value in range(
    0,
    len(list(jobs_found.keys())),
    chunk_size_for_search
):
    chunked_list_of_jobs.append(
        list(
            jobs_found.keys()
        )[chunk_max_value:chunk_max_value + chunk_size_for_search]
    )
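# For illustration: 1000 job names with chunk_size_for_search = 400
# produce the slices [0:400], [400:800] and [800:1000], i.e. chunks
# of 400, 400 and 200 jobs -> 3 calls.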
# We will filter depending on the fields we receive
# in the kwargs
def append_exists_field_to_query(field: str):
query_body['query']['bool']['must'][1]['bool']['should'].append(
{
"exists": {
"field": field
}
}
)

# We will select just the fields we receive
# in the kwargs
def append_get_specific_field(field: str):
(query_body['aggs']['group_by_job_name']['aggs']
['last_build']['top_hits']['_source'].append(
f"{field}"
))
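# For illustration: append_exists_field_to_query('topology') leaves
#     {"exists": {"field": "topology"}}
# in the second "must" entry, so only documents carrying the field
# can match, while append_get_specific_field('topology') adds
# 'topology' to the "_source" fields returned for every bucket.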

if 'topology' in kwargs:
append_exists_field_to_query('topology')
append_get_specific_field('topology')
ip_version_argument = None
if 'ip_version' in kwargs:
ip_version_argument = kwargs.get('ip_version').value
append_exists_field_to_query('ip_version')
append_get_specific_field('ip_version')
dvr_argument = None
if 'dvr' in kwargs:
dvr_argument = kwargs.get('dvr').value
append_exists_field_to_query('dvr')
append_get_specific_field('dvr')
release_argument = None
if 'release' in kwargs:
release_argument = kwargs.get('release').value
network_argument = None
if 'network_backend' in kwargs:
network_argument = kwargs.get('network_backend').value
append_exists_field_to_query('network_backend')
append_get_specific_field('network_backend')
storage_argument = None
if 'storage_backend' in kwargs:
storage_argument = kwargs.get('storage_backend').value
append_exists_field_to_query('storage_backend')
append_get_specific_field('storage_backend')
if 'osp_release' in kwargs:
    release_argument = kwargs.get('osp_release').value
append_exists_field_to_query('osp_release')
append_get_specific_field('osp_release')

hits_info = {}
for jobs_list in chunked_list_of_jobs:
for job in jobs_list:
query_body['query']['bool']['must'][0]['bool']['should'] \
.append(
{
"match": {
"job_name.keyword": f"{job}"
}
}
)

results = self.__query_get_hits(
query=query_body,
index='logstash_jenkins'
)
for result in results:
hits_info[
result['key']
] = result['last_build']['hits']['hits'][0]
query_body['query']['bool']['must'][0]['bool']['should'].clear()
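# For illustration: after the inner loop runs for a chunk holding the
# hypothetical jobs "job-a" and "job-b", the first "must" entry is
#     {"bool": {"should": [
#         {"match": {"job_name.keyword": "job-a"}},
#         {"match": {"job_name.keyword": "job-b"}}]}}
# and .clear() empties the list again so the next chunk starts fresh.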

job_objects = {}
for job_name in jobs_found.keys():

job_source_data = {}
if job_name in hits_info:
job_source_data = hits_info[job_name]['_source']

job_url = jobs_found[job_name].url
# If the data does not exist in the source we
# don't want to display it
topology = job_source_data.get(
"topology", "")
network_backend = job_source_data.get(
"network_backend", "")
ip_version = job_source_data.get(
"ip_version", "")
storage_backend = job_source_data.get(
"storage_backend", "")
dvr = job_source_data.get(
"dvr", "")
osp_release = job_source_data.get(
"osp_release", "")

if ip_version != '':
matches = IP_PATTERN.search(ip_version)
ip_version = matches.group(1)

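The IP_PATTERN constant used above is defined outside this diff. A minimal
sketch of the behaviour the code relies on, assuming a pattern along the
lines of ipv(.*) (the name is real, the exact pattern is an assumption):

import re

# Assumed shape of IP_PATTERN; the real definition lives outside this diff.
IP_PATTERN = re.compile(r'ipv(.*)')

matches = IP_PATTERN.search('ipv4')
print(matches.group(1))  # prints '4', matching the test expectations below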
18 changes: 16 additions & 2 deletions cibyl/sources/elasticsearch/api.py
@@ -113,7 +113,9 @@ def get_jobs(self: object, **kwargs: Argument) -> list:
job_objects[job_name] = Job(name=job_name, url=url)
return AttributeDictValue("jobs", attr_type=Job, value=job_objects)

def __query_get_hits(self: object,
                     query: dict,
                     index: str = '*') -> list:
"""Perform the search query to ElasticSearch
and return all the hits

@@ -126,17 +128,29 @@ def __query_get_hits(self: object, query: dict, index: str = '*') -> list:
try:
LOG.debug("Using the following query: {}"
.format(str(query).replace("'", '"')))
# https://github.com/elastic/elasticsearch-py/issues/91
# For aggregations we should use the search method of the client
if 'aggs' in query:
results = self.es_client.search(
index=index,
body=query,
size=10000,
)
aggregation_key = list(results['aggregations'].keys())[0]
buckets = results['aggregations'][aggregation_key]['buckets']
return buckets
# For normal queries we can use the scan helper
hits = [item for item in scan(
self.es_client,
index=index,
query=query,
size=10000
)]
return hits
except Exception as exception:
raise ElasticSearchError(
"Error getting the results."
) from exception

@speed_index({'base': 2})
def get_builds(self: object, **kwargs: Argument):
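A minimal, self-contained sketch of how the buckets returned by the
aggregation branch are consumed by get_deployment (the bucket layout
mirrors the test fixtures below; the values are made up):

# Hypothetical bucket as returned by the terms aggregation above.
bucket = {
    'key': 'test',
    'last_build': {'hits': {'hits': [{'_source': {'ip_version': 'ipv4'}}]}},
}
# get_deployment indexes each bucket by its aggregation key.
hits_info = {bucket['key']: bucket['last_build']['hits']['hits'][0]}
assert hits_info['test']['_source']['ip_version'] == 'ipv4'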
56 changes: 36 additions & 20 deletions tests/unit/sources/elasticsearch/test_api.py
@@ -354,31 +354,47 @@ def setUp(self) -> None:
}
]

# This is what an aggregation + query result looks like
self.tests_hits = [
    {
        '_source': {
            'job_name': 'test',
            'job_url': 'http://domain.tld/test/'
        },
        'key': 'test',
        'last_build': {
            'hits': {
                'hits': [
                    {
                        '_source': {
                            'job_name': 'test',
                            'job_url': 'http://domain.tld/test/',
                            'ip_version': 'ipv4',
                        }
                    }
                ]
            }
        }
    },
    {
        '_source': {
            'job_name': 'test2',
            'job_url': 'http://domain.tld/test2/'
        },
        'key': 'test2',
        'last_build': {
            'hits': {
                'hits': [
                    {
                        '_source': {
                            'job_name': 'test2',
                            'job_url': 'http://domain.tld/test2/',
                            'ip_version': 'ipv4',
                        }
                    }
                ]
            }
        }
    }
]
@@ -388,7 +404,7 @@ def test_get_deployment(self: object, mock_query_hits: object) -> None:
"""Tests that the internal logic from
:meth:`ElasticSearch.get_deployment` is correct.
"""
mock_query_hits.return_value = self.tests_hits

jobs_argument = Mock()
jobs_argument.value = ['test']
@@ -403,15 +419,15 @@ def test_get_deployment(self: object, mock_query_hits: object) -> None:
ip_version=ip_address_kwargs)
deployment = jobs['test'].deployment.value
self.assertEqual(deployment.ip_version.value, '4')
self.assertEqual(deployment.topology.value, '')

@patch.object(ElasticSearch, '_ElasticSearch__query_get_hits')
def test_deployment_filtering(self: object,
mock_query_hits: object) -> None:
"""Tests that the internal logic from :meth:`ElasticSearch.get_jobs`
is correct.
"""
mock_query_hits.return_value = self.tests_hits

jobs_argument = Mock()
jobs_argument.value = ['test']
@@ -427,4 +443,4 @@ def test_deployment_filtering,

deployment = builds['test'].deployment.value
self.assertEqual(deployment.ip_version.value, '4')
self.assertEqual(deployment.topology.value, '')