# Anchors

In [1]:
pip install anchor-exp

Collecting anchor-exp
  Downloading anchor_exp-0.0.2.0.tar.gz (427 kB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Collecting spacy (from anchor-exp)
  Downloading spacy-3.8.5-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (27 kB)
Collecting spacy-legacy<3.1.0,>=3.0.11 (from spacy->anchor-exp)
  Downloading spacy_legacy-3.0.12-py2.py3-none-any.whl.metadata (2.8 kB)
Collecting spacy-loggers<2.0.0,>=1.0.0 (from spacy->anchor-exp)
  Downloading spacy_loggers-1.0.5-py3-none-any.whl.metadata (23 kB)
Collecting murmurhash<1.1.0,>=0.28.0 (from spacy->anchor-exp)
  Downloading murmurhash-1.0.12-cp311-cp311-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (2.1 kB)
Collecting cymem<2.1.0,>=2.0.2 (from spacy->anchor-exp)
  Downloading cymem-2.0.11-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metada

In [2]:
from __future__ import print_function
import numpy as np
np.random.seed(1)
import sys
import sklearn
import sklearn.ensemble
%load_ext autoreload
%autoreload 2
from anchor import utils
from anchor import anchor_tabular
import pandas as pd
import matplotlib.pyplot as plt

In [3]:
dataset_folder = 'datasets/dataset500.csv'
df = pd.read_csv(dataset_folder)

all_true_dataset = df[(df['req_0'] == 1) & (df['req_1'] == 1) & (df['req_2'] == 1) & (df['req_3'] == 1)]
all_true_dataset = all_true_dataset.drop(columns=['req_0','req_1', 'req_2', 'req_3'])
print(all_true_dataset.shape)

(12, 9)


In [4]:
df0 = df.drop(columns=['req_1', 'req_2', 'req_3'])
df0.to_csv('datasets/dataset500req0.csv', index=False)

df1 = df.drop(columns=['req_0', 'req_2', 'req_3'])
df1.to_csv('datasets/dataset500req1.csv', index=False)

df2 = df.drop(columns=['req_0', 'req_1', 'req_3'])
df2.to_csv('datasets/dataset500req2.csv', index=False)

df3 = df.drop(columns=['req_0', 'req_1', 'req_2'])
df3.to_csv('datasets/dataset500req3.csv', index=False)

In [5]:
datasets = []
dataframes = []
for i in range(4):
    dataset_folder = 'datasets/dataset500req' + str(i) + '.csv'
    datasets.append(utils.load_csv_dataset(dataset_folder, 9))

    c = sklearn.ensemble.RandomForestClassifier(n_estimators=50, n_jobs=5)
    c.fit(datasets[i].train, datasets[i].labels_train)
    print('Train', sklearn.metrics.accuracy_score(datasets[i].labels_train, c.predict(datasets[i].train)))
    print('Test', sklearn.metrics.accuracy_score(datasets[i].labels_test, c.predict(datasets[i].test)))

    explainer = anchor_tabular.AnchorTabularExplainer(
        datasets[i].class_names, #it maps the 0 and 1 in the dataset's requirements to the class names
        datasets[i].feature_names,
        datasets[i].train,
        datasets[i].categorical_names)
    
    names = []
    for j in range(all_true_dataset.shape[0]):
        
        #do a csv with a prediction for each sample
        #print(all_true_dataset.iloc[i].values.reshape(1, -1)[0])
        with open('datasets/anchorsReq' +str(i) +'.csv', 'a') as f:
            f.write('Prediction: %s\n' % (explainer.class_names[c.predict(all_true_dataset.iloc[j].values.reshape(1, -1))[0]]))
            exp = explainer.explain_instance(all_true_dataset.iloc[j].values.reshape(1, -1), c.predict, threshold=0.95)
            f.write('Anchor: %s\n' % (' AND '.join(exp.names())))
            #print('Anchor: %s' % (' AND '.join(exp.names())))
            f.write('Precision: %.2f\n' % exp.precision())
            f.write('Coverage: %.2f\n' % exp.coverage())
            f.write('\n')
            names.append(exp.names())
    
    dataframes.append(names)
    
    print(dataframes[i])




Train 1.0
Test 0.98
[["b'firm obstacle' = 1.0", "b'power' > 23.00", "b'cruise speed' <= 71.28", "b'image resolution' > 23.99", "b'smoke intensity' <= 49.24", "b'obstacle distance' <= 43.80"], ["b'firm obstacle' = 1.0", "b'image resolution' > 75.39", "b'cruise speed' <= 44.79", "b'power' > 23.00", "b'illuminance' > 51.56", "b'smoke intensity' <= 49.24"], ["b'firm obstacle' = 1.0", "b'power' > 23.00", "b'cruise speed' <= 44.79", "b'smoke intensity' <= 49.24", "b'image resolution' > 23.99"], ["b'firm obstacle' = 1.0", "b'cruise speed' <= 22.33", "b'power' > 23.00", "b'smoke intensity' <= 74.25", "b'image resolution' > 23.99"], ["b'firm obstacle' = 1.0", "b'image resolution' > 75.39", "b'cruise speed' <= 44.79", "b'power' > 23.00", "b'smoke intensity' <= 49.24", "b'illuminance' > 27.51"], ["b'firm obstacle' = 1.0", "b'smoke intensity' <= 23.26", "b'image resolution' > 51.33", "27.51 < b'illuminance' <= 75.57"], ["b'firm obstacle' = 1.0", "b'image resolution' > 75.39", "b'cruise speed' <= 4

It prints the rules that allow us to keep the sample's prediction, in this case False for requirement 2, and the precision and coverage with which these rules hold

The coverage tells us how much of the dataset we have explained with this rule.

In [6]:
lista = []
for i in range(df.shape[0]):
    dat = df.iloc[i]
    if((dat['firm obstacle'] == 1 and dat['power']>23.00 and dat['cruise speed']<=71.28 and dat['smoke intensity']<=49.24 and dat['image resolution']>23.99 and dat['obstacle distance']<=43.80)):
        lista.append(dat)
        print(dat['firm obstacle'], dat['power'], dat['cruise speed'], dat['smoke intensity'], dat['image resolution'], dat['obstacle distance'], dat['req_0'], dat['req_1'], dat['req_2'], dat['req_3'])

print(len(lista)-12)

1.0 69.0 22.7995 32.0775 47.5676 18.952 True False False True
1.0 72.0 18.0329 9.0105 88.1242 14.3371 True False True True
1.0 88.0 47.0146 11.4562 55.1652 28.0682 True False True True
1.0 72.0 61.9286 35.0041 80.1206 34.0424 True False False False
1.0 38.0 50.9071 45.0738 33.397 15.681 True True True True
1.0 30.0 35.8364 46.5315 49.6464 7.3371 True True True True
1.0 46.0 29.8661 45.8727 50.6223 17.2824 True False True True
1.0 97.0 12.8875 0.9936 49.3559 13.2518 True True True True
1.0 54.0 0.8053 6.5258 82.9626 12.8178 False False False False
1.0 38.0 0.5755 22.1285 61.4105 37.1102 True True False True
1.0 77.0 25.9089 31.7312 93.6499 22.3561 True True True True
1.0 31.0 43.2555 21.008 25.6881 34.9979 True False True True
1.0 80.0 38.3862 26.5053 75.4232 11.7324 True False False True
1.0 89.0 27.4022 15.124 76.5957 16.5379 True False False False
1.0 56.0 27.8734 13.3137 48.469 18.1208 True False False False
1.0 51.0 69.8011 15.0281 81.2153 15.2147 True False True True
1.0 89.0 67.3

In [19]:
dataframes[0][2][3]

"b'smoke intensity' <= 49.24"

In [22]:
def get_anchor(a):
    quoted_part = a.split("'")[1]
    rest = a.replace(f"'{quoted_part}'", '').replace("b", '').strip()

    return quoted_part, rest

In [33]:
import re

def parse_constraint(constraint):
    match = re.match(r"(>=|<=|>|<)\s*(-?\d+\.?\d*)", constraint.strip())
    if not match:
        raise ValueError(f"Invalid constraint format: {constraint}")
    op, val = match.groups()
    return op, float(val)

def to_interval(op, val):
    if op == ">":
        return (val, float("inf"), False, True)
    elif op == ">=":
        return (val, float("inf"), True, True)
    elif op == "<":
        return (float("-inf"), val, True, False)
    elif op == "<=":
        return (float("-inf"), val, True, True)

def intersect_constraints(c1, c2):
    op1, val1 = parse_constraint(c1)
    op2, val2 = parse_constraint(c2)

    l1, u1, incl_l1, incl_u1 = to_interval(op1, val1)
    l2, u2, incl_l2, incl_u2 = to_interval(op2, val2)

    # Lower bound
    if l1 > l2 or (l1 == l2 and not incl_l1):
        lower = l1
        lower_inclusive = incl_l1
    elif l2 > l1 or (l2 == l1 and not incl_l2):
        lower = l2
        lower_inclusive = incl_l2
    else:
        lower = l1
        lower_inclusive = incl_l1 and incl_l2

    # Upper bound
    if u1 < u2 or (u1 == u2 and not incl_u1):
        upper = u1
        upper_inclusive = incl_u1
    elif u2 < u1 or (u2 == u1 and not incl_u2):
        upper = u2
        upper_inclusive = incl_u2
    else:
        upper = u1
        upper_inclusive = incl_u1 and incl_u2

    if lower > upper or (lower == upper and not (lower_inclusive and upper_inclusive)):
        return "Empty intersection"

    # Convert back to constraint format
    constraints = []
    if lower != float("-inf"):
        constraints.append(f"{'>=' if lower_inclusive else '>'} {lower}")
    if upper != float("inf"):
        constraints.append(f"{'<=' if upper_inclusive else '<'} {upper}")
    
    return " AND ".join(constraints)

def union_constraints(c1, c2):
    op1, val1 = parse_constraint(c1)
    op2, val2 = parse_constraint(c2)

    l1, u1, incl_l1, incl_u1 = to_interval(op1, val1)
    l2, u2, incl_l2, incl_u2 = to_interval(op2, val2)

    # Order the intervals by lower bound
    if l1 > l2 or (l1 == l2 and incl_l2 and not incl_l1):
        l1, u1, incl_l1, incl_u1, l2, u2, incl_l2, incl_u2 = l2, u2, incl_l2, incl_u2, l1, u1, incl_l1, incl_u1

    # Check if intervals are overlapping or adjacent
    overlap = (
        u1 > l2 or
        (u1 == l2 and (incl_u1 or incl_l2))
    )

    if overlap:
        # Merge into a single interval
        lower = l1
        lower_inclusive = incl_l1
        if u1 > u2 or (u1 == u2 and incl_u1):
            upper = u1
            upper_inclusive = incl_u1
        else:
            upper = u2
            upper_inclusive = incl_u2

        constraints = []
        if lower != float("-inf"):
            constraints.append(f"{'>=' if lower_inclusive else '>'} {lower}")
        if upper != float("inf"):
            constraints.append(f"{'<=' if upper_inclusive else '<'} {upper}")
        return " AND ".join(constraints)
    
    # Disjoint: return union of intervals
    def format_interval(l, u, il, iu):
        parts = []
        if l != float("-inf"):
            parts.append(f"{'>=' if il else '>'} {l}")
        if u != float("inf"):
            parts.append(f"{'<=' if iu else '<'} {u}")
        return " AND ".join(parts)

    interval1 = format_interval(l1, u1, incl_l1, incl_u1)
    interval2 = format_interval(l2, u2, incl_l2, incl_u2)
    return f"({interval1}) OR ({interval2})"

In [34]:
dictionaries =[]
for i in range(len(dataframes[0])):
    dictionaries.append([{}, {}, {}, {}])
    for j in range(4):
        for k in range(len(dataframes[j][i])):
            quoted, rest = get_anchor(dataframes[j][i][k])
            dictionaries[i][j][quoted] = rest

In [38]:
featureNames = ['cruise speed','image resolution','illuminance','controls responsiveness','power',
     'smoke intensity','obstacle size','obstacle distance','firm obstacle']

In [39]:
and_dict = []
for i in range(len(dataframes[0])):
    and_dict.append({})
    for f in featureNames:
        for j in range(3):
            if(dictionaries[i][j].get(f) != None and dictionaries[i][j+1].get(f) != 'None'):
                and_dict[i][f] = intersect_constraints(dictionaries[i][j][f], dictionaries[i][j+1][f])
                

[[{'firm obstacle': '= 1.0', 'power': '> 23.00', 'cruise speed': '<= 71.28', 'image resolution': '> 23.99', 'smoke intensity': '<= 49.24', 'obstacle distance': '<= 43.80'}, {'cruise speed': '> 44.79'}, {'firm obstacle': '= 1.0', 'controls responsiveness': '> 74.46', 'cruise speed': '<= 71.28', 'image resolution': '> 23.99', 'illuminance': '> 27.51', 'smoke intensity': '<= 49.24', 'obstacle size': '27.78 <  <= 71.67', 'power': '23.00 <  <= 50.00'}, {'firm obstacle': '= 1.0', 'controls responsiveness': '> 74.46', 'illuminance': '> 27.51', 'image resolution': '> 23.99', 'cruise speed': '22.33 <  <= 71.28', 'obstacle size': '<= 71.67', 'smoke intensity': '23.26 <  <= 74.25', 'power': '<= 77.25'}], [{'firm obstacle': '= 1.0', 'image resolution': '> 75.39', 'cruise speed': '<= 44.79', 'power': '> 23.00', 'illuminance': '> 51.56', 'smoke intensity': '<= 49.24'}, {'illuminance': '51.56 <  <= 75.57', 'cruise speed': '<= 44.79', 'controls responsiveness': '> 43.98', 'power': '> 50.00', 'smoke in

ValueError: Invalid constraint format: 22.33 <  <= 71.28

In [40]:
print(dictionaries)


[[{'firm obstacle': '= 1.0', 'power': '> 23.00', 'cruise speed': '<= 71.28', 'image resolution': '> 23.99', 'smoke intensity': '<= 49.24', 'obstacle distance': '<= 43.80'}, {'cruise speed': '> 44.79'}, {'firm obstacle': '= 1.0', 'controls responsiveness': '> 74.46', 'cruise speed': '<= 71.28', 'image resolution': '> 23.99', 'illuminance': '> 27.51', 'smoke intensity': '<= 49.24', 'obstacle size': '27.78 <  <= 71.67', 'power': '23.00 <  <= 50.00'}, {'firm obstacle': '= 1.0', 'controls responsiveness': '> 74.46', 'illuminance': '> 27.51', 'image resolution': '> 23.99', 'cruise speed': '22.33 <  <= 71.28', 'obstacle size': '<= 71.67', 'smoke intensity': '23.26 <  <= 74.25', 'power': '<= 77.25'}], [{'firm obstacle': '= 1.0', 'image resolution': '> 75.39', 'cruise speed': '<= 44.79', 'power': '> 23.00', 'illuminance': '> 51.56', 'smoke intensity': '<= 49.24'}, {'illuminance': '51.56 <  <= 75.57', 'cruise speed': '<= 44.79', 'controls responsiveness': '> 43.98', 'power': '> 50.00', 'smoke in

In [7]:
# Get test examples where the anchora pplies
#fit_anchor = np.where(np.all(new_dataset.test[:, exp.features()] == new_dataset.test[idx][exp.features()], axis=1))[0]
#print('Anchor test precision: %.2f' % (np.mean(c.predict(new_dataset.test[fit_anchor]) == c.predict(new_dataset.test[idx].reshape(1, -1)))))
#print('Anchor test coverage: %.2f' % (fit_anchor.shape[0] / float(new_dataset.test.shape[0])))