In [41]:
import pandas as pd
import sqlite3,re

def create_tables(conn: sqlite3.Connection, cursor: sqlite3.Cursor, schema_path: str):
    """Run the SQL script stored at *schema_path* and commit the result.

    Args:
        conn: Connection on which the commit is issued.
        cursor: Cursor used to execute the script.
        schema_path: Path to a text file containing one or more SQL statements.
    """
    with open(schema_path, "r") as schema_file:
        ddl = schema_file.read()

    # executescript() accepts several ';'-separated statements in one call.
    cursor.executescript(ddl)
    conn.commit()

def get_tables(cursor: sqlite3.Cursor):
    """Return the name of every table in the database as a list of 1-tuples."""
    table_query = "SELECT name FROM sqlite_master WHERE type='table';"
    # Cursor.execute() returns the cursor itself, so the fetch can be chained.
    return cursor.execute(table_query).fetchall()

In [42]:
def add_locations(cursor: sqlite3.Cursor, loc_sheet_path: str):
    """Load suppliers and their branch postcodes from a CSV sheet.

    The CSV is read with its second line as the header row. Every column
    after the first is treated as a supplier name, and each non-empty cell
    beneath it as a postcode of one of that supplier's branches. Both inserts
    use ``INSERT OR IGNORE`` and are committed on the cursor's connection.

    Args:
        cursor: Cursor on the target database.
        loc_sheet_path: Path to the locations CSV file.
    """
    sheet = pd.read_csv(loc_sheet_path, header=1)

    # The first column is skipped; the remaining headers are supplier names.
    supplier_names = sheet.columns.tolist()[1:]

    cursor.executemany(
        "INSERT OR IGNORE INTO suppliers (name) VALUES (?);",
        [(supplier_name,) for supplier_name in supplier_names],
    )
    cursor.connection.commit()

    # Map name -> id; relies on suppliers rows being laid out as (id, name, ...).
    id_by_name = {}
    for supplier_row in cursor.execute("SELECT * FROM suppliers;").fetchall():
        id_by_name[supplier_row[1]] = supplier_row[0]

    branch_params = []
    for supplier_name in supplier_names:
        for postcode in sheet[supplier_name]:
            if pd.isna(postcode):
                # Columns are padded with NaN where a supplier has fewer branches.
                continue
            branch_params.append((id_by_name[supplier_name], postcode))

    cursor.executemany(
        "INSERT OR IGNORE INTO branches (supplier_id, postcode) VALUES (?, ?);",
        branch_params,
    )
    cursor.connection.commit()

In [43]:
def get_sup_sheet(sup_path:str, sheet_name:str, usecols=None, header_num=0) -> pd.DataFrame:
    """Read one sheet of the supplier workbook into a DataFrame.

    Args:
        sup_path: Path to the Excel workbook.
        sheet_name: Name of the sheet to load.
        usecols: Optional column selection forwarded to ``pd.read_excel``;
            ``None`` (the pandas default) loads every column.
        header_num: Zero-based row index used as the header row.

    Returns:
        The parsed sheet.
    """
    # Use ExcelFile as a context manager so the workbook handle is released
    # once the sheet has been parsed, even if parsing raises.
    with pd.ExcelFile(sup_path) as xls:
        return pd.read_excel(xls, sheet_name=sheet_name, usecols=usecols, header=header_num)

def get_specialism_types_sheet(sup_path) -> pd.Series:
    """Return the "Accounts, Contacts and Strengths" column of the "Strengths" sheet.

    Args:
        sup_path: Path to the supplier workbook.
    """
    column_name = "Accounts, Contacts and Strengths"
    # Only this one column is requested from the sheet.
    strengths_sheet = get_sup_sheet(sup_path, "Strengths", [column_name])
    return strengths_sheet[column_name]

def clean_supplier_name(name: str) -> str:
    """Strip a trailing "Credit" marker and surrounding whitespace from *name*.

    A suffix such as ``" - Credit"``, ``"-credit"`` or ``" Credit"`` (any
    letter case) is removed when it is the last thing in the string.
    "Credit" appearing elsewhere in the name is left alone.

    Args:
        name: Raw supplier name.

    Returns:
        The name without the suffix and without leading/trailing whitespace.
    """
    # The trailing \s* lets the suffix match when whitespace follows it;
    # anchoring "Credit" directly on $ missed names like "Acme - Credit ".
    return re.sub(r'\s*-?\s*Credit\s*$', '', name, flags=re.IGNORECASE).strip()

def add_supplier_to_db(db_path: str, supplier_name: str):
    """Insert one supplier row into the database at *db_path*.

    Args:
        db_path: Path to the SQLite database file.
        supplier_name: Value stored in ``suppliers.name``.

    Raises:
        sqlite3.Error: If the insert fails; the connection is still closed.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("INSERT INTO suppliers (name) VALUES (?);", (supplier_name,))
        conn.commit()
    finally:
        # Always release the connection so a failed insert does not leave the
        # database file open.
        conn.close()

In [44]:
def create_placeholder_branches_all(db_path: str):
    """Give every supplier that has no branch a 'PLACEHOLDER' branch row.

    Suppliers that already have at least one branch are left untouched, so
    calling this repeatedly does not add further rows.

    Args:
        db_path: Path to the SQLite database file.

    Raises:
        sqlite3.Error: If the statement fails; the connection is still closed.
    """
    placeholder_branches_query = """
        INSERT INTO branches (supplier_id, postcode, notes)
        SELECT s.supplier_id, 'PLACEHOLDER', 'Auto-created placeholder for supplier-level data'
        FROM suppliers s
        WHERE NOT EXISTS (
        SELECT 1 FROM branches b WHERE b.supplier_id = s.supplier_id
        );
    """

    conn = sqlite3.connect(db_path)
    try:
        conn.cursor().execute(placeholder_branches_query)
        conn.commit()
    finally:
        # Close on both success and failure so the connection never leaks.
        conn.close()

def delete_placeholder_branches(db_path: str):
    """Delete every branch row whose postcode is the literal 'PLACEHOLDER'.

    Args:
        db_path: Path to the SQLite database file.

    Raises:
        sqlite3.Error: If the statement fails; the connection is still closed.
    """
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.cursor()
        cursor.execute("""
    DELETE FROM branches
    WHERE postcode = 'PLACEHOLDER';
    """)
        conn.commit()
    finally:
        # Close on both success and failure so the connection never leaks.
        conn.close()

In [45]:
def get_supplier(cursor: sqlite3.Cursor,supplier_name) -> list:
    cursor.execute("""
        SELECT * FROM suppliers s
            WHERE TRIM(LOWER(s.name)) = TRIM(LOWER(?))
    """, (supplier_name,))

    rows = cursor.fetchall()
    return rows

def get_branches_from_supplier(cursor: sqlite3.Cursor, supplier_name: str) -> list:
    cursor.execute("""
        SELECT b.*
        FROM branches b
        JOIN suppliers s ON b.supplier_id = s.supplier_id
        WHERE s.name = ?;
    """, (supplier_name,))
    return cursor.fetchall()

def ensure_supplier(cursor: sqlite3.Cursor, supplier_name: str) -> list | None:
    rows = get_supplier(cursor, supplier_name)

    if rows:
        return rows
    else:
        cursor.execute("""
            INSERT INTO suppliers (name)
            VALUES (?);
        """, (supplier_name,))

        rows = get_supplier(cursor, supplier_name)
        return rows

def ensure_supplier_branch(cursor: sqlite3.Cursor, supplier_name: str) -> list | None:
    ensure_supplier(cursor, supplier_name)
    rows = get_branches_from_supplier(cursor, supplier_name)

    if rows:
        return rows
    else:
        cursor.execute(
            """
            INSERT OR IGNORE INTO branches (supplier_id, postcode, notes)
            SELECT s.supplier_id, 'PLACEHOLDER', ?
            FROM suppliers s
            WHERE s.name = ?;
            """,
            (f"Auto-created placeholder for supplier-level data about {supplier_name}", supplier_name)
        )

        rows = get_branches_from_supplier(cursor, supplier_name)
        return rows
    
def ensure_specialism_type(cursor: sqlite3.Cursor, specialism_name: str) -> int | None:
    cursor.execute("SELECT specialism_id FROM specialism_types WHERE name = ?", (specialism_name,))
    row = cursor.fetchone()
    if row:
        return row[0]
    cursor.execute("INSERT INTO specialism_types (name) VALUES (?)", (specialism_name,))
    return cursor.lastrowid

def add_specialisms_by_supplier(cursor: sqlite3.Cursor, supplier_name: str, specialism_name: str):
    """Link a specialism to every branch of a supplier.

    The supplier, at least one branch for it, and the specialism type are
    created on demand by the helper functions. A branch/specialism pair that
    already exists is not inserted again.

    Args:
        cursor: Cursor on the target database.
        supplier_name: Supplier whose branches receive the specialism.
        specialism_name: Name of the specialism type to attach.

    Raises:
        ValueError: If no branch could be found or created for the supplier.
    """
    supplier_branches = ensure_supplier_branch(cursor, supplier_name)
    if not supplier_branches:
        raise ValueError(f"No branches found for supplier '{supplier_name}'.")

    specialism_id = ensure_specialism_type(cursor, specialism_name)

    # Insert only when the (branch, specialism) pair is not present yet.
    link_sql = """
        INSERT INTO branch_specialisms (branch_id, specialism_id)
        SELECT ?, ?
        WHERE NOT EXISTS (
            SELECT 1
            FROM branch_specialisms
            WHERE branch_id = ?
            AND specialism_id = ?
        );
    """

    # The first column of each branch row is used as the branch id.
    for branch_id in [branch[0] for branch in supplier_branches]:
        pair = (branch_id, specialism_id)
        # The pair is bound twice: once for the SELECT, once for the NOT EXISTS check.
        cursor.execute(link_sql, pair + pair)

def add_specialisms_from_sheet(cursor: sqlite3.Cursor, sup_path: str):
    """Import supplier specialisms from the "Strengths" sheet of the workbook.

    The sheet is read with its third row as the header. Columns whose header
    does not start with "Unnamed:" are treated as suppliers; the first
    (unnamed) column holds the specialism name. A cell equal to "Y" links
    that row's specialism to that column's supplier.

    Args:
        cursor: Cursor on the target database.
        sup_path: Path to the supplier workbook.
    """
    strengths = get_sup_sheet(sup_path, "Strengths", header_num=2)

    supplier_columns = []
    for column in strengths.columns:
        if not column.startswith("Unnamed:"):
            supplier_columns.append(column)

    for _, sheet_row in strengths.iterrows():
        specialism_name = sheet_row["Unnamed: 0"]
        for supplier_name in supplier_columns:
            if sheet_row[supplier_name] != "Y":
                continue
            add_specialisms_by_supplier(cursor, supplier_name, specialism_name)

In [46]:
def get_company_dict(row: pd.Series, cols: list[str]):
    """Convert one summary-sheet row into a supplier name plus contact dicts.

    Args:
        row: A row of the summary sheet.
        cols: Column labels; ``cols[0]`` holds the supplier name and the rest
            come in consecutive (name, phone, email) triples, one per contact.

    Returns:
        ``{"supplier_name": <cleaned name>, "contacts": [<contact dict>, ...]}``
        where each contact dict has the keys "name", "phone" and "email".
    """
    cleaned_name = clean_supplier_name(row[cols[0]])

    # Step through the contact columns three at a time, starting after the name.
    contact_list = [
        {
            "name": row[cols[start]],
            "phone": row[cols[start + 1]],
            "email": row[cols[start + 2]],
        }
        for start in range(1, len(cols), 3)
    ]

    return {"supplier_name": cleaned_name, "contacts": contact_list}

def add_contact_info_db(cursor:sqlite3.Cursor,supplier_name:str, contact:dict[str, str]):
    """Attach a contact to every branch of a supplier, skipping duplicates.

    Args:
        cursor: Cursor on the target database.
        supplier_name: Supplier whose branches receive the contact.
        contact: Mapping with optional "name", "phone" and "email" entries;
            a missing key is stored as NULL.

    Raises:
        ValueError: If no branch could be found or created for the supplier.
    """
    supplier_branches = ensure_supplier_branch(cursor, supplier_name)
    if not supplier_branches:
        raise ValueError(f"No branches found for supplier '{supplier_name}'.")

    phone = contact.get("phone")
    email = contact.get("email")
    name = contact.get("name")

    # A contact counts as already present for a branch when phone, email and
    # name all match, with NULL treated as equal to NULL.
    insert_sql = """
        INSERT INTO contacts (branch_id, phone_number, email, name)
        SELECT ?, ?, ?, ?
        WHERE NOT EXISTS (
            SELECT 1 FROM contacts
            WHERE branch_id = ?
            AND (phone_number IS ? OR (phone_number IS NULL AND ? IS NULL))
            AND (email        IS ? OR (email        IS NULL AND ? IS NULL))
            AND (name         IS ? OR (name         IS NULL AND ? IS NULL))
        )
        """

    for branch in supplier_branches:
        # The first column of the branch row is used as the branch id.
        branch_id = branch[0]
        new_row_values = (branch_id, phone, email, name)
        duplicate_check_values = (branch_id, phone, phone, email, email, name, name)
        cursor.execute(insert_sql, new_row_values + duplicate_check_values)

def add_contact_info_from_sheet(cursor: sqlite3.Cursor, sup_path: str):
    """Import up to two contacts per supplier from the "Summary" sheet.

    Contacts whose name, phone and email are all missing are skipped.

    Args:
        cursor: Cursor on the target database.
        sup_path: Path to the supplier workbook.
    """
    # Column labels must match the sheet exactly, trailing spaces included.
    cols = [
        "Merchant",
        'Contact 1. ', 'Contact No 1. ', 'Contact email No1.',
        'Contact 2. ', 'Contact No 2. ', 'Contact email No2.',
    ]
    summary_sheet = get_sup_sheet(sup_path, "Summary", cols)

    for _, sheet_row in summary_sheet.iterrows():
        company = get_company_dict(sheet_row, cols)
        supplier_name = company["supplier_name"]

        for contact in company["contacts"]:
            details = (contact["name"], contact["phone"], contact["email"])
            if all(pd.isna(value) for value in details):
                # Nothing was filled in for this contact slot.
                continue
            add_contact_info_db(cursor, supplier_name, contact)


In [None]:
import csv

def dump_specialism(
    cursor: sqlite3.Cursor,
    output_path: str = "../output/suppliers_branches_specialisms.csv",
):
    """Export suppliers with their branches and branch specialisms to CSV.

    LEFT JOINs keep suppliers without branches and branches without
    specialisms; their missing columns are written as empty fields.

    Args:
        cursor: Cursor on the database to export from.
        output_path: Destination CSV file. Defaults to the location the
            function has always written to, so existing calls are unchanged.
    """
    query = """
    SELECT 
        s.supplier_id,
        s.name AS supplier_name,
        b.branch_id,
        b.postcode,
        b.notes AS branch_notes,
        st.name AS specialism_name
    FROM suppliers s
    LEFT JOIN branches b 
        ON s.supplier_id = b.supplier_id
    LEFT JOIN branch_specialisms bs
        ON b.branch_id = bs.branch_id
    LEFT JOIN specialism_types st
        ON bs.specialism_id = st.specialism_id
    ORDER BY s.supplier_id, b.branch_id, st.specialism_id;
    """

    cursor.execute(query)
    # description is populated by execute(), so the header row can be built
    # before any result rows are consumed.
    colnames = [desc[0] for desc in cursor.description]

    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(colnames)
        # Stream rows from the cursor instead of materialising them all first.
        writer.writerows(cursor)

def dump_contacts(
    cursor: sqlite3.Cursor,
    output_path: str = "../output/suppliers_branches_contacts.csv",
):
    """Export suppliers with their branches and branch contacts to CSV.

    LEFT JOINs keep suppliers without branches and branches without
    contacts; their missing columns are written as empty fields.

    Args:
        cursor: Cursor on the database to export from.
        output_path: Destination CSV file. Defaults to the location the
            function has always written to, so existing calls are unchanged.
    """
    query = """
    SELECT
        s.supplier_id,
        s.name AS supplier_name,
        b.branch_id,
        b.postcode,
        c.contact_id,
        c.name AS contact_name,
        c.phone_number,
        c.email
    FROM suppliers s
    LEFT JOIN branches b
        ON b.supplier_id = s.supplier_id
    LEFT JOIN contacts c
        ON c.branch_id = b.branch_id
    ORDER BY s.supplier_id, b.branch_id, c.contact_id;
    """

    cursor.execute(query)
    # description is populated by execute(), so the header row can be built
    # before any result rows are consumed.
    headers = [d[0] for d in cursor.description]

    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(headers)
        # Stream rows from the cursor instead of materialising them all first.
        writer.writerows(cursor)


In [48]:
import json

# Driver cell: build the database from the configured inputs, then export CSVs.
with open("config.json", "r") as f:
    config = json.load(f)

paths = config["paths"]

conn = sqlite3.connect(paths["db_path"])
try:
    cursor = conn.cursor()

    create_tables(conn, cursor, paths["schema_path"])

    add_locations(cursor, paths["loc_sheet_path"])
    add_specialisms_from_sheet(cursor, paths["sup_path"])
    add_contact_info_from_sheet(cursor, paths["sup_path"])

    # Persist the import before exporting: the dumps only read, so a failure
    # while writing the CSV files must not discard the loaded data.
    conn.commit()

    dump_specialism(cursor)
    dump_contacts(cursor)
finally:
    # Close even when a step raises so a re-run of this cell does not find the
    # database still held open by a stale connection.
    conn.close()