diff --git a/arxiv/utils_restreamer.py b/arxiv/utils_restreamer.py
index 0b88f77..74f0117 100644
--- a/arxiv/utils_restreamer.py
+++ b/arxiv/utils_restreamer.py
@@ -182,7 +182,7 @@ def write_to_rabbit_csv(final_data_frame):
         row = final_data_frame.iloc[num] # this a dataframe that
         # corresponds to the num line of the big dataframe
         csv_string=final_data_frame[num:(num+1)].to_csv(path_or_buf=None,
-                                                        index=False,
+                                                        index=False, header=False)
         # csv_string = StringIO()
         # row.to_csv(csv_string, header=False)
 
@@ -573,8 +573,7 @@ def main():
     logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
     validate_conf(gconf)
 
-    config_dict = read_or_init_restreamed_metadata(
-        METADATA_FILE)
+    config_dict = read_or_init_restreamed_metadata(METADATA_FILE)
     try:
         number_of_executions = gconf['numberOfDaysToReplay']
         for i in range(number_of_executions):
diff --git a/dsio/dashboard/kibana.py b/dsio/dashboard/kibana.py
index 15c252b..cc0e6f8 100644
--- a/dsio/dashboard/kibana.py
+++ b/dsio/dashboard/kibana.py
@@ -4,7 +4,7 @@
 from kibana_dashboard_api import VisualizationsManager, DashboardsManager
 
 
-def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
+def generate_dashboard(es_conn, sensor_names, df_scored, index_name, timefield='time'):
     dashboards = DashboardsManager(es_conn)
     dashboard = Dashboard()
     dashboard.id = "%s-dashboard" % index_name
@@ -67,12 +67,31 @@ def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
             },
             "aggs": [
                 {
+                    "id": "1",
+                    "type": "avg",
+                    "schema":"metric",
+                    "params": {
+                        "field": sensor,
+                        "customLabel": sensor.replace('_', ' ')
+                    }
+                }, {
                     "id": "2",
                     "type": "max",
                     "schema":"radius",
                     "params": {
                         "field":"SCORE_%s" % sensor
                     }
+                }, {
+                    "id": "3",
+                    "type": "date_histogram",
+                    "schema": "segment",
+                    "params":{
+                        "field": timefield,
+                        "interval": "custom",
+                        "customInterval": "5s",
+                        "min_doc_count": 1,
+                        "extended_bounds": {}
+                    }
                 }
             ],
             "listeners": {}
@@ -96,10 +115,30 @@ def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
         ret = dashboard.add_visualization(viz)
         i += 1
 
+    # Create the index if it does not exist
+    if not es_conn.indices.exists(index_name):
+        index_properties = {"time" : {"type": "date"}}
+        body = {"mappings": {index_name: {"properties": index_properties}}}
+        es_conn.indices.create(index=index_name, body=body)
+
     try:
         ret = dashboards.add(dashboard)
     except elasticsearch.exceptions.ConflictError:
         ret = dashboards.update(dashboard)
         #pass # Dashboard already exists, let's not overwrite it
 
+    es_conn.index(index='.kibana', doc_type="index-pattern", id=index_name,
+                  body={"title": index_name, "timeFieldName": "time"})
+
+    kibana_config = es_conn.search(index='.kibana',
+                                   sort={'_uid': {'order': 'desc'}},
+                                   doc_type='config')
+    try:
+        kibana_id = kibana_config['hits']['hits'][0]['_id']
+    except:
+        raise # TODO
+
+    es_conn.update(index='.kibana', doc_type='config', id=kibana_id,
+                   body={"doc": {"defaultIndex" : index_name}})
+
     return ret
diff --git a/dsio/restream/elastic.py b/dsio/restream/elastic.py
index e4f660d..3fcd0d0 100644
--- a/dsio/restream/elastic.py
+++ b/dsio/restream/elastic.py
@@ -1,8 +1,6 @@
 """
 Elasticsearch batch re-streamer
 """
-from __future__ import print_function
-
 import time
 import datetime
 
@@ -11,7 +9,7 @@
 from elasticsearch import helpers
 
 
-def batchRedater(X, timefield, hz = 10):
+def batchRedater(X, timefield, hz=10):
     # send 10 datapoints a second
     now = np.int(np.round(time.time()))
     X[timefield] = (now*1000 + X.index._data*hz)
@@ -36,14 +34,18 @@ def df2es(Y, index_name, es, index_properties=None, recreate=True,
             print('Deleting existing index {}'.format(index_name))
         except:
             pass
+
     print('Creating index {}'.format(index_name))
-    es.indices.create(index_name, body = body)
+    es.indices.create(index_name, body=body)
 
     # Formatting the batch to upload as a tuple of dictionnaries
     list_tmp = tuple(Y.fillna(0).T.to_dict().values())
+
     # Exporting to ES
     out = helpers.bulk(es, list_tmp)
+
     return out
+
 
 # X = features
 def elasticsearch_batch_restreamer(X, timefield, es, index_name,
@@ -58,7 +60,7 @@ def elasticsearch_batch_restreamer(X, timefield, es, index_name,
     everyX = 200
     virtual_time = np.min(X[timefield])
 
-    recreate = True
+    recreate = False
     while virtual_time < np.max(X[timefield]):
         start_time = virtual_time
         virtual_time += everyX*1000
diff --git a/setup.py b/setup.py
index f4d34f7..7e7e8ff 100644
--- a/setup.py
+++ b/setup.py
@@ -38,7 +38,7 @@
         'Development Status :: 2 - Pre-Alpha',
         'Intended Audience :: Developers',
         'Natural Language :: English',
-        'Programming Language :: Python :: 2.7'
+        'Programming Language :: Python :: 3.5'
     ],
     entry_points={
         'console_scripts': ['dsio=dsio.main:main'],
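
Note on the write_to_rabbit_csv() change: serializing a one-row slice with
to_csv() includes the column names by default, so every message published to
RabbitMQ would carry a header row. A minimal sketch of the difference, using a
throwaway frame (the column names here are illustrative, not from the repo):

    import pandas as pd

    df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})

    # Default: the header is repeated in every single-row message.
    print(df[0:1].to_csv(path_or_buf=None, index=False))                # 'a,b\n1,3\n'

    # With header=False, only the values are emitted, as the patch intends.
    print(df[0:1].to_csv(path_or_buf=None, index=False, header=False))  # '1,3\n'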
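
The kibana.py changes add an average metric and a date_histogram segment to
each per-sensor visualization, create the target index with a date-typed time
field when it is missing, and point Kibana's defaultIndex at the new index
pattern. A hedged usage sketch of the updated signature; the connection URL,
index name, and sample data are assumptions, while generate_dashboard() and
the SCORE_<sensor> column convention come from this patch:

    import pandas as pd
    from elasticsearch import Elasticsearch

    from dsio.dashboard.kibana import generate_dashboard

    es_conn = Elasticsearch('localhost:9200')

    # One sensor column plus its anomaly score, timestamped in epoch milliseconds.
    df_scored = pd.DataFrame({
        'time': [1500000000000, 1500000005000],
        'cpu_load': [0.4, 0.9],
        'SCORE_cpu_load': [0.1, 0.8],
    })

    # timefield defaults to 'time'; pass it explicitly if the column differs.
    generate_dashboard(es_conn, sensor_names=['cpu_load'], df_scored=df_scored,
                       index_name='dsio-demo', timefield='time')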
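
In elastic.py, the restream loop now starts with recreate = False, so
successive batches append to the existing index instead of dropping and
recreating it between time windows. A sketch of the redate-then-upload flow,
assuming a local node, a frame with a default integer index (batchRedater()
reads X.index._data), and the df2es() signature shown in the hunk above:

    import pandas as pd
    from elasticsearch import Elasticsearch

    from dsio.restream.elastic import batchRedater, df2es

    es = Elasticsearch('localhost:9200')

    X = pd.DataFrame({'cpu_load': [0.4, 0.9, 0.2]})
    X = batchRedater(X, 'time')  # stamps fresh epoch-ms timestamps onto the frame

    # The first upload may create the index; later batches pass recreate=False
    # so the data accumulates rather than being wiped on every window.
    df2es(X, 'dsio-demo', es, recreate=False)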
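
One review remark on the .kibana bookkeeping: the bare except around the
kibana_id lookup re-raises unconditionally, which the # TODO already flags. A
possible narrowing, assuming the only expected failure is an empty or
malformed search result:

    try:
        kibana_id = kibana_config['hits']['hits'][0]['_id']
    except (KeyError, IndexError):
        # A missing config document is the one anticipated failure mode;
        # anything else propagates untouched.
        raise RuntimeError('no Kibana config document found in .kibana')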