This notebook shows simple implementations of a few of the algorithms for reasoning with functional dependencies, as used for normalization in relational database design. Specifically, it contains code for finding the closure of a set of functional dependencies, finding keys and candidate keys, and decomposing a schema into BCNF.

First, we create a class to represent a Functional Dependency, to make the code cleaner.

# Some Basic Classes

In [1]:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from itertools import chain, combinations

# A class to encapsulate a Functional Dependency, and some helper functions
class FD:
        def __init__(self, lhs, rhs):
                self.lhs = frozenset(list(lhs))
                self.rhs = frozenset(list(rhs))
        def __str__(self):
                return ''.join(self.lhs) + " -> " + ''.join(self.rhs)
        def __eq__(self, other):
                return (self.lhs == other.lhs) and (self.rhs == other.rhs)
        def __hash__(self):
                return hash(self.lhs) * hash(self.rhs)
        def isTrivial(self):
                """A functional dependency is trivial if the right-hand side is a subset of the left-hand side."""
                return self.lhs >= self.rhs


In [2]:
# The following is not really needed for normalization, but may be useful to get intuitions about FDs
# IMPORTANT: Don't forget that for an FD to hold on a relation schema, it must hold on all legal (possible) instances
# It is not sufficient to hold on a few of the instances
class Relation:
        def __init__(self, schema):
                self.tuples = list()
                self.schema = schema
        def add(self, t):
                if len(t) == len(self.schema):
                        self.tuples.append(t)
                else:
                        print("Added tuple does not match the length of the schema")
        def checkIfMatch(self, t1, t2, attr_set):
                return all(t1[self.schema.index(attr)] == t2[self.schema.index(attr)] for attr in attr_set)
        def checkFDHolds(self, fd):
                """Go over all pairs of tuples and see if the FD is violated"""
                holds = True
                for t1 in self.tuples:
                        for t2 in self.tuples:
                                if t1 < t2 and self.checkIfMatch(t1, t2, fd.lhs) and not self.checkIfMatch(t1, t2, fd.rhs):
                                        print("Tuples " + str(t1) + " and " + str(t2) + " violate the FD " + str(fd))
                                        holds = False
                if holds: print("FD " + str(fd) + " holds on this relation")

## Checking if an FD holds
Given a relation instance and a Functional Dependency, we can ask whether the FD holds on that relation instance. The algorithm is pretty simple: for every pair of tuples, we check if the two tuples have the same value for the attributes in the LHS of the FD, and if yes, we check whether they have the same value for the attributes in the RHS. E.g., to check if C --> A holds on the relation below, we note that both the tuples agree on 'C' (the LHS), but they don't agree on 'A' (the RHS). So the FD C --> A doesn't hold. 

The actual code for doing this is above in Relation class. Below are some examples.

In [3]:
r = Relation(['A', 'B', 'C'])
r.add([1, 2, 3])
r.add([2, 2, 3])
r.checkFDHolds(FD('C', 'A'))
r.checkFDHolds(FD('A', 'B'))
r.checkFDHolds(FD('A', 'BC'))

Tuples [1, 2, 3] and [2, 2, 3] violate the FD C -> A
FD A -> B holds on this relation
FD A -> CB holds on this relation
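
The pairwise comparison in `checkFDHolds` is quadratic in the number of tuples. As a minimal sketch of the same check done in a single pass, the block below groups rows by their LHS values and remembers the RHS values first seen with each group. The function name `fd_holds` and the plain-list representation are assumptions made for this illustration; they are not part of the notebook's classes.

```python
def fd_holds(schema, rows, lhs, rhs):
    """Return True if lhs -> rhs holds on this instance (hypothetical helper)."""
    lhs_idx = [schema.index(a) for a in lhs]
    rhs_idx = [schema.index(a) for a in rhs]
    seen = {}  # maps LHS values to the RHS values first seen with them
    for row in rows:
        key = tuple(row[i] for i in lhs_idx)
        val = tuple(row[i] for i in rhs_idx)
        # setdefault stores val for a new key, otherwise returns the stored one
        if seen.setdefault(key, val) != val:
            return False
    return True

schema = ['A', 'B', 'C']
rows = [[1, 2, 3], [2, 2, 3]]
print(fd_holds(schema, rows, 'C', 'A'))   # False
print(fd_holds(schema, rows, 'A', 'BC'))  # True
```

As with the class above, a `True` result only says the FD holds on this one instance, not on the schema.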


## Armstrong Axioms
Armstrong's axioms are used to infer new FDs from a given set of FDs. There are three basic rules, implemented below.

In [4]:
# We need to construct powersets quite frequently.
def powerset(S):
        """Returns the powerset of a set, except for the empty set, i.e., if S = {A, B, C}, returns {{A}, {B}, {C}, {A,B}, {B,C}, {A,C}, {A,B,C}}"""
        return list(chain.from_iterable(combinations(S, r) for r in range(1, len(S)+1)))

The following routine creates all trivial FDs using the **Reflexivity Axiom**, which states that if A is a subset of B, then B --> A is a valid FD.

In [5]:
def applyreflexivity(R):
        """Generates all trivial dependencies, i.e., of the type X -> subset(X)"""
        return { FD(i, j) for i in powerset(R) for j in powerset(i) }
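
To get a feel for how many trivial FDs reflexivity produces, the sketch below counts them without building any `FD` objects. It assumes the attributes are distinct single characters; `count_trivial_fds` is a hypothetical helper introduced only for this illustration. Each non-empty LHS of size k contributes 2^k - 1 non-empty subsets as possible right-hand sides, so the total works out to 3^n - 2^n for n attributes.

```python
from itertools import combinations

def count_trivial_fds(attrs):
    """Count pairs (X, Y) with X a non-empty subset of attrs and Y a non-empty subset of X."""
    count = 0
    for r in range(1, len(attrs) + 1):
        for lhs in combinations(attrs, r):
            # A left-hand side with len(lhs) attributes has 2**len(lhs) - 1 non-empty subsets
            count += 2 ** len(lhs) - 1
    return count

print(count_trivial_fds('ABC'))   # 19
print(count_trivial_fds('ABCD'))  # 65
```

The count grows exponentially with the schema size, which is why this approach is only practical for small schemas.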

Next, this routine does **Augmentation** given a set of functional dependencies. The augmentation axiom states that: 
if X --> Y, then XZ --> YZ, for all Z.

In [6]:
def applyaugmentation(F, PW, printflag):
        """Augmentation: if X --> Y, then XZ --> YZ
        PW is powerset of the schema
        """
        # Explicit loops keep x and y in scope for the progress message
        N = set()
        for x in F:
                for y in PW:
                        fd = FD(x.lhs.union(y), x.rhs.union(y))
                        if printflag and fd not in F and fd not in N:
                                print("   Adding " + str(fd) + " by Augmenting " + str(x) + " using " + "".join(y))
                        N.add(fd)
        return F.union(N)

Finally, we apply Transitivity to infer more FDs. Both this and the above function could be more concise if they did not print their progress.

In [7]:
def applytransitivity(F, printflag):
        """Transitivity: if X --> Y, and Y --> Z, then X --> Z"""
        # Explicit loops keep x and y in scope for the progress message
        N = set()
        for x in F:
                for y in F:
                        if x.rhs == y.lhs:
                                fd = FD(x.lhs, y.rhs)
                                if printflag and fd not in F and fd not in N:
                                        print(" Adding " + str(fd) + " using Transitivity from " + str(x) + " and " + str(y))
                                N.add(fd)
        return F.union(N)

The following routine computes the closure of a set of FDs by repeatedly applying the three axioms (Figure 8.7 in the textbook).

In [8]:
def findClosure(R, F, printflag = False):
        """Finds closure by repeatedly applying the three Armstrong Axioms, until there is no change"""

        # Start with adding all trivial dependencies generated by using Reflexivity
        F = F.union(applyreflexivity(R))
        powersetR = powerset(R)

        # Repeat application of the other two rules until no change
        done = False
        while not done:
                if printflag: print("Trying to find new FDs using Transitivity")
                F2 = applytransitivity(F, printflag)
                if printflag: print("Trying to find new FDs using Augmentation")
                F2 = applyaugmentation(F2, powersetR, printflag)
                done = len(F2) == len(F)
                F = F2
        if printflag: print("Finished")
        return F
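
`findClosure` enumerates every FD in the closure, which gets large quickly. When the question is only which attributes a particular attribute set determines, a common alternative is to compute the closure of that attribute set directly. The block below is a minimal sketch of that alternative and is not part of the notebook's code: `attribute_closure` is a hypothetical name, and FDs are written as plain `(lhs, rhs)` string pairs instead of `FD` objects so that the block runs on its own.

```python
def attribute_closure(attrs, fds):
    """Return the set of attributes determined by attrs under fds.

    fds is an iterable of (lhs, rhs) pairs, each a string of attribute names.
    """
    closure = set(attrs)
    changed = True
    while changed:
        changed = False
        for lhs, rhs in fds:
            # If the whole LHS is already determined, the RHS is determined too
            if set(lhs) <= closure and not set(rhs) <= closure:
                closure |= set(rhs)
                changed = True
    return closure

fds = [('A', 'B'), ('BC', 'D'), ('D', 'A')]
print(sorted(attribute_closure('D', fds)))   # ['A', 'B', 'D']
print(sorted(attribute_closure('AC', fds)))  # ['A', 'B', 'C', 'D']
```

An attribute set is a key exactly when its closure is the whole schema, so in this example AC is a key and D is not.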

Once we have the closure, finding keys and candidate keys is straightforward. A key is any set of attributes X such that X --> R is an FD in the closure. For X to be a candidate key, no proper subset of it may also be a key.

In [9]:
def findKeys(R, FClosure):
        """Keys are those where there is an FD with rhs = R"""
        return { fd.lhs for fd in FClosure if len(fd.rhs) == len(list(R)) }

def findCandidateKeys(R, FClosure):
        """Candidate keys are minimal -- go over the keys increasing order by size, and add if no subset is present"""
        keys = findKeys(R, FClosure)
        ckeys = set()
        for k in sorted(keys, key = lambda x: len(x)):
                dontadd = False
                for ck in ckeys:
                        if(ck <= k):
                                dontadd = True  #Found a subset already in ckeys
                if not dontadd:
                        ckeys.add(k)
        return ckeys
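
The minimality filter can also be written without relying on the sort order. The sketch below keeps a set only if no other set in the collection is a proper subset of it; `minimal_sets` is a hypothetical helper, and the keys are given as plain strings (the ones that Example 1 produces) so that the block is self-contained.

```python
def minimal_sets(sets):
    """Keep only the sets that have no proper subset in the collection."""
    sets = [frozenset(s) for s in sets]
    # t < s is the proper-subset test on frozensets
    return {s for s in sets if not any(t < s for t in sets)}

keys = ['ABCD', 'BCD', 'ABC', 'ACD', 'BC', 'AC', 'CD']
print(sorted("".join(sorted(k)) for k in minimal_sets(keys)))
# ['AC', 'BC', 'CD']
```

This compares every pair of keys, which is fine for the handful of keys a small schema has.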

## Checking for BCNF 
The following three routines are pretty similar: one checks whether a schema is in BCNF, another lists out all the violations of BCNF, and the third one finds one of the smallest FDs that violate the BCNF condition (for decomposition purposes).

In [20]:
def isInBCNF(R, FClosure, keys):
        """Find if there is a FD alpha --> beta s.t. alpha is not a key"""
        if keys is None: keys = findKeys(R, FClosure)
        for fd in FClosure:
                if (not fd.isTrivial()) and (fd.lhs not in keys):
                        return False
        return True

def listAllBCNFViolations(R, FClosure, keys):
        """Same as above, but finds all violations and prints them"""
        for fd in FClosure:
                if (not fd.isTrivial()) and (fd.lhs not in keys):
                        print(str(fd) + " is an FD whose LHS is not a key")

def findSmallestViolatingFD(R, FClosure, keys):
        """Same as above, but finds a small FD that violates"""
        for fd in sorted(FClosure, key = lambda x: (len(x.lhs), len(x.rhs))):
                if (not fd.isTrivial()) and (fd.lhs not in keys):
                        return fd

## BCNF Decomposition
Now we are ready to do the BCNF decomposition itself. First comes a subroutine that performs a single decomposition step, and then a recursive routine that repeats this step until every relation is in BCNF.

In [21]:
def DecomposeUsingFD(R, FClosure, fd):
        """Uses the given FD to decompose the schema -- returns the resulting schemas and their closures
        Let the fd be X --> Y
        Then we create two relations: R1 = X UNION Y, and R2 = X UNION (R - Y)
        Then, for R1, we find all FDs from FClosure that apply to R1 (i.e., that only contain attributes from R1)
        And do the same for R2
        """
        R1 = fd.lhs | fd.rhs
        R2 = (set(R) - R1) | fd.lhs
        F1Closure = { f for f in FClosure if (f.lhs <= R1) and (f.rhs <= R1) }
        F2Closure = { f for f in FClosure if (f.lhs <= R2) and (f.rhs <= R2) }

        return (R1, R2, F1Closure, F2Closure)

# Do a recursive BCNF Decomposition, and print out the results
def BCNFDecomposition(R, FClosure):
        keys = findKeys(R, FClosure)
        if not isInBCNF(R, FClosure, keys):
                print("".join(R) + " is not in BCNF")
                fd = findSmallestViolatingFD(R, FClosure, keys)

                # Decompose using that FD
                (R1, R2, F1Closure, F2Closure) = DecomposeUsingFD(R, FClosure, fd)
                print("Decomposing " + "".join(R) + " using " + str(fd) + " into relations " + "".join(R1) + " and " + "".join(R2))

                # Recurse
                BCNFDecomposition(R1, F1Closure)
                BCNFDecomposition(R2, F2Closure)
        else:
                print("".join(R) + " is in BCNF")

## Example 1
Here we define a relational schema and a set of FDs over it; then we find its candidate keys and print those out, and then check for BCNF and decompose it into BCNF.

In [22]:
R = "ABCD"
F = {FD('A', 'B'), FD('BC', 'D'), FD('D', 'A')}

In [23]:
Fclosure = findClosure(R, F)
for i in Fclosure:
        if not i.isTrivial(): print(i)

keys = findKeys(R, Fclosure)
print("Keys are:")
for i in keys:
        print("".join(i))

CD -> CDA
CDB -> DA
DB -> BA
CA -> B
CBA -> CDBA
CB -> CBA
CA -> BA
CD -> CDBA
CD -> CB
CB -> CD
DA -> B
D -> DA
CB -> DA
D -> B
CB -> CDBA
D -> DBA
CA -> D
CB -> BA
CA -> DA
CD -> B
CA -> CDA
CBA -> CD
CD -> CBA
CBA -> CDA
CDA -> CBA
CDB -> A
CB -> DB
CA -> CB
CB -> CA
CA -> DBA
CD -> DBA
CBA -> CDB
CDB -> CBA
CBA -> DBA
CB -> D
CDB -> BA
A -> BA
CDA -> B
D -> DB
CDB -> DBA
DB -> DBA
CDA -> BA
CA -> DB
D -> BA
CB -> CDB
A -> B
CA -> CDBA
CBA -> D
CDB -> CA
CA -> CDB
CD -> BA
CB -> A
DA -> DB
DB -> DA
CB -> DBA
DA -> BA
CDB -> CDBA
CD -> CA
CA -> CD
CDA -> CB
CB -> CDA
D -> A
CDA -> DB
CDB -> CDA
DA -> DBA
CDA -> CDB
CDA -> DBA
DB -> A
CA -> CBA
CBA -> DB
CD -> CDB
CD -> DB
CD -> DA
CD -> A
CBA -> DA
CDA -> CDBA
Keys are:
CDBA
CDB
CBA
CDA
CB
CA
CD


That's a lot of keys -- let's find just the candidate keys.

In [24]:
candidatekeys = findCandidateKeys(R, Fclosure)
print("Candidate Keys are:")
for i in candidatekeys:
        print("".join(i))

Candidate Keys are:
CD
CB
CA


In [25]:
print("Checking if the schema is in BCNF")
if isInBCNF(R, Fclosure, keys):
        print("The schema is in BCNF")

(R1, R2, F1Closure, F2Closure) = DecomposeUsingFD(R, Fclosure, FD('B', 'C'))
print("Decomposing using " + str(FD('B', 'C')) + " into relations " + "".join(R1) + " and " + "".join(R2))

Checking if the schema is in BCNF
Decomposing using B -> C into relations CB and DBA


In [26]:
print("-------------- Doing a full BCNF Decomposition -------")
BCNFDecomposition(R, Fclosure)
print("------------------------------------------------------")

-------------- Doing a full BCNF Decomposition -------
ABCD is not in BCNF
Decomposing ABCD using D -> B into relations DB and CDA
DB is in BCNF
CDA is not in BCNF
Decomposing CDA using D -> A into relations DA and CD
DA is in BCNF
CD is in BCNF
------------------------------------------------------


## Example 2
This is one of the examples covered in the slides. Note that here we get a different, non-dependency-preserving decomposition into lots of 2-attribute relations. The algorithm above is not very smart: if it had picked better FDs to do the decomposition, the final result might have been better.

In [27]:
R2 = "ABCDEH"
F2 = {FD('A', 'BC'), FD('E', 'HA')}
Fclosure2 = findClosure(R2, F2)
keys = findKeys(R2, Fclosure2)
candidatekeys = findCandidateKeys(R2, Fclosure2)
print("Candidate Keys are:")
for i in candidatekeys:
        print("".join(i))
print("Checking if the schema is in BCNF")
if isInBCNF(R2, Fclosure2, keys):
        print("The schema is in BCNF")
else:
        print("-------------- NO: Doing a full BCNF Decomposition -------")
        BCNFDecomposition(R2, Fclosure2)
        print("------------------------------------------------------")

Candidate Keys are:
DE
Checking if the schema is in BCNF
-------------- NO: Doing a full BCNF Decomposition -------
ABCDEH is not in BCNF
Decomposing ABCDEH using E -> H into relations HE and ECDBA
HE is in BCNF
ECDBA is not in BCNF
Decomposing ECDBA using E -> C into relations CE and EDBA
CE is in BCNF
EDBA is not in BCNF
Decomposing EDBA using A -> B into relations BA and DAE
BA is in BCNF
DAE is not in BCNF
Decomposing DAE using E -> A into relations AE and DE
AE is in BCNF
DE is in BCNF
------------------------------------------------------


## Non-Dependency Preserving BCNF
Below is the smallest relation schema and set of FDs for which there is no dependency-preserving BCNF decomposition. The schema is not in BCNF and needs to be split, but once we split it, we cannot check the FD JK --> L without doing a join. The original schema ("JKL") is in 3NF.

In [28]:
BCNFDecomposition("JKL", findClosure("JKL", {FD('JK', 'L'), FD('L', 'K')}))

JKL is not in BCNF
Decomposing JKL using L -> K into relations LK and JL
LK is in BCNF
JL is in BCNF
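
The loss of JK --> L can be confirmed mechanically. The sketch below uses a standard test for dependency preservation: starting from the LHS of an FD, it repeatedly adds whatever can be derived while looking at one decomposed relation at a time, and then checks whether the RHS was reached. Both function names are hypothetical and FDs are plain `(lhs, rhs)` string pairs; none of this is part of the notebook's code.

```python
def attribute_closure(attrs, fds):
    """Attributes determined by attrs under fds, given as (lhs, rhs) string pairs."""
    closure = set(attrs)
    changed = True
    while changed:
        changed = False
        for lhs, rhs in fds:
            if set(lhs) <= closure and not set(rhs) <= closure:
                closure |= set(rhs)
                changed = True
    return closure

def is_preserved(lhs, rhs, fds, schemas):
    """Check whether lhs -> rhs can be enforced using only single-relation checks."""
    result = set(lhs)
    changed = True
    while changed:
        changed = False
        for schema in schemas:
            # What the attributes visible in this relation determine, within this relation
            gained = attribute_closure(result & set(schema), fds) & set(schema)
            if not gained <= result:
                result |= gained
                changed = True
    return set(rhs) <= result

fds = [('JK', 'L'), ('L', 'K')]
schemas = ['LK', 'JL']
print(is_preserved('JK', 'L', fds, schemas))  # False
print(is_preserved('L', 'K', fds, schemas))   # True
```

L --> K survives because both of its attributes live in LK, while J and K never appear together in one relation, so JK --> L cannot be checked locally.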
