In [1]:
import joblib
import pandas as pd

import mlflow
from mlflow.tracking import MlflowClient

import ray
from sklearn.metrics import accuracy_score
from sklearn.ensemble import ExtraTreesClassifier
from ray import tune
from ray.tune.sklearn import TuneSearchCV
from sklearn.model_selection import train_test_split

from preprocessing import get_data
from influxdb_handler import save_to_influx

In [2]:
ray.init("ray://localhost:10001") ## VM
print(ray.cluster_resources())

{'CPU': 16.0, 'object_store_memory': 19170683289.0, 'memory': 38341366580.0, 'node:__internal_head__': 1.0, 'node:10.0.7.60': 1.0}


In [6]:
@ray.remote(num_cpus=12)
def train_and_tune_extra_tree_model(sp500_data):
    X = sp500_data.drop(['Target'], axis=1)
    train_x, test_x, train_y, test_y = train_test_split(X, sp500_data['Target'], test_size=0.25, random_state=42)
    
    model = ExtraTreesClassifier(random_state=42)
    # Hyperparameter, die getunt werden sollen
    param_distributions = {
        'n_estimators': tune.randint(100, 2000),
        'max_depth': tune.randint(100, 2000),
        'min_samples_split': tune.choice([1, 5, 20]),
        'min_samples_leaf': tune.choice([1, 2, 20]),
        'max_features': tune.choice(['auto', 'sqrt', 'log2'])
    }

    tuner = TuneSearchCV(
        model,
        param_distributions,
        n_trials=30,  # Anzahl der Durchläufe
        early_stopping=False,  # Frühzeitiges Stoppen für schlecht abschneidende Trials
        max_iters=20,  # Maximale Anzahl von Iterationen pro Trial
        search_optimization="random",  # Optimierungsalgorithmus
        cv=5,  # Kreuzvalidierung
        random_state=42,
    )
    tuner.fit(train_x, train_y)
    best_model = tuner.best_estimator_
    #joblib.dump(best_model, './data/predict_model/best_extra_tree_model.pkl')
    predictions = best_model.predict(test_x)
    accuracy = accuracy_score(test_y, predictions)
    print(f"Best model parameters: {tuner.best_params_}")
    print(f"Test Accuracy: {accuracy}")

    return best_model, accuracy

In [7]:
def log_to_mlflow(model, accuracy):
    mlflow.set_experiment("sp500_prediction")
    mlflow.set_tracking_uri("http://localhost:5000")
    best_extra_tree = "best_extra_tree_model"

    default_logged_model = 'runs:/5c036be77ea045228b58b4fa52821f65/model'

    with mlflow.start_run():
        mlflow.sklearn.log_model(model, "model")
        mlflow.log_metric("accuracy", accuracy)
        run_id = mlflow.active_run().info.run_uuid
        actual_model_path = f"runs:/{run_id}/model"
        client = MlflowClient()
        try:
            registered_model = client.get_registered_model(best_extra_tree)
        except:
            registered_model = None

        if not registered_model:
            client.create_registered_model(best_extra_tree)
            client.create_model_version(name=best_extra_tree,
                                        source=actual_model_path,
                                        run_id=run_id)
        else:
            latest_version = client.get_latest_versions(best_extra_tree, stages=["Production"])[0]
            latest_metrics = client.get_run(latest_version.run_id).data.metrics
            if "accuracy" in latest_metrics:
                latest_accuracy = latest_metrics["accuracy"]
                if accuracy > latest_accuracy:
                    version_info = client.create_model_version(name=best_extra_tree,
                                                               source=actual_model_path,
                                                               run_id=run_id)

                    client.transition_model_version_stage(
                        name=version_info.name,
                        version=version_info.version,
                        stage="Production"
                    )
                    
                    print("New model registered as best model!")
                    return actual_model_path
                else:
                    print("The new model isn't better")
                    return default_logged_model

In [None]:
def model_prediction():
    import mlflow
    import pandas as pd
    from influxdb_handler import get_data_form_influx, save_prediction_to_influx

    df, prediction_df = get_data_form_influx()
    logged_model = prediction_df["model"].iloc[0]
    loaded_model = mlflow.pyfunc.load_model(logged_model)
    prediction = loaded_model.predict(df)
    prediction_df["Target"] = prediction
    save_prediction_to_influx(prediction_df)

In [8]:
sp500_data, last_day_df = get_data()

best_model, accuracy = ray.get(train_and_tune_extra_tree_model.remote(sp500_data))

model_path = log_to_mlflow(best_model, accuracy)

save_to_influx(last_day_df=last_day_df, model_path=model_path)

model_prediction()

[*********************100%%**********************]  1 of 1 completed


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  sp500_data[f"SMA {time_period}"]      = ta.SMA(inputs, timeperiod = time_period)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  sp500_data[f"EMA {time_period}"]      = ta.EMA(inputs, timeperiod = time_period)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  sp500_data[f"EMA {20}"]               = ta.

[36m(train_and_tune_extra_tree_model pid=3361730)[0m ╭───────────────────────────────────────────────────────────────────╮
[36m(train_and_tune_extra_tree_model pid=3361730)[0m │ Configuration for experiment     _Trainable_2023-11-17_10-56-10   │
[36m(train_and_tune_extra_tree_model pid=3361730)[0m ├───────────────────────────────────────────────────────────────────┤
[36m(train_and_tune_extra_tree_model pid=3361730)[0m │ Search algorithm                 BasicVariantGenerator            │
[36m(train_and_tune_extra_tree_model pid=3361730)[0m │ Scheduler                        FIFOScheduler                    │
[36m(train_and_tune_extra_tree_model pid=3361730)[0m │ Number of trials                 30                               │
[36m(train_and_tune_extra_tree_model pid=3361730)[0m ╰───────────────────────────────────────────────────────────────────╯
[36m(train_and_tune_extra_tree_model pid=3361730)[0m 
[36m(train_and_tune_extra_tree_model pid=3361730)[0m View detailed 

[36m(train_and_tune_extra_tree_model pid=3361730)[0m [output] This will use the new output engine with verbosity 0. To disable the new output and use the legacy output engine, set the environment variable RAY_AIR_NEW_OUTPUT=0. For more information, please see https://github.com/ray-project/ray/issues/36949
[36m(bundle_reservation_check_func pid=3364017)[0m Traceback (most recent call last):
[36m(bundle_reservation_check_func pid=3364017)[0m   File "python/ray/_raylet.pyx", line 1649, in ray._raylet.execute_task
[36m(bundle_reservation_check_func pid=3364017)[0m   File "python/ray/_raylet.pyx", line 1651, in ray._raylet.execute_task
[36m(bundle_reservation_check_func pid=3364017)[0m   File "/home/eautenrieth/.local/lib/python3.10/site-packages/ray/_private/worker.py", line 740, in deserialize_objects
[36m(bundle_reservation_check_func pid=3364017)[0m     context = self.get_serialization_context()
[36m(bundle_reservation_check_func pid=3364017)[0m   File "/home/eautenrieth/

[36m(train_and_tune_extra_tree_model pid=3361730)[0m 


[36m(train_and_tune_extra_tree_model pid=3361730)[0m   results["rank_%s" % key_name] = np.asarray(


[36m(train_and_tune_extra_tree_model pid=3361730)[0m Best model parameters: {'n_estimators': 869, 'max_depth': 443, 'min_samples_split': 5, 'min_samples_leaf': 2, 'max_features': 'sqrt'}
[36m(train_and_tune_extra_tree_model pid=3361730)[0m Test Accuracy: 0.8047186932849365
The new model isn't better
Complete. Return to the InfluxDB UI.
