Skip to content
Permalink
Browse files
Support for STOUT ingestion of records complete
  • Loading branch information
mooshu1x2 committed Jun 17, 2016
1 parent 35a4230 commit a77d1182f7c49dd5960da2f64bc273e1058601d5
Showing 3 changed files with 163 additions and 42 deletions.
@@ -25,7 +25,7 @@
"""
@app.route ('/', methods=['GET'])
def index ():
return jsonify (name="Distill", version="1.0 alpha", author="Michelle Beard", email="mbeard@draper.com", status=UserAle.getStatus (), apps=UserAle.getApps ())
return jsonify (name="Distill", version="1.0 alpha", author="Michelle Beard", email="mbeard@draper.com", status=UserAle.getStatus (), applications=UserAle.getApps ())

"""
curl -XPOST https://[hostname]:[port]/create/app_name
@@ -87,7 +87,7 @@ def delete (app_id):
Get all data associated with an application
"""
@app.route ('/search/<app_id>', defaults={"app_type" : None})
@app.route ('/search/<app_id>', defaults={"app_type" : None}, methods=['GET'])
@app.route ('/search/<app_id>/<app_type>', methods=['GET'])
def search (app_id, app_type):
q = request.args
@@ -97,6 +97,22 @@ def search (app_id, app_type):
except ValidationError as e:
return jsonify (error=e.message)

"""
This can be folded into /search api
curl -XGET https://[hostname]:[port]/app_name/stat?elem=button&event=param1,param2
Example:
curl -XGET https://[hostname]:[port]/xdata_v3/testing/?elem=signup&event=click
How many users clicked on my sign up button?
"""
@app.route ('/stat/<app_id>', defaults={"app_type" : None}, methods=['GET'])
@app.route ('/stat/<app_id>/<app_type>', methods=['GET'])
def stat (app_id, app_type):
q = request.args

return jsonify (error='Not implemented')

"""
curl -XGET https://[hostname]:[port]/denoise/app_name?save=true&type=parsed
@@ -134,10 +150,14 @@ def denoise (app_id):
If STOUT is enabled, the select method expects a stout index to exist or otherwise
it will return an error message.
"""
@app.route ('/stout/<app_id>', defaults={"app_type" : None})
@app.route ('/stout/<app_id>/<app_type>', methods=['GET'])
def merge_stout (app_id):
pass
#@app.route ('/stout/<app_id>', defaults={"app_type" : None})
#@app.route ('/stout/<app_id>/<app_type>', methods=['GET'])
@app.route ('/stout', methods=['GET'])
def merge_stout ():
flag = app.config ['ENABLE_STOUT']
if flag:
return Stout.ingest ()
return jsonify (status="STOUT is disabled.")

"""
Generic Error Message
@@ -17,18 +17,14 @@ HOST = '0.0.0.0'
# Port
PORT = 8090

# Define the application directory
# import os
# BASE_DIR = os.path.abspath(os.path.dirname(__file__))

# Enable STOUT integration into Distill
STOUT = True
ENABLE_STOUT = False
SQLITEDB = '/Users/msb3399/Documents/xdata/stout/stout.db'
MASTER = '/Users/msb3399/Documents/xdata/stout/master_ans.csv'
SELECTED = '/Users/msb3399/Documents/xdta/stout/selected_vars_for_distill.csv'
MAPPINGS = '/Users/msb3399/Documents/xdata/stout/MOT_Mappings.csv'
SELECTED = '/Users/msb3399/Documents/xdata/stout/selected_vars_for_distill.csv'

# Elasticsearch Configuration
# Aperature Tiles
ES_HOST = 'http://localhost'
ES_PORT = 9200
HTTP_AUTH = None
@@ -8,32 +8,137 @@
Licensed under Apache Software License.
'''

from distill import app
import sqlite3
import pandas as pd

class Stout ():
conn = None
cursor = None

"""
Parse master answer table into readable/queryable format
"""
def parse (f):
pass

def connect ():
# Load Configurations
app.config.from_pyfile('config.cfg')
db = app.config ['SQLITEDB']
conn = sqlite3.connect (db)
cursor = conn.cursor ()

def lookup (topic='SYS_IND_SESS_'):
cursor.execute ('SELECT ' + topic + ' FROM stout')

def operational_task (sessionID=''):
pass
# Look up session ID : tasknum in DB
# Determine if performing task1 or task2
# Add new column OT
from distill import app, es
from elasticsearch_dsl import DocType, String, Boolean, Date, Nested, Search
from elasticsearch_dsl.query import MultiMatch, Match, Q
from elasticsearch import Elasticsearch, TransportError, ConnectionError
from elasticsearch_dsl.connections import connections
from flask import jsonify

import pandas as pd

class StoutDoc (DocType):
sessionID = String (index="not_analyzed")
task1 = Nested ()
task2 = Nested ()
# tags = String (index='not_analyzed')

class Meta:
index = '.stout'
doc_type = 'testing'

@classmethod
def sync (cls, stout):
stout_doc = StoutDoc (meta={'id': stout['sessionID']})
stout_doc.sessionID = stout ['sessionID']
stout_doc.task1 = stout ['task1']
stout_doc.task2 = stout ['task2']
stout_doc.save ()

def save (self, *args, **kwargs):
return super (StoutDoc, self).save (*args, **kwargs)

# Don't think this is right? Should just be a single
# Stout entry, not StoutDoc
def get_model_obj (self):
from distill.models import StoutDoc
return StoutDoc.objects.get(id=self.meta.id)

class Stout (object):
@staticmethod
def ingest ():
# Create the mappings in elasticsearch
StoutDoc.init ()
status = True
data = parse ();
try:
for k,v in data.items ():
doc = StoutDoc ()
if 'sessionID' in v:
doc.sessionID = v['sessionID']
if 'task1' in v:
doc.task1 = v['task1']
if 'task2' in v:
doc.task2 = v['task2']
doc.save ()
except Error as e:
status = False
return jsonify (status=status)

"""
Parse master answer table with mapping into an Associate Array
"""
def parse ():
master = app.config ['MASTER']
mappings = app.config ['MAPPINGS']

fileContents=pd.read_csv(master, encoding='utf-8')
plainTextMappings=pd.read_csv(mappings, encoding='raw_unicode_escape')
headers=list(fileContents.columns.values)

#generate the mapping between header and plain text
translationRow={};
for fieldIndex in range(1,len(headers)):
t=plainTextMappings.ix[fieldIndex]
translationRow[headers[fieldIndex]]=t[9]

dictBySessionID={}
translationRow['items.text']='foo'
index=0
for row in fileContents.iterrows():
index=index+1

taskMetrics={}
index,data=row
identifier=row[1][0].split("::")
sessionID=identifier[0]
taskID=(identifier[1])
workingData={}
#is this session id already in the dictionary?
if sessionID in dictBySessionID:
#grab the entry as workingData
workingData=dictBySessionID[sessionID]

sysData={}
task1Data={}
task2Data={}
metaData={}
d={}

for fieldIndex in range(1,len(headers)):
if not pd.isnull(row[1][fieldIndex]): #only interested in non-null fields
tempDict={}
if headers[fieldIndex] in translationRow:
tempDict['field']=translationRow[headers[fieldIndex]]
#tempDict['field']=translationRow[9]
tempDict['value']=row[1][fieldIndex]
d[headers[fieldIndex]]=row[1][fieldIndex]
if "SYS" in headers[fieldIndex]:
sysData[headers[fieldIndex]]=tempDict
elif "OT1" in headers[fieldIndex]:
task1Data[headers[fieldIndex]]=tempDict
elif "OT2" in headers[fieldIndex]:
task2Data[headers[fieldIndex]]=tempDict
else:
metaData[headers[fieldIndex]]=tempDict

if d['TSK_TIME_DIFF_']>0: #block tasks with zero time elapsed
a=int(d['TSK_TIME_DIFF_OT1_'])
b=int(d['TSK_TIME_DIFF_OT2_'])
#figure out which task the values belong to
if ((a>0) & (b<=0)):
task1Data['taskID']=taskID
task1Data['meta']=metaData
task1Data['system']=sysData
workingData['task1']=task1Data
elif ((a<=0) & (b>0)):
task2Data['taskID']=taskID
task2Data['meta']=metaData
task2Data['system']=sysData
workingData['task2']=task2Data
else:
raise ValueError('Encountered an unexpected task time diff state')

workingData['sessionID'] = sessionID
dictBySessionID[sessionID]=workingData
return dictBySessionID

0 comments on commit a77d118

Please sign in to comment.