<b>Note: This Jupyter Notebook is associated with the article [The Delta Hedged Options Arbitrage Strategy Explained (and How to Trade with Alpaca)](https://alpaca.markets/learn/executing-a-delta-hedged-options-arbitrage-strategy-using-alpacas-trading-api).</b>

In [None]:
# Install or upgrade the package `alpaca-py` and other essential packages
!pip install --upgrade alpaca-py
!pip install numpy scipy pandas datetime

In [None]:
import pandas as pd
import numpy as np
from scipy.stats import norm
import alpaca
import time
import requests
from scipy.optimize import brentq
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

from alpaca.data.historical.option import OptionHistoricalDataClient
from alpaca.data.historical.stock import StockHistoricalDataClient, StockLatestTradeRequest
from alpaca.data.requests import OptionLatestQuoteRequest
from alpaca.trading.client import TradingClient
from alpaca.trading.requests import (
    GetOptionContractsRequest,
    LimitOrderRequest
)
from alpaca.trading.enums import (
    AssetStatus,
    ExerciseStyle,
    OrderSide,
    TimeInForce,
    ContractType,
)


In [None]:
# API_KEY = "Alpaca's Trading API Key (Paper Account)"
# API_SECRET = "Alpaca's Trading API Secret Key (Paper Account)"

# A safe approach to setting up API credentials for Alpaca (Assume you run this notebook in Google Colab)
# Add your key to Colab Secrets. Add your API key to the Colab Secrets manager to securely store it
from google.colab import userdata
API_KEY = userdata.get('ALPACA_API_KEY')
API_SECRET = userdata.get('ALPACA_SECRET_KEY')
## We use paper environment for this example (Please do not modify this. This example is for paper trading only).
PAPER = True
BASE_URL = None

# Initialize Alpaca clients
trade_client = TradingClient(api_key=API_KEY, secret_key=API_SECRET, paper=PAPER, url_override=BASE_URL)
option_historical_data_client = OptionHistoricalDataClient(api_key=API_KEY, secret_key=API_SECRET, url_override=BASE_URL)
stock_data_client = StockHistoricalDataClient(api_key=API_KEY, secret_key=API_SECRET)

# Below are the variables for development this documents
# Please do not change these variables
trade_api_url = None
trade_api_wss = None
data_api_url = None
option_stream_data_wss = None

# Step 2: Extracting the S&P 500 Tickers

In [None]:
def get_sp500_stocks():
    url = "https://en.wikipedia.org/wiki/List_of_S%26P_500_companies"
    response = requests.get(url)
    sp500_table = pd.read_html(response.text, header=0)[0]
    return sp500_table['Symbol'].tolist()

stocks = get_sp500_stocks()
print(f"Total S&P 500 stocks: {len(stocks)}")

# Step 3: Extracting Stock and Options Prices

In [None]:
# Set up stock historical data client
stock_data_client = StockHistoricalDataClient(api_key=API_KEY, secret_key=API_SECRET)

# Function to get the latest quotes for S&P 500 stocks
def get_latest_quotes(stocks):
    quotes = {}
    for stock in stocks:
        req = StockLatestTradeRequest(symbol_or_symbols=stock)
        res = stock_data_client.get_stock_latest_trade(req)
        quotes[stock] = res[stock]
    return quotes

# Get the latest quotes for all S&P 500 stocks
latest_quotes = get_latest_quotes(stocks)
print(f"Total S&P 500 stocks quotes retrieved: {len(latest_quotes)}")

In [None]:
# Define a 5% range around the underlying price 
STRIKE_RANGE = 0.05

# Set the timezone
timezone = ZoneInfo('America/New_York')

# Get current date in US/Eastern timezone
today = datetime.now(timezone).date()

# Set the expiration date range for the options
min_expiration = today + timedelta(days=14)
max_expiration = today + timedelta(days=21)

# Function to get options contracts for S&P 500 stocks
def get_options_contracts(stocks):
    options_data = {}
    for stock in stocks:
        # Get the latest quote for the stock price
        underlying_price = latest_quotes[stock].price
        req = GetOptionContractsRequest(
            underlying_symbols=[stock],
            status=AssetStatus.ACTIVE,
            strike_price_gte = str(underlying_price * (1 - STRIKE_RANGE)),
            strike_price_lte = str(underlying_price * (1 + STRIKE_RANGE)),
            expiration_date_gte=min_expiration,
            expiration_date_lte=max_expiration,
            type=ContractType.CALL,
            style=ExerciseStyle.AMERICAN,
        )
        options_contracts = trade_client.get_option_contracts(req)
        print(len(options_contracts.option_contracts))
        options_data[stock] = options_contracts.option_contracts
    return options_data

# Get options contracts for all S&P 500 stocks
options_contracts = get_options_contracts(stocks)
print(f"Total options contracts retrieved: {sum(len(v) for v in options_contracts.values())}")

# Step 4: Calculating the Delta and Implied Volatility

In [None]:
# Calculate implied volatility
def calculate_implied_volatility(option_price, S, K, T, r, option_type):
    
    # Define a reasonable range for sigma
    sigma_lower = 1e-6
    sigma_upper = 5.0  # Adjust upper limit if necessary

    # Check if the option is out-of-the-money and price is close to zero
    intrinsic_value = max(0, (S - K) if option_type == 'call' else (K - S))
    if option_price <= intrinsic_value + 1e-6:
        
        print('Option price is close to intrinsic value; implied volatility is near zero.')
        return 0.0
    
    # Define the function to find the root
    def option_price_diff(sigma):
        d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
        d2 = d1 - sigma * np.sqrt(T)
        if option_type == 'call':
            price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
        elif option_type == 'put':
            price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
        return price - option_price

    try:
        return brentq(option_price_diff, sigma_lower, sigma_upper)
    except ValueError as e:
        print(f"Failed to find implied volatility: {e}")
        return None

In [None]:
# Risk free rate for the options greeks and IV calculations
risk_free_rate = 0.01

def calculate_greeks(option_price, underlying_price, strike_price, T, risk_free_rate, option_type):
    # T = (expiration - datetime.now(timezone).date()).days / 365 # It is unconventional, but some use 225 days (# of annual trading days) in replace of 365 days
    T = max(T, 1e-6)  # Set minimum T to avoid zero
    
    if T == 1e-6:
        print('Option has expired or is expiring now; setting Greeks based on intrinsic value.')
        if option_type == 'put':
            delta = -1.0 if underlying_price < strike_price else 0.0
        else:
            delta = 1.0 if underlying_price > strike_price else 0.0
        gamma = 0.0
        theta = 0.0
        vega = 0.0
        return delta, gamma, theta, vega
    
    # Calculate IV
    IV = calculate_implied_volatility(option_price, underlying_price, strike_price, T, risk_free_rate, option_type)

    if IV is None or IV == 0.0:
        print('Implied volatility could not be determined, skipping Greek calculations.')
        return None
    
    d1 = (np.log(underlying_price / strike_price) + (risk_free_rate + 0.5 * IV ** 2) * T) / (IV * np.sqrt(T))
    d2 = d1 - IV * np.sqrt(T) # d2 for Theta calculation
    # Calculate Delta
    delta = norm.cdf(d1) if option_type == 'call' else -norm.cdf(-d1)
    # Calculate Gamma
    gamma = norm.pdf(d1) / (underlying_price * IV * np.sqrt(T))
    # Calculate Vega
    vega = underlying_price * np.sqrt(T) * norm.pdf(d1) / 100
    # Calculate Theta
    if option_type == 'call':
        theta = (
            - (underlying_price * norm.pdf(d1) * IV) / (2 * np.sqrt(T))
            - (risk_free_rate * strike_price * np.exp(-risk_free_rate * T) * norm.cdf(d2))
        )
    else:
        theta = (
            - (underlying_price * norm.pdf(d1) * IV) / (2 * np.sqrt(T))
            + (risk_free_rate * strike_price * np.exp(-risk_free_rate * T) * norm.cdf(-d2))
        )
    # Convert annualized theta to daily theta
    theta /= 365
    
    return delta, gamma, theta, vega

# Step 5: Calculating the Theoretical Price for Each Contract

In [None]:
# Black-Scholes price calculator (integrated from first chunk)
def black_scholes_price(S, K, T, r, sigma, option_type):
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    
    if option_type == "call":
        price = S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    elif option_type == "put":
        price = K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)
    else:
        raise ValueError("Invalid option type. Must be 'call' or 'put'.")
    
    return price

# Step 6: Calculating Theoretical Prices, Greeks and Identify Price Difference

In [None]:
def calculate_theoretical_prices_and_greeks(latest_quotes, options_contracts, r):
    theoretical_prices = {}
    options_data = {}  # Will store both prices and Greeks
    skipped_count = 0
    
    for stock, contracts in options_contracts.items():
        if stock not in latest_quotes:
            continue
            
        S = latest_quotes[stock].price
        print(f"\nProcessing options for stock: {stock} (Price: {S})")

        for contract in contracts:
            K = contract.strike_price
            T = (contract.expiration_date - datetime.now(timezone).date()).days / 365.0
            # Skip expired options
            if T <= 1e-6:
                continue
            # Extract option type from enum value 
            option_type = contract.type.value

            req = OptionLatestQuoteRequest(symbol_or_symbols=contract.symbol)

            try:
                # Verify the symbol exists in the response before accessing it
                api_response = option_historical_data_client.get_option_latest_quote(req)
                if contract.symbol not in api_response:
                    print(f"No market data available for {contract.symbol} in API response")
                    skipped_count += 1
                    continue
                option_quote = api_response[contract.symbol]
                option_price = (option_quote.bid_price + option_quote.ask_price) / 2

                # print(f"Option type: {option_type}, Strike: {K}, Remaining: {T}")
                # print(f"Market price: {option_price} (Bid: {option_quote.bid_price}, Ask: {option_quote.ask_price})")
                IV = calculate_implied_volatility(option_price, S, K, T, r, option_type)
                if IV is not None:
                    # Calculate the theoretical price of option
                    theoretical_price = black_scholes_price(S, K, T, r, IV, option_type)
                    # Store the theoretical price in original format
                    theoretical_prices[contract.symbol] = theoretical_price
                    price_difference = abs(float(contract.close_price) - theoretical_price)
                    # Calculate Greeks
                    delta, gamma, theta, vega = calculate_greeks(option_price, S, K, T, r, option_type)
                    
                    # Store comprehensive data including Greeks
                    options_data[contract.symbol] = {
                        'stock': stock,
                        'option_name': contract.name,
                        'option_symbol': contract.symbol,
                        'strike_price': contract.strike_price,
                        'open_interest': contract.open_interest,
                        'expiration_date': contract.expiration_date,
                        'actual_price': float(contract.close_price),
                        'theoretical_price': theoretical_price,
                        'price_difference': price_difference,
                        'IV': IV,
                        'delta': delta,
                        'gamma': gamma,
                        'theta': theta,
                        'vega': vega
                    }

            except (KeyError, Exception) as e:
                # Skip if option not found or any other error occurs
                skipped_count += 1
                continue
    
    print(f"Processed options - added: {len(theoretical_prices)}, skipped: {skipped_count}")
    return options_data

In [None]:
# Calculate theoretical prices for all options contracts
options_data = calculate_theoretical_prices_and_greeks(latest_quotes, options_contracts, risk_free_rate)
print(f"Theoretical prices calculated for {len(options_data)} options contracts.")

# Step 7: Retrieving top 10 mispriced options

In [None]:
# Extract all option details into a list
options_list = list(options_data.values())

# Filter out options with price differences > $50 (which might be data errors)
MAX_PRICE_DIFF = 50.0  # Set the maximum reasonable price difference
filtered_options = []

for option in options_list:
    # Convert potential numpy values to regular Python float
    price_diff = float(abs(option['price_difference']))
    
    # Only include options with reasonable price differences
    if price_diff <= MAX_PRICE_DIFF:
        filtered_options.append(option)

# Sort the filtered list by price_difference
mispriced_options = sorted(filtered_options, key=lambda x: float(x['price_difference']), reverse=True)

# Get top 10 mispriced options
top_mispriced_options = mispriced_options[:10]

In [None]:
# Buying power percentage to use for the trade
BUY_POWER_LIMIT = 0.10

# Check account buying power
buying_power = float(trade_client.get_account().buying_power)

# Calculate the limit amount of buying power to use for the trade
buying_power_limit = buying_power * BUY_POWER_LIMIT

def check_buying_power(option, buying_power_limit, bid_price):
    risk = bid_price * 100
    print(f"Calculated theoretical max risk for {option['option_symbol']}: {risk}.")
    # Return True if the risk is acceptable (i.e., less than or equal to the limit)
    return risk <= buying_power_limit

In [None]:
# Function to place orders for delta hedging
def place_delta_hedging_orders(top_mispriced_options):
    orders = []
    for option in top_mispriced_options:
        delta = option['delta']
        stock = option['stock']
        symbol = option['option_symbol']

        if not check_buying_power(option, buying_power_limit, latest_quotes[stock].price):
                print(f"Skipping option {symbol} due to high risk relative to buying power.")
                continue
        # Place order to buy the call option
        call_order_data = LimitOrderRequest(symbol = symbol,
                                            limit_price = option['actual_price'],
                                            qty = 1,
                                            side = OrderSide.BUY,
                                            time_in_force = TimeInForce.DAY)

        call_order = trade_client.submit_order(order_data = call_order_data)
        orders.append(call_order)

        # Place order to short the underlying stock
        short_stock_qty = int(delta * 100)  # Delta * 100 shares per option contract
        stock_order_data = LimitOrderRequest(symbol = stock,
                                                limit_price = latest_quotes[stock].price,
                                                qty = short_stock_qty,
                                                side = OrderSide.SELL,
                                                time_in_force = TimeInForce.DAY)
        stock_order = trade_client.submit_order(order_data = stock_order_data)
        orders.append(stock_order)
    
    return orders


# Place delta hedging orders
orders = place_delta_hedging_orders(top_mispriced_options)
print(f"Total orders placed: {len(orders)}")