# Streaming/Real-Time Indicators

This notebook demonstrates numta's streaming indicator capabilities for real-time data processing.

In [None]:
import numpy as np
from numta.streaming import (
    # Overlap indicators
    StreamingSMA, StreamingEMA, StreamingBBANDS,
    StreamingDEMA, StreamingTEMA, StreamingWMA,
    # Momentum indicators
    StreamingRSI, StreamingMACD, StreamingSTOCH,
    StreamingMOM, StreamingROC,
    # Volatility indicators
    StreamingATR, StreamingTRANGE,
    # Volume indicators
    StreamingOBV, StreamingAD
)

## Introduction to Streaming Indicators

Streaming indicators maintain internal state and update efficiently with each new data point. This is ideal for:

- Real-time trading systems
- Live dashboards
- Event-driven architectures
- Low-latency applications

## Basic Streaming Usage

### StreamingSMA and StreamingEMA

In [None]:
# Create streaming indicators
sma = StreamingSMA(timeperiod=20)
ema = StreamingEMA(timeperiod=12)

# Simulate streaming price data
np.random.seed(42)
prices = 100 + np.cumsum(np.random.randn(100) * 0.5)

# Process prices one at a time
for i, price in enumerate(prices):
    sma_value = sma.update(price)
    ema_value = ema.update(price)
    
    # Check if indicator is ready (has enough data)
    if sma.ready and i >= 25:  # Print after warmup
        if i % 10 == 0:  # Print every 10th value
            print(f"Bar {i}: Price={price:.2f}, SMA={sma_value:.2f}, EMA={ema_value:.2f}")

### The `ready` Property

Each streaming indicator has a `ready` property that indicates whether enough data has been accumulated for valid output.

In [None]:
# Create new indicator
sma5 = StreamingSMA(timeperiod=5)

# Process some prices
for i in range(7):
    value = sma5.update(100 + i)
    print(f"Bar {i}: ready={sma5.ready}, value={value}")

### StreamingRSI

In [None]:
rsi = StreamingRSI(timeperiod=14)

# Process prices
for i, price in enumerate(prices):
    rsi_value = rsi.update(price)
    
    if rsi.ready and i >= 20 and i % 10 == 0:
        # Signal overbought/oversold
        signal = "Overbought" if rsi_value > 70 else "Oversold" if rsi_value < 30 else "Neutral"
        print(f"Bar {i}: RSI={rsi_value:.2f} ({signal})")

### StreamingMACD

In [None]:
macd = StreamingMACD(fastperiod=12, slowperiod=26, signalperiod=9)

# Process prices
for i, price in enumerate(prices):
    result = macd.update(price)
    
    if macd.ready and i >= 35 and i % 10 == 0:
        # result is a tuple: (macd_line, signal_line, histogram)
        macd_line, signal_line, histogram = result
        signal = "Bullish" if histogram > 0 else "Bearish"
        print(f"Bar {i}: MACD={macd_line:.3f}, Signal={signal_line:.3f}, Hist={histogram:.3f} ({signal})")

## OHLCV Bar Updates

Some indicators require OHLCV data. Use `update_bar()` for these.

### StreamingATR

In [None]:
# Create streaming ATR
atr = StreamingATR(timeperiod=14)

# Simulate OHLCV data
np.random.seed(42)
n = 100
base = 100 + np.cumsum(np.random.randn(n) * 0.5)

for i in range(n):
    open_ = base[i] + np.random.randn() * 0.2
    close = base[i]
    high = max(open_, close) + abs(np.random.randn()) * 0.5
    low = min(open_, close) - abs(np.random.randn()) * 0.5
    volume = 1000 + np.random.randint(0, 500)
    
    atr_value = atr.update_bar(open_, high, low, close, volume)
    
    if atr.ready and i >= 20 and i % 10 == 0:
        print(f"Bar {i}: ATR={atr_value:.4f}")

### StreamingSTOCH

In [None]:
stoch = StreamingSTOCH(fastk_period=5, slowk_period=3, slowd_period=3)

for i in range(n):
    open_ = base[i] + np.random.randn() * 0.2
    close = base[i]
    high = max(open_, close) + abs(np.random.randn()) * 0.5
    low = min(open_, close) - abs(np.random.randn()) * 0.5
    volume = 1000
    
    result = stoch.update_bar(open_, high, low, close, volume)
    
    if stoch.ready and i >= 15 and i % 10 == 0:
        slowk, slowd = result
        signal = "Overbought" if slowk > 80 else "Oversold" if slowk < 20 else "Neutral"
        print(f"Bar {i}: SlowK={slowk:.2f}, SlowD={slowd:.2f} ({signal})")

## Volume Indicators

In [None]:
obv = StreamingOBV()
ad = StreamingAD()

for i in range(n):
    open_ = base[i] + np.random.randn() * 0.2
    close = base[i]
    high = max(open_, close) + abs(np.random.randn()) * 0.5
    low = min(open_, close) - abs(np.random.randn()) * 0.5
    volume = 1000.0 + np.random.randint(0, 500)
    
    obv_value = obv.update_bar(open_, high, low, close, volume)
    ad_value = ad.update_bar(open_, high, low, close, volume)
    
    if i >= 10 and i % 20 == 0:
        print(f"Bar {i}: OBV={obv_value:.0f}, A/D={ad_value:.2f}")

## Building a Real-Time Dashboard

Here's an example of combining multiple streaming indicators.

In [None]:
class TradingDashboard:
    def __init__(self):
        self.sma_fast = StreamingSMA(timeperiod=10)
        self.sma_slow = StreamingSMA(timeperiod=20)
        self.rsi = StreamingRSI(timeperiod=14)
        self.macd = StreamingMACD()
        self.atr = StreamingATR(timeperiod=14)
        self.last_signal = None
    
    def update(self, open_, high, low, close, volume):
        # Update all indicators
        sma_fast_val = self.sma_fast.update(close)
        sma_slow_val = self.sma_slow.update(close)
        rsi_val = self.rsi.update(close)
        macd_result = self.macd.update(close)
        atr_val = self.atr.update_bar(open_, high, low, close, volume)
        
        # Check if all indicators are ready
        if not all([self.sma_fast.ready, self.sma_slow.ready, 
                    self.rsi.ready, self.macd.ready, self.atr.ready]):
            return None
        
        # Generate signals
        signal = {
            'price': close,
            'sma_fast': sma_fast_val,
            'sma_slow': sma_slow_val,
            'trend': 'up' if sma_fast_val > sma_slow_val else 'down',
            'rsi': rsi_val,
            'rsi_signal': 'overbought' if rsi_val > 70 else 'oversold' if rsi_val < 30 else 'neutral',
            'macd_hist': macd_result[2],
            'macd_signal': 'bullish' if macd_result[2] > 0 else 'bearish',
            'atr': atr_val
        }
        
        return signal

# Use the dashboard
dashboard = TradingDashboard()

for i in range(n):
    open_ = base[i] + np.random.randn() * 0.2
    close = base[i]
    high = max(open_, close) + abs(np.random.randn()) * 0.5
    low = min(open_, close) - abs(np.random.randn()) * 0.5
    volume = 1000.0 + np.random.randint(0, 500)
    
    signal = dashboard.update(open_, high, low, close, volume)
    
    if signal and i % 20 == 0:
        print(f"\nBar {i}:")
        print(f"  Price: {signal['price']:.2f}")
        print(f"  Trend: {signal['trend']} (SMA Fast: {signal['sma_fast']:.2f}, Slow: {signal['sma_slow']:.2f})")
        print(f"  RSI: {signal['rsi']:.2f} ({signal['rsi_signal']})")
        print(f"  MACD: {signal['macd_signal']}")
        print(f"  ATR: {signal['atr']:.4f}")

## Performance Considerations

Streaming indicators are optimized for:

1. **Memory efficiency**: Only store necessary historical data
2. **Speed**: O(1) update operations
3. **Low latency**: No need to recalculate entire series

### Resetting Indicators

Call `reset()` to clear internal state and start fresh.

In [None]:
sma = StreamingSMA(timeperiod=5)

# Process some data
for i in range(10):
    sma.update(100 + i)
print(f"Before reset: ready={sma.ready}")

# Reset
sma.reset()
print(f"After reset: ready={sma.ready}")

## Available Streaming Indicators

| Category | Indicators |
|----------|------------|
| Overlap | StreamingSMA, StreamingEMA, StreamingBBANDS, StreamingDEMA, StreamingTEMA, StreamingWMA |
| Momentum | StreamingRSI, StreamingMACD, StreamingSTOCH, StreamingMOM, StreamingROC |
| Volatility | StreamingATR, StreamingTRANGE |
| Volume | StreamingOBV, StreamingAD |

## Next Steps

- See `07_visualization.ipynb` for real-time chart visualization with lwcharts
- See `08_performance_optimization.ipynb` for performance tips