In [147]:
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn.preprocessing import KBinsDiscretizer, OrdinalEncoder, FunctionTransformer
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
import numpy as np
import pandas as pd

In [148]:
df = pd.read_csv("../../data/AmesHousing.csv")

In [149]:
cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()

In [150]:
y = df["SalePrice"].copy()
X = df.drop("SalePrice", axis=1).copy()

In [151]:
def sort_categories(X, y, categorical_columns, scoring_function):
    categories_sorted = {}
    for col in categorical_columns:
        category_scores = scoring_function(X[col], y)
        sorted_categories = category_scores.sort_values().index.tolist()
        categories_sorted[col] = sorted_categories
    return categories_sorted

# Scoring function for mean
def mean_score(x, y):
    return y.groupby(x).mean()

# WoE scoring function
def woe_score(x, y):
    total_goods = y.sum()
    total_bads = len(y) - total_goods
    grouped = y.groupby(x).agg(['sum', 'count'])
    grouped['goods'] = grouped['sum']
    grouped['bads'] = grouped['count'] - grouped['goods']
    grouped['woe'] = np.log((grouped['goods'] / total_goods) / (grouped['bads'] / total_bads))
    return grouped['woe']

In [152]:
def calculate_optimal_bins(X, max_bins=20, min_bins=2, factor=1.5):
    n_bins = []
    for column in X.columns:
        if len(pd.unique(X[column])) < max_bins:
            # For categorical data, use a default number of bins
            n_bins_col = min(len(pd.unique(X[column]))-1, max_bins)
            n_bins.append(max(min_bins, n_bins_col))
        else:
            Q1 = X[column].quantile(0.25)
            Q3 = X[column].quantile(0.75)
            IQR = Q3 - Q1
            n = len(X[column].dropna())  # Exclude NaN values for the calculation
            
            # Calculate bin width using Freedman-Diaconis rule
            bin_width = factor * IQR / (n ** (1/3))
            
            # Calculate number of bins
            if bin_width > 0:
                optimal_bins = int((X[column].max() - X[column].min()) / bin_width)
                optimal_bins = max(min(optimal_bins, max_bins), min_bins)
            else:
                optimal_bins = min_bins
            
            n_bins.append(optimal_bins)
            
    return n_bins

In [153]:
# Get categorical and numeric columns
cat_cols = X.select_dtypes(include=["object", "category"]).columns.tolist()

In [154]:
# Sort categories by the mean of the target
sorted_categories = sort_categories(X, y, cat_cols, mean_score)
categories = [sorted_categories[col] for col in cat_cols]

In [155]:
# Set up the OrdinalEncoder
ordinal_encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=np.nan)
X_copy = X.copy()
X_copy[cat_cols] = ordinal_encoder.fit_transform(X_copy[cat_cols])

# Calculate optimal bins
optimal_bins = calculate_optimal_bins(X_copy, max_bins=100)
#print(f"Optimal bins: {optimal_bins}")

In [156]:
import pandas as pd

# Assuming the code before has been run, and X_copy is already defined with categorical columns encoded
quantile_cuts = {}
for col in X_copy.columns:
    try:
        # Attempt to use the number of bins specified for each column, fall back to a default if any error
        num_bins = optimal_bins[X_copy.columns.get_loc(col)]
        quantile_cuts[col] = pd.qcut(X_copy[col], q=num_bins, duplicates='drop')
    except (ValueError, TypeError):
        print(f"Skipping column {col} due to an error with binning.")

# The result is a dictionary of Series, each transformed into quantile bins
# You might want to convert this dictionary back to a DataFrame:
transformed_data = pd.DataFrame(quantile_cuts)

In [157]:
y = (y >=y.quantile(0.75)).astype(int)

In [158]:
X = transformed_data

In [187]:
import pandas as pd
import numpy as np
import heapq
from sklearn.metrics import recall_score, precision_score

class RuleEvaluator:
    def __init__(self, min_samples, max_samples, min_precision, scorer):
        self.min_samples = min_samples
        self.max_samples = max_samples
        self.min_precision = min_precision
        self.scorer = scorer

    def evaluate(self, y, mask):
        count = mask.sum()
        if count < self.min_samples or count > self.max_samples:
            return None
        
        y_pred = mask.astype(int)
        recall = recall_score(y, y_pred, zero_division=0)
        precision = precision_score(y, y_pred, zero_division=0)
        
        if recall >= self.min_samples / len(y) and precision >= self.min_precision:
            rule_score = self.scorer(y[mask]) if self.scorer else 0
            return RuleResult(rule_score, recall, precision)
        return None

class RuleResult:
    def __init__(self, score, recall, precision):
        self.score = score
        self.recall = recall
        self.precision = precision

    def is_relevant(self):
        return self.score is not None

def create_rule_mask(X, col, value, operator):
    def check_interval(interval, value, operator):
        if operator == "<=":
            return interval.right <= value
        elif operator == ">=":
            return interval.left >= value
        return False

    mask = X[col].apply(lambda x: check_interval(x, value, operator)).astype(bool)
    return pd.Series(np.where(mask==True, True, False))

def process_column(X, y, col, rule_evaluator):
    results = []
    categories = X[col].cat.categories

    for category in categories:
        lower_edge, upper_edge = category.left, category.right

        rules = [
            (f"{col} <= {upper_edge}", create_rule_mask(X, col, upper_edge, "<=")),
            (f"{col} > {lower_edge}", create_rule_mask(X, col, lower_edge, ">"))
        ]

        for rule, mask in rules:
            result = rule_evaluator.evaluate(y, mask)
            if result and result.is_relevant():
                results.append((rule, result))

    nan_mask = X[col].isna()
    if nan_mask.any():
        nan_result = rule_evaluator.evaluate(y, nan_mask)
        if nan_result and nan_result.is_relevant():
            results.append((f"{col} isna()", nan_result))

    return results

def find_top_k_rules(X, y, k, rule_evaluator):
    min_heap = []
    for col in X.columns:
        results = process_column(X, y, col, rule_evaluator)
        for rule, result in results:
            heapq.heappush(min_heap, (result.score, rule, result))
            if len(min_heap) > k:
                heapq.heappop(min_heap)

    top_k_rules = sorted(min_heap, key=lambda x: -x[0])  # Sorting by score descending
    return [(score, rule, res.recall, res.precision) for score, rule, res in top_k_rules]

# Example usage
scorer = lambda y: np.log(np.mean(y) / (1 - np.mean(y))) if np.mean(y) not in [0, 1] else float('-inf')
rule_evaluator = RuleEvaluator(min_samples=10, max_samples=len(X)-10, min_precision=0.01, scorer=scorer)
top_k_rules = find_top_k_rules(X, y, 5, rule_evaluator)  # Top 5 rules

for score, rule, recall, precision in top_k_rules:
    print(f"Rule: {rule}, Score: {score}, Recall: {recall}, Precision: {precision}")

Rule: Pool QC <= 2.0, Score: 0.8472978603872034, Recall: 0.009536784741144414, Precision: 0.7
Rule: Exter Qual <= 2.0, Score: 0.2291137183007135, Recall: 0.8583106267029973, Precision: 0.5570291777188329
Rule: PID <= 528477022.5, Score: -0.040933408926252995, Recall: 0.4891008174386921, Precision: 0.4897680763983629
Rule: Mas Vnr Area > 96.391, Score: -0.08701137698962969, Recall: 0.014986376021798364, Precision: 0.4782608695652174
Rule: Mas Vnr Area isna(), Score: -0.08701137698962969, Recall: 0.014986376021798364, Precision: 0.4782608695652174


In [185]:
def convert_rules_to_categories(rules, encoder_categories, feature_names):
    categorical_rules = []
    for rule, result in rules:
        # Parsing the rule into components
        match = re.match(r"(.+?)\s*(<=|>=|>|<|==|!=)\s*(.*)", rule)
        if not match:
            print(f"Could not parse rule: {rule}")
            continue

        feature_name, operation, encoded_value = match.groups()

        # Find the category list for the feature from the encoder
        if feature_name in feature_names:
            cat_index = feature_names.index(feature_name)
            original_categories = encoder_categories[cat_index]
            
            # Convert the encoded value back to an integer index (if necessary)
            if encoded_value.isdigit():
                value_index = int(encoded_value)
            else:
                # Find index if encoded_value is a label
                value_index = np.where(original_categories == encoded_value)[0][0]

            # Determine relevant categories based on the operation
            if operation in ["<=", "=="]:
                relevant_categories = original_categories[:value_index + 1]
            elif operation == "<":
                relevant_categories = original_categories[:value_index]
            elif operation == ">=":
                relevant_categories = original_categories[value_index:]
            elif operation == ">":
                relevant_categories = original_categories[value_index + 1:]

            # Format the rule using category labels
            category_list = ", ".join([f"'{cat}'" for cat in relevant_categories])
            if category_list:
                new_rule = f"{feature_name} in [{category_list}]"
            else:
                new_rule = f"{feature_name} is empty"
        else:
            new_rule = rule  # Non-categorical rule remains unchanged

        categorical_rules.append((new_rule, result))

    return categorical_rules

# Example usage assuming the previous setup
feature_names = X[cat_cols].columns.tolist()  # Assuming cat_cols has the categorical columns
rules = [("Exter Qual <= 2", "result"), ("Pool QC <= 2", "result")]  # Example rules from encoded data

# Convert rules back to categorical labels
categorical_rules = convert_rules_to_categories(rules, ordinal_encoder.categories_, feature_names)

# Print the rules
for rule, _ in categorical_rules:
    print(rule)

Exter Qual in ['Ex', 'Fa', 'Gd']
Pool QC in ['Ex', 'Fa', 'Gd']


In [326]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder
import bisect

class PandasQCutDiscretizer:
    def __init__(self, n_bins=3):
        self.n_bins = n_bins
        self.left_edges = {}
        self.right_edges = {}

    def fit_transform(self, X):
        X = X.copy()
        for col in X.columns:
            if X[col].dtype in [np.float64, np.int64]:  # We only discretize numeric columns
                X[col] = pd.qcut(X[col], self.n_bins, duplicates='drop') #TODO: handle bins where nunique <= n_bins
                self.left_edges[col] = [bin.left for bin in X[col].unique()]
                self.right_edges[col] = [bin.right for bin in X[col].unique()]
        return X

    def get_bin_edges(self):
        return self.bin_edges

class Encoder:
    def __init__(self):
        self.encoder = OrdinalEncoder()
        self.category_mappings = {}

    def fit_transform(self, X):
        X = X.copy()
        for col in X.select_dtypes(include=['category', 'object']).columns:
            original_categories = list(X[col].cat.categories)
            X[col] = self.encoder.fit_transform(X[[col]])[:, 0]
            self.category_mappings[col] = original_categories
        return X

    def convert_to_categorical(self, feature, value, is_lower_bound):
        categories = self.category_mappings.get(feature, [])
        index = int(value)  # Assuming value is already the correct integer index

        if index >= len(categories):  # Ensure the index does not exceed the last category
            index = len(categories) - 1
        if index < 0:  # Ensure the index is not negative
            index = 0

        if is_lower_bound:
            # Generates a rule for values in categories from index+1 to the end
            return f"{feature} in {categories[index + 1:]}"
        else:
            # Generates a rule for values in categories from the start up to index
            return f"{feature} in {categories[:index + 1]}"


class RuleGenerator:
    def __init__(self, discretizer, encoder):
        self.discretizer = discretizer
        self.encoder = encoder

    def generate_all_rules(self, X):
        rules = []
        for feature in X.columns:
            left_edges = discretizer.left_edges[feature]
            right_edges = discretizer.right_edges[feature]
            rules += [f"{feature} > {edge}" for edge in left_edges]
            rules += [f"{feature} <= {edge}" for edge in right_edges]
            
            if X[feature].isna().any():
                rules += f"{feature}.isna()"
            
        return rules

data = {
    'age': np.random.randint(20, 60, 100),
    'salary': np.random.normal(50000, 12000, 100),
    'category': ['group' + str(i % 3) for i in range(100)]
}
X = pd.DataFrame(data)
X['category'] = X['category'].astype('category')

# Initialize components
encoder = Encoder()
discretizer = PandasQCutDiscretizer()

# Process data
X_encoded = encoder.fit_transform(X)
X_preprocessed = discretizer.fit_transform(X_encoded)

# Generate rules
rule_generator = RuleGenerator(discretizer, encoder)
rules = rule_generator.generate_all_rules(X_preprocessed)

for rule in rules:
    print(rule)

age > 19.999
age > 48.0
age > 34.0
age <= 34.0
age <= 59.0
age <= 48.0
salary > 55655.496
salary > 45522.726
salary > 22052.444
salary <= 70617.815
salary <= 55655.496
salary <= 45522.726
category > -0.001
category > 1.0
category <= 1.0
category <= 2.0


In [335]:
class RuleGenerator:
    def __init__(self, discretizer, encoder):
        self.discretizer = discretizer
        self.encoder = encoder

    def generate_all_rules(self, X):
        rules = []
        for feature in X.columns:
            left_edges = self.discretizer.left_edges[feature]
            right_edges = self.discretizer.right_edges[feature]
            rules += [Rule(feature, '>', edge) for edge in left_edges]
            rules += [Rule(feature, '<=', edge) for edge in right_edges]
            
            if X[feature].isna().any():
                rules.append(Rule(feature, 'isna', None))
            
        return rules

In [356]:
data = {
    'age': np.random.randint(20, 60, 100),
    'salary': np.random.normal(50000, 12000, 100),
    'category': ['group' + str(i % 3) for i in range(100)],
    'target': np.random.randint(0, 2, 100)  # Binary target variable
}
X = pd.DataFrame(data)
y = X['target']
X['category'] = X['category'].astype('category')
X.drop('target', axis=1, inplace=True)

In [382]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import OrdinalEncoder

class PandasQCutDiscretizer:
    def __init__(self, n_bins=3):
        self.n_bins = n_bins
        self.left_edges = {}
        self.right_edges = {}

    def fit_transform(self, X):
        X = X.copy()
        for col in X.columns:
            if X[col].dtype in [np.float64, np.int64]:  # We only discretize numeric columns
                X[col] = pd.qcut(X[col], self.n_bins, duplicates='drop')
                self.left_edges[col] = [bin.left for bin in X[col].unique()]
                self.right_edges[col] = [bin.right for bin in X[col].unique()]
        return X

    def get_bin_edges(self):
        return self.left_edges, self.right_edges

class Encoder:
    def __init__(self):
        self.encoder = OrdinalEncoder()
        self.category_mappings = {}

    def fit_transform(self, X):
        X = X.copy()
        for col in X.select_dtypes(include=['category', 'object']).columns:
            original_categories = list(X[col].cat.categories)
            X[col] = self.encoder.fit_transform(X[[col]])[:, 0]
            self.category_mappings[col] = original_categories
        return X

    def convert_to_categorical(self, feature, value, is_lower_bound):
        categories = self.category_mappings.get(feature, [])
        index = int(value)  # Assuming value is already the correct integer index

        if index >= len(categories):  # Ensure the index does not exceed the last category
            index = len(categories) - 1
        if index < 0:  # Ensure the index is not negative
            index = 0

        if is_lower_bound:
            return f"{feature} in {categories[index + 1:]}"
        else:
            return f"{feature} in {categories[:index + 1]}"

def negate_last_rule(path):
    if path.rules:
        new_rules = path.rules[:-1] + [path.rules[-1].negate_rule()]
        return Path(new_rules)
    return path

class Rule:
    def __init__(self, feature, operator, value, score=0):
        self.feature = feature
        self.operator = operator
        self.value = value
        self.score = score

    def __str__(self):
        return f"{self.feature} {self.operator} {self.value}"

    def get_mask(self, X):
        if self.operator in ['isna', 'notnull']:
            if self.operator == 'isna':
                return X[self.feature].isna()
            else:
                return X[self.feature].notnull()
        else:
            return eval(f"X['{self.feature}'] {self.operator} {self.value}")

    def negate_rule(self):
        negation_map = {'<=': '>', '>=': '<', '<': '>=', '>': '<=', '==': '!=', '!=': '==', 'isna': 'notnull', 'notnull': 'isna'}
        new_operator = negation_map[self.operator]
        return Rule(self.feature, new_operator, self.value)

class RuleGenerator:
    def __init__(self, discretizer):
        self.discretizer = discretizer

    def generate_all_rules(self, X):
        rules = []
        for feature in X.columns:
            left_edges = self.discretizer.left_edges[feature]
            right_edges = self.discretizer.right_edges[feature]
            rules += [Rule(feature, '>', edge) for edge in left_edges]
            rules += [Rule(feature, '<=', edge) for edge in right_edges]
            
            if X[feature].isna().any():
                rules.append(Rule(feature, 'isna', None))
            
        return rules

class Path:
    def __init__(self, rules=None):
        self.rules = rules if rules is not None else []

    def get_mask(self, X):
        mask = np.ones(len(X), dtype=bool)
        for rule in self.rules:
            mask &= rule.get_mask(X)
        return mask

    def get_path_rule(self):
        return " and ".join([str(rule) for rule in self.rules])

    def __repr__(self):
        return "Path: " + " and ".join([str(rule) for rule in self.rules])

class BinaryRuleScore:
    def __init__(self, recall, precision, WoE):
        self.recall = recall
        self.precision = precision
        self.WoE = WoE

    def __repr__(self):
        return f"BinaryRuleScore (Recall: {self.recall}, Precision: {self.precision}, WoE: {self.WoE})"

class BinaryRuleEvaluator:
    def __init__(self):
        pass

    def evaluate(self, rule, X, y):
        mask = rule.get_mask(X)
        
        true_positives = y[mask].sum()
        total_positives = y.sum()
        predicted_positives = mask.sum()
        total_cases = len(y)

        recall = true_positives / total_positives if total_positives != 0 else 0
        precision = true_positives / predicted_positives if predicted_positives != 0 else 0

        total_negatives = total_cases - total_positives
        false_positives = predicted_positives - true_positives
        WoE = np.log((true_positives / total_positives) / (false_positives / total_negatives)) if false_positives and total_negatives else float('-inf')

        return BinaryRuleScore(recall, precision, WoE)

class RuleFilter:
    def __init__(self, min_recall, min_precision, min_WoE):
        self.min_recall = min_recall
        self.min_precision = min_precision
        self.min_WoE = min_WoE

    def apply(self, rule_score):
        return (rule_score.recall >= self.min_recall and
                rule_score.precision >= self.min_precision and
                rule_score.WoE >= self.min_WoE)

In [390]:

class Distinguisher:
    def __init__(self, encoder, discretizer, rule_generator, evaluator):
        self.encoder = encoder
        self.discretizer = discretizer
        self.rule_generator = rule_generator
        self.evaluator = evaluator
        self.rules = []
    
    def find_rules_recursive(self, X, y, chosen_rules, applied_path, rule_filter, best_score=float('-inf')):
        #print(f"Evaluating with applied path: {applied_path}")
        
        applied_rules = [] if applied_path is None else applied_path.rules
        evaluated_rules = []
        for rule in self.all_rules:
            rule_combination = Path(applied_rules + [rule])
            rule_score = self.evaluator.evaluate(rule_combination, X, y)
            #print(rule_combination, rule_score)
            evaluated_rules.append((rule_combination, rule_score))

        # Filter rules based on evaluation
        filtered_rules = [rule for rule in evaluated_rules if rule_filter.apply(rule[1])]
        if not filtered_rules:
            print("No valid rules after filtering.")
            return chosen_rules

        best_rule = max(filtered_rules, key=lambda x: x[1].WoE) # Use WoE as the primary score metric
        current_best_score = best_rule[1].WoE
        best_rule = best_rule[0]
        
        if current_best_score > best_score:
            chosen_rules.append(best_rule)
            print(best_rule)
            # Recursive calls to explore with and without the best new rule added
            self.find_rules_recursive(X, y, chosen_rules, best_rule, rule_filter, current_best_score)
            neg_rule = negate_last_rule(best_rule)
            neg_score = self.evaluator.evaluate(neg_rule, X, y)
            if rule_filter.apply(neg_score):
                chosen_rules.append(best_rule)
                self.find_rules_recursive(X, y, chosen_rules, neg_rule, rule_filter, neg_score.WoE)
        
        return chosen_rules

    def find_rules(self, X, y, rule_filter):
        X_encoded = self.encoder.fit_transform(X)
        X_preprocessed = self.discretizer.fit_transform(X_encoded)
        
        self.all_rules = self.rule_generator.generate_all_rules(X_preprocessed)
        
        chosen_rules = []
        applied_path = None
        
        self.final_rules = self.find_rules_recursive(X_encoded, y, chosen_rules, applied_path, rule_filter)

    def plot_rules_tree(self):
        for rule_set, score in self.rules:
            rule_descriptions = " and ".join([str(rule) for rule in rule_set])
            print(f"Rules: {rule_descriptions}, Score: {score}")

encoder = Encoder()
discretizer = PandasQCutDiscretizer()
rule_generator = RuleGenerator(discretizer)
evaluator = BinaryRuleEvaluator()
rule_filter = RuleFilter(min_recall=0.1, min_precision=0.2, min_WoE=0.01)

distinguisher = Distinguisher(encoder, discretizer, rule_generator, evaluator)
distinguisher.find_rules(X, y, rule_filter)
#distinguisher.plot_rules_tree(chosen_rules)


Path: age <= 47.0
Path: age <= 47.0 and salary <= 56531.954
Path: age <= 47.0 and salary <= 56531.954 and category > 1.0
Path: age <= 47.0 and salary <= 56531.954 and category > 1.0 and age > 35.0
Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary <= 46120.581
Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary <= 46120.581 and age <= 35.0
Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary > 46120.581 and age > 35.0


In [391]:
distinguisher.final_rules

[Path: age <= 47.0,
 Path: age <= 47.0 and salary <= 56531.954,
 Path: age <= 47.0 and salary <= 56531.954 and category > 1.0,
 Path: age <= 47.0 and salary <= 56531.954 and category > 1.0 and age > 35.0,
 Path: age <= 47.0 and salary <= 56531.954 and category > 1.0,
 Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary <= 46120.581,
 Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary <= 46120.581 and age <= 35.0,
 Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary <= 46120.581 and age <= 35.0,
 Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary <= 46120.581,
 Path: age <= 47.0 and salary <= 56531.954 and category <= 1.0 and salary > 46120.581 and age > 35.0]

In [332]:
class Rule:
    def __init__(self, feature, operator, value, score=0):
        self.feature = feature
        self.operator = operator
        self.value = value
        self.score = score

    def get_mask(self, X):
        if self.operator in ['isna', 'notnull']:
            if self.operator == 'isna':
                return X[self.feature].isna()
            else:
                return X[self.feature].notnull()
        else:
            if pd.api.types.is_categorical_dtype(X[self.feature]):
                # Convert categorical comparison to work with category codes
                cat_value = X[self.feature].cat.categories.get_loc(self.value)
                return eval(f"X['{self.feature}'].cat.codes {self.operator} {cat_value}")
            else:
                return eval(f"X['{self.feature}'] {self.operator} {self.value}")

    def negate_rule(self):
        negation_map = {'<=': '>', '>=': '<', '<': '>=', '>': '<=', '==': '!=', '!=': '==', 'isna': 'notnull', 'notnull': 'isna'}
        new_operator = negation_map[self.operator]
        return Rule(self.feature, new_operator, self.value)

    def __repr__(self):
        return f"{self.feature} {self.operator} {self.value}"

In [333]:
class Path:
    def __init__(self, rules=None):
        self.rules = rules if rules is not None else []

    def get_mask(self, X):
        mask = np.ones(len(X), dtype=bool)
        for rule in self.rules:
            mask &= rule.get_mask(X)
        return mask

    def get_path_rule(self):
        return " and ".join([str(rule) for rule in self.rules])

    def __repr__(self):
        return "Path: " + " and ".join([str(rule) for rule in self.rules])

In [334]:
import numpy as np
import pandas as pd

class BinaryRuleScore:
    def __init__(self, recall, precision, WoE):
        self.recall = recall
        self.precision = precision
        self.WoE = WoE

    def __repr__(self):
        return f"BinaryRuleScore (Recall: {self.recall}, Precision: {self.precision}, WoE: {self.WoE})"

class BinaryRuleEvaluator:
    def __init__(self):
        pass

    def evaluate(self, rule, X, y):
        # Evaluating a rule assumes that rule.get_mask(X) is a method provided within the rule object
        mask = rule.get_mask(X)
        
        true_positives = y[mask].sum()
        total_positives = y.sum()
        predicted_positives = mask.sum()
        total_cases = len(y)

        recall = true_positives / total_positives if total_positives != 0 else 0
        precision = true_positives / predicted_positives if predicted_positives != 0 else 0

        total_negatives = total_cases - total_positives
        false_positives = predicted_positives - true_positives
        WoE = np.log((true_positives / total_positives) / (false_positives / total_negatives)) if false_positives and total_negatives else float('-inf')

        return BinaryRuleScore(recall, precision, WoE)

class RuleFilter:
    def __init__(self, min_recall, min_precision, min_WoE):
        self.min_recall = min_recall
        self.min_precision = min_precision
        self.min_WoE = min_WoE

    def apply(self, rule_score):
        # Filter based on the provided minima for recall, precision, and WoE
        return (rule_score.recall >= self.min_recall and
                rule_score.precision >= self.min_precision and
                rule_score.WoE >= self.min_WoE)

In [329]:
class Distinguisher:
    def __init__(self, preprocessor, rule_generator, evaluator):
        self.preprocessor = preprocessor
        self.rule_generator = rule_generator
        self.evaluator = evaluator
        self.rules = []

    def find_rules_recursive(self, X, y, chosen_rules, applied_rules, rule_filter):
        mask = np.ones(len(X), dtype=bool)
        for rule in applied_rules:
            mask &= rule.get_mask(X)
        X_sub = X[mask]
        y_sub = y[mask]

        if len(X_sub) == 0:
            chosen_rules.append((applied_rules, None))
            return chosen_rules

        rules = self.rule_generator.generate_all_rules(X_sub)
        if not rules:
            chosen_rules.append((applied_rules, None))
            return chosen_rules

        evaluated_rules = [(rule, self.evaluator.evaluate(rule, X_sub, y_sub)) for rule in rules]

        best_rule = max(evaluated_rules, key=lambda x: x[1].WoE)  # Use WoE as the primary score metric

        if rule_filter.apply(best_rule[1]):
            new_applied_rules = applied_rules + [best_rule[0]]
            self.find_rules_recursive(X, y, chosen_rules, new_applied_rules, rule_filter)
            self.find_rules_recursive(X, y, chosen_rules, new_applied_rules + [best_rule[0].negate_rule()], rule_filter)
        else:
            chosen_rules.append((applied_rules, best_rule[1]))

        return chosen_rules

    def find_rules(self, X, y, rule_filter):
        chosen_rules = []
        applied_rules = []
        final_rules = self.find_rules_recursive(X, y, chosen_rules, applied_rules, rule_filter)
        self.rules = [rule for rule in final_rules if rule_filter.apply(rule[1])]
        return self.rules

    def plot_rules_tree(self):
        for rule_set, score in self.rules:
            rule_descriptions = " and ".join([str(rule) for rule in rule_set])
            print(f"Rules: {rule_descriptions}, Score: {score}")

In [331]:
def main():
    data = {
        'age': np.random.randint(20, 60, 100),
        'salary': np.random.normal(50000, 12000, 100),
        'category': ['group' + str(i % 3) for i in range(100)],
        'target': np.random.randint(0, 2, 100)  # Binary target variable
    }
    X = pd.DataFrame(data)
    y = X['target']
    X['category'] = X['category'].astype('category')
    X.drop('target', axis=1, inplace=True)

    encoder = Encoder()
    discretizer = PandasQCutDiscretizer()
    evaluator = BinaryRuleEvaluator()
    filter = RuleFilter(min_recall=0.3, min_precision=0.2, min_WoE=0.01)

    X_encoded = encoder.fit_transform(X)
    X_preprocessed = discretizer.fit_transform(X_encoded)

    rule_generator = RuleGenerator(discretizer, encoder)
    distinguisher = Distinguisher(discretizer, rule_generator, evaluator)
    rules = distinguisher.find_rules(X_preprocessed, y, filter)
    distinguisher.plot_rules_tree()

if __name__ == "__main__":
    main()

AttributeError: 'str' object has no attribute 'get_mask'

In [57]:
import pandas as pd
from sklearn.metrics import recall_score, precision_score

def flip_rule(rule, X, y):
    # More robust rule parsing
    for op in ["<=", ">=", ">", "<"]:  # Search for these operators
        if op in rule:
            parts = rule.split(op)
            col = parts[0].strip()
            value = float(parts[1].strip())
            break
    else:
        return None  # If no operator is found, return None

    # Map operators to their flipped counterparts
    operator_flips = {"<=": ">", ">=": "<", ">": "<=", "<": ">="}
    new_operator = operator_flips.get(op)
    if new_operator is None:
        return None  # If the operator is not recognized, return None

    # Generate the new mask using the flipped operator
    if new_operator == ">":
        mask = X[col] > value
    elif new_operator == ">=":
        mask = X[col] >= value
    elif new_operator == "<":
        mask = X[col] < value
    elif new_operator == "<=":
        mask = X[col] <= value

    # Calculate metrics
    y_pred = mask.astype(int)
    recall = recall_score(y, y_pred, zero_division=0)
    precision = precision_score(y, y_pred, zero_division=0)

    # Construct the new rule description
    new_rule = f"{col} {new_operator} {value}"
    
    return new_rule, recall, precision

# # Example of how to use this function:
# best_rule = ('Lot Area <= 8923.0', -2.180593997108389, 0.17438692098092642, 0.10150674068199841)
# X = pd.DataFrame({'Lot Area': [9000, 8500, 5000]})  # Example feature data
# y = pd.Series([1, 0, 1])  # Example labels

if best_rule[1] < 0:
    result = flip_rule(best_rule[0], X, y)
    if result:
        new_rule, new_recall, new_precision = result
        new_score = abs(best_rule[1])  # Take the absolute value of the original score
        print(f"New Best Rule: ({new_rule}, {new_score}, {new_recall}, {new_precision})")
    else:
        print("Failed to flip the rule due to invalid format or operator.")
else:
    print("Best rule does not require flipping:", best_rule)

TypeError: Invalid comparison between dtype=category and float

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from math import log

class Distinguisher:
    def __init__(self, n_bins=5, bin_strategy="quantile", min_recall=0.05, min_precision=0.0, tol=0.01, beam_width=1):
        self.n_bins = n_bins
        self.bin_strategy = bin_strategy
        self.min_recall = min_recall
        self.min_precision = min_precision
        self.tol = tol
        self.beam_width = beam_width

    def fit_transform(self, X):
        self.columns = X.columns
        self.discretizers = {}
        X_filled = X.fillna({col: f"NA_{col}" for col in X.columns})
        for col in X_filled.columns:
            if X_filled[col].dtype == "object" or X_filled[col].dtype.name == "category":
                X_filled[col] = X_filled[col].astype(str)
                self.discretizers[col] = None
            else:
                disc = KBinsDiscretizer(n_bins=self.n_bins, encode="ordinal", strategy=self.bin_strategy)
                self.discretizers[col] = disc.fit(X_filled[[col]])
        return self.transform(X_filled)

    def transform(self, X):
        X_transformed = pd.DataFrame(index=X.index)
        X_filled = X.fillna({col: f"NA_{col}" for col in X.columns})
        for col in X_filled.columns:
            if self.discretizers[col] is None:
                X_transformed[col] = X_filled[col]
            else:
                transformed = self.discretizers[col].transform(X_filled[[col]])
                X_transformed[col] = transformed.flatten()
        return X_transformed

    def compute_WoE(self, y):
        p_total = np.sum(y == 1)
        n_total = np.sum(y == 0)
        if p_total == 0 or n_total == 0:
            return 0  # Avoid division by zero
        return log((p_total / (p_total + n_total)) / (n_total / (p_total + n_total)))

    def find_best_rule(self, X, y, min_samples, best_score):
        best_rule = None
        for col in X.columns:
            values = np.unique(X[col])
            thresholds = np.concatenate([values, [max(values) + 1]])
            for val in thresholds:
                for operator in ["<=", ">"]:
                    rule = f"`{col}` {operator} {val}"
                    mask = X.eval(rule)
                    if mask.sum() <= min_samples:
                        continue
                    score = self.compute_WoE(y[mask])
                    if score > best_score:
                        best_score = score
                        best_rule = rule
        return best_rule, best_score

    def get_opposite_rule(self, rule):
        # Assumes rules are of the form '`column` <= value' or '`column` > value'
        if '<=' in rule:
            column, value = rule.split(' <= ')
            return f'{column} > {value}'
        elif '>' in rule:
            column, value = rule.split(' > ')
            return f'{column} <= {value}'
        return None
    
    def generate_rules(self, X, y, current_rule='', rules=[], applied_rules=set(), depth=0, baseline_woe=None):
        if depth > 5 or not X.size:
            return rules
        if baseline_woe is None:
            baseline_woe = self.compute_WoE(y)  # Calculate baseline WoE for the entire dataset at the start
        
        min_samples = int(len(X) * self.min_recall)
        rule, score = self.find_best_rule(X, y, min_samples, baseline_woe)
        rule_mask = X.eval(rule)
        if rule is None or rule in applied_rules:  # Ensure rule is valid and impactful
            return rules

        applied_rules.add(rule)  # Mark this rule as applied
        recall = y[rule_mask].sum() / y.sum()
        precision = y[rule_mask].sum() / rule_mask.sum()
        rule_woe = self.compute_WoE(y[rule_mask])

        if recall >= self.min_recall and precision >= self.min_precision and rule_woe > baseline_woe:
            new_rule = f"({current_rule} & {rule})" if current_rule else rule
            rules.append([new_rule, {'recall': recall, 'precision': precision, 'WoE': rule_woe}])
            # Recurse on both partitions with the updated baseline WoE
            self.generate_rules(X[rule_mask], y[rule_mask], new_rule, rules, applied_rules, depth + 1)
            self.generate_rules(X[~rule_mask], y[~rule_mask], self.get_opposite_rule(rule), rules, applied_rules, depth + 1)

        return rules

    def get_opposite_rule(self, rule):
        # Assumes rules are of the form 'column <= value' or 'column > value'
        if '<=' in rule:
            column, value = rule.split(' <= ')
            return f'{column.strip()} > {value.strip()}'
        elif '>' in rule:
            column, value = rule.split(' > ')
            return f'{column.strip()} <= {value.strip()}'
        return None

# Usage of the class
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = (iris.target == 1).astype(int)  # Binary classification: 1 if 'setosa', 0 otherwise
X.columns = [col.replace(" (cm)", "").replace(" ", "_") for col in X.columns]
df = X.copy()
df["target"] = y.copy()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
distinguisher = Distinguisher(n_bins=5, min_recall=0.01, bin_strategy="quantile")
X_train_transformed = distinguisher.fit_transform(X_train)
rules = distinguisher.generate_rules(X_train, y_train)
print("Generated Rules:")
for rule in rules:
    print(rule)

Generated Rules:
['`sepal_width` <= 2.4', {'recall': 0.21621621621621623, 'precision': 0.8, 'WoE': 1.3862943611198906}]
['(`sepal_width` <= 2.4 & `sepal_length` > 4.5)', {'recall': 1.0, 'precision': 0.8888888888888888, 'WoE': 2.0794415416798357}]
['(`sepal_width` > 2.4 & `sepal_width` <= 2.6)', {'recall': 0.20689655172413793, 'precision': 0.6666666666666666, 'WoE': 0.6931471805599453}]
['((`sepal_width` > 2.4 & `sepal_width` <= 2.6) & `petal_length` <= 4.9)', {'recall': 1.0, 'precision': 0.8571428571428571, 'WoE': 1.791759469228055}]
['(`sepal_width` > 2.6 & `sepal_width` <= 2.7)', {'recall': 0.17391304347826086, 'precision': 0.5714285714285714, 'WoE': 0.28768207245178085}]
['((`sepal_width` > 2.6 & `sepal_width` <= 2.7) & `sepal_length` <= 6.0)', {'recall': 1.0, 'precision': 0.8, 'WoE': 1.3862943611198906}]


In [23]:
transformed_data.dtypes

Order             float64
PID               float64
MS SubClass       float64
MS Zoning         float64
Lot Frontage      float64
                   ...   
Misc Val          float64
Mo Sold           float64
Yr Sold           float64
Sale Type         float64
Sale Condition    float64
Length: 81, dtype: object

In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import KBinsDiscretizer
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris
from math import log

class Distinguisher:
    def __init__(self, n_bins=5, bin_strategy="quantile", min_recall=0.05, min_precision=0.0, tol=0.01, beam_width=1):
        self.n_bins = n_bins
        self.bin_strategy = bin_strategy
        self.min_recall = min_recall
        self.min_precision = min_precision
        self.tol = tol
        self.beam_width = beam_width

    def fit_transform(self, X):
        self.columns = X.columns
        self.discretizers = {}
        X_filled = X.fillna({col: f"NA_{col}" for col in X.columns})
        for col in X_filled.columns:
            if X_filled[col].dtype == "object" or X_filled[col].dtype.name == "category":
                X_filled[col] = X_filled[col].astype(str)
                self.discretizers[col] = None
            else:
                disc = KBinsDiscretizer(n_bins=self.n_bins, encode="ordinal", strategy=self.bin_strategy)
                self.discretizers[col] = disc.fit(X_filled[[col]])
        return self.transform(X_filled)

    def transform(self, X):
        X_transformed = pd.DataFrame(index=X.index)
        X_filled = X.fillna({col: f"NA_{col}" for col in X.columns})
        for col in X_filled.columns:
            if self.discretizers[col] is None:
                X_transformed[col] = X_filled[col]
            else:
                transformed = self.discretizers[col].transform(X_filled[[col]])
                X_transformed[col] = transformed.flatten()
        return X_transformed

    def compute_WoE(self, y):
        p_total = np.sum(y == 1)
        n_total = np.sum(y == 0)
        if p_total == 0 or n_total == 0:
            return 0  # Avoid division by zero
        return log((p_total / (p_total + n_total)) / (n_total / (p_total + n_total)))

    def find_best_rule(self, X, y, min_samples, best_score):
        best_rule = None
        for col in X.columns:
            values = np.unique(X[col])
            thresholds = np.concatenate([values, [max(values) + 1]])
            for val in thresholds:
                for operator in ["<=", ">"]:
                    rule = f"`{col}` {operator} {val}"
                    mask = X.eval(rule)
                    if mask.sum() <= min_samples:
                        continue
                    score = self.compute_WoE(y[mask])
                    if score > best_score:
                        best_score = score
                        best_rule = rule
        return best_rule, best_score

    def get_opposite_rule(self, rule):
        # Assumes rules are of the form '`column` <= value' or '`column` > value'
        if '<=' in rule:
            column, value = rule.split(' <= ')
            return f'{column} > {value}'
        elif '>' in rule:
            column, value = rule.split(' > ')
            return f'{column} <= {value}'
        return None
    
    def generate_rules(self, X, y, current_rule='', rules=[], applied_rules=set(), depth=0, baseline_woe=None):
        if depth > 5 or not X.size:
            return rules
        if baseline_woe is None:
            baseline_woe = self.compute_WoE(y)  # Calculate baseline WoE for the entire dataset at the start
        
        min_samples = int(len(X) * self.min_recall)
        rule, score = self.find_best_rule(X, y, min_samples, baseline_woe)
        rule_mask = X.eval(rule)
        if rule is None or rule in applied_rules:  # Ensure rule is valid and impactful
            return rules

        applied_rules.add(rule)  # Mark this rule as applied
        recall = y[rule_mask].sum() / y.sum()
        precision = y[rule_mask].sum() / rule_mask.sum()
        rule_woe = self.compute_WoE(y[rule_mask])

        if recall >= self.min_recall and precision >= self.min_precision and rule_woe > baseline_woe:
            new_rule = f"({current_rule} & {rule})" if current_rule else rule
            rules.append([new_rule, {'recall': recall, 'precision': precision, 'WoE': rule_woe}])
            # Recurse on both partitions with the updated baseline WoE
            self.generate_rules(X[rule_mask], y[rule_mask], new_rule, rules, applied_rules, depth + 1)
            self.generate_rules(X[~rule_mask], y[~rule_mask], self.get_opposite_rule(rule), rules, applied_rules, depth + 1)

        return rules

    def get_opposite_rule(self, rule):
        # Assumes rules are of the form 'column <= value' or 'column > value'
        if '<=' in rule:
            column, value = rule.split(' <= ')
            return f'{column.strip()} > {value.strip()}'
        elif '>' in rule:
            column, value = rule.split(' > ')
            return f'{column.strip()} <= {value.strip()}'
        return None

# Usage of the class
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = (iris.target == 1).astype(int)  # Binary classification: 1 if 'setosa', 0 otherwise
X.columns = [col.replace(" (cm)", "").replace(" ", "_") for col in X.columns]
df = X.copy()
df["target"] = y.copy()

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
distinguisher = Distinguisher(n_bins=5, min_recall=0.01, bin_strategy="quantile")
X_train_transformed = distinguisher.fit_transform(X_train)
rules = distinguisher.generate_rules(X_train, y_train)
print("Generated Rules:")
for rule in rules:
    print(rule)

Generated Rules:
['`sepal_width` <= 2.4', {'recall': 0.21621621621621623, 'precision': 0.8, 'WoE': 1.3862943611198906}]
['(`sepal_width` <= 2.4 & `sepal_length` > 4.5)', {'recall': 1.0, 'precision': 0.8888888888888888, 'WoE': 2.0794415416798357}]
['(`sepal_width` > 2.4 & `sepal_width` <= 2.6)', {'recall': 0.20689655172413793, 'precision': 0.6666666666666666, 'WoE': 0.6931471805599453}]
['((`sepal_width` > 2.4 & `sepal_width` <= 2.6) & `petal_length` <= 4.9)', {'recall': 1.0, 'precision': 0.8571428571428571, 'WoE': 1.791759469228055}]
['(`sepal_width` > 2.6 & `sepal_width` <= 2.7)', {'recall': 0.17391304347826086, 'precision': 0.5714285714285714, 'WoE': 0.28768207245178085}]
['((`sepal_width` > 2.6 & `sepal_width` <= 2.7) & `sepal_length` <= 6.0)', {'recall': 1.0, 'precision': 0.8, 'WoE': 1.3862943611198906}]
