Skip to content

Commit

Permalink
Merge 7e831e1 into 2757f74
Browse files Browse the repository at this point in the history
  • Loading branch information
valeriocos committed Nov 14, 2018
2 parents 2757f74 + 7e831e1 commit e8d027f
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 129 deletions.
64 changes: 63 additions & 1 deletion grimoire_elk/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

logger = logging.getLogger(__name__)

HEADER_JSON = {"Content-Type": "application/json"}


class ElasticConnectException(Exception):
message = "Can't connect to ElasticSearch"
Expand Down Expand Up @@ -87,7 +89,7 @@ def _check_instance(url, insecure):
raise ElasticConnectException

def __init__(self, url, index, mappings=None, clean=False,
insecure=True, analyzers=None):
insecure=True, analyzers=None, aliases=None):
''' clean: remove already existing index
insecure: support https with invalid certificates
'''
Expand All @@ -101,6 +103,8 @@ def __init__(self, url, index, mappings=None, clean=False,

# Valid index for elastic
self.index = self.safe_index(index)
self.aliases = aliases

self.index_url = self.url + "/" + self.index
self.wait_bulk_seconds = 2 # time to wait to complete a bulk operation

Expand Down Expand Up @@ -131,6 +135,10 @@ def __init__(self, url, index, mappings=None, clean=False,
map_dict = mappings.get_elastic_mappings(es_major=self.major)
self.create_mappings(map_dict)

if aliases:
for alias in aliases:
self.add_alias(alias)

def safe_put_bulk(self, url, bulk_json):
""" Bulk PUT controlling unicode issues """

Expand Down Expand Up @@ -167,6 +175,60 @@ def safe_put_bulk(self, url, bulk_json):
logger.info("%i items uploaded to ES (%s)", inserted_items, url)
return inserted_items

def list_aliases(self):
"""List aliases linked to the index"""

# check alias doesn't exist
r = self.requests.get(self.index_url + "/_alias", headers=HEADER_JSON, verify=False)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.warning("Something went wrong when retrieving aliases on %s.",
self.index_url)
logger.warning(ex)
return

aliases = r.json()[self.index]['aliases']
return aliases

def add_alias(self, alias):
"""
Add an alias to the index set in the elastic obj
:param alias: alias to add
:returns: None
"""
aliases = self.list_aliases()
if alias in aliases:
logger.warning("Alias %s already exists on %s.", alias, self.index_url)
return

# add alias
alias_data = """
{
"actions": [
{
"add": {
"index": "%s",
"alias": "%s"
}
}
]
}
""" % (self.index, alias)

r = self.requests.post(self.url + "/_aliases", headers=HEADER_JSON, verify=False, data=alias_data)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.warning("Something went wrong when adding an alias on %s. Alias not set.",
self.index_url)
logger.warning(ex)
return

logger.info("Alias %s created on %s.", alias, self.index_url)

def bulk_upload(self, items, field_id):
"""Upload in controlled packs items to ES using bulk API"""

Expand Down
12 changes: 6 additions & 6 deletions grimoire_elk/elk.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def feed_backend_arthur(backend_name, backend_params):


def feed_backend(url, clean, fetch_archive, backend_name, backend_params,
es_index=None, es_index_enrich=None, project=None, arthur=False):
es_index=None, es_index_enrich=None, project=None, arthur=False,
es_aliases=None):
""" Feed Ocean with backend data """

backend = None
Expand Down Expand Up @@ -144,7 +145,7 @@ def feed_backend(url, clean, fetch_archive, backend_name, backend_params,
backend = backend_cmd.backend

ocean_backend = connector[1](backend, fetch_archive=fetch_archive, project=project)
elastic_ocean = get_elastic(url, es_index, clean, ocean_backend)
elastic_ocean = get_elastic(url, es_index, clean, ocean_backend, es_aliases)
ocean_backend.set_elastic(elastic_ocean)

if fetch_archive:
Expand Down Expand Up @@ -510,8 +511,7 @@ def enrich_backend(url, clean, backend_name, backend_params,
author_id=None, author_uuid=None, filter_raw=None,
filters_raw_prefix=None, jenkins_rename_file=None,
unaffiliated_group=None, pair_programming=False,
node_regex=False,
studies_args=None):
node_regex=False, studies_args=None, es_enrich_aliases=None):
""" Enrich Ocean index """

backend = None
Expand Down Expand Up @@ -549,9 +549,9 @@ def enrich_backend(url, clean, backend_name, backend_params,
db_user, db_password, db_host)
enrich_backend.set_params(backend_params)
if url_enrich:
elastic_enrich = get_elastic(url_enrich, enrich_index, clean, enrich_backend)
elastic_enrich = get_elastic(url_enrich, enrich_index, clean, enrich_backend, es_enrich_aliases)
else:
elastic_enrich = get_elastic(url, enrich_index, clean, enrich_backend)
elastic_enrich = get_elastic(url, enrich_index, clean, enrich_backend, es_enrich_aliases)
enrich_backend.set_elastic(elastic_enrich)
if github_token and backend_name == "git":
enrich_backend.set_github_token(github_token)
Expand Down
57 changes: 5 additions & 52 deletions grimoire_elk/enriched/enrich.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@

SH_UNKNOWN_VALUE = 'Unknown'
DEMOGRAPHICS_ALIAS = 'demographics'
ONION_ALIAS = 'all_onion'

HEADER_JSON = {"Content-Type": "application/json"}

Expand Down Expand Up @@ -928,9 +929,9 @@ def enrich_onion(self, enrich_backend, in_index, out_index, data_source,

# Create alias if output index exists (index is always created from scratch, so
# alias need to be created each time)
if out_conn.exists() and not out_conn.exists_alias(out_index, "all_onion"):
logger.info("[Onion] Creating alias: all_onion")
out_conn.create_alias('all_onion')
if out_conn.exists() and not out_conn.exists_alias(out_index, ONION_ALIAS):
logger.info("[Onion] Creating alias: %s", ONION_ALIAS)
out_conn.create_alias(ONION_ALIAS)

logger.info("[Onion] This is the end.")

Expand Down Expand Up @@ -988,58 +989,10 @@ def enrich_demography(self, ocean_backend, enrich_backend, date_field="grimoire_
logger.error(ex)
return

self.add_alias(DEMOGRAPHICS_ALIAS)
self.elastic.add_alias(DEMOGRAPHICS_ALIAS)

logger.info("[Demography] End %s", self.elastic.index_url)

def add_alias(self, alias_name):
"""
Add an alias to the index set in the elastic obj
:param alias_name: name of the alias
:returns: None
"""
# check alias doesn't exist
r = self.requests.get(self.elastic.index_url + "/_alias", headers=HEADER_JSON, verify=False)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.warning("Something went wrong when retrieving aliases on %s. Alias not set.",
self.elastic.index_url)
logger.warning(ex)
return

aliases = r.json()[self.elastic.index]['aliases']
if alias_name in aliases:
logger.warning("Alias %s already exists on %s.", alias_name, self.elastic.index_url)
return

# add alias
alias_data = """
{
"actions": [
{
"add": {
"index": "%s",
"alias": "%s"
}
}
]
}
""" % (self.elastic.index, alias_name)

r = self.requests.post(self.elastic.url + "/_aliases", headers=HEADER_JSON, verify=False, data=alias_data)
try:
r.raise_for_status()
except requests.exceptions.HTTPError as ex:
logger.warning("Something went wrong when adding an alias on %s. Alias not set.",
self.elastic.index_url)
logger.warning(ex)
return

logger.info("Alias %s created on %s.", alias_name, self.elastic.index_url)

@staticmethod
def authors_min_max_dates(date_field, author_field="author_uuid"):
"""
Expand Down
9 changes: 5 additions & 4 deletions grimoire_elk/enriched/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
GITHUB = 'https://github.com/'
SH_GIT_COMMIT = 'github-commit'
DEMOGRAPHY_COMMIT_MIN_DATE = '1980-01-01'
AREAS_OF_CODE_ALIAS = 'git_areas_of_code'
logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -621,11 +622,11 @@ def enrich_areas_of_code(self, ocean_backend, enrich_backend, no_incremental=Fal

# Create alias if output index exists and alias does not
if out_conn.exists():
if not out_conn.exists_alias('git_areas_of_code'):
logger.info("[Areas of Code] Creating alias: git_areas_of_code")
out_conn.create_alias('git_areas_of_code')
if not out_conn.exists_alias(AREAS_OF_CODE_ALIAS):
logger.info("[Areas of Code] Creating alias: %s", AREAS_OF_CODE_ALIAS)
out_conn.create_alias(AREAS_OF_CODE_ALIAS)
else:
logger.info("[Areas of Code] Alias already exists: git_areas_of_code.")
logger.info("[Areas of Code] Alias already exists: %s.", AREAS_OF_CODE_ALIAS)

logger.info("[Areas of Code] End")

Expand Down
4 changes: 2 additions & 2 deletions grimoire_elk/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def get_connectors():
} # Will come from Registry


def get_elastic(url, es_index, clean=None, backend=None):
def get_elastic(url, es_index, clean=None, backend=None, es_aliases=None):

mapping = None

Expand All @@ -248,7 +248,7 @@ def get_elastic(url, es_index, clean=None, backend=None):
insecure = True
elastic = ElasticSearch(url=url, index=es_index, mappings=mapping,
clean=clean, insecure=insecure,
analyzers=analyzers)
analyzers=analyzers, aliases=es_aliases)

except ElasticConnectException:
logger.error("Can't connect to Elastic Search. Is it running?")
Expand Down

0 comments on commit e8d027f

Please sign in to comment.