In [9]:
import models
import torch  
import researches 

model = models.LinearModel(3)
model.load_state_dict(torch.load('model_weights.pth', weights_only=True))
model.eval()


LinearModel(
  (linear): Linear(in_features=3, out_features=1, bias=True)
)

In [10]:
### Fundamental Building Block: Tick

from abc import ABC, abstractmethod
from typing import Generic, TypeVar

T = TypeVar('T')
R = TypeVar('R')

class Tick(ABC, Generic[T, R]):
    @abstractmethod
    def on_tick(self, val: T ) -> R:
        """Handle a new tick and optionally return a result."""
        pass



In [11]:
### Sliding Window: The fundamental data structure

from collections import deque
from typing import Deque, Optional
import numpy as np

class DequeWindow(Tick[T, Optional[T]], Generic[T]):
    def __init__(self, n : int):
        self._data : Deque[T] = deque(maxlen = n)
        
    def on_tick(self, val: T) -> Optional[T]:
        """Append a value and return the oldest value dropped (if any)."""
        dropped = None
        if self.is_full():
            dropped = self._data[0]
        self._data.append(val)
        return dropped
    
    def is_full(self) -> bool :
        return self._data.maxlen == len(self._data)
    
    def append_left(self, val: T) -> Optional[T]:
        dropped = None
        if self.is_full():
            dropped = self._data[-1]
        self._data.appendleft(val)
        return dropped
    
    def to_numpy(self) -> np.ndarray:
        return np.array(self._data)
    
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(capacity={self._data.maxlen}, values={list(self._data)})"
    


In [12]:
w = DequeWindow(3)

w

DequeWindow(capacity=3, values=[])

In [13]:
w = DequeWindow(3)
w.on_tick(1)
w.on_tick(2)
w.on_tick(3)
w.on_tick(4)
w

DequeWindow(capacity=3, values=[2, 3, 4])

In [14]:
import numpy as np
from typing import Optional

class NumpyWindow(Tick[T, Optional[T]]):

    def __init__(self, n: int, dtype=np.float64):
        if n <=0:
            raise ValueError("Capacity must be positive")
        self._capacity = n
        self._data = np.zeros(n, dtype=dtype)
        self._size = 0
    
    def on_tick(self, val: float) -> Optional[float]:
        dropped = None
        
        if self._size < self._capacity:
            self._data[self._size] = val
            self._size +=1
        else:
            dropped = self._data[0]
            #shift left in-place
            for i in range(1, self._capacity):
                self._data[i - 1] = self._data[i]
            self._data[-1] = val

        return dropped
    
    def __getitem__(self, idx: int) -> float:
        """Index access (0 = oldest)."""
        if not 0 <= idx < self._size:
            raise IndexError("Index out of range.")
        return self._data[idx]
    
    def __len__(self) -> int:
        return self._size

    def capacity(self) -> int:
        return self._capacity
    
    def is_full(self) -> bool:
        return self._size == self._capacity
    
    def values(self) -> np.ndarray:
        return self._data[:self._size]
    
    def __repr__(self) -> str:
        value = self.values().tolist()
        return f"{self.__class__.__name__}(capacity={self._capacity}, size={self._size}, values={value})"


In [15]:
def benchmark_window(window, n):
    for i in range(n):
        window.on_tick(i)

window_size = 10
n = 5000000

In [16]:
%%time
benchmark_window(NumpyWindow(window_size), n)

CPU times: total: 12.3 s
Wall time: 14.1 s


In [17]:
%%time
benchmark_window(DequeWindow(window_size), n)

CPU times: total: 1.94 s
Wall time: 2.28 s


In [18]:
class Last(Tick[T, T], Generic[T]):
    def __init__(self):
        self._value: Optional[T] = None

    def on_tick(self, val: T) -> Optional[T]:
        self._value = val
        return val
    
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(value={self._value})"

In [19]:
last_val = Last()
last_val

Last(value=None)

In [20]:
last_val = Last()
for i in range(5):
    last_val.on_tick(i)
last_val

Last(value=4)

In [21]:
### Streaming Log Returns

# Recap about the log returns
ts = [100, 120, 100]
log_returns = [
    np.log(ts[1] / ts[0]),
    np.log(ts[2] / ts[1])
]

log_returns

[np.float64(0.1823215567939546), np.float64(-0.1823215567939546)]

In [22]:
np.sum(log_returns)

np.float64(0.0)

In [23]:
import numpy as np

class LogReturn(Tick[float, Optional[float]]): # i remove the Generic[T] here
    def __init__(self):
        self._window = NumpyWindow(2) 

    def on_tick(self, val: float) -> Optional[float]:
        self._window.on_tick(val)
        if self._window.is_full():
            return np.log(self._window[1] / self._window[0])
        else:
            return None
    
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        
        return f"{cls_name}(window={self._window})"

In [24]:
f = LogReturn()
f.on_tick(100.0)
f

LogReturn(window=NumpyWindow(capacity=2, size=1, values=[100.0]))

In [25]:
v = f.on_tick(120.0)
v

np.float64(0.1823215567939546)

In [26]:
f

LogReturn(window=NumpyWindow(capacity=2, size=2, values=[100.0, 120.0]))

In [27]:
v = f.on_tick(100.0)
v

np.float64(-0.1823215567939546)

In [28]:
f

LogReturn(window=NumpyWindow(capacity=2, size=2, values=[120.0, 100.0]))

In [29]:
time_series = [0.1, -0.2, -0.3]
lag_1 = time_series[-1]
lag_2 = time_series[-2]
lag_3 = time_series[-3]
lag_1

-0.3

In [30]:
class LogReturnLags(Tick[float, torch.Tensor]):
    def __init__(self, no_lags: int):
        self._lags = DequeWindow(no_lags) # nagset lang sya ng deque na may size and capacity, may lags dito magdrop kapag napuno
        self._log_return = LogReturn() # sa loob nito nagset tayo ng Deque using Array Numpy and the capacity is 2, makukuha ntin dito yung log return

    def on_tick(self, val: float) -> torch.Tensor | None:
        log_ret = self._log_return.on_tick(val) ## kunin lang yung log value if the size == capacity
        if log_ret is not None: # kapag may laman yung log_ret
            self._lags.append_left(log_ret) #ilalagay nya sa left yung value
            return torch.tensor(self._lags.to_numpy(), dtype=torch.float32) if self._lags.is_full() else None # irereturn nya lang kapag full
        else:
            return None
        
    def __repr__(self) -> str:
        cls_name = self.__class__.__name__
        return f"{cls_name}(lags={self._lags}, log_return={self._log_return})"
        
        

In [31]:
lags = LogReturnLags(3)
lags

LogReturnLags(lags=DequeWindow(capacity=3, values=[]), log_return=LogReturn(window=NumpyWindow(capacity=2, size=0, values=[])))

In [32]:
v = lags.on_tick(90)
v

In [33]:
lags

LogReturnLags(lags=DequeWindow(capacity=3, values=[]), log_return=LogReturn(window=NumpyWindow(capacity=2, size=1, values=[90.0])))

In [34]:
v = lags.on_tick(100)
v


In [35]:
lags

LogReturnLags(lags=DequeWindow(capacity=3, values=[np.float64(0.10536051565782635)]), log_return=LogReturn(window=NumpyWindow(capacity=2, size=2, values=[90.0, 100.0])))

In [36]:
v = lags.on_tick(100)
v

In [37]:
lags

LogReturnLags(lags=DequeWindow(capacity=3, values=[np.float64(0.0), np.float64(0.10536051565782635)]), log_return=LogReturn(window=NumpyWindow(capacity=2, size=2, values=[100.0, 100.0])))

In [38]:
lags  = LogReturnLags(3)
v = lags.on_tick(90)
lags

LogReturnLags(lags=DequeWindow(capacity=3, values=[]), log_return=LogReturn(window=NumpyWindow(capacity=2, size=1, values=[90.0])))

In [39]:
lags.on_tick(100)
lags

LogReturnLags(lags=DequeWindow(capacity=3, values=[np.float64(0.10536051565782635)]), log_return=LogReturn(window=NumpyWindow(capacity=2, size=2, values=[90.0, 100.0])))

In [40]:
lags.on_tick(150)


In [41]:
lags.on_tick(110)


tensor([-0.3102,  0.4055,  0.1054])

In [42]:
[np.log(110/150), np.log(150/100), np.log(100/90)]

[np.float64(-0.3101549283038396),
 np.float64(0.4054651081081644),
 np.float64(0.10536051565782635)]

In [43]:
lags = LogReturnLags(3)
lags.on_tick(90)
lags.on_tick(100)
lags.on_tick(150)
lags.on_tick(110)
features = lags.on_tick(160)
features



tensor([ 0.3747, -0.3102,  0.4055])

In [44]:
X = features
with torch.no_grad():
    y_hat = model(X)
y_hat

tensor([-0.0060])

In [45]:
y_hat[0]

tensor(-0.0060)

In [46]:
### Build the Trading System, using decimal to represnet money
val = 0.1
total = 0.0

for i in range(10):
    total +=val
total

0.9999999999999999

In [47]:
from decimal import Decimal
dp = Decimal('0.2')
val = Decimal(0.1).quantize(dp)
print(val)
total = Decimal(0.0).quantize(dp)
print(total)
for i in range(10):
    total += val

total

0.1
0.0


Decimal('1.0')

In [48]:
from dataclasses import dataclass

@dataclass(frozen=True)
class Order:
    sym: str
    signed_qty: Decimal

    def __str__(self) -> str:
        sign = "LONG" if self.signed_qty > 0 else "SHORT"
        return f"Order({sign} {self.signed_qty} {self.sym})"


In [49]:
from decimal import Decimal

def decimal_sign(d: Decimal) -> int:
    return 1 if d > Decimal(0) else -1

def is_long(x: Decimal) -> bool:
    return decimal_sign(x) > 0

@dataclass(frozen=True)
class Trade:
    sym: str
    signed_qty: Decimal
    price: Decimal
    pnl: Decimal

    def __str__(self) -> str:
        sign = "LONG" if is_long(self.signed_qty) else "SHORT"
        return f"Trade({sign} {self.signed_qty} {self.sym} {self.price} {self.pnl})"
    
    def is_long(self) -> bool:
        return is_long(self.signed_qty)

In [50]:
@dataclass
class Position:
    sym: str
    signed_qty: Decimal
    price: Decimal

    def close(self) -> "Order":
        return Order(self.sym, -self.signed_qty)
    
    def is_long(self) -> bool:
        return is_long(self.signed_qty)
    
    def unrealized_pnl(self, current_price: Decimal) -> Decimal:
        entry_val = self.price * self.signed_qty
        exit_val = current_price * -self.signed_qty
        return entry_val + exit_val


In [51]:
from abc import ABC, abstractmethod
from decimal import Decimal

class Account(ABC):
    @abstractmethod
    def balance(self) -> Decimal:
        pass

    @abstractmethod
    def get_position(self, sym: str) -> Optional[Position]:
        pass

In [None]:
from decimal import Decimal
from typing import Dict, List, Optional

class TestAccount(Account):
    """A simulated account for testing or paper trading"""

    def __init__(self, _balance: Decimal) -> None:
        self._balance = _balance
        self._positions : Dict[str, Position] = {}
        self._trades : List[Trade] = []

    def balance(self) -> Decimal:
        return self._balance
    
    def get_position(self, sym) -> Optional[Position]:
        return self._positions.get(sym)
    
    def __repr__(self) -> str:
        return f"TestAccount(balance={self._balance}, positions={self._positions}, trades={self._trades})"

In [53]:
acc = TestAccount(Decimal(50.0))
acc.balance()

Decimal('50')

In [54]:
acc

TestAccount(balance=50, positions={}, trades=[])

In [55]:
### Model an Exchange
from abc import abstractmethod
from decimal import Decimal

class Exchange(Account):
    """Abstract base class representing a trading exchange/broker."""

    @abstractmethod
    def market_order(self, sym: str, signed_qty: Decimal, price: Decimal) ->Trade:
        pass

    @abstractmethod
    def limit_order(self, sym: str, signed_qty: Decimal, price: Decimal, post_only: bool) -> Optional[Trade]:
        """Execute a limit order and return a Trade if it crosses book."""
        pass
    

In [None]:
from typing import Dict, List

class TestExchange(Exchange):
    _account : TestAccount # type hint only

    def __init__(self, account: TestAccount):
        self._account = account

    def market_order(self, sym: str, signed_qty: Decimal, price: Decimal) -> "Trade":
        trade = self._update_position(sym, signed_qty, price)
        self._account._balance += trade.pnl
        self._account._trades.append(trade)
        return trade
    
    def _update_position(self, sym: str, signed_qty, price: Decimal) -> Trade:
        position = self._account._positions.pop(sym, None)
        pnl = Decimal(0.0)
        if position is not None:
            entry_val = position.price * position.signed_qty
            exit_val = price * position.signed_qty
            pnl = exit_val - entry_val
        else:
            self._account._positions[sym] = Position(sym, signed_qty, price)
        return Trade(sym, signed_qty, price, pnl)
    
    def limit_order(self, sym, signed_qty, price, post_only = False):
        raise Exception("not yet implemented")
    
    def balance(self) -> Decimal:
        return self._account.balance()
    
    def get_position(self, sym) -> Optional[Position]: 
        return self._account.get_position(sym)
    
    def __repr__(self) -> str:
        return f"TestExchange(balance={self.balance()}, position={self._account._positions}, trades={self._account._trades})"

In [57]:
exchange = TestExchange(TestAccount(Decimal(50.0)))

price = Decimal(10)
qty = Decimal(5.0)
exchange.market_order('BTCUSDT', qty, Decimal(price))

Trade(sym='BTCUSDT', signed_qty=Decimal('5'), price=Decimal('10'), pnl=Decimal('0'))

In [58]:
exchange

TestExchange(balance=50, position={'BTCUSDT': Position(sym='BTCUSDT', signed_qty=Decimal('5'), price=Decimal('10'))}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('5'), price=Decimal('10'), pnl=Decimal('0'))])

In [59]:
### Close Position
price  = Decimal(15.0)
exchange.market_order('BTCUSDT', -qty, price)

Trade(sym='BTCUSDT', signed_qty=Decimal('-5'), price=Decimal('15'), pnl=Decimal('25'))

In [60]:
exchange

TestExchange(balance=75, position={}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('5'), price=Decimal('10'), pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('-5'), price=Decimal('15'), pnl=Decimal('25'))])

In [61]:
###Build Strategy Api


class Strategy(ABC):
    @abstractmethod
    def on_tick(self, price: float, account: Account) -> Optional[List[Order]]:
        pass


In [None]:
#Implement our strategy

import torch.nn as nn

class BasicTakerStrat(Strategy):
    def __init__(self,
                 sym: str,
                 model : nn.Module,
                 log_return_lags: LogReturnLags, # ilang lags yung return yung naka shift() yung log sa  lag 
                 scale_factor: Decimal = None) -> None: # naka ini naman ng 1.0
        
        self.sym = sym
        self.model = model
        self.log_return_lags = log_return_lags
        if scale_factor is None:
            scale_factor = Decimal(1.0)
        self.scale_factor = Decimal(scale_factor)

    def _signed_compound_trade_size(self, y_hat: float, account: Account, cur_price : Decimal, position: Optional[Position]) -> Decimal:
        dir_signal = np.sign(y_hat)
        cur_balance = account.balance()
        unrealized_balance = cur_balance + (position.unrealized_pnl(cur_price) if position else Decimal(0.0))
        qty = unrealized_balance / cur_price
        signed_qty = Decimal(dir_signal) * qty
        return signed_qty * self.scale_factor # +-qty  * scale_factor
    
    def _create_orders(self, y_hat: torch.Tensor, account: Account, price: Decimal) -> List[Order]:
        position = account.get_position(self.sym)
        signed_trade_size = self._signed_compound_trade_size(y_hat.item(), account, price, position)
        open_order = Order(self.sym, signed_trade_size)
        if position is not None:
            close_order = Order(position.sym, -position.signed_qty)
            return [close_order, open_order]
        return [open_order]

    def on_tick(self, price: float, account: Account) -> List[Order]:
        X = self.log_return_lags.on_tick(price)
        if X is not None:
            with torch.no_grad(): # turn off training
                y_hat = self.model(X)
                orders = self._create_orders(y_hat, account, Decimal(price))
                return orders
            
        return []
    



In [85]:
### stream lagged log returns

lags = LogReturnLags(3) 
# Create Account
acc = TestAccount(Decimal(100.0)) #nilagyan lang natin na 100
#Create Strategy
strat = BasicTakerStrat('BTCUSDT', model, lags, Decimal(1.0)) #yung model natin naka 3 inputs sya,

#first 12 hour interval - 2025/10/20 00:00
strat.on_tick(10.0, acc)




[]

In [86]:
#second 12 hour interval - 2025/10/20 12:00
strat.on_tick(120.0, acc)


[]

In [87]:
#third 12 hour interval - 2025/10/21 00:00
strat.on_tick(90.0, acc)

[]

In [88]:
#fourt 12 hour interval  - 2025/10/21 12:00
orders = strat.on_tick(100, acc)
orders

[Order(sym='BTCUSDT', signed_qty=Decimal('1'))]

In [89]:
exchange = TestExchange(acc)
order = orders[0]

exchange.market_order(order.sym, order.signed_qty, 100)


Trade(sym='BTCUSDT', signed_qty=Decimal('1'), price=100, pnl=Decimal('0'))

In [90]:
exchange

TestExchange(balance=100, position={'BTCUSDT': Position(sym='BTCUSDT', signed_qty=Decimal('1'), price=100)}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('1'), price=100, pnl=Decimal('0'))])

In [91]:
orders = strat.on_tick(115, acc)
orders

[Order(sym='BTCUSDT', signed_qty=Decimal('-1')),
 Order(sym='BTCUSDT', signed_qty=Decimal('-0.7391304347826086956521739130'))]

In [92]:
order = orders[0]
exchange.market_order(order.sym, order.signed_qty, 115)

Trade(sym='BTCUSDT', signed_qty=Decimal('-1'), price=115, pnl=Decimal('15'))

In [93]:
exchange

TestExchange(balance=115, position={}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('1'), price=100, pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('-1'), price=115, pnl=Decimal('15'))])

In [94]:
order = orders[1]
exchange.market_order(order.sym, order.signed_qty, 115)

Trade(sym='BTCUSDT', signed_qty=Decimal('-0.7391304347826086956521739130'), price=115, pnl=Decimal('0'))

In [95]:
exchange

TestExchange(balance=115, position={'BTCUSDT': Position(sym='BTCUSDT', signed_qty=Decimal('-0.7391304347826086956521739130'), price=115)}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('1'), price=100, pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('-1'), price=115, pnl=Decimal('15')), Trade(sym='BTCUSDT', signed_qty=Decimal('-0.7391304347826086956521739130'), price=115, pnl=Decimal('0'))])

In [96]:
orders = strat.on_tick(100, acc)
orders

[Order(sym='BTCUSDT', signed_qty=Decimal('0.7391304347826086956521739130')),
 Order(sym='BTCUSDT', signed_qty=Decimal('1.039130434782608695652173913'))]

In [97]:
order = orders[0]
exchange.market_order(order.sym, order.signed_qty, 100)

Trade(sym='BTCUSDT', signed_qty=Decimal('0.7391304347826086956521739130'), price=100, pnl=Decimal('11.08695652173913043478260870'))

In [98]:
exchange

TestExchange(balance=126.0869565217391304347826087, position={}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('1'), price=100, pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('-1'), price=115, pnl=Decimal('15')), Trade(sym='BTCUSDT', signed_qty=Decimal('-0.7391304347826086956521739130'), price=115, pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('0.7391304347826086956521739130'), price=100, pnl=Decimal('11.08695652173913043478260870'))])

In [99]:
order = orders[1]
exchange.market_order(order.sym, order.signed_qty, 100)

Trade(sym='BTCUSDT', signed_qty=Decimal('1.039130434782608695652173913'), price=100, pnl=Decimal('0'))

In [100]:
exchange

TestExchange(balance=126.0869565217391304347826087, position={'BTCUSDT': Position(sym='BTCUSDT', signed_qty=Decimal('1.039130434782608695652173913'), price=100)}, trades=[Trade(sym='BTCUSDT', signed_qty=Decimal('1'), price=100, pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('-1'), price=115, pnl=Decimal('15')), Trade(sym='BTCUSDT', signed_qty=Decimal('-0.7391304347826086956521739130'), price=115, pnl=Decimal('0')), Trade(sym='BTCUSDT', signed_qty=Decimal('0.7391304347826086956521739130'), price=100, pnl=Decimal('11.08695652173913043478260870')), Trade(sym='BTCUSDT', signed_qty=Decimal('1.039130434782608695652173913'), price=100, pnl=Decimal('0'))])