In [5]:
import pandas as pd
import glob
import ta
from ta.trend import SMAIndicator, EMAIndicator, MACD
from ta.momentum import RSIIndicator, ROCIndicator
from ta.volatility import BollingerBands
import os
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from xgboost import XGBClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
import shap


## ----- PARTIE 1 ----
def create_labels(df, horizon=20, threshold=0.05):
    """Build a 3-class label from the forward return of the 'Close' column.

    Args:
        df: DataFrame containing a 'Close' column. The look-ahead is done
            with a row shift, so rows are assumed to be in chronological
            order (not checked here).
        horizon: Number of rows to look ahead for the future close.
            Must be >= 1.
        threshold: Return magnitude separating the classes.

    Returns:
        A new DataFrame (the input is left untouched) with the columns
        'Close', 'Close Horizon', 'horizon return' and 'label', where
        label is 2 when the return is > threshold, 0 when it is
        < -threshold, and 1 otherwise.

        The last `horizon` rows have no future close: their
        'Close Horizon' / 'horizon return' are NaN and their label falls
        into the "otherwise" class (1). Callers are expected to drop the
        NaN rows before using the labels.

    Raises:
        ValueError: If `horizon` is smaller than 1.
    """
    if horizon < 1:
        raise ValueError("horizon must be >= 1")

    out = df[['Close']].copy()
    out['Close Horizon'] = out['Close'].shift(-horizon)
    out['horizon return'] = (out['Close Horizon'] - out['Close']) / out['Close']

    # Vectorized classification. Comparisons against NaN are False, so rows
    # without a future close keep the default class 1, exactly like the
    # original row-wise lambda did.
    returns = out['horizon return']
    out['label'] = 1
    out.loc[returns > threshold, 'label'] = 2
    out.loc[returns < -threshold, 'label'] = 0
    return out

def apply_labeling_to_folder(folder_path):
    """Label every '*.csv' file found directly inside `folder_path`.

    Each file is read with pandas, passed through `create_labels`, and
    stripped of rows containing NaN. Files without a 'Close' column are
    skipped with a message; a file that raises any error is reported and
    skipped so that one bad file does not abort the whole batch.

    Args:
        folder_path: Directory to scan (non-recursive).

    Returns:
        dict mapping the file's base name to its labeled DataFrame, in
        alphabetical order of the file paths. Empty if nothing matched.
    """
    labeled_data = {}

    # glob returns matches in arbitrary order; sorting keeps the dict order
    # (and anything derived from it, such as the concatenated dataset and a
    # seeded train/test split) reproducible across machines and runs.
    all_files = sorted(glob.glob(os.path.join(folder_path, "*.csv")))

    for file in all_files:
        try:
            df = pd.read_csv(file)
            if 'Close' not in df.columns:
                print(f"Fichier ignoré (pas de colonne 'Close') : {file}")
                continue
            df_labeled = create_labels(df)
            df_labeled.dropna(inplace=True)
            filename = os.path.basename(file)
            labeled_data[filename] = df_labeled
        except Exception as e:
            # Deliberate best-effort: report the failing file and move on.
            print(f"Erreur avec {file} : {e}")

    return labeled_data

# Label every CSV of the raw-data folder.
folder = "Companies_historical_data/"
labeled_dict = apply_labeling_to_folder(folder)

# Show the first labeled file as an example.
# (Indexing raises IndexError when no file could be labeled.)
example_file = list(labeled_dict)[0]
print(f"Exemple pour : {example_file}")
print(labeled_dict[example_file].head())


def add_technical_indicators(df):
    """Append technical-indicator columns derived from 'Close' to `df`.

    The columns are written into `df` itself (in-place modification) and
    the same object is returned. Columns added, in this order:
    'SMA 20', 'EMA 20', 'RSI 14', 'MACD', 'MACD Signal', 'Bollinger High',
    'Bollinger Low', 'Rolling Volatility 20', 'ROC 10'.

    MACD and Bollinger Bands are built without explicit window arguments,
    i.e. with whatever defaults the `ta` library provides.

    Args:
        df: DataFrame with a 'Close' column.

    Returns:
        The same DataFrame, with the indicator columns added. The first
        rows of each indicator are NaN until its window is filled.
    """
    close = df['Close']

    # Objects that feed more than one output column.
    macd = MACD(close)
    bands = BollingerBands(close)

    # Insertion order of this mapping is the resulting column order.
    indicator_columns = {
        'SMA 20': SMAIndicator(close, window=20).sma_indicator(),
        'EMA 20': EMAIndicator(close, window=20).ema_indicator(),
        'RSI 14': RSIIndicator(close, window=14).rsi(),
        'MACD': macd.macd(),
        'MACD Signal': macd.macd_signal(),
        'Bollinger High': bands.bollinger_hband(),
        'Bollinger Low': bands.bollinger_lband(),
        'Rolling Volatility 20': close.rolling(window=20).std(),
        'ROC 10': ROCIndicator(close, window=10).roc(),
    }
    for column_name, values in indicator_columns.items():
        df[column_name] = values

    return df

def apply_technical_indicators_to_labeled_data(labeled_dict):
    """Run `add_technical_indicators` on every labeled DataFrame.

    Rows containing NaN after the indicators are added are dropped in
    place. A DataFrame that raises any error is reported and left out of
    the result instead of aborting the whole batch.

    Args:
        labeled_dict: Mapping of file name -> labeled DataFrame.

    Returns:
        dict mapping file name -> enriched DataFrame, for the entries
        that were processed without error.
    """
    enriched_dict = {}

    for filename in labeled_dict:
        try:
            enriched = add_technical_indicators(labeled_dict[filename])
            enriched.dropna(inplace=True)
        except Exception as e:
            print(f"Erreur pour {filename} : {e}")
        else:
            enriched_dict[filename] = enriched

    return enriched_dict

# Step 1.1.1: build the labels.
# NOTE(review): this path differs in letter case from the `folder` value
# used earlier ("Companies_historical_data/"); on a case-sensitive
# filesystem only one of the two can match — confirm the real folder name.
labeled_dict = apply_labeling_to_folder("Companies_Historical_Data")

# Step 1.1.2: add the technical indicators.
enriched_dict = apply_technical_indicators_to_labeled_data(labeled_dict)

# Look at one example.
# (Indexing raises IndexError when no file could be enriched.)
example_file = list(enriched_dict)[0]
print(enriched_dict[example_file].head())

def prepare_dataset_for_classification(enriched_dict):
    """Assemble one scaled train/test dataset from the per-file DataFrames.

    All DataFrames are concatenated, the target and the columns it is
    derived from ('label', 'Close Horizon', 'horizon return', 'Close') are
    removed from the features, the rows are split 80/20 (stratified on the
    label, fixed seed), and the features are standardized.

    The scaler is fitted on the training rows only and then applied to the
    test rows, so no statistic of the test set leaks into training.

    Args:
        enriched_dict: Mapping of file name -> enriched, labeled DataFrame.

    Returns:
        Tuple (X_train, X_test, y_train, y_test, feature_names) where the
        X_* are standardized arrays, the y_* are label Series, and
        feature_names is the column index of the feature matrix.

    Raises:
        ValueError: If `enriched_dict` is empty.
    """
    if not enriched_dict:
        raise ValueError("enriched_dict is empty: no data to build a dataset from")

    # Concatenate all DataFrames into a single one.
    full_df = pd.concat(enriched_dict.values(), ignore_index=True)

    # Features (X) and target (y).
    X = full_df.drop(columns=['label', 'Close Horizon', 'horizon return', 'Close'])
    y = full_df['label']

    # Split BEFORE scaling: fitting the scaler on the full data would let the
    # test rows influence the mean/std used to transform the training rows.
    # NOTE(review): the split is random across rows; if rows are consecutive
    # days with an overlapping look-ahead label, neighbouring rows can land on
    # both sides of the split — consider a time-based split.
    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)

    return X_train, X_test, y_train, y_test, X.columns

# enriched_dict is expected to exist already
# (built by apply_technical_indicators_to_labeled_data()).
(X_train, X_test, y_train, y_test, feature_names) = (
    prepare_dataset_for_classification(enriched_dict)
)

# Quick summary of what the models will be trained on.
print(f"Taille des données d'entraînement : {X_train.shape}")
print(f"Taille des données de test : {X_test.shape}")
print(f"Noms des features : {list(feature_names)}")

## ------- PARTIE 2 ------
def train_and_evaluate_model(model, param_grid, X_train, X_test, y_train, y_test):
    """Tune `model` with a grid search and report its test-set performance.

    A 5-fold cross-validated grid search (scored on accuracy, using all
    available cores) is run on the training data. The best estimator is
    then evaluated on the test data, and the best parameters, the
    classification report and the accuracy are printed.

    Args:
        model: Estimator to tune.
        param_grid: Parameter grid passed to GridSearchCV.
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for the report.

    Returns:
        The best estimator found by the grid search.
    """
    search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        cv=5,
        scoring='accuracy',
        n_jobs=-1,
    )
    search.fit(X_train, y_train)

    best_model = search.best_estimator_
    predictions = best_model.predict(X_test)
    report = classification_report(y_test, predictions)
    accuracy = accuracy_score(y_test, predictions)

    print(f"\nMeilleurs paramètres : {search.best_params_}")
    print("\nRapport de classification :")
    print(report)
    print(f"Accuracy : {accuracy:.4f}")

    return best_model

# Random forest.
rf_params = dict(
    n_estimators=[100, 200],
    max_depth=[5, 10],
    min_samples_split=[2, 5],
)

best_rf = train_and_evaluate_model(
    RandomForestClassifier(random_state=42),
    rf_params, X_train, X_test, y_train, y_test,
)

# Gradient-boosted trees.
# NOTE(review): `use_label_encoder` may be deprecated or ignored depending on
# the installed xgboost version — confirm before relying on it.
xgb_params = dict(
    n_estimators=[100, 200],
    max_depth=[3, 6],
    learning_rate=[0.01, 0.1],
)

best_xgb = train_and_evaluate_model(
    XGBClassifier(use_label_encoder=False, eval_metric='mlogloss'),
    xgb_params, X_train, X_test, y_train, y_test,
)

# k-nearest neighbours.
knn_params = dict(
    n_neighbors=[3, 5, 10],
    weights=['uniform', 'distance'],
)

best_knn = train_and_evaluate_model(
    KNeighborsClassifier(),
    knn_params, X_train, X_test, y_train, y_test,
)

# SVM search, currently disabled.
#svm_params = {
#    'C': [0.1, 1, 10],
#    'kernel': ['linear', 'rbf']
#}

#best_svm = train_and_evaluate_model(SVC(), svm_params, X_train, X_test, y_train, y_test)

# Logistic regression.
logreg_params = dict(
    C=[0.1, 1, 10],
    solver=['liblinear'],
)

best_logreg = train_and_evaluate_model(
    LogisticRegression(),
    logreg_params, X_train, X_test, y_train, y_test,
)

# Fitted models to compare, keyed by display name. Names and models live in a
# single mapping so the two result columns can never get out of sync: the
# previous version listed 'SVM' among the names while its accuracy was
# commented out, which made pd.DataFrame fail on columns of unequal length.
fitted_models = {
    'Random Forest': best_rf,
    'XGBoost': best_xgb,
    'KNN': best_knn,
    # 'SVM': best_svm,  # enable together with the disabled SVM grid search
    'Logistic Regression': best_logreg,
}

# One row per model: its name and its accuracy on the held-out test set.
results = {
    'Modèle': list(fitted_models),
    'Meilleure accuracy': [
        accuracy_score(y_test, fitted.predict(X_test))
        for fitted in fitted_models.values()
    ],
}

df_results = pd.DataFrame(results)
print(df_results)

## ----- PARTIE 3 ----
def explain_model_with_shap(model, X_train, X_test, feature_names):
    """Explain `model` with SHAP and display the summary plots.

    An explainer is built from the model with `X_train` as second
    argument, SHAP values are computed for `X_test`, and a global summary
    plot is drawn. When the SHAP values carry more than two dimensions
    (one slice per class), two extra summary plots are drawn for class
    index 2 (titled 'Buy') and class index 0 (titled 'Sell').

    Args:
        model: Fitted model to explain.
        X_train: Data handed to the explainer at construction.
        X_test: Data for which SHAP values are computed and plotted.
        feature_names: Feature names shown on the plots.
    """
    shap_values = shap.Explainer(model, X_train)(X_test)

    # Global feature importance.
    shap.summary_plot(shap_values, features=X_test, feature_names=feature_names)

    # Per-class plots only make sense when there is a class dimension.
    if len(shap_values.values.shape) <= 2:
        return

    per_class_plots = (
        (2, "SHAP pour 'Buy'"),
        (0, "SHAP pour 'Sell'"),
    )
    for class_index, plot_title in per_class_plots:
        shap.summary_plot(
            shap_values[:, :, class_index],
            X_test,
            feature_names=feature_names,
            title=plot_title,
        )


Exemple pour : Adobe_historical_data.csv
        Close  Close Horizon  horizon return  label
0  267.690002     271.899994        0.015727      1
1  257.089996     271.859985        0.057451      2
2  257.760010     272.220001        0.056099      2
3  260.420013     271.429993        0.042278      1
4  259.739990     269.450012        0.037384      1
         Close  Close Horizon  horizon return  label      SMA 20      EMA 20  \
33  283.350006     274.510010       -0.031198      1  274.408000  275.949516   
34  279.640015     270.899994       -0.031255      1  274.815001  276.300992   
35  285.579987     259.029999       -0.092969      0  275.699500  277.184706   
36  283.660004     268.709991       -0.052704      0  276.509999  277.801401   
37  277.070007     272.859985       -0.015195      1  276.923000  277.731744   

       RSI 14      MACD  MACD Signal  Bollinger High  Bollinger Low  \
33  63.786694  4.563162     3.174027      286.751961     262.064039   
34  57.351511  4.275933 

KeyboardInterrupt: 