Skip to content

Commit

Permalink
Userale and Stout classes. Currently buggy.
Browse files Browse the repository at this point in the history
  • Loading branch information
mooshu1x2 committed Jun 9, 2016
1 parent 51c171e commit 5ec7c69
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 40 deletions.
39 changes: 39 additions & 0 deletions distill/classes/stout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
'''
distill: This package contains a flask app RESTful api for distill
This flask app exposes some restful api endpoints for querying User-ALE.
Very similar to Lucene syntax for basic query operations.
Copyright 2016, The Charles Stark Draper Laboratory
Licensed under Apache Software License.
'''

from distill import app
import sqlite3
import pandas as pd

class Stout ():
conn = None
cursor = None

"""
Parse master answer table into readable/queryable format
"""
def parse (f):
pass

def connect ():
# Load Configurations
app.config.from_pyfile('config.cfg')
db = app.config ['SQLITEDB']
conn = sqlite3.connect (db)
cursor = conn.cursor ()

def lookup (topic='SYS_IND_SESS_'):
cursor.execute ('SELECT ' + topic + ' FROM stout')

def operational_task (sessionID=''):
pass
# Look up session ID : tasknum in DB
# Determine if performing task1 or task2
# Add new column OT
143 changes: 103 additions & 40 deletions distill/classes/userale.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,36 @@
'''

from elasticsearch_dsl import DocType, String, Boolean, Date, Float, Search
from elasticsearch import Elasticsearch, TransportError
from elasticsearch_dsl.query import MultiMatch, Match, Q
from elasticsearch import Elasticsearch, TransportError, ConnectionError
from elasticsearch_dsl.connections import connections
from datetime import datetime
from flask import jsonify
from werkzeug.datastructures import ImmutableMultiDict, MultiDict

from distill import my_app, es
import json, yaml, urllib2
from flask import jsonify, Markup

from distill import app, es

import datetime
import json
import yaml
import urllib2

"""
Generic class supporting basic CRUD operations
"""
class UserAle (object):

@staticmethod
def getStatus ():
"""
Get Status of Elasticsearch Instance
"""
try:
res = es.ping ()
except ConnectionError as e:
res = False
return res

"""
Register a new application in User Ale
Example:
Expand All @@ -36,15 +53,15 @@ class UserAle (object):
def create (app):
try:
res = es.indices.create (index=app)
doc = get_cluster_status (app)
doc = get_cluster_status (app)
return jsonify (doc)
except TransportError as e:
return jsonify (error=e.info)

"""
Fetch meta data associated with an application
Data will include all document types in index and all fields in index
Example:
Example:
{
"application" : "xdata_v3",
"health" : "green",
Expand All @@ -54,7 +71,7 @@ def create (app):
"raw_logs" : {
"@timestamp" : "date",
"action" : "string",
"elementId" : "string"
"elementId" : "string"
},
"parsed" : {
"@timestamp" : "date",
Expand Down Expand Up @@ -82,25 +99,66 @@ def update (app):

"""
Delete an application from User Ale
Example:
Example:
{
}
"""
@staticmethod
def delete (app):
try:
res = es.indices.delete (index=app)
return jsonify (status="Deleted index %s" % app)
except TransportError as e:
return jsonify (e.info)
# try:
# res = es.indices.delete (index=app)
# return jsonify (status="Deleted index %s" % app)
# except TransportError as e:
# return jsonify (e.info)
return jsonify (status="not allowed")

"""
Main method of entry to perform segmentation and integration of STOUT's master
answer table (if STOUT is enabled).
"""
@staticmethod
def select (app, type=None, params=None):
p = parse_query_parameters (app, params)
pass
def select (app, app_type=None, params=None):
p = parse_query_parameters (app, app_type, params)
# doc = UserAleDoc (meta={"index" : app, "doc_type" : app_type});
# search = UserAleDoc.search().query ('match', activity="HIDE")

# 'q': args.get('q', '{}'),
# 'fields': args.get('fl', '{}'),
# 'size': args.get ('size', 100),
# 'scroll': args.get ('scroll', False),
# 'filters': request_args.getlist ('fq')

# Start Search
s = Search (index="xdata_v3", doc_type=app_type)

# Check query


# Execute Filter Query
for x in p['filters']:
l = x.split (':', 1)
m = {}
m[l[0]] = l[1]

# print m[l[0]]
s = s.query ('match', **m)

# Check fields array
fields = p ['fields']
# if not fields:
# Comma delimited list
s = s.fields (p['fields'])

# Filter request
# s = s.filter('terms', tags='5c8b88fbca0fc7a5783c77931e037d\:\:3139')

# Size

# Execute
response = s.execute()
print(response.hits.total)
return jsonify (response.to_dict())

"""
"""
Expand Down Expand Up @@ -137,10 +195,10 @@ def merge_dicts (lst):
"""
@TODO Reformat mapping data
"""
def parse_mappings (app):
def parse_mappings (app, app_type=None):
try:
mappings = es.indices.get_mapping (index=app)
mappings = yaml.safe_load (json.dumps (mappings))
mappings = es.indices.get_mapping (index=app, doc_type=[app_type])
# mappings = yaml.safe_load (json.ess (mappings))
# print json.dumps (mappings [app]["mappings"], indent=4, separators=(',', ': '))
ignore = ["properties", "format"]
except TransportError as e:
Expand All @@ -150,44 +208,49 @@ def parse_mappings (app):
"""
Retrieve all possible columns in app
"""
def get_all_fields (app):
return []
def get_all_fields (app, app_type=None):
# mappings = es.indices.get_mapping (index=app, doc_type=app_type)
# mappings = json.loads (mappings)
# data = {key: value for (key, value) in dict(mappings).items ()}
# print mappings
return ['sessionID']

"""
Get query parameters from the request and preprocess them.
:param [dict-like structure] Any structure supporting get calls
:result [dict] Parsed parameters
"""
def parse_query_parameters(indx, request_args):
args = {key: value[0] for (key, value) in dict (request_args).iteritems()}
def parse_query_parameters (indx, app_type=None, request_args = {}):
args = {key: value[0] for (key, value) in dict (request_args).iteritems ()}

# print "args = ", args
# Parse out simple filter queries
filters = []
for filter in get_all_fields (indx):
for filter in get_all_fields (indx, app_type):
if filter in args:
filters.append((filter, args[filter]))

return {
'q': args.get('q', '{}'),
'fields': args.get('fl', '{}'),
'fields': args.get('fl', []),
'size': args.get ('size', 100),
'scroll': args.get ('scroll', False),
'filters': filters
'filters': request_args.getlist ('fq')
}

"""
Simple UserAleDoc class to perform queries on index
"""
class UserAleDoc (DocType):
pass

# doc = UserAleDoc (meta={"index" : "xdata_v3", "doc_type" : "testing"});
# search = UserAleDoc.search().query ('match', activity="HIDE")
Parsed Docs Class
meta = {'index' : 'xdata_v3'}
Defaults to doctype=parsed
"""
class UserAleParsedDoc (DocType):
timestamp = Date ()

# search = Search (index="xdata_v3").query ()
# search = search[:20]
class Meta:
doc_type = 'parsed'

# response = search.execute()
def save (self, ** kwargs):
return super (UserAleParsedDoc, self).save (**kwargs)

# print(response.hits.total)
# print(response[0].activity)
# doc = UserAleDoc (meta={"index" : "xdata_v3", "doc_type" : "testing"});
# search = UserAleDoc.search().query ('match', activity="HIDE")

0 comments on commit 5ec7c69

Please sign in to comment.