In [None]:
# cbpro/WebsocketClient.py
# original author: Daniel Paquin
# mongo "support" added by Drew Rice
#
#
# Template object to receive messages from the Coinbase Websocket Feed

from __future__ import print_function
import json
import base64
import hmac
import hashlib
import time
from threading import Thread
from websocket import create_connection, WebSocketConnectionClosedException
from pymongo import MongoClient
from cbpro.cbpro_auth import get_auth_headers

## Websocket client class
class WebsocketClient(object):
    """Template client for the Coinbase Pro websocket feed.

    ``start()`` runs the connection on a background thread: it connects,
    sends a subscription request, and passes every decoded message to
    ``on_message`` until ``close()`` is called or an error is reported.
    Customise behaviour by overriding the ``on_open``, ``on_message``,
    ``on_close`` and ``on_error`` hooks in a subclass.
    """

    def __init__(
            self,
            url="wss://ws-feed.pro.coinbase.com",
            products=None,
            message_type="subscribe",
            mongo_collection=None,
            should_print=True,
            auth=False,
            api_key="",
            api_secret="",
            api_passphrase="",
            # Make channels a required keyword-only argument; see pep3102
            *,
            # Channel options: ['ticker', 'user', 'matches', 'level2', 'full']
            channels):
        """Store the configuration; no network activity happens here.

        Args:
            url: Websocket endpoint. One trailing "/" is stripped on connect.
            products: Product id or list of product ids. ``None`` becomes
                ``["BTC-USD"]`` on connect.
            message_type: Stored as ``self.type``. The subscription request
                built in ``_connect`` always uses ``'subscribe'``.
            mongo_collection: Optional object with an ``insert_one`` method;
                every received message is inserted into it.
            should_print: Whether the default hooks print to stdout.
            auth: When true, the subscription request is signed with the
                API credentials below.
            api_key: API key used when ``auth`` is true.
            api_secret: API secret used when ``auth`` is true.
            api_passphrase: API passphrase used when ``auth`` is true.
            channels: Channels to subscribe to (required, keyword-only).
                ``None`` selects a ticker channel for ``products``.
        """
        self.url = url
        self.products = products
        self.channels = channels
        self.type = message_type
        self.stop = True  # True while the listen loop is not supposed to run
        self.error = None  # last exception passed to on_error
        self.ws = None  # websocket connection, created in _connect
        self.thread = None  # listener thread, created in start
        # Keepalive thread, created in start. Initialised here so that
        # close()/_disconnect() are safe to call before start().
        self.keepalive = None
        self.auth = auth
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_passphrase = api_passphrase
        self.should_print = should_print
        self.mongo_collection = mongo_collection

    def start(self):
        """Call ``on_open`` and launch the listener thread.

        ``on_open`` runs synchronously, before the connection is made, so a
        subclass may still adjust ``url`` / ``products`` / ``channels`` in it.
        The keepalive thread is only created here; ``_listen`` starts it.
        """
        def _go():
            self._connect()
            # Always release the socket and the keepalive thread, even when
            # _listen exits through an exception (e.g. raised by on_message).
            try:
                self._listen()
            finally:
                self._disconnect()

        self.stop = False
        self.on_open()
        self.thread = Thread(target=_go)
        self.keepalive = Thread(target=self._keepalive)
        self.thread.start()

    def _connect(self):
        """Normalise the settings, open the websocket and send the subscription."""
        if self.products is None:
            self.products = ["BTC-USD"]
        elif not isinstance(self.products, list):
            self.products = [self.products]

        # endswith() also copes with an empty url instead of raising IndexError.
        if self.url.endswith("/"):
            self.url = self.url[:-1]

        if self.channels is None:
            # Default subscription: the ticker channel for every product.
            self.channels = [{"name": "ticker", "product_ids": list(self.products)}]

        sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}

        if self.auth:
            # Sign the request and copy the resulting header values into it.
            timestamp = str(time.time())
            message = timestamp + 'GET' + '/users/self/verify'
            auth_headers = get_auth_headers(timestamp, message, self.api_key, self.api_secret, self.api_passphrase)
            sub_params['signature'] = auth_headers['CB-ACCESS-SIGN']
            sub_params['key'] = auth_headers['CB-ACCESS-KEY']
            sub_params['passphrase'] = auth_headers['CB-ACCESS-PASSPHRASE']
            sub_params['timestamp'] = auth_headers['CB-ACCESS-TIMESTAMP']

        self.ws = create_connection(self.url)
        self.ws.send(json.dumps(sub_params))

    def _keepalive(self, interval=30):
        """Ping the server every ``interval`` seconds while the socket is connected."""
        while self.ws.connected:
            self.ws.ping("keepalive")
            time.sleep(interval)

    def _listen(self):
        """Start the keepalive thread and dispatch messages until ``self.stop`` is set.

        Any exception raised while receiving or decoding a message is handed
        to ``on_error``; successfully decoded messages go to ``on_message``.
        """
        self.keepalive.start()
        while not self.stop:
            try:
                data = self.ws.recv()
                msg = json.loads(data)
            except Exception as e:
                # Covers JSON decoding errors (ValueError) as well as socket
                # errors; the default on_error sets self.stop to end the loop.
                self.on_error(e)
            else:
                self.on_message(msg)

    def _disconnect(self):
        """Close the socket, wait for the keepalive thread, then call ``on_close``."""
        try:
            if self.ws:
                self.ws.close()
        except WebSocketConnectionClosedException:
            pass
        finally:
            # Joining a thread that was never started raises RuntimeError,
            # so only wait for a keepalive thread that is actually running.
            # The wait can last up to one keepalive interval (its sleep).
            if self.keepalive is not None and self.keepalive.is_alive():
                self.keepalive.join()

        self.on_close()

    def close(self):
        """Stop listening, disconnect, and wait for the listener thread.

        NOTE: the listener thread also calls ``_disconnect`` when its loop
        ends, so ``on_close`` may be invoked more than once.
        """
        self.stop = True   # will only disconnect after next msg recv
        self._disconnect() # force disconnect so threads can join
        if self.thread is not None:
            self.thread.join()

    def on_open(self):
        """Hook called by ``start()`` before the listener thread is launched."""
        if self.should_print:
            print("-- Subscribed! --\n")

    def on_close(self):
        """Hook called at the end of ``_disconnect``."""
        if self.should_print:
            print("\n-- Socket Closed --")

    def on_message(self, msg):
        """Hook called with every decoded message.

        Prints the message when ``should_print`` is set and inserts it into
        ``mongo_collection`` when one was supplied.
        """
        if self.should_print:
            print(msg)
        # Compare with None explicitly: PyMongo collections raise
        # NotImplementedError when used in a boolean context.
        if self.mongo_collection is not None:
            self.mongo_collection.insert_one(msg)

    def on_error(self, e, data=None):
        """Hook called on receive/decode errors: record ``e`` and stop the loop."""
        self.error = e
        self.stop = True
        print('{} - data: {}'.format(e, data))


if __name__ == "__main__":
    # Demo: subscribe to two products and count the messages received
    # until the user interrupts with Ctrl-C.
    import sys
    import cbpro
    import time


    class MyWebsocketClient(cbpro.WebsocketClient):
        """Example client that pretty-prints and counts incoming messages."""

        def on_open(self):
            # on_open runs before the connection is made, so the settings
            # changed here are the ones used for the subscription.
            self.url = "wss://ws-feed.pro.coinbase.com/"
            self.products = ["BTC-USD", "ETH-USD"]
            self.message_count = 0
            print("Let's count the messages!")

        def on_message(self, msg):
            print(json.dumps(msg, indent=4, sort_keys=True))
            self.message_count += 1

        def on_close(self):
            print("-- Goodbye! --")


    # ``channels`` is a required keyword-only argument, so omitting it raises
    # TypeError. Passing None selects the default ticker subscription.
    wsClient = MyWebsocketClient(channels=None)
    wsClient.start()
    print(wsClient.url, wsClient.products)
    try:
        while True:
            print("\nMessageCount =", "%i \n" % wsClient.message_count)
            time.sleep(1)
    except KeyboardInterrupt:
        wsClient.close()

    # Exit status 1 when the client recorded an error, 0 otherwise.
    sys.exit(1 if wsClient.error else 0)

In [9]:
from websocket import create_connection

NameError: name 'connected' is not defined