In [None]:
%config Completer.use_jedi = False       #without this line, autocomplete seemed to not be working, so I am putting it here. source: https://stackoverflow.com/questions/40536560/ipython-and-jupyter-autocomplete-not-working

In [None]:
import pandas as pd
import cbpro

In [None]:
import time
import numpy as np
import pandas as pd
import holoviews as hv
import streamz
import streamz.dataframe
from streamz import Stream

from holoviews import opts
from holoviews.streams import Pipe, Buffer

from functools import partial    #to decide which parts of data to plot

hv.extension('bokeh')

In [None]:
from tornado.ioloop import PeriodicCallback
from tornado import gen

# WebsocketClient Class

The cbpro package provides a class to connect to the Coinbase Pro websocket data feed. Here below is the way it is called by default, but there are 3 methods that should be overwritten in order to acctually deal with data: `.on_open()`, `.on_message()`, and `.on_close()`. These should be overwritten on a child class that inherits from `cbpro.WebsocketClient` and deals with the data appropriately. Below is the default parent class and on the next cells is the implementation of a child.

```py
#Creating a Data streaming client
stream = cbpro.WebsocketClient(url='wss://ws-feed-public.sandbox.pro.coinbase.com',
                                      products=["BTC-USD", "ETH-USD"],
                                      channels=["ticker"])
                                      
```

Now, here is two custom implementations, the first one prints the data coming from the server. The second version sends the data to a Holoviews Buffer object to be plotted dynamically:

In [None]:
class TextWebsocketClient(cbpro.WebsocketClient):
    def on_open(self):
        self.url           = 'wss://ws-feed-public.sandbox.pro.coinbase.com'
        self.message_count = 0
    def on_message(self,msg):
        self.message_count += 1
        msg_type = msg.get('type',None)
        if msg_type == 'ticker':
            time_val   = msg.get('time',('-'*27))
            price_val  = msg.get('price',None)
            price_val  = float(price_val) if price_val is not None else 'None'
            product_id = msg.get('product_id',None)
            
            print(f"{time_val:30} {price_val:.3f} {product_id}\tchannel type:{msg_type}")

    def on_close(self):
        print(f"<---Websocket connection closed--->\n\tTotal messages: {self.message_count}")

In [None]:
class GraphicWebsocketClient(cbpro.WebsocketClient):
    CBPRO_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'   #this parses the string date returned by Coinbase Pro into a pandas datetime object
    def __init__(self,datastream=None,**kwargs):
        super().__init__(**kwargs)
        self.datastream = datastream
    
    def on_open(self):
        self.url           = 'wss://ws-feed-public.sandbox.pro.coinbase.com'
        self.message_count = 0
    
    def on_message(self,msg):
        self.message_count += 1
        msg_type = msg.get('type',None)
        if msg_type == 'ticker':
            time_val   = msg.get('time',None)
            price_val  = msg.get('price',None)
            
            #I'm concerned that this is too much work for every new data point, but
            #   what do I know?
            #new_data_df = pd.DataFrame({'timestamp':[time_val],
            #                            'price':[price_val]})
            #new_data_df['timestamp'] = pd.to_datetime(new_data_df['timestamp'],format=CBPRO_DATE_FORMAT)
            #new_data_df.set_index('timestamp',drop=True)
            #new_data_df.index = pd.to_datetime(new_data_df.index,format=CBPRO_DATE_FORMAT)
            #print(new_data_df)
            #print(new_data_df.index)
            #self.datastream.emit(new_data_df)
            
            '''
            self.datastream.emit(pd.DataFrame({       # .emit() is for Stream objects
                'timestamp':[pd.to_datetime(time_val,format=CBPRO_DATE_FORMAT)],
                'price'    :[price_val]
            }))
            '''
            
            '''
            self.datastream.send(pd.DataFrame({       #.send() works for Buffer objects
                'timestamp':[pd.to_datetime(time_val,format=self.CBPRO_DATE_FORMAT)],
                'price'    :[price_val],
            }).set_index('timestamp')
                                )
            '''
            
            self.datastream.send(
                #np.array([[pd.to_datetime(time_val,format=self.CBPRO_DATE_FORMAT)], price_val])
                np.array([[time_val,price_val]])
            )
        
    
    def on_close(self):
        print(f"<---Websocket connection closed--->\n\tTotal messages: {self.message_count}")

I tried many ways to get the data. Initially, I was using a streaming dataframe together with buffer but I think I'm gettting the same result just with a buffer. Below is part of my original code:

```py
df_struct = {
    'timestamp'  :[],
    'price'      :[],
#    'product_id' :[],
}

CBPRO_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
sample_df = pd.DataFrame(df_struct)#.set_index('timestamp',drop=True)
#sample_df.index = pd.to_datetime(sample_df.index,format=CBPRO_DATE_FORMAT)

stream = Stream()
stream_df = streamz.dataframe.DataFrame(stream,example=sample_df)

buffer = Buffer(stream_df,length=100)
print(stream_df)
```


However, I changed it to the following (I left the commented out lines to show different attempts at getting the data right):

In [None]:
#df_sample = pd.DataFrame({'timestamp':[],'price':[]}).set_index('timestamp',drop=True)
#buffer = Buffer(df_sample, length=100)
buffer = Buffer(np.zeros((0,2)), length=100)
#buffer = Buffer(np.zeros((0, 2)),length=100)
product_to_stream = 'BTC-USD'
socket_stream = GraphicWebsocketClient(buffer,products=[product_to_stream],channels=['ticker'])
#socket_stream = GraphicWebsocketClient(stream_df,products=['BTC-USD'],channels=['ticker'])
socket_stream.start()

In [None]:
'''
hv.DynamicMap(partial(hv.Curve,
              kdims=['timestamp'],
              vdims=['price']),
              streams=[buffer]).opts(padding=0.05, width=800,show_grid=True)
'''

'''
hv.DynamicMap(
    partial(hv.Curve,kdims=['timestamp'],vdims=['price']),
    streams=[buffer]
).opts(
        title=f'Live {product_to_stream} Chart',
        padding=0.05,
        width=800,
        height=500,
        show_grid=True
    )
'''

hv.DynamicMap(
    hv.Curve,streams=[buffer],
).opts(
        title=f'Live {product_to_stream} Chart',
        padding=0.05,
        width=800,
        height=500,
        show_grid=True
    )

In [None]:
hv.DynamicMap(hv.Table, streams=[buffer]).opts(padding=0.1, width=600)

In [None]:
socket_stream.close()