In [1]:
# Create reducer
from learning_observer.stream_analytics.helpers import kvs_pipeline, KeyField, Scope

@kvs_pipeline(scope=Scope([KeyField.STUDENT]), module_override='testing')
async def event_counter(event, state):
    """Count the events seen so far.

    Args:
        event: The incoming event. Its contents are not inspected.
        state: The dict produced by the previous call, or ``None`` on the
            first call. When a dict is passed it is updated in place.

    Returns:
        A 2-tuple holding the same updated dict twice; its ``'event_count'``
        entry is one greater than before (starting from 0 when absent).
    """
    counts = {} if state is None else state
    previous_total = counts.get('event_count', 0)
    counts['event_count'] = previous_total + 1
    return counts, counts

In [2]:
import learning_observer.jupyter_helpers

# First argument to add_reducer_to_execution_dag; presumably the reducer's ID in the DAG — TODO confirm
ID = 'event_counter'
# Module name passed to add_reducer_to_execution_dag; the dashboard cell also sends it
# as "execution_dag" in its websocket request.
module = 'example_mod'

# Add the reducer defined above to the execution DAG.
# NOTE(review): the trailing dict looks like the default/initial reducer state — confirm against jupyter_helpers.
learning_observer.jupyter_helpers.add_reducer_to_execution_dag(ID, event_counter, module, {'event_count': 0})
# Start the communication protocol endpoint (left disabled here; uncomment to serve it from this kernel).
# await learning_observer.jupyter_helpers.serve_communication_protocol_endpoint()

Loading execution DAG from example_mod


In [3]:
# Create a dashboard to connect to the reducer you just wrote.
# This basic dashboard plots the event count sent back from the server
# on a graph that is extended as each websocket message arrives.
import dash
from dash import Dash, html, dcc, callback, Output, Input, State, clientside_callback, Patch
import time
import json
import lo_dash_react_components as lodrc
import pandas as pd
import plotly.graph_objects as go

app = Dash(__name__)

# Start with an empty scatter trace; points are appended in the browser
# through the graph's `extendData` property.
fig = go.Figure(data=go.Scatter(
    x=pd.Series(dtype=object), y=pd.Series(dtype=object),
))
# Axis titles belong to the figure layout, not to the Scatter trace.
# The x values pushed by the callback are `Date.now() / 1000`, i.e. seconds since the epoch.
fig.update_layout(
    xaxis_title='Time (seconds since epoch)',
    yaxis_title='Event counter',
)

app.layout = html.Div([
    html.H4('Graph of event count'),
    dcc.Graph(id='graph', figure=fig),
    html.H4('Incoming data.'),
    # Websocket connection; its `state`, `message` and `send` props drive the clientside callbacks.
    lodrc.LOConnection(id='ws', url='ws://localhost:9999/wsapi/communication_protocol')
])

# Append one (time, event count) point to the graph for every websocket message.
clientside_callback(
    '''function(msg) {
        const noUpdate = window.dash_clientside.no_update;
        if (!msg) {
            return noUpdate;
        }
        const data = JSON.parse(msg.data);
        // A message may arrive without the `test` query result; skip it
        // instead of throwing a TypeError on a missing key.
        const students = data && data.test ? data.test.event_count : undefined;
        if (!Array.isArray(students) || students.length === 0) {
            return noUpdate;
        }
        // Only the first student in the list is plotted.
        const studentIndex = 0;
        const student = students[studentIndex];
        if (!student || student.event_count === undefined) {
            return noUpdate;
        }
        const timestamp = Date.now() / 1000;
        // extendData format: [{prop: [new points for each trace]}, [trace indices]]
        return [
            { x: [[timestamp]], y: [[student.event_count]] },
            [0]
        ];
    }''',
    Output('graph', 'extendData'),
    Input('ws', 'message')
)
# Send the query on the websocket once the connection is open.
# Building the request with json.dumps keeps it valid (and correctly escaped)
# whatever `module` contains; JSON is also a valid JavaScript object literal.
connection_request = json.dumps({
    'test': {
        'execution_dag': module,
        'target_exports': ['event_count'],
        'kwargs': {'course_id': 12345},
    }
})
clientside_callback(
    f'''function(state) {{
        // `state` may be null/undefined before the component reports anything;
        // readyState 1 is WebSocket.OPEN. In every other case send nothing.
        if (!state || state.readyState !== 1) {{
            return window.dash_clientside.no_update;
        }}
        return JSON.stringify({connection_request});
    }}''',
    Output('ws', 'send'),
    Input('ws', 'state')
)


# `app.run` replaces the legacy `app.run_server` alias (obsolete in Dash 3);
# the arguments are unchanged. The app is rendered inline in the notebook output.
app.run(jupyter_mode='inline', debug=True)

In [34]:
# Run reducer over events this can be done with a list events or a sample file
import learning_observer.offline

# list of events
events = [{}] * 3
await learning_observer.offline.process_file(events_list=events, source="localhost.testcase", pipeline=event_counter, userid='Tester')

# sample file
import os
sample_file = os.path.join('learning_observer', 'logs', 'sample01.log')
await learning_observer.offline.process_file(file_path=sample_file, source="localhost.testcase", pipeline=event_counter, userid='Tester')

<function kvs_pipeline.<locals>.decorator.<locals>.wrapper_closure.<locals>.process_event at 0x7ff8989f2cb0>
<function kvs_pipeline.<locals>.decorator.<locals>.wrapper_closure.<locals>.process_event at 0x7ff899b1aa70>


(1214, 'localhost.testcase', 'Tester')

In [4]:
import learning_observer.stream_analytics

In [5]:
# Dump REDUCER_MODULES as indented JSON; default=str falls back to str() for
# values json cannot serialize (the output shows function and Scope reprs).
print(json.dumps(learning_observer.stream_analytics.REDUCER_MODULES, indent=2, default=str))

{
  "org.mitros.mirror": [
    {
      "reducer": "<function init.<locals>.<lambda> at 0x7f6ab193b9a0>"
    }
  ],
  "org.mitros.writing_analytics": [
    {
      "reducer": "<function time_on_task at 0x7f6ab8afcee0>",
      "scope": "Scope({<KeyField.STUDENT: 1>, <EventField.doc_id>})"
    },
    {
      "reducer": "<function reconstruct at 0x7f6ab8afd000>",
      "scope": "Scope({<KeyField.STUDENT: 1>, <EventField.doc_id>})"
    },
    {
      "reducer": "<function event_count at 0x7f6ab8afd120>",
      "scope": "Scope({<KeyField.STUDENT: 1>})"
    },
    {
      "reducer": "<function document_list at 0x7f6ab8afd480>",
      "scope": "Scope({<KeyField.STUDENT: 1>})"
    },
    {
      "reducer": "<function last_document at 0x7f6ab8afd5a0>",
      "scope": "Scope({<KeyField.STUDENT: 1>})"
    },
    {
      "reducer": "<function document_tagging at 0x7f6ab8afd360>",
      "scope": "Scope({<KeyField.STUDENT: 1>})"
    },
    {
      "reducer": "<function document_access_timestamps at 0