Skip to content
Permalink
Browse files
Moved CRUD operations from userale model to new brew model. Added sup…
…port to fetch all columns from an application.
  • Loading branch information
mooshu1x2 committed Jun 23, 2016
1 parent 8374a2f commit 0d002843cc9f3b2dc9ef558d187ca82e49d738ea
Showing 5 changed files with 274 additions and 227 deletions.
@@ -1,23 +1,8 @@
'''
Distill: This package contains a flask app RESTful api for distill
Copyright 2016, The Charles Stark Draper Laboratory
Licensed under Apache Software License.
'''

from __future__ import absolute_import
import sys
from flask import Flask
from elasticsearch_dsl.connections import connections

if sys.version_info[:2] < (2, 7):
m = "Python 2.7 or later is required for Distill (%d.%d detected)."
raise ImportError (m % sys.version_info[:2])
del sys

# Initialize Flask instance
app = Flask (__name__)
# app.jinja_env.autoescape = False

# Load Configurations
app.config.from_pyfile('config.cfg')
@@ -31,6 +16,7 @@
ca_certs = app.config ['CA_CERTS']
client_cert = app.config ['CLIENT_CERT']
client_key = app.config ['CLIENT_KEY']
timeout = app.config ['TIMEOUT']

# Initialize Elasticsearch instance
es = connections.create_connection (hosts = [host],
@@ -40,4 +26,5 @@
verify_certs = verify_certs,
ca_certs = ca_certs,
client_cert = client_cert,
client_key = client_key)
client_key = client_key,
timeout=timeout)
@@ -1,15 +1,12 @@
__license__ = "Apache-2.0"
__revision__ = " $Id: $ "
__docformat__ = 'reStructuredText'

from flask import Flask, request, jsonify
from distill import app

from distill.models.brew import Brew
from distill.models.userale import UserAle
from distill.models.stout import Stout
from distill.exceptions import ValidationError
from distill.validation import validate_request


@app.route ('/', methods=['GET'])
def index ():
"""
@@ -39,7 +36,7 @@ def index ():
:return: Distill's status information as JSON blob
"""
return jsonify (name="Distill", version="1.0 alpha", author="Michelle Beard", email="mbeard@draper.com", status=UserAle.getStatus (), applications=UserAle.getApps ())
return jsonify (name="Distill", version="1.0 alpha", author="Michelle Beard", email="mbeard@draper.com", status=Brew.get_status (), applications=Brew.get_applications ())

@app.route ('/create/<app_id>', methods=['POST', 'PUT'])
def create (app_id):
@@ -53,7 +50,7 @@ def create (app_id):
:param app_id: Application name
:return: Newly created application's status as JSON blob
"""
return UserAle.create (app_id)
return Brew.create (app_id)

@app.route ('/status/<app_id>', methods=['GET'])
def status (app_id):
@@ -74,7 +71,7 @@ def status (app_id):
:param app_id: Application name
:return: Registered applications meta data as JSON blob
"""
return UserAle.read (app_id)
return Brew.read (app_id)

@app.route ('/update/<app_id>', methods=['POST', 'PUT'])
def update (app_id):
@@ -88,7 +85,7 @@ def update (app_id):
:param app_id: Application name
:return: Boolean response message as JSON blob
"""
return UserAle.update (app_id)
return Brew.update (app_id)

@app.route ('/delete/<app_id>', methods=['DELETE'])
def delete (app_id):
@@ -102,7 +99,7 @@ def delete (app_id):
:param app_id: Application name
:return: Boolean response message as JSON blob
"""
return UserAle.delete (app_id)
return Brew.delete (app_id)

@app.route ('/search/<app_id>', defaults={"app_type" : None}, methods=['GET'])
@app.route ('/search/<app_id>/<app_type>', methods=['GET'])
@@ -122,13 +119,7 @@ def search (app_id, app_type):
:param fl: List of fields to restrict the result set
:return: JSON blob of result set
"""

q = request.args
try:
validate_request (q)
return UserAle.select (app_id, app_type=app_type, params=q)
except ValidationError as e:
return jsonify (error=e.message)
return UserAle.select (app_id, app_type=app_type, params=q)

@app.route ('/stat/<app_id>', defaults={"app_type" : None}, methods=['GET'])
@app.route ('/stat/<app_id>/<app_type>', methods=['GET'])
@@ -0,0 +1,218 @@
from elasticsearch import Elasticsearch, TransportError
from flask import jsonify
from distill import es

class Brew (object):
"""
Distill supports basic CRUD operations and publishes the status
of an persistenct database. Eventually it will support ingesting logs sent from
an registered application.
"""

@staticmethod
def get_status ():
"""
Fetch the status of the underlying database instance.
:return: [bool] if connection to database instance has been established
"""
return es.ping (ignore=[400, 404])

@staticmethod
def get_applications ():
"""
Fetch all the registered applications in Distill.
.. note:: Private indexes starting with a period are not included in the result set
:return: [dict] dictionary of all registered applications and meta information
"""
doc = {}
query = { "aggs" : {
"count_by_type" : {
"terms" : {
"field" : "_type",
"size" : 100
}
}
}
}

try:
cluster_status = es.cat.indices (h=["index"], pri=False)
x = cluster_status.splitlines()

for idx in x:
idx = idx.rstrip ()

# Ignore private indexes (like .kibana or .stout)
if idx [:1] != '.':
response = es.search (index=idx, body=query)
d = {}
for tag in response["aggregations"]["count_by_type"]["buckets"]:
d [tag ['key']] = tag ['doc_count']
doc [idx] = d
except TransportError as e:
doc ['error'] = e.info
except Exception as e:
doc ['error'] = str (e)
return doc

@staticmethod
def create (app):
"""
Register a new application in Distill
.. code-block:: bash
{
"application" : "xdata_v3",
"health" : "green",
"num_docs" : 0,
"status" : "open"
}
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] dictionary of application and its meta information
"""

# ignore 400 cause by IndexAlreadyExistsException when creating an index
res = es.indices.create (index=app, ignore=[400, 404])
doc = _get_cluster_status (app)
return jsonify (doc)

@staticmethod
def read (app):
"""
Fetch meta data associated with an application
.. code-block:: bash
Example:
{
"application" : "xdata_v3",
"health" : "green",
"num_docs" : "100",
"status" : "open"
"types" : {
"raw_logs" : {
"@timestamp" : "date",
"action" : "string",
"elementId" : "string"
},
"parsed" : {
"@timestamp" : "date",
"elementId_interval" : "string"
},
"graph" : {
"uniqueID" : "string",
"transition_count" : "long",
"p_value" : "float"
}
}
}
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] dictionary of application and its meta information
"""

return jsonify (_get_cluster_status (app))

@staticmethod
def update (app):
"""
.. todo::
Currently not implemented
"""

return jsonify (status="not implemented")

@staticmethod
def delete (app):
"""
Technically closes the index so its content is not searchable.
.. code-block: bash
Example:
{
status: "Deleted index xdata_v3"
}
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] status message of the event
"""

es.indices.close (index=app, ignore=[400, 404])
return jsonify (status="Deleted index %s" % app)

def _get_cluster_status (app):
"""
Return cluster status, index health, and document count as string
:param app: [string] application name (e.g. xdata_v3)
:return: [dict] dictionary of index meta data including field names
"""

doc = {}
try:
cluster_status = es.cat.indices (index=app, h=["health", "status", "docs.count"], pri=True, ignore=[400, 404])
v = str (cluster_status).split (" ")
m = ["health", "status", "num_docs"]
doc = dict (zip (m, v))
# Add back application
doc ["application"] = app
except TransportError as e:
doc ['error'] = e.info
except Exception as e:
doc ['error'] = str (e)

doc ['fields'] = _get_all_fields (app)
return doc

def _parse_mappings (app, app_type=None):
"""
.. todo:
Need to parse out result set that presents field list and type
"""

try:
mappings = es.indices.get_mapping (index=app, doc_type=[app_type], ignore=[400, 404])
# mappings = yaml.safe_load (json.ess (mappings))
# print json.dumps (mappings [app]["mappings"], indent=4, separators=(',', ': '))
ignore = ["properties", "format"]
except TransportError as e:
doc ['error'] = e.info
except Exception as e:
doc ['error'] = str (e)
return doc

def _get_all_fields (app, app_type=None):
"""
Retrieve all possible fields in an application
:param app: [string] application name (e.g. xdata_v3)
:param app_type: [string] application type (e.g. logs)
:return: [list] list of strings representing the fields names
"""
d = list ()
query = { "aggs" : {
"fields" : {
"terms" : {
"field" : "_field_names",
"size" : 100
}
}
}
}

try:
response = es.search (index=app, doc_type=app_type, body=query)
for tag in response['aggregations']['fields']['buckets']:
d.append (tag ['key'])
except TransportError as e:
doc ['error'] = e.info
except Exception as e:
d.append (str (e))
return d
@@ -1,13 +1,3 @@
'''
distill: This package contains a flask app RESTful api for distill
This flask app exposes some restful api endpoints for querying User-ALE.
Very similar to Lucene syntax for basic query operations.
Copyright 2016, The Charles Stark Draper Laboratory
Licensed under Apache Software License.
'''

from distill import app, es
from elasticsearch_dsl import DocType, String, Boolean, Date, Nested, Search
from elasticsearch_dsl.query import MultiMatch, Match, Q

0 comments on commit 0d00284

Please sign in to comment.