diff --git a/dsio/dashboard/__init__.py b/dsio/dashboard/__init__.py
new file mode 100644
index 0000000..4287ca8
--- /dev/null
+++ b/dsio/dashboard/__init__.py
@@ -0,0 +1 @@
+#
\ No newline at end of file
diff --git a/dsio/dashboard/kibana.py b/dsio/dashboard/kibana.py
new file mode 100644
index 0000000..15c252b
--- /dev/null
+++ b/dsio/dashboard/kibana.py
@@ -0,0 +1,105 @@
+import elasticsearch
+
+from kibana_dashboard_api import Visualization, Dashboard
+from kibana_dashboard_api import VisualizationsManager, DashboardsManager
+
+
+def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
+    dashboards = DashboardsManager(es_conn)
+    dashboard = Dashboard()
+    dashboard.id = "%s-dashboard" % index_name
+    dashboard.title = "%s dashboard" % index_name
+    dashboard.panels = []
+    dashboard.options = {"darkTheme": True}
+    dashboard.time_from = "now-15m"
+    dashboard.refresh_interval_value = 5000
+    dashboard.search_source = {
+        "filter": [{
+            "query": {
+                "query_string": {
+                    "analyze_wildcard": True,
+                    "query": "*"
+                }
+            }
+        }]
+    }
+    visualizations = VisualizationsManager(es_conn)
+    vis_list = visualizations.get_all()  # list all visualizations
+    panels = []
+    i = 0
+    for sensor in sensor_names:
+        viz_id = "%s-%s" % (index_name, sensor)
+
+        # Check if visualization exists
+        viz = next((v for v in vis_list if v.id == viz_id), None)
+        if not viz:  # If not, create it
+            viz = Visualization()
+            viz.id = viz_id
+            viz.title = "%s-%s" % (index_name, sensor)
+            viz.search_source = {
+                "index": index_name,
+                "query":{
+                    "query_string":{
+                        "analyze_wildcard": True,
+                        "query":"*"
+                    }
+                },
+                "filter":[]
+            }
+            viz.vis_state = {
+                "title": "%s-%s" % (index_name, sensor),
+                "type": "line",
+                "params": {
+                    "addLegend": True,
+                    "addTimeMarker": True,
+                    "addTooltip": True,
+                    "defaultYExtents": True,
+                    "drawLinesBetweenPoints": True,
+                    "interpolate": "linear",
+                    "radiusRatio": 9,
+                    "scale": "linear",
+                    "setYExtents": False,
+                    "shareYAxis": True,
+                    "showCircles": True,
+                    "smoothLines": True,
+                    "times":[],
+                    "yAxis":{}
+                },
+                "aggs": [
+                    {
+                        "id": "2",
+                        "type": "max",
+                        "schema":"radius",
+                        "params": {
+                            "field":"SCORE_%s" % sensor
+                        }
+                    }
+                ],
+                "listeners": {}
+            }
+            try:
+                res = visualizations.add(viz)
+                assert res['_id'] == viz_id
+            except elasticsearch.exceptions.ConflictError:
+                res = visualizations.update(viz)
+
+        panel = {
+            "id": viz_id,
+            "panelIndex": i,
+            "row": i,
+            "col": i,
+            "size_x": 7,
+            "size_y": 4,
+            "type": "visualization"
+        }
+        panels.append(panel)
+        ret = dashboard.add_visualization(viz)
+        i += 1
+
+    try:
+        ret = dashboards.add(dashboard)
+    except elasticsearch.exceptions.ConflictError:
+        ret = dashboards.update(dashboard)
+        #pass # Dashboard already exists, let's not overwrite it
+
+    return ret
diff --git a/dsio/helpers.py b/dsio/helpers.py
new file mode 100644
index 0000000..1bd7487
--- /dev/null
+++ b/dsio/helpers.py
@@ -0,0 +1,29 @@
+""" Helper functions """
+
+import dateparser
+
+
+def detect_time(dataframe, timefield=None, timeunit=None):
+    """ Attempt to detect the time dimension in a dataframe """
+    columns = set(dataframe.columns)
+    for tfname in ['time', 'datetime', 'date', 'timestamp']:
+        if tfname in columns:
+            prev = current = None
+            for i in dataframe[tfname][:10]:
+                try:
+                    current = dateparser.parse(str(i))
+                    # timefield needs to be parsable and always increasing
+                    if not current or (prev and prev > current):
+                        tfname = ''
+                        break
+                except TypeError:
+                    tfname = ''
+                    break
+                prev = current
+            if tfname:
+                timefield = tfname
+                if isinstance(i, float) and not timeunit:
+                    timeunit = 's'
+                break
+
+    return timefield, timeunit
diff --git a/dsio/main.py b/dsio/main.py
index c4ab310..e15858a 100644
--- a/dsio/main.py
+++ b/dsio/main.py
@@ -11,14 +11,13 @@ import webbrowser
 
 
 import dateparser
-import elasticsearch as ES
+import elasticsearch
 import numpy as np
 import pandas as pd
 
-from kibana_dashboard_api import Visualization, Dashboard
-from kibana_dashboard_api import VisualizationsManager, DashboardsManager
-
 from .restream.elastic import elasticsearch_batch_restreamer
+from .dashboard.kibana import generate_dashboard
+from .helpers import detect_time
 
 
 def batch_score(X, q=0.99):
@@ -29,146 +28,6 @@ def batch_score(X, q=0.99):
     return scores
 
 
-def detect_time(dataframe, timeunit):
-    """ Attempt to detect the time dimension in a dataframe """
-    columns = set(dataframe.columns)
-    for tfname in ['time', 'datetime', 'date', 'timestamp']:
-        if tfname in columns:
-            prev = current = None
-            for i in dataframe[tfname][:10]:
-                try:
-                    current = dateparser.parse(str(i))
-                    # timefield needs to be parsable and always increasing
-                    if not current or (prev and prev > current):
-                        tfname = ''
-                        break
-                except TypeError:
-                    tfname = ''
-                    break
-                prev = current
-            if tfname:
-                timefield = tfname
-                if isinstance(i, float) and not timeunit:
-                    timeunit = 's'
-                break
-
-    return timefield, timeunit
-
-
-def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
-    dashboards = DashboardsManager(es_conn)
-    dashboard = Dashboard()
-    dashboard.id = "%s-dashboard" % index_name
-    dashboard.title = "%s dashboard" % index_name
-    dashboard.panels = []
-    dashboard.options = {"darkTheme": True}
-    dashboard.time_from = "now-15m"
-    dashboard.search_source = {
-        "filter": [{
-            "query": {
-                "query_string": {
-                    "analyze_wildcard": True,
-                    "query":"*"
-                }
-            }
-        }]
-    }
-    visualizations = VisualizationsManager(es_conn)
-    # list all visualizations
-    #vis_list = visualizations.get_all()
-    panels = []
-    i = 0
-    for sensor in sensor_names:
-        viz_id = "%s-%s" % (index_name, sensor)
-        vizualization = Visualization()
-        vizualization.id = viz_id
-        vizualization.title = "%s-%s" % (index_name, sensor)
-        vizualization.search_source = {
-            "index": index_name,
-            "query":{
-                "query_string":{
-                    "analyze_wildcard": True,
-                    "query":"*"
-                }
-            },
-            "filter":[]
-        }
-        vizualization.vis_state = {
-            "title":"%s-%s" % (index_name, sensor),
-            "type":"line",
-            "params":{
-                "addLegend": True,
-                "addTimeMarker": True,
-                "addTooltip": True,
-                "defaultYExtents": True,
-                "drawLinesBetweenPoints": True,
-                "interpolate": "linear",
-                "radiusRatio": 9,
-                "scale": "linear",
-                "setYExtents": False,
-                "shareYAxis": True,
-                "showCircles": True,
-                "smoothLines": True,
-                "times":[],
-                "yAxis":{}
-            },
-            "aggs": [
-                {
-                    "id": "1",
-                    "type": "avg",
-                    "schema":"metric",
-                    "params": {
-                        "field": sensor,
-                        "customLabel": sensor.replace('_', ' ')
-                    }
-                }, {
-                    "id": "2",
-                    "type": "max",
-                    "schema":"radius",
-                    "params":{
-                        "field":"SCORE_%s" % sensor
-                    }
-                }, {
-                    "id": "3",
-                    "type": "date_histogram",
-                    "schema": "segment",
-                    "params":{
-                        "field": "time",
-                        "interval": "custom",
-                        "customInterval": "5s",
-                        "min_doc_count":1,
-                        "extended_bounds":{}
-                    }
-                }
-            ],
-            "listeners": {}
-        }
-        try:
-            viz = visualizations.add(vizualization)
-        except ES.exceptions.ConflictError:
-            pass # Visualization already exists, let's not overwrite it
-
-        panel = {
-            "id": viz_id,
-            "panelIndex": i,
-            "row": i,
-            "col": i,
-            "size_x": 7,
-            "size_y": 4,
-            "type": "visualization"
-        }
-        panels.append(panel)
-        ret = dashboard.add_visualization(vizualization)
-        i += 1
-
-    try:
-        ret = dashboards.add(dashboard)
-    except ES.exceptions.ConflictError:
-        pass # Dashboard already exists, let's not overwrite it
-
-    return ret
-
-
 def main():
     """ Main function """
 
@@ -236,14 +95,6 @@
         print('Missing sensors, aborting.')
        sys.exit()
 
-    # Get ES index name and type from args or generate from input name
-    index_name = args.es_index
-    if not index_name and args.input:
-        index_name = args.input.split('/')[-1].split('.')[0].split('_')[0]
-    if not index_name:
-        index_name = 'dsio'
-    _type = args.entry_type
-
     min_time = dataframe[timefield][0]
     max_time = dataframe[timefield][dataframe[timefield].size-1]
 
@@ -266,15 +117,22 @@
             values = df_scored[sensor].astype('category').cat.codes
             df_scored['SCORE_{}'.format(sensor)] = batch_score(values)
 
+    # Get ES index name and type from args or generate from input name
+    index_name = args.es_index
+    if not index_name and args.input:
+        index_name = args.input.split('/')[-1].split('.')[0].split('_')[0]
+    if not index_name:
+        index_name = 'dsio'
+
     ### Adding index name and type for all events:
     df_scored['_index'] = index_name
-    df_scored['_type'] = _type
+    df_scored['_type'] = args.entry_type
 
     # init ElasticSearch
-    es_conn = ES.Elasticsearch(args.es_uri)
+    es_conn = elasticsearch.Elasticsearch(args.es_uri)
     try:
         es_conn.info()
-    except ES.ConnectionError:
+    except elasticsearch.ConnectionError:
         print('Cannot connect to Elasticsearch at %s' % args.es_uri)
         sys.exit()
 
diff --git a/requirements.txt b/requirements.txt
index 39aaf60..f52a3b4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -3,3 +3,4 @@ elasticsearch
 numpy
 scipy
 pandas
+-e git://github.com/d-mo/kibana-dashboard-api#egg=kibana_dashboard_api
\ No newline at end of file
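
Reviewer note: a minimal sketch of how the two relocated helpers might be exercised outside of main(). It is not part of the patch; the CSV path, sensor columns, index name, and Elasticsearch URI are illustrative assumptions, and it presumes a reachable Elasticsearch/Kibana instance with the kibana_dashboard_api package installed.

# Hypothetical usage sketch. Paths, column names and the Elasticsearch URI
# below are assumptions for illustration only.
import elasticsearch
import pandas as pd

from dsio.helpers import detect_time
from dsio.dashboard.kibana import generate_dashboard

df = pd.read_csv('sensors.csv')         # assumed input file
timefield, timeunit = detect_time(df)   # both keyword arguments now default to None
if not timefield:
    raise SystemExit('No parsable, increasing time column found')

# generate_dashboard builds visualizations over SCORE_<sensor> fields,
# which main() normally adds via batch_score before restreaming.
es_conn = elasticsearch.Elasticsearch('http://localhost:9200')
sensor_names = [c for c in df.columns if c != timefield]
generate_dashboard(es_conn, sensor_names, df, 'sensors')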