In [1]:
import pandas as pd
import math
import itertools

In [2]:
def load_data(filename, folder = "csv"): 
    """Load the csv file, preprocess if necessary and return a dataframe

    Args: 
        filename (str): Name of the file to load 
        folder (str): Name of the folder where the file is located

    Returns:
        pandas.DataFrame: Dataframe with the data from the csv file
    """
    assert isinstance(filename, str), "filename must be a string"
    assert isinstance(folder, str), "folder must be a string"
    assert filename.endswith(".csv"), "filename must be a csv file"

    # Load the data
    if filename == "abalone.csv": 
        df = pd.read_csv(f"{folder}/{filename}", header = None)
    elif filename == "adult.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, sep=";")
    elif filename == "balance-scale.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None)
    elif filename == "breast-cancer-wisconsin.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, sep=",")
    elif filename == "bridges.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, na_values="?")
    elif filename == "chess.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None)
    elif filename == "echocardiogram.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None,na_values="?")
    elif filename == "flight_1k.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, sep=";",na_values='""')
        # remove the first line
        df = df.iloc[1:]
        # drop nan by column
        df = df.dropna(axis=1, how='all')
    elif filename == "hepatitis.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, na_values="?")
    elif filename == "horse.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, na_values="?", sep=";")
    elif filename == "iris.csv":
        df = pd.read_csv(f"{folder}/{filename}")
    elif filename == "letter.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None)
    elif filename == "nursery.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None)
    elif filename == "plista_1k.csv":
        df = pd.read_csv(f"{folder}/{filename}", header = None, sep=";")
    else: 
        df = pd.read_csv(f"{folder}/{filename}", header = None)

    colnames = {i: chr(65 + i) for i in range(26)}
    return df.rename(columns = colnames)
df = load_data("bridges.csv")

In [3]:
def union_inter_element(s):
    """Strict union of all tuple

    Rule : If we have element of size 1, 2 element of size 1 are needed to create the new element
    If we have element of size 2, we need 3 elements of size 2...

    {(a,b),(b,c),(c,a)} gives us : {(a,b,c)}
    {(a,b),(c,a)} gives us : {}

    Args:
        s (set): Set of tuple

    Returns:
        set: Union of all tuple 

    """
    if len(s) == 0:
        return set()
    
    res = {}

    n = len(next(iter(s))) + 1 
    for comb in itertools.combinations(s, 2):
        # comb 0 and comb 1 are tuple
        a = set({i for i in comb[0]})
        b = set({i for i in comb[1]})

        u = tuple(a.union(b))
        if len(u) == n:
            if u not in res:
                res[u] = 1
            else:
                res[u] += 1
        max_val = max([v for k, v in res.items()], default=0)
    return set([k for k, v in res.items() if v == max_val])


In [4]:
def latice_exploration(df, prettyPrint = True):
    """Explore the latice of combinations to find functionnal dependencies.

    Args:
        df (pandas.DataFrame): Dataframe with the data to explore
        prettyPrint (bool): If True, print the result in a nice way (default: True)

    Returns:
        list: List of functionnal dependencies
    """

    def test_fd(X, Y):
        return df.groupby(list(X))[Y].nunique().le(1).all().all()
    count = 0
    rules = {}
    for y in df.columns:
        rules[y] = []
        candidates = set((i,) for i in df.columns if i != y)
        
        while len(candidates) >= 1:
            l = candidates.copy()
            for candidate in l:
                #print(candidate)
                if test_fd(candidate, y):
                    count += 1
                    rules[y].append([candidate])
                    candidates.remove(candidate)
            
            candidates = union_inter_element(candidates)

        if len(rules[y]) == 0:
            del rules[y]
    if prettyPrint:
        inverseMap = {}
        for k, v in rules.items():
            for i in v:
                if tuple(i) not in inverseMap:
                    inverseMap[tuple(i)] = [k]
                else:
                    inverseMap[tuple(i)].append(k)
        print(f"Number of minimals FD found : {count}")
        for k, v in inverseMap.items():
            print(f"{''.join(k[0])} -> {''.join(v)}")
            
    else:
        return count
df = load_data("abalone.csv")
latice_exploration(df)

Number of minimals FD found : 69
ECF -> AG
EGC -> A
EHF -> ABCDGI
EGH -> ABCDFI
EGF -> AC
GHFD -> ABCEI
EBFD -> ACGH
GCFI -> AH
EBDI -> A
EBCH -> ADFGI
EBCD -> A
GFDI -> ABCEH
ECHD -> A
HIGCD -> A
HIBCFD -> AG
ECFD -> BHI
GCFD -> B
EGFD -> BHI
GFAI -> CH
EBFA -> CDGH
EIBFD -> C
EIBFA -> C
HIGFA -> C
EBCF -> DH
HIBCGD -> F
HIBCGA -> F
DHIGCBA -> F
AIBCFD -> GH
DHIBCFA -> G
EBF -> I


In [5]:
def latice_exploration_naïve(df):
    """Explore the latice of combinations to find functionnal dependencies.

    Args:
        df (pandas.DataFrame): Dataframe with the data to explore
    Returns:
        list: List of functionnal dependencies
    """
    def test_fd(X, Y):
        return df.groupby(X)[Y].nunique().le(1).all().all()
    rules = []
    for y in df.columns:
        for n in range(1, len(df.columns)):
            for X in itertools.combinations(set(df.columns) - {y}, n):
                if test_fd(list(X), y):
                    rules.append([list(X), [y]])
    return rules

In [7]:
filenames = ["abalone.csv", "bridges.csv", "car.csv", "hepatitis.csv", "horse.csv", "iris.csv", "letter.csv", "nursery.csv", "plista_1k.csv"]
tOpti = {}
minFD = {}
tNaive = {}
fd = {}
def timeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
import time
print("| Filename | TimeOpti | TimeNaive | ResOpti | ResNaive |")
print("| --- | --- | --- | --- | --- |")
for filename in filenames:
    df = load_data(filename)
    start = time.time()
    minFD[filename] = timeout(latice_exploration, args=(df,False), timeout_duration=2000)
    end = time.time()
    if minFD[filename] is None:
        minFD[filename] = 0
        tOpti[filename] = ">2000s"
    else:
        tOpti[filename] = end - start
    start = time.time()
    fd[filename] = timeout(latice_exploration_naïve, args=(df,), timeout_duration=2000)
    end = time.time()
    if fd[filename] is None:
        fd[filename] = []
        tNaive[filename] = ">2000s"
    else:
        tNaive[filename] = end - start
    print(f"| {filename} | {tOpti[filename]} | {tNaive[filename]} | {minFD[filename]} | {len(fd[filename])} |")

| Filename | TimeOpti | TimeNaive | ResOpti | ResNaive |
| --- | --- | --- | --- | --- |
coucou
| iris.csv | 0.026702404022216797 | 0.022899627685546875 | 4 | 5 |
| letter.csv | >2000s | >2000s | 0 | 0 |
| nursery.csv | 2.5882556438446045 | 5.732923984527588 | 0 | 1 |
| plista_1k.csv | >2000s | >2000s | 0 | 0 |


In [None]:
# If you want to test on a specific dataset and see the results
df = load_data("chess.csv")
latice_exploration(df)

In [None]:
# If you want to test on a specific dataset and see the results
df = load_data("bridges.csv")
latice_exploration(df)