## Interacting with CerebralCortex Data

Cerebral Cortex is MD2K's big data cloud tool designed to support population-scale data analysis, visualization, model development, and intervention design for mobile-sensor data. It supports machine learning model development on population-scale datasets and provides interoperable interfaces for aggregating diverse data sources.

This page gives an overview of the core Cerebral Cortex operations to familiarize you with how to discover and interact with the different sources of data the system can contain.

_Note:_ Although some of these examples use generated data, they are designed to work on real-world mCerebrum data. The signal generators were built so that people who cannot access the original datasets, or who do not wish to collect data before evaluating the system, can still test and evaluate the Cerebral Cortex platform.

## Setting Up Environment

This notebook does not include the runtime environments required to run Cerebral Cortex. The following commands download and install the necessary tools and frameworks, plus the datasets when `DOWNLOAD_USER_DATA=True`.
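Before running the setup cell, it can help to see which of the environment variables it inspects are already set, especially if the cell stops with "Please check your environment configuration". The helper below, `env_report`, is a hypothetical sketch (not part of Cerebral Cortex) that only reads the variables the setup cell checks and reports each one's value.

```python
import os

# Environment variables inspected by the setup cell.
CHECKED_VARS = [
    "MD2K_JUPYTER_NOTEBOOK",
    "JAVA_HOME",
    "SPARK_HOME",
    "PYSPARK_PYTHON",
    "PYSPARK_DRIVER_PYTHON",
]

def env_report(environ=None):
    """Hypothetical helper: map each checked variable to its value, or None if unset."""
    if environ is None:
        environ = os.environ
    return {name: environ.get(name) for name in CHECKED_VARS}

# Print a small table of what is (and is not) configured.
for name, value in env_report().items():
    print(f"{name:<22} {value if value else '(not set)'}")
```

If `JAVA_HOME` is unset, the setup cell falls back to the OpenJDK 8 or 11 directories under `/usr/lib/jvm/`, and installs OpenJDK 8 with `apt-get` when neither exists.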

In [None]:
# importlib.util must be imported explicitly for importlib.util.find_spec below.
import importlib.util, sys, os
from os.path import expanduser
sys.path.insert(0, os.path.abspath('..'))

DOWNLOAD_USER_DATA=False
ALL_USERS=False # this only works if DOWNLOAD_USER_DATA=True
IN_COLAB = 'google.colab' in sys.modules
MD2K_JUPYTER_NOTEBOOK = "MD2K_JUPYTER_NOTEBOOK" in os.environ
# Define the flag in every case so the check further down never raises a NameError.
IN_JUPYTER_NOTEBOOK = get_ipython().__class__.__name__ == "ZMQInteractiveShell"
JAVA_HOME_DEFINED = "JAVA_HOME" in os.environ
SPARK_HOME_DEFINED = "SPARK_HOME" in os.environ
PYSPARK_PYTHON_DEFINED = "PYSPARK_PYTHON" in os.environ
PYSPARK_DRIVER_PYTHON_DEFINED = "PYSPARK_DRIVER_PYTHON" in os.environ
HAVE_CEREBRALCORTEX_KERNEL = importlib.util.find_spec("cerebralcortex") is not None
SPARK_VERSION = "3.1.2"
SPARK_URL = "https://archive.apache.org/dist/spark/spark-"+SPARK_VERSION+"/spark-"+SPARK_VERSION+"-bin-hadoop2.7.tgz"
SPARK_FILE_NAME = "spark-"+SPARK_VERSION+"-bin-hadoop2.7.tgz"
CEREBRALCORTEX_KERNEL_VERSION = "3.3.14"

DATA_PATH = expanduser("~")
# Ensure the path ends with a trailing slash.
if not DATA_PATH.endswith("/"):
    DATA_PATH += "/"
USER_DATA_PATH = DATA_PATH+"cc_data/"

if MD2K_JUPYTER_NOTEBOOK:
    print("Java, Spark, and CerebralCortex-Kernel are installed and paths are already setup.")
else:
    SPARK_PATH = DATA_PATH+"spark-"+SPARK_VERSION+"-bin-hadoop2.7/"

    if(not HAVE_CEREBRALCORTEX_KERNEL):
        print("Installing CerebralCortex-Kernel")
        !pip -q install cerebralcortex-kernel==$CEREBRALCORTEX_KERNEL_VERSION
    else:
        print("CerebralCortex-Kernel is already installed.")

    if not JAVA_HOME_DEFINED:
        if not os.path.exists("/usr/lib/jvm/java-8-openjdk-amd64/") and not os.path.exists("/usr/lib/jvm/java-11-openjdk-amd64/"):
            print("\nInstalling/Configuring Java")
            !sudo apt update
            !sudo apt-get install -y openjdk-8-jdk-headless
            os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
        elif os.path.exists("/usr/lib/jvm/java-8-openjdk-amd64/"):
            print("\nSetting up Java path")
            os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64/"
        elif os.path.exists("/usr/lib/jvm/java-11-openjdk-amd64/"):
            print("\nSetting up Java path")
            os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64/"
    else:
        print("JAVA is already installed.")

    if (IN_COLAB or IN_JUPYTER_NOTEBOOK) and not MD2K_JUPYTER_NOTEBOOK:
        if SPARK_HOME_DEFINED:
            print("SPARK is already installed.")
        elif not os.path.exists(SPARK_PATH):
            print("\nSetting up Apache Spark ", SPARK_VERSION)
            !pip -q install findspark
            import pyspark
            spark_installation_path = os.path.dirname(pyspark.__file__)
            import findspark
            findspark.init(spark_installation_path)
            if not os.getenv("PYSPARK_PYTHON"):
                os.environ["PYSPARK_PYTHON"] = os.popen('which python3').read().replace("\n","")
            if not os.getenv("PYSPARK_DRIVER_PYTHON"):
                os.environ["PYSPARK_DRIVER_PYTHON"] = os.popen('which python3').read().replace("\n","")
        else:
            print("SPARK is already installed.")
    else:
        raise SystemExit("Please check your environment configuration at: https://github.com/MD2Korg/CerebralCortex-Kernel/")

if DOWNLOAD_USER_DATA:
    if not os.path.exists(USER_DATA_PATH):
        if ALL_USERS:
            print("\nDownloading all users' data.")
            !rm -rf $USER_DATA_PATH
            !wget -q http://mhealth.md2k.org/images/datasets/cc_data.tar.bz2 && tar -xf cc_data.tar.bz2 -C $DATA_PATH && rm cc_data.tar.bz2
        else:
            print("\nDownloading a user's data.")
            !rm -rf $USER_DATA_PATH
            !wget -q http://mhealth.md2k.org/images/datasets/s2_data.tar.bz2 && tar -xf s2_data.tar.bz2 -C $DATA_PATH && rm s2_data.tar.bz2
    else:
        print("Data already exist. Please remove folder", USER_DATA_PATH, "if you want to download the data again")

Installing CerebralCortex-Kernel

# Import Your Own Data
mCerebrum is not the only way to collect and load data into *Cerebral Cortex*. You can also import your own structured datasets into the platform. This example demonstrates how to load existing data and then read it back from Cerebral Cortex through the same mechanisms you have been using. It also shows how to write a custom data transformation function that smooths the data and how to visualize the result.

## Initialize the system

In [None]:
from cerebralcortex.kernel import Kernel
CC = Kernel(cc_configs="default", study_name="default", new_study=True)



# Import Data
Cerebral Cortex provides a set of predefined data import routines that fit typical use cases. The most common is the CSV data parser, `csv_data_parser`. These parsers are easy to write and can be extended to support most types of data. Additionally, the data importer, `import_data`, needs to be brought into the notebook to start the import process. For a single CSV file, as in the cells below, the `CC.read_csv` method is a simpler alternative that loads the file directly into a data stream.

The `import_data` method requires several parameters that are discussed below.
- `cc_config`: The path to the configuration files for Cerebral Cortex; this is the same folder that you would utilize for the `Kernel` initialization
- `input_data_dir`: The path to the directory containing the data to import; in this example, `sample_data`, which you can explore in the file/folder browser on the left
- `user_id`: The universally unique identifier (UUID) that owns the data to be imported into the system
- `data_file_extension`: The type of files to be considered for import
- `data_parser`: The parser, such as `csv_data_parser` or a custom one, that defines how each line of the data is interpreted
- `gen_report`: A True/False value that controls whether a report is printed to the screen when the import completes
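The `data_parser` parameter is where per-line interpretation happens. As a hedged illustration of how small such a parser can be, the sketch below turns one CSV line into typed values. `parse_line` is a hypothetical name, and the line layout it assumes (a `YYYY-MM-DD HH:MM:SS` timestamp, a float, an integer version, and a user ID, mirroring the columns displayed after import) is an assumption: the raw `data.csv` may encode timestamps differently, and the parser interface Cerebral Cortex expects may use a different signature.

```python
import csv
from datetime import datetime

# Hypothetical per-line parser; NOT the csv_data_parser shipped with Cerebral Cortex.
# Assumes each line holds: timestamp ("YYYY-MM-DD HH:MM:SS"), value, version, user ID.
TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

def parse_line(line):
    """Turn one CSV line into a dict keyed by column name, with typed values."""
    # csv.reader copes with quoted fields that a plain str.split(",") would break.
    timestamp, some_vals, version, user = next(csv.reader([line]))
    return {
        "timestamp": datetime.strptime(timestamp.strip(), TIMESTAMP_FORMAT),
        "some_vals": float(some_vals),
        "version": int(version),
        "user": user.strip(),
    }

row = parse_line("2019-01-09 17:35:00,0.085188727,1,00000000-afb8-476e-9872-6472b4e66b68")
print(row["timestamp"], row["some_vals"], row["version"], row["user"])
```

Keeping the parser a pure function of one line makes it easy to test on a handful of lines before pointing an importer at a whole directory.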

### Download sample data

In [None]:
sample_file = DATA_PATH+"data.csv"
!wget -q https://raw.githubusercontent.com/MD2Korg/CerebralCortex/master/jupyter_demo/sample_data/data.csv -O $sample_file

In [None]:
iot_stream = CC.read_csv(file_path=sample_file, stream_name="some-sample-iot-stream", column_names=["timestamp", "some_vals", "version", "user"])

## View Imported Data

In [None]:
iot_stream.show(4)

+-------------------+-----------+-------+--------------------+
|          timestamp|  some_vals|version|                user|
+-------------------+-----------+-------+--------------------+
|2019-01-09 17:35:00|0.085188727|      1|00000000-afb8-476...|
|2019-01-09 17:35:01|0.168675497|      1|00000000-afb8-476...|
|2019-01-09 17:35:02|0.740485082|      1|00000000-afb8-476...|
|2019-01-09 17:35:03|0.713160997|      1|00000000-afb8-476...|
+-------------------+-----------+-------+--------------------+
only showing top 4 rows



## Document Data

In [None]:
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, ModuleMetadata

stream_metadata = Metadata()
# Describe the stream and document each column; the setters return the object,
# so the calls can be chained.
(stream_metadata
    .set_name("iot-data-stream")
    .set_description("This is randomly generated data for demo purposes.")
    .add_dataDescriptor(
        DataDescriptor().set_name("timestamp").set_type("datetime")
        .set_attribute("description", "UTC timestamp of data point collection."))
    .add_dataDescriptor(
        DataDescriptor().set_name("some_vals").set_type("float")
        .set_attribute("description", "Random values")
        .set_attribute("range", "Data is between 0 and 1."))
    .add_dataDescriptor(
        DataDescriptor().set_name("version").set_type("int")
        .set_attribute("description", "version of the data"))
    .add_dataDescriptor(
        DataDescriptor().set_name("user").set_type("string")
        .set_attribute("description", "user id"))
    .add_module(
        ModuleMetadata().set_name("cerebralcortex.data_importer")
        .set_attribute("url", "https://md2k.org")
        .set_author("Nasir Ali", "dev@md2k.org")))
iot_stream.metadata = stream_metadata
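Metadata is only useful if every column is described under the name the data actually uses. The sketch below is a hypothetical plain-Python check, not a Cerebral Cortex API: `check_descriptors` compares the column names passed to `CC.read_csv` with descriptor entries shaped like the `data_descriptor` list in the JSON printed by `iot_stream.metadata`.

```python
# Hypothetical consistency check between documented and actual columns.
# Descriptors are plain dicts shaped like the "data_descriptor" entries in the
# printed metadata JSON; no Cerebral Cortex API is called here.
def check_descriptors(columns, descriptors):
    """Return (columns without a descriptor, descriptors naming no real column)."""
    documented = {d["name"] for d in descriptors}
    undocumented = [c for c in columns if c not in documented]
    unknown = sorted(documented - set(columns))
    return undocumented, unknown

columns = ["timestamp", "some_vals", "version", "user"]
descriptors = [
    {"name": "timestamp", "type": "datetime"},
    {"name": "some_vals", "type": "float"},
    {"name": "version", "type": "int"},
    {"name": "user", "type": "string"},
]
print(check_descriptors(columns, descriptors))  # ([], [])
```

Two empty lists mean every column is documented and no descriptor refers to a missing column.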

## View Metadata

In [None]:
iot_stream.metadata

{
    "annotations": [],
    "data_descriptor": [
        {
            "attributes": {
                "description": "UTC timestamp of data point collection."
            },
            "name": "timestamp",
            "type": "datetime"
        },
        {
            "attributes": {
                "description": "Random values",
                "range": "Data is between 0 and 1."
            },
            "name": "some_vals",
            "type": "float"
        },
        {
            "attributes": {
                "description": "version of the data"
            },
            "name": "version",
            "type": "int"
        },
        {
            "attributes": {
                "description": "user id"
            },
            "name": "user",
            "type": "string"
        }
    ],
    "description": "This is randomly generated data for demo purposes.",
    "input_streams": [],
    "modules": [
        {
            "attributes": {
                "url": "hhtps

## How to write an algorithm
This section provides an example of how to write a simple smoothing algorithm and apply it to the data that was just imported.

### Import the necessary modules

In [None]:
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, IntegerType
from pyspark.sql.functions import minute, second, mean, window
from pyspark.sql import functions as F
import numpy as np

### Define the Schema
This schema declares the column names and data types that the computation returns for each window of the data stream.

In [None]:
# column name and return data type
# acceptable data types for schema are - "null", "string", "binary", "boolean",
# "date", "timestamp", "decimal", "double", "float", "byte", "integer",
# "long", "short", "array", "map", "structfield", "struct"
schema="timestamp timestamp, some_vals double, version int, user string, vals_avg double"
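The schema string has to list every column the UDF returns: the input columns it passes through plus anything it adds. A quick, hypothetical way to see what the computation adds is to split the DDL string and compare it with the columns loaded earlier. `parse_ddl` is an illustrative helper that only handles flat schemas; Spark does its own parsing of the string.

```python
# Hypothetical helper, not part of Spark or Cerebral Cortex: split a flat DDL
# string ("name type, name type, ...") into (name, type) pairs. Nested types such
# as struct<...> or map<...> contain commas and would break this simple split.
def parse_ddl(ddl):
    fields = []
    for part in ddl.split(","):
        name, dtype = part.split(None, 1)
        fields.append((name, dtype.strip()))
    return fields

# Same string as the schema cell above.
ddl_schema = "timestamp timestamp, some_vals double, version int, user string, vals_avg double"
input_columns = ["timestamp", "some_vals", "version", "user"]

output_columns = [name for name, _ in parse_ddl(ddl_schema)]
added = [c for c in output_columns if c not in input_columns]
dropped = [c for c in input_columns if c not in output_columns]
print("added:", added, "dropped:", dropped)
```

Here the only new column is `vals_avg`, which is exactly what `smooth_algo` assigns.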


### Write a user defined function
A user-defined function (UDF) lets you run custom Python code on distributed data in Apache Spark. Here, the UDF computes a simple windowed average: it receives the rows of one group (a user, version, and time window) as a pandas DataFrame and adds the mean of `some_vals` as a new column.

In [None]:
def smooth_algo(key, df):
    # key contains all the grouped column values.
    # In this example, the grouped columns are (user, version, window{start, end}).
    # For example, the start and end time of a window are key[2]["start"] and key[2]["end"].
    # df is a pandas DataFrame holding the rows of this group.
    some_vals_mean = df["some_vals"].mean()
    # Broadcast the window mean to every row as the new "vals_avg" column.
    df["vals_avg"] = some_vals_mean
    return df

## Run the smoothing algorithm on imported data
The smoothing algorithm is applied to the data stream by calling the `compute` method and passing it the function along with the output `schema`. The `windowDuration` parameter specifies the size of the time windows into which the data is segmented before the algorithm is applied. Notice that the next cell completes almost instantly. This is because Spark evaluates lazily: nothing is computed until a result is requested. When you run the following cell to show the data, the algorithm is applied to the whole dataset before the results are displayed.
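Conceptually, `compute` with `windowDuration=10` splits each user's data into consecutive, non-overlapping time windows and hands each one to `smooth_algo`. The pure-Python sketch below reproduces that result without Spark so the windowing is easy to inspect. It assumes `windowDuration` is in seconds and that windows are aligned to the Unix epoch; this is consistent with the output below, where rows from 17:46:30 onward share one average, but it is not confirmed here. `tumbling_window_mean` is a hypothetical name, the grouping omits `version` for brevity, and the rows are the first four samples shown after import.

```python
from collections import defaultdict
from datetime import datetime, timezone

def tumbling_window_mean(rows, window_seconds=10):
    """Attach to each (user, timestamp, value) row the mean value of its window.

    Windows do not overlap and are aligned to the Unix epoch: a row belongs to the
    window starting at floor(epoch_seconds / window_seconds) * window_seconds.
    """
    def window_start(ts):
        return int(ts.timestamp() // window_seconds) * window_seconds

    # Group values by (user, window start).
    groups = defaultdict(list)
    for user, ts, value in rows:
        groups[(user, window_start(ts))].append(value)
    means = {key: sum(vals) / len(vals) for key, vals in groups.items()}

    # Broadcast each window's mean back to every row in that window.
    return [(user, ts, value, means[(user, window_start(ts))]) for user, ts, value in rows]

user = "00000000-afb8-476e-9872-6472b4e66b68"
rows = [
    (user, datetime(2019, 1, 9, 17, 35, 0, tzinfo=timezone.utc), 0.085188727),
    (user, datetime(2019, 1, 9, 17, 35, 1, tzinfo=timezone.utc), 0.168675497),
    (user, datetime(2019, 1, 9, 17, 35, 2, tzinfo=timezone.utc), 0.740485082),
    (user, datetime(2019, 1, 9, 17, 35, 3, tzinfo=timezone.utc), 0.713160997),
]
for row in tumbling_window_mean(rows):
    print(row[1].time(), row[2], round(row[3], 6))
```

All four rows fall into the window starting at 17:35:00, so they share one average, just as the rows in each 10-second block of `smooth_stream` do.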

In [None]:
smooth_stream = iot_stream.compute(smooth_algo, schema=schema, windowDuration=10)

In [None]:
smooth_stream.show(truncate=False)

+-------------------+-----------+-------+------------------------------------+-------------------+
|timestamp          |some_vals  |version|user                                |vals_avg           |
+-------------------+-----------+-------+------------------------------------+-------------------+
|2019-01-09 17:46:30|0.070952751|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-09 17:46:31|0.279759975|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-09 17:46:32|0.096120952|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-09 17:46:33|0.121091841|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-09 17:46:34|0.356470355|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-09 17:46:35|0.800499717|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-09 17:46:36|0.799160143|1      |00000000-afb8-476e-9872-6472b4e66b68|0.37378515760000003|
|2019-01-0

## Visualize data
The two plots below show the original and the smoothed data so you can visually check how the algorithm transformed it.

In [None]:
from cerebralcortex.plotting.basic.plots import plot_timeseries

In [None]:
plot_timeseries(iot_stream)


Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated since Python 3.3,and in 3.9 it will stop working


In [None]:
plot_timeseries(smooth_stream)