In [1]:
import pandas as pd
# numpy 2.0 for bitwise count operations
import numpy as np
import operator
import itertools 
import re

Instructions:

1. Set "dataset" to the name of the dataset you want to extract DCs from (without the ".csv" extension; the file "<dataset>.csv" is the one that gets loaded).
2. Set the number of rows to use from the dataset. It must be a multiple of 8, but a power of 2 is preferable.
3. Remember that DCs work with tuple pairs, so both time and memory usage scale quadratically with the number of rows. Using 2048 (2¹¹) tuples usually works well.
4. This results in analyzing about 4 million tuple pairs. In the paper, we used 16384 (2¹⁴) tuples, but it takes much longer as it needs to process about 268 million tuple pairs.
*Note*:
Simply specifying the required dataset should be enough for it to work.

Classes to parse and represent Datasets and DCs as defined in the literature.

In [2]:
class Operator:
    """Binary comparison operator that can appear in a predicate.

    func    -- two-argument callable that performs the comparison (e.g. operator.eq).
    expFunc -- two-argument callable returned through expected(); the functions
               passed in this file (eqExp, neExp, ...) compute an expected
               selectivity from their first argument.

    Two operators are equal (and hash alike) when they wrap the same func.
    """
    def __init__(self,func,expFunc) -> None:
        self.func=func
        self.expFunc=expFunc
        # Both attributes are wired up at module level after every operator exists:
        # neg is the logically negated operator, imp the list of implied operators.
        self.neg=None
        self.imp=None
    def __call__(self,a,b):
        """Apply the wrapped comparison to a and b."""
        return self.func(a,b)
    def negate(self):
        """Return the negated operator registered in self.neg.

        The previous implementation applied operator.invert to the comparison
        function itself and built an Operator with a single argument, so it
        could never succeed.

        Raises ValueError when no negation has been registered yet.
        """
        if self.neg is None:
            raise ValueError("no negation has been registered for this operator")
        return self.neg
    def expected(self,c1,c2):
        """Forward c1 and c2 to the expFunc given at construction time."""
        return self.expFunc(c1,c2)
    def __repr__(self) -> str:
        # revopmap is the module-level operator -> symbol table ("==", "<>", ...).
        return revopmap[self]
    def __eq__(self, other: object) -> bool:
        if isinstance(other, Operator):
            return self.func==other.func
        return False
    def __hash__(self):
        # Hash on the same field __eq__ compares.
        return hash(self.func)

def eqExp(l,r):
    """Return sum(l_i**2) / (sum(l_i))**2.

    l is expected to be a numpy array of counts (presumably the group sizes of
    one column, cf. Dataset.PLILen -- confirm against callers). r is accepted
    for signature compatibility and is not used.
    """
    total=sum(l)
    sameValuePairs=np.sum(l**2)
    return sameValuePairs/total**2
       

# "==" operator: compares with operator.eq, expected value computed by eqExp.
eq=Operator(operator.eq,eqExp)

def neExp(l,r):
    """Return 1 - sum(l_i**2) / (sum(l_i))**2.

    l is expected to be a numpy array of counts (presumably the group sizes of
    one column -- confirm against callers). r is not used.
    """
    total=sum(l)
    sameValueShare=np.sum(l**2)/total**2
    return 1-sameValueShare
# "<>" operator: compares with operator.ne, expected value computed by neExp.
ne=Operator(operator.ne,neExp)

def geExp(l,r):
    """Return sum(l_i * cumsum(l)_i) / (sum(l_i))**2.

    l is expected to be a numpy array of counts; the cumulative sum only has a
    ">=" meaning if the counts are ordered by ascending value (assumed, not
    checked here). r is not used.
    """
    total=sum(l)
    upToHere=np.cumsum(l)
    weighted=l*(upToHere)
    return np.sum(weighted)/total**2
# ">=" operator: compares with operator.ge, expected value computed by geExp.
ge=Operator(operator.ge,geExp)

def leExp(l,r):
    """Return sum(l_i * (n - cumsum(l)_i + l_i)) / n**2 with n = sum(l_i).

    l is expected to be a numpy array of counts, assumed to be ordered by
    ascending value (not checked here). r is not used.
    """
    total=sum(l)
    upToHere=np.cumsum(l)
    fromHereOn=total-upToHere+l
    return np.sum(l*fromHereOn)/total**2
# "<=" operator: compares with operator.le, expected value computed by leExp.
le=Operator(operator.le,leExp)

def gtExp(l,r):
    """Return sum(l_i * (cumsum(l)_i - l_i)) / (sum(l_i))**2.

    l is expected to be a numpy array of counts, assumed to be ordered by
    ascending value (not checked here). r is not used.
    """
    total=sum(l)
    strictlyBefore=np.cumsum(l)-l
    return np.sum(l*strictlyBefore)/total**2
# ">" operator: compares with operator.gt, expected value computed by gtExp.
gt=Operator(operator.gt,gtExp)

def ltExp(l,r):
    """Return sum(l_i * (n - cumsum(l)_i)) / n**2 with n = sum(l_i).

    l is expected to be a numpy array of counts, assumed to be ordered by
    ascending value (not checked here). r is not used.
    """
    total=sum(l)
    strictlyAfter=total-np.cumsum(l)
    return np.sum(l*strictlyAfter)/total**2
# "<" operator: compares with operator.lt, expected value computed by ltExp.
lt=Operator(operator.lt,ltExp)
# Upper-case operator names -> Operator instances.
operatorMap={
    "EQUAL":eq,
    "UNEQUAL":ne,
    "LESS_EQUAL":le,
    "GREATER_EQUAL":ge,
    "LESS":lt,
    "GREATER":gt
}

# Textual operator symbols -> Operator instances, and the inverse table that
# Operator.__repr__ uses to print an operator as its symbol.
opmap={"==":eq,"<>":ne,">=":ge,"<=":le,">":gt,"<":lt}
revopmap={y:x for x,y in opmap.items()}

# Logical negation of each operator (not(a == b) is a <> b, not(a > b) is a <= b, ...).
eq.neg=ne
ne.neg=eq
gt.neg=le
le.neg=gt
lt.neg=ge
ge.neg=lt

# Operators implied by each operator (a > b implies a >= b and a <> b, ...).
# NOTE: every one of these six assignments is overwritten by the block below,
# so this table has no effect at runtime.
eq.imp=[ge,le,eq]
ne.imp=[ne]
gt.imp=[gt,ge,ne]
lt.imp=[lt,le,ne]
ge.imp=[ge]
le.imp=[le]


# Implication table that is actually in effect (it replaces the one above).
# It is deliberately not the logical one: ">" / "<" no longer imply "<>", and
# "<>" lists "<" and ">", to avoid DCs of the sort > -> !=.
# Predicate.impliesPred reads these lists.
eq.imp=[ge,le,eq]
ne.imp=[ne,lt,gt]
gt.imp=[gt,ge]
lt.imp=[lt,le]
ge.imp=[ge]
le.imp=[le]


class Predicate:
    """Comparison "t0.<l> <op> t1.<r>" between a column of one tuple and a column of another.

    l  -- column name read from the first tuple (t0).
    op -- Operator relating the two values.
    r  -- column name read from the second tuple (t1).
    """
    def __init__(self,l:str,op:"Operator",r:str) -> None:
        self.l=l
        self.r=r
        self.op=op
        # Set to None here and not assigned anywhere else in this class.
        self.exp=None
    def _key(self):
        """Fields that identify a predicate; shared by __eq__ and __hash__."""
        return (self.l,self.op,self.r)
    def __repr__(self) -> str:
        return 't0.'+self.l+' '+repr(self.op)+' t1.'+self.r
    def __hash__(self):
        # Hash the same fields __eq__ compares. The operator used to be left
        # out, so every predicate over one column landed in the same bucket.
        return hash(self._key())
    def __eq__(self, other):
        if isinstance(other, Predicate):
            return self._key()==other._key()
        return False
    def impliesPred(self,other):
        """True if this predicate being false implies other being false.

        Both predicates must be over the same columns, and the negation of
        other's operator must be listed in the imp list of the negation of
        this predicate's operator.
        """
        sameColumns=self.l==other.l and self.r==other.r
        return sameColumns and other.op.neg in self.op.neg.imp

class Dataset:
    """CSV-backed table plus the structures derived from it for DC discovery.

    Column headers must carry their type, either as "name(Type)" or
    "name Type"; the recognised type names are the keys of typeMap in __init__.
    """
    def __init__(self,file,**args):
        """Load file with pandas.

        file -- path (or buffer) handed to pandas.read_csv.
        args -- extra keyword arguments for the read_csv call that loads the
                data (e.g. nrows, encoding). The header-only read does not
                receive them.

        A header that does not match the expected pattern raises TypeError
        (the failed regex match is subscripted); an unknown type name raises
        KeyError.
        """
        # First pass: header only, to learn column names and declared types.
        self.columns=pd.read_csv(file,nrows=0).columns
        self.header=[re.match(r'([^\(\)]*)(?:\(| )([^\(\)]*)\)?',col) for col in self.columns]
        self.names=[match[1] for match in self.header]
        # Every numeric type is loaded as float.
        typeMap={'String':str,'Integer':float,'Int':float,'Double':float,'int':float,'str':str,'float':float}
        self.types={col:typeMap[match[2]] for col,match in zip(self.columns,self.header)}

        # Second pass: the data itself, then an explicit cast of every column.
        self.df=pd.read_csv(file,**args,dtype=self.types)
        for col in self.columns:
            self.df[col]=self.df[col].astype(self.types[col])

    def randRows(self,n):
        """Return n rows drawn at random (np.random.randint, so repeats are possible)."""
        ids=np.random.randint(0,len(self.df),n)
        return self.df.iloc[ids]
    def randFields(self,n):
        """Return a new n-row frame in which every column is sampled independently.

        A separate set of random row positions is drawn for each column, so
        values that shared a row in self.df generally do not share one here.
        """
        df=self.df
        return pd.DataFrame({col:df[col].iloc[list(np.random.randint(0,len(df),n))].values for col in df.columns})

    def buildPLIs(self):
        """Build the per-column position list indexes.

        PLI[col]    -- groupby(...).groups of the column: value -> row labels.
        PLILen[col] -- array with the size of each group, in PLI iteration order.
        vals[col]   -- array with the group values, in the same order.
        """
        self.PLI= {col:self.df.groupby(by=col).groups for col in self.df}
        self.PLILen={col:np.array([len(self.PLI[col][v])for v in self.PLI[col]]) for col in self.df}
        self.vals={col:np.array([v for v in self.PLI[col]]) for col in self.df}
    def shuffle(self):
        """Replace self.df by an independently resampled copy (see randFields)."""
        self.df=self.randFields(len(self.df))

    def buildPreds(self):
        """Create the single-column predicates t0.col <op> t1.col.

        String columns get == and <>; all other columns get the six comparison
        operators. Fills:
          preds    -- list of Predicate objects; its positions are the predicate ids.
          predMap  -- (col, op, col) -> predicate id.
          colPreds -- for each column (in self.columns order) the ids of its predicates.
          predCols -- for each predicate the id of its column.
        """
        self.preds=[]
        self.predMap={}
        self.colPreds=[]
        self.predCols=[]

        for col in self.columns:
            ops=[eq,ne] if self.types[col] ==str else [eq,ne,gt,ge,lt,le]
            self.colPreds.append([])
            for op in ops:
                pred=Predicate(col,op,col)
                self.predMap[(col,op,col)]=len(self.preds)
                # Taken after the append above, so column ids start at 1.
                # They are only compared with each other.
                self.predCols.append(len(self.colPreds))
                self.colPreds[-1].append(len(self.preds))
                self.preds.append(pred)

    def buildEvi(self):
        """Build the evidence bitmaps: which ordered tuple pairs satisfy each predicate.

        For each predicate p, evi[p] is a bit-packed (little bit order) array
        with one bit per ordered pair (i, j), i != j. Pairs are laid out row by
        row: for each i first the pairs with j > i, then those with j < i.
        Only the predicate's left column is read (buildPreds creates predicates
        whose two columns coincide).

        Also fills:
          eviSize     -- number of ordered pairs, n*(n-1).
          predProbs   -- fraction of the ordered pairs that satisfy each predicate.
          sortedPreds -- predicate ids ordered by ascending predProbs.
        """
        n=len(self.df)
        m=len(self.preds)
        self.eviSize=n*(n-1)

        self.evi=[None]*m
        self.predProbs=[None]*m

        for p,pred in enumerate(self.preds):
            col=self.df[pred.l]
            evis=[]
            for i in range(n):
                c1=col.iloc[i]
                evis.append(pred.op(c1,col.iloc[i+1:n]))
                evis.append(pred.op(c1,col.iloc[:i]))

            # An empty table yields no chunks at all; np.concatenate rejects an empty list.
            allTPs=np.concatenate(evis) if evis else np.zeros(0,dtype=bool)
            self.evi[p]=np.packbits(allTPs,axis=0,bitorder='little')
            # allTPs already holds all n*(n-1) ordered pairs, so the satisfied
            # fraction is sum/eviSize. The former extra factor of 2 produced
            # values up to 2.0 (the resulting order of sortedPreds is unaffected).
            self.predProbs[p]=allTPs.sum()/self.eviSize if self.eviSize else 0.0
        self.sortedPreds=sorted(range(m),key=lambda i:self.predProbs[i])
                
class DenialConstraint:
    """A denial constraint: a collection of predicates that must not all hold at once.

    preds may be any iterable container of predicates (list, set, ...).
    Comparison treats it as a set: the order of the predicates is irrelevant.
    """
    def __init__(self,preds) -> None:
        self.preds=preds

    def __sub__(self,other):
        """Return a constraint holding the predicates of self that are not in other.

        Set-like preds keep using set difference. Any other container (the
        export cell builds constraints from lists, for which "-" is undefined)
        is filtered in order and yields a list.
        """
        setTypes=(set,frozenset)
        if isinstance(self.preds,setTypes) and isinstance(other.preds,setTypes):
            return DenialConstraint(self.preds-other.preds)
        return DenialConstraint([p for p in self.preds if p not in other.preds])
    def __le__(self,other):
        """True if every predicate of self also appears in other."""
        return all(any(p==pp for pp in other.preds) for p in self.preds)

    def __eq__(self, value: object) -> bool:
        # Comparing with a foreign type used to fail with AttributeError.
        if not isinstance(value,DenialConstraint):
            return NotImplemented
        return self<=value and value<=self
    def __hash__(self):
        # Defining __eq__ alone makes a class unhashable. Hash the predicates
        # as a set so that equal constraints hash alike.
        return hash(frozenset(self.preds))
    def __repr__(self) -> str:
        return "¬("+" ^ ".join([pred.__repr__() for pred in self.preds])+")"
    
class DenialConstraintSet:
    """Set of denial constraints parsed from a text file, one constraint per line.

    Every constraint is stored in self.DCs as a list of predicate ids, resolved
    through dss.predMap.
    """
    def __init__(self,path,dataset,dss,algorithm) -> None:
        """Parse the constraints stored in path.

        path      -- text file with one constraint per line. The first two
                     characters and the last one of each line are dropped, and
                     the predicates in between are separated by '^'.
        dataset   -- dataset name used in the "t0.<dataset>.csv.<col>" predicate
                     syntax; ignored for the algorithms listed below.
        dss       -- object exposing predMap ((col, op, col) -> id) and preds.
        algorithm -- 'ADCMiner', 'FastADC' and 'ours' select the spaced
                     "t0.col op t1.col" syntax, anything else the
                     dataset-qualified one.

        Blank lines are skipped. A predicate that does not match the expected
        syntax raises AttributeError; one missing from dss.predMap raises KeyError.
        """
        self.predMap={}
        self.preds=[]
        self.dss=dss
        opmap={"==":eq,"<>":ne,">=":ge,"<=":le,">":gt,"<":lt}
        def getPred(c1,op,c2):
            return dss.predMap[(c1,opmap[op],c2)]

        # The pattern does not depend on the line, so build and compile it once.
        if algorithm in ['ADCMiner','FastADC','ours']:
            regex=r't0\.([^=><]*) (==|<>|>=|<=|>|<) t1\.([^=><]*)'
        else:
            # The dataset name is literal text: escape it so characters such as
            # '+' or '(' cannot be read as regex syntax. Raw strings avoid the
            # invalid '\.' escape sequences the non-raw literals contained.
            qualifier=re.escape(dataset)+r'\.csv\.'
            regex=r't0\.'+qualifier+r'([^=><]*)(==|<>|>=|<=|>|<)t1\.'+qualifier+r'([^=><]*)'
        pattern=re.compile(regex)

        self.DCs=[]

        with open(path) as f:
            for line in f:
                line=line.strip()
                if not line:
                    continue
                line=line[2:-1] #strip !(...)
                self.DCs.append([getPred(*pattern.match(pred.strip()).groups()) for pred in line.split('^')])

    def buildGraph(self):
        """Build a prefix tree over the constraints in self.root.

        A node is a two-item list [children, dc]: children maps a predicate id
        to the next node, and dc is the constraint ending at the node (None if
        there is none). Each constraint is inserted with its ids sorted.
        """
        self.root=[{},None]
        for dc in self.DCs:
            node=self.root
            for pred in sorted(dc):
                if pred not in node[0]:
                    node[0][pred]=[{},None]
                node=node[0][pred]
            node[1]=dc

    def getReduced(self):
        """Return the constraints that are not implied by another constraint.

        dc1 implies dc2 when every predicate of dc1 impliesPred some predicate
        of dc2. A constraint implied by one it does not imply back is dropped;
        of a group of mutually implying constraints only the first is kept.
        """
        notImplied=[True]*len(self.DCs)

        def impliesDC(dc1,dc2):
            return all(any(self.dss.preds[pred].impliesPred(self.dss.preds[otherpred]) for otherpred in dc2) for pred in dc1)

        for i,dc1 in enumerate(self.DCs):
            for j,dc2 in enumerate(self.DCs):
                if impliesDC(dc1,dc2):
                    if impliesDC(dc2,dc1):
                        notImplied[j]=notImplied[j] and j<=i
                    else:
                        notImplied[j]=False
        return [dc for i,dc in enumerate(self.DCs) if notImplied[i]]
        

In [3]:
#Path to dataset for which  to discover DCs
# Name of the dataset for which to discover DCs; the file "<dataset>.csv" is loaded below.
dataset="tax500k"
#Number of rows of the dataset to use (1<<11 == 2048)
rowCount=1<<11

#Load the first rowCount rows of the dataset
ds=Dataset(dataset+".csv",nrows=rowCount,encoding='unicode_escape')

# Build Position List Indexes (PLIs)
# These hold, for every unique element in each column, the rows that have that value in the respective column.
# They allow finding tuple pairs that satisfy a certain predicate quickly. Ex: To find 2 tuples with the same State, pick any
# value in the PLI of column State, and choose two tuples from that group. They will both have the same State by definition.
ds.buildPLIs()

# Builds the predicates list. Ex: If there is a column "State", generates possible predicates "t_x.State=t_y.State" and "t_x.State != t_y.State".
ds.buildPreds()

# Builds the evidence set. Takes a long time.
# The evidence set contains, for each pair of tuples, which predicates they fulfill.
ds.buildEvi()

We present a novel approach to determine the validity of a DC. Current methods use a very broad DC Validity definition that leads to discovery algorithms generating thousands of meaningless Denial Constraints. We show how by only keeping those DCs valid under our more restrictive definition we obtain entirely true, high-quality constraints of the data.

We present a redefinition of the set of rules that qualify a DC as valid, not a DC discovery algorithm. Thus, the naive way to apply this contribution is by checking which elements among the vast space of sets of predicates constitute valid DCs. This is inefficient, but enough to showcase the significant improvement in the quality of the discovered DCs.

This first cell computes the number of tuple pairs that satisfy each set of predicates with up to **"depth"** elements. There are almost never more than 3 predicates in valid DCs, so if you want it to run much faster, just set **depth = 3**. We use **depth = 4** for completeness.

In [5]:
#This utility function lists all proper subsets of a given set (every subset except the full set itself).
def powerset(iterable):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3)   -- the full set (1,2,3) is not produced"
    items = list(iterable)
    # Sizes run from 0 up to len(items)-1, which is what leaves the full set out.
    bySize = (itertools.combinations(items, size) for size in range(len(items)))
    return itertools.chain.from_iterable(bySize)


#Max size of predicate sets to search for
depth=4
# Placeholder; rebound to the filled counts dict once the search below has finished.
DCCounts={}
# frozenset of predicate ids -> number of tuple pairs satisfying all of them (filled by search).
counts={}

#Depth-first exploration of the predicate sets with at most "depth" predicates.
def search(preds,cols,x):
    """Record how many tuple pairs satisfy preds, then recurse into its extensions.

    preds -- frozenset of the predicate ids chosen so far.
    cols  -- frozenset of the column ids those predicates are over.
    x     -- bit-packed evidence of the tuple pairs that satisfy every predicate in preds.

    The result is stored in the module-level dict counts, keyed by preds.
    """
    #Store the number of set bits, i.e. of tuple pairs satisfying the current predicate set.
    counts[preds]=np.bitwise_count(x).sum()
    #Sets at maximum size are counted but not extended.
    if len(preds)>=depth:
        return
    for pred in ds.sortedPreds:
        #Never add a second predicate over an already used column. This generates trivial DCs. Ex: State=State and State !=State.
        ncol=ds.predCols[pred]
        if ncol in cols:
            continue
        #Each predicate set is explored once, whatever the order in which its predicates were added.
        npreds=preds|{pred}
        if npreds in counts:
            continue
        #Keep only the tuple pairs that also satisfy the new predicate, and go one level deeper.
        search(npreds,cols|{ncol},np.bitwise_and(x,ds.evi[pred]))

#Begin search:
# np.packbits rounds the evidence length up to whole bytes, so the initial
# mask needs ceil(eviSize/8) bytes. The former eviSize//8 only matched when
# the number of tuple pairs was a multiple of 8.
initialEvidence=np.full(((ds.eviSize+7)//8,),255,dtype=np.uint8)
if ds.eviSize%8:
    # Clear the padding bits of the last byte (bit order is 'little', as in
    # Dataset.buildEvi) so the count stored for the empty predicate set is exactly eviSize.
    initialEvidence[-1]=(1<<(ds.eviSize%8))-1

search(frozenset()  #Begin with an empty set of predicates
       ,frozenset(),#Begin with no columns having predicates on the current set, as it is empty
       initialEvidence)#Begin with all tuple pairs fulfilling the predicate set, as there is no predicate to not fulfill. This gets filtered as predicates are added during the DFS.

DCCounts=counts

This next cell uses the previous information to determine which DCs are statistically significant, in the sense that the behavior of the predicates is determined by other predicates in the DC. This is our contribution, and by ensuring no DC is valid without representing some statistically significant relationship we guarantee discovered DCs correspond to a true relationship of the data.

Let {A,B,C} be the set of predicates of a DC. We reject the DC if p(A|BC) is statistically indistinguishable from any of p(A|B), p(A|C) or p(A). If this is the case, there is no relationship between A, B and C that is not already captured by one of the subsets, so this set of predicates does not correspond to a valid DC: some predicate has been added without it being part of the data constraint.

We model the conditionals by means of a Type IV logistic distribution, and determine them to be different when it would be statistically implausible for the distributions to have the same population value.

In [None]:
#Memoized digamma function (y1) for positive integer arguments; together with the trigamma
#function (y2) it gives the mean and variance of the probability distribution we use.
yconst=0.5772156649
def y1():
    """Build the memoized function; the name y1 is rebound to it right below."""
    # table[k-1] holds digamma(k); digamma(1) = -yconst.
    table=[-yconst]
    def digamma(n):
        # Extend the table with digamma(k+1) = digamma(k) + 1/k up to n.
        for k in range(len(table),n):
            table.append(table[-1]+1/k)
        return table[n-1]
    return digamma
y1=y1()


def y2():
    """Build the memoized trigamma function for positive integers; the name y2 is rebound to it right below."""
    # table[k-1] holds trigamma(k); trigamma(1) = pi^2/6.
    table=[np.pi**2/6]
    def trigamma(n):
        # Extend the table with trigamma(k+1) = trigamma(k) - 1/k^2 up to n.
        for k in range(len(table),n):
            table.append(table[-1]-1/k**2)
        return table[n-1]
    return trigamma
y2=y2()


# Placeholder; rebound to the DCResult list once the search below has finished.
DCResults={}

# Tuple-pair counts per predicate set, as computed by the previous cell.
counts=DCCounts
# Accepted DCs, stored as (frozenset of predicate ids, id of the last predicate added).
DCResult=[]
# Predicate sets that search has already been called on.
visited=set()
# Recursive function like before: explores in DFS the space of predicate sets up to a given depth,
# this time to decide which predicate sets form valid DCs.
def search(preds,cols):
    """Try to extend preds by one predicate and record the extensions that are valid DCs.

    preds -- frozenset of predicate ids accepted so far.
    cols  -- frozenset of the column ids those predicates are over.

    Accepted DCs are appended to the module-level list DCResult as (preds, pred).
    """
    #Every candidate needs to be evaluated only once
    if preds in visited:
        return
    visited.add(preds)

    #Like before, stop at maximum depth
    if len(preds)>=depth:
        return

    #If we are evaluating {A,B,C}, we iterate through p(A|BC),p(B|AC),p(C|AB).
    #Each of them is compared with the conditionals over the respective subsets:
    # p(A|BC)< p(A|B), p(A|C), p(A)
    # p(B|AC)< p(B|A), p(B|C), p(B)
    # p(C|AB)< p(C|A), p(C|B), p(C)
    #A DC is only accepted when the inequalities are statistically significant,
    # indicating there is some relationship among this particular set of predicates
    # that is not captured by any of its subsets.
    for pred in ds.sortedPreds:
        #As before, we do not add predicates over already used columns to avoid trivial DCs.
        ncol=ds.predCols[pred]
        if ncol in cols:
            continue

        npreds=preds|{pred}

        #a1 and b1 are the numbers of successes and failures of a Bernoulli random variable:
        #the tuple pairs that satisfy preds and do (a1) or do not (b1) satisfy pred as well.
        # Ex: a1=|ABC|, b1=|(!A)BC|, so that p(A|BC)=a1/(a1+b1)
        a1=int(counts[npreds])
        b1=int(counts[preds])-a1

        #The candidate stays significant until the conditional over some subset turns out
        #not to be significantly different from the current conditional.
        significant=True
        for subset in powerset(preds):
            #Ex: compare p(A|BC) with p(A|B)
            subPreds=frozenset(subset)

            # Ex: a2=|AB|, b2=|(!A)B|, so that p(A|B)=a2/(a2+b2)
            a2=int(counts[subPreds|{pred}])
            b2=int(counts[subPreds])-a2

            #Given a1,b1,a2,b2, the distribution of ln((a1*b2)/(b1*a2)) has mean
            u=y1(a1+1)-y1(b1+1)-y1(a2+1)+y1(b2+1)
            #and standard deviation
            s=np.sqrt(y2(a1+1)+y2(b1+1)+y2(a2+1)+y2(b2+1))

            #Only when the mean lies more than 2 standard deviations below 0 do we claim that
            #p(A|BC) is smaller than p(A|B). Otherwise there is not enough evidence: the
            #difference could be due to randomness.
            if u+2*s>0:
                significant=False
                break
        if not significant:
            continue

        #There is some significant relationship between the predicates.
        #However, we only accept a DC when it is a set of exclusive predicates.
        #This means p(A|BC) must be 0.
        #Optionally, approximate DCs may be discovered by changing this condition to:
        # a1/(a1+b1)<0.01
        # where 0.01 is a 1% approximation factor.
        if a1==0:
            DCResult.append((preds,pred))
        else:
            search(npreds,cols|{ncol})
                    
#Begin search on the empty set of predicates (no predicates chosen, no columns used)
search(frozenset(),frozenset())

# Expose the accepted DCs under the name the export cell reads.
DCResults=DCResult

This final cell of code exports the valid DCs to a file.

In [10]:
def getPred(i,ds):
    """Return the predicate stored at position i of ds.preds."""
    predicate=ds.preds[i]
    return predicate


# Predicate sets already written, so that each DC is exported only once.
dcs=set()

# Build every constraint from its (preds, pred) pair before opening the output file.
allDCs=[
    DenialConstraint([getPred(p,ds) for p in preds]+[getPred(pred,ds)])
    for preds,pred in DCResults
]

with open(f"result_{dataset}","w") as f:
    for dc in allDCs:
        s=frozenset(dc.preds)
        if s in dcs:
            continue
        dcs.add(s)
        # One DC per line, in its repr form.
        f.write(repr(dc)+"\n")