<a href="https://colab.research.google.com/github/adidror005/youtube-videos/blob/main/TradingStream.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup

### Install Python SDK


In [1]:
!pip install alpaca-py

Collecting alpaca-py
  Downloading alpaca_py-0.40.0-py3-none-any.whl.metadata (13 kB)
Collecting sseclient-py<2.0.0,>=1.7.2 (from alpaca-py)
  Downloading sseclient_py-1.8.0-py2.py3-none-any.whl.metadata (2.0 kB)
Downloading alpaca_py-0.40.0-py3-none-any.whl (121 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m121.7/121.7 kB[0m [31m5.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sseclient_py-1.8.0-py2.py3-none-any.whl (8.8 kB)
Installing collected packages: sseclient-py, alpaca-py
Successfully installed alpaca-py-0.40.0 sseclient-py-1.8.0


### Grab API key and Secret

In [1]:
from google.colab import userdata
ALPACA_API_KEY = userdata.get('ALPACA_API_KEY')
ALPACA_API_SECRET = userdata.get('ALPACA_API_SECRET')

### Connect to Trading Client

In [2]:
from alpaca.trading.client import TradingClient
trading_client = TradingClient(ALPACA_API_KEY, ALPACA_API_SECRET, paper=True)

### Helper Function to View Respones as DataFrames

In [3]:
import pandas as pd

class Util:
    @staticmethod
    def to_dataframe(data):
        if isinstance(data, list):
            return pd.DataFrame([item.__dict__ for item in data])
        return pd.DataFrame(data, columns=['tag', 'value']).set_index('tag')

### Nest_Asnycio
* Needed for running asyncio in Jupyter Credit Ewald De Wit!

In [4]:
import nest_asyncio
nest_asyncio.apply()


# Trading Stream

### Where we can handle events related to orders we placed.
For example,
* New Order
* Replaced Order
* Filled Ordre

### Initialize Trading Stream

In [5]:
from alpaca.trading.stream import TradingStream
trading_stream = TradingStream(ALPACA_API_KEY, ALPACA_API_SECRET)

### Example 1: Very Basic Example of TradingStream Couroutine

In [6]:
from alpaca.trading.models import TradeUpdate
from alpaca.trading.enums import OrderStatus


### Subscribe to "Trade" Updates
**Note** I wish they could rename it because this can be confusing from market data trade updates...

In [7]:
async def on_order_update(trade_event:TradeUpdate):
    status = trade_event.order.status
    side = trade_event.order.side
    qty_filled = trade_event.qty
    price_filled = trade_event.price
    cum_filled = trade_event.order.filled_qty
    total_qty = trade_event.order.qty
    symbol = trade_event.order.symbol
    print(f"Update for order {trade_event.order.id}")
    print(f"Order Status: {status}")
    print(f"Filled: {side} {qty_filled} {symbol} at {price_filled}")
    print(f"Total Filled: {cum_filled}/{total_qty} {symbol}")


### Run the Websocket Stream
* This is blocking later we can try the "non-blocking" version.

In [8]:
trading_stream.subscribe_trade_updates(on_order_update)
trading_stream.run()

Update for order d81e3b6d-2c35-43db-857e-8619cc49486e
Order Status: OrderStatus.PENDING_NEW
Filled: OrderSide.BUY None AMD at None
Total Filled: 0/None AMD
Update for order d81e3b6d-2c35-43db-857e-8619cc49486e
Order Status: OrderStatus.NEW
Filled: OrderSide.BUY None AMD at None
Total Filled: 0/None AMD
keyboard interrupt, bye


### Let's trigger a take profit 1% Higher after a buy fill

In [10]:
from alpaca.trading.models import TradeUpdate
from alpaca.trading.enums import OrderStatus, OrderSide
from alpaca.trading.requests import LimitOrderRequest
from alpaca.trading.enums import TimeInForce
import numpy as np

trading_stream = TradingStream(ALPACA_API_KEY, ALPACA_API_SECRET)
tp_pct = 0.01


async def on_order_update(trade_event:TradeUpdate):
    status = trade_event.order.status
    side = trade_event.order.side
    qty_filled = trade_event.qty
    price_filled = trade_event.price
    cum_filled = trade_event.order.filled_qty
    total_qty = trade_event.order.qty
    symbol = trade_event.order.symbol
    print(f"Update for order {trade_event.order.id}")
    print(f"Order Status: {status}")
    if status in [OrderStatus.FILLED,OrderStatus.PARTIALLY_FILLED] and side == OrderSide.BUY:
        print(f"Filled: {side} {qty_filled} {symbol} at {price_filled}")
        print(f"Total Filled: {cum_filled}/{total_qty} {symbol}")
        tp_price = np.round((1+tp_pct)*price_filled,2)
        tp_order = LimitOrderRequest(
            symbol=symbol,
            qty=qty_filled,
            side=OrderSide.SELL,
            limit_price=tp_price,
            time_in_force=TimeInForce.DAY,
            extended_hours=True)
        tp_order = trading_client.submit_order(tp_order)
        print(f"Submit Take Profit {OrderSide.SELL} Order for {symbol} with ID {trade_event.order.id} at {tp_price}")







In [None]:
trading_stream.subscribe_trade_updates(on_order_update)
trading_stream.run()

Update for order 5bc81d43-5341-4ad4-acc5-58d0bb48fb45
Order Status: OrderStatus.PENDING_NEW
Update for order 5bc81d43-5341-4ad4-acc5-58d0bb48fb45
Order Status: OrderStatus.NEW
Update for order d81e3b6d-2c35-43db-857e-8619cc49486e
Order Status: OrderStatus.PARTIALLY_FILLED
Filled: OrderSide.BUY 1.751969193 AMD at 114.26
Total Filled: 1.751969193/None AMD
Submit Take Profit OrderSide.SELL Order for AMD with ID d81e3b6d-2c35-43db-857e-8619cc49486e at 115.4
Update for order d81e3b6d-2c35-43db-857e-8619cc49486e
Order Status: OrderStatus.FILLED
Filled: OrderSide.BUY 7.0 AMD at 114.26
Total Filled: 8.751969193/None AMD
Submit Take Profit OrderSide.SELL Order for AMD with ID d81e3b6d-2c35-43db-857e-8619cc49486e at 115.4
Update for order b8d2fd31-5cf1-4caa-b94d-aff9d7ab3787
Order Status: OrderStatus.PARTIALLY_FILLED
Filled: OrderSide.BUY 1.751969193 AMD at 114.26
Total Filled: 1.751969193/None AMD
Submit Take Profit OrderSide.SELL Order for AMD with ID b8d2fd31-5cf1-4caa-b94d-aff9d7ab3787 at 11

# Example 3

### Now Let's Combine TradingStream + StockDataStream
* With a less rigid takeProfit meaning one that can be flexible based on some trade logic and not just 1 pct
Let's make a dummy strategy where we will
* Market Buy with a 2% take profit on second 1-minute bar
* Market Buy with a 3% take profit on third 1-minute dollar bar
* No buy orders after :)
**IMPORTANT** Due to Fact OrderIds are Replaced for "Replaced/Modified" Orders This should be Handled with Care in Those Cases!!!!
-- Perhaps a possible solution is to link it back up the orderID chain!

In [None]:
import asyncio
import numpy as np
from alpaca.trading.models import TradeUpdate
from alpaca.trading.enums import OrderStatus, OrderSide
from alpaca.trading.requests import LimitOrderRequest
from alpaca.trading.enums import TimeInForce

trading_stream = TradingStream(ALPACA_API_KEY, ALPACA_API_SECRET)

orders_to_tp_pct  = {}
bar_number  = 0

async def on_order_update(trade_event:TradeUpdate):
    global orders_to_tp_pct
    global bar_number
    order_id = trade_event.order.id
    status = trade_event.order.status
    side = trade_event.order.side
    qty_filled = trade_event.qty
    price_filled = trade_event.price
    cum_filled = trade_event.order.filled_qty
    total_qty = trade_event.order.qty
    symbol = trade_event.order.symbol
    print(f"Update for order {trade_event.order.id}")
    print(f"Order Status: {status}")
    if status in [OrderStatus.FILLED,OrderStatus.PARTIALLY_FILLED] and side == OrderSide.BUY:
        print(f"Filled: {side} {qty_filled} {symbol} at {price_filled}")
        print(f"Total Filled: {cum_filled}/{total_qty} {symbol}")
        tp_price = np.round((1+orders_to_tp_pct[order_id])*price_filled,2)
        tp_order = LimitOrderRequest(
            symbol=symbol,
            qty=qty_filled,
            side=OrderSide.SELL,
            limit_price=tp_price,
            time_in_force=TimeInForce.DAY,
            extended_hours=True)
        tp_order = trading_client.submit_order(tp_order)
        print(f"Submit {tp_pct:.0%} Take Profit {OrderSide.SELL} Order for {symbol} with ID {trade_event.order.id} at {tp_price}")






trading_stream.subscribe_trade_updates(on_order_update)

### Define Stock Data Stream Part

In [None]:
from alpaca.data.live import StockDataStream
from alpaca.data.requests import StockLatestQuoteRequest
from alpaca.data.enums import DataFeed
stock_data_stream  = StockDataStream(ALPACA_API_KEY, ALPACA_API_SECRET,feed=DataFeed.SIP)

#### Couroutine

In [None]:
from alpaca.trading.requests import MarketOrderRequest
from alpaca.trading.enums import OrderSide, TimeInForce
async def on_1min_bar(bar):
    global orders_to_tp_pct
    global bar_number
    if bar_number<=10:
      bar_number+=1
      print(f" Bar #:{bar_number}")
      print(bar)
    if bar_number>=2 and bar_number<=3:
        market_order_request = MarketOrderRequest(
            symbol="AMD",
            notional=1000,
            side=OrderSide.BUY,
            time_in_force=TimeInForce.DAY
        )
        order=trading_client.submit_order(
            order_data=market_order_request
        )
        orders_to_tp_pct[order.id] = bar_number/100



### Subscribe to 1 min bar updates

In [None]:
stock_data_stream.subscribe_bars(on_1min_bar, "AMD")

In [None]:
import asyncio
await asyncio.gather(*[stock_data_stream._run_forever(),trading_stream._run_forever()])

 Bar #:1
symbol='AMD' timestamp=datetime.datetime(2025, 4, 28, 11, 14, tzinfo=datetime.timezone.utc) open=95.63 high=95.63 low=95.63 close=95.63 volume=212.0 trade_count=9.0 vwap=95.63
Update for order 97b8c77f-f83b-4693-b493-3b556bba3319
Order Status: OrderStatus.PENDING_NEW
Update for order 97b8c77f-f83b-4693-b493-3b556bba3319
Order Status: OrderStatus.NEW


ERROR:alpaca.trading.stream:error during websocket communication: UUID('97b8c77f-f83b-4693-b493-3b556bba3319')
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/dist-packages/alpaca/trading/stream.py", line 173, in _run_forever
    await self._consume()
  File "/usr/local/lib/python3.11/dist-packages/alpaca/trading/stream.py", line 146, in _consume
    await self._dispatch(msg)
  File "/usr/local/lib/python3.11/dist-packages/alpaca/trading/stream.py", line 90, in _dispatch
    await self._trade_updates_handler(self._cast(msg))
  File "<ipython-input-12-16ce78ef2267>", line 29, in on_order_update
    tp_price = np.round((1+orders_to_tp_pct[order_id])*price_filled,2)
                           ~~~~~~~~~~~~~~~~^^^^^^^^^^
KeyError: UUID('97b8c77f-f83b-4693-b493-3b556bba3319')


Update for order 97b8c77f-f83b-4693-b493-3b556bba3319
Order Status: OrderStatus.PARTIALLY_FILLED
Filled: OrderSide.BUY 6.0 AMD at 95.69
Total Filled: 6/None AMD
 Bar #:2
symbol='AMD' timestamp=datetime.datetime(2025, 4, 28, 11, 15, tzinfo=datetime.timezone.utc) open=95.6 high=95.6 low=95.6 close=95.6 volume=1056.0 trade_count=11.0 vwap=95.6
 Bar #:3
symbol='AMD' timestamp=datetime.datetime(2025, 4, 28, 11, 16, tzinfo=datetime.timezone.utc) open=95.51 high=95.52 low=95.5 close=95.5 volume=2456.0 trade_count=39.0 vwap=95.505437
 Bar #:4
symbol='AMD' timestamp=datetime.datetime(2025, 4, 28, 11, 17, tzinfo=datetime.timezone.utc) open=95.5 high=95.6 low=95.5 close=95.6 volume=2913.0 trade_count=50.0 vwap=95.506897
 Bar #:5
symbol='AMD' timestamp=datetime.datetime(2025, 4, 28, 11, 18, tzinfo=datetime.timezone.utc) open=95.57 high=95.57 low=95.57 close=95.57 volume=1025.0 trade_count=16.0 vwap=95.57
 Bar #:6
symbol='AMD' timestamp=datetime.datetime(2025, 4, 28, 11, 20, tzinfo=datetime.timezon

In [None]:
import asyncio
import numpy as np
from alpaca.trading.models import TradeUpdate
from alpaca.trading.enums import OrderStatus, OrderSide, TimeInForce
from alpaca.trading.requests import LimitOrderRequest, MarketOrderRequest
from alpaca.data.live import StockDataStream
from alpaca.data.enums import DataFeed

class SimpleTrader:
    def __init__(self, api_key: str, api_secret: str, symbol: str = "AMD"):
        from alpaca.trading.client import TradingClient
        from alpaca.trading.stream import TradingStream

        self.symbol = symbol
        self.orders_to_tp_pct = {}
        self.bar_number = 0

        self.trading_client = TradingClient(api_key, api_secret, paper=True)
        self.trading_stream = TradingStream(api_key, api_secret)
        self.stock_data_stream = StockDataStream(api_key, api_secret, feed=DataFeed.SIP)

    async def on_order_update(self, trade_event: TradeUpdate):
        order_id = trade_event.order.id
        status = trade_event.order.status
        side = trade_event.order.side
        qty_filled = trade_event.qty
        price_filled = trade_event.price
        cum_filled = trade_event.order.filled_qty
        total_qty = trade_event.order.qty
        symbol = trade_event.order.symbol

        print(f"Update for order {order_id}")
        print(f"Order Status: {status}")

        if status in [OrderStatus.FILLED, OrderStatus.PARTIALLY_FILLED] and side == OrderSide.BUY:
            print(f"Filled: {side} {qty_filled} {symbol} at {price_filled}")
            print(f"Total Filled: {cum_filled}/{total_qty} {symbol}")

            tp_pct = self.orders_to_tp_pct.get(order_id, 0.02)  # default 2% if not found
            tp_price = np.round((1 + tp_pct) * price_filled, 2)

            tp_order_request = LimitOrderRequest(
                symbol=symbol,
                qty=qty_filled,
                side=OrderSide.SELL,
                limit_price=tp_price,
                time_in_force=TimeInForce.DAY,
                extended_hours=True
            )

            self.trading_client.submit_order(tp_order_request)

            print(f"Submitted {tp_pct:.0%} Take Profit SELL Order for {symbol} at {tp_price}")

    async def on_1min_bar(self, bar):
        if self.bar_number <= 10:
            self.bar_number += 1
            print(f"Bar #{self.bar_number}")
            print(bar)

        if 2 <= self.bar_number <= 3:
            market_order_request = MarketOrderRequest(
                symbol=self.symbol,
                notional=1000,
                side=OrderSide.BUY,
                time_in_force=TimeInForce.DAY
            )

            order = self.trading_client.submit_order(market_order_request)
            self.orders_to_tp_pct[order.id] = self.bar_number / 100

    async def run(self):
        self.trading_stream.subscribe_trade_updates(self.on_order_update)
        self.stock_data_stream.subscribe_bars(self.on_1min_bar, self.symbol)

        await asyncio.gather(
            self.trading_stream._run_forever(),
            self.stock_data_stream._run_forever()
        )

# === Usage Example ===

# Replace these with your actual keys
ALPACA_API_KEY = "your_api_key"
ALPACA_API_SECRET = "your_api_secret"

trader = SimpleTrader(ALPACA_API_KEY, ALPACA_API_SECRET)

asyncio.run(trader.run())
