Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ htmlcov/

.env
*.pyc
db.sqlite3
db.sqlite3
166 changes: 137 additions & 29 deletions complaint_search/es_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,132 @@
# Elasticsearch index and document type; each can be overridden through the
# environment variable of the same name (minus the leading underscore).
_COMPLAINT_ES_INDEX = os.environ.get('COMPLAINT_ES_INDEX', 'complaint-index')
_COMPLAINT_DOC_TYPE = os.environ.get('COMPLAINT_DOC_TYPE', 'complaint-doctype')

# Request parameters that are translated into bool/should filter clauses.
_OPTIONAL_FILTERS = (
    "product",
    "issue",
    "company",
    "state",
    "zip_code",
    "timely",
    "company_response",
    "company_public_response",
    "consumer_consent_provided",
    "submitted_via",
    "tag",
)

# Request parameter -> Elasticsearch field name. Parameters that are not
# listed here are looked up with themselves as the fallback field name.
_OPTIONAL_FILTERS_PARAM_TO_ES_MAP = {
    "product": "product.raw",
    "sub_product": "sub_product.raw",
    "issue": "issue.raw",
    "sub_issue": "sub_issue.raw",
    "company_public_response": "company_public_response.raw",
    "consumer_consent_provided": "consumer_consent_provided.raw",
}

# Parent parameter -> the child parameter nested beneath it.
_OPTIONAL_FILTERS_CHILD_MAP = {
    "product": "sub_product",
    "issue": "sub_issue",
}

# Parameters whose string values are converted to 0/1 before filtering
# ("no", case-insensitively, becomes 0; anything else becomes 1).
_OPTIONAL_FILTERS_STRING_TO_BOOL = (
    "consumer_disputed",
    "has_narratives",
)


def get_es():
    """Return the module-wide Elasticsearch client, creating it on first use.

    The client is stored in the module-level ``_ES_INSTANCE`` so that later
    calls reuse the same object instead of constructing a new one.
    """
    global _ES_INSTANCE
    if _ES_INSTANCE is not None:
        return _ES_INSTANCE

    _ES_INSTANCE = Elasticsearch(
        [_ES_URL],
        http_auth=(_ES_USER, _ES_PASSWORD),
        timeout=100,
    )
    return _ES_INSTANCE

def _create_and_append_bool_should_clauses(es_field_name, value_list,
filter_list, with_subitems=False, es_subitem_field_name=None):


def _create_aggregation(**kwargs):
    """Build the ``aggs`` section of the search body, one entry per field.

    Every field gets a terms aggregation wrapped in a filter. The filter
    always contains a ``date_received`` range clause, followed by one
    clause for each filter parameter present in ``kwargs`` except the
    field's own: a field is never narrowed by its own filter.

    Args:
        **kwargs: search parameters as passed to ``search()``.
            ``min_date`` / ``max_date`` bound ``date_received``; falsy
            bounds (``None``, ``""``) are skipped, matching how
            ``search()`` treats them. Keys listed in ``_OPTIONAL_FILTERS``
            or ``_OPTIONAL_FILTERS_STRING_TO_BOOL`` hold an iterable of
            values (strings for the latter).

    Returns:
        dict mapping each field name to
        ``{"filter": {"and": {"filters": [...]}}, "aggs": {...}}``.
    """
    Field = namedtuple('Field', 'name size has_subfield')
    fields = [
        Field('has_narratives', 10, False),
        Field('company', 10000, False),
        Field('product', 10000, True),
        Field('issue', 10000, True),
        Field('state', 50, False),
        Field('zip_code', 1000, False),
        Field('timely', 10, False),
        Field('company_response', 100, False),
        Field('company_public_response', 100, False),
        Field('consumer_disputed', 100, False),
        Field('consumer_consent_provided', 100, False),
        Field('tag', 100, False),
        Field('submitted_via', 100, False)
    ]

    # The date bounds do not depend on the field, so compute them once.
    date_range = {}
    if kwargs.get("min_date"):
        date_range["from"] = kwargs["min_date"]
    if kwargs.get("max_date"):
        date_range["to"] = kwargs["max_date"]

    aggs = {}

    # Creating aggregation object for each field above
    for field in fields:
        es_field_name = _OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(
            field.name, field.name)

        terms_agg = {
            "terms": {
                "field": es_field_name,
                "size": field.size
            }
        }
        if field.has_subfield:
            # The nested aggregation is keyed by the ES sub-field name
            # (e.g. "sub_product.raw"), not by the request parameter name.
            es_subfield_name = _OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(
                _OPTIONAL_FILTERS_CHILD_MAP.get(field.name))
            terms_agg["aggs"] = {
                es_subfield_name: {
                    "terms": {
                        "field": es_subfield_name,
                        "size": field.size
                    }
                }
            }

        # The range clause is appended even when it has no bounds, so the
        # filter list always holds at least one entry. Each field gets its
        # own copy so no two aggregations share a mutable dict.
        filters = [{"range": {"date_received": dict(date_range)}}]

        for item in kwargs:
            if item == field.name:
                # A field's own filter is left out of its own aggregation.
                continue

            if item in _OPTIONAL_FILTERS:
                child = _OPTIONAL_FILTERS_CHILD_MAP.get(item)
                # The helper appends to ``filters`` in place and returns
                # nothing, so there is no result to keep.
                _create_and_append_bool_should_clauses(
                    _OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item),
                    kwargs[item], filters,
                    with_subitems=item in _OPTIONAL_FILTERS_CHILD_MAP,
                    es_subitem_field_name=_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(
                        child, child))
            elif item in _OPTIONAL_FILTERS_STRING_TO_BOOL:
                # "no" (any case) maps to 0, every other value to 1.
                _create_and_append_bool_should_clauses(
                    _OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(item, item),
                    [0 if cd.lower() == "no" else 1 for cd in kwargs[item]],
                    filters)

        aggs[field.name] = {
            "filter": {
                "and": {
                    "filters": filters
                }
            },
            "aggs": {
                field.name: terms_agg
            }
        }

    return aggs

def _create_bool_should_clauses(es_field_name, value_list,
with_subitems=False, es_subitem_field_name=None):
if value_list:
if not with_subitems:
term_list = [ {"terms": {es_field_name: [value]}}
for value in value_list ]
filter_list.append({"bool": {"should": term_list}})
return {"bool": {"should": term_list}}
else:
item_dict = defaultdict(list)
for v in value_list:
Expand All @@ -53,7 +165,16 @@ def _create_and_append_bool_should_clauses(es_field_name, value_list,
subitem_term = {"terms": {es_subitem_field_name: subitems}}
f_list.append({"and": {"filters": [item_term, subitem_term]}})

filter_list.append({"bool": {"should": f_list}})
return {"bool": {"should": f_list}}

def _create_and_append_bool_should_clauses(es_field_name, value_list,
        filter_list, with_subitems=False, es_subitem_field_name=None):
    """Build a bool/should clause and append it to ``filter_list`` in place.

    The clause itself comes from ``_create_bool_should_clauses``; when that
    helper returns a falsy result, ``filter_list`` is left untouched.
    Nothing is returned.
    """
    clause = _create_bool_should_clauses(
        es_field_name, value_list, with_subitems, es_subitem_field_name)
    if not clause:
        return

    filter_list.append(clause)

# List of possible arguments:
# - fmt: format to be returned: "json", "csv", "xls", or "xlsx"
Expand Down Expand Up @@ -88,23 +209,6 @@ def search(**kwargs):
"sort": "relevance_desc"
}

OPTIONAL_FILTERS = ("product", "issue", "company", "state", "zip_code", "timely",
"company_response", "company_public_response",
"consumer_consent_provided", "submitted_via", "tag")

OPTIONAL_FILTERS_PARAM_TO_ES_MAP = {
"product": "product.raw",
"sub_product": "sub_product.raw",
"issue": "issue.raw",
"sub_issue": "sub_issue.raw"
}
OPTIONAL_FILTERS_CHILD_MAP = {
"product": "sub_product",
"issue": "sub_issue"
}

OPTIONAL_FILTERS_STRING_TO_BOOL = ("consumer_disputed", "has_narratives")

params.update(**kwargs)

res = None
Expand Down Expand Up @@ -150,6 +254,9 @@ def search(**kwargs):
# post-filter
body["post_filter"] = {"and": {"filters": []}}

## Create base aggregation
body["aggs"] = _create_aggregation(**kwargs)

## date
if params.get("min_date") or params.get("max_date"):
date_clause = {"range": {"date_received": {}}}
Expand All @@ -160,18 +267,19 @@ def search(**kwargs):

body["post_filter"]["and"]["filters"].append(date_clause)

## Create bool should clauses for fields in OPTIONAL_FILTERS
for field in OPTIONAL_FILTERS:
if field in OPTIONAL_FILTERS_CHILD_MAP:
_create_and_append_bool_should_clauses(OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field),

## Create bool should clauses for fields in _OPTIONAL_FILTERS
for field in _OPTIONAL_FILTERS:
if field in _OPTIONAL_FILTERS_CHILD_MAP:
_create_and_append_bool_should_clauses(_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field),
params.get(field), body["post_filter"]["and"]["filters"], with_subitems=True,
es_subitem_field_name=OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(OPTIONAL_FILTERS_CHILD_MAP.get(field),
OPTIONAL_FILTERS_CHILD_MAP.get(field)))
es_subitem_field_name=_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(_OPTIONAL_FILTERS_CHILD_MAP.get(field),
_OPTIONAL_FILTERS_CHILD_MAP.get(field)))
else:
_create_and_append_bool_should_clauses(OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field),
_create_and_append_bool_should_clauses(_OPTIONAL_FILTERS_PARAM_TO_ES_MAP.get(field, field),
params.get(field), body["post_filter"]["and"]["filters"])

for field in OPTIONAL_FILTERS_STRING_TO_BOOL:
for field in _OPTIONAL_FILTERS_STRING_TO_BOOL:
if params.get(field):
_create_and_append_bool_should_clauses(field,
[ 0 if cd.lower() == "no" else 1 for cd in params.get(field) ],
Expand Down
Loading