# Notebook for developing code to go into structural_attack class

In [1]:
import sys
import os


# for development use local copy of aisdc in preference to installed version
sys.path.insert(0, os.path.abspath(".."))

In [2]:
import json

import numpy as np
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from sklearn.svm import SVC


from aisdc.attacks.structural_attack import (
    StructuralAttack,
)  # pylint: disable = import-error
from aisdc.attacks.target import Target  # pylint: disable = import-error

## Helper function for the tests

In [3]:
def get_target(modeltype: str, **kwargs) -> Target:
    """Load the breast-cancer data, fit a classifier and wrap both in a Target.

    Parameters
    ----------
    modeltype : str
        Short code selecting the classifier: "dt" (DecisionTreeClassifier),
        "rf" (RandomForestClassifier), "xgb" (XGBClassifier) or "svc" (SVC).
    **kwargs
        Forwarded unchanged to the constructor of the selected classifier.

    Returns
    -------
    Target
        Target wrapping the fitted model, with the train/test split attached
        through ``add_processed_data``.

    Raises
    ------
    NotImplementedError
        If ``modeltype`` is not one of the codes listed above.
    """
    # "dt", "rf" and "xgb" are the types the attack should handle. "svc" is
    # offered on purpose: the attack is expected to answer with a polite
    # error for it (no DoF support yet).
    model_classes = {
        "dt": DecisionTreeClassifier,
        "rf": RandomForestClassifier,
        "xgb": XGBClassifier,
        "svc": SVC,
    }
    # Fail fast, before any data is loaded, and name the offending value.
    if modeltype not in model_classes:
        raise NotImplementedError(
            f"model type {modeltype!r} passed to get_target unknown; "
            f"expected one of {sorted(model_classes)}"
        )

    X, y = load_breast_cancer(return_X_y=True, as_frame=False)
    # NOTE: the split is unseeded, so every call yields a different partition.
    train_X, test_X, train_y, test_y = train_test_split(X, y, test_size=0.3)

    # Train the classifier
    target_model = model_classes[modeltype](**kwargs)
    target_model.fit(train_X, train_y)

    # Wrap the model and data in a Target object
    target = Target(model=target_model)
    target.add_processed_data(train_X, train_y, test_X, test_y)

    return target

In [9]:
# Development aid: re-import the attack module so that edits made to the local
# aisdc source are picked up without restarting the kernel.
import importlib
import aisdc.attacks.structural_attack

importlib.reload(aisdc.attacks.structural_attack)
# Re-bind the name after the reload; the StructuralAttack imported in the
# earlier import cell still refers to the class object from before the reload.
from aisdc.attacks.structural_attack import StructuralAttack

In [10]:
def test_dt():
    """Run StructuralAttack on two decision trees and print the groups found.

    The first tree is a stump (max_depth 1, min_samples_leaf 150), the second
    an unlimited-depth tree with min_samples_leaf 5. For each, the attack's
    ``equiv_classes``, ``equiv_counts`` and ``equiv_members`` are printed.
    Nothing is asserted yet; the intended expectations are kept as comments.
    """

    def show_groups(attack):
        # Dump the three equivalence-group attributes of a finished attack.
        print(
            f"equiv_classes is {attack.equiv_classes}\n"
            f"equiv_counts is {attack.equiv_counts}\n"
            f"equiv_members is {attack.equiv_members}\n"
        )

    print("\n\n\n======  Non Disclosive   ====\n\n")

    target = get_target("dt", max_depth=1, min_samples_leaf=150)
    # The target is saved and the same path is handed to the attack.
    target.save("dt.sav")
    myattack = StructuralAttack(target_path="dt.sav")
    myattack.attack(target)
    # Expectations to enable once the risk attributes exist:
    # assert myattack.DoF_risk == 0, "should be no DoF risk with decision stump"
    # assert myattack.k_anonymity_risk == 0, "should be no k-anonymity risk with min_samples_leaf 150"
    # assert myattack.class_disclosure_risk == 0, "no class disclosure risk for stump with min_samples_leaf 150"
    # assert myattack.unnecessary_risk == 0, "not unnecessary risk if max_depth < 3.5"
    show_groups(myattack)

    print("\n\n\n======  Now Disclosive   ====\n\n")
    # highly disclosive
    target2 = get_target("dt", max_depth=None, min_samples_leaf=5, min_samples_split=2)
    myattack2 = StructuralAttack()
    myattack2.attack(target2)
    # Expectations to enable once the risk attributes exist:
    # assert myattack2.DoF_risk == 0, "should be no DoF risk with decision stump"
    # assert myattack2.k_anonymity_risk == 1, "should be k-anonymity risk with unlimited depth and min_samples_leaf 5"
    # assert myattack2.class_disclosure_risk == 1, "should be class disclosure risk with unlimited depth and min_samples_leaf 5"
    # assert myattack2.unnecessary_risk == 1, "unnecessary risk with unlimited depth and min_samples_leaf 5"
    # print(f' attack._get_param_names returns {myattack2._get_param_names()}')
    # print(f' attack.get_params returns {myattack2.get_params()}')
    show_groups(myattack2)

    # myattack.make_report()

In [11]:
test_dt()

INFO:acro:version: 0.4.2
INFO:acro:config: {'safe_threshold': 10, 'safe_dof_threshold': 10, 'safe_nk_n': 2, 'safe_nk_k': 0.9, 'safe_pratio_p': 0.1, 'check_missing_values': False}
INFO:acro:automatic suppression: False
INFO:structural_attack:Thresholds for count 10 and DoF 10
INFO:acro:version: 0.4.2
INFO:acro:config: {'safe_threshold': 10, 'safe_dof_threshold': 10, 'safe_nk_n': 2, 'safe_nk_k': 0.9, 'safe_pratio_p': 0.1, 'check_missing_values': False}
INFO:acro:automatic suppression: False
INFO:structural_attack:Thresholds for count 10 and DoF 10







ingroup [  0   2   5   7   9  10  11  12  14  15  17  19  20  22  24  26  28  29
  30  32  34  35  36  37  38  39  40  41  43  44  46  47  48  50  51  52
  53  54  57  58  61  63  64  65  66  69  71  72  73  76  78  81  82  85
  86  87  88  89  92  93  94  97  98 102 103 105 106 108 109 110 113 115
 116 117 118 121 123 125 126 128 130 131 132 134 135 137 138 139 140 141
 142 143 145 146 147 148 149 153 154 156 158 162 163 167 169 170 172 173
 176 178 180 181 182 183 184 186 187 188 192 193 195 196 197 198 199 201
 202 203 206 207 209 211 213 215 218 219 220 222 223 224 225 226 228 229
 231 233 235 237 238 240 241 242 245 246 247 248 250 252 254 256 258 259
 261 262 264 265 269 272 273 274 275 276 277 278 279 282 284 285 286 288
 289 294 299 300 303 304 307 308 309 311 312 314 315 316 317 318 319 321
 323 324 328 329 330 332 334 335 336 340 342 344 346 347 351 352 354 355
 357 358 360 362 365 366 367 371 373 374 375 377 379 380 381 384 386 388
 389 392 394 395 396 397],count 240
in

In [12]:
def test_rf():
    """Run StructuralAttack on two random forests and print the groups found.

    The first forest has 5 stumps (max_depth 1, min_samples_leaf 150), the
    second 5 unlimited-depth trees with min_samples_leaf 5. For each, the
    attack's equivalence classes and counts are printed, followed by the
    members of every group. Nothing is asserted yet; the intended
    expectations are kept as comments.
    """

    def show_groups(attack):
        # Summary of the equivalence classes, then one block per group.
        classes = attack.equiv_classes
        print(
            f" {len(classes)} equiv_classes:\n{classes}\n"
            f"equiv_counts is {attack.equiv_counts}\n"
        )
        members = attack.equiv_members
        for i in range(len(members)):
            group = members[i]
            print(f" {len(group)} members for group {i}\n" f"{group}")

    print("\n\n\n======  Non Disclosive   ====\n\n")

    target = get_target("rf", max_depth=1, min_samples_leaf=150, n_estimators=5)
    # NOTE: this writes to the same "dt.sav" file name that test_dt uses.
    target.save("dt.sav")
    myattack = StructuralAttack(target_path="dt.sav")
    myattack.attack(target)
    # Expectations to enable once the risk attributes exist:
    # assert myattack.DoF_risk == 0, "should be no DoF risk with decision stump"
    # assert myattack.k_anonymity_risk == 0, "should be no k-anonymity risk with min_samples_leaf 150"
    # assert myattack.class_disclosure_risk == 0, "no class disclosure risk for stump with min_samples_leaf 150"
    # assert myattack.unnecessary_risk == 0, "not unnecessary risk if max_depth < 3.5"
    show_groups(myattack)

    print("\n\n\n======  Now Disclosive   ====\n\n")
    # highly disclosive
    target2 = get_target(
        "rf",
        max_depth=None,
        min_samples_leaf=5,
        min_samples_split=2,
        n_estimators=5,
    )
    myattack2 = StructuralAttack()
    myattack2.attack(target2)
    # Expectations to enable once the risk attributes exist:
    # assert myattack2.DoF_risk == 0, "should be no DoF risk with decision stump"
    # assert myattack2.k_anonymity_risk == 1, "should be k-anonymity risk with unlimited depth and min_samples_leaf 5"
    # assert myattack2.class_disclosure_risk == 1, "should be class disclosure risk with unlimited depth and min_samples_leaf 5"
    # assert myattack2.unnecessary_risk == 1, "unnecessary risk with unlimited depth and min_samples_leaf 5"
    print(f" attack._get_param_names returns {myattack2._get_param_names()}")
    print(f" attack.get_params returns {myattack2.get_params()}")
    show_groups(myattack2)

    # myattack.make_report()

In [13]:
test_rf()

INFO:acro:version: 0.4.2
INFO:acro:config: {'safe_threshold': 10, 'safe_dof_threshold': 10, 'safe_nk_n': 2, 'safe_nk_k': 0.9, 'safe_pratio_p': 0.1, 'check_missing_values': False}
INFO:acro:automatic suppression: False
INFO:structural_attack:Thresholds for count 10 and DoF 10
INFO:acro:version: 0.4.2
INFO:acro:config: {'safe_threshold': 10, 'safe_dof_threshold': 10, 'safe_nk_n': 2, 'safe_nk_k': 0.9, 'safe_pratio_p': 0.1, 'check_missing_values': False}
INFO:acro:automatic suppression: False
INFO:structural_attack:Thresholds for count 10 and DoF 10







 1 equiv_classes:
[[0.33919598 0.66080402]]
equiv_counts is [398]

 398 members for group 0
[  0   1   2   3   4   5   6   7   8   9  10  11  12  13  14  15  16  17
  18  19  20  21  22  23  24  25  26  27  28  29  30  31  32  33  34  35
  36  37  38  39  40  41  42  43  44  45  46  47  48  49  50  51  52  53
  54  55  56  57  58  59  60  61  62  63  64  65  66  67  68  69  70  71
  72  73  74  75  76  77  78  79  80  81  82  83  84  85  86  87  88  89
  90  91  92  93  94  95  96  97  98  99 100 101 102 103 104 105 106 107
 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125
 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143
 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161
 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179
 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197
 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215
 216 217 218 219 220 221 22

In [None]:
# Scratch cell: load the nursery data and build one ACRO cross-tabulation.
import pandas as pd  # not imported anywhere else in this notebook
from acro import ACRO
from scipy.io.arff import loadarff

acro = ACRO()

# Seeded generator so the synthetic "children" values are reproducible.
rng = np.random.default_rng(0)

path = os.path.join("../data", "nursery.arff")
data = loadarff(path)  # tuple; element 0 holds the records
df = pd.DataFrame(data[0])
df = df.select_dtypes([object])
# The object columns hold byte strings; decode them to str.
df = df.apply(lambda col: col.str.decode("utf-8"))
df = df.rename(columns={"class": "recommend"})

# "more" becomes 4 so the column can be made numeric. Assign the result back
# rather than calling replace(inplace=True) on a column selection, which is
# not guaranteed to modify df.
df["children"] = pd.to_numeric(df["children"].replace({"more": "4"}))

# Keep 1, 2 and 3 as they are; every other value is replaced by a random
# integer in [4, 10).
df["children"] = np.where(
    df["children"].isin([1, 2, 3]),
    df["children"],
    rng.integers(4, 10, size=len(df)),
)

# The original call read `survivor`, `year`, `grant_type` and `inc_grants`
# as attributes of `data` (the loadarff tuple). Those attributes do not exist
# and are not nursery columns, so the call could not run. Tabulate columns of
# the frame prepared above instead.
# NOTE(review): confirm these are the rows/columns/values intended.
mytable = acro.crosstab(
    df.recommend, df.parents, values=df.children, aggfunc="mean"
)
mytable

In [None]:
def get_whitebox_class_disclosure(
    yprobs: np.ndarray,
    true_labels: np.ndarray,
    threshold: int,
    ignore_zeros: bool,
) -> tuple[int, int]:
    """Report whether predicted probabilities reveal small class groups.

    Records are grouped into equivalence classes: all records that received
    exactly the same row of predicted class probabilities form one group.
    Each group is then checked twice against ``threshold``.

    Parameters
    ----------
    yprobs : numpy.ndarray
        2D array, one row per record and one column per output class.
    true_labels : numpy.ndarray
        1D array with one element for each row in ``yprobs``, giving the
        actual class label of that record.
    threshold : int
        Minimum number of (non-zero) records of each class in each
        equivalence group.
    ignore_zeros : bool
        Whether the threshold check ignores "evidential zeros", i.e. classes
        that are not represented in a group at all.

    Returns
    -------
    tuple[int, int]
        Two flags, each 1 (disclosive) or 0 (not disclosive).
        Element 0 is judged from probability * group size, i.e. the class
        counts implied by the probabilities.
        Element 1 is judged from the actual labels of the group members.

    Raises
    ------
    ValueError
        If ``yprobs`` is not 2D, or ``true_labels`` does not have one entry
        per row of ``yprobs``.
    """
    yprobs = np.asarray(yprobs, dtype=float)
    true_labels = np.asarray(true_labels)
    if yprobs.ndim != 2:
        raise ValueError(f"yprobs must be 2D, got {yprobs.ndim} dimension(s)")
    n_rows, n_classes = yprobs.shape
    if len(true_labels) != n_rows:
        raise ValueError(
            f"shape mismatch: lengths of yprobs {n_rows} "
            f"and true_labels {len(true_labels)}"
        )
    if n_rows == 0:
        # no records means no groups, so nothing can be disclosed
        return 0, 0

    # groups are equivalence classes in predicted class probability space
    uniq_probs, uniq_freqs = np.unique(yprobs, axis=0, return_counts=True)

    # Check disclosure according to the proba values. Rounding removes
    # floating point noise such as 0.29 * 100 == 28.999999999999996, which
    # would otherwise fall just below a threshold of 29.
    class_freqs = np.round(uniq_probs * uniq_freqs[:, np.newaxis], 8)
    below_threshold = (class_freqs > 0) & (class_freqs < threshold)
    has_zero = class_freqs == 0
    disclosive_by_freqs = int(
        below_threshold.any() or (not ignore_zeros and has_zero.any())
    )

    # Now according to the labels of the records falling into each group.
    disclosive_by_labels = 0
    for prob_vals in uniq_probs:
        ingroup = np.all(yprobs == prob_vals, axis=1)
        _, label_counts = np.unique(true_labels[ingroup], return_counts=True)
        small_class = bool(np.any(label_counts < threshold))
        # fewer distinct labels than output classes: some class has no members
        missing_class = len(label_counts) < n_classes
        if small_class or (missing_class and not ignore_zeros):
            disclosive_by_labels = 1
            break

    return disclosive_by_freqs, disclosive_by_labels
    
    
def _group_ends(sorted_probs: np.ndarray) -> list[int]:
    """Return the index of the last row of every run of identical rows."""
    n_rows = sorted_probs.shape[0]
    r_ends = []
    group_first = 0
    # Loop on group_first (not on the look-ahead index) so that a final group
    # holding a single row is still recorded.
    while group_first < n_rows:
        group_last = group_first
        # extend the group over the records with identical proba values
        while group_last + 1 < n_rows and np.array_equal(
            sorted_probs[group_last + 1], sorted_probs[group_first]
        ):
            group_last += 1
        print(f'group from {group_first} to {group_last} is {sorted_probs[group_first]}')
        r_ends.append(group_last)
        group_first = group_last + 1
    return r_ends


def test_whitebox_class_disclosure(n_records: int = 20, seed: int = 0):
    """Check the grouping steps used for whitebox class disclosure.

    Builds ``n_records`` rows, each drawn at random from three fixed
    probability vectors, and then checks that

    * for every group found by ``np.unique``, probability * group size sums
      to the group size;
    * scanning the sorted rows for runs of identical values finds the same
      number of groups as ``np.unique``, with the last run ending on the
      last row.

    Parameters
    ----------
    n_records : int
        Number of synthetic records to generate; must be at least 1.
    seed : int
        Seed for the random generator, so the run is reproducible.
    """
    if n_records < 1:
        raise ValueError("n_records must be at least 1")

    uniqvals = np.array([[0.1, 0.2, 0.7],
                         [0.6, 0.4, 0.0],
                         [0.2, 0.4, 0.4]])
    rng = np.random.default_rng(seed)
    yprobs = uniqvals[rng.integers(0, len(uniqvals), size=n_records)]

    # Sort rows lexicographically with column 0 as the primary key. lexsort
    # treats its LAST key as primary, hence the reversed column order.
    sorted_probs = yprobs[np.lexsort(yprobs.T[::-1])]
    uniques = np.unique(sorted_probs, axis=0, return_counts=True)
    print(f'np.uniq gives {len(uniques[0])}')

    uprobs = uniques[0]
    ufreqs = uniques[1]
    class_freqs = np.zeros(uprobs.shape, dtype=float)
    for group in range(len(uprobs)):
        class_freqs[group] = uprobs[group, :] * ufreqs[group]
        print(f'group {group} class_membership {class_freqs[group]}')
        errmsg = f'class sum {class_freqs[group].sum()} should equal group count {ufreqs[group]}'
        # Absolute tolerance of 0.001, with the message actually attached.
        # assert_almost_equal would read a positional 0.001 as `decimal`.
        np.testing.assert_allclose(
            class_freqs[group].sum(), ufreqs[group], rtol=0, atol=0.001, err_msg=errmsg
        )
    print(f'class freqs are:\n{class_freqs}')

    # class disclosure step 3: loop through all similarity groups
    r_ends = _group_ends(sorted_probs)
    assert len(r_ends) == len(np.unique(sorted_probs, axis=0)), 'wrong number of groups found'
    assert r_ends[-1] == sorted_probs.shape[0] - 1, f'last group ends at {r_ends[-1]} should be {sorted_probs.shape[0]-1}'


test_whitebox_class_disclosure()

In [None]:
# Scratch check: for each candidate probability row, find the rows of `b`
# that equal it and count the labels of those rows.
a = [[0, 1], [0.5, 0.5], [1, 0]]

# nine rows: the three candidates above, repeated three times
b = np.array(a * 3)
blabels = np.array([10, 11, 12, 10, 14, 15, 16, 17, 18])

for prb_vals in a:
    # a row matches only when every column equals the candidate
    matches = (b == prb_vals).all(axis=1)
    labs = blabels[matches]
    uniq_labs = np.unique(labs, return_counts=True)

    print(prb_vals)
    print(f"matching elements {matches}")
    print(f"labels of matches{labs}")
    print(f"uniqs {uniq_labs}")

In [None]:
# Scratch check: type, shape and number of dimensions of a 1D label array.
y = np.asarray([0, 1, 1, 1, 1, 0])
print(type(y), y.shape, y.ndim)