In [1]:
# Imports
import os
import re
import pandas as pd
from sqlalchemy import (
    select, create_engine, text, Table, Column, Integer, String, MetaData, ForeignKey, LargeBinary)
from sqlalchemy.orm import sessionmaker
import pdfplumber
import fitz  # PyMuPDF
# For Image Display within the df
from IPython.display import Image
from PIL import Image, ImageOps
from io import BytesIO

In [2]:
# Initializing database connection
# Connection settings are read from environment variables so that credentials
# are never stored in the notebook. Only the password is mandatory; the other
# settings fall back to the values this notebook has always used.
# NOTE(review): the password that was previously hardcoded in this cell should
# be treated as leaked and rotated on the database server.
username = os.environ.get("TPMPAMS_DB_USER", "tpmpams_user")
password = os.environ.get("TPMPAMS_DB_PASSWORD")
host = os.environ.get(
    "TPMPAMS_DB_HOST", "dpg-d0r91k2dbo4c73a4kip0-a.singapore-postgres.render.com"
)
port = os.environ.get("TPMPAMS_DB_PORT", "5432")
database = os.environ.get("TPMPAMS_DB_NAME", "tpmpams")

if not password:
    # Fail early with an actionable message instead of a confusing auth error later.
    raise RuntimeError(
        "Set the TPMPAMS_DB_PASSWORD environment variable before running this notebook."
    )

# SQLAlchemy connection URL
DATABASE_URL = f"postgresql://{username}:{password}@{host}:{port}/{database}"

# Create engine
engine = create_engine(DATABASE_URL)

In [3]:
# Schema for the two tables used by this notebook.
metadata = MetaData()

# One row per extracted diagram image. It is declared first because
# master_parts_list points at it through a foreign key.
parts_images = Table(
    "parts_images",
    metadata,
    Column("image_id", String, primary_key=True),
    Column("pdf_id", String, nullable=False),
    Column("section", Integer, nullable=False),
    Column("image", LargeBinary, nullable=False),
)

# One row per part line parsed out of a catalogue PDF.
master_parts_list = Table(
    "master_parts_list",
    metadata,
    Column("mpl_id", Integer, primary_key=True),
    Column("pdf_id", String, nullable=False),
    Column("year", Integer, nullable=False),
    Column("brand", String, nullable=False),
    Column("model", String, nullable=False),
    # AKA "fig_no" for pdf
    Column("section", Integer, nullable=False),
    Column("component_name", String, nullable=False),
    Column("ref_no", Integer, nullable=False),
    Column("part_no", String, nullable=False),
    Column("description", String, nullable=False),
    Column("remarks", String),
    # Deleting an image row nulls this reference instead of deleting the part row.
    Column(
        "image_id",
        String,
        ForeignKey("parts_images.image_id", ondelete="SET NULL"),
    ),
)

# Issue CREATE TABLE for every table registered on `metadata`.
metadata.create_all(engine)

In [4]:
# Functions
# ---------- HELPERS ----------
def extract_pdf_id(pdf_path):
    """Build a compact identifier from the start of the PDF file name.

    The file name is cut at its first '.', then the leading run of ASCII
    letters, digits and spaces is taken and the spaces are removed.

    Args:
        pdf_path: Path (or bare file name) of the PDF.

    Returns:
        The identifier string, or None when the file name does not start
        with a letter, digit or space.
    """
    # To-Do: modify pdf_id extraction to suit both pdf format
    stem, _, _ = os.path.basename(pdf_path).partition('.')
    leading = re.match(r"([A-Za-z0-9 ]+)", stem)
    if leading is None:
        return None
    # Remove all spaces so e.g. "AEROX 155 " becomes "AEROX155".
    return leading.group(1).replace(" ", "")

def extract_year(pdf_path):
    """Extract the model year from an apostrophe-prefixed two-digit token.

    The file name is searched first (e.g. "'19" in "AEROX 155 '19 (...).pdf"),
    so that a directory name containing a similar token cannot override the
    year written in the file name. If the file name has no such token, the
    whole path is searched as a fallback.

    Args:
        pdf_path: Path (or bare file name) of the PDF.

    Returns:
        A four-character string "20YY", or None when no "'YY" token is found.
        The century is always assumed to be 20xx.
    """
    pattern = r"'(\d{2})"
    year_match = (
        re.search(pattern, os.path.basename(pdf_path))
        or re.search(pattern, pdf_path)
    )
    return f"20{year_match.group(1)}" if year_match else None

def extract_model(pdf_path):
    """Return the text inside the first pair of parentheses in the file name.

    Args:
        pdf_path: Path (or bare file name) of the PDF.

    Returns:
        The parenthesised text, e.g. "B65P, B65R, B65S", or None when the
        file name contains no "(...)" group.
    """
    file_name = os.path.basename(pdf_path)
    parenthesised = re.search(r"\((.*?)\)", file_name)
    return parenthesised.group(1) if parenthesised else None

# ---------- IMAGE EXTRACTION ----------
def normalize_image_background(image_bytes):
    """Re-encode an image as PNG, inverting it when it is mostly dark.

    The image is reduced to greyscale; if its mean pixel value is below 128
    the greyscale image is inverted. The result is converted to RGB and
    serialised as PNG.

    Args:
        image_bytes: Encoded image data readable by PIL.

    Returns:
        PNG-encoded bytes of the (possibly inverted) greyscale image in RGB mode.
    """
    gray = Image.open(BytesIO(image_bytes)).convert("L")
    width, height = gray.size
    average = sum(gray.getdata()) / (width * height)

    # A mean below mid-grey is treated as a dark background and flipped.
    if average < 128:
        gray = ImageOps.invert(gray)

    buffer = BytesIO()
    gray.convert("RGB").save(buffer, format="PNG")
    return buffer.getvalue()

def get_existing_fig_combos(engine, pdf_id):
    """Fetch the sections already stored in parts_images for one PDF.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        pdf_id: Identifier of the PDF whose image rows are looked up.

    Returns:
        A set of section values converted to str, so they can be compared
        with section labels parsed from PDF text.
    """
    query = text("SELECT section FROM parts_images WHERE pdf_id = :pdf_id")
    with engine.connect() as conn:
        rows = conn.execute(query, {"pdf_id": pdf_id}).fetchall()
    return {str(row[0]) for row in rows}

def extract_images_with_fig_labels(pdf_path, pdf_id, engine):
    """Collect one normalised image per new "FIG." section of a PDF.

    For every page whose text contains a "FIG. <label>" marker, the first
    label on the page is used as the section and the first image listed on
    the page is taken as its diagram. Sections already present in the
    parts_images table for this pdf_id, or already seen earlier in the same
    PDF, are skipped.

    Args:
        pdf_path: Path of the PDF to read.
        pdf_id: Identifier used as prefix of each image_id and stored per row.
        engine: SQLAlchemy engine used to look up already-stored sections.

    Returns:
        A DataFrame with columns image_id, pdf_id, section and image (PNG
        bytes). When nothing new is found the DataFrame is empty and has no
        columns.
    """
    data = []
    seen_figs = set()  # Track unique figs within the PDF

    # The context manager guarantees the document handle is released even if
    # a page fails to parse; previously the document was never closed.
    with fitz.open(pdf_path) as doc:
        # Step 1: Get existing (pdf_id, fig_no) combos from DB
        existing_figs = get_existing_fig_combos(engine, pdf_id)

        for page_num in range(len(doc)):
            page = doc.load_page(page_num)
            # Named page_text so it does not shadow sqlalchemy's `text` import.
            page_text = page.get_text()

            label = re.search(r"FIG\.\s*([\w-]+)", page_text)
            if label is None:
                continue

            # Only the first FIG label on a page is considered.
            section = label.group(1)
            if section in seen_figs or section in existing_figs:
                continue  # Skip if already handled or exists in DB

            image_list = page.get_images(full=True)
            if not image_list:
                continue

            # First entry of the page's image list; its first field is the xref.
            xref = image_list[0][0]
            base_image = doc.extract_image(xref)
            image = normalize_image_background(base_image["image"])

            data.append({
                "image_id": "_".join([pdf_id, section]),
                "pdf_id": pdf_id,
                "section": section,
                "image": image,
            })
            seen_figs.add(section)

    return pd.DataFrame(data)

# ---------- TEXT EXTRACTION ----------
def extract_text_from_pdf(pdf_path):
    """Concatenate the text of every "FIG." page up to the numerical index.

    Pages are read in order. Reading stops at the first page whose first
    line contains "NUMERICAL INDEX". Pages without any "FIG." marker are
    skipped. Each kept page is prefixed with a "--- Page N ---" separator
    (N is 1-based).

    Args:
        pdf_path: Path of the PDF to read.

    Returns:
        A single string with the text of all kept pages, or "" when no page
        qualifies.
    """
    chunks = []
    with pdfplumber.open(pdf_path) as pdf:
        for page_num, page in enumerate(pdf.pages):
            page_text = page.extract_text() or ""
            first_line = page_text.split('\n', 1)[0].strip()
            # The stop condition must be checked before the FIG filter;
            # otherwise an index page without "FIG." is merely skipped and
            # the scan continues through the rest of the document.
            if "NUMERICAL INDEX" in first_line:
                break
            if "FIG." not in page_text:
                continue
            chunks.append(f"\n--- Page {page_num + 1} ---\n{page_text}\n")
    # Join once instead of repeatedly growing a string inside the loop.
    return "".join(chunks)

def yamaha_process_data(text, pdf_id, year, model, num_model):
    """Parse extracted catalogue text into one row per part line.

    A line starting with "FIG." and having at least three whitespace-separated
    tokens opens a section: token 2 is the section label and the remaining
    tokens form the component name. Lines before the first such header are
    ignored. Within a section, a line is a data line when it has at least two
    tokens and its first token is either all digits (a reference number
    followed by the part number) or looks like "xxx-yyy" (a part number that
    reuses the most recent reference number).

    The tokens after the part number are split at the first all-digit token:
    words before it form the description, non-digit words after it form the
    remarks. When there are more all-digit tokens than `num_model`, the first
    one is appended to the description.

    Args:
        text: Text produced by the PDF text extraction step.
        pdf_id: Identifier stored on every row and used to build image_id.
        year: Value stored in the "year" column as given.
        model: Value stored in the "model" column as given.
        num_model: Number of all-digit tokens that may follow the description
            before the first one is treated as part of the description.

    Returns:
        A DataFrame with columns pdf_id, year, brand, model, section,
        component_name, ref_no, part_no, description, remarks, image_id.
        All parsed fields are strings; brand is always "Yamaha".
    """
    columns = [
        'pdf_id', 'year', 'brand', 'model', 'section', 'component_name',
        'ref_no', 'part_no', 'description', 'remarks', 'image_id'
    ]
    part_no_like = re.compile(r'\w+[-–]\w+')

    records = []
    section = c_name = last_ref_no = ""
    in_figure = False

    for raw_line in text.strip().split('\n'):
        line = raw_line.strip()
        if not line:
            continue

        if line.startswith('FIG.'):
            header = line.split()
            # Headers with fewer than three tokens leave the current section untouched.
            if len(header) >= 3:
                section = header[1]
                c_name = " ".join(header[2:])
                in_figure = True
            continue

        if not in_figure:
            continue

        parts = line.split()
        if len(parts) < 2:
            continue
        leads_with_ref = parts[0].isdigit()
        if not (leads_with_ref or part_no_like.match(parts[0])):
            continue

        if leads_with_ref:
            ref_no, part_no, rest = parts[0], parts[1], parts[2:]
            last_ref_no = ref_no
        else:
            # Continuation line: inherit the last reference number seen.
            ref_no, part_no, rest = last_ref_no, parts[0], parts[1:]

        # Everything before the first all-digit token is description text.
        first_number_at = next(
            (idx for idx, token in enumerate(rest) if token.isdigit()),
            len(rest),
        )
        description_words = rest[:first_number_at]
        trailing = rest[first_number_at:]
        numbers = [token for token in trailing if token.isdigit()]
        remark_words = [token for token in trailing if not token.isdigit()]

        # More numbers than model columns: the first number belongs to the description.
        if len(numbers) > num_model:
            description_words = description_words + [numbers[0]]

        records.append([
            pdf_id, year, "Yamaha", model, section, c_name, ref_no, part_no,
            " ".join(description_words), " ".join(remark_words),
            "_".join((pdf_id, section)),
        ])

    return pd.DataFrame(records, columns=columns)

# ---------- MAIN PROCESS ----------
def yamaha_data_extraction(pdf_path, num_model=3):
    """Extract images and parts data from a Yamaha catalogue PDF into the DB.

    Steps:
      1. Derive pdf_id, year and model from the file name.
      2. Extract diagram images for sections not yet stored and append them
         to the parts_images table.
      3. If master_parts_list already has rows for this pdf_id, stop.
      4. Otherwise parse the PDF text and append the rows to master_parts_list.

    Uses the module-level `engine` and `master_parts_list` objects.

    Args:
        pdf_path: Path of the PDF to process.
        num_model: Passed through to yamaha_process_data. Defaults to 3, the
            value this function previously hard-coded.
            NOTE(review): this presumably should match the number of model
            variants listed in the file name (e.g. 2 for "(1MCH, 1MCG)") -
            confirm against the catalogue layout.

    Raises:
        ValueError: If pdf_id, year or model cannot be derived from the file
            name. This is checked before anything is written, so a badly
            named file no longer leaves images inserted without parts rows.
    """
    pdf_id = extract_pdf_id(pdf_path)
    year = extract_year(pdf_path)
    model = extract_model(pdf_path)

    # All three values feed NOT NULL columns; fail before touching the database.
    missing = [
        name
        for name, value in (("pdf_id", pdf_id), ("year", year), ("model", model))
        if value is None
    ]
    if missing:
        raise ValueError(
            f"Could not derive {', '.join(missing)} from file name: {pdf_path!r}"
        )

    # Step 1: Insert images for sections that are not stored yet
    df_images = extract_images_with_fig_labels(pdf_path, pdf_id, engine)
    image_message = f"[INFO] Inserted {len(df_images)} new images for '{pdf_id}'."
    if not df_images.empty:
        df_images.to_sql("parts_images", engine, if_exists="append", index=False, method="multi")
        print(image_message)
    else:
        print(image_message + f" All images for '{pdf_id}' already exist.")

    # Step 2: Check if parts data already exists
    # The context manager closes the session on every exit path.
    SessionLocal = sessionmaker(bind=engine)
    with SessionLocal() as session:
        existing = session.execute(
            select(1).select_from(master_parts_list).where(master_parts_list.c.pdf_id == pdf_id)
        ).first()

    if existing:
        print(f"[INFO] Master Parts data for '{pdf_id}' already exists.")
        return

    # Step 3: Extract and process parts data (outside session scope)
    all_text = extract_text_from_pdf(pdf_path)
    df_parts = yamaha_process_data(all_text, pdf_id, year, model, num_model=num_model)

    if not df_parts.empty:
        df_parts.to_sql("master_parts_list", engine, if_exists="append", index=False, method="multi")
        print(f"[INFO] Inserted parts data for '{pdf_id}'.")
    else:
        print(f"[INFO] Error, no parts data extracted for '{pdf_id}'.")

In [6]:
# Format 1: Yamaha, Important images from page 6-60
pdf_1 = "Manuals/AEROX 155 '19 (B65P, B65R, B65S).pdf"
pdf_2 = "Manuals/FJR1300A '15 (1MCH, 1MCG).PDF"
# Format 2: Honda
pdf_3 = "Manuals/CRF1000 A_PC_13MJPG02_(G.H).pdf"
pdf_4 = "Manuals/NC750XAP_13MKWM02_PC_2022_2023.pdf"

# Choose which manual to process and which brand parser to run.
pdf_path = pdf_2
brand = "Yamaha"
supported_brands = ['Yamaha', 'Honda']

# Reject unknown brands first, then dispatch to the matching extractor.
if brand not in supported_brands:
    print(f'"{brand}" not supported \nAvailable Brands: {supported_brands}')
elif brand == "Yamaha":
    yamaha_data_extraction(pdf_path)
elif brand == "Honda":
    # Placeholder: no Honda extraction is wired up in this cell yet.
    print("Hi")

[INFO] Inserted 49 new images for 'FJR1300A'.
[INFO] Inserted parts data for 'FJR1300A'.


In [2]:
import re

def extract_info(text):
    """Pull the bike model and PDF id out of a Honda-style file stem.

    Args:
        text: File name stem such as "NC750XAP_13MKWM02_PC_2022_2023".

    Returns:
        A tuple (bike_model, pdf_id):
          - bike_model: the leading run of uppercase letters and digits, or
            None when the text does not start with one.
          - pdf_id: the first occurrence of "13" followed by six uppercase
            letters/digits, or None when there is none.
    """
    # Leading uppercase/digit run, stopping at the first other character.
    model_hit = re.match(r"^([A-Z0-9]+)", text)
    # "13" plus six more uppercase alphanumerics, anywhere in the text.
    id_hit = re.search(r"13[A-Z0-9]{6}", text)

    bike_model = None
    if model_hit:
        bike_model = model_hit.group(1)

    pdf_id = None
    if id_hit:
        pdf_id = id_hit.group(0)

    return bike_model, pdf_id

# Test examples
# Two Honda-style file stems (the Honda manual names used earlier in the
# notebook, without directory and extension).
examples = [
    "CRF1000 A_PC_13MJPG02_(G.H)",
    "NC750XAP_13MKWM02_PC_2022_2023"
]

# Print the (bike model, PDF id) pair that extract_info parses from each stem.
for e in examples:
    bike, pdf = extract_info(e)
    print(f"Bike Model: {bike}, PDF ID: {pdf}")

Bike Model: CRF1000, PDF ID: 13MJPG02
Bike Model: NC750XAP, PDF ID: 13MKWM02
