# Discovery Dashboard Importer

This importer loads data from Cerebral Cortex into Discovery Dashboard and provides flexible options for customizing your dashboard.

You can also use this importer without Cerebral Cortex. In that case, change the endpoint URLs in the configuration section as described in its comments.

## Important

- To generate meaningful and accurate visualizations, edit the configuration according to your needs.
    - Mistakes in the configuration can cause the dashboard to behave incorrectly.
- Not all data stored on Cerebral Cortex follows the same format (e.g. data descriptors), so some data may need additional preprocessing before it can be imported.
    - This importer is provided as a Jupyter Notebook so that you can easily preprocess your data :)

## Concepts

Discovery Dashboard is designed for studies with time series **measurements** and/or **events**.

- **Stream**: A named time series. The dashboard supports two kinds:
    - **Measurements**: Streams in which each data point consists of a timestamp and a single scalar value (e.g. heart rate).
    - **Events**: Streams in which each data point consists of only a timestamp (e.g. smoking).
    
Note that a dashboard **stream** is more restrictive than the CerebralCortex Core Library's datastream datatype, because fields such as lists or JSON cannot be drawn directly as line paths. If you have a datastream of tuples, consider splitting it into multiple streams, one per tuple component.
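
Splitting a tuple-valued datastream into scalar streams could look like the sketch below. The accelerometer rows, the stream names, and the helper `split_tuple_stream` are made up for illustration; they are not part of Cerebral Cortex or the importer.

```python
from datetime import datetime

# Hypothetical tuple-valued samples: (user_id, datetime, (x, y, z))
raw_rows = [
    ("user-1", datetime(2018, 1, 1, 12, 0, 0), (0.1, 0.2, 9.8)),
    ("user-1", datetime(2018, 1, 1, 12, 0, 1), (0.0, 0.3, 9.7)),
]

def split_tuple_stream(rows, names):
    """Turn rows of (user_id, datetime, tuple) into one scalar stream per name."""
    streams = {name: [] for name in names}
    for user_id, ts, values in rows:
        for name, value in zip(names, values):
            # Each output row has the measurement shape (user_id, datetime, scalar_value)
            streams[name].append((user_id, ts, value))
    return streams

streams = split_tuple_stream(raw_rows, ["accel_x", "accel_y", "accel_z"])
```

Each resulting list can then be used as one entry under `"measurements"` in the configuration.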

## Troubleshooting

If anything goes wrong, manually remove the named Docker volume `cerebralcortexdockercompose_chronix_data` and restart the services.

For more information about `docker volume rm`, see [Docker's official documentation](https://docs.docker.com/engine/reference/commandline/volume_rm/).

If you are using Cerebral Cortex Vagrant, run `vagrant ssh` first, then run the Docker commands inside the VM.

## Under the hood

Discovery Dashboard uses Chronix Server as its time series storage. Chronix accepts the InfluxDB line protocol, which the importer uses to write data; the importer cell sends each timestamp as an integer number of epoch milliseconds. You can easily write your own importer if you'd like to.
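
To give a feel for what is sent to Chronix, here is a minimal sketch of building a single line in the same layout the importer cell uses (`<name>,<tags> value=<value> <timestamp>`). The helper `to_line`, the user ID, and the sample value are illustrative assumptions, and the tag set is shortened compared with the one the importer writes.

```python
from datetime import datetime, timezone

def to_line(name, tags, value, ts_ms):
    """Build one line: <name>,<key=value tags> value=<value> <timestamp>."""
    tag_str = ",".join("{}={}".format(k, v) for k, v in tags.items())
    return "{},{} value={} {}".format(name, tag_str, value, ts_ms)

# Shortened, illustrative tag set
tags = {"studyName": "Demo", "streamName": "battery", "groupById": "user-1"}

# Epoch milliseconds, computed the same way as in the importer cell
ts = datetime(2018, 1, 1, tzinfo=timezone.utc)
ts_ms = int(ts.timestamp() * 1000)

line = to_line("-".join(tags.values()), tags, 87.5, ts_ms)
print(line)
```

Several such lines joined with newlines form the body of one POST request to the `chronix_endpoint`.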

# Sample code to bridge your data to the importer

 - Converts data from Cerebral Cortex API
 - You can modify the code or just write your own
 - Code here is deliberately verbose for readability
 - If you don't have real data but still want to try out the dashboard, scroll to the end of the notebook and use the *(Optional) Sample Data* cell
 
## Optional
 
 - The following example identifies users by UUID. If you don't need the data to be displayed anonymously, you can use usernames instead.
     - The ID is only a unique identifier used to group the streams.

In [None]:
STUDY_NAME = 'demo'

# For measurements, input lists of tuples (user_id, datetime, scalar_value)
sample_data_battery_measurements = []

# For events, input lists of tuples (user_id, datetime)
sample_data_notification_post_events = []


from cerebralcortex.cerebralcortex import CerebralCortex
from datetime import datetime
# Connect to CC Core API
CC = CerebralCortex("../cc_config_file/cc_vagrant_configuration.yml")
# Get all users in the specified study
users = CC.get_all_users(STUDY_NAME)

for user in users:
    user_id = user["identifier"]
    user_streams = CC.get_user_streams(user_id)
    for _, val in user_streams.items():
        # Here val is CC's datastream object
        for stream_id in val["stream_ids"]:
            # These stream names come from the cc_demo sample data. If you've
            # walked through the other tutorials, you may already have them.
            # Only the first available day of each stream is loaded here.
            if val["name"] == "CU_NOTIF_POST_PACKAGE--org.md2k.mcerebrum":
                data_stream = CC.get_stream(stream_id=stream_id, day=CC.get_stream_days(stream_id)[0])
                for data_point in data_stream.data:
                    # Events only need (user_id, datetime)
                    sample_data_notification_post_events.append((user_id, data_point.start_time))
            elif val["name"] == "BATTERY--org.md2k.mcerebrum--PHONE":
                data_stream = CC.get_stream(stream_id=stream_id, day=CC.get_stream_days(stream_id)[0])
                for data_point in data_stream.data:
                    # Measurements need (user_id, datetime, scalar_value)
                    sample_data_battery_measurements.append((user_id, data_point.start_time, data_point.sample[1]))
                
sample_data_notification_post_events

# Configurations

- Modify the config below and run this cell
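
Before running the importer, it can help to check that every stream has the tuple shape the config expects. The sketch below is one way to do that; `check_streams` is a hypothetical helper and not part of the importer.

```python
from datetime import datetime
from numbers import Real

def check_streams(measurements, events):
    """Return a list of problems found; an empty list means the shapes look right."""
    problems = []
    for name, rows in measurements.items():
        for i, row in enumerate(rows):
            # Measurements must be (user_id, datetime, scalar_value)
            ok = (isinstance(row, tuple) and len(row) == 3
                  and isinstance(row[1], datetime) and isinstance(row[2], Real))
            if not ok:
                problems.append("measurements[{!r}][{}]: expected (user_id, datetime, scalar_value)".format(name, i))
    for name, rows in events.items():
        for i, row in enumerate(rows):
            # Events must be (user_id, datetime)
            ok = isinstance(row, tuple) and len(row) == 2 and isinstance(row[1], datetime)
            if not ok:
                problems.append("events[{!r}][{}]: expected (user_id, datetime)".format(name, i))
    return problems

# Illustrative data: one valid measurement and one event with a string instead of a datetime
good_measurements = {"battery": [("user-1", datetime(2018, 1, 1, 12, 0), 0.5)]}
bad_events = {"sms": [("user-1", "2018-01-01 12:00")]}
problems = check_streams(good_measurements, bad_events)
print(problems)
```

With a real config, the call would be `check_streams(importer_config["measurements"], importer_config["events"])`.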

In [None]:
importer_config = {
#--------------------------------------
# URL to Dashboard server and database
# If using Cerebral Cortex, use the following two lines:
    "server_endpoint": "http://dashboardserver:4567/api/config",
    "chronix_endpoint": "http://chronix:8983/solr/chronix/ingest/influxdb/write",
# If running locally via Docker on macOS, Linux, or Windows 10 with Hyper-V:
#     "server_endpoint": "http://localhost:4567/api/config",
#     "chronix_endpoint": "http://localhost:8983/solr/chronix/ingest/influxdb/write",
# If running locally via Docker Toolbox on Windows:
#     "server_endpoint": "http://192.168.99.100:4567/api/config",
#     "chronix_endpoint": "http://192.168.99.100:8983/solr/chronix/ingest/influxdb/write",
#---------------------------------------
# Name of the study that you want to visualize
    "study_name": "Demo",
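# Stream Names
# - Dictionary key is whatever name you want to display
# - Dictionary value is a list
#   - For "measurements", the list contains tuples of (user_id, datetime, scalar_value)
#   - For "events", the list contains tuples of (user_id, datetime)
# - example_measurements_2 and example_events_2 are defined only by the
#   (Optional) Sample Data cell at the end; run it first or remove those entries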
    "measurements": {
        "battery": sample_data_battery_measurements,
        "temperature": example_measurements_2
    },
    "events": {
        "notification": sample_data_notification_post_events,
        "sms": example_events_2
    },
# The time series resolutions to use for resampling, given as
# ["high frequency", "low frequency"] in pandas offset-alias notation:
# https://pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases
# The high-frequency resolution is shown inside the zoom lens and the
# low-frequency one outside it
# Commonly ["1Min", "5Min"]
    "resolutions": ["1Min", "5Min"],
# Specify the timezone in which the data should be displayed
# The timezone string must be a name from the IANA time zone database
# e.g. "Asia/Shanghai", "America/New_York", "UTC"
# Warning: If the timezone is not set correctly, the dashboard may behave unexpectedly in the browser
    "timezone": "America/New_York"
}

# Importer

- Run the cell below to start the import
- You don't need to modify this, though you can write your own importer if you want to
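
To see what the importer does with measurements at a given resolution, the resampling step can be sketched in isolation: values are averaged per user and time bin, at most one empty bin is forward-filled, and any remaining gaps are written as `-1`. The data below is made up, and the lowercase alias `"5min"` is used on the assumption that it is accepted across pandas versions.

```python
from datetime import datetime, timedelta
import pandas as pd

start = datetime(2018, 1, 1, 12, 0)
# One user with a reading at minutes 0-4, then nothing until minute 20
rows = [("user-1", start + timedelta(minutes=m), float(m)) for m in (0, 1, 2, 3, 4, 20)]

df = pd.DataFrame(rows, columns=["user_id", "timestamp", "val"]).set_index("timestamp")
resampled = (
    df.groupby("user_id")["val"]
    .resample("5min")   # one bin per 5 minutes, per user
    .mean()             # average the samples in each bin; empty bins become NaN
    .ffill(limit=1)     # carry the last value into at most one empty bin
    .fillna(-1)         # mark any remaining gaps with -1
    .reset_index()
)
print(resampled)
```

Here the 12:00 bin averages to 2.0, the 12:05 bin is forward-filled with 2.0, the 12:10 and 12:15 bins become -1, and the 12:20 bin holds 20.0.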

In [None]:
# -------------------------------------------------------------
# Code for the importer itself
# 
# Don't modify unless you know what you are doing
# -------------------------------------------------------------
import pandas as pd
import requests
import json

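def format_tags(tags):
    """Render a dict as comma-separated key=value pairs (the tag set)."""
    return ','.join('{}={}'.format(k, v) for k, v in tags.items())

def format_name(tags):
    """Join the tag values with '-' to form the measurement name."""
    return '-'.join('{}'.format(v) for v in tags.values())

def format_influx_line_protocol(measurement, tags, value, ts):
    """Build one line: <measurement>,<tags> value=<value> <timestamp>."""
    return '{},{} value={} {}'.format(measurement, tags, value, ts)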

def import_into_dashboard(**kwargs):
    configs = {}
    # Required by importer
    configs["server_endpoint"] = kwargs["server_endpoint"]
    configs["chronix_endpoint"] = kwargs["chronix_endpoint"]
    configs["study_name"] = kwargs["study_name"]
    configs["measurements"] = list(kwargs["measurements"].keys())
    configs["events"] = list(kwargs["events"].keys())
    configs["resolutions"] = kwargs["resolutions"]
    configs["timezone"] = kwargs["timezone"]
    measurements = kwargs["measurements"]
    events = kwargs["events"]
    # Optional Customizations
    configs["title"] = kwargs.get("title", "Discovery Dashboard: " + configs["study_name"])
    
    # Import configs to dashboard server
    r = requests.post(configs['server_endpoint'], data=json.dumps(configs))
    print(r.text)
    
    # Import events
    for stream_name, event_stream in events.items():
        request_buffer = []
        for (user_id, pydatetime) in event_stream:
            data_dict = {
                'studyName': configs["study_name"],
                'streamName': stream_name,
                'groupById': user_id,
                'metricType': 'event',
                'type': 'metric',
            }
            request_buffer.append(format_influx_line_protocol(
                format_name(data_dict),
                format_tags(data_dict),
                1,
                int(pydatetime.timestamp() * 1000)
            ))
        r = requests.post(configs["chronix_endpoint"], data='\n'.join(request_buffer))
        print(r.text)
    
    
    # Import measurements
    for stream_name, measurement_stream in measurements.items():
        df = pd.DataFrame(measurement_stream, columns=['user_id', 'timestamp', 'val']).set_index(['timestamp'])
        for resolution in configs["resolutions"]:
            request_buffer = []
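            # Average each user's samples per resolution bin, forward-fill at most
            # one empty bin, then mark any remaining gaps with -1
            df_res = df.groupby('user_id')['val'].resample(resolution).mean().ffill(limit=1).fillna(-1).reset_index()
            for user_id, ts, value in df_res.itertuples(index=False):
                # Convert to epoch milliseconds, the same way as for events above
                timestamp = int(ts.to_pydatetime().timestamp() * 1000)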
                data_dict = {
                    'studyName': configs["study_name"],
                    'streamName': stream_name,
                    'resolution': resolution,
                    'groupById': user_id,
                    'metricType': 'measurement',
                    'type': 'metric',
                }
                tags_str = format_tags(data_dict)
                name_str = format_name(data_dict)
                request_buffer.append(format_influx_line_protocol(name_str, tags_str, value, timestamp))
            r = requests.post(configs["chronix_endpoint"], data='\n'.join(request_buffer))
            print(r.text)


import_into_dashboard(**importer_config)

# (Optional) Sample Data

- If you don't have real data, but just want to test out the dashboard
- Run the cell below to generate some random dummy sample data
- Remember to plug them into the config cell above

In [None]:
import pandas as pd
import datetime
import uuid
import random

user_id_list = [str(uuid.uuid4()) for i in range(20)]

measurements_stream_1 = []
measurements_stream_2 = []
events_stream_1 = []
events_stream_2 = []

# Use one fixed start time so that all streams share the same time base
start_time = datetime.datetime.now()

for user_id in user_id_list:
    # Measurements: one random value per minute for 3 days
    for i in range(60 * 24 * 3):
        ts = start_time + datetime.timedelta(minutes=i)
        measurements_stream_1.append((user_id, ts, random.random()))
        measurements_stream_2.append((user_id, ts, random.random()))
    # Events: one per hour for 48 hours (stream 1) and 72 hours (stream 2)
    events_stream_1.extend((user_id, start_time + datetime.timedelta(hours=i)) for i in range(48))
    events_stream_2.extend((user_id, start_time + datetime.timedelta(hours=i)) for i in range(72))

sample_data_battery_measurements = measurements_stream_1
sample_data_notification_post_events = events_stream_1
example_measurements_2 = measurements_stream_2
example_events_2 = events_stream_2