In [1]:
# list of directories the Python interpreter will search in 
# for required modules
import sys
sys.path

['/Volumes/ExtremeSSD/github_repos/trading_options/how_to',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python39.zip',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9/lib-dynload',
 '',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9/site-packages',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9/site-packages/IPython/extensions',
 '/Users/catelinn/.ipython']

In [2]:
# insert new path for Python interpreter to search in for required module
path_to_files = '/Volumes/ExtremeSSD/github_repos/trading_options/app'
sys.path.insert(1, path_to_files)
sys.path

['/Volumes/ExtremeSSD/github_repos/trading_options/how_to',
 '/Volumes/ExtremeSSD/github_repos/trading_options/app',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python39.zip',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9/lib-dynload',
 '',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9/site-packages',
 '/Users/catelinn/miniconda3/envs/quantra/lib/python3.9/site-packages/IPython/extensions',
 '/Users/catelinn/.ipython']

In [3]:
# import tda module
from tda.streaming import Client

# get access_token for authorized access to endpoints
client = Client(acc_type='margin')
client.authenticate()

OK
New access token retrieved with unexpired refresh token.


In [4]:
# account number and access token stored in the client
accountId = client.account_id
access_token = client.access_token

# Streaming Data from Websocket 


## Intro

TD-Ameritrade API provides websockets to stream live data. In brief, we can open a websocket and then login to TDA Streamer server, so we can subscribe to a service for live data update. For example, we can subscribe to `ACCT_ACTIVITY` service to receive updated data on new order placed, filled, cancelled etc. 

Instructions and details can be found in [TD-Ameritrade websocket streaming quickstart tutorial](https://developer.tdameritrade.com/content/streaming-data).


## Retrieve Info from UserPrinciples Response

Before we can open the websocket connection, we need to retrieve streamer info from USER PRINCIPLES endpoint, which is required for login to TDA Streamer. Here's how to do it:

> Make an authorized request (`POST`) to [get User Principals](https://developer.tdameritrade.com/user-principal/apis/get/userprincipals-0), which can be found in [User Info & Preferences API](https://developer.tdameritrade.com/user-principal/apis). We then can retrieve the login details from `userPrincipalsReponse.streamerInfo`


In [5]:
import requests

# Get Streamer info from User Principles
headers = {'Authorization': f'Bearer {access_token}'}
endpoint = 'https://api.tdameritrade.com/v1/userprincipals'
params = {'fields':'streamerSubscriptionKeys,streamerConnectionInfo'}
r = requests.get(url=endpoint, params=params, headers=headers)
r

<Response [200]>

In [6]:
userPrinciplesResponse = r.json()

# Extract streamer information
streamerInfo = userPrinciplesResponse['streamerInfo']

# Extract specific account details
for account in userPrinciplesResponse['accounts']:
    if account['accountId'] == accountId:
        account = account

In [7]:
# Grab the token timestamp and convert it to ms since epoch, which is accepted by Streamer
import dateutil
import datetime

def unix_time_ms(dt):
    # grab the starting point, so time '0'
    epoch = datetime.datetime.utcfromtimestamp(0)
    
    return (dt-epoch).total_seconds() * 1000.0

tokentimestamp = streamerInfo['tokenTimestamp']
tokentimestamp = dateutil.parser.parse(tokentimestamp, ignoretz=True)
tokentimestampMs = unix_time_ms(tokentimestamp)

# Define the credentials required for login command
credential = {
    'userid': account['accountId'],
    'token': streamerInfo['token'], 
    'company':  account['company'],
    'segment': account['segment'],
    'cddomain': account['accountCdDomainId'],
    'usergroup':streamerInfo['userGroup'],
    'accesslevel': streamerInfo['accessLevel'],
    'authorized': 'Y',
    'acl': streamerInfo['acl'],
    'timestamp': int(tokentimestampMs),
    'appid': streamerInfo['appId']    
    }

In [8]:
streamerInfo['tokenTimestamp']

'2022-03-18T00:44:59+0000'

## Define the Requests

In [9]:
import urllib

# Define a login request
login_request = {
    'service':'ADMIN',
    'requestid': '0', # login request comes first
    'command': 'LOGIN',
    'account': accountId,
    'source': streamerInfo['appId'],
    'parameters': {
        'token': streamerInfo['token'],
        'version': '1.0',
        'credential': urllib.parse.urlencode(credential) # convert json arguments to a query string
    }
}

In [10]:
# define request for different data sources
data_request = {'requests':[
                            {
                            'service': 'ACTIVES_NASDAQ',
                            'requestid': '1',
                            'command': 'SUBS',
                            'account': account['accountId'],
                            'source': streamerInfo['appId'],
                            'parameters': {
                                'keys': 'NASDAQ-60',
                                'fields': '0,1'
                            }},
                            
                            {
                            'service': 'LEVELONE_FUTURES',
                            'requestid': '2',
                            'command': 'SUBS',
                            'account': account['accountId'],
                            'source': streamerInfo['appId'],
                            'parameters': {
                                'keys': '/ES',
                                'fields': '0,3,8'
                            }}
                            ]
                }

In [11]:
# turn the requests into json strings
import json
login_encoded = json.dumps(login_request)
data_encoded = json.dumps(data_request)

# Create the Websocket client

In [12]:
import websockets
from websockets import client as wsClient
import asyncio

In [13]:
class WebSocketClient():
    """The client """
    
    def __init__(self):
        pass
        
    async def connect(self):
        '''
            Connecting to webSocket server
            websockets.client.connect returns a WebSocketClientProtocol, which is used to send and receive messages
        '''
        # Extract websocket streamer url
        uri = 'wss://'+streamerInfo['streamerSocketUrl']+'/ws'
        
        # connect to it
        self.connection = await wsClient.connect(uri)
        
        # if all goes well, let the user know
        if self.connection.open:
            print("connection established. client correctly connected")
            return self.connection
        
    async def sendMessage(self, message):
        '''
            Sending message to webSocket server: 
            - send login information
            - subscribe to data
        '''
        await self.connection.send(message)
        
    async def receiveMessage(self, connection):
        '''
            Receiving all server messages and handle them 
            it'd be in infinite loop, won't stop until user interruption
        '''
        while True:   
            try: 
                # grab and decode the message
                message = await connection.recv()
                message_decoded = json.loads(message)
                
                # print the data if the response contains data
                if 'data' in message_decoded.keys():
                    print(message_decoded['data'])
                
                print('-'*20)
                print('Received message from server:'+ str(message))
                
            except websockets.exceptions.ConnectionClosed:
                print("connection with server closed")
                break
                
                
    async def heartbeat(self, connection):
        '''
            Sending heartbeat to server every 5 seconds
            Ping - pong messages to verify connection is alive
        '''
        while True:
            try:
                await connection.send('ping')
                await asyncio.sleep(5)
            except websockets.exceptions.ConnectionClosed:
                print('Connection with server closed')
                break                 

# Make the Websocket Requests

In [14]:
# To allow asyncio event loop to be nested, which need to happen in envrionments such as
# web servers, GUI apps and in Jupyter Notebook
import nest_asyncio
nest_asyncio.apply()


if __name__ == '__main__':
    
    # create the client object
    client = WebSocketClient()
    
    # define an event loop
    loop = asyncio.get_event_loop()
    
    # start the connection to the websocket in the loop
    connection = loop.run_until_complete(client.connect())
    
    # define the tasks that we want to run
    tasks = [asyncio.ensure_future(client.receiveMessage(connection)),
             asyncio.ensure_future(client.sendMessage(login_encoded)),
             asyncio.ensure_future(client.receiveMessage(connection)),
             asyncio.ensure_future(client.sendMessage(data_encoded)),
             asyncio.ensure_future(client.receiveMessage(connection))]
    
    # run the tasks
    loop.run_until_complete(asyncio.wait(tasks))

connection established. client correctly connected
--------------------
Received message from server:{"response":[{"service":"ADMIN","requestid":"0","command":"LOGIN","timestamp":1647564300073,"content":{"code":0,"msg":"08-3"}}]}
--------------------
Received message from server:{"notify":[{"heartbeat":"1647564300075"}]}
--------------------
Received message from server:{"response":[{"service":"LEVELONE_FUTURES","requestid":"2","command":"SUBS","timestamp":1647564300075,"content":{"code":0,"msg":"SUBS command succeeded"}}]}
--------------------
Received message from server:{"notify":[{"heartbeat":"1647564300075"}]}
--------------------
Received message from server:{"response":[{"service":"ACTIVES_NASDAQ","requestid":"1","command":"SUBS","timestamp":1647564300075,"content":{"code":0,"msg":"SUBS command succeeded"}}]}
[{'service': 'LEVELONE_FUTURES', 'timestamp': 1647564300085, 'command': 'SUBS', 'content': [{'key': '/ES', 'delayed': False, 'assetMainType': 'FUTURE', '3': 4375.75, '8': 2

KeyboardInterrupt: 