In [0]:

%pip install xgboost scikit-learn hyperopt joblib pandas

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m


In [0]:
# Trening ML + SCALER

import pandas as pd
import joblib
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from xgboost import XGBRegressor
from sklearn.linear_model import BayesianRidge, LinearRegression 
from sklearn.neighbors import KNeighborsRegressor
from pyspark.sql import functions as F
from hyperopt import fmin, tpe, hp, Trials, STATUS_OK
from sklearn.metrics import mean_squared_error

INPUT_TABLE = "eth_raw_stream_max_15_sec"
# Ścieżki
PATH_XGB = '/tmp/eth_model_xgboost.joblib'
PATH_BAYES = '/tmp/eth_model_bayes.joblib'
PATH_LINEAR = '/tmp/eth_model_linear.joblib'
PATH_KNN = '/tmp/eth_model_knn.joblib'
PATH_SCALER = '/tmp/eth_scaler.joblib'

N_TRIALS = 20 

def calculate_features_per_block(group):
    group = group.sort_values("timestamp")
    group['log_returns'] = np.log(group['close'] / group['close'].shift(1))
    group['returns_sma'] = group['log_returns'].rolling(window=12).mean()
    group['returns_std'] = group['log_returns'].rolling(window=12).std()
    
    future_close = group['close'].shift(-12)
    group['target_log_return'] = np.log(future_close / group['close'])
    return group

print("1. Ładowanie danych...")
try:
    df_spark = spark.read.table(INPUT_TABLE)
    pandas_df = df_spark.orderBy("timestamp").toPandas()
except Exception as e:
    print(f"❌ Błąd: {e}")
    raise e

pandas_df = calculate_features_per_block(pandas_df)
pandas_df = pandas_df.dropna()

print(f"Dane gotowe: {len(pandas_df)} wierszy.")

if len(pandas_df) < 150:
    print("⚠️ ZA MAŁO DANYCH!")
else:
    # 2. Skalowanie Danych (Kluczowe dla k-NN i Regresji)
    features_cols = ['returns_sma', 'returns_std', 'volume']
    
    # Inicjalizujemy Skaler
    scaler = StandardScaler()
    
    # Dopasowujemy go do WSZYSTKICH danych (żeby znał średnią i odchylenie)
    X_raw = pandas_df[features_cols].values
    X_scaled = scaler.fit_transform(X_raw)
    
    Y = pandas_df['target_log_return'].values
    
    # Zapisujemy skaler, żeby Konsument (Notatnik C) mógł go użyć
    joblib.dump(scaler, PATH_SCALER)
    print(f"✅ Skaler zapisany do: {PATH_SCALER}")

    # Podział na Train/Test (używamy już przeskalowanych X)
    total_len = len(X_scaled)
    train_end = int(total_len * 0.85) # Więcej na trening
    
    X_train, Y_train = X_scaled[:train_end], Y[:train_end]
    X_test, Y_test = X_scaled[train_end:], Y[train_end:]
    
    # --- MODEL 1: XGBoost ---
    print("\n--- Model 1: XGBoost ---")
    space = {
        'max_depth': hp.choice('max_depth', range(3, 10)),
        'learning_rate': hp.uniform('learning_rate', 0.01, 0.3),
        'n_estimators': hp.choice('n_estimators', range(50, 200, 10)),
        'gamma': hp.uniform('gamma', 0, 0.2),
    }
    def objective(params):
        model = XGBRegressor(**params, random_state=42, n_jobs=-1)
        model.fit(X_train, Y_train) 
        rmse = np.sqrt(mean_squared_error(Y_test, model.predict(X_test)))
        return {'loss': rmse, 'status': STATUS_OK}

    best = fmin(fn=objective, space=space, algo=tpe.suggest, max_evals=N_TRIALS, trials=Trials())
    final_xgb = XGBRegressor(**best, random_state=42, n_jobs=-1)
    final_xgb.fit(X_train, Y_train)
    joblib.dump(final_xgb, PATH_XGB)
    print(f"✅ XGBoost zapisany.")
    
    # --- MODEL 2: Bayes ---
    print("--- Model 2: Bayes ---")
    final_bayes = BayesianRidge() 
    final_bayes.fit(X_train, Y_train)
    joblib.dump(final_bayes, PATH_BAYES)
    
    # --- MODEL 3: Linear ---
    print("--- Model 3: Linear ---")
    final_linear = LinearRegression()
    final_linear.fit(X_train, Y_train)
    joblib.dump(final_linear, PATH_LINEAR)
    
    # --- MODEL 4: k-NN ---
    print("--- Model 4: k-NN ---")
    final_knn = KNeighborsRegressor(n_neighbors=10, weights='distance')
    final_knn.fit(X_train, Y_train)
    joblib.dump(final_knn, PATH_KNN)

    print("\nGOTOWE. Modele i Skaler zaktualizowane.")

[43mNote: you may need to restart the kernel using %restart_python or dbutils.library.restartPython() to use updated packages.[0m
1. Ładowanie danych...
Dane gotowe: 6460 wierszy.
✅ Skaler zapisany do: /tmp/eth_scaler.joblib

--- Model 1: XGBoost ---
  0%|          | 0/20 [00:00<?, ?trial/s, best loss=?] 10%|█         | 2/20 [00:00<00:02,  7.97trial/s, best loss: 0.0014281312772275055] 25%|██▌       | 5/20 [00:00<00:01, 12.76trial/s, best loss: 0.0014281312772275055] 35%|███▌      | 7/20 [00:00<00:01,  7.32trial/s, best loss: 0.0014281312772275055] 50%|█████     | 10/20 [00:01<00:00, 10.47trial/s, best loss: 0.0014281312772275055] 60%|██████    | 12/20 [00:01<00:00, 12.14trial/s, best loss: 0.0014281312772275055] 70%|███████   | 14/20 [00:01<00:00, 12.95trial/s, best loss: 0.0014281312772275055] 80%|████████  | 16/20 [00:01<00:00, 14.20trial/s, best loss: 0.0014281312772275055] 90%|█████████ | 18/20 [00:01<00:00, 11.99trial/s, best loss: 0.0014281312772275055]100%|█████████

In [0]:
# NOTATNIK C: ULTIMATE COMPARATOR (Ze Skalowaniem)

import time
import pandas as pd
import joblib
import numpy as np
from datetime import datetime
from pyspark.sql import functions as F
from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_absolute_percentage_error

# --- Konfiguracja ---
INPUT_TABLE = "eth_raw_stream_max_15_sec" 
REFRESH_RATE = 5 
# Ścieżki
PATH_XGB = '/tmp/eth_model_xgboost.joblib'
PATH_BAYES = '/tmp/eth_model_bayes.joblib'
PATH_LINEAR = '/tmp/eth_model_linear.joblib'
PATH_KNN = '/tmp/eth_model_knn.joblib'
PATH_SCALER = '/tmp/eth_scaler.joblib' # <--- NOWOŚĆ

# --- Klasa Trackera (Bez zmian) ---
class ErrorTracker:
    def __init__(self):
        self.actuals = []
        self.preds = []
        self.naive_errors = []
        self.last_actual = None
    def update(self, actual, predicted):
        if self.last_actual is not None:
            naive_error = abs(actual - self.last_actual)
            self.naive_errors.append(naive_error)
        self.actuals.append(actual)
        self.preds.append(predicted)
        self.last_actual = actual
        if len(self.actuals) > 100:
            self.actuals.pop(0)
            self.preds.pop(0)
            self.naive_errors.pop(0)
    def get_rmse(self):
        if len(self.actuals) < 2: return 0.0
        return np.sqrt(mean_squared_error(self.actuals, self.preds))
    def get_mae(self):
        if len(self.actuals) < 2: return 0.0
        return mean_absolute_error(self.actuals, self.preds)
    def get_mape(self):
        if len(self.actuals) < 2: return 0.0
        return mean_absolute_percentage_error(self.actuals, self.preds) * 100
    def get_mase(self):
        if len(self.actuals) < 2 or sum(self.naive_errors) == 0: return 0.0
        return self.get_mae() / np.mean(self.naive_errors)

# --- 1. ŁADOWANIE ZASOBÓW ---
trackers = {name: ErrorTracker() for name in ["XGBoost", "Bayes", "Linear", "kNN", "Stoch"]}
models = {}
path_map = {"XGBoost": PATH_XGB, "Bayes": PATH_BAYES, "Linear": PATH_LINEAR, "kNN": PATH_KNN}

print("1. Ładowanie Modeli i Skalera...")
try:
    scaler = joblib.load(PATH_SCALER)
    print("✅ Skaler danych załadowany.")
except:
    print("❌ BŁĄD: Brak skalera! Uruchom Notatnik B.")
    scaler = None

for name, path in path_map.items():
    try:
        models[name] = joblib.load(path)
    except:
        print(f"❌ Brak modelu: {name}")

# 2. Główna Funkcja Analizy
def run_comparison_analysis(last_predictions_dict):
    try:
        df = spark.read.table(INPUT_TABLE)
        if df.isEmpty() or df.count() < 20: return None
        
        pdf = df.orderBy(F.col("timestamp").desc()).limit(100).toPandas()
        pdf = pdf.sort_values("timestamp")
        
        # --- Inżynieria ---
        pdf['log_returns'] = np.log(pdf['close'] / pdf['close'].shift(1))
        pdf['returns_sma'] = pdf['log_returns'].rolling(window=12).mean()
        pdf['returns_std'] = pdf['log_returns'].rolling(window=12).std()
        
        current_price = pdf['close'].iloc[-1]
        current_time = pdf['timestamp'].iloc[-1]
        sigma = pdf['log_returns'].std() if not np.isnan(pdf['log_returns'].std()) else 0.001
        
        # --- PRZYGOTOWANIE WEJŚCIA ZE SKALOWANIEM ---
        # Surowe cechy
        X_raw = pdf[['returns_sma', 'returns_std', 'volume']].fillna(0).iloc[[-1]].values 
        
        if scaler:
            # Skalujemy tak samo jak przy treningu!
            X_input = scaler.transform(X_raw)
        else:
            X_input = X_raw # Fallback (wyniki będą gorsze)
        
        preds = {}
        
        # --- PREDYKCJE ML ---
        for name in ['XGBoost', 'Bayes', 'Linear', 'kNN']:
            if name in models:
                log_ret_pred = models[name].predict(X_input)[0]
                preds[name] = current_price * np.exp(log_ret_pred)
            else:
                preds[name] = current_price

        # --- MODEL STOCHASTYCZNY ---
        drift_log = models['XGBoost'].predict(X_input)[0] if 'XGBoost' in models else 0.0
        
        brownian = np.random.normal(0, sigma, 1000)
        jumps = np.random.choice([0, 1], size=1000, p=[0.95, 0.05])
        jump_mag = np.random.normal(0, 3*sigma, 1000) * jumps
        
        sim_log_ret = drift_log + brownian + jump_mag
        preds['Stoch'] = current_price * np.mean(np.exp(sim_log_ret))

        # --- UPDATE ---
        if last_predictions_dict:
            for name, old_pred in last_predictions_dict.items():
                trackers[name].update(current_price, old_pred)
            
        return {"ts": current_time, "price": current_price, "preds": preds}

    except Exception as e:
        # print(f"Err: {e}")
        return None

# --- Pętla ---
print(f"🚀 Start Monitoringu.")
last_preds_dict = None

try:
    while True:
        res = run_comparison_analysis(last_preds_dict)
        
        if res:
            rmses = {name: t.get_rmse() for name, t in trackers.items()}
            winner = min(rmses, key=rmses.get)
            
            ts_str = res['ts'].strftime('%H:%M:%S')
            print(f"\n[{ts_str}] CENA: {res['price']:.2f} USD")
            print("-" * 90)
            print(f"{'MODEL':<10} | {'PROGNOZA':<10} | {'RMSE':<10} | {'MAE':<10} | {'MAPE':<10}")
            print("-" * 90)
            
            for name in ["XGBoost", "Bayes", "Linear", "kNN", "Stoch"]:
                marker = "👑" if name == winner else "  "
                rmse = trackers[name].get_rmse()
                mae = trackers[name].get_mae()
                mape = trackers[name].get_mape()
                
                print(f"{marker} {name:<8} | {res['preds'].get(name, 0.0):<10.2f} | {rmse:<10.4f} | {mae:<10.4f} | {mape:<10.4f}")
                
            print("-" * 90)
            print(f"🏆 Lider: {winner}")
            
            last_preds_dict = res['preds']
        else:
            print(".", end="", flush=True)

        time.sleep(REFRESH_RATE)

except KeyboardInterrupt:
    print("Zatrzymano.")

1. Ładowanie Modeli i Skalera...
✅ Skaler danych załadowany.
🚀 Start Monitoringu.

[00:32:40] CENA: 3012.41 USD
------------------------------------------------------------------------------------------
MODEL      | PROGNOZA   | RMSE       | MAE        | MAPE      
------------------------------------------------------------------------------------------
👑 XGBoost  | 3011.99    | 0.0000     | 0.0000     | 0.0000    
   Bayes    | 3012.00    | 0.0000     | 0.0000     | 0.0000    
   Linear   | 3012.00    | 0.0000     | 0.0000     | 0.0000    
   kNN      | 3011.22    | 0.0000     | 0.0000     | 0.0000    
   Stoch    | 3012.01    | 0.0000     | 0.0000     | 0.0000    
------------------------------------------------------------------------------------------
🏆 Lider: XGBoost

[00:32:40] CENA: 3012.41 USD
------------------------------------------------------------------------------------------
MODEL      | PROGNOZA   | RMSE       | MAE        | MAPE      
--------------------------------

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:139)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:139)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:136)
	at scala.collection.immutable.Range.foreach(Range.scala:192)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:721)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:441)
	at scala.Option.getOrElse(Option.scala:201)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:441)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can