Skip to content

Commit

Permalink
break down to modules
Browse files Browse the repository at this point in the history
  • Loading branch information
d-mo committed Nov 28, 2017
1 parent fb0c300 commit e1e8ed1
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 155 deletions.
1 change: 1 addition & 0 deletions dsio/dashboard/__init__.py
@@ -0,0 +1 @@
#
105 changes: 105 additions & 0 deletions dsio/dashboard/kibana.py
@@ -0,0 +1,105 @@
import elasticsearch

from kibana_dashboard_api import Visualization, Dashboard
from kibana_dashboard_api import VisualizationsManager, DashboardsManager


def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
dashboards = DashboardsManager(es_conn)
dashboard = Dashboard()
dashboard.id = "%s-dashboard" % index_name
dashboard.title = "%s dashboard" % index_name
dashboard.panels = []
dashboard.options = {"darkTheme": True}
dashboard.time_from = "now-15m"
dashboard.refresh_interval_value = 5000
dashboard.search_source = {
"filter": [{
"query": {
"query_string": {
"analyze_wildcard": True,
"query": "*"
}
}
}]
}
visualizations = VisualizationsManager(es_conn)
vis_list = visualizations.get_all() # list all visualizations
panels = []
i = 0
for sensor in sensor_names:
viz_id = "%s-%s" % (index_name, sensor)

# Check if visualization exists
viz = next((v for v in vis_list if v.id == viz_id), None)
if not viz: # If not, create it
viz = Visualization()
viz.id = viz_id
viz.title = "%s-%s" % (index_name, sensor)
viz.search_source = {
"index": index_name,
"query":{
"query_string":{
"analyze_wildcard": True,
"query":"*"
}
},
"filter":[]
}
viz.vis_state = {
"title": "%s-%s" % (index_name, sensor),
"type": "line",
"params": {
"addLegend": True,
"addTimeMarker": True,
"addTooltip": True,
"defaultYExtents": True,
"drawLinesBetweenPoints": True,
"interpolate": "linear",
"radiusRatio": 9,
"scale": "linear",
"setYExtents": False,
"shareYAxis": True,
"showCircles": True,
"smoothLines": True,
"times":[],
"yAxis":{}
},
"aggs": [
{
"id": "2",
"type": "max",
"schema":"radius",
"params": {
"field":"SCORE_%s" % sensor
}
}
],
"listeners": {}
}
try:
res = visualizations.add(viz)
assert res['_id'] == viz_id
except elasticsearch.exceptions.ConflictError:
res = visualizations.update(viz)

panel = {
"id": viz_id,
"panelIndex": i,
"row": i,
"col": i,
"size_x": 7,
"size_y": 4,
"type": "visualization"
}
panels.append(panel)
ret = dashboard.add_visualization(viz)
i += 1

try:
ret = dashboards.add(dashboard)
except elasticsearch.exceptions.ConflictError:
ret = dashboards.update(dashboard)
#pass # Dashboard already exists, let's not overwrite it

return ret
29 changes: 29 additions & 0 deletions dsio/helpers.py
@@ -0,0 +1,29 @@
""" Helper functions """

import dateparser


def detect_time(dataframe, timefield=None, timeunit=None):
""" Attempt to detect the time dimension in a dataframe """
columns = set(dataframe.columns)
for tfname in ['time', 'datetime', 'date', 'timestamp']:
if tfname in columns:
prev = current = None
for i in dataframe[tfname][:10]:
try:
current = dateparser.parse(str(i))
# timefield needs to be parsable and always increasing
if not current or (prev and prev > current):
tfname = ''
break
except TypeError:
tfname = ''
break
prev = current
if tfname:
timefield = tfname
if isinstance(i, float) and not timeunit:
timeunit = 's'
break

return timefield, timeunit
168 changes: 13 additions & 155 deletions dsio/main.py
Expand Up @@ -11,14 +11,13 @@
import webbrowser

import dateparser
import elasticsearch as ES
import elasticsearch
import numpy as np
import pandas as pd

from kibana_dashboard_api import Visualization, Dashboard
from kibana_dashboard_api import VisualizationsManager, DashboardsManager

from .restream.elastic import elasticsearch_batch_restreamer
from .dashboard.kibana import generate_dashboard
from .helpers import detect_time


def batch_score(X, q=0.99):
Expand All @@ -29,146 +28,6 @@ def batch_score(X, q=0.99):
return scores


def detect_time(dataframe, timeunit):
""" Attempt to detect the time dimension in a dataframe """
columns = set(dataframe.columns)
for tfname in ['time', 'datetime', 'date', 'timestamp']:
if tfname in columns:
prev = current = None
for i in dataframe[tfname][:10]:
try:
current = dateparser.parse(str(i))
# timefield needs to be parsable and always increasing
if not current or (prev and prev > current):
tfname = ''
break
except TypeError:
tfname = ''
break
prev = current
if tfname:
timefield = tfname
if isinstance(i, float) and not timeunit:
timeunit = 's'
break

return timefield, timeunit


def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
dashboards = DashboardsManager(es_conn)
dashboard = Dashboard()
dashboard.id = "%s-dashboard" % index_name
dashboard.title = "%s dashboard" % index_name
dashboard.panels = []
dashboard.options = {"darkTheme": True}
dashboard.time_from = "now-15m"
dashboard.search_source = {
"filter": [{
"query": {
"query_string": {
"analyze_wildcard": True,
"query":"*"
}
}
}]
}
visualizations = VisualizationsManager(es_conn)
# list all visualizations
#vis_list = visualizations.get_all()
panels = []
i = 0
for sensor in sensor_names:
viz_id = "%s-%s" % (index_name, sensor)
vizualization = Visualization()
vizualization.id = viz_id
vizualization.title = "%s-%s" % (index_name, sensor)
vizualization.search_source = {
"index": index_name,
"query":{
"query_string":{
"analyze_wildcard": True,
"query":"*"
}
},
"filter":[]
}
vizualization.vis_state = {
"title":"%s-%s" % (index_name, sensor),
"type":"line",
"params":{
"addLegend": True,
"addTimeMarker": True,
"addTooltip": True,
"defaultYExtents": True,
"drawLinesBetweenPoints": True,
"interpolate": "linear",
"radiusRatio": 9,
"scale": "linear",
"setYExtents": False,
"shareYAxis": True,
"showCircles": True,
"smoothLines": True,
"times":[],
"yAxis":{}
},
"aggs": [
{
"id": "1",
"type": "avg",
"schema":"metric",
"params": {
"field": sensor,
"customLabel": sensor.replace('_', ' ')
}
}, {
"id": "2",
"type": "max",
"schema":"radius",
"params":{
"field":"SCORE_%s" % sensor
}
}, {
"id": "3",
"type": "date_histogram",
"schema": "segment",
"params":{
"field": "time",
"interval": "custom",
"customInterval": "5s",
"min_doc_count":1,
"extended_bounds":{}
}
}
],
"listeners": {}
}
try:
viz = visualizations.add(vizualization)
except ES.exceptions.ConflictError:
pass # Visualization already exists, let's not overwrite it

panel = {
"id": viz_id,
"panelIndex": i,
"row": i,
"col": i,
"size_x": 7,
"size_y": 4,
"type": "visualization"
}
panels.append(panel)
ret = dashboard.add_visualization(vizualization)
i += 1

try:
ret = dashboards.add(dashboard)
except ES.exceptions.ConflictError:
pass # Dashboard already exists, let's not overwrite it

return ret


def main():
""" Main function """

Expand Down Expand Up @@ -236,14 +95,6 @@ def main():
print('Missing sensors, aborting.')
sys.exit()

# Get ES index name and type from args or generate from input name
index_name = args.es_index
if not index_name and args.input:
index_name = args.input.split('/')[-1].split('.')[0].split('_')[0]
if not index_name:
index_name = 'dsio'
_type = args.entry_type

min_time = dataframe[timefield][0]
max_time = dataframe[timefield][dataframe[timefield].size-1]

Expand All @@ -266,15 +117,22 @@ def main():
values = df_scored[sensor].astype('category').cat.codes
df_scored['SCORE_{}'.format(sensor)] = batch_score(values)

# Get ES index name and type from args or generate from input name
index_name = args.es_index
if not index_name and args.input:
index_name = args.input.split('/')[-1].split('.')[0].split('_')[0]
if not index_name:
index_name = 'dsio'

### Adding index name and type for all events:
df_scored['_index'] = index_name
df_scored['_type'] = _type
df_scored['_type'] = args.entry_type

# init ElasticSearch
es_conn = ES.Elasticsearch(args.es_uri)
es_conn = elasticsearch.Elasticsearch(args.es_uri)
try:
es_conn.info()
except ES.ConnectionError:
except elasticsearch.ConnectionError:
print('Cannot connect to Elasticsearch at %s' % args.es_uri)
sys.exit()

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Expand Up @@ -3,3 +3,4 @@ elasticsearch
numpy
scipy
pandas
-e git://github.com/d-mo/kibana-dashboard-api#egg=kibana_dashboard_api

0 comments on commit e1e8ed1

Please sign in to comment.