In [1]:
import yfinance as yf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from src.utils import get_last_market_day
import pandas_market_calendars as mcal
from src.constants import base_url
from src.utils import calculate_atr

In [2]:
def get_symbol(df, symbol:str) -> pd.DataFrame:
    return pd.DataFrame(
    {
        'Open':df['Open'][symbol],
        'High':df['High'][symbol],
        'Low':df['Low'][symbol],
        'Close':df['Close'][symbol],
        'Volume':df['Volume'][symbol],
    }
)

def reaggregate_bars(df, resample:str='1d') -> pd.DataFrame:
    return pd.DataFrame(
    {
        'Open':df['Open'].resample(resample).first(),
        'High':df['High'].resample(resample).max(),
        'Low':df['Low'].resample(resample).min(),
        'Close':df['Close'].resample(resample).last(),
        'Volume':df['Volume'].resample(resample).sum(),
    }
    ).dropna()

In [3]:
with open('files/watchlist.txt') as f:
    watchlist = f.read()
last_market_day = get_last_market_day()

In [4]:
df_raw = yf.download(watchlist.split(' '), auto_adjust=False, interval='30m', period='1mo')

[*********************100%***********************]  80 of 80 completed


In [8]:
symbol = 'AAPL'
df_full = get_symbol(df_raw, symbol)
df_daily = reaggregate_bars(df_full, '1d')
df = df_full.loc[last_market_day].copy()

In [13]:
def calculate_tpo(
        df_symbol:pd.DataFrame, 
        session_date:str=get_last_market_day(),
        atr_scaler:float=25.0
    ) -> pd.Series:

    df_daily = reaggregate_bars(df_full, '1d')
    df = df_full.loc[session_date].copy()
    max_price = df['High'].max().round(2)
    min_price = df['Low'].min().round(2)
    atr = calculate_atr(df_daily).iloc[-1]
    tpo_bin_size = round(atr/atr_scaler, 2)
    bin_edges = np.arange(min_price, max_price + tpo_bin_size, tpo_bin_size)
    bin_labels = bin_edges[:-1]
    tpo_counts = pd.Series(0, index=bin_labels)
    for _, row in df.iterrows():
        low = row['Low']
        high = row['High']
        price_range = np.arange(low, high + tpo_bin_size, tpo_bin_size)
        binned = pd.cut(price_range, bins=bin_edges, labels=bin_labels, right=False)
        binned = binned.dropna().unique()
        for bin_label in binned:
            tpo_counts[bin_label] += 1
    return tpo_counts

In [21]:
def calculate_value_area(tpo_counts:pd.Series, value_area_pct:float=0.7) -> dict:
    sorted_tpo = tpo_counts.sort_values(ascending=False)
    total_tpos = sorted_tpo.sum()
    target_tpos = total_tpos * value_area_pct
    max_count = sorted_tpo.iloc[0]
    poc_candidates = sorted_tpo[sorted_tpo == max_count].index
    price_center = np.average(tpo_counts.index, weights=tpo_counts.values)
    poc = min(poc_candidates, key=lambda x: abs(x - price_center))  # closest to center
    value_area_bins = [poc]
    cumulative = tpo_counts[poc]
    remaining = tpo_counts.drop(index=poc) # remove POC bin
    sorted_bins = sorted(remaining.index)
    bin_size = sorted_bins[1] - sorted_bins[0]
    above = poc + bin_size
    below = poc - bin_size

    while cumulative < target_tpos:
        next_bins = []
        if above in remaining:
            next_bins.append((above, tpo_counts[above]))
        if below in remaining:
            next_bins.append((below, tpo_counts[below]))

        if not next_bins:
            break

    # Choose the bin with higher TPO count; if tied, prefer lower price
        next_bins.sort(key=lambda x: (-x[1], x[0]))
        chosen_price, count = next_bins[0]

        value_area_bins.append(chosen_price)
        cumulative += count
        remaining = remaining.drop(index=chosen_price)

        # Move pointers
        if chosen_price == above:
            above += bin_size
        else:
            below -= bin_size

    val = min(value_area_bins)
    vah = max(value_area_bins)

    return {
        'POC':round(poc,2),
        'VAH':round(vah,2),
        'VAL':round(val,2),
        'Value Area Bins': sorted(value_area_bins),
        'Total TPOs': total_tpos,
        'Captured TPOs': cumulative
    }

In [22]:
symbol = 'AAPL'
tpo = calculate_tpo(get_symbol(df_raw, symbol))
value_areas = calculate_value_area(tpo)

In [23]:
value_areas

{'POC': 214.3,
 'VAH': 214.6,
 'VAL': 213.85,
 'Value Area Bins': [213.85000000000002,
  214.00000000000003,
  214.15000000000003,
  214.30000000000004,
  214.45000000000005,
  214.60000000000005],
 'Total TPOs': np.int64(68),
 'Captured TPOs': np.int64(49)}