Skip to content

Commit

Permalink
Merge pull request #275 from datmo/triggers
Browse files Browse the repository at this point in the history
added trigger to monitoring
  • Loading branch information
asampat3090 committed Oct 30, 2018
2 parents 27d62d8 + fab0728 commit 1507954
Show file tree
Hide file tree
Showing 9 changed files with 194 additions and 18 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,6 @@ after_success:
- echo $TRAVIS_SECURE_ENV_VARS
- coveralls
env:
- LOGGING_LEVEL=DEBUG
global:
- LOGGING_LEVEL=DEBUG
- SLACK_WEBHOOK_URL=https://hooks.slack.com/services/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
1 change: 1 addition & 0 deletions appveyor.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ environment:
# See: http://stackoverflow.com/a/13751649/163740
CMD_IN_ENV: "cmd /E:ON /V:ON /C .\\appveyor\\run_with_env.cmd"
LOGGING_LEVEL: DEBUG
SLACK_WEBHOOK_URL: https://hooks.slack.com/services/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

matrix:
- PYTHON: "C:\\Python27"
Expand Down
1 change: 1 addition & 0 deletions circle.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ machine:

environment:
LOGGING_LEVEL: DEBUG
SLACK_WEBHOOK_URL: "https://hooks.slack.com/services/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"
TEST_PACKAGE: python -m pytest -s -v

pre:
Expand Down
9 changes: 6 additions & 3 deletions datmo/core/controller/deploy/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,12 @@ def model_deploy(self, cluster_name):
files_exclude = datmo_deploy['deploy'][
'files_exclude']
for item in list_dir:
if item in files_exclude and \
os.path.exists(os.path.join(tmp_dirpath, item)):
os.remove(os.path.join(tmp_dirpath, item))
if item in files_exclude:
if os.path.isfile(os.path.join(tmp_dirpath, item)):
os.remove(os.path.join(tmp_dirpath, item))
elif os.path.isdir(os.path.join(tmp_dirpath, item)):
shutil.rmtree(
os.path.join(tmp_dirpath, item))

if item.startswith('.') and \
os.path.isdir(os.path.join(tmp_dirpath, item)):
Expand Down
36 changes: 36 additions & 0 deletions datmo/core/util/misc_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import textwrap
import datetime
import pytz
import json
import tzlocal
import pytest
import collections
Expand Down Expand Up @@ -67,6 +68,41 @@ def bytes2human(n):
return "%sB" % n


def slack_message(webhook_url, options):
if webhook_url is None:
return False

slack_data = {
"attachments": [
{
"fallback": "Trigger from Datmo",
"color": "warning",
"pretext": "Trigger from Datmo during inference",
"author_name": options.get("author_name"),
"title": options.get("title"),
"text": options.get("text"),
"fields": [
{
"title": "Priority",
"value": options.get("priority")
if options.get("priority") is not None else "High",
"short": False
}
],
"ts": options.get("timestamp")
}
]
}

response = requests.post(webhook_url,
data=json.dumps(slack_data),
headers={'Content-Type': 'application/json'})
if response.status_code != 200:
return False

return True


def grep(pattern, fileObj):
r = []
linenumber = 0
Expand Down
21 changes: 20 additions & 1 deletion datmo/core/util/tests/test_misc_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Tests for misc_functions.py
"""
import os
import time
import tempfile
import platform
import datetime
Expand All @@ -26,7 +27,7 @@ def to_bytes(val):
to_bytes("test")

from datmo.core.util.misc_functions import (
bytes2human, create_unique_hash, mutually_exclusive, is_project_dir,
bytes2human, slack_message, create_unique_hash, mutually_exclusive, is_project_dir,
find_project_dir, grep, prettify_datetime, format_table,
parse_cli_key_value, convert_keys_to_string, get_datmo_temp_path,
parse_path, parse_paths, list_all_filepaths)
Expand Down Expand Up @@ -116,6 +117,24 @@ def test_bytes2human(self):
result = bytes2human(100001221)
assert result == '95.4M'

def test_slack_message(self):
# Setting up the options for slack message
options = {"author_name": "testing misc functions", "title": "Test title", "text": "Test text",
"priority": "just chill!", "timestamp": int(round(time.time()))}

webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
result = slack_message(webhook_url, options)
assert result == True

webhook_url = os.environ.get("SLACK_WEBHOOK_URL")[:-1]
result = slack_message(webhook_url, options)
assert result == False

webhook_url = None
result = slack_message(webhook_url, options)
assert result == False


def test_grep(self):
# open current file and try to find this method in it
assert len(grep("test_grep", open(__file__, "r"))) == 2
Expand Down
59 changes: 53 additions & 6 deletions datmo/monitoring.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import os
import time
import json
import psutil
from datetime import datetime

from datmo.core.util.exceptions import InputError
from datmo.core.util.misc_functions import bytes2human
from datmo.core.util.misc_functions import bytes2human, slack_message
from datmo.core.util.remote_api import RemoteAPI


Expand Down Expand Up @@ -44,6 +45,7 @@ class Monitoring():
>>> datmo_client.set_start_time()
>>> y_predict = model_predict(x) # using a machine learning model for inference
>>> datmo_model.set_end_time()
>>> # x and y_predict are dictionaries
>>> datmo_id = datmo_client.track(input=x, prediction=y_predict) # Track predictions
>>> response = {'y': y, 'datmo_id': datmo_id}
>>> return response
Expand All @@ -52,6 +54,12 @@ class Monitoring():
...
>>> # For feedback
>>> datmo_client.track_feedback(id=response['datmo_id'], actual=y_actual)
...
>>> # For trigger
>>> input = {"f1": 2, "f2": 3}
>>> prediction = {"prediction": 1}
>>> notes = "this is a note for trigger"
>>> datmo_client.trigger(medium="slack", input=input, prediction=prediction, notes=notes)
"""

def __init__(self, api_key, home=None):
Expand All @@ -60,9 +68,6 @@ def __init__(self, api_key, home=None):
self._start_time, self._end_time, self._model_id, \
self._model_version_id, self._deployment_version_id = None, None, None, None, None

def __eq__(self, other):
return self.id == other.id if other else False

def __str__(self):
pass

Expand Down Expand Up @@ -210,12 +215,54 @@ def track_feedback(self, id, feedback):
}
response = self.remote_api.update_actual(id, update_dict)
body = response.get('body')
updated_at = body.get('updated') if body else 0
if updated_at > 0:
updated = body.get('updated') if body else 0
if updated > 0:
return True
else:
return False

def trigger(self, medium, input, prediction, notes, priority=None):
"""
Trigger information through the medium of communication
Parameters
----------
medium : str
medium being used to communicate (e.g. slack or twilio)
input : dict
dictionary for inputs
prediction : dict
dictionary for predictions
notes : str
string with notes for the trigger
priority : str, optional
string with priority for the trigger
Returns
-------
bool
True if successful trigger
"""
if not isinstance(input, dict) or\
not isinstance(prediction, dict) \
or not isinstance(notes, str):
return False

options = {}
if medium == "slack":
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
options['author_name'] = "mode id:" + self._model_id + ">>" \
+ "deployment id:" + self._deployment_version_id \
+ ">>" + "version id:" + self._model_version_id
options['title'] = "Notes | Input | Prediction"
options['text'] = "Notes: " + notes + " \n" \
+ "Input: " + json.dumps(input, indent=4, sort_keys=True) + " \n" \
+ "Output: " + json.dumps(prediction, indent=4, sort_keys=True)
options['priority'] = priority
options['timestamp'] = int(round(time.time()))
return slack_message(webhook_url, options)
else:
return False

def search_metadata(self, filter):
"""
Search metadata from the remote storage
Expand Down
77 changes: 72 additions & 5 deletions datmo/tests/test_monitoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
Tests for monitoring module
"""
import os
import time
import tempfile
import platform
try:
basestring
except NameError:
basestring = str

from datmo.core.util.exceptions import InputError
from datmo.monitoring import Monitoring


Expand All @@ -24,35 +26,69 @@ def setup_class(self):
# TODO: move API key to environment variable
self.monitoring = Monitoring(
api_key="6a3a3cd900eaf7b406a41d68f8ca7969")
self.monitoring.__str__()
self.monitoring.__repr__()
self.monitoring.set_model_id("model_id")
self.monitoring.set_model_version_id("v3")
self.monitoring.set_deployment_version_id("microservice")
self.input_dict = {"test": 0.43}
self.monitoring.set_start_time
self.prediction_dict = {"test_output": 0.39}
self.feedback_dict = {"real_output": 0.40}
# TODO: move this only into test_set_track
self.test_data_id = self.monitoring.track(
input=self.input_dict, prediction=self.prediction_dict)
time.sleep(1)
self.monitoring.set_end_time
# Set track after setting end time
self.extra_test_data_id = self.monitoring.track(
input=self.input_dict, prediction=self.prediction_dict)

def teardown_class(self):
pass

def test_set_track(self):
assert isinstance(self.test_data_id, basestring)

def test_set_track_double(self):
data_id = self.monitoring.track(
input=self.input_dict, prediction=self.prediction_dict)
assert data_id != self.test_data_id

def test_track_feedback(self):
result = False
while not result:
time.sleep(4)
result = self.monitoring.track_feedback(
id=self.test_data_id, feedback=self.feedback_dict)

assert isinstance(result, bool)
assert result == True

# Failure case when improper data structure is passed
result = self.monitoring.track_feedback(
id=self.test_data_id, feedback=self.feedback_dict)
id=self.test_data_id, feedback="improper data for feedback")

assert isinstance(result, bool)
assert result == False

def test_trigger(self):
input = {"f1": 2, "f2": 3}
prediction = {"prediction": 1}
notes = "this is epic!"
priority = "Super Critical!"
result = self.monitoring.trigger(medium="slack", input=input, prediction=prediction,
notes=notes, priority=priority)
assert result == True

# Failure case when the data is improper
result = self.monitoring.trigger(medium="slack", input="improper input",
prediction="improper prediction format",
notes=notes, priority=priority)
assert result == False

# Failure case when medium is different
result = self.monitoring.trigger(medium="wrong_medium", input=input, prediction=prediction,
notes=notes, priority=priority)
assert result == False

def test_search_metadata(self):
filter_data = {"model_id": "model_id"}
result = self.monitoring.search_metadata(filter=filter_data)
Expand All @@ -68,7 +104,8 @@ def test_search_metadata(self):
filter = {
"model_id": "model_id",
"model_version_id": "v3",
"sort_created_at": "desc"
"sort_created_at": "desc",
"deployment_version_id": None
}
result = self.monitoring.search_metadata(filter)
assert isinstance(result, list)
Expand All @@ -83,6 +120,18 @@ def test_search_metadata(self):
assert isinstance(result, list)
assert len(result) >= 2

# Raised error if key doesn't exist in filter
result = False
try:
filter = {
"model_id": "model_id",
"wrong_key": "microservice"
}
self.monitoring.search_metadata(filter)
except InputError:
result = True
assert result == True

filter = {"model_id": "model_id", "id": self.test_data_id}
result = self.monitoring.search_metadata(filter)
assert isinstance(result, list)
Expand All @@ -109,8 +158,26 @@ def test_delete_meta_data(self):
assert result['total'] == 1
assert result['deleted'] == 1

# Raised error if key doesn't exist in filter
result = False
try:
filter = {
"model_id": "model_id",
"wrong_key": "microservice"
}
self.monitoring.delete_metadata(filter)
except InputError:
result = True
assert result == True

# TODO: separate deployment into another file

def test_get_deployment_info(self):
result = self.monitoring.get_deployment_info()
assert isinstance(result, dict)
assert isinstance(result['system_monitoring'], dict)
assert isinstance(result['log_monitoring'], basestring)

def test_get_deployment_master_info(self):
result = self.monitoring._get_datmo_deployment_master_info()
assert isinstance(result, dict)
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"flask>=0.10.1", "jinja2>=2.7.3", "markupsafe>=0.23",
"werkzeug>=0.9.6", "beautifulsoup4>=4.3.2", "gunicorn>=19.1.1",
"itsdangerous>=0.24", "six>=1.8.0", "wsgiref>=0.1.2", "urllib3==1.23",
"chardet==3.0.2", "requests==2.19.1", "plotly==3.3.0",
"chardet==3.0.2", "requests>=2.20.0", "plotly==3.3.0",
"jsonschema==2.6.0", "celery==4.2.1"
]
else: # python 3
Expand All @@ -35,7 +35,7 @@
"flask>=0.10.1", "jinja2>=2.7.3", "markupsafe>=0.23",
"werkzeug>=0.9.6", "beautifulsoup4>=4.3.2", "gunicorn>=19.1.1",
"itsdangerous>=0.24", "six>=1.8.0", "urllib3==1.23", "chardet==3.0.2",
"prettytable>=0.7.2", "requests==2.19.1", "plotly==3.3.0",
"prettytable>=0.7.2", "requests>=2.20.0", "plotly==3.3.0",
"jsonschema==2.6.0", "celery==4.2.1"
]

Expand Down

0 comments on commit 1507954

Please sign in to comment.