# Pipeline

Purpose:
  - 1. Connect to a local SQLite database (calibration_data.db).
  - 2. Create tables for equipment, test_points, failed_qc, and invoices.
  - 3. Insert sample rows.
  - 4. Query & display results.

## 1. Path

In [1]:
import os
import glob
import pandas as pd
import sqlite3
import difflib

In [2]:
ROOT_DATA_DIR = "../raw_data"
ROOT_PROCESSED_DIR = '../processed_data'
DB_DIR = "../db"
DB_PATH = os.path.join(DB_DIR, "calibration_data.db")
CENTRAL_DIR = os.path.join(ROOT_DATA_DIR, "centralized")

CITIES = ["Chicago", "Houston", "Los_Angeles", "New_York", "Phoenix"]
os.makedirs(DB_DIR, exist_ok=True)

In [3]:
print("Pipeline starting...")
print("Current directory:", os.getcwd())
print("DB path:", DB_PATH)

Pipeline starting...
Current directory: /home/ubuntu/PycharmProjects/aaron_accredited_labs_assessment/code
DB path: ../db/calibration_data.db


In [4]:
conn = sqlite3.connect(DB_PATH)
cur = conn.cursor()

In [5]:
cur.execute("DROP TABLE IF EXISTS invoices;")
cur.execute("DROP TABLE IF EXISTS failed_qc;")
cur.execute("DROP TABLE IF EXISTS test_points;")
cur.execute("DROP TABLE IF EXISTS equipment;")
cur.execute("DROP TABLE IF EXISTS workorders;")
conn.commit()

## 2.Create Tables

### 2.1 equipment: a simple reference table for all known instruments

In [6]:
cur.execute("""
CREATE TABLE workorders (
    workorder_id   TEXT,
    equipment_id   TEXT,
    equipment_type TEXT,
    branch         TEXT,
    customer_name  TEXT,
    accredited     TEXT,
    technician     TEXT,
    PRIMARY KEY (workorder_id, equipment_id)
);
""")

<sqlite3.Cursor at 0x7595fe9e49c0>

### 2.2 test_points: store measurement details or aggregated data

In [7]:
cur.execute("""
CREATE TABLE IF NOT EXISTS test_points (
    equipment_id     TEXT,
    completed_date   TEXT,
    testpoint_count  INTEGER,
    PRIMARY KEY (equipment_id)          
);
""")

<sqlite3.Cursor at 0x7595fe9e49c0>

### 2.3 failed_qc:  store the failed qc data

In [8]:
cur.execute("""
CREATE TABLE IF NOT EXISTS failed_qc (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workorder_id TEXT,
    equipment_id TEXT,
    equipment_type TEXT,
    branch TEXT,
    customer_name TEXT,
    accredited TEXT,
    technician TEXT,
    failed_reasons TEXT
);
""")

<sqlite3.Cursor at 0x7595fe9e49c0>

### 2.4 invoices: store the invoices data

In [9]:
cur.execute("""
CREATE TABLE IF NOT EXISTS invoices (
    invoice_id INTEGER PRIMARY KEY AUTOINCREMENT,
    work_completion_date TEXT,
    work_order_number TEXT,
    branch TEXT,
    accreditation_type TEXT,
    equipment_id TEXT,
    equipment_type TEXT,
    price FLOAT,
    customer_id TEXT,
    customer_name TEXT,
    customer_email TEXT
);
""")

<sqlite3.Cursor at 0x7595fe9e49c0>

In [10]:
conn.commit()

print("Tables created or verified successfully.")

Tables created or verified successfully.


## 3. Load reference tables (customers, pricing, technician matrix)

### 3.1 Loading Data

This cell pulls three reference files from **CENTRAL_DIR** into DataFrames:

* `customer_master.csv`  → `df_customers`  
* `equipment_pricing.csv` → `df_pricing`  
* `technician_training_matrix.xlsx` → `df_tech`  

It also builds `certified_pairs`, a set of `(technician, equipment_type)` tuples for quick certification checks later, and prints record counts for a sanity check.

In [11]:
df_customers = pd.read_csv(os.path.join(CENTRAL_DIR, "customer_master.csv"))
df_pricing = pd.read_csv(os.path.join(CENTRAL_DIR, "equipment_pricing.csv"))
df_tech = pd.read_excel(os.path.join(CENTRAL_DIR, "technician_training_matrix.xlsx"))

certified_pairs = set(zip(df_tech["technician"], df_tech["equipment_type"]))

print("References loaded:")
print("  customers:", len(df_customers))
print("  pricing:", len(df_pricing))
print("  tech matrix:", len(df_tech))

References loaded:
  customers: 100
  pricing: 30
  tech matrix: 75


### 3.2 Customer Name Fuzzy Matching

Data entry typos are common when creating work orders. To avoid issues in later QC steps
(e.g., `"customer_not_found"`), we'll perform fuzzy matching on each row's `customer_name`.
We'll compare the raw name to our `df_customers["customer_name"]` list, pick the closest match
if it's above a similarity threshold, and replace the raw name with that corrected version.

Once done, we'll save the resulting work orders in a **processed** folder under `ROOT_PROCESSED_DIR`
so our subsequent steps can reference these cleaned files. This ensures that we no longer fail
due to simple spelling errors or minor variations in spacing and punctuation.

In [12]:
for city in CITIES:
    # Path to the original workorders
    workorders_csv = os.path.join(ROOT_DATA_DIR, city, "workorders", "workorders.csv")
    
    if not os.path.exists(workorders_csv):
        print(f"[{city}] Missing workorders.csv; skipping fuzzy matching.")
        continue

    df_work = pd.read_csv(workorders_csv)
    print(f"\n=== {city} | Original rows:", len(df_work), "===")

    valid_names = df_customers["customer_name"].unique().tolist()

    corrected_names = []

    for idx in df_work.index:
        raw_name = df_work.at[idx, "customer_name"]
        
        if raw_name in valid_names:
            corrected_names.append(raw_name)
            continue
        
        matches = difflib.get_close_matches(raw_name, valid_names, n=1, cutoff=0.75)
        if matches:
            corrected_names.append(matches[0])
        else:
            corrected_names.append(None)

    df_work["customer_name"] = corrected_names

    out_csv = os.path.join(ROOT_PROCESSED_DIR, city, "workorders", "workorders.csv")
    df_work.to_csv(out_csv, index=False)
    print(f"[{city}] => Saved corrected file: {out_csv}")



=== Chicago | Original rows: 409 ===
[Chicago] => Saved corrected file: ../processed_data/Chicago/workorders/workorders.csv

=== Houston | Original rows: 387 ===
[Houston] => Saved corrected file: ../processed_data/Houston/workorders/workorders.csv

=== Los_Angeles | Original rows: 397 ===
[Los_Angeles] => Saved corrected file: ../processed_data/Los_Angeles/workorders/workorders.csv

=== New_York | Original rows: 393 ===
[New_York] => Saved corrected file: ../processed_data/New_York/workorders/workorders.csv

=== Phoenix | Original rows: 398 ===
[Phoenix] => Saved corrected file: ../processed_data/Phoenix/workorders/workorders.csv


## 4. Insert or update work-orders for every city

For each city in **CITIES**, this cell:

1. Locates `workorders.csv` inside that city’s folder.  
2. Loads it to `df_work` (expected columns: `workorder_id`, `equipment_id`, `equipment_type`, `branch`, `customer_name`, `accredited`, `technician`).  
3. Executes an `INSERT OR REPLACE` into the **workorders** table so repeated runs update existing rows.  
4. Commits the batch and prints how many rows were written for the city.

In [13]:
for city in CITIES:
    workorders_csv = os.path.join(ROOT_PROCESSED_DIR, city, "workorders", "workorders.csv")
    if not os.path.exists(workorders_csv):
        print(f"[{city}] Missing workorders.csv; skipping insertion.")
        continue

    df_work = pd.read_csv(workorders_csv)

    inserted_count = 0
    for _, row in df_work.iterrows():
        cur.execute(
            """
            INSERT OR REPLACE INTO workorders
                (workorder_id, equipment_id, equipment_type, branch,
                 customer_name, accredited, technician)
            VALUES (?, ?, ?, ?, ?, ?, ?);
            """,
            (
                row["workorder_id"],
                row["equipment_id"],
                row["equipment_type"],
                row["branch"],
                row["customer_name"],
                row["accredited"],
                row["technician"],
            )
        )
        inserted_count += 1

    conn.commit()
    print(f"Inserted/updated {inserted_count} workorders for city={city}.")


Inserted/updated 409 workorders for city=Chicago.
Inserted/updated 387 workorders for city=Houston.
Inserted/updated 397 workorders for city=Los_Angeles.
Inserted/updated 393 workorders for city=New_York.
Inserted/updated 398 workorders for city=Phoenix.


## 5. Build `merged_test_points.csv` and load the **test_points** table

For each city in **CITIES**, this cell:

1. Looks inside `<ROOT_DATA_DIR>/<city>/test_points` for every file that ends with `_test_points.xlsx`.
2. Extracts  
   * **equipment_id** — taken from the file name.  
   * **completed_date** — parsed from a line like `Completed: YYYY-MM-DD` in the optional **metadata** sheet.  
   * **testpoint_count** — the number of columns whose header matches `TP\d+` in the **test_points** sheet.  
3. Collects those fields into a list of records, writes them to  
   `merged_test_points.csv` under `<ROOT_PROCESSED_DIR>/<city>/test_points/`.
4. Inserts each record into the **test_points** table (`equipment_id`, `completed_date`, `testpoint_count`).  
   Make sure the table defines `testpoint_count` as an `INTEGER`.

In [14]:
for city in CITIES:
    in_dir = os.path.join(ROOT_DATA_DIR, city, "test_points")
    if not os.path.exists(in_dir):
        print(f"[{city}] No test_points folder; skipping.")
        continue

    print(f"\n=== Processing test_points in {city} ===")
    records = []

    for fp in glob.glob(os.path.join(in_dir, "*_test_points.xlsx")):
        # 1️⃣ equipment_id from filename
        equipment_id = os.path.basename(fp).replace("_test_points.xlsx", "")

        # 2️⃣ open workbook
        xls = pd.ExcelFile(fp)

        # ---- metadata sheet: Completed date ----
        completed_date = None
        if "metadata" in xls.sheet_names:
            meta_df = xls.parse("metadata", header=None)
            if not meta_df.empty:
                col0 = meta_df.iloc[:, 0].dropna().astype(str).str.strip()
                matches = col0[col0.str.startswith("Completed:")]
                if not matches.empty:
                    completed_date = matches.iloc[0].split(":", 1)[1].strip()

        # ---- test_points sheet: count TP columns ----
        testpoint_count = 0
        if "test_points" in xls.sheet_names:
            tp_df = xls.parse("test_points", header=None)
            if not tp_df.empty:
                header_row = tp_df.iloc[0, :].dropna().astype(str)
                testpoint_count = header_row.str.match(r"^TP\d+$").sum()

        records.append((equipment_id, completed_date, int(testpoint_count)))

    # ---------- write merged_test_points.csv ----------
    if records:
        df = pd.DataFrame(records, columns=["equipment_id", "completed_date", "testpoint_count"])
        out_dir = os.path.join(ROOT_PROCESSED_DIR, city, "test_points")
        os.makedirs(out_dir, exist_ok=True)
        out_fp = os.path.join(out_dir, "merged_test_points.csv")
        df.to_csv(out_fp, index=False)
        print(f"  => Saved CSV: {out_fp}  (rows={len(df)})")

        # ---------- insert into DB ----------
        for eq_id, c_date, t_count in records:
            print("  Insert:", eq_id, c_date, t_count, type(t_count))
            cur.execute(
                """
                INSERT INTO test_points (equipment_id, completed_date, testpoint_count)
                VALUES (?, ?, ?);
                """,
                (eq_id, c_date, t_count),
            )

        conn.commit()
        print(f"  => Inserted {len(records)} rows into 'test_points' table.")



=== Processing test_points in Chicago ===
  => Saved CSV: ../processed_data/Chicago/test_points/merged_test_points.csv  (rows=409)
  Insert: Ch-EQ-00166-0 2024-11-23 15 <class 'int'>
  Insert: Ch-EQ-00077-1 2025-03-28 15 <class 'int'>
  Insert: Ch-EQ-00189-2 2024-06-14 15 <class 'int'>
  Insert: Ch-EQ-00197-2 2024-12-10 10 <class 'int'>
  Insert: Ch-EQ-00053-0 2025-03-09 10 <class 'int'>
  Insert: Ch-EQ-00112-1 2024-10-02 10 <class 'int'>
  Insert: Ch-EQ-00169-0 2024-10-19 10 <class 'int'>
  Insert: Ch-EQ-00005-1 2024-11-23 15 <class 'int'>
  Insert: Ch-EQ-00056-1 2024-10-20 15 <class 'int'>
  Insert: Ch-EQ-00144-1 2024-09-02 15 <class 'int'>
  Insert: Ch-EQ-00142-1 2025-03-03 10 <class 'int'>
  Insert: Ch-EQ-00006-0 2025-02-06 15 <class 'int'>
  Insert: Ch-EQ-00176-0 2024-12-31 10 <class 'int'>
  Insert: Ch-EQ-00072-0 2024-08-25 8 <class 'int'>
  Insert: Ch-EQ-00003-1 2024-06-03 15 <class 'int'>
  Insert: Ch-EQ-00190-0 2024-10-26 10 <class 'int'>
  Insert: Ch-EQ-00114-1 2025-03-20 10

## 6. Quality-control

### 6.1 Pull work-orders & test-points from the DB, then merge

1. Reads the entire **workorders** and **test_points** tables from the SQLite database into `df_work_db` and `df_tp_db`.
2. Left-joins them on `equipment_id` so every work-order keeps its row even if no matching test-points exist.
3. Starts two empty lists—`invoice_rows` and `failed_rows`—that later QC logic will append to.

In [15]:
df_work_db = pd.read_sql_query("SELECT * FROM workorders", conn)
df_tp_db   = pd.read_sql_query("SELECT * FROM test_points", conn)

merged = pd.merge(df_work_db, df_tp_db, on="equipment_id", how="left")

invoice_rows = []
failed_rows  = []

### 6.2 Build **invoice_rows** / **failed_rows**

1. **Customer exists** in `df_customers`  
2. **Technician certified** for the given `equipment_type` (`certified_pairs`)  
3. **Completed date present**  
4. **Pricing row found** in `df_pricing` (branch × equipment_type)  
5. **Exact test-point count** — 15 if accredited, otherwise 10  

In [16]:
for _, row in merged.iterrows():
    fail_reasons = []

    workorder_id   = row["workorder_id"]
    equipment_id   = row["equipment_id"]
    equipment_type = row["equipment_type"]
    branch         = row["branch"]
    customer_name  = row["customer_name"]
    accredited     = str(row["accredited"]).lower()          # "yes"/"no"
    technician     = row["technician"]
    completed_date = row["completed_date"]
    testpoints     = row["testpoint_count"]                  # int

    # ---------- 1. Customer check ----------
    if not (df_customers["customer_name"] == customer_name).any():
        fail_reasons.append("customer_not_found")

    # ---------- 2. Technician certification ----------
    if (technician, equipment_type) not in certified_pairs:
        fail_reasons.append("tech_not_certified")

    # ---------- 3. Completed date ----------
    if pd.isna(completed_date) or str(completed_date).strip() == "":
        fail_reasons.append("missing_completed_date")

    # ---------- 4. Pricing ----------
    price_subset = df_pricing[
        (df_pricing["branch"] == branch) &
        (df_pricing["equipment_type"] == equipment_type)
    ]
    price = None
    if price_subset.empty:
        fail_reasons.append("pricing_not_found")
    else:
        pr = price_subset.iloc[0]
        price = pr["accredited_price"] if accredited == "yes" else pr["standard_price"]

    # ---------- 5. Required test-points ----------
    required_points = 15 if accredited == "yes" else 10
    if pd.isna(testpoints) or testpoints != required_points:
        fail_reasons.append("incorrect_testpoint_count")

    # ---------- Pass / Fail ----------
    if fail_reasons:
        failed_rows.append({
            "workorder_id":   workorder_id,
            "equipment_id":   equipment_id,
            "equipment_type": equipment_type,
            "branch":         branch,
            "customer_name":  customer_name,
            "accredited":     accredited,
            "technician":     technician,
            "failed_reasons": ";".join(fail_reasons),
        })
    else:
        cust_info = df_customers[df_customers["customer_name"] == customer_name]
        if len(cust_info) == 1:
            cid, cemail = cust_info.iloc[0][["customer_id", "email"]]
        else:
            cid, cemail = None, None

        invoice_rows.append({
            "work_completion_date": completed_date,
            "work_order_number":    workorder_id,
            "branch":               branch,
            "accreditation_type":   accredited,
            "equipment_id":         equipment_id,
            "equipment_type":       equipment_type,
            "price":                price,
            "customer_id":          cid,
            "customer_name":        customer_name,
            "customer_email":       cemail,
        })

print(f"QC complete. {len(invoice_rows)} invoices, {len(failed_rows)} fails.")

QC complete. 1532 invoices, 452 fails.


## 7. Persist invoices & QC failures, verify, and close connection

1. **Inserts** each record in `invoice_rows` into the **invoices** table and each in `failed_rows` into **failed_qc**.  

In [17]:
for inv in invoice_rows:
    sql = """
    INSERT INTO invoices
    (work_completion_date, work_order_number, branch, accreditation_type,
     equipment_id, equipment_type, price, customer_id, customer_name, customer_email)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
    """
    cur.execute(sql, (
        inv["work_completion_date"],
        inv["work_order_number"],
        inv["branch"],
        inv["accreditation_type"],
        inv["equipment_id"],
        inv["equipment_type"],
        inv["price"],
        inv["customer_id"],
        inv["customer_name"],
        inv["customer_email"]
    ))

2. **Commits** the transaction.  

In [18]:
for fail in failed_rows:
    sql = """
    INSERT INTO failed_qc
    (workorder_id, equipment_id, equipment_type, branch, customer_name,
     accredited, technician, failed_reasons)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?);
    """
    cur.execute(sql, (
        fail["workorder_id"],
        fail["equipment_id"],
        fail["equipment_type"],
        fail["branch"],
        fail["customer_name"],
        fail["accredited"],
        fail["technician"],
        fail["failed_reasons"]
    ))

conn.commit()

3. **Reads back** both tables to `df_invoices_db` and `df_failed_db` for a quick visual sanity-check (`display(...)`).  

In [19]:
df_invoices_db = pd.read_sql_query("SELECT * FROM invoices", conn)
df_failed_db = pd.read_sql_query("SELECT * FROM failed_qc", conn)

4. **Closes** the SQLite connection and prints a completion message that the database file now lives in `../db/calibration_data.db`.

In [20]:
print("INVOICES:")
display(df_invoices_db)

print("FAILED QC:")
display(df_failed_db)

conn.close()
print("Pipeline finished. All data is in ../db/calibration_data.db.")

INVOICES:


Unnamed: 0,invoice_id,work_completion_date,work_order_number,branch,accreditation_type,equipment_id,equipment_type,price,customer_id,customer_name,customer_email
0,1,2024-09-25,Ch-WO-00001,Chicago,no,Ch-EQ-00001-1,torque,b'\xad\x00\x00\x00\x00\x00\x00\x00',CUST0043,Rodriguez-Summers,paul64@thompson-gonzalez.org
1,2,2024-06-18,Ch-WO-00001,Chicago,no,Ch-EQ-00001-2,torque,b'\xad\x00\x00\x00\x00\x00\x00\x00',CUST0043,Rodriguez-Summers,paul64@thompson-gonzalez.org
2,3,2025-01-06,Ch-WO-00002,Chicago,yes,Ch-EQ-00002-1,torque,224.9,CUST0037,Kelley-Ramirez,deborahwright@smith.net
3,4,2024-11-05,Ch-WO-00002,Chicago,yes,Ch-EQ-00002-2,flow_meter,260.0,CUST0037,Kelley-Ramirez,deborahwright@smith.net
4,5,2024-05-21,Ch-WO-00003,Chicago,no,Ch-EQ-00003-0,torque,b'\xad\x00\x00\x00\x00\x00\x00\x00',CUST0067,"Wade, Cruz and White",nathaniel92@miller.com
...,...,...,...,...,...,...,...,...,...,...,...
1527,1528,2024-04-28,Ph-WO-00196,Phoenix,yes,Ph-EQ-00196-1,scale,188.5,CUST0052,Hammond-Guerrero,michael60@wright.com
1528,1529,2024-10-06,Ph-WO-00197,Phoenix,yes,Ph-EQ-00197-0,pressure_gauge,256.1,CUST0068,Riddle-Faulkner,larry94@stevens.net
1529,1530,2024-07-22,Ph-WO-00198,Phoenix,no,Ph-EQ-00198-0,pressure_gauge,b'\xc5\x00\x00\x00\x00\x00\x00\x00',CUST0020,"Copeland, Torres and Morales",suzanne41@green.com
1530,1531,2024-06-22,Ph-WO-00199,Phoenix,no,Ph-EQ-00199-0,scale,b'\x91\x00\x00\x00\x00\x00\x00\x00',CUST0028,Neal-Kelley,dalvarado@evans.com


FAILED QC:


Unnamed: 0,id,workorder_id,equipment_id,equipment_type,branch,customer_name,accredited,technician,failed_reasons
0,1,Ch-WO-00000,Ch-EQ-00000-0,caliper,Chicago,Walker-Morales,no,Amanda Henry,tech_not_certified
1,2,Ch-WO-00001,Ch-EQ-00001-0,caliper,Chicago,Rodriguez-Summers,no,Amanda Henry,tech_not_certified
2,3,Ch-WO-00002,Ch-EQ-00002-0,pressure_gauge,Chicago,Kelley-Ramirez,yes,Traci Larson,tech_not_certified
3,4,Ch-WO-00003,Ch-EQ-00003-1,pressure_gauge,Chicago,"Wade, Cruz and White",yes,Laura Owens,tech_not_certified
4,5,Ch-WO-00003,Ch-EQ-00003-2,pressure_gauge,Chicago,"Wade, Cruz and White",yes,Laura Owens,tech_not_certified
...,...,...,...,...,...,...,...,...,...
447,448,Ph-WO-00183,Ph-EQ-00183-1,scale,Phoenix,Kelley-Young,no,Carol Rasmussen,incorrect_testpoint_count
448,449,Ph-WO-00186,Ph-EQ-00186-1,thermometer,Phoenix,Mccann-Hoffman,no,Lori Mcgee,tech_not_certified
449,450,Ph-WO-00186,Ph-EQ-00186-2,flow_meter,Phoenix,Mccann-Hoffman,yes,Stephanie Cooper,tech_not_certified
450,451,Ph-WO-00193,Ph-EQ-00193-0,thermometer,Phoenix,Wells Inc,yes,Lori Mcgee,tech_not_certified


Pipeline finished. All data is in ../db/calibration_data.db.
