This repository has been archived by the owner on Nov 1, 2018. It is now read-only.

Updated to newest pyes and did some major refactoring and bug fixes
Josh Bryan committed Oct 25, 2012
1 parent 6a90b13 commit c328995
Showing 4 changed files with 206 additions and 153 deletions.
4 changes: 3 additions & 1 deletion agamemnon/factory.py
@@ -319,7 +319,6 @@ def create_node(self, type, key, args=None, reference=False):
             node = self.get_node(type, key)
             node.attributes.update(args)
             self.save_node(node)
-            self.delegate.on_modify(node)
             return node
         except NodeNotFoundException:
             #since node won't get created without args, we will include __id by default
@@ -416,6 +415,9 @@ def save_node(self, node):
             self.remove(self.get_cf(RELATIONSHIP_CF), key,
                         columns=['target__%s' % column for column in columns_to_remove])
 
+        # update plugins
+        self.on_modify(node)
+
     def get_node(self, type, key):
         try:
             values = self.get(type, key)
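The factory.py change above moves the plugin notification into save_node, so the on_modify hook now fires on every save rather than only on the update path inside create_node. For illustration, a minimal sketch of a plugin exposing the three hooks this commit relies on; the LoggingPlugin name is hypothetical, and how a plugin gets attached to the datastore is not shown in this diff:

# Hypothetical plugin sketch; the hook names (on_create, on_modify,
# on_delete) match those implemented by the elasticsearch plugin below.
# Registration with the datastore is not part of this diff and is assumed.
class LoggingPlugin(object):
    def on_create(self, node):
        print "created %s/%s" % (node.type, node.key)

    def on_modify(self, node):
        print "modified %s/%s" % (node.type, node.key)

    def on_delete(self, node):
        print "deleted %s/%s" % (node.type, node.key)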
106 changes: 59 additions & 47 deletions agamemnon/plugins/elasticsearch.py
@@ -4,71 +4,80 @@
 from itertools import groupby
 from operator import itemgetter
 
+
 class FullTextSearch(object):
-    def __init__(self,server,settings = None ):
+    def __init__(self, server, settings=None):
         self.conn = ES(server)
+        self.indices = {}
         if settings:
             self.settings = settings
         else:
-            self.settings = {
+            self.settings = {
                 'index': {
-                    'analysis' : {
-                        'analyzer' : {
-                            'ngram_analyzer' : {
-                                'tokenizer' : 'keyword',
-                                'filter' : ['lowercase', 'filter_ngram'],
-                                'type' : 'custom'
-                            }
+                    'analysis': {
+                        'analyzer': {
+                            'ngram_analyzer': {
+                                'tokenizer': 'keyword',
+                                'filter': ['lowercase', 'filter_ngram'],
+                                'type': 'custom'
+                            }
                         },
-                        'filter' : {
-                            'filter_ngram' : {
-                                'type' : 'nGram',
-                                'max_gram' : 30,
-                                'min_gram' : 1
-                            }
+                        'filter': {
+                            'filter_ngram': {
+                                'type': 'nGram',
+                                'max_gram': 30,
+                                'min_gram': 1
+                            }
                         }
                     }
                 }
             }
 
-    def search_index(self, type, index_names, query_string, num_results=None, fields="_all"):
-        q = query.WildcardQuery(fields ,query_string)
-        results = self.conn.search(query=q, indices=index_names, doc_types=type)
-        type_id_list = [(r['_type'] ,r['_id']) for r in results['hits']['hits'][0:num_results]]
+    def search_index_text(self, query_string, fields="_all", **args):
+        q = query.TextQuery(fields, query_string)
+        return self.search_index(q, **args)
 
+    def search_index(self, query, indices=None, num_results=None, node_type=None):
+        results = self.conn.search(
+            query=query, indices=indices, doc_types=node_type)
+        meta_list = [r.get_meta() for r in results[0:num_results]]
         node_dict = {}
 
         # fetch nodes grouped by type to reduce number of db calls
-        for t, id_list in groupby(sorted(type_id_list), key=itemgetter(0)):
-            ids = [n[1] for n in id_list]
+        key = itemgetter('type')
+        for t, grouped_list in groupby(sorted(meta_list, key=key), key=key):
+            ids = [meta['id'] for meta in grouped_list]
             for node in self.datastore.get_nodes(t, ids):
                 node_dict[(node.type, node.key)] = node
 
         # return nodes in original order
-        nodelist = [node_dict[key] for key in type_id_list]
+        nodelist = [node_dict[(meta['type'], meta['id'])]
+                    for meta in meta_list]
 
         return nodelist
 
     def create_index(self, type, indexed_variables, index_name):
-        self.conn.delete_index_if_exists(index_name)
-        self.conn.create_index(index_name,self.settings)
+        self.conn.create_index_if_missing(index_name, self.settings)
         mapping = {}
         for arg in indexed_variables:
-            mapping[arg] = {'boost':1.0,
-                            'analyzer' : 'ngram_analyzer',
+            mapping[arg] = {'boost': 1.0,
+                            'analyzer': 'ngram_analyzer',
                             'type': 'string',
                             'term_vector': 'with_positions_offsets'}
-        index_settings = {'index_analyzer':'ngram_analyzer',
-                          'search_analyzer':'standard',
-                          'properties':mapping}
-        self.conn.put_mapping(str(type),index_settings,[index_name])
+        index_settings = {'index_analyzer': 'ngram_analyzer',
+                          'search_analyzer': 'standard',
+                          'properties': mapping}
+        self.conn.put_mapping(str(type), index_settings, [index_name])
         self.refresh_index_cache()
         self.populate_index(type, index_name)
 
     def refresh_index_cache(self):
-        self.indices = self.conn.get_mapping()
+        try:
+            self.indices = self.conn.get_mapping()
+        except exceptions.IndexMissingException:
+            self.indices = []
 
-    def delete_index(self,type, index_name):
+    def delete_index(self, index_name):
         self.conn.delete_index_if_exists(index_name)
         self.refresh_index_cache()

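The default settings in this hunk pair a keyword tokenizer with a lowercase filter and an nGram filter (grams of length 1 to 30), so each indexed value is stored as all of its lowercase substrings up to 30 characters, while queries go through the standard search analyzer set in create_index. A rough plain-Python sketch of the terms the analyzer chain produces (an illustration only, not how Elasticsearch implements it):

def ngram_terms(value, min_gram=1, max_gram=30):
    # keyword tokenizer: the whole value is a single token;
    # lowercase filter:
    token = value.lower()
    # nGram filter: every substring of length min_gram..max_gram
    for n in range(min_gram, min(max_gram, len(token)) + 1):
        for start in range(len(token) - n + 1):
            yield token[start:start + n]

print sorted(set(ngram_terms("Bob")))  # ['b', 'bo', 'bob', 'o', 'ob']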
@@ -79,51 +88,54 @@ def populate_index(self, type, index_name):
         mapping = self.conn.get_mapping(type, index_name)
         for node in node_list:
             key = node.key
-            index_dict = self.populate_index_document(type, index_name, node.attributes,mapping)
+            index_dict = self.populate_index_document(
+                type, index_name, node.attributes, mapping)
             try:
-                self.conn.delete(index_name,type,key)
+                self.conn.delete(index_name, type, key)
             except exceptions.NotFoundException:
                 pass
-            self.conn.index(index_dict,index_name,type,key)
+            self.conn.index(index_dict, index_name, type, key)
         self.conn.refresh([index_name])
 
-    def on_create(self,node):
+    def on_create(self, node):
         type_indices = self.get_indices_of_type(node.type)
         for index_name in type_indices:
-            mapping = self.conn.get_mapping(node.type,index_name)
-            index_dict = self.populate_index_document(node.type,index_name,node.attributes,mapping)
-            self.conn.index(index_dict,index_name,node.type,node.key)
+            mapping = self.conn.get_mapping(node.type, index_name)
+            index_dict = self.populate_index_document(
+                node.type, index_name, node.attributes, mapping)
+            self.conn.index(index_dict, index_name, node.type, node.key)
             self.conn.refresh([index_name])
 
     def on_delete(self, node):
         type_indices = self.get_indices_of_type(node.type)
         for index_name in type_indices:
             try:
-                self.conn.delete(index_name,node.type,node.key)
+                self.conn.delete(index_name, node.type, node.key)
                 self.conn.refresh([index_name])
             except exceptions.NotFoundException:
                 pass
 
     def on_modify(self, node):
         type_indices = self.get_indices_of_type(node.type)
         for index_name in type_indices:
-            mapping = self.conn.get_mapping(node.type,index_name)
-            index_dict = self.populate_index_document(node.type,index_name,node.attributes,mapping)
+            mapping = self.conn.get_mapping(node.type, index_name)
+            index_dict = self.populate_index_document(
+                node.type, index_name, node.attributes, mapping)
             try:
-                self.conn.delete(index_name,node.type,node.key)
-                self.conn.index(index_dict,index_name,node.type,node.key)
+                self.conn.delete(index_name, node.type, node.key)
+                self.conn.index(index_dict, index_name, node.type, node.key)
                 self.conn.refresh([index_name])
             except exceptions.NotFoundException:
                 pass
 
-    def get_indices_of_type(self,type):
+    def get_indices_of_type(self, type):
         type_indices = [
             key for key, value in self.indices.items()
             if type in value
         ]
         return type_indices
 
-    def populate_index_document(self,type,index_name,attributes,mapping):
+    def populate_index_document(self, type, index_name, attributes, mapping):
         indexed_variables = mapping[type]['properties'].keys()
         index_dict = {}
         for arg in indexed_variables:
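Taken together, the refactor means search_index now accepts any pyes query object, with search_index_text added as a convenience wrapper. A hypothetical usage sketch; the server address, the 'person' node type, field names, index name, and the datastore wiring are illustrative assumptions, not part of the commit:

from pyes import query
from agamemnon.plugins.elasticsearch import FullTextSearch

fts = FullTextSearch("localhost:9200")  # address is illustrative
fts.datastore = datastore               # assumes the datastore is attached like this

# build an ngram-analyzed index over two attributes of 'person' nodes
fts.create_index("person", ["first_name", "last_name"], "person_index")

# text search through the new convenience wrapper...
nodes = fts.search_index_text("bob", fields="first_name",
                              indices=["person_index"], node_type="person")

# ...or hand search_index any pyes query object directly
q = query.WildcardQuery("last_name", "smi*")
nodes = fts.search_index(q, indices=["person_index"], num_results=10)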
