In [None]:
import sys
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import RandomizedSearchCV, cross_val_score, KFold
from dotenv import load_dotenv
import dask
from dask.distributed import Client, LocalCluster

# Put the parent of the working directory on sys.path so the `src.*` imports below resolve.
src_path = os.path.abspath(os.pardir)
if sys.path.count(src_path) == 0:
    sys.path.append(src_path)

# Stop Python from writing .pyc files for modules imported from here on.
sys.dont_write_bytecode = True
from src.utils.utils import *
from src.utils.constants import *
from src.visualization.visualize import *

In [None]:
# Load the variables of the local .env file into the process environment.
load_dotenv()

# Address of the distributed scheduler; None when DISTRIBUTED_MAIN_IP is not set.
# NOTE(review): with a None address dask's Client is documented to start a local
# cluster instead of connecting to a remote one — confirm that fallback is wanted.
ip = os.environ.get("DISTRIBUTED_MAIN_IP")
# cluster = LocalCluster(n_workers=4)
client = Client(address=ip)

In [None]:
# Read the ground-truth table and keep only the join key and the target column.
ground_truth = pd.read_parquet(GROUND_TRUTH_PATH)[['origin_time', 'label']]

In [None]:
# Load every `<exchange>_<data type>_pca_data.parquet` file, keyed by (data type, exchange).
imported_data = {}

for exchange in EXCHANGES:
    for data_type in (CANDLES, ORDERBOOKS):
        parquet_name = f'{exchange}_{data_type}_pca_data.parquet'
        imported_data[(data_type, exchange)] = pd.read_parquet(os.path.join(INTERIM_DATA_PATH, parquet_name))

In [None]:
# Columns taken from the ground truth for the join and removed again to form X.
cols_to_drop = ['origin_time', 'label']
merged_df = {}

# For each dataset: inner-join the labels on 'origin_time', then split into features and target.
for (data_type, exchange), df in imported_data.items():
    full = ground_truth[cols_to_drop].merge(df, on='origin_time', how='inner')
    merged_df[(data_type, exchange)] = {
        'full': full,
        'X': full.drop(columns=cols_to_drop),
        'y': full['label'],
    }

In [None]:
@dask.delayed
def process_dataset(data_type, exchange, param_distributions, df):
    """Tune, evaluate and profile a random forest on one dataset.

    The function runs four stages:

    1. Split ``df['X']`` / ``df['y']`` into a training and a held-out test part.
    2. Score the randomized search with an outer 5-fold cross-validation on the
       full data (nested cross-validation).
    3. Fit the randomized search on the training part and evaluate its best
       estimator on the held-out part with ``get_evaluation``.
    4. For every candidate ``n_estimators`` value, rebuild a forest with the
       best parameters and record its cross-validated training accuracy and
       its accuracy on the held-out part.

    Args:
        data_type: Kind of data the dataset holds. Not used by the computation;
            it only identifies the task.
        exchange: Exchange the dataset belongs to. Not used by the computation;
            it only identifies the task.
        param_distributions: Search space passed to ``RandomizedSearchCV``. It
            must contain ``'n_estimators'``, ``'criterion'``, ``'max_depth'``,
            ``'min_samples_split'`` and ``'min_samples_leaf'`` because the best
            value of each is read back below.
        df: Mapping holding the feature matrix under ``'X'`` and the target
            under ``'y'``.

    Returns:
        dict with the keys ``'best_params'``, ``'best_score'``,
        ``'nested_scores'``, ``'evaluation'`` (whatever ``get_evaluation``
        returns), ``'train_scores'``, ``'test_scores'`` and
        ``'n_estimators_range'``.
    """
    X_train, X_test, y_train, y_test = train_test_split(
        df['X'], df['y'], test_size=TEST_SIZE, random_state=RANDOM_STATE
    )

    # Nested cross-validation: the outer loop evaluates the tuned model, the
    # search's own folds tune the hyperparameters.
    outer_cv = KFold(n_splits=5, shuffle=True, random_state=RANDOM_STATE)
    inner_cv = KFold(n_splits=3, shuffle=True, random_state=RANDOM_STATE)

    clf = RandomForestClassifier(random_state=RANDOM_STATE)

    # NOTE(review): the search folds come from CV_FOLDS, not from inner_cv, which
    # is only used for the n_estimators curve below — confirm this is intended.
    randomized_search = RandomizedSearchCV(
        estimator=clf,
        param_distributions=param_distributions,
        n_iter=50,
        cv=CV_FOLDS,
        scoring='accuracy',
        n_jobs=-1,
        random_state=RANDOM_STATE,
    )

    # Outer loop of the nested cross-validation, run on the complete dataset.
    nested_scores = cross_val_score(randomized_search, df['X'], df['y'], cv=outer_cv, scoring='accuracy')

    # Tune on the training split only, so the held-out split stays unseen.
    randomized_search.fit(X_train, y_train)

    best_params = randomized_search.best_params_
    best_score = randomized_search.best_score_
    best_clf = randomized_search.best_estimator_

    # Evaluate the tuned model on the held-out split.
    y_pred = best_clf.predict(X_test)
    evaluation = get_evaluation(y_test, y_pred)

    train_scores = []
    test_scores = []
    n_estimators_range = param_distributions['n_estimators']

    for n_estimators in n_estimators_range:
        model = RandomForestClassifier(
            criterion=best_params['criterion'],
            random_state=RANDOM_STATE,
            n_estimators=n_estimators,
            max_depth=best_params['max_depth'],
            min_samples_split=best_params['min_samples_split'],
            min_samples_leaf=best_params['min_samples_leaf']
        )

        # Use the local split variables: `df` only holds 'full', 'X' and 'y', so
        # indexing it with 'X_train' / 'y_train' / 'X_test' / 'y_test' raised KeyError.
        train_cv_results = cross_val_score(model, X_train, y_train, cv=inner_cv, scoring='accuracy')
        train_scores.append(train_cv_results.mean())

        # Refit on the whole training split and score on the held-out split.
        model.fit(X_train, y_train)
        test_scores.append(model.score(X_test, y_test))

    return {
        'best_params': best_params,
        'best_score': best_score,
        'nested_scores': nested_scores,
        'evaluation': evaluation,
        'train_scores': train_scores,
        'test_scores': test_scores,
        'n_estimators_range': n_estimators_range
    }

In [None]:
# Hyperparameter search space passed to process_dataset for the randomized search.
param_distributions = dict(
    criterion=[PARAM_DISTRIBUTION_CRITERION],  # single criterion taken from the project constants
    n_estimators=[50],                         # only one forest size is explored
    max_depth=range(1, 20),                    # depths 1 through 19
    min_samples_split=[10, 20, 50, 100],
    min_samples_leaf=[10, 20, 30, 50],
)

In [None]:
# Build one delayed task per dataset, then run them all in a single dask.compute call.
tasks = [
    process_dataset(*dataset_key, param_distributions, frames)
    for dataset_key, frames in merged_df.items()
]
results = dask.compute(*tasks)

In [None]:
# Show the raw results returned by dask.compute (one entry per task) for a quick inspection.
display(results)

In [None]:
# Report, plot and persist the result of every dataset. `results` is in the same
# order as `merged_df`, because the tasks were built by iterating over it.
for (data_type, exchange), result in zip(merged_df.keys(), results):
    print(f"Results for {exchange} {data_type}:")
    print(f"Best parameters for {exchange} {data_type}: {result['best_params']}")
    print(f'Nested CV Accuracy: {result["nested_scores"].mean():.2f}')
    print(f'Test Set Accuracy: {result["evaluation"]["accuracy"]:.2f}')
    print('Classification Report:')
    display(result['evaluation']['classification_report'])
    print('Confusion Matrix:')
    display(result['evaluation']['confusion_matrix'])

    # process_dataset stores the x-axis values under 'n_estimators_range'; it returns
    # no 'depths' key, so looking that up raised KeyError.
    # NOTE(review): the helper's name suggests it was written for tree depths —
    # check that its axis labelling suits n_estimators values.
    plot_tree_learning_curves(exchange, data_type, result['n_estimators_range'], result['train_scores'], result['test_scores'], 'random_forest')

    # `result` is a plain dict, so pickle it with the module-level pandas function
    # rather than calling the DataFrame method unbound on a non-DataFrame.
    pd.to_pickle(result, os.path.join(PROCESSED_DATA_PATH, f'{exchange}_{data_type}_random_forest_results.pkl'))

In [None]:
# for (data_type, exchange), df in merged_df.items():
#     result = process_dataset(data_type, exchange, param_distributions, df)
#     print(f"Results for {exchange} {data_type}:")
#     print(f"Best parameters: {result['best_params']}")
#     print(f'Nested CV Accuracy: {result["nested_scores"].mean():.2f}')
#     print(f'Accuracy: {result["evaluation"]["accuracy"]:.2f}')
#     print(f'Classification Report:')
#     display(result['evaluation']['classification_report'])
#     print(f'Confusion Matrix:')
#     display(result['evaluation']['confusion_matrix'])
#     plot_tree_learning_curves(exchange, data_type, result['depths'], result['train_scores'], result['test_scores'], 'random_forest')
#     pd.DataFrame.to_pickle(result, os.path.join(PROCESSED_DATA_PATH, f'{exchange}_{data_type}_random_forest_results.pkl'))