diff --git a/.gitignore b/.gitignore
index b490b1ae..2e480319 100644
--- a/.gitignore
+++ b/.gitignore
@@ -55,3 +55,5 @@ tags
 .mypy_cache/
 tmp*
 luigi/*.pid
+*.pid
+elasticsearch.yml
diff --git a/README.md b/README.md
index 35983ca2..b55724cd 100644
--- a/README.md
+++ b/README.md
@@ -52,6 +52,7 @@ This section is purely informative, you can skip to [Run AWS Scripts](#run-aws-s
 * Python 3.6
 * Apache Spark 2.2.0
 * Vowpal Wabbit 8.1.1
+* Elasticsearch 5.6.X (not 6.X)
 * CUDA and Nvidia drivers if using a GPU instance
 * lz4
 * All python packages in `packer/requirements.txt`
@@ -186,10 +187,15 @@ To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLeve
 17/07/25 10:04:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
 ```
 
-2. Install ElasticSearch
+2. Install Elasticsearch 5.6
+
+Install version 5.6.X; do not use 6.X. Also make sure the `bin/` directory of the extracted archive is on your
+`$PATH`, since it contains the required `elasticsearch` binary.
 
 https://www.elastic.co/guide/en/elasticsearch/reference/current/install-elasticsearch.html
 
+https://www.elastic.co/downloads/past-releases/elasticsearch-5-6-8
+
 3. Install Python packages
 
 ```
diff --git a/cli.py b/cli.py
index ba0c987b..0e406eac 100755
--- a/cli.py
+++ b/cli.py
@@ -13,6 +13,7 @@ from jinja2 import Environment, PackageLoader
 
 from qanta import qlogging
+from qanta.guesser.elasticsearch import create_es_config, start_elasticsearch, stop_elasticsearch
 from qanta.util.environment import ENVIRONMENT
 from qanta.datasets.quiz_bowl import QuestionDatabase, Question, QB_QUESTION_DB
 from qanta.guesser.abstract import AbstractGuesser
@@ -21,7 +22,7 @@ from qanta.update_db import write_answer_map, merge_answer_mapping
 
 
-log = qlogging.get(__name__)
+log = qlogging.get('cli')
 
 CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
 
@@ -177,7 +178,9 @@ def generate_guesser_slurm(slurm_config_file, task, output_dir):
     enabled_guessers = list(AbstractGuesser.list_enabled_guessers())
 
     for i, gs in enumerate(enabled_guessers):
-        if gs.guesser_class in slurm_config:
+        if gs.guesser_class == 'ElasticSearchGuesser':
+            raise ValueError('ElasticSearchGuesser is not compatible with slurm')
+        elif gs.guesser_class in slurm_config:
             guesser_slurm_config = slurm_config[gs.guesser_class]
         else:
             guesser_slurm_config = None
@@ -217,5 +220,25 @@ def generate_guesser_slurm(slurm_config_file, task, output_dir):
         f.write(master_script)
 
 
+@main.command()
+@click.option('--generate-config/--no-generate-config', default=True, is_flag=True)
+@click.option('--config-dir', default='.')
+@click.option('--pid-file', default='elasticsearch.pid')
+@click.argument('command', type=click.Choice(['start', 'stop', 'configure']))
+def elasticsearch(generate_config, config_dir, pid_file, command):
+    if generate_config:
+        create_es_config(path.join(config_dir, 'elasticsearch.yml'))
+
+    if command == 'configure':
+        return
+
+    if command == 'start':
+        start_elasticsearch(config_dir, pid_file)
+    elif command == 'stop':
+        stop_elasticsearch(pid_file)
+
+
+
+
 if __name__ == '__main__':
     main()
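The new `elasticsearch` subcommand above wires together the three helpers added to `qanta/guesser/elasticsearch.py` later in this diff. A minimal sketch of the lifecycle it drives, using only functions from this patch; the `/tmp/qanta-es` config directory is illustrative:

```python
# Sketch of what `./cli.py elasticsearch start` then `stop` amounts to.
from os import path
from qanta.guesser.elasticsearch import (
    create_es_config, start_elasticsearch, stop_elasticsearch,
)

config_dir, pid_file = '/tmp/qanta-es', 'elasticsearch.pid'
# Render qanta/templates/elasticsearch.yml into the config directory.
create_es_config(path.join(config_dir, 'elasticsearch.yml'))
# Daemonize: runs `elasticsearch -d -p elasticsearch.pid -Epath.conf=/tmp/qanta-es`.
start_elasticsearch(config_dir, pid_file)
# ... index and query ...
stop_elasticsearch(pid_file)  # reads the PID file and sends `kill`
```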
diff --git a/log4j2.properties b/log4j2.properties
new file mode 100644
index 00000000..f344d0ae
--- /dev/null
+++ b/log4j2.properties
@@ -0,0 +1,74 @@
+status = error
+
+# log action execution errors for easier debugging
+logger.action.name = org.elasticsearch.action
+logger.action.level = debug
+
+appender.console.type = Console
+appender.console.name = console
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] %marker%m%n
+
+appender.rolling.type = RollingFile
+appender.rolling.name = rolling
+appender.rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}.log
+appender.rolling.layout.type = PatternLayout
+appender.rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] %marker%.-10000m%n
+appender.rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}-%d{yyyy-MM-dd}.log
+appender.rolling.policies.type = Policies
+appender.rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.rolling.policies.time.interval = 1
+appender.rolling.policies.time.modulate = true
+
+rootLogger.level = info
+rootLogger.appenderRef.console.ref = console
+rootLogger.appenderRef.rolling.ref = rolling
+
+appender.deprecation_rolling.type = RollingFile
+appender.deprecation_rolling.name = deprecation_rolling
+appender.deprecation_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation.log
+appender.deprecation_rolling.layout.type = PatternLayout
+appender.deprecation_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c{1.}] %marker%.-10000m%n
+appender.deprecation_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_deprecation-%i.log.gz
+appender.deprecation_rolling.policies.type = Policies
+appender.deprecation_rolling.policies.size.type = SizeBasedTriggeringPolicy
+appender.deprecation_rolling.policies.size.size = 1GB
+appender.deprecation_rolling.strategy.type = DefaultRolloverStrategy
+appender.deprecation_rolling.strategy.max = 4
+
+logger.deprecation.name = org.elasticsearch.deprecation
+logger.deprecation.level = warn
+logger.deprecation.appenderRef.deprecation_rolling.ref = deprecation_rolling
+logger.deprecation.additivity = false
+
+appender.index_search_slowlog_rolling.type = RollingFile
+appender.index_search_slowlog_rolling.name = index_search_slowlog_rolling
+appender.index_search_slowlog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_index_search_slowlog.log
+appender.index_search_slowlog_rolling.layout.type = PatternLayout
+appender.index_search_slowlog_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%.-10000m%n
+appender.index_search_slowlog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_index_search_slowlog-%d{yyyy-MM-dd}.log
+appender.index_search_slowlog_rolling.policies.type = Policies
+appender.index_search_slowlog_rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.index_search_slowlog_rolling.policies.time.interval = 1
+appender.index_search_slowlog_rolling.policies.time.modulate = true
+
+logger.index_search_slowlog_rolling.name = index.search.slowlog
+logger.index_search_slowlog_rolling.level = trace
+logger.index_search_slowlog_rolling.appenderRef.index_search_slowlog_rolling.ref = index_search_slowlog_rolling
+logger.index_search_slowlog_rolling.additivity = false
+
+appender.index_indexing_slowlog_rolling.type = RollingFile
+appender.index_indexing_slowlog_rolling.name = index_indexing_slowlog_rolling
+appender.index_indexing_slowlog_rolling.fileName = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_index_indexing_slowlog.log
+appender.index_indexing_slowlog_rolling.layout.type = PatternLayout
+appender.index_indexing_slowlog_rolling.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%.-10000m%n
+appender.index_indexing_slowlog_rolling.filePattern = ${sys:es.logs.base_path}${sys:file.separator}${sys:es.logs.cluster_name}_index_indexing_slowlog-%d{yyyy-MM-dd}.log
+appender.index_indexing_slowlog_rolling.policies.type = Policies
+appender.index_indexing_slowlog_rolling.policies.time.type = TimeBasedTriggeringPolicy
+appender.index_indexing_slowlog_rolling.policies.time.interval = 1
+appender.index_indexing_slowlog_rolling.policies.time.modulate = true
+
+logger.index_indexing_slowlog.name = index.indexing.slowlog.index
+logger.index_indexing_slowlog.level = trace
+logger.index_indexing_slowlog.appenderRef.index_indexing_slowlog_rolling.ref = index_indexing_slowlog_rolling
+logger.index_indexing_slowlog.additivity = false
diff --git a/qanta-hyper.yaml b/qanta-hyper.yaml
index fb9d66ab..f92525ef 100644
--- a/qanta-hyper.yaml
+++ b/qanta-hyper.yaml
@@ -1,7 +1,10 @@
 parameters:
-  qanta.guesser.dan.DanGuesser:
-    random_seed: [0, 1, 2, 3]
+  qanta.guesser.elasticsearch.ElasticSearchGuesser:
     enabled: [true]
+    use_wiki: [true, false]
+  #qanta.guesser.dan.DanGuesser:
+  #  random_seed: [0, 1, 2, 3]
+  #  enabled: [true]
 
   #qanta.guesser.vw.VWGuesser:
   #  random_seed: [0, 1, 2, 3, 4]
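Each list in `qanta-hyper.yaml` is one hyperparameter axis. Assuming the sweep expands the cross product of all axes (an assumption about qanta's hyperparameter tooling, not confirmed by this patch), the change above yields two ElasticSearchGuesser configurations, one with `use_wiki` on and one with it off, each addressed by a `config_num`. A hedged sketch of that expansion:

```python
# Illustrative only: how a parameter grid like the one above could expand
# into numbered configs. The expansion logic is an assumption, not qanta code.
from itertools import product

params = {'enabled': [True], 'use_wiki': [True, False]}
grid = [dict(zip(params, values)) for values in product(*params.values())]
for config_num, cfg in enumerate(grid):
    print(config_num, cfg)
# 0 {'enabled': True, 'use_wiki': True}
# 1 {'enabled': True, 'use_wiki': False}
```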
diff --git a/qanta/guesser/elasticsearch.py b/qanta/guesser/elasticsearch.py
index f37b7356..f11add42 100644
--- a/qanta/guesser/elasticsearch.py
+++ b/qanta/guesser/elasticsearch.py
@@ -1,4 +1,5 @@
 from typing import List, Optional, Dict
+import subprocess
 import os
 import pickle
 
@@ -7,19 +8,49 @@ import elasticsearch
 import progressbar
 from nltk.tokenize import word_tokenize
+from jinja2 import Environment, PackageLoader
 
 from qanta.wikipedia.cached_wikipedia import Wikipedia
 from qanta.datasets.abstract import QuestionText
-from qanta.datasets.quiz_bowl import QuizBowlDataset
 from qanta.guesser.abstract import AbstractGuesser
 from qanta.spark import create_spark_context
 from qanta.config import conf
+from qanta.util.io import get_tmp_dir, safe_path
 from qanta import qlogging
 
 
 log = qlogging.get(__name__)
+ES_PARAMS = 'es_params.pickle'
 connections.create_connection(hosts=['localhost'])
-INDEX_NAME = 'qb'
+
+
+def create_es_config(output_path, host='localhost', port=9200, tmp_dir=None):
+    if tmp_dir is None:
+        tmp_dir = get_tmp_dir()
+    data_dir = safe_path(os.path.join(tmp_dir, 'elasticsearch/data/'))
+    log_dir = safe_path(os.path.join(tmp_dir, 'elasticsearch/log/'))
+    env = Environment(loader=PackageLoader('qanta', 'templates'))
+    template = env.get_template('elasticsearch.yml')
+    config_content = template.render({
+        'host': host,
+        'port': port,
+        'log_dir': log_dir,
+        'data_dir': data_dir
+    })
+    with open(output_path, 'w') as f:
+        f.write(config_content)
+
+
+def start_elasticsearch(config_dir, pid_file):
+    subprocess.run(
+        ['elasticsearch', '-d', '-p', pid_file, f'-Epath.conf={config_dir}']
+    )
+
+
+def stop_elasticsearch(pid_file):
+    with open(pid_file) as f:
+        pid = int(f.read())
+    subprocess.run(['kill', str(pid)])
 
 
 class Answer(DocType):
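The hunks that follow refactor `ElasticSearchIndex` from a bundle of static methods bound to one global `INDEX_NAME` into an instantiable class keyed by index name, so each guesser configuration can own its own index. A minimal usage sketch, assuming a running Elasticsearch 5.6 node on localhost; the document text is hypothetical, the methods are from this diff:

```python
# One index per guesser config; 'qb_0' mirrors the naming used below.
index = ElasticSearchIndex(name='qb_0')
# Index a single hypothetical page, skipping the Wikipedia lookup.
index.build_large_docs(
    {'Paris': 'This capital of France sits on the Seine.'},
    use_wiki=False, rebuild_index=True,
)
guesses = index.search('capital of France', max_n_guesses=10)
print(guesses)  # list of (page, score) pairs
```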
@@ -27,33 +58,30 @@
     wiki_content = Text()
     qb_content = Text()
 
-    class Meta:
-        index = INDEX_NAME
-
 
 class ElasticSearchIndex:
-    @staticmethod
-    def delete():
+    def __init__(self, name='qb'):
+        self.name = name
+
+    def delete(self):
         try:
-            Index(INDEX_NAME).delete()
+            Index(self.name).delete()
         except elasticsearch.exceptions.NotFoundError:
-            log.info('Could not delete non-existent index, creating new index...')
+            log.info('Could not delete non-existent index.')
 
-    @staticmethod
-    def exists():
-        return Index(INDEX_NAME).exists()
+    def exists(self):
+        return Index(self.name).exists()
 
-    @staticmethod
-    def build_large_docs(documents: Dict[str, str], use_wiki=True, use_qb=True, rebuild_index=False):
+    def build_large_docs(self, documents: Dict[str, str], use_wiki=True, use_qb=True, rebuild_index=False):
         if rebuild_index or bool(int(os.getenv('QB_REBUILD_INDEX', 0))):
-            log.info('Deleting index: {}'.format(INDEX_NAME))
-            ElasticSearchIndex.delete()
+            log.info(f'Deleting index: {self.name}')
+            self.delete()
 
-        if ElasticSearchIndex.exists():
-            log.info('Index {} exists'.format(INDEX_NAME))
+        if self.exists():
+            log.info(f'Index {self.name} exists')
         else:
-            log.info('Index {} does not exist'.format(INDEX_NAME))
-            Answer.init()
+            log.info(f'Index {self.name} does not exist')
+            Answer.init(index=self.name)
         wiki_lookup = Wikipedia()
         log.info('Indexing questions and corresponding wikipedia pages as large docs...')
         bar = progressbar.ProgressBar()
@@ -72,25 +100,24 @@ def build_large_docs(documents: Dict[str, str], use_wiki=True, use_qb=True, rebu
                 page=page, wiki_content=wiki_content, qb_content=qb_content
             )
-            answer.save()
+            answer.save(index=self.name)
 
-    @staticmethod
-    def build_many_docs(pages, documents, use_wiki=True, use_qb=True, rebuild_index=False):
+    def build_many_docs(self, pages, documents, use_wiki=True, use_qb=True, rebuild_index=False):
         if rebuild_index or bool(int(os.getenv('QB_REBUILD_INDEX', 0))):
-            log.info('Deleting index: {}'.format(INDEX_NAME))
-            ElasticSearchIndex.delete()
+            log.info(f'Deleting index: {self.name}')
+            self.delete()
 
-        if ElasticSearchIndex.exists():
-            log.info('Index {} exists'.format(INDEX_NAME))
+        if self.exists():
+            log.info(f'Index {self.name} exists')
         else:
-            log.info('Index {} does not exist'.format(INDEX_NAME))
-            Answer.init()
+            log.info(f'Index {self.name} does not exist')
+            Answer.init(index=self.name)
 
         log.info('Indexing questions and corresponding pages as many docs...')
         if use_qb:
             log.info('Indexing questions...')
             bar = progressbar.ProgressBar()
             for page, doc in bar(documents):
-                Answer(page=page, qb_content=doc).save()
+                Answer(page=page, qb_content=doc).save(index=self.name)
 
         if use_wiki:
             log.info('Indexing wikipedia...')
@@ -102,12 +129,14 @@ def build_many_docs(pages, documents, use_wiki=True, use_qb=True, rebuild_index=
             for i in range(0, len(content), 200):
                 chunked_content = content[i:i + 200]
                 if len(chunked_content) > 0:
-                    Answer(page=page, wiki_content=' '.join(chunked_content)).save()
+                    Answer(page=page, wiki_content=' '.join(chunked_content)).save(index=self.name)
 
-    @staticmethod
-    def search(text: str, max_n_guesses: int,
+    def search(self, text: str, max_n_guesses: int,
                normalize_score_by_length=False,
                wiki_boost=1, qb_boost=1):
+        if not self.exists():
+            raise ValueError('The index does not exist; you must create it before searching')
+
         if wiki_boost != 1:
             wiki_field = 'wiki_content^{}'.format(wiki_boost)
         else:
@@ -118,7 +147,7 @@ def search(text: str, max_n_guesses: int,
         else:
             qb_field = 'qb_content'
 
-        s = Search(index='qb')[0:max_n_guesses].query(
+        s = Search(index=self.name)[0:max_n_guesses].query(
             'multi_match', query=text, fields=[wiki_field, qb_field])
         results = s.execute()
         guess_set = set()
@@ -135,14 +164,11 @@ def search(text: str, max_n_guesses: int,
                 guesses.append((r.page, r.meta.score / query_length))
         return guesses
 
-ES_PARAMS = 'es_params.pickle'
-es_index = ElasticSearchIndex()
-
 
 class ElasticSearchGuesser(AbstractGuesser):
-    def __init__(self):
-        super().__init__()
-        guesser_conf = conf['guessers']['ElasticSearch']
+    def __init__(self, config_num):
+        super().__init__(config_num)
+        guesser_conf = conf['guessers']['qanta.guesser.elasticsearch.ElasticSearchGuesser'][self.config_num]
         self.n_cores = guesser_conf['n_cores']
         self.use_wiki = guesser_conf['use_wiki']
         self.use_qb = guesser_conf['use_qb']
@@ -150,15 +176,10 @@ def __init__(self):
         self.normalize_score_by_length = guesser_conf['normalize_score_by_length']
         self.qb_boost = guesser_conf['qb_boost']
         self.wiki_boost = guesser_conf['wiki_boost']
-        self.kuro_trial_id = None
-
-    def qb_dataset(self):
-        return QuizBowlDataset(guesser_train=True)
+        self.index = ElasticSearchIndex(name=f'qb_{self.config_num}')
 
     def parameters(self):
-        params = conf['guessers']['ElasticSearch'].copy()
-        params['kuro_trial_id'] = self.kuro_trial_id
-        return params
+        return conf['guessers']['qanta.guesser.elasticsearch.ElasticSearchGuesser'][self.config_num]
 
     def train(self, training_data):
         if self.many_docs:
@@ -167,9 +188,9 @@ def train(self, training_data):
             for sentences, page in zip(training_data[0], training_data[1]):
                 paragraph = ' '.join(sentences)
                 documents.append((page, paragraph))
-            ElasticSearchIndex.build_many_docs(
+            self.index.build_many_docs(
                 pages, documents,
-                use_qb=self.use_qb, use_wiki=self.use_wiki
+                use_qb=self.use_qb, use_wiki=self.use_wiki, rebuild_index=True
             )
         else:
             documents = {}
@@ -180,33 +201,21 @@ def train(self, training_data):
                 else:
                     documents[page] = paragraph
 
-            ElasticSearchIndex.build_large_docs(
+            self.index.build_large_docs(
                 documents,
                 use_qb=self.use_qb,
-                use_wiki=self.use_wiki
+                use_wiki=self.use_wiki,
+                rebuild_index=True
             )
-        try:
-            if bool(os.environ.get('KURO_DISABLE', False)):
-                raise ModuleNotFoundError
-            import socket
-            from kuro import Worker
-            worker = Worker(socket.gethostname())
-            experiment = worker.experiment(
-                'guesser', 'ElasticSearch', hyper_parameters=conf['guessers']['ElasticSearch'],
-                n_trials=5
-            )
-            trial = experiment.trial()
-            if trial is not None:
-                self.kuro_trial_id = trial.id
-        except ModuleNotFoundError:
-            trial = None
 
     def guess(self, questions: List[QuestionText], max_n_guesses: Optional[int]):
         def es_search(query):
-            return es_index.search(query, max_n_guesses,
-                                   normalize_score_by_length=self.normalize_score_by_length,
-                                   wiki_boost=self.wiki_boost, qb_boost=self.qb_boost)
+            return self.index.search(
+                query, max_n_guesses,
+                normalize_score_by_length=self.normalize_score_by_length,
+                wiki_boost=self.wiki_boost, qb_boost=self.qb_boost
+            )
 
         if len(questions) > 1:
             sc = create_spark_context(configs=[('spark.executor.cores', self.n_cores), ('spark.executor.memory', '20g')])
@@ -218,26 +227,33 @@ def es_search(query):
 
     @classmethod
     def targets(cls):
-        return []
+        return [ES_PARAMS]
 
     @classmethod
     def load(cls, directory: str):
         with open(os.path.join(directory, ES_PARAMS), 'rb') as f:
             params = pickle.load(f)
-        guesser = ElasticSearchGuesser()
+        guesser = ElasticSearchGuesser(params['config_num'])
+        guesser.n_cores = params['n_cores']
         guesser.use_wiki = params['use_wiki']
         guesser.use_qb = params['use_qb']
         guesser.many_docs = params['many_docs']
         guesser.normalize_score_by_length = params['normalize_score_by_length']
+        guesser.qb_boost = params['qb_boost']
+        guesser.wiki_boost = params['wiki_boost']
         return guesser
 
     def save(self, directory: str):
         with open(os.path.join(directory, ES_PARAMS), 'wb') as f:
             pickle.dump({
+                'n_cores': self.n_cores,
                 'use_wiki': self.use_wiki,
                 'use_qb': self.use_qb,
                 'many_docs': self.many_docs,
-                'normalize_score_by_length': self.normalize_score_by_length
+                'normalize_score_by_length': self.normalize_score_by_length,
+                'qb_boost': self.qb_boost,
+                'wiki_boost': self.wiki_boost,
+                'config_num': self.config_num
             }, f)
 
     def web_api(self, host='0.0.0.0', port=5000, debug=False):
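With `targets()` now returning `ES_PARAMS`, the guesser's hyperparameters round-trip through a pickle file, and `load` rebuilds the guesser from the saved `config_num` plus the other fields. A sketch of that round trip, assuming an already-constructed guesser `g` (the temporary directory is illustrative):

```python
# Assumes `g` is an ElasticSearchGuesser instance as defined in this diff.
import tempfile
from qanta.guesser.elasticsearch import ElasticSearchGuesser

directory = tempfile.mkdtemp()
g.save(directory)  # writes es_params.pickle into the directory
restored = ElasticSearchGuesser.load(directory)
# All swept settings survive the round trip, including the config number.
assert restored.use_wiki == g.use_wiki
assert restored.config_num == g.config_num
```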
diff --git a/qanta/templates/elasticsearch.yml b/qanta/templates/elasticsearch.yml
new file mode 100644
index 00000000..09e375de
--- /dev/null
+++ b/qanta/templates/elasticsearch.yml
@@ -0,0 +1,9 @@
+path:
+  logs: {{ log_dir }}
+  data: {{ data_dir }}
+
+network.host: {{ host }}
+http.port: {{ port }}
+
+cluster.name: qanta_cluster
+node.name: qanta_node
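For reference, `create_es_config` renders this template with Jinja2 before the node starts. A sketch of that render under the function's defaults (`localhost:9200`); the explicit tmp paths stand in for the values `get_tmp_dir()` and `safe_path` would produce:

```python
# Mirrors the render step of create_es_config from this diff, with fixed
# paths instead of get_tmp_dir()/safe_path so the output is predictable.
from jinja2 import Environment, PackageLoader

env = Environment(loader=PackageLoader('qanta', 'templates'))
print(env.get_template('elasticsearch.yml').render(
    host='localhost', port=9200,
    log_dir='/tmp/qanta/elasticsearch/log/',
    data_dir='/tmp/qanta/elasticsearch/data/',
))
```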