### Socket Programming

#### Using publisher-subscriber model, now we subscribe to the streaming host

In [2]:
import zmq
import pandas as pd
import datetime
import numpy as np
import plotly.io as pio
import plotly.graph_objects as go
pio.renderers.default = "notebook_connected"

#### Note: For reading in the stream, you need to have the data_server.py running in a separate terminal

In [None]:
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5555')
socket.setsockopt_string(zmq.SUBSCRIBE, 'ET')

while True:
    data = socket.recv_string()
    print(data)

### Online Algorithm example

In [None]:
df = pd.DataFrame()
mom = 3
min_length = mom + 1

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5555')
socket.setsockopt_string(zmq.SUBSCRIBE, 'ET')

while True:
    data = socket.recv_string()
    t = datetime.datetime.now()
    sym, value = data.split()
    df = pd.concat([df, pd.DataFrame({sym: float(value)}, index=[t])])
    # return last number of 5 second intervals
    dr = df.resample('5s', label='right').last()
    dr['log_returns'] = np.log(dr / dr.shift(1))
    if len(dr) > min_length:
        min_length += 1
        dr['momentum'] = np.sign(dr['log_returns'].rolling(mom).mean())
        print('\n' + '=' * 51)
        print('NEW SIGNAL | {}'.format(datetime.datetime.now()))
        print('=' * 51)
        print(dr.tail())
        # you signal off of the second to last because 
        # the resampling is labelled with the right timestamp
        # so the last datapoint has not completed its 5s interval yet
        if dr['momentum'].iloc[-2] == 1.0:
            print("\n Go Long")
        elif dr['momentum'].iloc[-2] == -1.0:
            print("\n Go Short")

### Real-Time Plotting

In [8]:
symbol = 'ET'

# you have to separately instantiate the widget
# before populating it. 
fig = go.FigureWidget()
fig.add_scatter()
fig

FigureWidget({
    'data': [{'type': 'scatter', 'uid': 'be28f77c-722b-4a71-9911-08a59a8071e7'}], 'layout': {'t…

In [10]:
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5555')
socket.setsockopt_string(zmq.SUBSCRIBE, 'ET')

# update these lists to update the data objects of figure widgets
times = list()
prices = list()

for _ in range(500):
    msg = socket.recv_string()
    #print(msg)
    t = datetime.datetime.now()
    times.append(t)
    _, price = msg.split()
    prices.append(float(price))
    fig.data[0].x = times
    fig.data[0].y = prices

Exception ignored in: <function Context.__del__ at 0x7f67b19cf9a0>
Traceback (most recent call last):
  File "/home/eddie/miniconda3/envs/securities-analysis/lib/python3.10/site-packages/zmq/sugar/context.py", line 71, in __del__
    self.term()
  File "/home/eddie/miniconda3/envs/securities-analysis/lib/python3.10/site-packages/zmq/sugar/context.py", line 190, in term
    return super(Context, self).term()
  File "zmq/backend/cython/context.pyx", line 82, in zmq.backend.cython.context.Context.term
  File "zmq/backend/cython/checkrc.pxd", line 13, in zmq.backend.cython.checkrc._check_rc
KeyboardInterrupt: 


KeyboardInterrupt: 

### Streaming subplots

In [7]:
from plotly.subplots import make_subplots

f = make_subplots(rows=3, cols=1, shared_xaxes=True)
f.append_trace(go.Scatter(name='ET'), row=1, col=1)
f.append_trace(go.Scatter(name='RETURN', line=dict(width=1, dash='dot'),
mode='lines+markers', marker={'symbol':'triangle-up'}),
row=2, col=1)
f.append_trace(go.Scatter(name='MOMENTUM', line=dict(width=1, dash='dash'),
mode='lines+markers', marker={'symbol': 'x'}), row=3, col=1)

fig = go.FigureWidget(f)
fig

FigureWidget({
    'data': [{'name': 'ET', 'type': 'scatter', 'uid': '7dc6c3c7-2e9f-4d8e-9029-01b77cb7d293', '…

In [11]:
df = pd.DataFrame()

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5555')
socket.setsockopt_string(zmq.SUBSCRIBE, 'ET')

for _ in range(75):
    msg = socket.recv_string()
    t = datetime.datetime.now()
    # split into list and assign each element to var
    sym, price = msg.split()
    df = pd.concat([df, pd.DataFrame({sym: float(price)}, index=[t])])
    df['LOG_RET'] = np.log(df[sym] / df[sym].shift(1))
    # you'll have NaN at the points where there is less than 10 trailing
    # datapoints
    df['MOM'] = df['LOG_RET'].rolling(10).mean()
    fig.data[0].x = df.index
    fig.data[1].x = df.index
    fig.data[2].x = df.index
    fig.data[0].y = df[sym]
    fig.data[1].y = df['LOG_RET']
    fig.data[2].y = df['MOM']


KeyboardInterrupt: 