# Real-Time Streaming of Options Trades Data

<b> YouTube Tutorial </b> (Published: Feb 3, 2023): https://youtu.be/Hrs9CWb92_g

In today's tutorial we investigate how you can use ThetaData's API to use real-time streaming of trade level data for your individual analysis.

In [None]:
!pip install thetadata

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting thetadata
  Downloading thetadata-0.8.3-py3-none-any.whl (21 kB)
Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: wget
  Building wheel for wget (setup.py) ... [?25l[?25hdone
  Created wheel for wget: filename=wget-3.2-py3-none-any.whl size=9674 sha256=7906d6c51573c4be67c43e94805ab2dcc07fab7783e9d7345288d1ed25c0d658
  Stored in directory: /root/.cache/pip/wheels/bd/a8/c3/3cf2c14a1837a4e04bd98631724e81f33f462d86a1d895fae0
Successfully built wget
Installing collected packages: wget, thetadata
Successfully installed thetadata-0.8.3 wget-3.2


In [None]:
import os
import sys
import pickle
import numpy as np
import pandas as pd

from datetime import timedelta, datetime, date
from thetadata import ThetaClient, OptionReqType, OptionRight, DateRange, DataType, StockReqType
from thetadata import MessageType, TradeCondition
from thetadata import StreamMsg, StreamMsgType

Thetadata API account details

In [None]:
your_username = ''
your_password = ''

# Real-Time Intraday Streaming

In [None]:
# User generated method that gets called each time a message from the stream arrives.
def callback(msg: StreamMsg):
    msg.type = msg.type

    if msg.type == StreamMsgType.TRADE:
        print('---------------------------------------------------------------------------')
        print('con:                         ' + msg.contract.to_string())
        print('trade:                       ' + msg.trade.to_string())
        print('last quote at time of trade: ' + msg.quote.to_string())

In [None]:
client = ThetaClient(username=your_username, passwd=your_password)

If you require API support, feel free to join our discord server! http://discord.thetadata.us
Starting Theta Terminal [v0.8.3 REV: A]  -->  Config: C:\Users\Jonathon Emerick\ThetaData\ThetaTerminal
[MDDServer] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:12001], Bundle: PRO, Permissions: Real-Time
[Streaming] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:20000], Bundle: PRO, Permissions: Real-Time


In [None]:
client.connect_stream(callback)

In [None]:
client.req_full_trade_stream_opt()  # Subscribes to every option trade.

0

In [None]:
client.remove_full_trade_stream_opt()  # Unsubscribes from the full option trade stream.

1

In [None]:
client.close_stream()

Exception in thread Thread-6:
Traceback (most recent call last):
  File "C:\Users\Jonathon Emerick\anaconda3\lib\threading.py", line 980, in _bootstrap_inner
    self.run()
  File "C:\Users\Jonathon Emerick\anaconda3\lib\threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Jonathon Emerick\anaconda3\lib\site-packages\thetadata\client.py", line 483, in _recv_stream
    msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
  File "C:\Users\Jonathon Emerick\anaconda3\lib\site-packages\thetadata\client.py", line 516, in _read_stream
    buffer = bytearray(self._stream_server.recv(n_bytes))
OSError: [WinError 10038] An operation was attempted on something that is not a socket


## Get all Expirations for AMZN Options

First thing we need is all the expiry dates of all contracts on AMZN that ThetaData has available.

In [None]:
def get_expirations(root_ticker) -> pd.DataFrame:
    """Request expirations from a particular options root"""
    # Create a ThetaClient
    client = ThetaClient(username=your_username, passwd=your_password, jvm_mem=4, timeout=15)

    # Connect to the Terminal
    with client.connect():

        # Make the request
        data = client.get_expirations(
            root=root_ticker,
        )

    return data

### Making requests to API for all Contracts by Expiry Dates

In [None]:
root_ticker = 'AMZN'
expirations = get_expirations(root_ticker)
expirations

If you require API support, feel free to join our discord server! http://discord.thetadata.us
Starting Theta Terminal [v0.8.3 REV: A]  -->  Config: C:\Users\Jonathon Emerick\ThetaData\ThetaTerminal
[MDDServer] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:12001], Bundle: PRO, Permissions: Real-Time
API connected: Socket[addr=/127.0.0.1,port=64651,localport=11000]
REQ: MSG_CODE=200&version=0.8.3
REQ: MSG_CODE=201&root=AMZN
End of api stream.


0     2012-06-01
1     2012-06-08
2     2012-06-16
3     2012-06-22
4     2012-06-29
         ...    
571   2024-06-21
572   2024-09-20
573   2025-01-17
574   2025-06-20
575   2025-12-19
Length: 576, dtype: datetime64[ns]

## Market-Makers are not forced to show Quotes on all options!

There are rules listed for each Exchange that market makers must abide by. For Example on the NASDAQ where MSFT trades here are the [rules](https://listingcenter.nasdaq.com/rulebook/mrx/rules)

Specifically there is a large difference between the obligation of a Competitive Market Maker and the Primary Market Makers for a particular options series. This is notable in whether they need to present two-sided quotes on Non-standard options like weekly or quarterly expiry options and adjusted options.

To be safe here, we will only want to return option contracts with 'standard' option expires. These expire on the Saturday following the third Friday of the month, and some have the expiry date as the Third friday of the month, but in the past were recorded as the Saturday. Therefore we need to find the intersection of all the expiries that Thetadata has options data for and the 3rd Fridays and the following Saturday dates for every month since Jun-2021.

In [None]:
trading_days = pd.date_range(start=datetime(2023,1,24),end=datetime(2024,12,31),freq='B')
# The third friday in every month
contracts = pd.date_range(start=datetime(2023,1,24),end=datetime(2024,12,31),freq='WOM-3FRI')
# Find contract expiries that match with ThetaData expiries
mth_expirations = [exp for exp in expirations if exp in contracts]
# Convert from python list to pandas datetime
mth_expirations = pd.to_datetime(pd.Series(mth_expirations))

mth_expirations

0    2023-02-17
1    2023-03-17
2    2023-04-21
3    2023-06-16
4    2023-07-21
5    2023-09-15
6    2023-10-20
7    2024-01-19
8    2024-03-15
9    2024-06-21
10   2024-09-20
dtype: datetime64[ns]

## Get all Strikes for each AMZN Option Expiry

We will need these later, so I will build up a dictionary and pickle this data for future use.

In [None]:
def get_strikes(root_ticker, expiration_dates) -> pd.DataFrame:
    """Request strikes from a particular option contract"""
    # Create a ThetaClient
    client = ThetaClient(username=your_username, passwd=your_password, jvm_mem=4, timeout=15)

    all_strikes = {}

    # Connect to the Terminal
    with client.connect():

        for exp_date in expiration_dates:

            # Make the request
            data = client.get_strikes(
                root=root_ticker,
                exp=exp_date
            )

            all_strikes[exp_date] = pd.to_numeric(data)


    return all_strikes

### Making requests to API for Strikes

In [None]:
root_ticker = 'AMZN'

all_strikes = get_strikes(root_ticker, mth_expirations)

with open('strikes.pkl', 'wb') as f:
    pickle.dump(all_strikes, f)

If you require API support, feel free to join our discord server! http://discord.thetadata.us
[Streaming] Attempting login as pythonforquants@gmail.com  -->  
Starting Theta Terminal [v0.8.3 REV: A]  -->  Config: C:\Users\Jonathon Emerick\ThetaData\ThetaTerminal
[MDDServer] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:12001], Bundle: PRO, Permissions: Real-Time
API connected: Socket[addr=/127.0.0.1,port=64663,localport=11000]
REQ: MSG_CODE=200&version=0.8.3
REQ: MSG_CODE=202&root=AMZN&exp=20230217
[Streaming] Attempting login as pythonforquants@gmail.com  -->  REQ: MSG_CODE=202&root=AMZN&exp=20230317
REQ: MSG_CODE=202&root=AMZN&exp=20230421
REQ: MSG_CODE=202&root=AMZN&exp=20230616
REQ: MSG_CODE=202&root=AMZN&exp=20230721
REQ: MSG_CODE=202&root=AMZN&exp=20230915
REQ: MSG_CODE=202&root=AMZN&exp=20231020
CONNECTED: [nj-a.thetadata.us:20000], Bundle: PRO, Permissions: Real-Time
REQ: MSG_CODE=202&root=AMZN&exp=20240119
REQ: MSG_CODE=202&root=AMZN&exp=202

In [None]:
with open('strikes.pkl', 'rb') as f:
    all_strikes = pickle.load(f)

print("Option Contract: ", mth_expirations[0])
print("AMZN Strike ", all_strikes[mth_expirations[0]][13])

Option Contract:  2023-02-17 00:00:00
AMZN Strike  78.0


# Realtime Streaming of AMZN Contracts

In [None]:
client = ThetaClient(username=your_username, passwd=your_password)

If you require API support, feel free to join our discord server! http://discord.thetadata.us


In [None]:
client.connect_stream(callback)

Starting Theta Terminal [v0.8.3 REV: A]  -->  Config: C:\Users\Jonathon Emerick\ThetaData\ThetaTerminal


In [None]:
root_ticker = 'AMZN'
opt_types=["P", "C"]
for opt_type in opt_types:
    for expiry in mth_expirations:
        strikes = all_strikes[expiry]
        for strike in strikes:
            # add specific contract to required trade stream using req_trade_stream_opt()
            client.req_trade_stream_opt(root_ticker, expiry.date(), strike, OptionRight.CALL if opt_type=="C" else OptionRight.PUT)

In [None]:
client.close_stream()

Exception in thread Thread-10:
Traceback (most recent call last):
  File "C:\Users\Jonathon Emerick\anaconda3\lib\threading.py", line 980, in _bootstrap_inner
    self.run()
  File "C:\Users\Jonathon Emerick\anaconda3\lib\threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Jonathon Emerick\anaconda3\lib\site-packages\thetadata\client.py", line 483, in _recv_stream
    msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
  File "C:\Users\Jonathon Emerick\anaconda3\lib\site-packages\thetadata\client.py", line 516, in _read_stream
    buffer = bytearray(self._stream_server.recv(n_bytes))
OSError: [WinError 10038] An operation was attempted on something that is not a socket


[MDDServer] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:12001], Bundle: PRO, Permissions: Real-Time


# Combine Realtime Data to Pandas DataFrame

This requires creating a new callback method that gets called each time a message from the stream arrives.

In [None]:
global trades_data
trades_data = {}

opt_types = ["P", "C"]
for expiry in mth_expirations:
    trades_data[expiry] = {}
    strikes = all_strikes[expiry]
    for strike in strikes:
        trades_data[expiry][strike] = {}
        for opt_type in opt_types:
            trades_data[expiry][strike][opt_type] = pd.DataFrame(
                columns = ['ms_of_day','sequence','size','condition','price','date']
            )

[Streaming] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:20000], Bundle: PRO, Permissions: Real-Time


In [None]:
def build_trades_data(msg: StreamMsg):
    msg.type = msg.type

    if msg.type == StreamMsgType.TRADE:
        print('---------------------------------------------------------------------------')
        print('trade:                       ' + msg.trade.to_string())
        # Set up expiry, strike and opt_type for easy reference in dataframe
        expiry = datetime(msg.contract.exp.year, msg.contract.exp.month, msg.contract.exp.day)
        strike = msg.contract.strike
        opt_type = "C" if msg.contract.isCall else "P"

        trades_data[expiry][strike][opt_type] = pd.concat([
            trades_data[expiry][strike][opt_type],
                    pd.DataFrame({'ms_of_day': msg.trade.ms_of_day,
                     'sequence': msg.trade.sequence,
                     'size': msg.trade.size,
                     'condition': str(msg.trade.condition).replace('TradeCondition.', ''),
                     'price': msg.trade.price,
                     'date': msg.trade.date}, index=[msg.trade.sequence])
            ], ignore_index = False)

In [None]:
client = ThetaClient(username=your_username, passwd=your_password)

If you require API support, feel free to join our discord server! http://discord.thetadata.us
Starting Theta Terminal [v0.8.3 REV: A]  -->  Config: C:\Users\Jonathon Emerick\ThetaData\ThetaTerminal
[MDDServer] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:12001], Bundle: PRO, Permissions: Real-Time
[Streaming] Attempting login as pythonforquants@gmail.com  -->  CONNECTED: [nj-a.thetadata.us:20000], Bundle: PRO, Permissions: Real-Time


In [None]:
client.connect_stream(build_trades_data)

In [None]:
# add specific contract to required trade stream using req_trade_stream_opt()
root_ticker = 'AMZN'
opt_types=["P", "C"]
for opt_type in opt_types:
    for expiry in mth_expirations:
        strikes = all_strikes[expiry]
        for strike in strikes:
            client.req_trade_stream_opt(root_ticker, expiry.date(), strike, OptionRight.CALL if opt_type=="C" else OptionRight.PUT)

In [None]:
client.close_stream()

Exception in thread Thread-12:
Traceback (most recent call last):
  File "C:\Users\Jonathon Emerick\anaconda3\lib\threading.py", line 980, in _bootstrap_inner
    self.run()
  File "C:\Users\Jonathon Emerick\anaconda3\lib\threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "C:\Users\Jonathon Emerick\anaconda3\lib\site-packages\thetadata\client.py", line 483, in _recv_stream
    msg.type = StreamMsgType.from_code(parse_int(self._read_stream(1)[:1]))
  File "C:\Users\Jonathon Emerick\anaconda3\lib\site-packages\thetadata\client.py", line 516, in _read_stream
    buffer = bytearray(self._stream_server.recv(n_bytes))
OSError: [WinError 10038] An operation was attempted on something that is not a socket


### Access Your Data in trades_data dict

In [None]:
opt_types = ["P", "C"]
for expiry in mth_expirations:
    strikes = all_strikes[expiry]
    for strike in strikes:
        for opt_type in opt_types:
            if not trades_data[expiry][strike][opt_type].empty:
                print("Expiry: ", expiry, " Strike: ", strike, " Opt Type: ", opt_type, "\n")
                print(trades_data[expiry][strike][opt_type])