# Market infrastructure - run music stream and visualize

This notebook acts as the raw market data stream for the underlying (the music data stream) and visualizes it in "real time". It also makes the real-time data (bars) available to other applications via Jupyter data storage.

- https://medium.com/plotly/introducing-jupyterdash-811f1f57c02e
- https://dash.plotly.com/live-updates
- https://pbpython.com/interactive-dashboards.html#id6
- https://mybinder.org/ + https://github.com/echow1/trading_music
- (maybe) https://www.freecodecamp.org/news/how-to-create-auto-updating-data-visualizations-in-python-with-matplotlib-and-aws/
- https://kapernikov.com/ipywidgets-with-matplotlib/
- https://medium.com/@akoios/titan-tutorial-1-hello-world-c4595bd58c08
- https://community.plotly.com/t/how-to-add-restful-api-endpoints-to-a-dash-app/27162/5
- https://r-forge.r-project.org/scm/viewvc.php/*checkout*/pkg/quantstrat/sandbox/backtest_musings/research_replication.pdf?root=blotter

In [1]:
from __future__ import division
from more_itertools import peekable
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import scipy as scp
import pickle
import magenta
import os, time, re, json
%matplotlib inline
from IPython.core.display import display, HTML
### change width of notebook display
display(HTML("<style>.container { width:70% !important; }</style>"))

import plotly.express as px
from jupyter_dash import JupyterDash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output

# for exposing API
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
from flask import Flask
from flask_restful import Resource, Api

JUPYTER_PICKLE_FILE = "config/shared_jupyter_data.pkl"
def write_shared_jupyter(key, value, path=JUPYTER_PICKLE_FILE, overwrite=False):
    """Store ``value`` under ``key`` in the pickle file shared between notebooks.

    Parameters
    ----------
    key : hashable
        Dictionary key under which ``value`` is stored.
    value : object
        Any picklable object.
    path : str
        Location of the shared pickle file (a dict serialized with pickle).
    overwrite : bool
        If True, everything currently stored is discarded and the file ends
        up containing only ``{key: value}``. If False, the entry is merged
        into the existing contents.
    """
    if overwrite or not os.path.exists(path):
        # Nothing to merge with. Skipping the read means a stale or corrupt
        # file cannot block an explicit overwrite.
        shared_jupyter_data = {key: value}
    else:
        # NOTE(security): pickle.load can execute arbitrary code embedded in
        # the file -- only point `path` at files this project wrote itself.
        with open(path, "rb") as fp:
            shared_jupyter_data = pickle.load(fp)
        shared_jupyter_data[key] = value
    with open(path, 'wb') as fp:
        pickle.dump(shared_jupyter_data, fp)

def read_shared_jupyter(key=None, path=JUPYTER_PICKLE_FILE):
    """Read from the pickle file shared between notebooks.

    Parameters
    ----------
    key : hashable or None
        Entry to look up. When None, the whole stored object is returned.
    path : str
        Location of the shared pickle file.

    Returns
    -------
    The stored value for ``key`` (or the whole stored object when ``key`` is
    None). Returns None, after printing a short message, when the file does
    not exist or the key is absent.
    """
    if not os.path.exists(path):
        print("No data")
        return None

    with open(path, "rb") as handle:
        stored = pickle.load(handle)

    if key is None:
        return stored
    if key not in stored:
        print("Not found!")
        return None
    return stored[key]

def pandasToJson(df):
    """Serialize ``df`` to a JSON string using pandas' "split" orientation."""
    serialized = df.to_json(orient="split")
    return serialized
def jsonToPandas(json):
    """Rebuild a DataFrame from JSON in pandas' "split" orientation.

    Parameters
    ----------
    json : str, path or file-like
        Normally a JSON string such as ``df.to_json(orient="split")`` returns.
        Anything else is handed to ``pd.read_json`` untouched.

    Returns
    -------
    pandas.DataFrame
    """
    # Local import: the notebook's import cell does not bring in `io`.
    from io import StringIO

    # Recent pandas versions deprecate passing literal JSON text directly to
    # read_json; wrapping it in a buffer works on old and new versions alike.
    if isinstance(json, str) and json.lstrip()[:1] in ("{", "["):
        json = StringIO(json)
    return(pd.read_json(json, orient="split"))

# Widen pandas' console/notebook rendering so large frames are not truncated.
for _option_name, _option_value in (
    ('display.max_rows', 500),
    ('display.max_columns', 500),
    ('display.width', 1000),
):
    pd.set_option(_option_name, _option_value)
del _option_name, _option_value  # keep the notebook namespace clean

# Figure size (width, height).
FIG_WIDTH, FIG_HEIGHT = 1200, 800

# Fixed y-axis ranges used by the streaming pitch and velocity plots.
PITCH_MIN, PITCH_MAX = 20, 120
VELOCITY_MIN, VELOCITY_MAX = 0, 120

def hheader(x):
    """Print ``x`` as a three-line banner framed by rows of '#' characters."""
    rule = "#########################################"
    banner_lines = [rule, "### {}".format(x), rule]
    print("\n".join(banner_lines))

# Magenta dependencies:
# https://github.com/magenta/magenta

# Magenta uses pretty_midi to deal with midi files
import pretty_midi

Import requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.[0m
  from numba.decorators import jit as optional_jit
Import of 'jit' requested from: 'numba.decorators', please update to use 'numba.core.decorators' or pin to Numba version 0.48.0. This alias will not be present in Numba version 0.50.0.[0m
  from numba.decorators import jit as optional_jit


### Setup: read in the music stream

In [2]:
""" Set up music stream """

### for reading in chunks
from collections import deque

def csvStream(csvfile):
    """Open ``csvfile`` as a lazy pandas reader instead of loading it whole.

    The first CSV column becomes the index. Rows are pulled from the
    returned reader on demand via its ``get_chunk`` method.
    """
    return pd.read_csv(csvfile, iterator=True, index_col=0)

def nextChunk(csvStream, chunksize=5):
    """Pull the next ``chunksize`` rows from a reader made by ``csvStream``.

    Note: inside this function the ``csvStream`` parameter shadows the
    module-level function of the same name.
    """
    rows = csvStream.get_chunk(chunksize)
    return rows

# Rolling window of the most recent bars (oldest first). It is shared with
# nextChunkWithOverlap through that function's default `cq` argument.
barsQueue = []
barSize = 5             # rows read from the stream per bar
maxBarsQueueSize = 120  # maximum number of bars kept in the rolling window

def nextChunkWithOverlap(musStream, cq=barsQueue, barSize=barSize, maxNumBars=maxBarsQueueSize):
    """
    Iterate over the music stream with a rolling window.
    For smoother plotting, set chunksize <<< max number of chunks.

    Parameters
    ----------
    musStream : object with ``get_chunk(n)``
        Reader to pull the next bar from (e.g. the result of ``csvStream``).
    cq : list
        Rolling window of bars, mutated in place. The default is the
        module-level ``barsQueue``, bound once when this function is defined.
    barSize : int
        Number of rows to read for the next bar.
    maxNumBars : int
        Maximum number of bars kept in ``cq``; the oldest is evicted first.

    Returns
    -------
    (window, nextBar)
        ``window`` is the concatenation of every bar currently in ``cq``
        and ``nextBar`` is the bar that was just read. Returns
        ``(None, None)``, leaving ``cq`` untouched, once the stream is
        exhausted.
    """
    try:
        nextBar = musStream.get_chunk(barSize)
    except StopIteration:
        # An exhausted pandas reader raises StopIteration from get_chunk;
        # translate it into the None sentinel that callers test for.
        return(None, None)
    # make space (FIFO eviction); the emptiness check protects maxNumBars <= 0
    if cq and len(cq) >= maxNumBars:
        cq.pop(0)
    cq.append(nextBar)
    # Bars are appended in stream order, so the concatenation keeps that order.
    res = pd.concat(cq)
    ### full current chunk as well as the increment bar added
    return(res, nextBar)

In [3]:
INPUT_PATH = "data_processed/maestro/"
MUSIC_STREAM_SUBSTR = "maestro_full_music_stream"

def add_input_path(x):
    """Return the path of file ``x`` inside INPUT_PATH."""
    # os.path.join avoids the doubled slash that plain string formatting
    # produced, because INPUT_PATH already ends with "/".
    return os.path.join(INPUT_PATH, x)

### collect every file under INPUT_PATH whose name contains the stream marker
music_files = sorted(  # sorted so the streams play in order
    os.path.join(root, file)
    for root, dirs, files in os.walk(INPUT_PATH)
    for file in files
    if MUSIC_STREAM_SUBSTR in file
)
print("Number of music streams found:")
print(len(music_files))
print(music_files[:10])

if not music_files:
    # Fail with an actionable message instead of a bare IndexError below.
    raise FileNotFoundError(
        "No file containing '{}' found under '{}'".format(MUSIC_STREAM_SUBSTR, INPUT_PATH))

### there should be only 1 file to stream; ask which one if several were found
if (len(music_files) > 1):
    whichStream = int(input("Index (0 ... N-1) of stream to pick:"))
else:
    whichStream = 0
musicStream = csvStream(music_files[whichStream])

Number of music streams found:
1
['data_processed/maestro/maestro_full_music_stream.csv']


### Task 0: Stream music audio and display as a webapp

In [4]:
# Could just play synthesized MIDI along for now (and replace with real audio later)
# have to somehow coordinate with plots with latency etc.

### Task 1: Stream MIDI and display primitive music statistics (pitch, velocity etc.)

(do audio later after download)

In [None]:
# import jp_proxy_widget
# from scipy.io import wavfile

RUN_PLOTS = True
DASH_PORT = 8100
PLOTLY_REFRESH_SEC = 0.5                    # how often the browser re-polls the figures
STREAM_SLEEP_SEC = PLOTLY_REFRESH_SEC * 5   # pause between bars in the streaming loop

if RUN_PLOTS:

    # Demo: auto-updating time series plots (plot.ly / Dash) that refresh
    # automatically in real time; use with voila and watch them update.

    ### Start Jupyter-Dash app but also expose endpoint for data
    ### curl "http://localhost:<DASH_PORT>/<RESOURCE_ENDPOINT>"
    server = Flask('my_app')
    external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']
    # the core Jupyter-Dash app
    app = JupyterDash(__name__, server=server, external_stylesheets=external_stylesheets)
    api = Api(server)

    class HelloWorld(Resource):
        """ Get current snapshot of the music stream.
            $ curl "http://localhost:8100/read"
        """
        def get(self):
            # The server is started before the streaming loop has defined
            # `lastBar` / `currTime`, so look them up defensively instead of
            # raising NameError on an early request.
            latest_bar = globals().get("lastBar")
            if latest_bar is None:
                return({"currTime": globals().get("currTime"), "lastBar": None}, 503)
            return({"currTime": currTime, "lastBar": pandasToJson(latest_bar)})
    api.add_resource(HelloWorld, '/read')

    app.layout = html.Div([
        html.H1("Music streaming statistics"),
        ### Graph 1: pitch
        dcc.Graph(id='pitch_graph'),
        ### Graph 2: velocity
        dcc.Graph(id='velocity_graph'),
        ### Graphs update automatically (separate from the main loop for data analysis)
        ### Make sure updates faster than the main loop (so don't miss any data updates)
        ### updates every [interval] milliseconds.
        dcc.Interval(id='interval-component', interval=PLOTLY_REFRESH_SEC*1000, n_intervals=0)
    ],     style={'width': '80%', 'float': 'left', 'height': '4.5rem'})

    def _streaming_line_figure(column, title, y_range):
        """Build the line plot of ``column`` over the current rolling window.

        Reads the global ``currChunk`` that the streaming loop publishes.
        Until that global exists, the graph is left untouched via
        ``dash.no_update`` instead of failing with NameError.
        """
        window = globals().get("currChunk")
        if window is None:
            return dash.no_update
        return px.line(window, x="streaming_start_sec",
                       y=[column],
                       render_mode="webgl", template="plotly_dark",
                       title=title, range_y=y_range).update_traces(mode='lines')

    ### Plot #1: pitch info
    @app.callback(Output('pitch_graph', 'figure'), Input("interval-component", "n_intervals"))
    def update_pitch_figure(n=0):
        """Refresh the pitch plot; ``n`` is the Interval tick count (unused)."""
        return(_streaming_line_figure('pitch_median',
                                      "Pitch statistics by sampled bar, streaming",
                                      [PITCH_MIN, PITCH_MAX]))

    ### Plot #2: velocity info
    @app.callback(Output('velocity_graph', 'figure'), Input("interval-component", "n_intervals"))
    def update_velocity_figure(n=0):
        """Refresh the velocity plot; ``n`` is the Interval tick count (unused)."""
        return(_streaming_line_figure('velocity_median',
                                      "Velocity statistics by sampled bar, streaming",
                                      [VELOCITY_MIN, VELOCITY_MAX]))

    ### Run app locally (inline cuts off output)
    app.run_server(mode='external', port=DASH_PORT)

iterations = 0
currTime = time.time()
while True:
    # Fetch into temporaries first: `currChunk` / `lastBar` are read by the
    # Dash callbacks and the /read endpoint, so they are only rebound once a
    # valid bar has arrived.
    try:
        newChunk, newBar = nextChunkWithOverlap(musicStream)
    except StopIteration:
        # An exhausted pandas reader signals end-of-data with StopIteration.
        newChunk, newBar = None, None
    if newChunk is None:
        print(">> End of stream!")
        break
    currChunk, lastBar = newChunk, newBar

    ### Only start when queue is full (seed with initial data)
    if len(barsQueue) < maxBarsQueueSize:
        iterations += 1
        continue

    if (iterations % 1000 == 0):
        print("Iterations {} ...".format(iterations+1))
        # snapshot data for prototyping (e.g. constructing and simulating music derivatives)
        pd.concat(barsQueue).to_csv("music_stream_sample.csv")

    ### store via Jupyter so other notebooks can access
    ### (later work might actually build an API)
    # causes bugs with Dash + slow
    # write_shared_jupyter("currChunk", currChunk, overwrite=True)

    # Analysis with current bar here

    ### read trading decisions from other agents

    ### Take a short break between analyses (so plotly can catch up)
    ### should be >> plot auto-update interval so that all plots
    ### update basically at the same time.
    currTime = time.time() # update timestamp before taking break
    time.sleep(STREAM_SLEEP_SEC) # several plot refresh intervals per bar
    iterations += 1

    ### loop back to beginning
    if iterations > 600000:
        print("Looping back to beginning")
        musicStream = csvStream(music_files[whichStream])
        # Empty the queue IN PLACE: nextChunkWithOverlap holds a reference to
        # this same list as its default argument, so rebinding the name (or
        # assigning to an unrelated name such as `chunksQueue`) would leave
        # the old bars in the rolling window.
        barsQueue.clear()
        iterations = 0

Dash is running on http://127.0.0.1:8100/

Dash app running on http://127.0.0.1:8100/


### [TODO: later, after building the rest of the market] Task 3: Extract music features from audio in real time and make them available

Sequential learning.
- Validate (try out) against the streamed music audio and series.
- These are the constructed underlyings for financial derivatives, upon which prediction/regression can work.
- Make a local API so other scripts can send GET/POST requests (bid/ask) against it.
- Later, put Bayesian updates in the streaming loop above so that the extracted features can be plotted.

In [None]:
""" TEMPO
    Strategy: linear Gaussian state space model / Kalman filter.
    Model tempo (latent variable zt) as a function of notes etc. (observed variables x1 ... xt)
    https://www.researchgate.net/publication/224711190_A_Modified_Kalman_Filtering_Approach_to_On-Line_Musical_Beat_Tracking
"""

# test_data = pd.concat(chunksQueue)
# print(test_data.shape)
# test_data.head()


In [None]:
""" HARMONY
    (maybe classification from dissonance to consonance? or mood? could of course do actual chords.)
"""

#

In [None]:
""" RHYTHM
"""

#