Skip to content

Commit

Permalink
configure Kibana index patterns
Browse files Browse the repository at this point in the history
  • Loading branch information
d-mo committed Dec 1, 2017
1 parent e1e8ed1 commit f6f32c7
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 10 deletions.
5 changes: 2 additions & 3 deletions arxiv/utils_restreamer.py
Expand Up @@ -182,7 +182,7 @@ def write_to_rabbit_csv(final_data_frame):
row = final_data_frame.iloc[num] # this a dataframe that row = final_data_frame.iloc[num] # this a dataframe that
# corresponds to the num line of the big dataframe # corresponds to the num line of the big dataframe
csv_string=final_data_frame[num:(num+1)].to_csv(path_or_buf=None, csv_string=final_data_frame[num:(num+1)].to_csv(path_or_buf=None,
index=False, index=False,
header=False) header=False)
# csv_string = StringIO() # csv_string = StringIO()
# row.to_csv(csv_string, header=False) # row.to_csv(csv_string, header=False)
Expand Down Expand Up @@ -573,8 +573,7 @@ def main():
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT) logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)


validate_conf(gconf) validate_conf(gconf)
config_dict = read_or_init_restreamed_metadata( config_dict = read_or_init_restreamed_metadata(METADATA_FILE)
METADATA_FILE)
try: try:
number_of_executions = gconf['numberOfDaysToReplay'] number_of_executions = gconf['numberOfDaysToReplay']
for i in range(number_of_executions): for i in range(number_of_executions):
Expand Down
41 changes: 40 additions & 1 deletion dsio/dashboard/kibana.py
Expand Up @@ -4,7 +4,7 @@
from kibana_dashboard_api import VisualizationsManager, DashboardsManager from kibana_dashboard_api import VisualizationsManager, DashboardsManager




def generate_dashboard(es_conn, sensor_names, df_scored, index_name): def generate_dashboard(es_conn, sensor_names, df_scored, index_name, timefield='time'):
dashboards = DashboardsManager(es_conn) dashboards = DashboardsManager(es_conn)
dashboard = Dashboard() dashboard = Dashboard()
dashboard.id = "%s-dashboard" % index_name dashboard.id = "%s-dashboard" % index_name
Expand Down Expand Up @@ -67,12 +67,31 @@ def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
}, },
"aggs": [ "aggs": [
{ {
"id": "1",
"type": "avg",
"schema":"metric",
"params": {
"field": sensor,
"customLabel": sensor.replace('_', ' ')
}
}, {
"id": "2", "id": "2",
"type": "max", "type": "max",
"schema":"radius", "schema":"radius",
"params": { "params": {
"field":"SCORE_%s" % sensor "field":"SCORE_%s" % sensor
} }
}, {
"id": "3",
"type": "date_histogram",
"schema": "segment",
"params":{
"field": timefield,
"interval": "custom",
"customInterval": "5s",
"min_doc_count": 1,
"extended_bounds": {}
}
} }
], ],
"listeners": {} "listeners": {}
Expand All @@ -96,10 +115,30 @@ def generate_dashboard(es_conn, sensor_names, df_scored, index_name):
ret = dashboard.add_visualization(viz) ret = dashboard.add_visualization(viz)
i += 1 i += 1


# Create the index if it does not exist
if not es_conn.indices.exists(index_name):
index_properties = {"time" : {"type": "date"}}
body = {"mappings": {index_name: {"properties": index_properties}}}
es_conn.indices.create(index=index_name, body=body)

try: try:
ret = dashboards.add(dashboard) ret = dashboards.add(dashboard)
except elasticsearch.exceptions.ConflictError: except elasticsearch.exceptions.ConflictError:
ret = dashboards.update(dashboard) ret = dashboards.update(dashboard)
#pass # Dashboard already exists, let's not overwrite it #pass # Dashboard already exists, let's not overwrite it


es_conn.index(index='.kibana', doc_type="index-pattern", id=index_name,
body={"title": index_name, "timeFieldName": "time"})

kibana_config = es_conn.search(index='.kibana',
sort={'_uid': {'order': 'desc'}},
doc_type='config')
try:
kibana_id = kibana_config['hits']['hits'][0]['_id']
except:
raise # TODO

es_conn.update(index='.kibana', doc_type='config', id=kibana_id,
body={"doc": {"defaultIndex" : index_name}})

return ret return ret
12 changes: 7 additions & 5 deletions dsio/restream/elastic.py
@@ -1,8 +1,6 @@
""" """
Elasticsearch batch re-streamer Elasticsearch batch re-streamer
""" """
from __future__ import print_function

import time import time
import datetime import datetime


Expand All @@ -11,7 +9,7 @@
from elasticsearch import helpers from elasticsearch import helpers




def batchRedater(X, timefield, hz = 10): def batchRedater(X, timefield, hz=10):
# send 10 datapoints a second # send 10 datapoints a second
now = np.int(np.round(time.time())) now = np.int(np.round(time.time()))
X[timefield] = (now*1000 + X.index._data*hz) X[timefield] = (now*1000 + X.index._data*hz)
Expand All @@ -36,14 +34,18 @@ def df2es(Y, index_name, es, index_properties=None, recreate=True,
print('Deleting existing index {}'.format(index_name)) print('Deleting existing index {}'.format(index_name))
except: except:
pass pass

print('Creating index {}'.format(index_name)) print('Creating index {}'.format(index_name))
es.indices.create(index_name, body = body) es.indices.create(index_name, body=body)


# Formatting the batch to upload as a tuple of dictionnaries # Formatting the batch to upload as a tuple of dictionnaries
list_tmp = tuple(Y.fillna(0).T.to_dict().values()) list_tmp = tuple(Y.fillna(0).T.to_dict().values())

# Exporting to ES # Exporting to ES
out = helpers.bulk(es, list_tmp) out = helpers.bulk(es, list_tmp)


return out



# X = features # X = features
def elasticsearch_batch_restreamer(X, timefield, es, index_name, def elasticsearch_batch_restreamer(X, timefield, es, index_name,
Expand All @@ -58,7 +60,7 @@ def elasticsearch_batch_restreamer(X, timefield, es, index_name,
everyX = 200 everyX = 200


virtual_time = np.min(X[timefield]) virtual_time = np.min(X[timefield])
recreate = True recreate = False
while virtual_time < np.max(X[timefield]): while virtual_time < np.max(X[timefield]):
start_time = virtual_time start_time = virtual_time
virtual_time += everyX*1000 virtual_time += everyX*1000
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -38,7 +38,7 @@
'Development Status :: 2 - Pre-Alpha', 'Development Status :: 2 - Pre-Alpha',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
'Natural Language :: English', 'Natural Language :: English',
'Programming Language :: Python :: 2.7' 'Programming Language :: Python :: 3.5'
], ],
entry_points={ entry_points={
'console_scripts': ['dsio=dsio.main:main'], 'console_scripts': ['dsio=dsio.main:main'],
Expand Down

0 comments on commit f6f32c7

Please sign in to comment.