# vnstock_sepa_demo — SEPA-style scanner & analyser (Minervini-inspired)

Notebook này áp dụng checklist kiểu Mark Minervini (SEPA) để phân tích mã VN: trend filter (MA), breakout/pivot, volatility contraction, volume confirm (factor=1.5) và Relative Strength so với VNINDEX.

HƯỚNG DẪN: chỉnh `symbol`, `start_date`, `end_date` ở cell **USER INPUTS** rồi **Run All**. Nếu thiếu package, chạy cell đầu để cài.


In [None]:
# Install missing packages if required (safe subprocess method).
# `importlib.util` is imported explicitly: a bare `import importlib` does not
# guarantee that the `util` submodule has been loaded, in which case
# `importlib.util.find_spec` raises AttributeError.
import importlib.util
import subprocess
import sys

reqs = ['vnstock', 'pandas', 'numpy', 'matplotlib', 'scipy']
# find_spec() returns None when a top-level module cannot be located.
missing = [p for p in reqs if importlib.util.find_spec(p) is None]
if missing:
    print('Installing', missing)
    # Use the running interpreter's pip so packages land in this environment.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install'] + missing)
else:
    print('All packages present')


In [None]:
# ===== USER INPUTS =====
from datetime import datetime, timedelta

symbol = 'VNINDEX'   # change to any symbol like 'PDR' or 'VIC'
start_date = ''  # YYYY-MM-DD or empty for 1 year before end_date
end_date = ''    # YYYY-MM-DD or empty for today

# volume confirmation factor
VOLUME_FACTOR = 1.5  # user-specified threshold


def resolve_date_range(start, end, default_days=365):
    """Fill in empty date strings and return ``(start, end)`` as YYYY-MM-DD.

    An empty ``end`` becomes today's date. An empty ``start`` becomes
    ``default_days`` before ``end`` (not before today), so a custom past
    ``end`` combined with an empty ``start`` still yields start < end.

    Raises:
        ValueError: if ``start`` is empty and ``end`` is not in YYYY-MM-DD form.
    """
    date_format = '%Y-%m-%d'
    if end == '':
        end = datetime.today().strftime(date_format)
    if start == '':
        anchor = datetime.strptime(end, date_format)
        start = (anchor - timedelta(days=default_days)).strftime(date_format)
    return start, end


start_date, end_date = resolve_date_range(start_date, end_date)
print(f'Scanning {symbol} from {start_date} to {end_date}')


In [None]:
# ===== Fetch data (compatibility with vnstock versions) =====
import pandas as pd, numpy as np


# Each loader targets one vnstock entry point. The imports live inside the
# functions so that a missing name in the installed vnstock only fails that
# loader. All loaders read the notebook-level symbol / start_date / end_date.
def _load_via_quote():
    from vnstock import Quote
    q = Quote(symbol=symbol, source='TCBS')
    return q.history(start=start_date, end=end_date, interval='1D')


def _load_via_vnstock():
    from vnstock import Vnstock
    stock = Vnstock().stock(symbol=symbol, source='TCBS')
    return stock.quote.history(start=start_date, end=end_date, interval='1D')


def _load_via_stock_historical_data():
    from vnstock import stock_historical_data
    return stock_historical_data(symbol=symbol, start_date=start_date, end_date=end_date, resolution='1D')


# (failure label, success message, loader) tried in order until one succeeds.
_loaders = [
    ('Quote API', 'Loaded via Quote.history()', _load_via_quote),
    ('Vnstock API', 'Loaded via Vnstock().stock().quote.history()', _load_via_vnstock),
    ('stock_historical_data', 'Loaded via stock_historical_data()', _load_via_stock_historical_data),
]

df = None
_last_error = None
for _label, _ok_message, _loader in _loaders:
    try:
        df = _loader()
    except Exception as exc:  # any import/API failure means "try the next loader"
        print(f'{_label} failed:', exc)
        _last_error = exc
        continue
    print(_ok_message)
    break
else:
    # No loader succeeded; surface the last failure and keep it as the cause.
    raise RuntimeError('Cannot fetch data via vnstock: ' + str(_last_error)) from _last_error

# Some responses wrap the rows in a {'data': [...]} payload.
if isinstance(df, dict) and 'data' in df:
    df = df['data']
if not isinstance(df, pd.DataFrame):
    df = pd.DataFrame(df)
if df.empty:
    # Fail here with a clear message instead of a KeyError/IndexError later on.
    raise RuntimeError(f'No data returned for {symbol} between {start_date} and {end_date}')

df.columns = [str(c).lower() for c in df.columns]
if 'time' in df.columns:
    df['time'] = pd.to_datetime(df['time'])
elif 'date' in df.columns:
    df['time'] = pd.to_datetime(df['date'])
else:
    # Fall back to the index, but only if it can plausibly hold dates: a
    # numeric index (e.g. the default RangeIndex) would be parsed without
    # error as nanoseconds since the epoch and produce bogus 1970 timestamps.
    if pd.api.types.is_numeric_dtype(df.index):
        raise RuntimeError('No time column found')
    try:
        df.index = pd.to_datetime(df.index)
    except Exception as exc:
        raise RuntimeError('No time column found') from exc
    # Name the index before resetting so the new column is 'time' regardless
    # of whatever name the index carried before.
    df.index.name = 'time'
    df = df.reset_index()
df = df.set_index('time').sort_index()
print('Data rows:', len(df))
df.head()


In [None]:
# ===== Compute indicators: SMA50/150/200, ATR, BB width, MA volume, RSI9/45 =====
def sma(s, n):
    """Return the simple moving average of series ``s`` over ``n`` periods.

    The first ``n - 1`` values are NaN because the rolling window is not
    yet full.
    """
    window = s.rolling(n)
    return window.mean()
def ema(s, n):
    """Return the exponential moving average of ``s`` with span ``n``.

    Uses the recursive (``adjust=False``) form, so the first output equals
    the first input value.
    """
    weighted = s.ewm(span=n, adjust=False)
    return weighted.mean()
def atr(df, n=14):
    """Return the ``n``-period average true range of an OHLC frame.

    ``df`` must provide 'high', 'low' and 'close' columns. The true range of
    a bar is the largest of: high - low, |high - previous close| and
    |low - previous close|. The average is a plain rolling mean, so the
    first ``n - 1`` values are NaN.
    """
    prev_close = df['close'].shift()
    candidates = pd.DataFrame({
        'bar_range': df['high'] - df['low'],
        'high_gap': (df['high'] - prev_close).abs(),
        'low_gap': (df['low'] - prev_close).abs(),
    })
    # Row-wise max skips NaN, so the first bar (no previous close) falls
    # back to its high - low range.
    true_range = candidates.max(axis=1)
    return true_range.rolling(n).mean()

# Trend moving averages on the close.
close_px = df['close']
for ma_window in (50, 150, 200):
    df[f'SMA{ma_window}'] = sma(close_px, ma_window)

df['ATR14'] = atr(df, 14)

# Bollinger bands: 20-period mean +/- 2 rolling standard deviations; the
# width is normalised by the middle band.
bb_offset = 2 * close_px.rolling(20).std()
df['BB_mid'] = sma(close_px, 20)
df['BB_up'] = df['BB_mid'] + bb_offset
df['BB_low'] = df['BB_mid'] - bb_offset
df['BB_width'] = (df['BB_up'] - df['BB_low']) / df['BB_mid']

# 20-period average volume, used as the baseline for volume confirmation.
df['VOL_MA20'] = df['volume'].rolling(20).mean()

def rsi_wilder(series, length=14):
    """Return the RSI of ``series`` using exponential smoothing with alpha = 1/length.

    Gains and losses are the positive and negative parts of the one-period
    change; each is smoothed with a recursive EWM (``adjust=False``) and
    combined as ``100 - 100 / (1 + avg_gain / avg_loss)``. The first value
    is NaN because the first change is undefined.
    """
    change = series.diff()
    gains = change.clip(lower=0)
    losses = -change.clip(upper=0)

    def smooth(values):
        return values.ewm(alpha=1/length, adjust=False).mean()

    relative_strength = smooth(gains) / smooth(losses)
    return 100 - (100 / (1 + relative_strength))

# Short- and long-horizon RSI columns (RSI9, RSI45).
for rsi_length in (9, 45):
    df[f'RSI{rsi_length}'] = rsi_wilder(df['close'], rsi_length)

print('Indicators computed')
# Last expression of the cell: shows the tail of the key columns in a notebook.
preview_columns = ['close', 'SMA50', 'SMA150', 'SMA200', 'ATR14', 'BB_width', 'VOL_MA20', 'RSI9', 'RSI45']
df[preview_columns].tail()


In [None]:
# ===== Relative Strength vs VNINDEX (if symbol != VNINDEX) =====
def fetch_index(index_symbol='VNINDEX'):
    """Fetch daily history for ``index_symbol`` via ``vnstock.Quote``.

    Reads the notebook-level ``start_date`` / ``end_date`` for the date range.

    Returns:
        A DataFrame with lower-cased column names, indexed by a sorted
        datetime 'time' index, or ``None`` if anything fails (the error is
        printed rather than raised, so relative strength stays optional).
    """
    try:
        from vnstock import Quote
        q = Quote(symbol=index_symbol, source='TCBS')
        idf = q.history(start=start_date, end=end_date, interval='1D')
        # Some responses wrap the rows in a {'data': [...]} payload.
        if isinstance(idf, dict) and 'data' in idf:
            idf = idf['data']
        if not isinstance(idf, pd.DataFrame):
            idf = pd.DataFrame(idf)
        idf.columns = [str(c).lower() for c in idf.columns]
        # Accept either a 'time' or a 'date' column as the timestamp source.
        time_col = next((c for c in ('time', 'date') if c in idf.columns), None)
        if time_col is None:
            raise KeyError("no 'time' or 'date' column in index data")
        idf['time'] = pd.to_datetime(idf[time_col])
        return idf.set_index('time').sort_index()
    except Exception as e:
        # Best-effort: callers treat None as "no index data available".
        print('Index fetch failed:', e)
        return None

# Relative strength = symbol close / index close * 100, on dates both have.
# rs_series stays None when the symbol is the index itself, when the index
# could not be fetched, or when there are no overlapping dates.
rs_series = None
if symbol.upper() != 'VNINDEX':
    idx = fetch_index('VNINDEX')
    if idx is not None and 'close' in idx.columns:
        joined = df[['close']].join(idx['close'].rename('idx_close'), how='inner')
        if joined.empty:
            # Guard: iloc[-1] below would raise IndexError on an empty join.
            print('RS not computed: no overlapping dates between symbol and index')
        else:
            joined['RS'] = joined['close'] / joined['idx_close'] * 100
            # min_periods=1 so the running high is defined even when fewer
            # than 252 bars are loaded (the default 1-year window is shorter),
            # matching the rolling max used in the checklist.
            joined['RS_52w_high'] = joined['RS'].rolling(252, min_periods=1).max()
            rs_series = joined['RS']
            print('RS series computed, last RS:', rs_series.iloc[-1])


In [None]:
# ===== Detect pivot (recent resistance) and breakout with volume confirmation (factor VOLUME_FACTOR) =====
lookback_months = 6
lookback_days = int(lookback_months * 21)  # ~21 trading days per month
pivot_high = df['close'].rolling(lookback_days).max()
# shift(1): each bar is measured against the highest close of the window that
# ended on the previous bar, so a bar never competes with its own close.
prior_pivot = pivot_high.shift(1)
recent_pivot = prior_pivot.iloc[-1]
print('Recent pivot (max over last', lookback_months, 'months):', recent_pivot)

# breakout condition: close > pivot known at that bar and volume > VOLUME_FACTOR * VOL_MA20.
# Comparing every bar with its own prior pivot (instead of the single latest
# pivot value) lets historical breakouts be flagged too; the result for the
# last bar is unchanged. Bars without a full lookback window compare against
# NaN and are therefore False.
breakout = (df['close'] > prior_pivot) & (df['volume'] > VOLUME_FACTOR * df['VOL_MA20'])
df['breakout'] = breakout
print('Breakout occurrences in period:', df['breakout'].sum())


In [None]:
# ===== Volatility contraction: check BB_width and ATR trend in base period =====
# A measure counts as "shrinking" when its mean over the last 10 bars is below
# its mean over the whole base period.
base_period = 60  # last 60 days to evaluate base tightness
bb_recent = df['BB_width'].tail(base_period)
atr_recent = df['ATR14'].tail(base_period)
bb_shrinking = bb_recent.tail(10).mean() < bb_recent.mean()
atr_shrinking = atr_recent.tail(10).mean() < atr_recent.mean()
print('BB shrinking:', bb_shrinking, 'ATR shrinking:', atr_shrinking)


In [None]:
# ===== Trend filter per Minervini: price above SMA50/150/200 =====
# Passes only when the latest bar satisfies close > SMA50 > SMA150 > SMA200.
# Any lookup failure (e.g. missing column, empty frame) counts as a fail.
try:
    latest = {name: df[name].iloc[-1] for name in ('close', 'SMA50', 'SMA150', 'SMA200')}
    trend_ok = latest['close'] > latest['SMA50'] > latest['SMA150'] > latest['SMA200']
except Exception:
    trend_ok = False
print('Trend strong (price>SMA50>SMA150>SMA200):', trend_ok)


In [None]:
# ===== Checklist summary (SEPA-like) =====
# Each entry maps a human-readable criterion to a plain bool.
checklist = {
    'Trend filter (price>SMA50>SMA150>SMA200)': bool(trend_ok),
    'Volatility contraction (BB/ATR smaller recently)': bool(bb_shrinking and atr_shrinking),
    'Breakout above recent pivot with volume': bool(df['breakout'].iloc[-1]),
}

# RS passes when the latest value is within 10% of its rolling 252-bar high;
# with no RS data the criterion fails.
if rs_series is None:
    rs_strong = False
else:
    rs_peak = rs_series.rolling(252, min_periods=1).max().iloc[-1]
    rs_strong = rs_series.iloc[-1] >= rs_peak*0.9
checklist['RS strong (if available)'] = bool(rs_strong)

print('SEPA Checklist:')
for criterion, passed in checklist.items():
    verdict = 'PASS' if passed else 'FAIL'
    print(f'- {criterion}:', verdict)


In [None]:
# ===== Plot full annotated chart (price + SMAs + breakout + BB + RS if present) =====
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter

# The seaborn style sheets were renamed with a 'seaborn-v0_8-' prefix in newer
# matplotlib releases; pick whichever name this installation provides and
# fall back to the default style if neither exists.
for style_name in ('seaborn-v0_8-whitegrid', 'seaborn-whitegrid'):
    if style_name in plt.style.available:
        plt.style.use(style_name)
        break

# Four stacked panels sharing the date axis: price, volume, BB width, ATR.
fig, axes = plt.subplots(4,1, figsize=(14,14), sharex=True, gridspec_kw={'height_ratios':[3,1,1,1]})
ax0, ax1, ax2, ax3 = axes

# Panel 1: close with moving averages, Bollinger band and breakout markers.
ax0.plot(df.index, df['close'], color='black', label='Close')
ax0.plot(df.index, df['SMA50'], color='blue', label='SMA50')
ax0.plot(df.index, df['SMA150'], color='orange', label='SMA150')
ax0.plot(df.index, df['SMA200'], color='red', label='SMA200')
ax0.fill_between(df.index, df['BB_low'], df['BB_up'], color='lightgrey', alpha=0.2, label='BB band')
# The threshold constant is VOLUME_FACTOR (upper case); the lower-case name
# used previously was never defined and raised NameError.
ax0.scatter(df.index[df['breakout']], df['close'][df['breakout']], marker='^', color='green', s=120, label=f'Breakout (vol>{VOLUME_FACTOR}x MA20)')
ax0.set_title(f'{symbol} — SEPA checklist analysis')
ax0.legend()

# Panel 2: volume bars with their 20-period average.
ax1.bar(df.index, df['volume'].fillna(0), color='grey')
ax1.plot(df.index, df['VOL_MA20'], color='navy', label='Vol MA20')
ax1.legend()

# Panels 3 and 4: volatility measures.
ax2.plot(df.index, df['BB_width'], label='BB width')
ax2.set_ylabel('BB width')
ax3.plot(df.index, df['ATR14'], label='ATR14')
ax3.set_ylabel('ATR')

# Optional overlay: relative strength on a secondary y-axis of the price panel.
if rs_series is not None:
    ax0_twin = ax0.twinx()
    ax0_twin.plot(joined.index, joined['RS'], color='purple', alpha=0.6, label='RS vs VNINDEX')
    ax0_twin.set_ylabel('RS')
    ax0_twin.legend(loc='upper right')

date_fmt = DateFormatter('%Y-%m-%d')
ax3.xaxis.set_major_formatter(date_fmt)
# Rotate the labels on ax3 explicitly: plt.xticks() acts on the implicit
# "current" axes, which is not guaranteed to be the bottom panel (creating
# the twin axis above may change it).
ax3.tick_params(axis='x', labelrotation=20)
plt.tight_layout()
plt.show()


In [None]:
# ===== Auto notes and suggested action =====
# One row per checklist criterion: (checklist key, note on PASS, note on FAIL).
note_rules = [
    (
        'Trend filter (price>SMA50>SMA150>SMA200)',
        'Trend filter PASS: cổ phiếu trong xu hướng tăng theo MA.',
        'Trend filter FAIL: tránh mua mới trừ khi có breakout rất mạnh.',
    ),
    (
        'Breakout above recent pivot with volume',
        'Breakout với volume: đây là điểm pivot entry — kiểm tra RS và tight base.',
        'Chưa breakout hoặc thiếu volume confirm.',
    ),
    (
        'Volatility contraction (BB/ATR smaller recently)',
        'Volatility contraction PASS: base tight, khả năng breakout mạnh khi volume tăng.',
        'Base chưa đủ tight — độ xác suất breakout thấp hơn.',
    ),
    (
        'RS strong (if available)',
        'RS mạnh so với VNINDEX → ưu tiên.',
        'RS yếu hoặc không có dữ liệu RS → thận trọng.',
    ),
]
notes = [
    pass_note if checklist[criterion] else fail_note
    for criterion, pass_note, fail_note in note_rules
]

print('\n'.join(notes))
