In [164]:
import numpy as np
from graphlib import TopologicalSorter
import sklearn.linear_model as lm

class PipelineStructure:
    def __init__(self):
        self.nodes = set()
        self.edges = set()
        self.components = {'start': None}
        self.current_node = 'start'

    def update(self, node, component):
        self.add_node(node, component)
        self.add_edge(self.current_node, node)
        self.current_node = node

    def add_node(self, node, component):
        self.nodes.add(node)
        self.components[node] = component

    def add_edge(self, sender, reciever):
        self.edges.add((sender, reciever))

    def make_graph(self):
        self.graph = dict()
        for edge in self.edges:
            self.graph.setdefault(edge[1], set()).add(edge[0])

    def __call__(self, feature_matrix: np.ndarray, response_vector: np.ndarray):
        self.outputs = dict()
        ts = TopologicalSorter(self.graph)
        for node in ts.static_order():

            if node == 'start':
                self.outputs[node] = (np.arange(feature_matrix.shape[1]), np.array([]))

            elif isinstance(self.components[node], (FeatureSelection, OutlierDetection)):
                layer = self.components[node]
                parants = list(self.graph[node])
                assert len(parants) == 1
                selected_features, detected_outliers = self.outputs[parants[0]]

                if isinstance(layer, FeatureSelection):
                    selected_features = layer.select_features(
                        feature_matrix, response_vector, selected_features, detected_outliers)

                else:
                    detected_outliers = layer.detect_outliers(
                        feature_matrix, response_vector, selected_features, detected_outliers)
                self.outputs[node] = (selected_features, detected_outliers)

            elif isinstance(self.components[node], MissingImputation):
                parants = list(self.graph[node])
                assert len(parants) == 1
                selected_features, detected_outliers = self.outputs[parants[0]]
                response_vector = self.components[node].impute_missing(
                    feature_matrix, response_vector)
                self.outputs[node] = (selected_features, detected_outliers)

            elif isinstance(self.components[node], IndexesOperator):
                layer = self.components[node]
                parants = list(self.graph[node])
                selected_features_list = []
                detected_outliers_list = []
                for parant in parants:
                    selected_features, detected_outliers = self.outputs[parant]
                    selected_features_list.append(selected_features)
                    detected_outliers_list.append(detected_outliers)
                if isinstance(layer, Union):
                    selected_features = layer.union(*selected_features_list)
                    detected_outliers = layer.union(*detected_outliers_list)
                elif isinstance(layer, Intersection):
                    selected_features = layer.intersection(*selected_features_list)
                    detected_outliers = layer.intersection(*detected_outliers_list)
                else:
                    raise TypeError('Input must be Union or Intersection')
                self.outputs[node] = (selected_features, detected_outliers)

            elif isinstance(self.components[node], (DeleteOutliers, ExtractFeatures)):
                parents = list(self.graph[node])
                assert len(parents) == 1
                self.outputs[node] = self.outputs[parants[0]]

            elif node == 'end':
                parants = list(self.graph[node])
                assert len(parants) == 1
                selected_features, _ = self.outputs[parants[0]]
                return selected_features

    def inference(self, feature_matrix):
        self.outputs = dict()
        ts = TopologicalSorter(self.graph)
        # ts.prepare()
        for node in ts.static_order():

            if node == 'start':
                self.outputs[node] = (np.arange(feature_matrix.shape[1]), np.array([]))

            elif isinstance(self.components[node], (FeatureSelection, OutlierDetection)):
                layer = self.components[node]
                parants = list(self.graph[node])
                assert len(parants) == 1
                selected_features, detected_outliers = self.outputs[parants[0]]

                if isinstance(layer, FeatureSelection):
                    selected_features = layer.select_features(
                        feature_matrix, response_vector, selected_features, detected_outliers)

                else:
                    detected_outliers = layer.detect_outliers(
                        feature_matrix, response_vector, selected_features, detected_outliers)
                self.outputs[node] = (selected_features, detected_outliers)

            elif isinstance(self.components[node], MissingImputation):
                parants = list(self.graph[node])
                assert len(parants) == 1
                selected_features, detected_outliers = self.outputs[parants[0]]
                response_vector = self.components[node].impute_missing(
                    feature_matrix, response_vector)
                self.outputs[node] = (selected_features, detected_outliers)

            elif isinstance(self.components[node], IndexesOperator):
                layer = self.components[node]
                parants = list(self.graph[node])
                selected_features_list = []
                detected_outliers_list = []
                for parant in parants:
                    selected_features, detected_outliers = self.outputs[parant]
                    selected_features_list.append(selected_features)
                    detected_outliers_list.append(detected_outliers)
                if isinstance(layer, Union):
                    selected_features = layer.union(*selected_features_list)
                    detected_outliers = layer.union(*detected_outliers_list)
                elif isinstance(layer, Intersection):
                    selected_features = layer.intersection(*selected_features_list)
                    detected_outliers = layer.intersection(*detected_outliers_list)
                else:
                    raise TypeError('Input must be Union or Intersection')
                self.outputs[node] = (selected_features, detected_outliers)

            elif isinstance(self.components[node], (DeleteOutliers, ExtractFeatures)):
                parents = list(self.graph[node])
                assert len(parents) == 1
                self.outputs[node] = self.outputs[parants[0]]

            elif node == 'end':
                parants = list(self.graph[node])
                assert len(parants) == 1
                selected_features, _ = self.outputs[parants[0]]
                return selected_features


    def __or__(self, other):
        if isinstance(other, PipelineStructure):
            pl = PipelineStructure()
            pl.nodes = self.nodes | other.nodes
            pl.edges = self.edges | other.edges
            pl.components = {**self.components, **other.components}
            if self.nodes >= other.nodes:
                pl.current_node = self.current_node
            else:
                pl.current_node = other.current_node
            return pl
        else:
            raise TypeError('Input must be PipelineStructure')


class FeatureMatrix:
    def __init__(self, pl_structure):
        self.pl_structure = pl_structure

class ResponseVector:
    def __init__(self, pl_structure):
        self.pl_structure = pl_structure

class SelectedFeatures:
    def __init__(self, pl_structure):
        self.pl_structure = pl_structure

class DetectedOutliers:
    def __init__(self, pl_structure):
        self.pl_structure = pl_structure


class FeatureSelection:
    instance_counter = dict()

    def __init__(
        self,
        name: str,
        parameters: float | list[float],
        candidates: list[float] | list[list[float]]):
        self.parameters = parameters
        self.candidates = candidates
        FeatureSelection.instance_counter.setdefault(name, 0)
        self.name = f'{name}_{FeatureSelection.instance_counter[name]}'
        FeatureSelection.instance_counter[name] += 1

    def __call__(self, feature_matrix: FeatureMatrix, response_vector: ResponseVector) -> SelectedFeatures:
        pl_structure = feature_matrix.pl_structure | response_vector.pl_structure
        pl_structure.update(self.name, self)
        return SelectedFeatures(pl_structure)

    def select_features(
        self, feature_matrix: np.ndarray, response_vector: np.ndarray,
        selected_features: np.ndarray, detected_outliers: np.ndarray) -> np.ndarray:
        raise NotImplementedError

    def perform_si(
        self,
        a: np.ndarray,
        b: np.ndarray,
        z: float,
        feature_matrix: np.ndarray,
        selected_features: np.ndarray,
        detected_outliers: np.ndarray,
        l: float,
        u: float,
        ) -> (np.ndarray, np.ndarray, float, float):
        raise NotImplementedError


class OutlierDetection:
    instance_counter = dict()

    def __init__(
        self,
        name: str,
        parameters: float | list[float],
        candidates: list[float] | list[list[float]]):
        self.parameters = parameters
        self.candidates = candidates
        OutlierDetection.instance_counter.setdefault(name, 0)
        self.name = f'{name}_{OutlierDetection.instance_counter[name]}'
        OutlierDetection.instance_counter[name] += 1

    def __call__(self, feature_matrix: FeatureMatrix, response_vector: ResponseVector) -> DetectedOutliers:
        pl_structure = feature_matrix.pl_structure | response_vector.pl_structure
        pl_structure.update(self.name, self)
        return DetectedOutliers(pl_structure)

    def detect_outliers(
        self, feature_matrix: np.ndarray, response_vector: np.ndarray,
        selected_features: np.ndarray, detected_outliers: np.ndarray) -> np.ndarray:
        raise NotImplementedError

    def perform_si(
        self,
        a: np.ndarray,
        b: np.ndarray,
        z: float,
        feature_matrix: np.ndarray,
        selected_features: np.ndarray,
        detected_outliers: np.ndarray,
        l: float,
        u: float,
        ) -> (np.ndarray, np.ndarray, float, float):
        raise NotImplementedError

class MissingImputation:
    instance_counter = dict()

    def __init__(self, name: str):
        MissingImputation.instance_counter.setdefault(name, 0)
        self.name = f'{name}_{MissingImputation.instance_counter[name]}'
        MissingImputation.instance_counter[name] += 1

    def __call__(self, feature_matrix: FeatureMatrix, response_vector: ResponseVector) -> ResponseVector:
        pl_structure = feature_matrix.pl_structure | response_vector.pl_structure
        pl_structure.update(self.name, self)
        return ResponseVector(pl_structure)

    def impute_missing(self, feature_matrix: np.ndarray, response_vector: np.ndarray) -> np.ndarray:
        raise NotImplementedError

    def compute_covariance(
        self,
        feature_matrix: np.ndarray,
        response_vector: np.ndarray,
        variance: float,
        ) -> (np.ndarray, np.ndarray, np.ndarray):
        raise NotImplementedError

class IndexesOperator:
    instance_counter = dict()

    def __init__(self, name: str):
        IndexesOperator.instance_counter.setdefault(name, 0)
        self.name = f'{name}_{IndexesOperator.instance_counter[name]}'
        IndexesOperator.instance_counter[name] += 1

    def __call__(self, *inputs: tuple[SelectedFeatures] | tuple[DetectedOutliers]) -> SelectedFeatures | DetectedOutliers:
        pl_structure = inputs[0].pl_structure
        pl_structure.update(self.name, self)
        input_type = type(inputs[0])
        if len(inputs) > 1:
            for input in inputs[1:]:
                if input_type != type(input):
                    raise TypeError('Inputs must be same type')
                input.pl_structure.update(self.name, self)
                pl_structure = pl_structure | input.pl_structure
        if isinstance(inputs[0], SelectedFeatures):
            self.mode = 'selected_features'
            return SelectedFeatures(pl_structure)
        elif isinstance(inputs[0], DetectedOutliers):
            self.mode = 'detected_outliers'
            return DetectedOutliers(pl_structure)
        else:
            raise TypeError('Inputs must be SelectedFeatures or DetectedOutliers')


class DeleteOutliers:
    counter = 0

    def __init__(self, name='delete'):
        self.name = f'{name}_{DeleteOutliers.counter}'
        DeleteOutliers.counter += 1

    def __call__(
        self,
        feature_matrix: FeatureMatrix,
        response_vector: ResponseVector,
        detected_outliers: DetectedOutliers
        ) -> (FeatureMatrix, ResponseVector):
        pl_structure = feature_matrix.pl_structure | response_vector.pl_structure | detected_outliers.pl_structure
        pl_structure.update(self.name, self)
        return FeatureMatrix(pl_structure), ResponseVector(pl_structure)

    def delete_outliers(
        self,
        feature_matrix: np.ndarray,
        response_vector: np.ndarray,
        detected_outliers: np.ndarray
        ) -> (np.ndarray, np.ndarray):
        non_outliers = np.delete(np.arange(feature_matrix.shape[0]), detected_outliers)
        return feature_matrix[non_outliers, :], response_vector[non_outliers]

class ExtractFeatures:
    counter = 0

    def __init__(self, name='extract'):
        self.name = f'{name}_{ExtractFeatures.counter}'
        ExtractFeatures.counter += 1

    def __call__(
        self,
        feature_matrix: FeatureMatrix,
        selected_features: SelectedFeatures
        ) -> FeatureMatrix:
        pl_structure = feature_matrix.pl_structure | selected_features.pl_structure
        pl_structure.update(self.name, self)
        return FeatureMatrix(pl_structure)

    def extract_features(
        self,
        feature_matrix: np.ndarray,
        selected_features: np.ndarray
        ) -> np.ndarray:
        return feature_matrix[:, selected_features]


class Union(IndexesOperator):
    def __init__(self, name='union'):
        super().__init__(name)

    def union(self, *inputs: tuple[np.ndarray]) -> np.ndarray:
        if len(inputs) == 1:
            return inputs[0]
        else:
            temp_set = set(inputs[0].tolist())
            for input in inputs[1:]:
                temp_set = temp_set | set(input.tolist())
            return np.array(list(temp_set))

class Intersection(IndexesOperator):
    def __init__(self, name='intersection'):
        super().__init__(name)

    def intersection(self, *inputs: tuple[np.ndarray]) -> np.ndarray:
        if len(inputs) == 1:
            return inputs[0]
        else:
            temp_set = set(inputs[0].tolist())
            for input in inputs[1:]:
                temp_set = temp_set & set(input.tolist())
            return np.array(list(temp_set))


class MeanValueImputation(MissingImputation):
    def __init__(self, name='mean_value_imputation'):
        super().__init__(name)

    def impute_missing(self, feature_matrix: np.ndarray, response_vector: np.ndarray) -> np.ndarray:
        _, y = feature_matrix, response_vector
        n = y.shape[0]

        # location of missing value
        missing_index = np.where(np.isnan(y))[0]

        # other than missing value and its averate
        y_delete = np.delete(y, missing_index)
        y_mean = np.mean(y_delete)

        # imputation
        y[missing_index] = y_mean
        return y

class EuclideanImputation(MissingImputation):
    def __init__(self, name='euclidean_imputation'):
        super().__init__(name)

class ManhattanImputation(MissingImputation):
    def __init__(self, name='manhattan_imputation'):
        super().__init__(name)

class ChebyshevImputation(MissingImputation):
    def __init__(self, name='chebyshev_imputation'):
        super().__init__(name)

class DefiniteRegressionImputation(MissingImputation):
    def __init__(self, name='definite_regression_imputation'):
        super().__init__(name)

class ProbabilisticRegressionImputation(MissingImputation):
    def __init__(self, name='probabilistic_regression_imputation'):
        super().__init__(name)

class StepwiseFeatureSelection(FeatureSelection):
    def __init__(self, name='stepwise_feature_selection', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

    def select_features(self, feature_matrix: np.ndarray, response_vector: np.ndarray, selected_features: np.ndarray, detected_outliers: np.ndarray) -> np.ndarray:
        X, y = feature_matrix, response_vector
        M, O = selected_features, detected_outliers.tolist()

        X = np.delete(X,O,0)
        X = X[:,M]
        y = np.delete(y,O).reshape(-1,1)

        # initialize
        active_set = []
        inactive_set = list(range(X.shape[1]))

        # stepwise feature selection
        for _ in range(self.parameters):
            X_active = X[:,active_set]
            r = y - X_active @ np.linalg.inv(X_active.T @ X_active) @ X_active.T @ y
            correlation = X[:,inactive_set].T @ r

            ind = np.argmax(np.abs(correlation))
            active_set.append(inactive_set[ind])
            inactive_set.remove(inactive_set[ind])
        M = [M[i] for i in active_set]
        return np.array(M)

class MarginalScreening(FeatureSelection):
    def __init__(self, name='marginal_screening', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

    def select_features(self, feature_matrix: np.ndarray, response_vector: np.ndarray, selected_features: np.ndarray, detected_outliers: np.ndarray) -> np.ndarray:
        X, y = feature_matrix, response_vector
        M, O = selected_features, detected_outliers.tolist()

        X = np.delete(X,O,0)
        X = X[:,M]
        y = np.delete(y,O).reshape(-1,1)

        # marginal screening
        XTy_abs = np.abs(X.T @ y).flatten()
        sort_XTy_abs = np.argsort(XTy_abs)[::-1]

        active_set = sort_XTy_abs[:self.parameters]
        M = [M[i] for i in active_set]
        return np.array(M)

class Lasso(FeatureSelection):
    def __init__(self, name='lasso', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

    def select_features(self, feature_matrix: np.ndarray, response_vector: np.ndarray, selected_features: np.ndarray, detected_outliers: np.ndarray) -> np.ndarray:
        X, y = feature_matrix, response_vector
        M, O = selected_features, detected_outliers.tolist()

        X = np.delete(X,O,0)
        X = X[:,M]
        y = np.delete(y,O).reshape(-1,1)

        # lasso
        lasso = lm.Lasso(alpha=self.parameters, fit_intercept=False, max_iter=5000, tol=1e-10)
        lasso.fit(X, y)
        active_set = np.where(lasso.coef_ != 0)[0]
        M = [M[i] for i in active_set]
        return np.array(M)

class ElasticNet(FeatureSelection):
    def __init__(self, name='elastic_net', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

class Lars(FeatureSelection):
    def __init__(self, name='lars', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

class CookDistance(OutlierDetection):
    def __init__(self, name='cook_distance', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

    def detect_outliers(self, feature_matrix: np.ndarray, response_vector: np.ndarray, selected_features: np.ndarray, detected_outliers: np.ndarray) -> np.ndarray:
        X, y = feature_matrix, response_vector
        M, O = selected_features, detected_outliers.tolist()

        X = np.delete(X, O, 0)
        X = X[:, M]
        y = np.delete(y, O).reshape(-1, 1)

        num_data = list(range(X.shape[0]))
        num_outlier_data = [i for i in num_data if i not in O]

        # cook's distance
        non_outlier = []
        outlier = []
        n, p = X.shape

        hat_matrix = X @ np.linalg.inv(X.T @ X) @ X.T
        Px = np.identity(n) - hat_matrix
        threshold = self.parameters / n # threshold value

        # outlier detection
        for i in range(n):
            ej = np.zeros((n, 1))
            ej[i] = 1
            hi = hat_matrix[i][i] # diagonal element of hat matrix
            Di_1 = (y.T @ (Px @ ej @ ej.T @ Px) @ y) / (y.T @ Px @ y) # first term of Di
            Di_2 = ((n - p) * hi) / (p * (1 - hi)**2) # second term of Di
            Di = Di_1 * Di_2

            if Di < threshold:
                non_outlier.append(i)
            else:
                outlier.append(i)

        O_ = [num_outlier_data[i] for i in outlier]
        O = O + O_
        return np.array(O)


class Dffits(OutlierDetection):
    def __init__(self, name='dffits', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)

class SoftIpod(OutlierDetection):
    def __init__(self, name='soft_ipod', parameters=None, candidates=None):
        super().__init__(name, parameters, candidates)



def make_dataset():
    feature_matrix = FeatureMatrix(PipelineStructure())
    response_vector = ResponseVector(PipelineStructure())
    return feature_matrix, response_vector

def union(*inputs):
    return Union()(*inputs)

def intersection(*inputs):
    return Intersection()(*inputs)

def delete_outliers(feature_matrix, response_vector, detected_outliers):
    return DeleteOutliers()(
        feature_matrix,
        response_vector,
        detected_outliers
        )

def extract_features(feature_matrix, selected_features):
    return ExtractFeatures()(feature_matrix, selected_features)

def mean_value_imputation(feature_matrix, response_vector):
    return MeanValueImputation()(feature_matrix, response_vector)

def euclidean_imputation(feature_matrix, response_vector):
    return EuclideanImputation()(feature_matrix, response_vector)

def manhattan_imputation(feature_matrix, response_vector):
    return ManhattanImputation()(feature_matrix, response_vector)

def chebyshev_imputation(feature_matrix, response_vector):
    return ChebyshevImputation()(feature_matrix, response_vector)

def definite_regression_imputation(feature_matrix, response_vector):
    return DefiniteRegressionImputation()(feature_matrix, response_vector)

def probabilistic_regression_imputation(feature_matrix, response_vector):
    return ProbabilisticRegressionImputation()(feature_matrix, response_vector)

def stepwise_feature_selection(feature_matrix, response_vector, parameters=10, candidates=None):
    return StepwiseFeatureSelection(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def marginal_screening(feature_matrix, response_vector, parameters=10, candidates=None):
    return MarginalScreening(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def lasso(feature_matrix, response_vector, parameters=0.1, candidates=None):
    return Lasso(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def elastic_net(feature_matrix, response_vector, parameters=None, candidates=None):
    return ElasticNet(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def lars(feature_matrix, response_vector, parameters=None, candidates=None):
    return Lars(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def cook_distance(feature_matrix, response_vector, parameters=3.0, candidates=None):
    return CookDistance(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def dffits(feature_matrix, response_vector, parameters=None, candidates=None):
    return Dffits(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def soft_ipod(feature_matrix, response_vector, parameters=None, candidates=None):
    return SoftIpod(parameters=parameters, candidates=candidates)(feature_matrix, response_vector)

def pipeline(feature_matrix, response_vector, selected_features):
    pl_structure = selected_features.pl_structure
    pl_structure.update('end', None)
    pl_structure.make_graph()
    return pl_structure


In [165]:
X, y = make_dataset()
y_impute = mean_value_imputation(X, y)
O = cook_distance(X, y_impute)
X_del, y_del = delete_outliers(X, y_impute, O)
M = marginal_screening(X_del, y_del)
X_del_ext = extract_features(X_del, M)
M_lasso = lasso(X_del_ext, y_del)
M_sfs = stepwise_feature_selection(X_del_ext, y_del, parameters=5)
M = union(M_lasso, M_sfs)
pl = pipeline(X, y, M)


In [166]:
rng = np.random.default_rng(0)
X = rng.normal(size=(100, 20))
y = rng.normal(size=100)
y[[3, 7, 84]] = np.nan
M = pl(X, y)
print(M)


[ 1  2  3  6 15 17]


In [105]:
from graphlib import TopologicalSorter

graph = pl.graph

ts = TopologicalSorter(graph)
ts.prepare()
while ts.is_active():
    ready_nodes = ts.get_ready()
    ts.done(*ready_nodes)
    print(*ready_nodes)

for key, value in pl.components.items():
    print(key, value)

start
mean_value_imputation_0
cook_distance_0
delete_0
marginal_screening_0
extract_0
stepwise_feature_selection_0 lasso_0
union_0
end
start None
mean_value_imputation_0 <__main__.MeanValueImputation object at 0x7fe49e520b20>
cook_distance_0 <__main__.CookDistance object at 0x7fe49e521630>
delete_0 <__main__.DeleteOutliers object at 0x7fe4a81a3250>
marginal_screening_0 <__main__.MarginalScreening object at 0x7fe4a8051f30>
extract_0 <__main__.ExtractFeatures object at 0x7fe4a80508e0>
lasso_0 <__main__.Lasso object at 0x7fe4a8053550>
union_0 <__main__.Union object at 0x7fe4a80529b0>
stepwise_feature_selection_0 <__main__.StepwiseFeatureSelection object at 0x7fe4a8053ac0>
end None


In [98]:
X, y = make_dataset()
y_impute = mean_value_imputation(X, y)
O1 = cook_distance(X, y_impute)
O2 = dffits(X, y_impute)
O3 = soft_ipod(X, y_impute)
O = union(intersection(O1, O2), O3)
X_del, y_del = delete_outliers(X, y_impute, O)
M = stepwise_feature_selection(X_del, y_del)
pl = pipeline(X, y, M)

ts = TopologicalSorter(pl.graph)
ts.prepare()
while ts.is_active():
    ready_nodes = ts.get_ready()
    ts.done(*ready_nodes)
    print(*ready_nodes)

start
mean_value_imputation_3
cook_distance_3 dffits_1 soft_ipod_1
intersection_1
union_3
delete_3
stepwise_feature_selection_3
end
