# Dings Trader — Full Pipeline (All-in-One)
Ein einziges Notebook für den kompletten Ablauf.

**Empfohlene Runtime-Strategie:**
- Schritte 0–3 auf **CPU/T4** (spart Credits)
- dann Runtime auf **A100** wechseln
- danach Schritte 4–7 ausführen

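Die Schritte 0–3 schreiben ihre Logs nach `logs/colab/full_pipeline.log`, die späteren Trainings- und Report-Schritte jeweils nach `logs/colab/<STEP>.log`. Alle Schritte drucken `OK:` / `ERROR:`.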


## 0) Setup (Drive + Symlinks + Dependencies)


In [None]:
# Mount Google Drive at /content/drive and report the result in the
# pipeline's "OK:" / "ERROR:" status format.
step = "00_drive_mount"
try:
    from google.colab import drive
    drive.mount('/content/drive')
except Exception as exc:
    # Print the status line first, then re-raise so the cell is marked failed.
    print(f"ERROR: {step} :: {exc}")
    raise
else:
    print(f"OK: {step}")


In [None]:
%%bash
# Step 00: link the repo's data/model/log directories to the persistent
# store on Google Drive and install the Python requirements.
#
# Environment overrides:
#   ROOT   - project directory inside the cloned repo
#   STORE  - persistent directory on the mounted Drive
set -euo pipefail
STEP="00_setup_symlinks_and_deps"
ROOT="${ROOT:-/content/Dings_Trader/TraderHimSelf}"
STORE="${STORE:-/content/drive/MyDrive/dings-trader-store/TraderHimSelf}"
cd "$ROOT"

# Log straight into the Drive store. Once the loop below has run, logs/ is a
# symlink to "$STORE/logs", so this is the same file as
# logs/colab/full_pipeline.log. Creating logs/colab locally first would turn
# logs/ into a non-empty real directory; the loop would move it to a backup
# and the relative log path would no longer exist, so every later
# `tee -a "$LOG"` would fail under pipefail.
mkdir -p "$STORE"/{data_raw,data_processed,models,logs,runs,checkpoints}
mkdir -p "$STORE/logs/colab"
LOG="$STORE/logs/colab/full_pipeline.log"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

echo "START: ${STEP}" | tee -a "$LOG"

for d in data_raw data_processed models logs runs checkpoints; do
  if [ -L "$d" ]; then
    # Existing link (possibly stale): drop it, it is recreated below.
    rm -- "$d"
  elif [ -e "$d" ]; then
    if [ -n "$(ls -A "$d" 2>/dev/null || true)" ]; then
      # Real, non-empty path: keep its content under a timestamped name.
      mv -- "$d" "${d}.backup.$(date +%s)"
    else
      rm -rf -- "$d"
    fi
  fi
  ln -s "$STORE/$d" "$d"
done

python -m pip install -r requirements.txt 2>&1 | tee -a "$LOG"
# Record where data_raw really points, as a quick check of the linking.
readlink -f data_raw | tee -a "$LOG"
echo "OK: ${STEP}" | tee -a "$LOG"


## 1) Download Data (CPU/T4)


In [None]:
%%bash
# Step 01: run download_binance_data.py into data_raw/ and mirror all output
# to the shared pipeline log.
# Environment overrides: ROOT, START_DATE, END_DATE (default: today), SYMBOL.
set -euo pipefail

STEP="01_download_data"
ROOT="${ROOT:-/content/Dings_Trader/TraderHimSelf}"
LOG="logs/colab/full_pipeline.log"

cd "$ROOT"
mkdir -p "$(dirname "$LOG")"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"

START_DATE="${START_DATE:-2019-01-01}"
END_DATE="${END_DATE:-$(date +%F)}"
SYMBOL="${SYMBOL:-BTCUSDT}"

# Keep the arguments in an array so each value stays a single word.
download_args=(
  --data-dir data_raw
  --start-date "$START_DATE"
  --end-date "$END_DATE"
  --symbol "$SYMBOL"
)
python download_binance_data.py "${download_args[@]}" 2>&1 | tee -a "$LOG"

# List what ended up in data_raw for a quick visual check.
ls -lah data_raw | tee -a "$LOG"
tee -a "$LOG" <<<"OK: ${STEP}"


## 2) Build Dataset (CPU/T4)


In [None]:
%%bash
# Step 02: run build_dataset.py in the project root and mirror its output
# (stdout + stderr) to the shared pipeline log. ROOT may be overridden via env.
set -euo pipefail

STEP="02_build_dataset"
ROOT="${ROOT:-/content/Dings_Trader/TraderHimSelf}"
LOG="logs/colab/full_pipeline.log"

cd "$ROOT"
mkdir -p "$(dirname "$LOG")"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"
python build_dataset.py 2>&1 | tee -a "$LOG"
tee -a "$LOG" <<<"OK: ${STEP}"


## 3) Feature Engine (CPU/T4)


In [None]:
%%bash
# Step 03: run `feature_engine.py build` in the project root and mirror its
# output (stdout + stderr) to the shared pipeline log.
# ROOT may be overridden via env.
set -euo pipefail

STEP="03_feature_engine"
ROOT="${ROOT:-/content/Dings_Trader/TraderHimSelf}"
LOG="logs/colab/full_pipeline.log"

cd "$ROOT"
mkdir -p "$(dirname "$LOG")"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"
feature_cmd=(python feature_engine.py build)
"${feature_cmd[@]}" 2>&1 | tee -a "$LOG"
tee -a "$LOG" <<<"OK: ${STEP}"


## 3.5) Runtime Switch auf A100
Jetzt Runtime auf **A100** wechseln (Runtime → Change runtime type).
Nach dem Reconnect die **nächsten drei Zellen** ausführen (Drive-Mount, Repo-Clone/Update, Relink + Dependencies), dann mit Schritt 4 weitermachen.


In [1]:
# Mount Google Drive again (meant to be run after the runtime switch) and
# report the result in the pipeline's "OK:" / "ERROR:" status format.
step = "35_drive_remount_after_switch"
try:
    from google.colab import drive
    drive.mount('/content/drive')
except Exception as exc:
    # Print the status line first, then re-raise so the cell is marked failed.
    print(f"ERROR: {step} :: {exc}")
    raise
else:
    print(f"OK: {step}")


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
OK: 35_drive_remount_after_switch


In [19]:
%%bash
# Bring /content/Dings_Trader to the latest origin/main: clone the repo when
# it is missing, otherwise fetch and hard-reset the existing checkout (local
# changes to tracked files are discarded by the reset).
set -euo pipefail

REPO_DIR="/content/Dings_Trader"
REPO_URL="https://github.com/MidnightCoffeex/Dings_Trader.git"

cd /content
if [[ ! -d "$REPO_DIR/.git" ]]; then
  echo "Repo fehlt -> clone"
  git clone "$REPO_URL" Dings_Trader
else
  echo "Repo existiert -> update auf neuesten Stand"
  git -C "$REPO_DIR" fetch origin main
  git -C "$REPO_DIR" reset --hard origin/main
fi

# Quick check: requirements file is present, and show the checked-out commit.
cd /content/Dings_Trader/TraderHimSelf
ls -lah requirements.txt
git log --oneline -n 1
echo "OK: repo ready (latest main)"

Repo existiert -> update auf neuesten Stand
HEAD is now at ce54736 feat(precompute): add periodic batch progress logs and unbuffered step-05 output
-rw-r--r-- 1 root root 361 Feb 10 21:35 requirements.txt
ce54736 feat(precompute): add periodic batch progress logs and unbuffered step-05 output
OK: repo ready (latest main)


From https://github.com/MidnightCoffeex/Dings_Trader
 * branch            main       -> FETCH_HEAD
   5322699..ce54736  main       -> origin/main


In [9]:
%%bash
# Relink + deps: recreate the symlinks from the repo to the persistent Drive
# store and reinstall the Python requirements (meant to be run after the
# runtime switch).
# NOTE: `%%bash` must stay on the very first line of the cell. IPython only
# recognizes a cell magic there, so no comment may be placed above it.
set -euo pipefail
ROOT="/content/Dings_Trader/TraderHimSelf"
STORE="/content/drive/MyDrive/dings-trader-store/TraderHimSelf"

cd "$ROOT"
mkdir -p "$STORE"/{data_raw,data_processed,models,logs,runs,checkpoints}

for d in data_raw data_processed models logs runs checkpoints; do
  if [ -L "$d" ]; then
    # Existing link (possibly stale): drop it, it is recreated below.
    rm -- "$d"
  elif [ -e "$d" ]; then
    if [ -n "$(ls -A "$d" 2>/dev/null || true)" ]; then
      # Same policy as the step-00 setup cell: never delete local content,
      # move it aside under a timestamped name instead.
      mv -- "$d" "${d}.backup.$(date +%s)"
    else
      rm -rf -- "$d"
    fi
  fi
  ln -s "$STORE/$d" "$d"
done

python -m pip install -r requirements.txt
echo "OK: relink after switch"

OK: relink after switch


## 4) Pipeline-Argumente (JSON, hier editieren)

Diese Zelle ist der **Single Point of Truth** für PatchTST + PPO.

- PatchTST-spezifisch: `patch_*` + Forecast-Parameter
- PPO-spezifisch: `ppo_*`
- Geteilte/übertragene Parameter: `symbol`, `decision_tf`, `intrabar_tf`, Dateipfade, `package_*`

⚠️ Aktuell bleibt Runtime-seitig **15m Decision + 3m Intrabar** Standard. Andere TFs sind vorerst Datei-/Metadaten-basiert.


In [None]:
import json
import os

# Locate the project root: the first existing candidate wins.
for _candidate in (
    '/content/Dings_Trader/TraderHimSelf',
    '/content/dings-trader/projects/dings-trader/TraderHimSelf',
):
    if os.path.isdir(_candidate):
        ROOT = _candidate
        break
else:
    raise FileNotFoundError('Repo nicht gefunden. Setup-/Clone-Zellen zuerst ausführen.')

models_dir = os.path.join(ROOT, 'models')
PIPELINE_JSON = os.path.join(models_dir, 'pipeline_args.json')
os.makedirs(models_dir, exist_ok=True)

# ---- editable top-level knobs ----
DECISION_TF = '15m'      # e.g. '15m' or '1h' (file binding below)
INTRABAR_TF = '3m'       # stays at 3m for now
FEATURE_SET = 'train30'  # stays at train30 for now

# ---- file binding (TF -> file) ----
# Convention:
#   candles: data_processed/aligned_<tf>.parquet
#   features/scaler: data_processed/<feature_set>/(features|scaler),
#                    or *_<tf> when tf != 15m
processed_dir = os.path.join(ROOT, 'data_processed')
decision_file = os.path.join(processed_dir, f'aligned_{DECISION_TF}.parquet')
intrabar_file = os.path.join(processed_dir, f'aligned_{INTRABAR_TF}.parquet')

fs_dir = os.path.join(processed_dir, FEATURE_SET)
# The 15m decision timeframe uses the unsuffixed file names.
tf_suffix = '' if DECISION_TF.lower() == '15m' else f'_{DECISION_TF}'
features_file = os.path.join(fs_dir, f'features{tf_suffix}.parquet')
scaler_file = os.path.join(fs_dir, f'scaler{tf_suffix}.pkl')

# ---- shared / handover ----
shared_args = {
    'pipeline_version': 'arg_interface_v1',
    'symbol': 'BTCUSDT',
    'decision_tf': DECISION_TF,
    'intrabar_tf': INTRABAR_TF,
    'feature_set': FEATURE_SET,
    'model_tag': 'paper_ppo_v1',

    'data_file_raw_decision': decision_file,
    'data_file_raw_intrabar': intrabar_file,
    'data_file_features': features_file,
    'scaler_path': scaler_file,

    # Packaging / manifest
    'package_root': os.path.join(models_dir, 'packages'),
    'package_id': None,
    'package_dir': None,
    'manifest_path': None,

    # Legacy alias paths (updated automatically after training)
    'forecast_model_alias_path': os.path.join(models_dir, 'forecast_model.pt'),
    'ppo_model_alias_path': os.path.join(models_dir, 'ppo_policy_final.zip'),
}

# ---- PatchTST stage ----
patchtst_args = {
    'lookback_steps': 512,
    'forecast_horizon_steps': 16,
    'horizon_steps': [1, 2, 4, 8, 16],
    'horizon_weights': [1.0, 1.0, 0.8, 0.6, 0.4],

    'patch_epochs': 20,
    'patch_learning_rate': 1e-4,

    # DataLoader/runtime (optional; None -> profile/default)
    'patch_batch_size': 1024,
    'patch_num_workers': 12,
    'patch_pin_memory': True,
    'patch_prefetch_factor': 4,
    'patch_persistent_workers': True,
    'patch_amp': True,
    'patch_compile': True,

    # Output path: None => automatically package_dir/forecast_features.parquet
    'forecast_features_output': None,
}

# ---- PPO stage ----
ppo_args = {
    'ppo_total_timesteps': 2_000_000,   # e.g. 2M or 8M
    'ppo_learning_rate': 3e-4,
    'ppo_n_steps': 4096,
    'ppo_batch_size': 512,
    'ppo_n_epochs': 10,
    'ppo_gamma': 0.99,
    'ppo_gae_lambda': 0.95,
    'ppo_clip_range': 0.2,
    'ppo_ent_coef': 0.01,

    # PPO runtime
    'ppo_n_envs': 1,
    'ppo_vec_env': 'auto',
    'ppo_device': 'auto',
}

# Merge in section order so the key order of the written JSON is
# shared -> PatchTST -> PPO.
PIPELINE_ARGS = {**shared_args, **patchtst_args, **ppo_args}

# Serialize once; the same text is written to disk and echoed to the cell output.
rendered = json.dumps(PIPELINE_ARGS, indent=2, ensure_ascii=False)
with open(PIPELINE_JSON, 'w', encoding='utf-8') as fh:
    fh.write(rendered)

print('OK: pipeline json written ->', PIPELINE_JSON)
print(rendered)


## 5) Train PatchTST (A100)
Training und Forecast-Precompute nutzen dieselbe `pipeline_args.json` aus Schritt 4.


In [None]:
%%bash
# Step 04: run `forecast/train_patchtst.py train` with the high-util profile,
# configured through the pipeline JSON, and mirror all output to the step log.
# PIPELINE_JSON may be overridden via env.
set -euo pipefail
STEP="04_train_patchtst"

# Use the first checkout location that exists.
ROOT=""
for candidate in \
  "/content/Dings_Trader/TraderHimSelf" \
  "/content/dings-trader/projects/dings-trader/TraderHimSelf"; do
  if [[ -d "$candidate" ]]; then
    ROOT="$candidate"
    break
  fi
done
if [[ -z "$ROOT" ]]; then
  echo "ERROR: Repo nicht gefunden. Erst Clone/Update-Zelle ausführen."
  exit 1
fi

cd "$ROOT"
LOG="logs/colab/${STEP}.log"
mkdir -p "$(dirname "$LOG")"
PIPELINE_JSON="${PIPELINE_JSON:-$ROOT/models/pipeline_args.json}"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"
tee -a "$LOG" <<<"PIPELINE_JSON=${PIPELINE_JSON}"
export PYTHONUNBUFFERED=1

train_cmd=(
  python -u forecast/train_patchtst.py train
  --profile high-util
  --pipeline-json "$PIPELINE_JSON"
  --log-every-batches 1
)
"${train_cmd[@]}" 2>&1 | tee -a "$LOG"

tee -a "$LOG" <<<"OK: ${STEP}"


In [None]:
%%bash
# Step 05: run `forecast/train_patchtst.py precompute` with the high-util
# profile, configured through the pipeline JSON, and mirror all output to the
# step log. PIPELINE_JSON may be overridden via env.
set -euo pipefail
STEP="05_precompute_forecast"

# Use the first checkout location that exists.
ROOT=""
for candidate in \
  "/content/Dings_Trader/TraderHimSelf" \
  "/content/dings-trader/projects/dings-trader/TraderHimSelf"; do
  if [[ -d "$candidate" ]]; then
    ROOT="$candidate"
    break
  fi
done
if [[ -z "$ROOT" ]]; then
  echo "ERROR: Repo nicht gefunden. Erst Clone/Update-Zelle ausführen."
  exit 1
fi

cd "$ROOT"
LOG="logs/colab/${STEP}.log"
mkdir -p "$(dirname "$LOG")"
PIPELINE_JSON="${PIPELINE_JSON:-$ROOT/models/pipeline_args.json}"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"
tee -a "$LOG" <<<"PIPELINE_JSON=${PIPELINE_JSON}"
export PYTHONUNBUFFERED=1

python -u forecast/train_patchtst.py precompute \
  --profile high-util \
  --pipeline-json "$PIPELINE_JSON" \
  --log-every-batches 10 2>&1 \
  | tee -a "$LOG"

tee -a "$LOG" <<<"OK: ${STEP}"


## 6) Train PPO (JSON-Handover)
PPO liest dieselbe `pipeline_args.json` (inkl. Timesteps, LR, Batch etc.).


In [None]:
%%bash
# Step 06 (CPU): run policy/train_ppo.py in the background with the GPU
# hidden (CUDA_VISIBLE_DEVICES is set to empty), send its complete output to
# the step log, and show only selected progress lines live in the cell.
# PIPELINE_JSON may be overridden via env.
set -euo pipefail

if [ -d "/content/Dings_Trader/TraderHimSelf" ]; then
  ROOT="/content/Dings_Trader/TraderHimSelf"
elif [ -d "/content/dings-trader/projects/dings-trader/TraderHimSelf" ]; then
  ROOT="/content/dings-trader/projects/dings-trader/TraderHimSelf"
else
  echo "ERROR: Repo nicht gefunden. Erst Clone/Update-Zelle ausführen."
  exit 1
fi

STEP="06_train_ppo_cpu_live"
LOG="$ROOT/logs/colab/${STEP}.log"
PIPELINE_JSON="${PIPELINE_JSON:-$ROOT/models/pipeline_args.json}"
mkdir -p "$ROOT/logs/colab"
# Same failure reporting as the other pipeline steps, so that an unexpected
# error in this cell also ends with an "ERROR:" line.
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

cd "$ROOT"
# No -a here: every run starts a fresh step log.
echo "START: ${STEP}" | tee "$LOG"
echo "PIPELINE_JSON=${PIPELINE_JSON}" | tee -a "$LOG"
export PYTHONUNBUFFERED=1

( CUDA_VISIBLE_DEVICES= python -u policy/train_ppo.py     --profile high-util     --pipeline-json "$PIPELINE_JSON" >> "$LOG" 2>&1 ) &
PID=$!
echo "PID=$PID" | tee -a "$LOG"

# Follow the log until the training process is gone (--pid); awk lets only
# the selected status/progress lines through to the cell output.
tail -n 0 -f "$LOG" --pid="$PID" | stdbuf -oL awk '/Pipeline JSON|PPO Configuration|Daten geladen|Initialisiere PPO Agent|Starte Training|rollout|train\/|time\/|fps|Training abgeschlossen|Modell gespeichert|WARN|ERROR/'

# Collect the trainer's exit status explicitly. The live filter above hides
# most of a Python traceback, so on failure print the end of the full log
# before reporting the error.
rc=0
wait "$PID" || rc=$?
if [ "$rc" -ne 0 ]; then
  echo "--- last 40 lines of ${LOG} ---" >&2
  tail -n 40 "$LOG" >&2
  echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"
  exit "$rc"
fi
echo "OK: ${STEP}" | tee -a "$LOG"


In [None]:
%%bash
# Optional: watch a running PPO training live
# (in case the training cell's output was cut off).
ppo_log=/content/Dings_Trader/TraderHimSelf/logs/colab/06_train_ppo_cpu_live.log
tail -f "$ppo_log"


### Optional: PPO auf GPU
Nutzt dieselbe `pipeline_args.json` wie das CPU-Training.


In [None]:
%%bash
# Step 06 (optional variant): run policy/train_ppo.py in the foreground
# without hiding the GPU, configured through the pipeline JSON, and mirror all
# output to the step log. PIPELINE_JSON may be overridden via env.
set -euo pipefail
STEP="06_train_ppo_gpu_optional"

# Use the first checkout location that exists.
ROOT=""
for candidate in \
  "/content/Dings_Trader/TraderHimSelf" \
  "/content/dings-trader/projects/dings-trader/TraderHimSelf"; do
  if [[ -d "$candidate" ]]; then
    ROOT="$candidate"
    break
  fi
done
if [[ -z "$ROOT" ]]; then
  echo "ERROR: Repo nicht gefunden. Erst Clone/Update-Zelle ausführen."
  exit 1
fi

cd "$ROOT"
LOG="logs/colab/${STEP}.log"
mkdir -p "$(dirname "$LOG")"
PIPELINE_JSON="${PIPELINE_JSON:-$ROOT/models/pipeline_args.json}"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"
tee -a "$LOG" <<<"PIPELINE_JSON=${PIPELINE_JSON}"
export PYTHONUNBUFFERED=1

ppo_cmd=(
  python -u policy/train_ppo.py
  --profile high-util
  --pipeline-json "$PIPELINE_JSON"
)
"${ppo_cmd[@]}" 2>&1 | tee -a "$LOG"

tee -a "$LOG" <<<"OK: ${STEP}"


## 7) Final Report (copy/paste to chat)


In [29]:
%%bash
# Step 07: run report_status.py and wrap its output in REPORT_START /
# REPORT_END markers; everything is mirrored to the step log.
set -euo pipefail
STEP="07_final_report"

# Use the first checkout location that exists.
ROOT=""
for candidate in \
  "/content/Dings_Trader/TraderHimSelf" \
  "/content/dings-trader/projects/dings-trader/TraderHimSelf"; do
  if [[ -d "$candidate" ]]; then
    ROOT="$candidate"
    break
  fi
done
if [[ -z "$ROOT" ]]; then
  echo "ERROR: Repo nicht gefunden. Erst Clone/Update-Zelle ausführen."
  exit 1
fi

cd "$ROOT"
LOG="logs/colab/${STEP}.log"
mkdir -p "$(dirname "$LOG")"
trap 'rc=$?; echo "ERROR: ${STEP} :: exit_code=${rc}" | tee -a "$LOG"; exit $rc' ERR

tee -a "$LOG" <<<"START: ${STEP}"
tee -a "$LOG" <<<"REPORT_START"
python report_status.py 2>&1 | tee -a "$LOG"
tee -a "$LOG" <<<"REPORT_END"
tee -a "$LOG" <<<"OK: ${STEP}"

START: 07_final_report
REPORT_START
DINGS_TRADER_REPORT v1
generated_at=2026-02-11T01:18:25.075405+00:00

[FILES]
- raw_15m: OK (7.5MB) :: /content/Dings_Trader/TraderHimSelf/data_raw/btcusdt_15m.parquet
- raw_3m: OK (25.5MB) :: /content/Dings_Trader/TraderHimSelf/data_raw/btcusdt_3m.parquet
- raw_funding: OK (81.1KB) :: /content/Dings_Trader/TraderHimSelf/data_raw/btcusdt_funding.parquet
- aligned_15m: OK (10.0MB) :: /content/Dings_Trader/TraderHimSelf/data_processed/aligned_15m.parquet
- aligned_3m: OK (45.0MB) :: /content/Dings_Trader/TraderHimSelf/data_processed/aligned_3m.parquet
- features: OK (45.9MB) :: /content/Dings_Trader/TraderHimSelf/data_processed/features.parquet
- forecast_features: OK (10.4MB) :: /content/Dings_Trader/TraderHimSelf/data_processed/forecast_features.parquet
- scaler: OK (1.3KB) :: /content/Dings_Trader/TraderHimSelf/data_processed/scaler.pkl
- forecast_model: OK (444.5MB) :: /content/Dings_Trader/TraderHimSelf/models/forecast_model.pt

[DATA_SUMMARY]
- a

In [31]:
%%bash
# Final evaluation: parse the most recently modified PPO training log
# (logs/colab/06_train_ppo*.log) and print a compact key=value summary between
# the FINAL_EVAL_START / FINAL_EVAL_END markers.
set -euo pipefail

if [ -d "/content/Dings_Trader/TraderHimSelf" ]; then
  ROOT="/content/Dings_Trader/TraderHimSelf"
elif [ -d "/content/dings-trader/projects/dings-trader/TraderHimSelf" ]; then
  ROOT="/content/dings-trader/projects/dings-trader/TraderHimSelf"
else
  echo "ERROR: Repo nicht gefunden."
  exit 1
fi

echo "FINAL_EVAL_START"
echo "root=$ROOT"

# Quoted heredoc delimiter: the Python source is passed through literally,
# the project root is handed over as argv[1].
python - "$ROOT" <<'PY'
import glob
import os
import re
import statistics
import sys

root = sys.argv[1]

# Newest PPO log first (by modification time).
logs = sorted(
    glob.glob(os.path.join(root, "logs", "colab", "06_train_ppo*.log")),
    key=os.path.getmtime,
    reverse=True
)
if not logs:
    print("status=FAIL")
    print("reason=no_ppo_log_found")
    # Exit code 0 on purpose: under `set -e` a non-zero status would abort
    # the cell before the FINAL_EVAL_END marker is printed.
    raise SystemExit(0)

log_file = logs[0]
with open(log_file, "r", encoding="utf-8", errors="ignore") as fh:
    txt = fh.read()

# One numeric table value: plain or scientific notation with an optional sign
# on mantissa and exponent (e.g. 398, 0.00989, -1.19e-07, -2.34e+03), or
# nan/inf. Accepting "+", nan and inf keeps such rows from being skipped,
# which would make the "*_last" values silently report an older row.
NUMBER = r"[-+]?(?:\d+\.?\d*|\.\d+)(?:[eE][-+]?\d+)?|[-+]?(?:nan|inf)"


def vals(name):
    """Return every value logged for table row `name` ("| name | value |"), in file order."""
    pat = rf"\|\s*{re.escape(name)}\s*\|\s*({NUMBER})\s*\|"
    return [float(x) for x in re.findall(pat, txt)]


def last(v):
    """Last element of v, or None when v is empty."""
    return v[-1] if v else None


def med10(v):
    """Median of the last (up to) ten elements of v, or None when v is empty."""
    return statistics.median(v[-10:]) if v else None


def fmt(value):
    """Render a possibly missing metric: 'NA' for None, the value itself otherwise."""
    return "NA" if value is None else value


fps = vals("fps")
ts  = vals("total_timesteps")
ev  = vals("explained_variance")
kl  = vals("approx_kl")
cf  = vals("clip_fraction")

model_path = os.path.join(root, "models", "ppo_policy_final.zip")
ckpts = glob.glob(os.path.join(root, "checkpoints", "ppo", "*.zip"))

ts_last = last(ts)

print("status=OK")
print(f"log_file={os.path.basename(log_file)}")
print(f"timesteps_last={int(ts_last) if ts_last is not None else 'NA'}")
print(f"fps_last={fmt(last(fps))}")
print(f"fps_median_last10={fmt(med10(fps))}")
print(f"explained_variance_last={fmt(last(ev))}")
print(f"approx_kl_last={fmt(last(kl))}")
print(f"clip_fraction_last={fmt(last(cf))}")
print(f"ppo_model_exists={os.path.exists(model_path)}")
print(f"ppo_checkpoints={len(ckpts)}")
PY

echo "FINAL_EVAL_END"

FINAL_EVAL_START
root=/content/Dings_Trader/TraderHimSelf
status=OK
log_file=06_train_ppo_cpu_live.log
timesteps_last=1003520
fps_last=398.0
fps_median_last10=398.0
explained_variance_last=0.00989
approx_kl_last=0.0044323625
clip_fraction_last=0.121
ppo_model_exists=True
ppo_checkpoints=20
FINAL_EVAL_END
