<a href="https://colab.research.google.com/github/AndikaPutra509/Prediksi-Saham/blob/PrediksiSaham/Prediksi_Saham.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [437]:
import yfinance as yf
import pandas as pd
import numpy as np
import ta
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    roc_auc_score,
    f1_score,
    balanced_accuracy_score,
    accuracy_score,
)
from sklearn.ensemble import RandomForestClassifier, HistGradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler

In [438]:
SEED = 42
np.random.seed(SEED)

In [441]:
SYMBOL = "FORE.JK"
START = "2013-01-01"
END = "2026-02-24"
TRAIN_RATIO = 0.7
VAL_RATIO = 0.15
HOLD_BAND = 0.05

In [442]:
def normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    if isinstance(df.columns, pd.MultiIndex):
        df.columns = df.columns.get_level_values(0)
    return df

In [443]:
def replace_inf_with_nan(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    num_cols = out.select_dtypes(include=[np.number]).columns
    out.loc[:, num_cols] = out.loc[:, num_cols].replace([np.inf, -np.inf], np.nan)
    return out

In [444]:
def build_features(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    close = out["Close"]
    high = out["High"]
    low = out["Low"]
    volume = out["Volume"]

    out["RSI"] = ta.momentum.RSIIndicator(close).rsi()
    out["MA20"] = close.rolling(20).mean()
    out["MA50"] = close.rolling(50).mean()
    out["MACD"] = ta.trend.MACD(close).macd()

    bb = ta.volatility.BollingerBands(close)
    out["BB_high"] = bb.bollinger_hband()
    out["BB_low"] = bb.bollinger_lband()
    out["BB_width"] = out["BB_high"] - out["BB_low"]

    out["ATR"] = ta.volatility.AverageTrueRange(high, low, close).average_true_range()
    out["OBV"] = ta.volume.OnBalanceVolumeIndicator(close, volume).on_balance_volume()

    out["Return"] = close.pct_change()
    out["Return_5"] = close.pct_change(5)
    out["Return_10"] = close.pct_change(10)
    out["Volatility_10"] = out["Return"].rolling(10).std()
    out["Volatility_20"] = out["Return"].rolling(20).std()
    out["Volume_Change"] = volume.pct_change()

    for lag in [1, 2, 3, 5, 10]:
        out[f"Lag_Return_{lag}"] = out["Return"].shift(lag)
        out[f"Lag_RSI_{lag}"] = out["RSI"].shift(lag)

    out["Target"] = (out["Return"].shift(-1) > 0).astype(int)
    out = replace_inf_with_nan(out)
    return out.dropna().copy()

In [445]:
def split_time_series(df: pd.DataFrame):
    n = len(df)
    train_end = int(n * TRAIN_RATIO)
    val_end = int(n * (TRAIN_RATIO + VAL_RATIO))
    return df.iloc[:train_end], df.iloc[train_end:val_end], df.iloc[val_end:]

In [446]:
def find_best_threshold(y_true: np.ndarray, probs: np.ndarray) -> float:
    thresholds = np.arange(0.30, 0.71, 0.01)
    best_t, best_bacc, best_f1 = 0.5, -1.0, -1.0
    for t in thresholds:
        preds = (probs >= t).astype(int)
        bacc = balanced_accuracy_score(y_true, preds)
        f1 = f1_score(y_true, preds, zero_division=0)
        if (bacc > best_bacc) or (np.isclose(bacc, best_bacc) and f1 > best_f1):
            best_t, best_bacc, best_f1 = float(t), float(bacc), float(f1)
    return best_t

In [447]:
def decide_signal(prob_up: float, threshold: float, hold_band: float = HOLD_BAND) -> str:
    upper = threshold + hold_band
    lower = threshold - hold_band
    if prob_up >= upper and prob_up > 0.5:
        return "BELI"
    if prob_up <= lower and prob_up < 0.5:
        return "JUAL"
    return "TAHAN"

In [448]:
def get_model_candidates():
    return {
        "HistGradientBoosting": HistGradientBoostingClassifier(
            learning_rate=0.03,
            max_depth=4,
            max_iter=400,
            min_samples_leaf=20,
            random_state=SEED,
        ),
        "RandomForest": RandomForestClassifier(
            n_estimators=600,
            max_depth=8,
            min_samples_leaf=8,
            class_weight="balanced_subsample",
            random_state=SEED,
            n_jobs=-1,
        ),
        "LogisticRegression": Pipeline(
            steps=[
                ("imputer", SimpleImputer(strategy="median")),
                ("scaler", StandardScaler()),
                (
                    "clf",
                    LogisticRegression(
                        max_iter=2000,
                        class_weight="balanced",
                        random_state=SEED,
                    ),
                ),
            ]
        ),
    }

In [449]:
def evaluate_model(name, model, X_train, y_train, X_val, y_val):
    model.fit(X_train, y_train)
    val_probs = np.nan_to_num(model.predict_proba(X_val)[:, 1], nan=0.5, posinf=1.0, neginf=0.0)
    threshold = find_best_threshold(y_val, val_probs)
    val_preds = (val_probs >= threshold).astype(int)
    return {
        "name": name,
        "model": model,
        "threshold": threshold,
        "val_auc": roc_auc_score(y_val, val_probs),
        "val_bacc": balanced_accuracy_score(y_val, val_preds),
        "val_acc": accuracy_score(y_val, val_preds),
    }

In [450]:
def suggest_stoploss(signal: str, last_close: float, atr_value: float, prob_up: float, base_mult: float = 1.5):
    """
    Stop-loss sederhana berbasis ATR dan confidence model.
    - Confidence tinggi -> stop-loss sedikit lebih longgar.
    - Confidence rendah -> stop-loss lebih ketat.
    Mengembalikan tuple (stoploss_price, stoploss_pct, note).
    """
    if signal == "TAHAN":
        return None, None, "Tidak ada stop-loss karena sinyal TAHAN"

    confidence = abs(prob_up - 0.5) * 2  # 0..1
    multiplier = base_mult + (0.7 * confidence)

    if atr_value is None or np.isnan(atr_value) or atr_value <= 0:
        fallback_pct = 0.03 if signal == "BELI" else 0.03
        if signal == "BELI":
            stop = last_close * (1 - fallback_pct)
            return stop, fallback_pct * 100, "Fallback 3% (ATR tidak valid)"
        stop = last_close * (1 + fallback_pct)
        return stop, fallback_pct * 100, "Fallback 3% (ATR tidak valid, skenario short)"

    if signal == "BELI":
        stop = last_close - (multiplier * atr_value)
        stop_pct = ((last_close - stop) / last_close) * 100
        return stop, stop_pct, f"ATR x {multiplier:.2f} di bawah harga masuk"

    # JUAL diasumsikan skenario short/exit defensif
    stop = last_close + (multiplier * atr_value)
    stop_pct = ((stop - last_close) / last_close) * 100
    return stop, stop_pct, f"ATR x {multiplier:.2f} di atas harga referensi"

In [451]:
def estimate_expected_daily_return(prob_up: float, return_series: pd.Series) -> float:
    up_returns = return_series[return_series > 0]
    down_returns = return_series[return_series <= 0]

    mean_up = float(up_returns.mean()) if len(up_returns) else 0.0
    mean_down = float(down_returns.mean()) if len(down_returns) else 0.0
    expected_return = (prob_up * mean_up) + ((1 - prob_up) * mean_down)
    return expected_return

In [452]:
def forecast_next_week(last_close: float, expected_daily_return: float, start_date: pd.Timestamp) -> pd.DataFrame:
    future_dates = pd.bdate_range(start=start_date + pd.Timedelta(days=1), periods=5)
    prices = []
    price = float(last_close)
    for _ in range(5):
        price = price * (1 + expected_daily_return)
        prices.append(price)

    out = pd.DataFrame(
        {
            "Date": future_dates,
            "Predicted_Close": prices,
            "Expected_Daily_Return": expected_daily_return,
        }
    )
    return out

In [453]:
def main():
    raw = yf.download(SYMBOL, start=START, end=END, auto_adjust=True, progress=True)
    raw = normalize_columns(raw)
    df = raw[["Open", "High", "Low", "Close", "Volume"]].copy()
    df = build_features(df)

    train_df, val_df, test_df = split_time_series(df)
    feature_cols = [c for c in df.columns if c != "Target"]

    X_train, y_train = train_df[feature_cols], train_df["Target"].to_numpy()
    X_val, y_val = val_df[feature_cols], val_df["Target"].to_numpy()
    X_test, y_test = test_df[feature_cols], test_df["Target"].to_numpy()

    X_train = replace_inf_with_nan(X_train)
    X_val = replace_inf_with_nan(X_val)
    X_test = replace_inf_with_nan(X_test)

    candidates = get_model_candidates()
    evaluations = [evaluate_model(name, model, X_train, y_train, X_val, y_val) for name, model in candidates.items()]

    best = max(evaluations, key=lambda x: (x["val_bacc"], x["val_auc"]))
    best_model = best["model"]
    best_threshold = best["threshold"]

    probs_up = np.nan_to_num(best_model.predict_proba(X_test)[:, 1], nan=0.5, posinf=1.0, neginf=0.0)
    probs_down = 1 - probs_up
    preds = (probs_up >= best_threshold).astype(int)

    print("Model candidates (validation):")
    for ev in evaluations:
        print(
            f"- {ev['name']}: AUC={ev['val_auc']:.4f}, "
            f"BalancedAcc={ev['val_bacc']:.4f}, Acc={ev['val_acc']:.4f}, "
            f"Threshold={ev['threshold']:.2f}"
        )

    print(f"\nModel terpilih: {best['name']}")
    print(f"Best threshold (validation): {best_threshold:.2f}")
    print(f"Test accuracy: {accuracy_score(y_test, preds):.4f}")
    print(f"Test balanced accuracy: {balanced_accuracy_score(y_test, preds):.4f}")
    print(f"Test ROC-AUC: {roc_auc_score(y_test, probs_up):.4f}")
    print(confusion_matrix(y_test, preds))
    print(classification_report(y_test, preds, digits=4, zero_division=0))

    result = pd.DataFrame(
        {
            "Date": test_df.index,
            "Close": test_df["Close"].to_numpy().reshape(-1),
            "Prob_Naik": probs_up,
            "Prob_Turun": probs_down,
            "Aktual": np.where(y_test == 1, "NAIK", "TURUN"),
        }
    )
    result["Signal"] = result["Prob_Naik"].apply(lambda p: decide_signal(float(p), best_threshold))

    print("\nContoh output (10 hari terakhir):")
    print(result.tail(10).to_string(index=False))

    latest_row = replace_inf_with_nan(df[feature_cols].tail(1))
    prob_up_now = float(np.nan_to_num(best_model.predict_proba(latest_row)[:, 1], nan=0.5, posinf=1.0, neginf=0.0)[0])
    prob_down_now = 1 - prob_up_now
    signal_now = decide_signal(prob_up_now, best_threshold)
    atr_now = float(df["ATR"].iloc[-1]) if "ATR" in df.columns else np.nan
    stoploss_price, stoploss_pct, stoploss_note = suggest_stoploss(
        signal=signal_now,
        last_close=float(df["Close"].iloc[-1]),
        atr_value=atr_now,
        prob_up=prob_up_now,
    )

    print("\nSignal saat ini:")
    print(f"Tanggal data terakhir : {df.index[-1].date()}")
    print(f"Prob_Naik saat ini   : {prob_up_now:.4f}")
    print(f"Prob_Turun saat ini  : {prob_down_now:.4f}")
    print(f"Signal saat ini      : {signal_now}")
    if stoploss_price is not None:
        print(f"Stop-loss saran      : {stoploss_price:.2f} ({stoploss_pct:.2f}%)")
    print(f"Catatan stop-loss    : {stoploss_note}")

    expected_ret = estimate_expected_daily_return(prob_up_now, train_df["Return"])
    weekly_forecast = forecast_next_week(
        last_close=float(df["Close"].iloc[-1]),
        expected_daily_return=expected_ret,
        start_date=df.index[-1],
    )

    print("\nPrediksi harga 1 minggu ke depan (5 hari bursa):")
    print(weekly_forecast.to_string(index=False))

In [454]:
if __name__ == "__main__":
    main()

[*********************100%***********************]  1 of 1 completed


Model candidates (validation):
- HistGradientBoosting: AUC=0.6357, BalancedAcc=0.5000, Acc=0.5833, Threshold=0.32
- RandomForest: AUC=0.7357, BalancedAcc=0.5786, Acc=0.6250, Threshold=0.30
- LogisticRegression: AUC=0.5714, BalancedAcc=0.5500, Acc=0.6250, Threshold=0.30

Model terpilih: RandomForest
Best threshold (validation): 0.30
Test accuracy: 0.5833
Test balanced accuracy: 0.6429
Test ROC-AUC: 0.8714
[[ 4 10]
 [ 0 10]]
              precision    recall  f1-score   support

           0     1.0000    0.2857    0.4444        14
           1     0.5000    1.0000    0.6667        10

    accuracy                         0.5833        24
   macro avg     0.7500    0.6429    0.5556        24
weighted avg     0.7917    0.5833    0.5370        24


Contoh output (10 hari terakhir):
      Date  Close  Prob_Naik  Prob_Turun Aktual Signal
2026-02-06  466.0   0.588013    0.411987   NAIK   BELI
2026-02-09  470.0   0.556015    0.443985   NAIK   BELI
2026-02-10  480.0   0.453349    0.546651   NAI