From c328995717e0563de292c3236df3c10401222921 Mon Sep 17 00:00:00 2001
From: Josh Bryan
Date: Thu, 25 Oct 2012 15:38:45 -0500
Subject: [PATCH] Updated to newest pyes and did some major refactoring and
 bug fixes
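
The most visible API change is in the elasticsearch plugin: the old
string-based search_index() is split in two. search_index() now takes a
pyes query object plus optional indices, num_results, and node_type
filters, while search_index_text() wraps a query string in a pyes
TextQuery and delegates to it. Roughly, usage looks like this (an
illustrative sketch only, not part of this diff; 'ds' stands for a
datastore configured with the FullTextSearch plugin, assuming the same
plugin-method delegation the tests rely on, and the node type, field,
and index names here are invented):

    # create_index() maps the listed attributes with the ngram
    # analyzer and indexes any existing nodes of that type.
    ds.create_index("article", ["title", "body"], "article_index")

    # Text search across all indexed fields, filtered by node type:
    nodes = ds.search_index_text("lincoln", node_type="article")

    # Or pass search_index() any pyes query object directly and
    # restrict it to particular indices:
    from pyes.query import WildcardQuery
    nodes = ds.search_index(WildcardQuery("title", "linc*"),
                            indices=["article_index"])

Also, save_node() now fires the on_modify() plugin hook itself (see
factory.py), so indexes stay in sync on every save rather than only on
the create_node() update path.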
---
 agamemnon/factory.py               |   4 +-
 agamemnon/plugins/elasticsearch.py | 106 +++++++------
 agamemnon/tests/unit_tests.py      | 247 +++++++++++++++++------------
 requirements.txt                   |   2 +-
 4 files changed, 206 insertions(+), 153 deletions(-)

diff --git a/agamemnon/factory.py b/agamemnon/factory.py
index f8221cf..4e55cc1 100644
--- a/agamemnon/factory.py
+++ b/agamemnon/factory.py
@@ -319,7 +319,6 @@ def create_node(self, type, key, args=None, reference=False):
             node = self.get_node(type, key)
             node.attributes.update(args)
             self.save_node(node)
-            self.delegate.on_modify(node)
             return node
         except NodeNotFoundException:
             #since node won't get created without args, we will include __id by default
@@ -416,6 +415,9 @@ def save_node(self, node):
                 self.remove(self.get_cf(RELATIONSHIP_CF), key,
                             columns=['target__%s' % column for column in columns_to_remove])
 
+        # update plugins
+        self.on_modify(node)
+
     def get_node(self, type, key):
         try:
             values = self.get(type, key)
diff --git a/agamemnon/plugins/elasticsearch.py b/agamemnon/plugins/elasticsearch.py
index db385ce..9cc4868 100644
--- a/agamemnon/plugins/elasticsearch.py
+++ b/agamemnon/plugins/elasticsearch.py
@@ -4,71 +4,80 @@
 from itertools import groupby
 from operator import itemgetter
 
+
 class FullTextSearch(object):
-    def __init__(self,server,settings = None ):
+    def __init__(self, server, settings=None):
         self.conn = ES(server)
         self.indices = {}
         if settings:
             self.settings = settings
         else:
-            self.settings = { 
+            self.settings = {
                 'index': {
-                    'analysis' : {
-                        'analyzer' : {
-                            'ngram_analyzer' : {
-                                'tokenizer' : 'keyword',
-                                'filter' : ['lowercase', 'filter_ngram'],
-                                'type' : 'custom'
-                            }
+                    'analysis': {
+                        'analyzer': {
+                            'ngram_analyzer': {
+                                'tokenizer': 'keyword',
+                                'filter': ['lowercase', 'filter_ngram'],
+                                'type': 'custom'
+                            }
                         },
-                    'filter' : {
-                        'filter_ngram' : {
-                            'type' : 'nGram',
-                            'max_gram' : 30,
-                            'min_gram' : 1
-                        }
+                        'filter': {
+                            'filter_ngram': {
+                                'type': 'nGram',
+                                'max_gram': 30,
+                                'min_gram': 1
+                            }
                         }
                     }
                 }
             }
 
-    def search_index(self, type, index_names, query_string, num_results=None, fields="_all"):
-        q = query.WildcardQuery(fields ,query_string)
-        results = self.conn.search(query=q, indices=index_names, doc_types=type)
-        type_id_list = [(r['_type'] ,r['_id']) for r in results['hits']['hits'][0:num_results]]
+    def search_index_text(self, query_string, fields="_all", **args):
+        q = query.TextQuery(fields, query_string)
+        return self.search_index(q, **args)
+
+    def search_index(self, query, indices=None, num_results=None, node_type=None):
+        results = self.conn.search(
+            query=query, indices=indices, doc_types=node_type)
+        meta_list = [r.get_meta() for r in results[0:num_results]]
         node_dict = {}
         # fetch nodes grouped by type to reduce number of db calls
-        for t, id_list in groupby(sorted(type_id_list), key=itemgetter(0)):
-            ids = [n[1] for n in id_list]
+        key = itemgetter('type')
+        for t, grouped_list in groupby(sorted(meta_list, key=key), key=key):
+            ids = [meta['id'] for meta in grouped_list]
             for node in self.datastore.get_nodes(t, ids):
                 node_dict[(node.type, node.key)] = node
         # return nodes in original order
-        nodelist = [node_dict[key] for key in type_id_list]
+        nodelist = [node_dict[(meta['type'], meta['id'])]
+                    for meta in meta_list]
         return nodelist
 
     def create_index(self, type, indexed_variables, index_name):
-        self.conn.delete_index_if_exists(index_name)
-        self.conn.create_index(index_name,self.settings)
+        self.conn.create_index_if_missing(index_name, self.settings)
         mapping = {}
         for arg in indexed_variables:
-            mapping[arg] = {'boost':1.0,
-                            'analyzer' : 'ngram_analyzer',
+            mapping[arg] = {'boost': 1.0,
+                            'analyzer': 'ngram_analyzer',
                             'type': 'string',
                             'term_vector': 'with_positions_offsets'}
-        index_settings = {'index_analyzer':'ngram_analyzer',
-                          'search_analyzer':'standard',
-                          'properties':mapping}
-        self.conn.put_mapping(str(type),index_settings,[index_name])
+        index_settings = {'index_analyzer': 'ngram_analyzer',
+                          'search_analyzer': 'standard',
+                          'properties': mapping}
+        self.conn.put_mapping(str(type), index_settings, [index_name])
         self.refresh_index_cache()
         self.populate_index(type, index_name)
 
     def refresh_index_cache(self):
-        self.indices = self.conn.get_mapping()
+        try:
+            self.indices = self.conn.get_mapping()
+        except exceptions.IndexMissingException:
+            self.indices = []
 
-    def delete_index(self,type, index_name):
+    def delete_index(self, index_name):
         self.conn.delete_index_if_exists(index_name)
         self.refresh_index_cache()
@@ -79,51 +88,54 @@ def populate_index(self, type, index_name):
         mapping = self.conn.get_mapping(type, index_name)
         for node in node_list:
             key = node.key
-            index_dict = self.populate_index_document(type, index_name, node.attributes,mapping)
+            index_dict = self.populate_index_document(
+                type, index_name, node.attributes, mapping)
             try:
-                self.conn.delete(index_name,type,key)
+                self.conn.delete(index_name, type, key)
             except exceptions.NotFoundException:
                 pass
-            self.conn.index(index_dict,index_name,type,key)
+            self.conn.index(index_dict, index_name, type, key)
         self.conn.refresh([index_name])
 
-    def on_create(self,node):
+    def on_create(self, node):
         type_indices = self.get_indices_of_type(node.type)
         for index_name in type_indices:
-            mapping = self.conn.get_mapping(node.type,index_name)
-            index_dict = self.populate_index_document(node.type,index_name,node.attributes,mapping)
-            self.conn.index(index_dict,index_name,node.type,node.key)
+            mapping = self.conn.get_mapping(node.type, index_name)
+            index_dict = self.populate_index_document(
+                node.type, index_name, node.attributes, mapping)
+            self.conn.index(index_dict, index_name, node.type, node.key)
             self.conn.refresh([index_name])
 
     def on_delete(self, node):
         type_indices = self.get_indices_of_type(node.type)
         for index_name in type_indices:
             try:
-                self.conn.delete(index_name,node.type,node.key)
+                self.conn.delete(index_name, node.type, node.key)
                 self.conn.refresh([index_name])
             except exceptions.NotFoundException:
                 pass
-            
+
     def on_modify(self, node):
         type_indices = self.get_indices_of_type(node.type)
         for index_name in type_indices:
-            mapping = self.conn.get_mapping(node.type,index_name)
-            index_dict = self.populate_index_document(node.type,index_name,node.attributes,mapping)
+            mapping = self.conn.get_mapping(node.type, index_name)
+            index_dict = self.populate_index_document(
+                node.type, index_name, node.attributes, mapping)
             try:
-                self.conn.delete(index_name,node.type,node.key)
-                self.conn.index(index_dict,index_name,node.type,node.key)
+                self.conn.delete(index_name, node.type, node.key)
+                self.conn.index(index_dict, index_name, node.type, node.key)
                 self.conn.refresh([index_name])
             except exceptions.NotFoundException:
                 pass
 
-    def get_indices_of_type(self,type):
+    def get_indices_of_type(self, type):
         type_indices = [
             key for key, value in self.indices.items()
             if type in value
         ]
         return type_indices
 
-    def populate_index_document(self,type,index_name,attributes,mapping):
+    def populate_index_document(self, type, index_name, attributes, mapping):
         indexed_variables = mapping[type]['properties'].keys()
         index_dict = {}
         for arg in indexed_variables:
diff --git a/agamemnon/tests/unit_tests.py b/agamemnon/tests/unit_tests.py
index b2a13f2..8f4892b 100644
--- a/agamemnon/tests/unit_tests.py
+++ b/agamemnon/tests/unit_tests.py
@@ -24,37 +24,37 @@ def create_node(self, node_type, id):
         key = 'node_%s' % id
         self.ds.create_node(node_type, key, attributes)
         node = self.ds.get_node(node_type, key)
-        self.failUnlessEqual(key, node.key)
-        self.failUnlessEqual(node_type, node.type)
+        self.assertEqual(key, node.key)
+        self.assertEqual(node_type, node.type)
         return key, attributes
 
     def containment(self, node_type, node):
         reference_node = self.ds.get_reference_node(node_type)
         test_reference_nodes = [rel.source_node for rel in node.instance.incoming]
-        self.failUnlessEqual(1, len(test_reference_nodes))
-        self.failUnlessEqual(reference_node, test_reference_nodes[0])
+        self.assertEqual(1, len(test_reference_nodes))
+        self.assertEqual(reference_node, test_reference_nodes[0])
         ref_ref_node = self.ds.get_reference_node()
         test_reference_nodes = [rel.target_node for rel in ref_ref_node.instance.outgoing]
-        self.failUnlessEqual(2, len(test_reference_nodes))
-        self.failUnlessEqual(sorted([ref_ref_node, reference_node]), sorted(test_reference_nodes))
-        self.failUnless(node_type in ref_ref_node.instance)
-        self.failUnless(ref_ref_node.key in reference_node.instance)
+        self.assertEqual(2, len(test_reference_nodes))
+        self.assertEqual(sorted([ref_ref_node, reference_node]), sorted(test_reference_nodes))
+        self.assertTrue(node_type in ref_ref_node.instance)
+        self.assertTrue(ref_ref_node.key in reference_node.instance)
 
     def get_set_attributes(self, node, attributes):
-        self.failUnlessEqual(attributes, node.attributes)
+        self.assertEqual(attributes, node.attributes)
         node['new_attribute'] = 'sample attr'
         node.commit()
         node = self.ds.get_node(node.type, node.key)
-        self.failUnlessEqual('sample attr', node['new_attribute'])
-        self.failIfEqual(attributes, node.attributes)
+        self.assertEqual('sample attr', node['new_attribute'])
+        self.assertNotEqual(attributes, node.attributes)
         # Test the context manager
         node = self.ds.get_node(node.type, node.key)
         with updating_node(node):
             node['new_attribute'] = 'new sample attr'
         node = self.ds.get_node(node.type, node.key)
-        self.failUnlessEqual('new sample attr', node['new_attribute'])
-        self.failIfEqual(attributes, node.attributes)
+        self.assertEqual('new sample attr', node['new_attribute'])
+        self.assertNotEqual(attributes, node.attributes)
         with updating_node(node):
             del(node['new_attribute'])
         node = self.ds.get_node(node.type, node.key)
@@ -80,34 +80,34 @@ def create_random_relationship(self, node, target_type, node_list):
         }
         target_node = self.ds.get_node(target_type, target_key)
         rel = node.is_related_to(target_node, attributes=attributes, **kw_args)
-        self.failUnless(target_key in node.is_related_to)
-        self.failUnless(node.key in target_node.is_related_to)
+        self.assertTrue(target_key in node.is_related_to)
+        self.assertTrue(node.key in target_node.is_related_to)
         rel_to_target = target_node.is_related_to.relationships_with(node.key)[0]
-        self.failUnlessEqual(rel, rel_to_target)
+        self.assertEqual(rel, rel_to_target)
         complete_attributes = {}
         complete_attributes.update(attributes)
         complete_attributes.update(kw_args)
         test_attributes = rel_to_target.attributes
         for key in complete_attributes.keys():
-            self.failUnlessEqual(complete_attributes[key], test_attributes[key])
-        self.failUnlessEqual(len(complete_attributes), len(test_attributes))
-        self.failUnlessEqual(rel.key, rel_to_target.key)
-        self.failUnless(self.ds.get_relationship(rel.type, rel.key) is not None)
-        self.failUnlessEqual(len(complete_attributes), len(self.ds.get_relationship(rel.type, rel.key).attributes))
+            self.assertEqual(complete_attributes[key], test_attributes[key])
+        self.assertEqual(len(complete_attributes), len(test_attributes))
+        self.assertEqual(rel.key, rel_to_target.key)
+        self.assertTrue(self.ds.get_relationship(rel.type, rel.key) is not None)
+        self.assertEqual(len(complete_attributes), len(self.ds.get_relationship(rel.type, rel.key).attributes))
         in_outbound_relationships = False
         for rel in node.is_related_to.outgoing:
             if rel.target_node.key == target_key:
                 in_outbound_relationships = True
-        self.failUnless(in_outbound_relationships)
+        self.assertTrue(in_outbound_relationships)
         in_inbound_relationships = False
         for rel in target_node.is_related_to.incoming:
             if rel.source_node.key == node.key:
                 in_inbound_relationships = True
-        self.failUnless(in_inbound_relationships)
+        self.assertTrue(in_inbound_relationships)
         rel['dummy_variable'] = 'dummy'
         rel_attributes = rel.attributes
-        self.failIfEqual(attributes, rel.attributes)
-        self.failUnlessEqual('dummy', rel_attributes['dummy_variable'])
+        self.assertNotEqual(attributes, rel.attributes)
+        self.assertEqual('dummy', rel_attributes['dummy_variable'])
         del(rel['dummy_variable'])
         try:
             rel['dummy_variable']
@@ -118,25 +118,25 @@ def create_random_relationship(self, node, target_type, node_list):
         rel.commit()
         rel_to_target = target_node.is_related_to.relationships_with(node.key)[0]
         if rel_to_target.key == rel.key:
-            self.failUnlessEqual(20, rel_to_target['int'])
+            self.assertEqual(20, rel_to_target['int'])
         return node, target_node
 
     def delete_relationships(self, source, target):
         source_initial_rel_count = len(source.relationships)
         target_initial_rel_count = len(target.relationships)
-        self.failUnless(target.key in source.is_related_to)
-        self.failUnless(source.key in target.is_related_to)
+        self.assertTrue(target.key in source.is_related_to)
+        self.assertTrue(source.key in target.is_related_to)
         rel_list = source.is_related_to.relationships_with(target.key)
-        self.failUnlessEqual(1, len(rel_list))
+        self.assertEqual(1, len(rel_list))
         rel = rel_list[0]
         rel.delete()
-        self.failIf(target.key in source.is_related_to)
-        self.failIf(source.key in target.is_related_to)
+        self.assertFalse(target.key in source.is_related_to)
+        self.assertFalse(source.key in target.is_related_to)
         source_post_delete_count = len(source.relationships)
         target_post_delete_count = len(target.relationships)
-        self.failUnlessEqual(source_initial_rel_count - 1, source_post_delete_count)
-        self.failUnlessEqual(target_initial_rel_count - 1, target_post_delete_count)
+        self.assertEqual(source_initial_rel_count - 1, source_post_delete_count)
+        self.assertEqual(target_initial_rel_count - 1, target_post_delete_count)
         return rel
 
     def test_multi_get(self):
@@ -296,8 +296,8 @@ def test_one_node_type_one_relationship_type(self):
         source.delete()
         for deleted_rel in relationships_to_delete:
             target_incoming_relationships = [rel for rel in target.is_related_to.incoming]
-            self.failIf(source.key in target.is_related_to)
-            self.failIf(deleted_rel in target_incoming_relationships)
+            self.assertFalse(source.key in target.is_related_to)
+            self.assertFalse(deleted_rel in target_incoming_relationships)
 
     def test_large_relationship_sets(self):
@@ -352,77 +352,116 @@ def setUp(self):
         self.ds = load_from_file(TEST_CONFIG_FILE, 'memory_config_1')
 
 
+@attr(backend="memory")
+@attr(plugin="elastic_search")
 class ElasticSearchTests(TestCase, AgamemnonTests):
+
+    TEST_NODES = {
+        "test_type_1": {
+            "test1": {
+                "full_text": "This is a sentence worth searching.",
+                "author": "me",
+            },
+            "test2": {
+                "full_text": "Four score and seven years ago...",
+                "author": "lincoln",
+            },
+            "test3": {
+                "full_text": "We hold these truths to be self-evident, that all men are created equal...",
+                "author": "jefferson",
+            },
+        },
+        "test_type_2": {
+            "test1": {
+                "other_text": "something and something",
+            },
+            "test2": {
+                "other_text": "One fish, two fish, red fish, blue fish",
+            },
+            "test3": {
+                "other_text": "I don't like green eggs and ham, I don't like them sam I am",
+            },
+        }
+    }
+
     def setUp(self):
         self.ds = load_from_file(TEST_CONFIG_FILE, 'elastic_search_config')
-        node_type = 'node_test'
-        index_name = "test_index"
-        new_index_name = "new_index"
-        try:
-            node1 = self.ds.get_node(node_type,'node_1')
-            self.ds.delete_node(node1)
-        except NodeNotFoundException:
-            pass
-        try:
-            node2 = self.ds.get_node(node_type,'node_2')
-            self.ds.delete_node(node2)
-        except NodeNotFoundException:
-            pass
-        self.ds.conn.delete_index_if_exists(index_name)
-        self.ds.conn.delete_index_if_exists(new_index_name)
-
-    def index_tests(self):
-        node_type = 'node_test'
-        index_name = "test_index"
-        new_index_name = "new_index"
-        args = ['integer','long','float','string']
-        [key1,atr1] = self.create_node(node_type,1)
-        self.ds.create_index(node_type,args,index_name)
+        self.ds.truncate()
+
+    def tearDown(self):
+        self.ds.truncate()
+        for index in self.ds.indices.keys():
+            self.ds.delete_index(index)
+
+    def _create_es_nodes(self):
+        for type, node_data in self.TEST_NODES.items():
+            for key, attr in node_data.items():
+                self.ds.create_node(type, key, attr)
+
+    def test_create_index(self):
+        self.ds.create_index("test_type_1", ["full_text"], "test_index")
        #test to see if the index exists
-        self.failUnless(index_name in self.ds.conn.get_indices())
-        #test to see if search function works (also populate_indices)
-        node1 = self.ds.get_node(node_type,key1)
-        nodes_found = self.ds.search_index(node_type,index_name,'name1')
-        self.failUnless(node1 in nodes_found)
-        self.failUnlessEqual(1,len(nodes_found))
-        nodes_found = self.ds.search_index(node_type,index_name,'1000')
-        self.failUnless(node1 in nodes_found)
-        self.failUnlessEqual(1,len(nodes_found))
-        #test get_indices_of_type function
-        type_indices = self.ds.get_indices_of_type(node_type)
-        self.failUnless(index_name in type_indices)
-        self.failUnlessEqual(1,len(type_indices))
-        #test update_indices function
-        [key2,atr2] = self.create_node(node_type,2)
-        node2 = self.ds.get_node(node_type,key2)
-        nodes_found = self.ds.search_index(node_type,index_name,'name2')
-        self.failUnless(node2 in nodes_found)
-        self.failUnlessEqual(1,len(nodes_found))
-        nodes_found = self.ds.search_index(node_type,index_name,'1000')
-        self.failUnless(node1 in nodes_found)
-        self.failUnless(node2 in nodes_found)
-        self.failUnlessEqual(2,len(nodes_found))
-        #test modify_indices function
-        new_args = ['string','new_attr']
-        self.ds.create_index(node_type,new_args,new_index_name)
-        nodes_found = self.ds.search_index(node_type,new_index_name,'new_value')
-        self.failUnlessEqual(0,len(nodes_found))
-        self.ds.create_node(node_type,'node_2',{'new_attr':'new_value'})
-        new_node2 = self.ds.get_node(node_type,'node_2')
-        nodes_found = self.ds.search_index(node_type,new_index_name,'new_value')
-        self.failUnless(node1 not in nodes_found)
-        self.failUnless(new_node2 in nodes_found)
-        self.failUnlessEqual(1,len(nodes_found))
-        #test remove_node function
-        self.ds.delete_node(new_node2)
-        nodes_found = self.ds.search_index(node_type,index_name,'1000')
-        self.failUnlessEqual(1,len(nodes_found))
-        nodes_found = self.ds.search_index(node_type,index_name,'node_2')
-        self.failUnlessEqual(0,len(nodes_found))
-        #test delete_index function
-        num_indices = len(self.ds.conn.get_indices())
-        #self.ds.delete_index(node_type,index_name)
-        #self.ds.delete_index(node_type,new_index_name)
-        #self.failUnlessEqual(2,num_indices-len(self.ds.conn.get_indices()))
-        #self.ds.delete_node(node1)
+        self.assertIn("test_index", self.ds.conn.get_indices())
+        self.assertIn("test_index", self.ds.get_indices_of_type("test_type_1"))
+        self.assertNotIn("test_index", self.ds.get_indices_of_type("test_type_2"))
+
+    def test_simple_text_search(self):
+        self._create_es_nodes()
+        self.ds.create_index("test_type_1", ["full_text","author"], "test_index")
+        nodes = self.ds.search_index_text("lincoln")
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].key, "test2")
+        self.assertEqual(nodes[0].type, "test_type_1")
+
+
+    def test_update_indices(self):
+        self._create_es_nodes()
+        self.ds.create_index("test_type_1", ["full_text","author"], "test_index")
+        node = self.ds.get_node("test_type_1", "test2")
+        node['author'] = 'joshbryan'
+        node.commit()
+        nodes = self.ds.search_index_text("lincoln")
+        self.assertEqual(len(nodes), 0)
+        self.ds.create_node("test_type_1", "new_node", {"full_text": "Lincoln is cool"})
+        nodes = self.ds.search_index_text("lincoln")
+        self.assertEqual(len(nodes), 1)
+        self.assertEqual(nodes[0].key, "new_node")
+        self.assertEqual(nodes[0].type, "test_type_1")
+
+    def test_delete_nodes(self):
+        self._create_es_nodes()
+        self.ds.create_index("test_type_1", ["full_text","author"], "test_index")
+        for type, node_data in self.TEST_NODES.items():
+            for key in node_data.keys():
+                self.ds.get_node(type, key).delete()
+
+        stats = self.ds.conn.get_indices()
+        self.assertEqual(stats['test_index']['num_docs'], 0)
+
+
+    def test_multiple_types_one_index(self):
+        self._create_es_nodes()
+        self.ds.create_index("test_type_1", ["full_text"], "test_index")
+        self.ds.create_index("test_type_2", ["other_text"], "test_index")
+        nodes = self.ds.search_index_text("green truths", indices=["test_index"])
+        self.assertEqual(len(nodes), 2)
+        type_key_list = [(node.type, node.key) for node in nodes]
+        self.assertIn(("test_type_1", "test3"), type_key_list)
+        self.assertIn(("test_type_2", "test3"), type_key_list)
+
+        nodes = self.ds.search_index_text("green truths", node_type='test_type_1')
+        self.assertEqual(len(nodes), 1)
+
+    def test_multiple_indices(self):
+        self._create_es_nodes()
+        self.ds.create_index("test_type_1", ["full_text"], "test_index1")
+        self.ds.create_index("test_type_1", ["author"], "test_index2")
+        nodes = self.ds.search_index_text("lincoln", indices=["test_index1"])
+        self.assertEqual(len(nodes), 0)
+
+        nodes = self.ds.search_index_text("lincoln", indices=["test_index2"])
+        self.assertEqual(len(nodes), 1)
+
+        nodes = self.ds.search_index_text("lincoln")
+        self.assertEqual(len(nodes), 1)
diff --git a/requirements.txt b/requirements.txt
index f1510a1..781e01f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,7 +2,7 @@ PyYAML==3.10
 isodate==0.4.8
 ordereddict==1.1
 pycassa==1.4.0
-pyes==0.16.0
+pyes==0.19.1
 pyparsing==1.5.6
 python-dateutil==1.5
 rdfextras==0.2