In [0]:
#%run ./init

In [0]:
import re
import cv2
import pytesseract
import numpy as np
import pandas as pd
import logging
import json, os
import pytesseract
import pandas as pd
import logging
import json
from PIL import Image


In [0]:
# -----------------------------------------------------------------------------
# Global Logger & Output Directories
# -----------------------------------------------------------------------------
logger = logging.getLogger("OCRProduction")
logger.setLevel(logging.INFO)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    logger.addHandler(ch)

OUTPUT_FOLDER = "/dbfs/mnt/mini-proj-dd/final_results"
CSV_FOLDER = os.path.join(OUTPUT_FOLDER, "csv")
JSON_FOLDER = os.path.join(OUTPUT_FOLDER, "json")
os.makedirs(CSV_FOLDER, exist_ok=True)
os.makedirs(JSON_FOLDER, exist_ok=True)

In [0]:
import os
import re
import cv2
import pytesseract
import pandas as pd
import logging
import json
from PIL import Image

# -----------------------------------------------------------------------------
# Global Logger & Output Directories
# -----------------------------------------------------------------------------
logger = logging.getLogger("OCRProduction")
logger.setLevel(logging.INFO)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    logger.addHandler(ch)

OUTPUT_FOLDER = "/dbfs/mnt/mini-proj-dd/final_results"
CSV_FOLDER = os.path.join(OUTPUT_FOLDER, "csv")
JSON_FOLDER = os.path.join(OUTPUT_FOLDER, "json")
os.makedirs(CSV_FOLDER, exist_ok=True)
os.makedirs(JSON_FOLDER, exist_ok=True)

# -----------------------------------------------------------------------------
# COMMON UTILITY FUNCTIONS
# -----------------------------------------------------------------------------
def safe_read_image(img_path):
    local_path = img_path if not img_path.startswith("dbfs:") else img_path.replace("dbfs:", "/dbfs")
    if not os.path.exists(local_path):
        raise FileNotFoundError(f"File not found: {local_path}")
    img = cv2.imread(local_path)
    if img is None:
        raise ValueError(f"Failed to load image: {local_path}")
    logger.info(f"Image loaded from {local_path} with shape {img.shape}")
    return img

def preprocess_image(img, debug=False):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY, 15, 9)
    if debug:
        logger.info("Preprocessing completed (grayscale and threshold applied).")
    return thresh

def detect_text_regions(thresh_img, debug=False):
    contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    rois = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w > 30 and h > 15:
            rois.append((x, y, w, h))
    rois.sort(key=lambda b: (b[1], b[0]))
    if debug:
        logger.info(f"Detected {len(rois)} text regions.")
    return rois

def perform_ocr_on_rois(img, rois, debug=False):
    results = []
    for (x, y, w, h) in rois:
        roi = img[y:y+h, x:x+w]
        text = pytesseract.image_to_string(roi, config="--psm 6").strip() or "[BLANK]"
        results.append((x, y, w, h, text))
        if debug:
            logger.info(f"OCR Box ({x},{y},{w},{h}): {text}")
    return results

def perform_ocr(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return pytesseract.image_to_string(gray, config="--psm 6")

def extract_key_value_from_text(text, expected_keys):
    combined = " ".join(line.strip() for line in text.splitlines() if line.strip())
    combined = re.sub(r'\s+', ' ', combined)
    result = {}
    for i, key in enumerate(expected_keys):
        if i < len(expected_keys) - 1:
            next_key = expected_keys[i+1]
            pattern = re.escape(key) + r'\s*:\s*(.*?)(?=\s*' + re.escape(next_key) + r'\s*:|$)'
        else:
            pattern = re.escape(key) + r'\s*:\s*(.*)'
        match = re.search(pattern, combined, re.IGNORECASE)
        result[key] = match.group(1).strip() if match and match.group(1).strip() else None
    return result

# -----------------------------------------------------------------------------
# PARSING HELPERS
# -----------------------------------------------------------------------------
def group_ocr_results(roi_texts, row_tolerance=10):
    rows = []
    current_row = []
    prev_y = None
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        rows.append(current_row)
    row_strings = []
    for row in rows:
        row.sort(key=lambda c: c[0])
        line = " ".join(cell[4] for cell in row)
        row_strings.append(line)
    return row_strings

# -----------------------------------------------------------------------------
# PIPELINE FUNCTIONS
# -----------------------------------------------------------------------------
def pipeline_cost_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png"
    img = safe_read_image(section_path)
    ocr_text = perform_ocr(img)
    logger.info("Cost OCR extraction complete.")
    expected_keys = [
        "Drilling AFE Amount", "Daily Drilling Cost", "Cumulative Drilling Cost",
        "Cumulative Well Cost", "Daily Mud Cost", "Cumulative Mud Cost"
    ]
    extracted = extract_key_value_from_text(ocr_text, expected_keys)
    df = pd.DataFrame(list(extracted.items()), columns=["Key", "Value"])
    logger.info(f"COST DataFrame shape: {df.shape}")
    return {"COST DATA": extracted}, df

def process_well_job_info(section_path, debug=False):
    img = safe_read_image(section_path)
    ocr_text = perform_ocr(img)
    logger.info("Well/Job OCR extraction complete.")
    expected_keys = [
        "Well Name", "Job Name", "Supervisor(s)", "Field", "Sec/Twn/Rng", "Phone",
        "AFE #", "API #", "Email", "Contractor", "Elevation", "RKB",
        "Spud Date", "Days from Spud", "Days on Loc", "MD/TVD", "24 Hr Footage",
        "Present Operations", "Activity Planned"
    ]
    combined = " ".join(line.strip() for line in ocr_text.splitlines() if line.strip())
    combined = re.sub(r'\s+', ' ', combined)
    result = {}
    for i, key in enumerate(expected_keys):
        if i < len(expected_keys) - 1:
            next_key = expected_keys[i+1]
            pattern = re.escape(key) + r'\s*:\s*(.*?)(?=\s*' + re.escape(next_key) + r'\s*:|$)'
        else:
            pattern = re.escape(key) + r'\s*:\s*(.*)'
        match = re.search(pattern, combined, re.IGNORECASE)
        result[key] = match.group(1).strip() if match else ""
    df = pd.DataFrame(list(result.items()), columns=["Key", "Value"])
    logger.info(f"WELL/JOB DataFrame shape: {df.shape}")
    return {"WELL/JOB INFORMATION": result}, df

def process_obs_int(section_path, debug=False):
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    header_str = "daily numbers: observation & intervention"
    all_texts = [t[4] for t in roi_texts]
    types_list, numbers_list = [], []
    for txt in all_texts:
        clean = txt.strip()
        if clean.lower() in [header_str, "number", "[blank]"]:
            continue
        try:
            float(clean)
            numbers_list.append("" if clean.lower() == "[blank]" else clean)
            continue
        except ValueError:
            pass
        if "\n" in clean:
            for line in clean.splitlines():
                line = line.strip()
                if line and line.lower() != "[blank]":
                    types_list.append(line)
        else:
            types_list.append(clean)
    expected_count = 5
    while len(numbers_list) < expected_count:
        numbers_list.append("")
    types_list = types_list[:expected_count]
    numbers_list = numbers_list[:expected_count]
    structured = [{"Type": types_list[i], "Number": numbers_list[i]} for i in range(expected_count)]
    df = pd.DataFrame(structured)
    logger.info(f"OBS_INT DataFrame shape: {df.shape}")
    return {"DAILY NUMBERS: OBSERVATION & INTERVENTION": structured}, df

def process_bop(section_path, debug=False):
    img = safe_read_image(section_path)
    ocr_text = perform_ocr(img)
    logger.info("BOP OCR extraction complete.")
    patterns = {
        "Last BOP Test Date": r"Last BOP Test Date\s*:\s*(\d{1,2}/\d{1,2}/\d{2,4})",
        "Last BOP Drill": r"Last BOP Drill\s*:\s*(\d{1,2}/\d{1,2}/\d{2,4})",
        "Next BOP Test": r"Next BOP Test\s*:\s*(\d{1,2}/\d{1,2}/\d{2,4})"
    }
    result = {}
    for key, regex in patterns.items():
        match = re.search(regex, ocr_text, re.IGNORECASE)
        result[key] = match.group(1) if match else ""
    df = pd.DataFrame(list(result.items()), columns=["Key", "Value"])
    logger.info(f"BOP DataFrame shape: {df.shape}")
    return {"BOP": result}, df

def build_dir_info_dict_from_rois(roi_texts, debug=False):
    all_texts = [t[4] for t in roi_texts]
    daily_cum_idx = next((i for i, txt in enumerate(all_texts)
                           if "daily" in txt.lower() and "cumulative" in txt.lower()), None)
    if daily_cum_idx is None:
        logger.warning("Could not find 'Daily Cumulative' bounding box.")
        return {}, pd.DataFrame()
    cat_idx = daily_cum_idx + 1
    if cat_idx >= len(all_texts):
        logger.warning("No bounding box after 'Daily Cumulative'.")
        return {}, pd.DataFrame()
    categories_box = all_texts[cat_idx]
    lines = [ln.strip() for ln in categories_box.split("\n") if ln.strip()]
    if len(lines) < 5:
        logger.warning(f"Expected 5 category lines, got {len(lines)}: {lines}")
    def safe_get(idx):
        return all_texts[idx] if 0 <= idx < len(all_texts) else ""
    structured = []
    for i in range(4):
        cat_name = lines[i] if i < len(lines) else f"Unknown Category {i+1}"
        daily_box = safe_get(cat_idx + 1 + (i * 2))
        cum_box = safe_get(cat_idx + 2 + (i * 2))
        structured.append({
            "Category": cat_name,
            "Daily": "" if daily_box == "[BLANK]" else daily_box,
            "Cumulative": "" if cum_box == "[BLANK]" else cum_box
        })
    last_box = safe_get(cat_idx + 9)
    last_cat = lines[4] if len(lines) >= 5 else "Rotating Footage"
    remainder = last_box.replace(last_cat, "").strip()
    tokens = remainder.split()
    daily_val = tokens[0] if len(tokens) >= 2 else ""
    cum_val = tokens[1] if len(tokens) >= 2 else ""
    structured.append({
        "Category": last_cat,
        "Daily": "" if daily_val == "[BLANK]" else daily_val,
        "Cumulative": "" if cum_val == "[BLANK]" else cum_val
    })
    df = pd.DataFrame(structured)
    logger.info(f"DIR INFO DataFrame shape: {df.shape}")
    return {"DIR INFO": structured}, df

def process_dir_info(section_path, debug=False):
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    return build_dir_info_dict_from_rois(roi_texts, debug=debug)

def build_survey_dict_from_rois(roi_texts, expected_headers):
    row_strings = group_ocr_results(roi_texts)
    logger.info(f"SURVEY - Grouped Rows: {row_strings}")
    all_lines = []
    for line in row_strings:
        for subline in line.split("\n"):
            subline = subline.strip()
            if subline:
                all_lines.append(subline)
    logger.info(f"SURVEY - All extracted lines: {all_lines}")
    data_lines = []
    for line in all_lines:
        tokens = re.split(r'\s{2,}', line)
        if len(tokens) == 1:
            tokens = line.split()
        lower_tokens = [t.lower() for t in tokens]
        if "md" in lower_tokens and "inclination" in lower_tokens:
            logger.info(f"SURVEY - Skipping header line: {tokens}")
            continue
        if len(tokens) < len(expected_headers):
            logger.warning(f"SURVEY - Line has fewer tokens than expected: {tokens}")
            continue
        tokens = tokens[:len(expected_headers)]
        data_lines.append(tokens)
    logger.info(f"SURVEY - Data lines to parse: {data_lines}")
    survey_list = [{expected_headers[i]: tokens[i] for i in range(len(expected_headers))}
                   for tokens in data_lines]
    return survey_list

def sort_survey_data(survey_list):
    def md_value(row):
        try:
            return float(row["MD"].replace(",", ""))
        except Exception:
            return 0
    return sorted(survey_list, key=md_value, reverse=True)

def pipeline_survey_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    expected_headers = ["MD", "Inclination", "Azimuth", "DLS", "TVD"]
    survey_list = build_survey_dict_from_rois(roi_texts, expected_headers)
    survey_list = sort_survey_data(survey_list)
    df = pd.DataFrame(survey_list)
    logger.info(f"SURVEY DataFrame shape: {df.shape}")
    return {"SURVEY": survey_list}, df

def build_casing_dict_from_rois(roi_texts, expected_headers, debug=False):
    row_strings = group_ocr_results(roi_texts)
    all_lines = []
    for line in row_strings:
        for sub in line.split("\n"):
            sub = sub.strip()
            if sub:
                all_lines.append(sub)
    data_lines = []
    for line in all_lines:
        tokens = re.split(r'\s{2,}', line)
        if len(tokens) == 1:
            tokens = line.split()
        lower_tokens = [t.lower() for t in tokens]
        if "type" in lower_tokens and "size" in lower_tokens:
            logger.info(f"CASING - Skipping header line: {tokens}")
            continue
        if len(tokens) < len(expected_headers):
            logger.warning(f"CASING - Line has fewer tokens than expected: {tokens}")
            continue
        tokens = tokens[:len(expected_headers)]
        data_lines.append(tokens)
    casing_list = [{expected_headers[i]: tokens[i] for i in range(len(expected_headers))}
                   for tokens in data_lines]
    return casing_list

def pipeline_casing_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_7.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    expected_headers = ["Type", "Size", "Weight", "Grade", "Connection", "Top MD", "Bottom MD", "TOC"]
    casing_list = build_casing_dict_from_rois(roi_texts, expected_headers, debug=debug)
    df = pd.DataFrame(casing_list)
    logger.info(f"CASING DataFrame shape: {df.shape}")
    return {"CASING": casing_list}, df

def build_consumables_dict_from_rois(roi_texts, debug=False):
    row_strings = group_ocr_results(roi_texts)
    data_rows = []
    for line in row_strings:
        lower_line = line.lower()
        if ("consumable" in lower_line and "received" in lower_line) or "nun" in lower_line:
            continue
        if len(line.split()) < 5:
            continue
        data_rows.append(line)
    consumables_list = []
    for line in data_rows:
        tokens = re.split(r'\s+', line)
        if len(tokens) > 5:
            first = " ".join(tokens[:-4])
            tokens = [first] + tokens[-4:]
        if len(tokens) != 5:
            logger.warning(f"CONSUMABLES - Skipping row (unexpected token count): {tokens}")
            continue
        consumables_list.append({
            "Consumable": tokens[0],
            "Daily Received (gal)": tokens[1],
            "Daily Used (gal)": tokens[2],
            "Cumulative Used (gal)": tokens[3],
            "Daily on Hand (gal)": tokens[4]
        })
    return consumables_list

def pipeline_consumables_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_2_section_2.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    consumables_list = build_consumables_dict_from_rois(roi_texts, debug=debug)
    df = pd.DataFrame(consumables_list)
    logger.info(f"CONSUMABLES DataFrame shape: {df.shape}")
    return {"CONSUMABLES": consumables_list}, df

def extract_bha_data(image_path, debug=False):
    image = Image.open(image_path)
    ocr_text = pytesseract.image_to_string(image)
    patterns = {
        "Drill Pipe Detail": r"Drill Pipe Detail:\s*([^\n]+)",
        "Size": r"Size:\s*([\d.]+)\b",
        "Wt./Ft": r"Wt\./Ft:\s*([\d.]+)\b",
        "Connection": r"Connection:\s*([\w\d-]+)\b",
        "ID": r"ID:\s*([\d.]+)\b",
        "Drill Bit": r"Drill Bit:\s*([^\n;]+)",
        "Motor": r"Motor:\s*([^\n;]+)",
        "MWD Tool": r"MWD Tool:\s*([^\n;]+)",
        "Monel Collar": r"Monel Collar:\s*([^\n;]+)",
        "X-Over": r"X-Over:\s*([^\n;]+)",
        "Sub": r"Sub:\s*([^\n;]+)",
        "HWDP": r"HWDP:\s*([^\n;]+)",
        "Drill Pipe": r"Drill Pipe:\s*([\d.]+(?:\" DP)?)",
        "Reamer": r"Reamer:\s*([^\n;]+)",
        "Shock Sub": r"Shock Sub:\s*([^\n;]+)",
        "Total Length": r"Total Length:\s*(\d+)\b"
    }
    bha_data = {}
    for key, pat in patterns.items():
        match = re.search(pat, ocr_text)
        if match:
            bha_data[key] = match.group(1).strip()
    if "Drill Pipe Detail" in bha_data:
        detail = bha_data["Drill Pipe Detail"]
        for remove_key in ["Size", "Wt./Ft", "Connection", "ID"]:
            if remove_key in bha_data:
                detail = re.sub(rf"{remove_key}:\s*{re.escape(bha_data[remove_key])}", "", detail).strip(",; ")
        bha_data["Drill Pipe Detail"] = detail
    structured_data = {
        "BHA": {
            "Drill Pipe Detail": bha_data.get("Drill Pipe Detail", ""),
            "Size": bha_data.get("Size", ""),
            "Wt./Ft": bha_data.get("Wt./Ft", ""),
            "Connection": bha_data.get("Connection", ""),
            "ID": bha_data.get("ID", ""),
            "BHA #4": {
                "Drill Bit": bha_data.get("Drill Bit", ""),
                "Motor": bha_data.get("Motor", ""),
                "MWD Tool": bha_data.get("MWD Tool", ""),
                "Monel Collar": bha_data.get("Monel Collar", ""),
                "X-Over": bha_data.get("X-Over", ""),
                "Sub": bha_data.get("Sub", ""),
                "HWDP": bha_data.get("HWDP", ""),
                "Drill Pipe": bha_data.get("Drill Pipe", ""),
                "Reamer": bha_data.get("Reamer", ""),
                "Shock Sub": bha_data.get("Shock Sub", "")
            },
            "Total Length": bha_data.get("Total Length", "")
        }
    }
    if debug:
        logger.info("Extracted BHA data:")
        logger.info(json.dumps(structured_data, indent=4))
    return structured_data

def pipeline_bha_data(debug=False):
    image_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_2_section_?BHA.png"  # Adjust as needed
    bha_json = extract_bha_data(image_path, debug=debug)
    img = safe_read_image(image_path)
    ocr_text = perform_ocr(img)
    pump_data = parse_pumps_table(ocr_text)
    circ_data = parse_drilling_circ_rates(ocr_text)
    pumps_df = pd.DataFrame(pump_data)
    circ_df = pd.DataFrame(circ_data)
    bha_df = pd.DataFrame([bha_json])
    logger.info(f"BHA DataFrame shape: {bha_df.shape}")
    combined = {"BHA": bha_json, "Pumps": pump_data, "DrillingCircRates": circ_data}
    return combined, pumps_df, circ_df, bha_df

# -----------------------------------------------------------------------------
# Additional Parsing Functions for Pumps and Drilling/Circ Rates
# -----------------------------------------------------------------------------
def parse_pumps_table(ocr_text):
    pump_pattern = re.compile(
        r"^(\d+)?\s*(BOMCO)\s+(TRIPLEX)\s+(\d+)?\s*(\d+)\s+([\d.]+)\s+([\d.]+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s*$",
        re.IGNORECASE)
    pumps = []
    for line in ocr_text.splitlines():
        line = line.strip()
        match = pump_pattern.match(line)
        if match:
            (number, model, pump_type, hhp, efficiency, stroke, liner,
             p_rating, p_limit, spm_rating, spm_limit) = match.groups()
            pumps.append({
                "Number": number if number else "",
                "Model": model,
                "Type": pump_type,
                "HHP": hhp if hhp else "",
                "Efficiency": efficiency,
                "Stroke(in)": stroke,
                "Liner(in)": liner,
                "P-Rating(psi)": p_rating,
                "P-Limit(psi)": p_limit,
                "SPM Rating": spm_rating,
                "SPM Limit": spm_limit
            })
    return pumps

def parse_drilling_circ_rates(ocr_text):
    circ_pattern = re.compile(
        r"Drilling/Circ\s+Rate\s+(\d+)\s+(\d+)\s+PSI\s*@\s*(\d+)\s*SPM\s*([\d.]+)\s+Gal/Stoke\s+([\d.]+)\s+GPM\s+([\d.]+)\s+BPM\s+([\d.]+)\s+DC\s+([\d.]+)\s+DP",
        re.IGNORECASE)
    circ_rates = []
    for line in ocr_text.splitlines():
        line = line.strip()
        match = circ_pattern.search(line)
        if match:
            rate_id, pressure, spm, gal_stroke, gpm, bpm, dc, dp = match.groups()
            circ_rates.append({
                "RateID": rate_id,
                "Pressure(PSI)": pressure,
                "SPM": spm,
                "Gal/Stoke": gal_stroke,
                "GPM": gpm,
                "BPM": bpm,
                "DC": dc,
                "DP": dp
            })
    return circ_rates


def pipeline_mud_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_3.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    expected_headers = [
        "Type", "Weight In", "Weight Out", "pH", "CAKE",
        "GELS (10s/10m/30m)", "Oil/Water", "FV", "ES", "PV",
        "YP", "CL", "Ca", "LGS", "WL", "HTHP Loss", "3 RPM",
        "6 RPM", "Mud Pits and Hole Volume", "24 Hr Loss",
        "Total Loss", "Comments"
    ]
    mud_dict = build_mud_dict_from_rois(roi_texts, expected_headers)
    return {"MUD": mud_dict}, pd.DataFrame(list(mud_dict.items()), columns=["Key", "Value"])

# mud ---------------------------------------------------------------------
# parse_value_row_tokens
# ---------------------------------------------------------------------
def parse_value_row_tokens(expected_headers, tokens):
    """
    Map a flat list of tokens to the expected headers.
    For "GELS (10s/10m/30m)", consume 3 tokens and create a sub-dictionary.
    Expected token count = (number of headers - 1) + 3.
    """
    expected_token_count = (len(expected_headers) - 1) + 3
    logger.info(f"Expected token count: {expected_token_count}, tokens extracted: {tokens}")
    
    # Pad or trim tokens as needed.
    if len(tokens) < expected_token_count:
        tokens += ["[BLANK]"] * (expected_token_count - len(tokens))
        logger.warning("Not enough tokens. Padding with [BLANK].")
    elif len(tokens) > expected_token_count:
        tokens = tokens[:expected_token_count]
        logger.warning("Too many tokens. Trimming the extra tokens.")
    
    result = {}
    idx = 0
    for header in expected_headers:
        if header == "GELS (10s/10m/30m)":
            gels_tokens = tokens[idx:idx+3]
            result[header] = {
                "10s": gels_tokens[0],
                "10m": gels_tokens[1],
                "30m": gels_tokens[2]
            }
            idx += 3
        else:
            result[header] = tokens[idx]
            idx += 1
    logger.info(f"Mapped dictionary: {result}")
    return result

# ---------------------------------------------------------------------
# build_mud_dict_from_rois
# ---------------------------------------------------------------------
def build_mud_dict_from_rois(roi_texts, expected_headers):
    """
    Group OCR results into rows based on the y coordinate.
    Identify header rows and corresponding data rows.
    
    In our case, we expect:
      - A header row (with labels) followed by a data row,
      - Then a second header row (for the remaining fields) followed by a second data row.
    
    We then combine the two data rows' tokens and map them to expected_headers.
    """
    row_tolerance = 10
    rows = []
    current_row = []
    prev_y = None

    # Group by row based on y coordinate.
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        rows.append(current_row)

    # Sort each row by x coordinate and log its text.
    row_strings = []
    for i, row_cells in enumerate(rows):
        row_cells.sort(key=lambda c: c[0])
        line_text = " ".join(cell[4] for cell in row_cells)
        row_strings.append(line_text)
        logger.info(f"Row {i} text: {line_text}")

    # Based on OCR output expectations:
    # Row 1: header row 1 (first set of labels)
    # Row 2: data row 1 (first set of values)
    # Row 3: header row 2 (remaining labels)
    # Row 4: data row 2 (remaining values)
    header1_line = None
    value1_line = None
    header2_line = None
    value2_line = None

    for i, r_text in enumerate(row_strings):
        if "Type" in r_text and not header1_line:
            header1_line = r_text
            if i + 1 < len(row_strings):
                value1_line = row_strings[i+1]
        elif header1_line and not header2_line and any(kw in r_text for kw in ["RPM", "Mud", "Loss", "Comments"]):
            header2_line = r_text
            if i + 1 < len(row_strings):
                value2_line = row_strings[i+1]
            break

    logger.info(f"Header1: {header1_line}")
    logger.info(f"Value1: {value1_line}")
    logger.info(f"Header2: {header2_line}")
    logger.info(f"Value2: {value2_line}")

    if value1_line is None:
        logger.error("No data row found for header1!")
        return {}

    # Split the data rows into tokens.
    tokens1 = value1_line.split()
    tokens2 = value2_line.split() if value2_line else []
    logger.info(f"Tokens from data row 1: {tokens1}")
    logger.info(f"Tokens from data row 2: {tokens2}")

    # Combine tokens from both data rows.
    combined_tokens = tokens1 + tokens2
    logger.info(f"Combined tokens: {combined_tokens}")

    # Map the tokens to the expected headers.
    return parse_value_row_tokens(expected_headers, combined_tokens)

# ---------------------------------------------------------------------
# main_pipeline
# ---------------------------------------------------------------------
def main_pipeline():
    expected_headers = [
        "Type", "Weight In", "Weight Out", "pH", "CAKE",
        "GELS (10s/10m/30m)", "Oil/Water", "FV", "ES", "PV",
        "YP", "CL", "Ca", "LGS", "WL", "HTHP Loss", "3 RPM",
        "6 RPM", "Mud Pits and Hole Volume", "24 Hr Loss",
        "Total Loss", "Comments"
    ]
    # Use a DBFS URI for the input image
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_3.png"
    try:
        img = read_cropped_section_image(section_path)
        logger.info("Image loaded successfully.")
    except Exception as e:
        logger.error(e)
        return

    show_image("Original Cropped Section", img, size=(12,12))
    thresh_img = preprocess_image(img, debug=True)
    rois = detect_text_regions(thresh_img, debug=True)
    roi_texts = perform_ocr_on_rois(img, rois, debug=True)
    
    # Build the mud dictionary using the combined data rows.
    mud_dict = build_mud_dict_from_rois(roi_texts, expected_headers)
    final_dict = {"MUD": mud_dict}
    logger.info("===== FINAL EXTRACTED MUD DICTIONARY =====")
    logger.info(json.dumps(final_dict, indent=4))
    
    df_final = pd.DataFrame(list(mud_dict.items()), columns=["Key", "Value"])
    print("----- Extracted DataFrame -----")
    print(df_final)
    
    # Define the output folder using a DBFS URI and create it with dbutils.fs.mkdirs
    output_folder_dbfs = "dbfs:/mnt/mini-proj-dd/final_ocr_results"
    dbutils.fs.mkdirs(output_folder_dbfs)
    # Instead of writing using OS-level functions, convert the DataFrame to CSV text
    # and write it directly to DBFS.
    out_file = output_folder_dbfs + "/page_1_section_3_ocr.csv"
    csv_data = df_final.to_csv(index=False)
    dbutils.fs.put(out_file, csv_data, overwrite=True)
    logger.info(f"Final DataFrame saved to {out_file}")

if __name__ == "__main__":
    main_pipeline()

# -----------------------------------------------------------------------------
# MAIN RUNNER: STANDARDIZED PIPELINE EXECUTION
# -----------------------------------------------------------------------------
def run_pipeline(name, pipeline_info, debug=False):
    try:
        logger.info(f"Processing pipeline: {name}")
        output, df = pipeline_info["func"](debug)
        logger.info(f"{name} DataFrame shape: {df.shape} (Rows: {df.shape[0]}, Columns: {df.shape[1]})")
        json_path = os.path.join(JSON_FOLDER, pipeline_info["json"])
        with open(json_path, "w") as f:
            json.dump(output, f, indent=4)
        csv_path = os.path.join(CSV_FOLDER, pipeline_info["csv"])
        df.to_csv(csv_path, index=False)
        logger.info(f"{name} saved: JSON({json_path}), CSV({csv_path})")
        print(f"--- {name.upper()} DataFrame ---")
        print(f"Shape: {df.shape}")
        print(df.head(10))
    except Exception as e:
        logger.error(f"Error in pipeline {name}: {e}")

def main():
    debug = False  # Set True for detailed logging
    # Define all pipelines in sequence
    pipelines = {
        "cost_data": {
            "func": pipeline_cost_data,
            "csv": "cost_data.csv",
            "json": "cost_data.json"
        },
        "well_job": {
            "func": lambda d: process_well_job_info("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_2.png", d),
            "csv": "well_job_data.csv",
            "json": "well_job_data.json"
        },
        "obs_int": {
            "func": lambda d: process_obs_int("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_10.png", d),
            "csv": "obs_int_data.csv",
            "json": "obs_int_data.json"
        },
        "bop": {
            "func": lambda d: process_bop("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_8.png", d),
            "csv": "bop_data.csv",
            "json": "bop_data.json"
        },
        "dir_info": {
            "func": lambda d: process_dir_info("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_5.png", d),
            "csv": "dir_info_data.csv",
            "json": "dir_info_data.json"
        },
        "survey": {
            "func": pipeline_survey_data,
            "csv": "survey_data.csv",
            "json": "survey_data.json"
        },
        "casing": {
            "func": pipeline_casing_data,
            "csv": "casing_data.csv",
            "json": "casing_data.json"
        },
        "consumables": {
            "func": pipeline_consumables_data,
            "csv": "consumables_data.csv",
            "json": "consumables_data.json"
        },
        "mud": {
            "func": pipeline_mud_data,
            "csv": "mud_data.csv",
            "json": "mud_data.json"
        },
        "bha": {
            "func": pipeline_bha_data,
            "csv": "bha_data.csv",
            "json": "bha_data.json"
        }
    }
    
    for name, pipe in pipelines.items():
        run_pipeline(name, pipe, debug)

if __name__ == "__main__":
    main()


ERROR: name 'read_cropped_section_image' is not defined
ERROR:OCRProduction:name 'read_cropped_section_image' is not defined
INFO: Processing pipeline: cost_data
INFO:OCRProduction:Processing pipeline: cost_data
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png with shape (105, 2502, 3)
INFO:OCRProduction:Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png with shape (105, 2502, 3)
INFO: Cost OCR extraction complete.
INFO:OCRProduction:Cost OCR extraction complete.
INFO: COST DataFrame shape: (6, 2)
INFO:OCRProduction:COST DataFrame shape: (6, 2)
INFO: cost_data DataFrame shape: (6, 2) (Rows: 6, Columns: 2)
INFO:OCRProduction:cost_data DataFrame shape: (6, 2) (Rows: 6, Columns: 2)
INFO: cost_data saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/cost_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/cost_data.csv)
INFO:OCRProduction:cost_data saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/cost_data.json),

--- COST_DATA DataFrame ---
Shape: (6, 2)
                        Key        Value
0       Drilling AFE Amount         None
1       Daily Drilling Cost  $167,006.63
2  Cumulative Drilling Cost   $1,747,745
3      Cumulative Well Cost   $1,914,752
4            Daily Mud Cost   $54,185.80
5       Cumulative Mud Cost  $299,370.66


INFO: Well/Job OCR extraction complete.
INFO:OCRProduction:Well/Job OCR extraction complete.
INFO: WELL/JOB DataFrame shape: (19, 2)
INFO:OCRProduction:WELL/JOB DataFrame shape: (19, 2)
INFO: well_job DataFrame shape: (19, 2) (Rows: 19, Columns: 2)
INFO:OCRProduction:well_job DataFrame shape: (19, 2) (Rows: 19, Columns: 2)
INFO: well_job saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/well_job_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/well_job_data.csv)
INFO:OCRProduction:well_job saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/well_job_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/well_job_data.csv)
INFO: Processing pipeline: obs_int
INFO:OCRProduction:Processing pipeline: obs_int
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_10.png with shape (241, 942, 3)
INFO:OCRProduction:Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_10.png with shape (241, 942, 3)


--- WELL_JOB DataFrame ---
Shape: (19, 2)
             Key                     Value
0      Well Name  Ross Fee 4371-31-7-15 MH
1       Job Name                  Drilling
2  Supervisor(s)   CHAD MILLER / ED COOLEY
3          Field                       XBE
4    Sec/Twn/Rng              31, 43N, 71W
5          Phone              307-315-1908
6          AFE #                    240098
7          API #              49-005-78911
8          Email  cyclone39@aec-denver.com
9     Contractor                          


INFO: OBS_INT DataFrame shape: (5, 2)
INFO:OCRProduction:OBS_INT DataFrame shape: (5, 2)
INFO: obs_int DataFrame shape: (5, 2) (Rows: 5, Columns: 2)
INFO:OCRProduction:obs_int DataFrame shape: (5, 2) (Rows: 5, Columns: 2)
INFO: obs_int saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/obs_int_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/obs_int_data.csv)
INFO:OCRProduction:obs_int saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/obs_int_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/obs_int_data.csv)
INFO: Processing pipeline: bop
INFO:OCRProduction:Processing pipeline: bop
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_8.png with shape (71, 2502, 3)
INFO:OCRProduction:Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_8.png with shape (71, 2502, 3)


--- OBS_INT DataFrame ---
Shape: (5, 2)
             Type Number
0      Stop Cards     14
1     Hazard ID's      2
2           JSA's      5
3  Permit to Work     21
4          Totals       


INFO: BOP OCR extraction complete.
INFO:OCRProduction:BOP OCR extraction complete.
INFO: BOP DataFrame shape: (3, 2)
INFO:OCRProduction:BOP DataFrame shape: (3, 2)
INFO: bop DataFrame shape: (3, 2) (Rows: 3, Columns: 2)
INFO:OCRProduction:bop DataFrame shape: (3, 2) (Rows: 3, Columns: 2)
INFO: bop saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/bop_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/bop_data.csv)
INFO:OCRProduction:bop saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/bop_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/bop_data.csv)
INFO: Processing pipeline: dir_info
INFO:OCRProduction:Processing pipeline: dir_info
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_5.png with shape (241, 1200, 3)
INFO:OCRProduction:Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_5.png with shape (241, 1200, 3)


--- BOP DataFrame ---
Shape: (3, 2)
                  Key     Value
0  Last BOP Test Date   6/30/24
1      Last BOP Drill  7/3/2024
2       Next BOP Test   7/25/24


INFO: DIR INFO DataFrame shape: (5, 3)
INFO:OCRProduction:DIR INFO DataFrame shape: (5, 3)
INFO: dir_info DataFrame shape: (5, 3) (Rows: 5, Columns: 3)
INFO:OCRProduction:dir_info DataFrame shape: (5, 3) (Rows: 5, Columns: 3)
INFO: dir_info saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/dir_info_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/dir_info_data.csv)
INFO:OCRProduction:dir_info saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/dir_info_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/dir_info_data.csv)
INFO: Processing pipeline: survey
INFO:OCRProduction:Processing pipeline: survey
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png with shape (105, 2502, 3)
INFO:OCRProduction:Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png with shape (105, 2502, 3)


--- DIR_INFO DataFrame ---
Shape: (5, 3)
           Category Daily Cumulative
0   Circ/Cond Hours              6.8
1     Sliding Hours   5.8       28.4
2   Sliding Footage   247       1488
3    Rotating Hours  17.8       75.9
4  Rotating Footage  2821      18941


INFO: SURVEY - Grouped Rows: ['COST DATA COST DATA [BLANK]', 'Drilling AFE Amount: Daily Drilling Cost: $167,006.63 Cumulative Drilling Cost: $1,747,745 Cumulative Well Cost: $1,914,752\nDaily Mud Cost: $54,185.80 Cumulative Mud Cost: $299,370.66']
INFO:OCRProduction:SURVEY - Grouped Rows: ['COST DATA COST DATA [BLANK]', 'Drilling AFE Amount: Daily Drilling Cost: $167,006.63 Cumulative Drilling Cost: $1,747,745 Cumulative Well Cost: $1,914,752\nDaily Mud Cost: $54,185.80 Cumulative Mud Cost: $299,370.66']
INFO: SURVEY - All extracted lines: ['COST DATA COST DATA [BLANK]', 'Drilling AFE Amount: Daily Drilling Cost: $167,006.63 Cumulative Drilling Cost: $1,747,745 Cumulative Well Cost: $1,914,752', 'Daily Mud Cost: $54,185.80 Cumulative Mud Cost: $299,370.66']
INFO:OCRProduction:SURVEY - All extracted lines: ['COST DATA COST DATA [BLANK]', 'Drilling AFE Amount: Daily Drilling Cost: $167,006.63 Cumulative Drilling Cost: $1,747,745 Cumulative Well Cost: $1,914,752', 'Daily Mud Cost: $54,18

--- SURVEY DataFrame ---
Shape: (3, 5)
         MD Inclination  Azimuth         DLS         TVD
0      COST        DATA     COST        DATA     [BLANK]
1  Drilling         AFE  Amount:       Daily    Drilling
2     Daily         Mud    Cost:  $54,185.80  Cumulative


INFO: CASING - Skipping header line: ['Type', 'Size', 'Weight', 'Grade', 'Connection', 'Top', 'MD', 'Bottom', 'MD', 'TOC']
INFO:OCRProduction:CASING - Skipping header line: ['Type', 'Size', 'Weight', 'Grade', 'Connection', 'Top', 'MD', 'Bottom', 'MD', 'TOC']
INFO: CASING DataFrame shape: (4, 8)
INFO:OCRProduction:CASING DataFrame shape: (4, 8)
INFO: casing DataFrame shape: (4, 8) (Rows: 4, Columns: 8)
INFO:OCRProduction:casing DataFrame shape: (4, 8) (Rows: 4, Columns: 8)
INFO: casing saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/casing_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/casing_data.csv)
INFO:OCRProduction:casing saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/casing_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/casing_data.csv)
INFO: Processing pipeline: consumables
INFO:OCRProduction:Processing pipeline: consumables
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_2_section_2.png with shape (209, 2502, 3)
INFO:OCRProd

--- CASING DataFrame ---
Shape: (4, 8)
           Type     Size   Weight  ...   Top MD Bottom MD      TOC
0     Conductor   16.000    36.94  ...    32.00    108.00       16
1       Surface   10.750     40.5  ...    31.17   2268.00       30
2  Intermediate    7.625     29.7  ...    28.89   9857.70     2750
3       [BLANK]  [BLANK]  [BLANK]  ...  [BLANK]   [BLANK]  [BLANK]

[4 rows x 8 columns]


INFO: CONSUMABLES DataFrame shape: (3, 5)
INFO:OCRProduction:CONSUMABLES DataFrame shape: (3, 5)
INFO: consumables DataFrame shape: (3, 5) (Rows: 3, Columns: 5)
INFO:OCRProduction:consumables DataFrame shape: (3, 5) (Rows: 3, Columns: 5)
INFO: consumables saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/consumables_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/consumables_data.csv)
INFO:OCRProduction:consumables saved: JSON(/dbfs/mnt/mini-proj-dd/final_results/json/consumables_data.json), CSV(/dbfs/mnt/mini-proj-dd/final_results/csv/consumables_data.csv)
INFO: Processing pipeline: mud
INFO:OCRProduction:Processing pipeline: mud
INFO: Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_3.png with shape (173, 2502, 3)
INFO:OCRProduction:Image loaded from /dbfs/mnt/mini-proj-dd/cropped_sections/page_1_section_3.png with shape (173, 2502, 3)


--- CONSUMABLES DataFrame ---
Shape: (3, 5)
  Consumable Daily Received (gal)  ... Cumulative Used (gal) Daily on Hand (gal)
0       Fuel              [BLANK]  ...                20,626               5,735
1  CNG (DGE)                1,652  ...                 6,535             [BLANK]
2   Mud Fuel                8,367  ...                24,150              11,643

[3 rows x 5 columns]


INFO: Row 0 text: MUD MUD [BLANK]
INFO:OCRProduction:Row 0 text: MUD MUD [BLANK]
INFO: Row 1 text: Type Weight In Weight Out pH CAKE GELS (10s/10m/30m) Oil/Water FV ES PV YP CL Ca LGS WL HTHP Loss
INFO:OCRProduction:Row 1 text: Type Weight In Weight Out pH CAKE GELS (10s/10m/30m) Oil/Water FV ES PV YP CL Ca LGS WL HTHP Loss
INFO: Row 2 text: OBM 11.5 11.5 [BLANK] 3 8 25 27 88/12 60.0 753 16 8 31,000 326,667 4.47 [BLANK] 5.00
INFO:OCRProduction:Row 2 text: OBM 11.5 11.5 [BLANK] 3 8 25 27 88/12 60.0 753 16 8 31,000 326,667 4.47 [BLANK] 5.00
INFO: Row 3 text: 3 RPM 6 RPM Mud Pits and Hole Volume 24 Hr Loss Total Loss Comments
INFO:OCRProduction:Row 3 text: 3 RPM 6 RPM Mud Pits and Hole Volume 24 Hr Loss Total Loss Comments
INFO: Row 4 text: 4 5 1023 13 481 [BLANK]
INFO:OCRProduction:Row 4 text: 4 5 1023 13 481 [BLANK]
INFO: Header1: Type Weight In Weight Out pH CAKE GELS (10s/10m/30m) Oil/Water FV ES PV YP CL Ca LGS WL HTHP Loss
INFO:OCRProduction:Header1: Type Weight In Weight Out pH CAK

--- MUD DataFrame ---
Shape: (22, 2)
                  Key                                   Value
0                Type                                     OBM
1           Weight In                                    11.5
2          Weight Out                                    11.5
3                  pH                                 [BLANK]
4                CAKE                                       3
5  GELS (10s/10m/30m)  {'10s': '8', '10m': '25', '30m': '27'}
6           Oil/Water                                   88/12
7                  FV                                    60.0
8                  ES                                     753
9                  PV                                      16


ERROR: Error in pipeline bha: [Errno 2] No such file or directory: '/Workspace/Repos/divya.dhaipullay@zeussolutionsinc.com/automate_ddr/dbfs:/mnt/mini-proj-dd/cropped_sections/page_2_section_?BHA.png'
ERROR:OCRProduction:Error in pipeline bha: [Errno 2] No such file or directory: '/Workspace/Repos/divya.dhaipullay@zeussolutionsinc.com/automate_ddr/dbfs:/mnt/mini-proj-dd/cropped_sections/page_2_section_?BHA.png'


In [0]:

import re
import cv2, os
import pytesseract
import pandas as pd
import logging
import json
from PIL import Image

# -----------------------------------------------------------------------------
# Global Logger & Output Directories
# -----------------------------------------------------------------------------
logger = logging.getLogger("OCRProduction")
logger.setLevel(logging.INFO)
if not logger.handlers:
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))
    logger.addHandler(ch)

OUTPUT_FOLDER = "/dbfs/mnt/mini-proj-dd/final_results"
CSV_FOLDER = os.path.join(OUTPUT_FOLDER, "csv")
JSON_FOLDER = os.path.join(OUTPUT_FOLDER, "json")
os.makedirs(CSV_FOLDER, exist_ok=True)
os.makedirs(JSON_FOLDER, exist_ok=True)

# -----------------------------------------------------------------------------
# Common Utility Functions (No image display)
# -----------------------------------------------------------------------------
def safe_read_image(img_path):
    """Read an image from a local or DBFS path."""
    local_path = img_path if not img_path.startswith("dbfs:") else img_path.replace("dbfs:", "/dbfs")
    if not os.path.exists(local_path):
        raise FileNotFoundError(f"File not found: {local_path}")
    img = cv2.imread(local_path)
    if img is None:
        raise ValueError(f"Failed to load image: {local_path}")
    logger.info(f"Image loaded from {local_path} with shape {img.shape}")
    return img

def preprocess_image(img, debug=False):
    """Convert image to grayscale and apply adaptive thresholding."""
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    thresh = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                   cv2.THRESH_BINARY, 15, 9)
    if debug:
        logger.info("Preprocessing completed (grayscale and threshold applied).")
    return thresh

def detect_text_regions(thresh_img, debug=False):
    """Detect text regions (bounding boxes) from the thresholded image."""
    contours, _ = cv2.findContours(thresh_img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    rois = []
    for cnt in contours:
        x, y, w, h = cv2.boundingRect(cnt)
        if w > 30 and h > 15:
            rois.append((x, y, w, h))
    rois.sort(key=lambda b: (b[1], b[0]))
    if debug:
        logger.info(f"Detected {len(rois)} text regions.")
    return rois

def perform_ocr_on_rois(img, rois, debug=False):
    """Perform OCR on each detected region and return list of (x,y,w,h,text)."""
    results = []
    for i, (x, y, w, h) in enumerate(rois):
        roi = img[y:y+h, x:x+w]
        text = pytesseract.image_to_string(roi, config="--psm 6").strip() or "[BLANK]"
        results.append((x, y, w, h, text))
        if debug:
            logger.info(f"OCR Box {i}: {text}")
    return results

def perform_ocr(img):
    """Perform OCR on the entire image."""
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    return pytesseract.image_to_string(gray, config="--psm 6")

def extract_key_value_from_text(text, expected_keys):
    """Extract key-value pairs from OCR text given expected keys."""
    combined = " ".join(line.strip() for line in text.splitlines() if line.strip())
    combined = re.sub(r'\s+', ' ', combined)
    result = {}
    for i, key in enumerate(expected_keys):
        if i < len(expected_keys) - 1:
            next_key = expected_keys[i+1]
            pattern = re.escape(key) + r'\s*:\s*(.*?)(?=\s*' + re.escape(next_key) + r'\s*:|$)'
        else:
            pattern = re.escape(key) + r'\s*:\s*(.*)'
        match = re.search(pattern, combined, re.IGNORECASE)
        result[key] = match.group(1).strip() if match and match.group(1).strip() else None
    return result

# -----------------------------------------------------------------------------
# [PARSE FUNCTIONS]
# These functions (e.g. build_bit_info_dict_from_rois, process_well_job_info, etc.)
# are your specialized parsers. They have been left mostly intact.
# -----------------------------------------------------------------------------
def build_bit_info_dict_from_rois(roi_texts, debug=False):
    row_tolerance = 10
    grouped_rows = []
    current_row = []
    prev_y = None
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            grouped_rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        grouped_rows.append(current_row)
    row_strings = []
    for i, row in enumerate(grouped_rows):
        row.sort(key=lambda cell: cell[0])
        line = " ".join(cell[4] for cell in row)
        row_strings.append(line.replace("\n", " ").strip())
        if debug:
            logger.info(f"BIT - Grouped Row {i}: {row_strings[-1]}")
    if len(row_strings) < 3:
        logger.warning("Not enough rows found for BIT layout.")
        return {}, pd.DataFrame()
    data_lines = row_strings[3:]
    final_columns = [
        "Bit #", "Size", "Make", "Model", "Serial #",
        "Nozzle-(Number x Size)", "Nozzle-TFA",
        "Depth-In", "Depth-Out", "Depth-Feet", "Depth-ROP",
        "Hours-Total", "Hours-On Btm",
        "Dull Grade-I", "Dull Grade-O1", "Dull Grade-D", "Dull Grade-L",
        "Dull Grade-B", "Dull Grade-G", "Dull Grade-O2", "Dull Grade-RP"
    ]
    structured_data = []
    for line in data_lines:
        tokens = line.split()
        if len(tokens) < 21:
            tokens += [""] * (21 - len(tokens))
        elif len(tokens) > 21:
            tokens = tokens[:21]
        row_dict = {final_columns[i]: tokens[i] for i in range(21)}
        structured_data.append(row_dict)
        if debug:
            logger.info(f"BIT - Parsed row: {row_dict}")
    df = pd.DataFrame(structured_data)
    logger.info(f"BIT DataFrame shape: {df.shape}")
    return {"BIT DETAILS": df.to_dict(orient='records')}, df

def process_well_job_info(section_path, debug=False):
    img = safe_read_image(section_path)
    ocr_text = perform_ocr(img)
    if debug:
        logger.info("Well/Job OCR extraction complete.")
        logger.info(f"OCR Text: {ocr_text}")
    expected_keys = [
        "Well Name", "Job Name", "Supervisor(s)", "Field", "Sec/Twn/Rng", "Phone",
        "AFE #", "API #", "Email", "Contractor", "Elevation", "RKB",
        "Spud Date", "Days from Spud", "Days on Loc", "MD/TVD", "24 Hr Footage",
        "Present Operations", "Activity Planned"
    ]
    combined = " ".join(line.strip() for line in ocr_text.splitlines() if line.strip())
    combined = re.sub(r'\s+', ' ', combined)
    result = {}
    for i, key in enumerate(expected_keys):
        if i < len(expected_keys) - 1:
            next_key = expected_keys[i+1]
            pattern = re.escape(key) + r'\s*:\s*(.*?)(?=\s*' + re.escape(next_key) + r'\s*:|$)'
        else:
            pattern = re.escape(key) + r'\s*:\s*(.*)'
        match = re.search(pattern, combined, re.IGNORECASE)
        result[key] = match.group(1).strip() if match else ""
    df = pd.DataFrame(list(result.items()), columns=["Key", "Value"])
    logger.info(f"WELL/JOB DataFrame shape: {df.shape}")
    return {"WELL/JOB INFORMATION": result}, df
def build_obs_int_data_from_rois(roi_texts):
    """
    Dynamically builds structured data from OCR results for the
    "DAILY NUMBERS: OBSERVATION & INTERVENTION" section.
    
    Processing rules:
      - Skip header texts ("DAILY NUMBERS: OBSERVATION & INTERVENTION" or "Number").
      - If an OCR box has multiple lines, split it into separate type entries.
      - If an OCR box is numeric (or a blank marker like "[BLANK]"),
        assign its value to the earliest record that has no number yet.
      - If no unpaired type exists when a numeric value is found, a new record is created.
      - Finally, any record with an empty "Type" is filtered out.
    
    This approach avoids hard-coding the expected count and ensures that no blank key/value pair is included.
    """
    header_str = "daily numbers: observation & intervention"
    records = []

    # Process each OCR box in order.
    for (_, _, _, _, raw_text) in roi_texts:
        text = raw_text.strip()
        low_text = text.lower()
        
        # Skip header texts and column labels.
        if low_text in [header_str, "number"]:
            continue
        
        # If the text contains newlines, treat each line as a separate type.
        if "\n" in text:
            for line in text.splitlines():
                line = line.strip()
                if not line or line.lower() in [header_str, "number", "[blank]"]:
                    continue
                records.append({"Type": line, "Number": None})
        else:
            # Determine if the text represents a number.
            is_numeric = False
            value = text
            if text == "" or low_text == "[blank]":
                is_numeric = True
                value = ""
            else:
                try:
                    float(text)
                    is_numeric = True
                except ValueError:
                    is_numeric = False

            if is_numeric:
                # Pair the number with the earliest record without a number.
                for rec in records:
                    if rec["Number"] is None:
                        rec["Number"] = value
                        break
                else:
                    # No pending record exists; create one with an empty type.
                    records.append({"Type": "", "Number": value})
            else:
                # Non-numeric text is treated as a type.
                records.append({"Type": text, "Number": None})

    # Clean up: replace any None values with an empty string.
    for rec in records:
        if rec["Number"] is None:
            rec["Number"] = ""

    # Filter out any records where the "Type" is empty.
    records = [rec for rec in records if rec["Type"].strip() != ""]

    # Log the structured data.
    logging.getLogger("daily_numbersExtractor").info(f"Structured Data: {records}")

    # Save the results as CSV and JSON.
    df = pd.DataFrame(records)
    csv_filename = "obs_int_data.csv"
    json_filename = "obs_int_data.json"
    df.to_csv(csv_filename, index=False)
    logging.getLogger("daily_numbersExtractor").info(f"Data saved successfully as CSV: {csv_filename}")

    with open(json_filename, "w") as json_file:
        json.dump(records, json_file, indent=4)
    logging.getLogger("daily_numbersExtractor").info(f"Data saved successfully in JSON format: {json_filename}")

    return records, df

def process_obs_int(section_path, debug=False):
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    header_str = "daily numbers: observation & intervention"
    all_texts = [t[4] for t in roi_texts]
    types_list, numbers_list = [], []
    for txt in all_texts:
        clean = txt.strip()
        if clean.lower() in [header_str, "number", "[blank]"]:
            continue
        try:
            float(clean)
            numbers_list.append("" if clean.lower() == "[blank]" else clean)
            continue
        except ValueError:
            pass
        if "\n" in clean:
            for line in clean.splitlines():
                line = line.strip()
                if line and line.lower() != "[blank]":
                    types_list.append(line)
        else:
            types_list.append(clean)
    expected_count = 5
    while len(numbers_list) < expected_count:
        numbers_list.append("")
    types_list = types_list[:expected_count]
    numbers_list = numbers_list[:expected_count]
    structured = [{"Type": types_list[i], "Number": numbers_list[i]} for i in range(expected_count)]
    df = pd.DataFrame(structured)
    logger.info(f"OBS_INT DataFrame shape: {df.shape}")
    return {"DAILY NUMBERS: OBSERVATION & INTERVENTION": structured}, df

def pipeline_cost_data(debug=False):
    """Extract Cost Data from a specified image."""
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png"
    try:
        img = safe_read_image(section_path)
        logger.info("Cost image loaded.")
    except FileNotFoundError as e:
        logger.error(e)
        return {}, pd.DataFrame()
    ocr_text = perform_ocr(img)
    logger.info("Cost OCR extraction complete.")
    expected_keys = [
        "Drilling AFE Amount", "Daily Drilling Cost", "Cumulative Drilling Cost",
        "Cumulative Well Cost", "Daily Mud Cost", "Cumulative Mud Cost"
    ]
    extracted = extract_key_value_from_text(ocr_text, expected_keys)
    final_dict = {"COST DATA": extracted}
    logger.info(json.dumps(final_dict, indent=4))
    df = pd.DataFrame(list(extracted.items()), columns=["Key", "Value"])
    return final_dict, df

def process_bop(section_path, debug=False):
    img = safe_read_image(section_path)
    ocr_text = perform_ocr(img)
    if debug:
        logger.info("BOP OCR extraction complete.")
        logger.info(f"OCR Text: {ocr_text}")
    patterns = {
        "Last BOP Test Date": r"Last BOP Test Date\s*:\s*(\d{1,2}/\d{1,2}/\d{2,4})",
        "Last BOP Drill": r"Last BOP Drill\s*:\s*(\d{1,2}/\d{1,2}/\d{2,4})",
        "Next BOP Test": r"Next BOP Test\s*:\s*(\d{1,2}/\d{1,2}/\d{2,4})"
    }
    result = {}
    for key, regex in patterns.items():
        match = re.search(regex, ocr_text, re.IGNORECASE)
        result[key] = match.group(1) if match else ""
    df = pd.DataFrame(list(result.items()), columns=["Key", "Value"])
    logger.info(f"BOP DataFrame shape: {df.shape}")
    return {"BOP": result}, df

def build_dir_info_dict_from_rois(roi_texts, debug=False):
    all_texts = [t[4] for t in roi_texts]
    daily_cum_idx = next((i for i, txt in enumerate(all_texts)
                           if "daily" in txt.lower() and "cumulative" in txt.lower()), None)
    if daily_cum_idx is None:
        logger.warning("Could not find 'Daily Cumulative' bounding box.")
        return {}, pd.DataFrame()
    cat_idx = daily_cum_idx + 1
    if cat_idx >= len(all_texts):
        logger.warning("No bounding box after 'Daily Cumulative'.")
        return {}, pd.DataFrame()
    categories_box = all_texts[cat_idx]
    lines = [ln.strip() for ln in categories_box.split("\n") if ln.strip()]
    if len(lines) < 5:
        logger.warning(f"Expected 5 category lines, got {len(lines)}: {lines}")
    def safe_get(idx):
        return all_texts[idx] if 0 <= idx < len(all_texts) else ""
    structured = []
    for i in range(4):
        cat_name = lines[i] if i < len(lines) else f"Unknown Category {i+1}"
        daily_box = safe_get(cat_idx + 1 + (i * 2))
        cum_box = safe_get(cat_idx + 2 + (i * 2))
        structured.append({
            "Category": cat_name,
            "Daily": "" if daily_box == "[BLANK]" else daily_box,
            "Cumulative": "" if cum_box == "[BLANK]" else cum_box
        })
    last_box = safe_get(cat_idx + 9)
    last_cat = lines[4] if len(lines) >= 5 else "Rotating Footage"
    remainder = last_box.replace(last_cat, "").strip()
    tokens = remainder.split()
    daily_val = tokens[0] if len(tokens) >= 2 else ""
    cum_val = tokens[1] if len(tokens) >= 2 else ""
    structured.append({
        "Category": last_cat,
        "Daily": "" if daily_val == "[BLANK]" else daily_val,
        "Cumulative": "" if cum_val == "[BLANK]" else cum_val
    })
    df = pd.DataFrame(structured)
    logger.info(f"DIR INFO DataFrame shape: {df.shape}")
    return {"DIR INFO": structured}, df

def process_dir_info(section_path, debug=False):
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    return build_dir_info_dict_from_rois(roi_texts, debug=debug)

def build_survey_dict_from_rois(roi_texts, expected_headers):
    row_tolerance = 10
    rows = []
    current_row = []
    prev_y = None
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        rows.append(current_row)
    row_strings = []
    for i, row in enumerate(rows):
        row.sort(key=lambda c: c[0])
        line = " ".join(cell[4] for cell in row)
        row_strings.append(line)
        logger.info(f"SURVEY - Grouped Row {i}: {line}")
    all_lines = []
    for line in row_strings:
        for subline in line.split("\n"):
            subline = subline.strip()
            if subline:
                all_lines.append(subline)
    logger.info(f"SURVEY - All extracted lines: {all_lines}")
    data_lines = []
    for line in all_lines:
        tokens = re.split(r'\s{2,}', line)
        if len(tokens) == 1:
            tokens = line.split()
        lower_tokens = [t.lower() for t in tokens]
        if "md" in lower_tokens and "inclination" in lower_tokens:
            logger.info(f"SURVEY - Skipping header line: {tokens}")
            continue
        if len(tokens) < len(expected_headers):
            logger.warning(f"SURVEY - Line has fewer tokens than expected: {tokens}")
            continue
        tokens = tokens[:len(expected_headers)]
        data_lines.append(tokens)
    logger.info(f"SURVEY - Data lines to parse: {data_lines}")
    survey_list = []
    for tokens in data_lines:
        row_dict = {expected_headers[i]: tokens[i] for i in range(len(expected_headers))}
        survey_list.append(row_dict)
    return survey_list

def sort_survey_data(survey_list):
    def md_value(row):
        try:
            return float(row["MD"].replace(",", ""))
        except Exception:
            return 0
    return sorted(survey_list, key=md_value, reverse=True)

def pipeline_survey_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_13.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    expected_headers = ["MD", "Inclination", "Azimuth", "DLS", "TVD"]
    survey_list = build_survey_dict_from_rois(roi_texts, expected_headers)
    survey_list = sort_survey_data(survey_list)
    df = pd.DataFrame(survey_list)
    logger.info(f"SURVEY DataFrame shape: {df.shape}")
    return {"SURVEY": survey_list}, df

def build_casing_dict_from_rois(roi_texts, expected_headers, debug=False):
    row_tolerance = 10
    rows = []
    current_row = []
    prev_y = None
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        rows.append(current_row)
    row_strings = []
    for i, row in enumerate(rows):
        row.sort(key=lambda c: c[0])
        line = " ".join(cell[4] for cell in row).strip()
        row_strings.append(line)
        if debug:
            logger.info(f"CASING - Grouped Row {i}: {line}")
    all_lines = []
    for line in row_strings:
        for sub in line.split("\n"):
            sub = sub.strip()
            if sub:
                all_lines.append(sub)
    data_lines = []
    for line in all_lines:
        tokens = re.split(r'\s{2,}', line)
        if len(tokens) == 1:
            tokens = line.split()
        lower_tokens = [t.lower() for t in tokens]
        if "type" in lower_tokens and "size" in lower_tokens:
            logger.info(f"CASING - Skipping header line: {tokens}")
            continue
        if len(tokens) < len(expected_headers):
            logger.warning(f"CASING - Line has fewer tokens than expected: {tokens}")
            continue
        tokens = tokens[:len(expected_headers)]
        data_lines.append(tokens)
    casing_list = []
    for tokens in data_lines:
        row_dict = {expected_headers[i]: tokens[i] for i in range(len(expected_headers))}
        casing_list.append(row_dict)
    return casing_list

def pipeline_casing_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_7.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    expected_headers = ["Type", "Size", "Weight", "Grade", "Connection", "Top MD", "Bottom MD", "TOC"]
    casing_list = build_casing_dict_from_rois(roi_texts, expected_headers, debug=debug)
    df = pd.DataFrame(casing_list)
    logger.info(f"CASING DataFrame shape: {df.shape}")
    return {"CASING": casing_list}, df

def build_consumables_dict_from_rois(roi_texts, debug=False):
    row_tolerance = 10
    rows = []
    current_row = []
    prev_y = None
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        rows.append(current_row)
    grouped_rows = []
    for i, row in enumerate(rows):
        row.sort(key=lambda cell: cell[0])
        line = " ".join(cell[4] for cell in row).strip()
        grouped_rows.append(line)
        if debug:
            logger.info(f"CONSUMABLES - Grouped Row {i}: {line}")
    data_rows = []
    for line in grouped_rows:
        lower_line = line.lower()
        if ("consumable" in lower_line and "received" in lower_line) or "nun" in lower_line:
            continue
        if len(line.split()) < 5:
            continue
        data_rows.append(line)
    consumables_list = []
    for line in data_rows:
        tokens = re.split(r'\s+', line)
        if len(tokens) > 5:
            first = " ".join(tokens[:-4])
            tokens = [first] + tokens[-4:]
        if len(tokens) != 5:
            logger.warning(f"CONSUMABLES - Skipping row (unexpected token count): {tokens}")
            continue
        row_dict = {
            "Consumable": tokens[0],
            "Daily Received (gal)": tokens[1],
            "Daily Used (gal)": tokens[2],
            "Cumulative Used (gal)": tokens[3],
            "Daily on Hand (gal)": tokens[4]
        }
        consumables_list.append(row_dict)
    return consumables_list

def pipeline_consumables_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_2_section_2.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    consumables_list = build_consumables_dict_from_rois(roi_texts, debug=debug)
    df = pd.DataFrame(consumables_list)
    logger.info(f"CONSUMABLES DataFrame shape: {df.shape}")
    return {"CONSUMABLES": consumables_list}, df

def extract_bha_data(image_path, debug=False):
    image = Image.open(image_path)
    ocr_text = pytesseract.image_to_string(image)
    patterns = {
        "Drill Pipe Detail": r"Drill Pipe Detail:\s*([^\n]+)",
        "Size": r"Size:\s*([\d.]+)\b",
        "Wt./Ft": r"Wt\./Ft:\s*([\d.]+)\b",
        "Connection": r"Connection:\s*([\w\d-]+)\b",
        "ID": r"ID:\s*([\d.]+)\b",
        "Drill Bit": r"Drill Bit:\s*([^\n;]+)",
        "Motor": r"Motor:\s*([^\n;]+)",
        "MWD Tool": r"MWD Tool:\s*([^\n;]+)",
        "Monel Collar": r"Monel Collar:\s*([^\n;]+)",
        "X-Over": r"X-Over:\s*([^\n;]+)",
        "Sub": r"Sub:\s*([^\n;]+)",
        "HWDP": r"HWDP:\s*([^\n;]+)",
        "Drill Pipe": r"Drill Pipe:\s*([\d.]+(?:\" DP)?)",
        "Reamer": r"Reamer:\s*([^\n;]+)",
        "Shock Sub": r"Shock Sub:\s*([^\n;]+)",
        "Total Length": r"Total Length:\s*(\d+)\b"
    }
    bha_data = {}
    for key, pat in patterns.items():
        match = re.search(pat, ocr_text)
        if match:
            bha_data[key] = match.group(1).strip()
    if "Drill Pipe Detail" in bha_data:
        detail = bha_data["Drill Pipe Detail"]
        for remove_key in ["Size", "Wt./Ft", "Connection", "ID"]:
            if remove_key in bha_data:
                detail = re.sub(rf"{remove_key}:\s*{re.escape(bha_data[remove_key])}", "", detail).strip(",; ")
        bha_data["Drill Pipe Detail"] = detail
    structured_data = {
        "BHA": {
            "Drill Pipe Detail": bha_data.get("Drill Pipe Detail", ""),
            "Size": bha_data.get("Size", ""),
            "Wt./Ft": bha_data.get("Wt./Ft", ""),
            "Connection": bha_data.get("Connection", ""),
            "ID": bha_data.get("ID", ""),
            "BHA #4": {
                "Drill Bit": bha_data.get("Drill Bit", ""),
                "Motor": bha_data.get("Motor", ""),
                "MWD Tool": bha_data.get("MWD Tool", ""),
                "Monel Collar": bha_data.get("Monel Collar", ""),
                "X-Over": bha_data.get("X-Over", ""),
                "Sub": bha_data.get("Sub", ""),
                "HWDP": bha_data.get("HWDP", ""),
                "Drill Pipe": bha_data.get("Drill Pipe", ""),
                "Reamer": bha_data.get("Reamer", ""),
                "Shock Sub": bha_data.get("Shock Sub", "")
            },
            "Total Length": bha_data.get("Total Length", "")
        }
    }
    if debug:
        logger.info("Extracted BHA data:")
        logger.info(json.dumps(structured_data, indent=4))
    return structured_data

def pipeline_bha_data(debug=False):
    image_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_2_section_?BHA.png"  # Adjust as needed
    bha_json = extract_bha_data(image_path, debug=debug)
    img = safe_read_image(image_path)
    ocr_text = perform_ocr(img)
    pump_pattern = re.compile(
        r"^(\d+)?\s*(BOMCO)\s+(TRIPLEX)\s+(\d+)?\s*(\d+)\s+([\d.]+)\s+([\d.]+)\s+(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s*$",
        re.IGNORECASE)
    pumps = []
    for line in ocr_text.splitlines():
        line = line.strip()
        match = pump_pattern.match(line)
        if match:
            (number, model, pump_type, hhp, efficiency, stroke,
             liner, p_rating, p_limit, spm_rating, spm_limit) = match.groups()
            pumps.append({
                "Number": number if number else "",
                "Model": model,
                "Type": pump_type,
                "HHP": hhp if hhp else "",
                "Efficiency": efficiency,
                "Stroke(in)": stroke,
                "Liner(in)": liner,
                "P-Rating(psi)": p_rating,
                "P-Limit(psi)": p_limit,
                "SPM Rating": spm_rating,
                "SPM Limit": spm_limit
            })
    circ_pattern = re.compile(
        r"Drilling/Circ\s+Rate\s+(\d+)\s+(\d+)\s+PSI\s*@\s*(\d+)\s*SPM\s*([\d.]+)\s+Gal/Stoke\s+([\d.]+)\s+GPM\s+([\d.]+)\s+BPM\s+([\d.]+)\s+DC\s+([\d.]+)\s+DP",
        re.IGNORECASE)
    circ_rates = []
    for line in ocr_text.splitlines():
        line = line.strip()
        match = circ_pattern.search(line)
        if match:
            rate_id, pressure, spm, gal_stroke, gpm, bpm, dc, dp = match.groups()
            circ_rates.append({
                "RateID": rate_id,
                "Pressure(PSI)": pressure,
                "SPM": spm,
                "Gal/Stoke": gal_stroke,
                "GPM": gpm,
                "BPM": bpm,
                "DC": dc,
                "DP": dp
            })
    pumps_df = pd.DataFrame(pumps)
    circ_df = pd.DataFrame(circ_rates)
    bha_df = pd.DataFrame([bha_json])
    logger.info(f"BHA DataFrame shape: {bha_df.shape}")
    combined = {"BHA": bha_json, "Pumps": pumps, "DrillingCircRates": circ_rates}
    return combined, pumps_df, circ_df, bha_df

def build_mud_dict_from_rois(roi_texts, expected_headers, debug=False):
    row_tolerance = 10
    rows = []
    current_row = []
    prev_y = None
    for (x, y, w, h, text) in roi_texts:
        if prev_y is None or abs(y - prev_y) <= row_tolerance:
            current_row.append((x, y, w, h, text))
        else:
            rows.append(current_row)
            current_row = [(x, y, w, h, text)]
        prev_y = y
    if current_row:
        rows.append(current_row)
    row_strings = []
    for i, row in enumerate(rows):
        row.sort(key=lambda c: c[0])
        line_text = " ".join(cell[4] for cell in row)
        row_strings.append(line_text)
        if debug:
            logger.info(f"MUD - Row {i} text: {line_text}")
    tokens1 = row_strings[1].split() if len(row_strings) > 1 else []
    tokens2 = row_strings[2].split() if len(row_strings) > 2 else []
    combined_tokens = tokens1 + tokens2
    def parse_value_row_tokens(expected_headers, tokens):
        expected_token_count = (len(expected_headers) - 1) + 3
        if len(tokens) < expected_token_count:
            tokens += ["[BLANK]"] * (expected_token_count - len(tokens))
        elif len(tokens) > expected_token_count:
            tokens = tokens[:expected_token_count]
        result = {}
        idx = 0
        for header in expected_headers:
            if header == "GELS (10s/10m/30m)":
                gels_tokens = tokens[idx:idx+3]
                result[header] = {"10s": gels_tokens[0], "10m": gels_tokens[1], "30m": gels_tokens[2]}
                idx += 3
            else:
                result[header] = tokens[idx]
                idx += 1
        return result
    mapped = parse_value_row_tokens(expected_headers, combined_tokens)
    return mapped

def pipeline_mud_data(debug=False):
    section_path = "dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_3.png"
    img = safe_read_image(section_path)
    thresh = preprocess_image(img, debug=debug)
    rois = detect_text_regions(thresh, debug=debug)
    roi_texts = perform_ocr_on_rois(img, rois, debug=debug)
    expected_headers = ["Type", "Weight In", "Weight Out", "pH", "CAKE",
                        "GELS (10s/10m/30m)", "Oil/Water", "FV", "ES", "PV",
                        "YP", "CL", "Ca", "LGS", "WL", "HTHP Loss", "3 RPM",
                        "6 RPM", "Mud Pits and Hole Volume", "24 Hr Loss",
                        "Total Loss", "Comments"]
    mud_dict = build_mud_dict_from_rois(roi_texts, expected_headers, debug=debug)
    df = pd.DataFrame([mud_dict])
    logger.info(f"MUD DataFrame shape: {df.shape}")
    return {"MUD": mud_dict}, df
import pytesseract
import re
import json
from PIL import Image

def extract_bha_data(image_path):
    # Load image and perform OCR
    image = Image.open(image_path)
    ocr_text = pytesseract.image_to_string(image)
    
    # Define regex patterns to extract key values without repetition
    patterns = {
        "Drill Pipe Detail": r"Drill Pipe Detail:\s*([^\n]+)",  # Extracts full text but **won't duplicate fields**
        "Size": r"Size:\s*([\d.]+)\b",
        "Wt./Ft": r"Wt\./Ft:\s*([\d.]+)\b",
        "Connection": r"Connection:\s*([\w\d-]+)\b",
        "ID": r"ID:\s*([\d.]+)\b",
        "Drill Bit": r"Drill Bit:\s*([^\n;]+)",
        "Motor": r"Motor:\s*([^\n;]+)",
        "MWD Tool": r"MWD Tool:\s*([^\n;]+)",
        "Monel Collar": r"Monel Collar:\s*([^\n;]+)",
        "X-Over": r"X-Over:\s*([^\n;]+)",
        "Sub": r"Sub:\s*([^\n;]+)",
        "HWDP": r"HWDP:\s*([^\n;]+)",
        "Drill Pipe": r"Drill Pipe:\s*([\d.]+(?:\" DP)?)",  
        "Reamer": r"Reamer:\s*([^\n;]+)",
        "Shock Sub": r"Shock Sub:\s*([^\n;]+)",
        "Total Length": r"Total Length:\s*(\d+)\b"
    }
    
    # Extract data
    bha_data = {}
    for key, pattern in patterns.items():
        match = re.search(pattern, ocr_text)
        if match:
            bha_data[key] = match.group(1).strip()
    
    # **Fix duplication issue:** Remove Size, Wt./Ft, Connection, ID from `"Drill Pipe Detail"`
    if "Drill Pipe Detail" in bha_data:
        detail = bha_data["Drill Pipe Detail"]
        for remove_key in ["Size", "Wt./Ft", "Connection", "ID"]:
            if remove_key in bha_data:
                detail = re.sub(rf"{remove_key}:\s*{re.escape(bha_data[remove_key])}", "", detail).strip(",; ")
        bha_data["Drill Pipe Detail"] = detail  # Store cleaned version

    # **Final structured JSON without repetition**
    structured_data = {
        "BHA": {
            "Drill Pipe Detail": bha_data.get("Drill Pipe Detail", ""),
            "Size": bha_data.get("Size", ""),
            "Wt./Ft": bha_data.get("Wt./Ft", ""),
            "Connection": bha_data.get("Connection", ""),
            "ID": bha_data.get("ID", ""),
            "BHA #4": {
                "Drill Bit": bha_data.get("Drill Bit", ""),
                "Motor": bha_data.get("Motor", ""),
                "MWD Tool": bha_data.get("MWD Tool", ""),
                "Monel Collar": bha_data.get("Monel Collar", ""),
                "X-Over": bha_data.get("X-Over", ""),
                "Sub": bha_data.get("Sub", ""),
                "HWDP": bha_data.get("HWDP", ""),
                "Drill Pipe": bha_data.get("Drill Pipe", ""),
                "Reamer": bha_data.get("Reamer", ""),
                "Shock Sub": bha_data.get("Shock Sub", "")
            },
            "Total Length": bha_data.get("Total Length", "")  # ✅ Now correctly placed at the end
        }
    }
    
    return structured_data

# ------------------------------------------------------------------
# 4) Parse Pumps Table
# ------------------------------------------------------------------
def parse_pumps_table(ocr_text):
    """
    Parses the pumps table from the OCR text.
    Expected lines look like:
      Number Model Type   HHP  Efficiency  Stroke(in)  Liner(in)  P-Rating(psi)  P-Limit(psi)  SPM Rating  SPM Limit
      1      BOMCO TRIPLEX 1600 95       12.000       4.75       7500           7100          120         110
      2      BOMCO TRIPLEX 1600 95       12.000       4.75       7500           7100          120         110
      (possibly missing fields in some rows)
    """

    # We’ll search for lines that look like:
    #   <Number> BOMCO TRIPLEX <HHP> <Eff> <Stroke> <Liner> <P-Rating> <P-Limit> <SPM Rating> <SPM Limit>
    #   or possibly missing the Number or HHP.
    # We'll capture them with a regex that checks for 8-11 columns.
    # You can refine further as needed.
    pump_pattern = re.compile(
        r"^(\d+)?\s*"               # Number (optional)
        r"(BOMCO)\s+(TRIPLEX)\s+"    # Model, Type
        r"(\d+)?\s*"                 # HHP (optional)
        r"(\d+)\s+"                  # Efficiency
        r"([\d.]+)\s+"               # Stroke(in)
        r"([\d.]+)\s+"               # Liner(in)
        r"(\d+)\s+"                  # P-Rating(psi)
        r"(\d+)\s+"                  # P-Limit(psi)
        r"(\d+)\s+"                  # SPM Rating
        r"(\d+)\s*$",                # SPM Limit
        re.IGNORECASE
    )

    lines = ocr_text.splitlines()
    pumps = []

    for line in lines:
        line = line.strip()
        match = pump_pattern.match(line)
        if match:
            # Extract fields
            number, model, pump_type, hhp, efficiency, stroke, liner, p_rating, p_limit, spm_rating, spm_limit = match.groups()

            # Store as dictionary
            pumps.append({
                "Number": number if number else "",
                "Model": model,
                "Type": pump_type,
                "HHP": hhp if hhp else "",
                "Efficiency": efficiency,
                "Stroke(in)": stroke,
                "Liner(in)": liner,
                "P-Rating(psi)": p_rating,
                "P-Limit(psi)": p_limit,
                "SPM Rating": spm_rating,
                "SPM Limit": spm_limit
            })

    return pumps

# ------------------------------------------------------------------
# 5) Parse Drilling/Circ Rates
# ------------------------------------------------------------------
def parse_drilling_circ_rates(ocr_text):
    """
    Parses lines like:
      Drilling/Circ Rate 1 4325 PSI @ 134 SPM 2.63 Gal/Stoke 351.76 GPM 8.38 BPM 468.11 DC 340.61 DP
      Drilling/Circ Rate 2 4475 PSI @ 134 SPM 2.63 Gal/Stoke 351.76 GPM 8.38 BPM 468.11 DC 340.61 DP
    We'll store them in a structured list of dicts.
    """

    # We'll define a pattern capturing Rate #, Pressure, SPM, Gal/Stoke, GPM, BPM, DC, DP, etc.
    # Example line:
    #   Drilling/Circ Rate 1 4325 PSI @ 134 SPM 2.63 Gal/Stoke 351.76 GPM 8.38 BPM 468.11 DC 340.61 DP
    circ_pattern = re.compile(
        r"Drilling/Circ\s+Rate\s+(\d+)\s+(\d+)\s+PSI\s*@\s*(\d+)\s*SPM\s*([\d.]+)\s+Gal/Stoke\s+([\d.]+)\s+GPM\s+([\d.]+)\s+BPM\s+([\d.]+)\s+DC\s+([\d.]+)\s+DP",
        re.IGNORECASE
    )

    lines = ocr_text.splitlines()
    circ_rates = []

    for line in lines:
        line = line.strip()
        match = circ_pattern.search(line)
        if match:
            rate_id, pressure, spm, gal_stroke, gpm, bpm, dc, dp = match.groups()
            circ_rates.append({
                "RateID": rate_id,
                "Pressure(PSI)": pressure,
                "SPM": spm,
                "Gal/Stoke": gal_stroke,
                "GPM": gpm,
                "BPM": bpm,
                "DC": dc,
                "DP": dp
            })

    return circ_rates


# -----------------------------------------------------------------------------
# Main Function: Run All Pipelines in Sequence & Save Outputs
# -----------------------------------------------------------------------------
def run_pipeline(name, pipeline_info, debug=False):
    try:
        logger.info(f"Processing pipeline: {name}")
        output_json, df = pipeline_info["func"](debug)
        logger.info(f"{name} DataFrame shape: {df.shape} (Rows: {df.shape[0]}, Columns: {df.shape[1]})")
        json_path = os.path.join(JSON_FOLDER, pipeline_info["json"])
        with open(json_path, "w") as f:
            json.dump(output_json, f, indent=4)
        csv_path = os.path.join(CSV_FOLDER, pipeline_info["csv"])
        df.to_csv(csv_path, index=False)
        logger.info(f"{name} saved: JSON({json_path}), CSV({csv_path})")
        print(f"--- {name.upper()} DataFrame ---")
        print(f"Shape: {df.shape}")
        print(df.head(10))
    except Exception as e:
        logger.error(f"Error in pipeline {name}: {e}")

def main():
    debug = False  # Set True for detailed logging
    pipelines = {
        "cost_data": {
            "func": pipeline_cost_data,
            "csv": "cost_data.csv",
            "json": "cost_data.json"
        },
        "well_job": {
            "func": lambda d: process_well_job_info("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_2.png", d),
            "csv": "well_job_data.csv",
            "json": "well_job_data.json"
        },
        "obs_int": {
            "func": lambda d: process_obs_int("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_10.png", d),
            "csv": "obs_int_data.csv",
            "json": "obs_int_data.json"
        },
        "bop": {
            "func": lambda d: process_bop("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_8.png", d),
            "csv": "bop_data.csv",
            "json": "bop_data.json"
        },
        "dir_info": {
            "func": lambda d: process_dir_info("dbfs:/mnt/mini-proj-dd/cropped_sections/page_1_section_5.png", d),
            "csv": "dir_info_data.csv",
            "json": "dir_info_data.json"
        },
        "survey": {
            "func": pipeline_survey_data,
            "csv": "survey_data.csv",
            "json": "survey_data.json"
        },
        "casing": {
            "func": pipeline_casing_data,
            "csv": "casing_data.csv",
            "json": "casing_data.json"
        },
        "consumables": {
            "func": pipeline_consumables_data,
            "csv": "consumables_data.csv",
            "json": "consumables_data.json"
        },
        "mud": {
            "func": pipeline_mud_data,
            "csv": "mud_data.csv",
            "json": "mud_data.json"
        },
        "bha": {
            "func": extract_bha_data,
            "csv": "bha_data.csv",
            "json": "bha_data.json"
        },
        "pumps": {
            "func": parse_pumps_table + parse_drilling_circ_rates, # append them both
            "csv": "pumps_data.csv",
            "json": "pumps_data.json"
        }
    }
    for name, pipe in pipelines.items():
        run_pipeline(name, pipe, debug)



if __name__ == "__main__":
    main()


[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-4749686794680557>, line 911[0m
[1;32m    906[0m         run_pipeline(name, pipe, debug)
[1;32m    910[0m [38;5;28;01mif[39;00m [38;5;18m__name__[39m [38;5;241m==[39m [38;5;124m"[39m[38;5;124m__main__[39m[38;5;124m"[39m:
[0;32m--> 911[0m     main()

File [0;32m<command-4749686794680557>, line 900[0m, in [0;36mmain[0;34m()[0m
[1;32m    846[0m [38;5;28;01mdef[39;00m [38;5;21mmain[39m():
[1;32m    847[0m     debug [38;5;241m=[39m [38;5;28;01mFalse[39;00m  [38;5;66;03m# Set True for detailed logging[39;00m
[1;32m    848[0m     pipelines [38;5;241m=[39m {
[1;32m    849[0m         [38;5;124m"[39m[38;5;124mcost_data[39m[38;5;124m"[39m: {
[1;32m    850[0m             [38;5;124m"[39m[38;5;124mfunc[39m[38;5;124m"[39m: pipeline_cost_data,
[1;32m   