In [6]:
import pandas as pd
from typing import List, Tuple, Dict
from itertools import combinations

def load_data(file_path: str) -> pd.DataFrame:
    print(f"Loading data from {file_path}...")
    data_frame = pd.read_excel(file_path)
    print(f"Data loaded successfully. Shape: {data_frame.shape}")
    return data_frame

def load_fds_from_file(file_path: str) -> List[Tuple[List[str], List[str]]]:
    functional_dependencies = []
    try:
        with open(file_path, 'r') as file:
            for line in file:
                if '-->' in line:
                    left_attrs, right_attrs = line.strip().split("-->")
                    lhs = [attr.strip() for attr in left_attrs.split(",")]
                    rhs = [attr.strip() for attr in right_attrs.split(",")]
                    functional_dependencies.append((lhs, rhs))
    except Exception as error:
        print(f"Error loading FDs from file {file_path}: {error}")
    return functional_dependencies

def load_mvds_from_file(file_path: str) -> List[Tuple[List[str], List[str]]]:
    multivalued_dependencies = []
    try:
        with open(file_path, 'r') as file:
            for line in file:
                if '->>' in line:
                    left_attrs, right_attrs = line.strip().split("->>")
                    lhs = [attr.strip() for attr in left_attrs.split(",")]
                    rhs = [attr.strip() for attr in right_attrs.split(",")]
                    multivalued_dependencies.append((lhs, rhs))
    except Exception as error:
        print(f"Error loading MVDs from file {file_path}: {error}")
    return multivalued_dependencies

def detect_multivalued_columns(data_frame: pd.DataFrame) -> Tuple[List[str], Dict[str, List[Tuple[int, str]]]]:
    data_frame.columns = data_frame.iloc[0]
    data_frame = data_frame.drop(data_frame.index[0]).reset_index(drop=True)
    
    multivalued_columns = []
    validation_issues = {}

    for column in data_frame.columns:
        column_issues = []
        for idx, value in data_frame[column].items():
            if isinstance(value, str) and ',' in value:
                column_issues.append((idx, value))
        if column_issues:
            multivalued_columns.append(column)
            validation_issues[column] = column_issues

    return multivalued_columns, validation_issues

def to_1nf(data_frame: pd.DataFrame, primary_keys: List[List[str]]) -> List[Tuple[pd.DataFrame, List[str]]]:
    multivalued_columns, validation_issues = detect_multivalued_columns(data_frame)

    if validation_issues:
        print("1NF Violations identified:")
        for column, issues in validation_issues.items():
            print(f"Column '{column}' contains multiple values per field:")
            for idx, value in issues:
                print(f"  Row {idx}: {value}")
        print()

    base_table = data_frame.drop(columns=multivalued_columns, errors='ignore')
    normalized_tables = [(base_table, primary_keys[0])]

    for multivalued_column in multivalued_columns:
        for primary_key in primary_keys:
            normalized_table = data_frame[list(primary_key) + [multivalued_column]].copy()
            normalized_table[multivalued_column] = normalized_table[multivalued_column].apply(lambda x: x.split(',') if isinstance(x, str) else x)
            normalized_table = normalized_table.explode(multivalued_column).dropna().drop_duplicates()
            normalized_tables.append((normalized_table, primary_key + [multivalued_column]))

    return normalized_tables

def to_2nf(tables: List[Tuple[pd.DataFrame, List[str]]], fds: List[Tuple[List[str], List[str]]]) -> Tuple[List[Tuple[pd.DataFrame, List[str]]], List[Tuple[List[str], List[str]]]]:
    print("Converting to 2NF...")
    if not fds:
        print("No functional dependencies available.")
        return tables, fds

    updated_tables = []
    remaining_fds = fds.copy()

    for table, primary_key in tables:
        partial_dependencies = []
        for lhs, rhs in remaining_fds:
            if set(lhs).issubset(primary_key) and not set(lhs) == set(primary_key):
                if len(lhs) == 1 and len(rhs) == 1 and rhs[0] not in primary_key:
                    continue
                partial_dependencies.append((lhs, rhs))

        for lhs, rhs in partial_dependencies:
            columns_to_keep = [col for col in (lhs + rhs) if col in table.columns]
            normalized_table = table[columns_to_keep].drop_duplicates()
            updated_tables.append((normalized_table, lhs))
            table = table.drop(columns=[col for col in rhs if col in table.columns]).drop_duplicates()
            remaining_fds.remove((lhs, rhs))

        remaining_columns = list(set(table.columns) - set(primary_key))
        remaining_columns = [col for col in remaining_columns if col in table.columns]
        if remaining_columns:
            remaining_table = table[primary_key + remaining_columns].drop_duplicates()
            updated_tables.append((remaining_table, primary_key))
        else:
            updated_tables.append((table, primary_key))

    return updated_tables, remaining_fds

def to_3nf(tables: List[Tuple[pd.DataFrame, List[str]]], fds: List[Tuple[List[str], List[str]]]) -> Tuple[List[Tuple[pd.DataFrame, List[str]]], List[Tuple[List[str], List[str]]]]:
    print("Converting to 3NF...")
    if not fds:
        print("No functional dependencies available.")
        return tables, fds

    transitive_dependencies = []
    for lhs, rhs in fds:
        for table, primary_key in tables:
            if set(lhs).issubset(table.columns) and not set(lhs).issubset(primary_key):
                if not any(attr in primary_key for attr in lhs) and not any(attr in primary_key for attr in rhs):
                    transitive_dependencies.append((lhs, rhs))

    updated_tables = []
    remaining_fds = fds.copy()
    for table, primary_key in tables:
        columns = set(table.columns)
        table_fds = [fd for fd in fds if set(fd[0]).issubset(columns) and set(fd[1]).issubset(columns)]

        for lhs, rhs in table_fds:
            if (lhs, rhs) in transitive_dependencies:
                new_table = table[lhs + rhs].drop_duplicates()
                updated_tables.append((new_table, lhs))
                table = table.drop(columns=[col for col in rhs if col in table.columns])
                remaining_fds.remove((lhs, rhs))

        updated_tables.append((table, primary_key))

    return updated_tables, remaining_fds

def to_bcnf(tables: List[Tuple[pd.DataFrame, List[str]]], fds: List[Tuple[List[str], List[str]]], primary_keys: List[List[str]]) -> Tuple[List[Tuple[pd.DataFrame, List[str]]], List[Tuple[List[str], List[str]]]]:
    print("Converting to BCNF...")
    if not fds:
        print("No functional dependencies available.")
        return tables, fds

    def is_superkey(attrs: List[str], primary_keys: List[List[str]]) -> bool:
        for key in primary_keys:
            if set(key).issubset(set(attrs)):
                return True
        return False

    updated_tables = []
    remaining_fds = fds.copy()
    bcnf_violations = []

    for table, primary_key in tables:
        columns = set(table.columns)
        table_fds = [fd for fd in fds if set(fd[0]).issubset(columns) and set(fd[1]).issubset(columns)]

        for lhs, rhs in table_fds:
            if not is_superkey(lhs, primary_keys):
                bcnf_violations.append((lhs, rhs))

                new_primary_key = lhs
                if not any(set(new_primary_key).issubset(set(key)) for key in primary_keys):
                    primary_keys.append(new_primary_key)

                new_table = table[lhs + rhs].drop_duplicates()
                updated_tables.append((new_table, lhs))
                table = table.drop(columns=[col for col in rhs if col in table.columns])
                remaining_fds.remove((lhs, rhs))

        updated_tables.append((table, primary_key))

    return updated_tables, remaining_fds

def to_4nf(tables: List[Tuple[pd.DataFrame, List[str]]], mvds: List[Tuple[List[str], List[str]]], primary_keys: List[List[str]]) -> List[Tuple[pd.DataFrame, List[str]]]:
    print("Converting to 4NF...")
    if not mvds:
        print("No multivalued dependencies available.")
        return tables

    updated_tables = []
    for table, primary_key in tables:
        columns = set(table.columns)
        for lhs, rhs in mvds:
            if set(rhs).issubset(columns):
                if not any(set(lhs).issubset(set(key)) for key in primary_keys):
                    new_table = table[lhs + rhs].drop_duplicates()
                    updated_tables.append((new_table, lhs))
                    table = table.drop(columns=[col for col in rhs if col in table.columns])
        updated_tables.append((table, primary_key))
    return updated_tables

def to_5nf(tables: List[Tuple[pd.DataFrame, List[str]]], primary_keys: List[List[str]]) -> List[Tuple[pd.DataFrame, List[str]]]:
    print("Converting to 5NF...")

    def check_join_dependency(df: pd.DataFrame, primary_keys: List[List[str]]) -> bool:
        for primary_key in primary_keys:
            non_key_attrs = list(set(df.columns) - set(primary_key))
            for i in range(1, len(non_key_attrs) + 1):
                for subset in combinations(non_key_attrs, i):
                    lhs = list(primary_key) + list(subset)
                    rhs = list(set(df.columns) - set(lhs))
                    if set(lhs).issubset(df.columns) and set(rhs).issubset(df.columns):
                        if df[lhs + rhs].drop_duplicates().shape[0] != df.drop_duplicates().shape[0]:
                            return True
        return False

    final_tables = []
    for table, primary_key in tables:
        if check_join_dependency(table, primary_keys):
            for primary_key in primary_keys:
                non_key_attrs = list(set(table.columns) - set(primary_key))
                for i in range(1, len(non_key_attrs) + 1):
                    for subset in combinations(non_key_attrs, i):
                        lhs = list(primary_key) + list(subset)
                        rhs = list(set(table.columns) - set(lhs))
                        if set(lhs).issubset(table.columns) and set(rhs).issubset(table.columns):
                            if table[lhs + rhs].drop_duplicates().shape[0] != table.drop_duplicates().shape[0]:
                                new_table_lhs = table[lhs].drop_duplicates()
                                new_table_rhs = table[rhs].drop_duplicates()
                                final_tables.append((new_table_lhs, primary_key))
                                final_tables.append((new_table_rhs, rhs))
                                break
                    else:
                        continue
                    break
            else:
                final_tables.append((table, primary_key))
        else:
            final_tables.append((table, primary_key))

    return final_tables

def compile_schema(data_tables: List[Tuple[pd.DataFrame, List[str]]], primary_keys: List[List[str]]) -> str:
    schema_text = ""
    for i, (table_df, primary_key) in enumerate(data_tables):
        if isinstance(table_df, pd.DataFrame):
            table_name = f"Relation_{i+1}"
            table_df.columns = table_df.iloc[0]
            columns_list = ", ".join(map(str, table_df.columns))
            primary_key_str = ", ".join(primary_key)

            reordered_columns = [col for col in table_df.columns if col in primary_key] + \
                                [col for col in table_df.columns if col not in primary_key]
            columns_list = ", ".join(reordered_columns)

            schema_text += f"{table_name} ({columns_list})\n"
            schema_text += f"PK: {primary_key_str}\n\n"
        else:
            print(f"Error: Expected DataFrame but got {type(table_df)}")
    return schema_text

def main():
    data_file = 'TestingData (1NF-5NF).xlsx'
    fd_file = 'fds.txt'
    mvd_file = 'mvd.txt'

    data_frame = load_data(data_file)
    print(f"Initial data: \n{data_frame.head()}\n")

    fds = load_fds_from_file(fd_file)
    print(f"Functional Dependencies from file: {fds}")

    mvds = load_mvds_from_file(mvd_file)
    print(f"Multi-valued Dependencies from file: {mvds}")

    primary_keys_input = input("Enter the primary keys (can be composite, separated by commas, no spaces between; multiple keys separated by semicolons): ")
    primary_keys = [key.strip().split(',') for key in primary_keys_input.split(';')]
    print(f"Provided Primary Keys: {primary_keys}\n")

    highest_normal_form = input("Enter the highest normalization form (1NF, 2NF, 3NF, BCNF, 4NF, 5NF): ").upper()

    tables = to_1nf(data_frame, primary_keys)

    if highest_normal_form == "1NF":
        schema = compile_schema(tables, primary_keys)
        print("Normalized Database Schema:")
        print(schema)
        return

    if highest_normal_form in ["2NF", "3NF", "BCNF", "4NF", "5NF"]:
        tables, fds = to_2nf(tables, fds)

    if highest_normal_form in ["3NF", "BCNF", "4NF", "5NF"]:
        tables, fds = to_3nf(tables, fds)

    if highest_normal_form in ["BCNF", "4NF", "5NF"]:
        tables, fds = to_bcnf(tables, fds, primary_keys)

    if highest_normal_form in ["4NF", "5NF"]:
        tables = to_4nf(tables, mvds, primary_keys)

    if highest_normal_form == "5NF":
        tables = to_5nf(tables, primary_keys)

    schema = compile_schema(tables, primary_keys)
    print("Normalized Database Schema:")
    print(schema)

if __name__ == "__main__":
    main()


Loading data from TestingData (1NF-5NF).xlsx...
Data loaded successfully. Shape: (5, 20)
Initial data: 
  Unnamed: 0           Unnamed: 1            Unnamed: 2 Unnamed: 3  \
0    OrderID                 Date         PromocodeUsed  TotalCost   
1       1001  2024-06-30 00:00:00                  NONE       7.25   
2       1002  2026-06-30 00:00:00             SUMMERFUN       9.98   
3       1002  2026-06-30 00:00:00             SUMMERFUN       9.98   
4       1003  2024-06-29 00:00:00  {SUMMERFUN, JUNEVIP}        115   

       Unnamed: 4     Unnamed: 5  Unnamed: 6    Unnamed: 7 Unnamed: 8  \
0  TotalDrinkCost  TotalFoodCost  CustomerID  CustomerName    DrinkID   
1            7.25              0           1   Alice Brown          1   
2            5.99           3.99           2  David Miller          2   
3            5.99           3.99           2  David Miller          3   
4             115              0           3  Emily Garcia          4   

                 Unnamed: 9 Unnamed:

Enter the primary keys (can be composite, separated by commas, no spaces between; multiple keys separated by semicolons):  OrderID,DrinkID,FoodID


Provided Primary Keys: [['OrderID', 'DrinkID', 'FoodID']]



Enter the highest normalization form (1NF, 2NF, 3NF, BCNF, 4NF, 5NF):  BCNF


1NF Violations identified:
Column 'PromocodeUsed' contains multiple values per field:
  Row 3: {SUMMERFUN, JUNEVIP}
Column 'DrinkIngredient' contains multiple values per field:
  Row 0: {Espresso, Oat Milk}
  Row 1: {Expresso, Vanilla Syrup, Milk, Ice}
  Row 2: {Matcha, Coconut Milk, Ice} 
  Row 3: {Coffee, Ice, Vanilla Syrup, Soy Milk}
Column 'DrinkAllergen' contains multiple values per field:
  Row 1: {Dairy, Nuts}
  Row 3: {Nuts, Soy}
Column 'FoodIngredient' contains multiple values per field:
  Row 1: {Flour, Sugar, Blueberries, Eggs}
  Row 2: {Flour, Sugar, Blueberries, Eggs}
Column 'FoodAllergen' contains multiple values per field:
  Row 1: {Wheat, Egg}
  Row 2: {Wheat, Egg}

Converting to 2NF...
Converting to 3NF...
Converting to BCNF...
Normalized Database Schema:
Relation_1 (CustomerID, CustomerName)
PK: CustomerID

Relation_2 (OrderID, Date, TotalCost, TotalDrinkCost, TotalFoodCost, CustomerID)
PK: OrderID

Relation_3 (OrderID, DrinkID, DrinkSize, DrinkQuantity, Milk)
PK: Ord