In [None]:
%load_ext autoreload
%autoreload 2

# wSAA

> Weighted sample average approximation (wSAA) estimators: a random-forest-based wSAA model and the featureless SAA baseline.

In [None]:
#| default_exp wSAA

In [None]:
#| hide
from nbdev.showdoc import *

# from nbdev.qmd import *

## Packages

In [None]:
#| export
from __future__ import annotations
from fastcore.docments import *
from fastcore.test import *
from fastcore.utils import *

import pandas as pd
import numpy as np

from sklearn.ensemble import RandomForestRegressor
from sklearn.base import MetaEstimatorMixin
from dddex.baseClasses import BaseWeightsBasedEstimator
from dddex.utils import restructureWeightsDataList

## wSAA - Random Forest

In [None]:
#| export 

class RandomForestWSAA(RandomForestRegressor, BaseWeightsBasedEstimator):
    """Random forest regressor that additionally exposes weights over the training
    observations, derived from the leaves a new sample shares with them in each tree."""
    
    def fit(self, 
            X: np.ndarray, # Feature matrix
            y: np.ndarray, # Target values
            **kwargs):
        """Fit the forest, then cache the training targets and the leaf index of every
        training sample in every tree. Both are needed later by `getWeights`.
        
        Returns the fitted estimator itself, following the scikit-learn convention,
        so that calls such as `RandomForestWSAA().fit(X, y).predict(...)` work."""

        super().fit(X = X, 
                    y = y, 
                    **kwargs)
        
        self.y = y
        # `apply` yields one row per training sample and one column per tree.
        self.leafIndicesTrain = self.apply(X)
        
        return self
    
    #---
    
    def getWeights(self, 
                   X: np.ndarray, # Feature matrix for which conditional density estimates are computed.
                   # Specifies structure of the returned density estimates. One of: 
                   # 'all', 'onlyPositiveWeights', 'summarized', 'cumDistribution', 'cumDistributionSummarized'
                   outputType: str='onlyPositiveWeights', 
                   # Optional. List with length X.shape[0]. Values are multiplied to the estimated 
                   # density of each sample for scaling purposes.
                   scalingList: list=None, 
                   ) -> list: # List whose elements are the conditional density estimates for the samples specified by `X`.
        """Compute, for every row of `X`, weights over the training observations.
        
        Within each tree, the training observations that fall into the same leaf as the 
        new sample share a total weight of one equally among them. These per-tree weights 
        are then averaged over all trees. Only strictly positive weights are handed to 
        `restructureWeightsDataList`, which shapes the result according to `outputType`."""

        leafIndicesNew = self.apply(X)

        weightsDataList = list()

        for leafIndices in leafIndicesNew:
            # Entry (i, t) is 1 if training sample i sits in the same leaf of tree t as the new sample.
            leafComparisonMatrix = (self.leafIndicesTrain == leafIndices) * 1
            nObsInSameLeaf = np.sum(leafComparisonMatrix, axis = 0)

            # Defensive branch: if the comparison is one-dimensional there is no tree axis 
            # to average over, so the normalized indicator already is the weight vector.
            if len(leafComparisonMatrix.shape) == 1:
                weights = leafComparisonMatrix / nObsInSameLeaf
            else:
                weights = np.mean(leafComparisonMatrix / nObsInSameLeaf, axis = 1)

            weightsPosIndex = np.where(weights > 0)[0]

            weightsDataList.append((weights[weightsPosIndex], weightsPosIndex))

        #---

        weightsDataList = restructureWeightsDataList(weightsDataList = weightsDataList, 
                                                     outputType = outputType, 
                                                     y = self.y, 
                                                     scalingList = scalingList,
                                                     equalWeights = False)

        return weightsDataList
    
    #---
    
    def predict(self, 
                X: np.ndarray, # Feature matrix for which conditional quantiles are computed.
                probs: list, # Probabilities for which quantiles are computed.
                outputAsDf: bool=True, # Determines output. Either a dataframe with probs as columns or a dict with probs as keys.
                # Optional. List with length X.shape[0]. Values are multiplied to the predictions
                # of each sample to rescale values.
                scalingList: list=None, 
                ): 
        """Predict conditional quantiles for the rows of `X`.
        
        Use `pointPredict` to obtain the point forecasts of the underlying random forest."""
        
        # The method lookup deliberately starts after `MetaEstimatorMixin` in the MRO, which
        # skips the random forest's own `predict`.
        # NOTE(review): presumably this resolves to `BaseWeightsBasedEstimator.predict` - confirm 
        # against the MRO of the installed scikit-learn version.
        return super(MetaEstimatorMixin, self).predict(X = X,
                                                       probs = probs, 
                                                       outputAsDf = outputAsDf,
                                                       scalingList = scalingList)
    
    #---
    
    def pointPredict(self,
                     X: np.ndarray, # Feature Matrix
                     **kwargs):
        """Original `predict` method to generate point forecasts"""
        
        return super().predict(X = X,
                               **kwargs)


In [None]:
# show_doc(RandomForestWSAA)

In [None]:
# show_doc(RandomForestWSAA.fit)

In [None]:
# show_doc(RandomForestWSAA.getWeights)

## SAA

In [None]:
#| export

class SampleAverageApproximation(BaseWeightsBasedEstimator):
    """SAA is a featureless approach that assumes the density of the target variable is given
    by assigning equal probability to each historical observation of said target variable."""
    
    def __init__(self):
        
        # Training targets; stays `None` until `fit` has been called.
        self.yTrain = None
        
    #---
        
    def __str__(self):
        return "SAA()"
    __repr__ = __str__ 
    
    #---
    
    def fit(self: SampleAverageApproximation, 
            y: np.ndarray, # Target values which form the estimated density function based on the SAA algorithm.
            ):
        """Store the target values that define the estimated density.
        
        Returns the estimator itself so that calls can be chained."""
        
        self.yTrain = y
        
        return self
    
    #---
    
    def getWeights(self, 
                   X: np.ndarray=None, # Feature matrix for which conditional density estimates are computed.
                   # Specifies structure of the returned density estimates. One of: 
                   # 'all', 'onlyPositiveWeights', 'summarized', 'cumDistribution', 'cumDistributionSummarized'
                   outputType: str='onlyPositiveWeights', 
                   # Optional. List with length X.shape[0]. Values are multiplied to the estimated 
                   # density of each sample for scaling purposes.
                   scalingList: list=None, 
                   ) -> list: # List whose elements are the conditional density estimates for the samples specified by `X`.
        """Assign the weight `1 / len(yTrain)` to every training observation.
        
        The features in `X` are never inspected; only `X.shape[0]` is used to decide how 
        many (identical) density estimates are produced. If `X` is `None`, a single estimate 
        is produced. `fit` has to be called first, because `yTrain` is `None` until then."""

        nTrain = len(self.yTrain)
        nEstimates = 1 if X is None else X.shape[0]

        # weightsDataList is a list whose elements correspond to one test prediction each. 
        # Every element gets its own freshly created arrays.
        weightsDataList = [(np.repeat(1 / nTrain, nTrain), np.arange(nTrain)) for _ in range(nEstimates)]

        weightsDataList = restructureWeightsDataList(weightsDataList = weightsDataList, 
                                                     outputType = outputType, 
                                                     y = self.yTrain,
                                                     scalingList = scalingList,
                                                     equalWeights = True)

        return weightsDataList
    

In [None]:
# show_doc(SampleAverageApproximation)

In [None]:
# show_doc(SampleAverageApproximation.fit)

In [None]:
# show_doc(SampleAverageApproximation.getWeights)

In [None]:
#| hide
# Run the nbdev export step for this project.
import nbdev

nbdev.nbdev_export()

# Test Code

In [None]:
#| hide

# from lightgbm import LGBMRegressor
# from dddex.loadData import *

# data, XTrain, yTrain, XTest, yTest = loadDataYaz(testDays = 14, 
#                                                  returnXY = True,
#                                                  daysToCut = 0)

In [None]:
#| hide

# RF = RandomForestWSAA(max_depth = 2,
#                       n_estimators = 10,
#                       n_jobs = 1)

# RF.fit(X = XTrain, y = yTrain)

In [None]:
#| hide

# RF.predict(XTest, probs = [0.5])