In [1]:
%matplotlib inline
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn import preprocessing

In [2]:
from sklearn.exceptions import UndefinedMetricWarning
import warnings

warnings.filterwarnings(action='ignore', category=UndefinedMetricWarning)

In [3]:
data_ = pd.read_csv('datasets/diamonds.csv')
data_.head()

Unnamed: 0.1,Unnamed: 0,carat,cut,color,clarity,depth,table,price,x,y,z
0,1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
1,2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
2,3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
3,4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
4,5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75


In [4]:
data = data_.copy()

In [5]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 53940 entries, 0 to 53939
Data columns (total 11 columns):
Unnamed: 0    53940 non-null int64
carat         53940 non-null float64
cut           53940 non-null object
color         53940 non-null object
clarity       53940 non-null object
depth         53940 non-null float64
table         53940 non-null float64
price         53940 non-null int64
x             53940 non-null float64
y             53940 non-null float64
z             53940 non-null float64
dtypes: float64(6), int64(2), object(3)
memory usage: 4.5+ MB


In [6]:
data_ = data_.drop(['Unnamed: 0', 'clarity', 'color'], axis=1)

In [7]:
from sklearn import model_selection

array = data.values
Y = data_.pop('cut').values
X = data_.values
validation_size = 0.20
seed = 7
X_train, X_validation, Y_train, Y_validation = model_selection.train_test_split(X, Y, test_size=validation_size, random_state=seed)

from sklearn import preprocessing
le = preprocessing.LabelEncoder()
Y_train = le.fit_transform(Y_train)
Y_validation = le.fit_transform(Y_validation)

In [8]:
X_train

array([[ 2.07, 61.8 , 61.  , ...,  8.12,  8.16,  5.03],
       [ 0.62, 62.2 , 54.  , ...,  5.46,  5.51,  3.41],
       [ 0.97, 62.9 , 57.  , ...,  6.28,  6.31,  3.96],
       ...,
       [ 0.41, 60.7 , 59.  , ...,  4.79,  4.77,  2.9 ],
       [ 1.05, 62.3 , 56.  , ...,  6.47,  6.54,  4.05],
       [ 0.7 , 62.4 , 57.  , ...,  5.64,  5.7 ,  3.54]])

In [9]:
from sympy.logic import simplify_logic
from sympy.abc import x, y, z
from sympy import *
from sympy.logic.boolalg import *
from sympy.logic.inference import satisfiable

from pyeda.inter import *
from itertools import product
from pypred import OptimizedPredicateSet, PredicateSet, Predicate
from collections import OrderedDict

import uuid
import operator

import graphviz
from sklearn import tree

def viz_tree(clf, class_names, feature_names):
    tree_ = tree.export_graphviz(clf, out_file = None, filled=True, rounded=True, class_names=class_names, feature_names=feature_names)
    graph = graphviz.Source(tree_)
    return graph


def get_truth(inp, relate, cut):
    ops = {'>': operator.gt,
           '<': operator.lt,
           '>=': operator.ge,
           '<=': operator.le,
           '=': operator.eq}
    return ops[relate](inp, cut)


def get_leaf_class_name(class_names, node_values):
    max_value_index = 0
    max_value = 0
    for c, value in enumerate(node_values[0]):
        if value != 0 and value > max_value:
            max_value_index = c
            max_value = value
    
    return class_names[max_value_index]

        
def get_node_value(node_values):
    max_ = 0
    for c, value in enumerate(node_values[0]):
        if value > max_:
            max_ = value
            
    return max_
        
        
def get_paths_for(class_name, tree, cur_index, lst, paths, sign):
    if cur_index >= len(tree.children_left):
        return
    
    if len(lst) > 0 and cur_index != -1:
        lst[len(lst) - 1] = (lst[len(lst) - 1][0], lst[len(lst) - 1][1], sign, lst[len(lst) - 1][3])
        
    lst.append((cur_index, tree.feature[cur_index], sign, round(tree.threshold[cur_index], 2)))
    if cur_index != -1:
        get_paths_for(class_name, tree, tree.children_left[cur_index], lst, paths, '<=')
        get_paths_for(class_name, tree, tree.children_right[cur_index], lst, paths, '>')
    else:
        if get_leaf_class_name(data['cut'].unique(), tree.value[list(lst[-2])[0]]) == class_name:
            paths.add(tuple(lst[:-1]))
        
    lst.pop()
    
    
def get_feature_value_pairs(tree, paths):
    
    pairs = []
    for path in paths:
        d = []
        features = tree.feature
        thresholds = tree.threshold

        for i, p in enumerate(list(path[:-1])):
            d.append((i, features[i], thresholds[i]))

        pairs.append(d)
        
    return pairs


def rule_predict(class_name, rule):
    pred = []
    
    for row in X_validation:
        
        valid = True
        for condition in rule[:-1]:
            condition = list(condition)
            
            if not get_truth(row[condition[1]], condition[2], condition[3]):
                valid = False
            
        if valid:
            pred.append(class_name)
        else:
            pred.append(-1)
            
    return pred


# Function which computes rule accuracy based. It makes a classification report and takes
# the precision value and returns it
def compute_rule_accuracy(class_name, rule):
    pred = rule_predict(class_name, rule)
    report = classification_report(pred, Y_validation, output_dict = True)
    
    return round(report[str(class_name)]['precision'], 2)


def prune_rule(class_name, rule):
    max_accuracy = 0
    final_rule = rule
    
    for i in range(len(rule[:-1])):
        accuracy = compute_rule_accuracy(class_name, rule[i:])
        
        if accuracy > max_accuracy:
            final_rule = rule[i:]
            max_accuracy = accuracy
           
    return (max_accuracy, tuple(final_rule))


def combine_rules(class_name, rf_model):
    rules = set()
    
    for estimator in rf_model.estimators_:
        get_paths_for(class_name, estimator.tree_, 0, [], rules, '<=')
        
    return rules


def get_class_index(class_name):
    for c, cls in enumerate(data['cut'].unique()):
        if cls == class_name:
            return c
        
        
def get_rules_accuracy(class_name, rf_model, removeRedundancy = False):
    rules = combine_rules(class_name, rf_model)
    
    if removeRedundancy:
        rules = redundancy_condition_removal(rules)
    
    rules = list(rules)
    
    final_rules = set()
    for rule in rules:
        rule_ = [compute_rule_accuracy(get_class_index(class_name), rule), rule]
        final_rules.add(tuple(rule_))
        
    return sorted(list(final_rules), reverse=True, key=lambda x: list(x)[0])
    
        
def get_pruned_rules_accuracy(class_name, rf_model):
    rules = list(combine_rules(class_name, rf_model))
    
    final_rules = set()
    for rule in rules:
        pruned = prune_rule(get_class_index(class_name), list(rule))
        final_rules.add(tuple(pruned))
    
    return sorted(list(final_rules), reverse=True, key=lambda x: list(x)[0])


def redundancy_condition_removal(rule_set):
    output = set()
    
    for rule in list(rule_set):
        
        rule = list(rule)
        for i, cond in enumerate(rule[:-1]):
            cond = list(cond)
            
            for cond2 in rule[i + 1:-1]:
                cond2 = list(cond2)
                    
                if cond[1] == cond2[1] and cond[2] == cond2[2]:
                    if cond[2] == '>':
                        cond[3] = (cond[3] if cond[3] > cond2[3] else cond2[3])
                    else:
                        cond[3] = (cond[3] if cond[3] < cond2[3] else cond2[3])
                        
                    # Assign changed rule to condition
                    rule[i] = tuple(cond)
                    
                    # Remove redundant rule from list
                    rule.remove(tuple(cond2))
                
        output.add(tuple(rule))
        
    return output


def create_big_or_rule(rule_set):
    output = ""
    for i, rule in enumerate(list(rule_set)):
        output += create_and_expression(rule)
        
        if len(rule_set) - 1 != i:
            output += " | "
            
    return output


def create_big_and_rule(rule_set):
    output = ""
    for i, rule in enumerate(list(rule_set)):
        output += create_and_expression(rule)
        
        if len(rule_set) - 1 != i:
            output += " & "
            
    return output


def combine_rule_using_and(rule_set):
    output = ""
    for i, rule in enumerate(list(rule_set)):
        output += "(" + rule + ")"
            
        if len(rule_set) - 1 != i:
            output += " & "
            
    return output


def combine_forest_rules(rf_model, class_name, r_dict, v_dict):
    final_rule = set()
    simplified_final_rule = set()
    for estimator in rf_model.estimators_:
    
        rules = set()
        get_paths_for(class_name, estimator.tree_, 0, [], rules, '<=')
        
        rules = redundancy_condition_removal(rules)
        tree_rule = sympify(create_big_or_rule(rules), evaluate=False)
        
        final_rule.add(str(tree_rule))
        simplified_final_rule.add(convert_to_function('&', expr(str(tree_rule)).to_cnf()))
        
        
    return final_rule, simplified_final_rule


def create_and_expression(rule):
    output = ""
    
    for i, condition in enumerate(list(rule)[:-1]):
        condition = list(condition)
        key = str(condition[1]) + str(condition[3])
        
        var = "A" + uuid.uuid4().hex[:6].upper()
        
        if key in rule_dict:
            if condition[2] != rule_dict[key][0]:
                output += "Not(" + rule_dict[key][1] + ")"
            else:
                output += rule_dict[key][1]
        else:
            expr = var + ' = ' + 'symbols(\'' + (str(condition[1]) + condition[2] + str(condition[3])) + '\')'
            exec(expr)
            
            rule_dict[key] = [condition[2], var]
            output += var
        
            # Also push to rule_dict where key is the variable name and value is the rule
            var_dict[var] = condition
        
        if len(list(rule)[:-1]) - 1 != i:
            output += " & "
        
    return "(" + output + ")"


def print_final_rule(simplified_rule):
    simplified_rule = str(simplified_rule).split()
    
    final_rule = ""
    for el in simplified_rule:
        if el in ('&', '|'):
            final_rule += " " + el + " "
        else:
            if "~" in el:
                el = el.replace("~", "")
                el_ = el
                
                lb_count = el_.count('(')
                if "(" in el_:
                    el_ = el_.replace("(", "")
                
                rb_count = el_.count(')')
                if ")" in el_:
                    el_ = el_.replace(")", "")
                
                var = var_dict[str(el_)].copy()[1:]
                if var[1] == '>':
                    var[1] = '<='
                else:
                    var[1] = '>'
                  
                if lb_count == 0 and rb_count == 0:
                    final_rule += str(tuple(var))
                elif lb_count != 0 and rb_count == 0:
                    final_rule += ("(" * lb_count) + str(tuple(var))
                elif lb_count == 0 and rb_count != 0:
                    final_rule += str(tuple(var)) + (")" * rb_count)
            
            elif "(" in el:
                b_count = el.count('(')
                
                el = el.replace("(", "")
                final_rule += ("(" * b_count) + str(tuple(var_dict[str(el)][1:]))
                
            elif ")" in el:
                b_count = el.count(')')
                
                el = el.replace(")", "")
                final_rule += str(tuple(var_dict[str(el)][1:])) + (")" * b_count)
            
            else:
                final_rule += str(tuple(var_dict[str(el)][1:]))
            
    return final_rule


def convert_to_function(sign, expr_):
    rule = ""
    for i, el in enumerate(expr_.xs):
        if "And" in str(el):
            rule += "(" + convert_to_function("&", el) + ")"
        elif "Or" in str(el):
            rule += "(" + convert_to_function("|", el) + ")"
        else:
            rule += str(el)
            
        if i != len(expr_.xs) - 1:
            rule += " " + sign + " "
            
    return rule


def get_valid_predicate(rule):
    rule = rule.replace("&", "and").replace("|", "or").replace("0, '", 'sepal_length_cm ').replace("1, '", 'sepal_width_cm ')
    rule = rule.replace("2, '", 'petal_length_cm ').replace("3, '", 'petal_width_cm ').replace("',", "")
    
    return rule
    
    
def predict_rule_class(rule_set):
    X = X_train
    Y = Y_train
    
    Z = predicate_rules_set_predict(rules_set, X_validation)
    
    print(accuracy_score(Y_validation, Z))
    print(classification_report(Z, Y_validation))
    
    
def compute_final_rule_accuracy(class_index, rule):
    X = X_validation
    Y = Y_validation
    
    # Classification report
    Z = compute_rule_accuracy(class_index, rule, X)

    report = classification_report(Z, Y, output_dict = True)
    print(report[str(class_index)])
    
    
def compute_rule_accuracy(class_name, rule, dataset):
    """This method allows you to evaluate the whole rule given by 'rule' parameter to 'class_name'
    Returns an array of evaluated rule
    """
    
    pred = []
    
    # Store dataframe attribute names
    df_columns = list(data.columns.values[:-1])
    
    # Create an ordered dictionary in order to keep the order when we take values for given columns
    d_ = OrderedDict()
    
    # Create and optimize predicate given by the parameter
    test = Predicate(rule)
    s = OptimizedPredicateSet([test])
    
    for row in dataset:
        
        # Create dictionary consisting of attribute names and corresponding row values for them
        for l, at in enumerate(df_columns):
            d_[at] = row[l]
        
        # Evaluate rule with created dictionary
        match = s.evaluate(dict(d_))
            
        # If the conditions from rule were met, append the class to final predictions array
        if len(match):
            pred.append(class_name)
        else:
            pred.append(-1)
            
    return pred


def attribute_rule_predict(class_name, rule, attr1, attr2, dataset):
    """Method used to get prediction only for two attributes given by 'attr1' and 'attr2' parameters
    This method is mainly used mainly for plotting purposes
    """ 
    
    # Create an empty predictions array
    pred = []
    
    # Create and optimize predicate given by the parameter
    test = Predicate(rule)
    s = OptimizedPredicateSet([test])
    
    # Store dataframe attribute names
    df_columns = list(data.columns.values[:-1])
    
    # Create dictionary in which we will create the predicate
    d_ = dict()
    
    for row in dataset:
        
        # Get the attribute names by indexes from df_columns.
        d_[df_columns[attr1]] = row[0]
        d_[df_columns[attr2]] = row[1]
    
        # Evaluate rule with created dictionary
        match = s.evaluate(d_)
            
        # If the conditions from rule were met, append the class to final predictions array
        if len(match):
            pred.append(class_name)
        else:
            pred.append(-1)
            
    return pred


def visualize_decision_boundaries(rule, attr1, attr2):
    """
    This function is used to draw decision boundaries for 2 attributes which will be used to predict the output
    """
    
    # Create a dataset from validation data having only columns specified by paramters
    X = X_validation[:, [attr1, attr2]]
    Y = Y_validation

    # Plotting decision regions
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1

    xx, yy = np.meshgrid(np.arange(x_min, x_max, 0.1), np.arange(y_min, y_max, 0.1))

    f, axarr = plt.subplots(figsize=(10, 8))

    # Create prediction
    ravel_ = np.c_[xx.ravel(), yy.ravel()]
    Z = np.array(attribute_rule_predict(1, rule, attr1, attr2, ravel_))
    Z = Z.reshape(xx.shape)
    
    # Plot predicted dots
    axarr.contourf(xx, yy, Z, alpha=0.5)
    axarr.scatter(X[:, 0], X[:, 1], c=Y, s=50, edgecolor='k')
    axarr.set_title("When random forest is very sure")

    plt.show()
    
    
def predicate_rules_set_predict(rules_set, dataset):
    
    pred = []
    
    df_columns = list(data.columns.values[:-1])
    d_ = OrderedDict()
    
    w_set = set()
    for r in rules_set:
        r = (r[0], OptimizedPredicateSet([Predicate(r[1])]))
        w_set.add(r)
        
    for i, row in enumerate(dataset):
        
        
        for l, at in enumerate(df_columns):
            d_[at] = row[l]
            
        
        j = 0
        for r in w_set:
            class_i = r[0]
            rule_p = r[1]

            match = rule_p.evaluate(dict(d_))

            if len(match):
                pred.append(class_i)
                break
        
            j += 1
            
        if j == len(w_set):
            pred.append(-1)
        
    return pred


def redundancy_condition_removal_(rule_set):
    output = set()
    
    for rule in list(rule_set):
        
        rule = list(rule)
        for i, cond in enumerate(rule):
            cond = list(cond)
            
            for cond2 in rule[i + 1:]:
                cond2 = list(cond2)
                    
                if cond[0] == cond2[0] and cond[1] == cond2[1]:
                    if cond[1] == '>':
                        cond[2] = (cond[2] if cond[2] > cond2[2] else cond2[2])
                    else:
                        cond[2] = (cond[2] if cond[2] < cond2[2] else cond2[2])
                        
                    # Assign changed rule to condition
                    rule[i] = tuple(cond)
                    
                    # Remove redundant rule from list
                    rule.remove(tuple(cond2))
                
        output.add(tuple(rule))
        
    return output


def split_rules(rule_str):
    redundant_rules_set = set()
    composed_rules_set = set()
    
    rule = ""
    stack = []
    
    for c in rule_str:
        
        if c in (' ', '&', '|') and len(stack) == 0:
            continue
            
        rule += c
        
        if c == "(":
            stack.append(c)
        
        if c == ")" and len(stack) == 1:
            stack.pop()
            
            if len(stack) == 0:
                try:
                    redundant_rules_set.add(eval(rule))
                except:
                    composed_rules_set.add(rule)
            
            rule = ""
            
        elif c == ')':
            stack.pop()
            
    return redundant_rules_set, composed_rules_set


def compose_final_rule(no_redundant_rules, composed_rules_set):
    final_rule = ""
    
    for i, rule in enumerate(no_redundant_rules):
        final_rule += str(rule) + " & "
            
    for i, rule in enumerate(composed_rules_set):
        final_rule += rule
        
        if len(composed_rules_set) - 1 != i:
            final_rule += " & "
        
    return final_rule

In [10]:
rule_dict = {}
var_dict = {}

rules, s_rules = combine_forest_rules(rfclf, "Fair", rule_dict, var_dict)
rules

NameError: name 'rfclf' is not defined

In [None]:
s_rules

In [None]:
s = combine_rule_using_and(s_rules)
s

In [None]:
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score

from sklearn.tree import DecisionTreeClassifier

In [None]:
clf = DecisionTreeClassifier(max_depth=6)

clf.fit(X_train, Y_train)

In [None]:
pred = clf.predict(X_validation)

In [None]:
print(accuracy_score(Y_validation, pred))
print(classification_report(Y_validation, pred))

In [None]:
feature_names_array = data_.columns.values

In [None]:
viz_tree(clf, data['cut'].unique(), feature_names_array)

In [None]:
rules_set = set()
get_paths_for("Fair", clf.tree_, 0, [], rules_set, '<=')
rules_set

In [None]:
rules_set = redundancy_condition_removal(rules_set)
rules_set

In [None]:
from sklearn.ensemble import RandomForestClassifier

rfclf = RandomForestClassifier(n_estimators=5, max_depth=4)
rfclf.fit(X_train, Y_train)

pred = rfclf.predict(X_validation)

In [None]:
s_rules

In [None]:
s = combine_rule_using_and(s_rules)
s

In [None]:
s2 = expr(s).to_dnf()
s2