In [None]:
import json
from pathlib import Path

# -----------------------------
# CONFIG
# -----------------------------
BASE_DIR = Path("/content/drive/MyDrive/Upwork/david-grass/pilot_folder")  # project root

# Input files read by this step (both live under <project root>/inputs).
BUNDLE_MANIFEST_PATH = BASE_DIR.joinpath("inputs", "bundle_manifest.json")
SITEMAP_ANALYSIS_PATH = BASE_DIR.joinpath("inputs", "sitemap_analysis.json")

# Where the processing index is written at the end of this step.
OUTPUT_PATH = BASE_DIR.joinpath("processing_index.json")

# Page types that are always excluded from the processing index.
SKIP_PAGE_TYPES = {"homepage", "category_landing", "blog", "cms"}

# -----------------------------
# LOAD INPUT FILES
# -----------------------------
# Both inputs are plain UTF-8 JSON documents.
bundle_manifest = json.loads(BUNDLE_MANIFEST_PATH.read_text(encoding="utf-8"))
sitemap_analysis = json.loads(SITEMAP_ANALYSIS_PATH.read_text(encoding="utf-8"))

# De-duplicated set of URLs the bundle manifest asks us to process.
selected_urls = set(bundle_manifest.get("selected_urls", []))

# -----------------------------
# BUILD URL -> SITEMAP LOOKUP
# -----------------------------
# Rows without a usable "url" are ignored; if a URL occurs more than once,
# the last row wins.
sitemap_by_url = {
    entry["url"]: entry
    for entry in sitemap_analysis
    if entry.get("url")
}

# -----------------------------
# PROCESS SELECTED URLS
# -----------------------------
processing_index = []   # one record per URL that should be processed downstream
errors = []             # problems worth a human look (unknown URL, no dir, no page.html)
skipped_count = 0       # selected URLs excluded on purpose (status / page type)

# Sorted so the output file is stable from run to run.
for url in sorted(selected_urls):
    sitemap_row = sitemap_by_url.get(url)

    if not sitemap_row:
        errors.append({
            "url": url,
            "reason": "URL not found in sitemap_analysis.json"
        })
        continue

    status = sitemap_row.get("status")
    page_type = sitemap_row.get("page_type")

    # Skip explicitly marked or unsupported pages
    if status == "skip" or page_type in SKIP_PAGE_TYPES:
        skipped_count += 1
        continue

    # A missing or null "dir" would either raise a TypeError in the path join
    # or silently resolve to the project root, so report it instead.
    rel_dir = sitemap_row.get("dir")
    if not rel_dir:
        errors.append({
            "url": url,
            "reason": "Sitemap row has no 'dir'"
        })
        continue

    dir_path = BASE_DIR / rel_dir
    page_html_path = dir_path / "page.html"
    tables_json_path = dir_path / "tables.json"

    record = {
        "source_url": url,
        "slug": sitemap_row.get("slug"),
        "page_type": page_type,
        "status": status,
        "dir": str(dir_path),
        "page_html_path": str(page_html_path),
        "tables_json_path": str(tables_json_path),
        "has_page_html": page_html_path.exists(),
        "has_tables_json": tables_json_path.exists(),
    }

    # Track missing critical files (the record is still indexed so later
    # steps can flag it as well).
    if not record["has_page_html"]:
        errors.append({
            "url": url,
            "reason": "Missing page.html",
            "expected_path": str(page_html_path)
        })

    processing_index.append(record)

# -----------------------------
# WRITE OUTPUTS
# -----------------------------
# The error report lives next to the other outputs instead of in whatever the
# current working directory happens to be.
ERRORS_PATH = BASE_DIR / "processing_index_errors.json"

with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(processing_index, f, indent=2, ensure_ascii=False)

if errors:
    with open(ERRORS_PATH, "w", encoding="utf-8") as f:
        json.dump(errors, f, indent=2, ensure_ascii=False)
else:
    # Remove a report left over from an earlier run so it cannot be mistaken
    # for the result of this one.
    ERRORS_PATH.unlink(missing_ok=True)

print("Step 1 complete.")
print(f"Total selected URLs: {len(selected_urls)}")
print(f"Processing index entries: {len(processing_index)}")
print(f"Skipped (status or page type): {skipped_count}")
print(f"Errors logged: {len(errors)}")

Step 1 complete.
Total selected URLs: 386
Processing index entries: 144
Errors logged: 0


In [None]:
import json
import re
from pathlib import Path
from typing import Optional, Tuple

# -----------------------------
# CONFIG
# -----------------------------
# Defined here too, so this cell runs on a fresh kernel without relying on
# BASE_DIR having been left behind by an earlier cell.
BASE_DIR = Path("/content/drive/MyDrive/Upwork/david-grass/pilot_folder")  # project root
PROCESSING_INDEX_PATH = BASE_DIR / "processing_index.json"   # input: Step 1 output
OUTPUT_PATH = BASE_DIR / "sku_index.json"                    # output: one entry per page with a SKU
WARNINGS_PATH = BASE_DIR / "sku_index_warnings.json"         # output: pages without a usable SKU

# -----------------------------
# BULLETPROOF SKU REGEXES
# -----------------------------

# A SKU value: 5 to 30 characters drawn from letters, digits, "_" and "-".
# Every pattern below is compiled case-insensitively, so lowercase letters
# are accepted too.
SKU_VALUE_PATTERN = r'([A-Z0-9_-]{5,30})'

# "SKU": "<value>" as it appears in an inline JS/JSON payload.
RE_KLAVIYO_SKU = re.compile(rf'"SKU"\s*:\s*"{SKU_VALUE_PATTERN}"', re.I)

# data-product-sku="<value>" HTML attribute.
RE_DATA_PRODUCT_SKU = re.compile(
    rf'data-product-sku\s*=\s*"{SKU_VALUE_PATTERN}"', re.I
)

# <label>SKU:</label> immediately followed by <span><value></span>.
RE_VISIBLE_SKU = re.compile(
    rf'<label[^>]*>\s*SKU\s*:\s*</label>\s*'
    rf'<span[^>]*>\s*{SKU_VALUE_PATTERN}\s*</span>',
    re.I | re.S,
)


# -----------------------------
# SKU EXTRACTION
# -----------------------------
def extract_sterlitech_sku(html: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Return ``(sku, source)`` for the first SKU found in a product page.

    The patterns are tried in a fixed priority order and the first hit wins:
      1. Klaviyo JS          -> source "klaviyo_js"
      2. data-product-sku    -> source "data_product_sku"
      3. Visible label/span  -> source "visible_attribute"

    Returns ``(None, None)`` when no pattern matches.
    """
    strategies = (
        (RE_KLAVIYO_SKU, "klaviyo_js"),
        (RE_DATA_PRODUCT_SKU, "data_product_sku"),
        (RE_VISIBLE_SKU, "visible_attribute"),
    )

    for pattern, source_name in strategies:
        found = pattern.search(html)
        if found is not None:
            return found.group(1), source_name

    return None, None


# -----------------------------
# LOAD INPUT
# -----------------------------
with open(PROCESSING_INDEX_PATH, "r", encoding="utf-8") as f:
    processing_index = json.load(f)

sku_index = []     # pages for which a SKU was extracted
qa_warnings = []   # pages that could not be read or had no SKU


# -----------------------------
# MAIN LOOP
# -----------------------------
for page in processing_index:
    source_url = page["source_url"]
    slug = page["slug"]
    page_html_path = Path(page["page_html_path"])

    print(f"Processing page: {source_url}")

    # is_file() rather than exists(): a directory at this path would pass
    # exists() and then make read_text() raise.
    if not page_html_path.is_file():
        qa_warnings.append({
            "source_url": source_url,
            "reason": "page.html missing"
        })
        print("  ⚠ page.html missing")
        continue

    # Undecodable bytes are dropped rather than aborting the whole run.
    html = page_html_path.read_text(encoding="utf-8", errors="ignore")

    sku, sku_source = extract_sterlitech_sku(html)

    if sku:
        sku_index.append({
            "source_url": source_url,
            "page_slug": slug,
            "sku": sku,
            "sku_source": sku_source,
            "page_html_path": page["page_html_path"],
            "tables_json_path": page["tables_json_path"]
        })
        print(f"  → SKU found: {sku} ({sku_source})")

    else:
        qa_warnings.append({
            "source_url": source_url,
            "reason": "No SKU found in HTML"
        })
        print("  ⚠ No SKU found")


# -----------------------------
# WRITE OUTPUTS
# -----------------------------
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(sku_index, f, indent=2, ensure_ascii=False)

if qa_warnings:
    with open(WARNINGS_PATH, "w", encoding="utf-8") as f:
        json.dump(qa_warnings, f, indent=2, ensure_ascii=False)
else:
    # Remove a warnings file left over from an earlier run so it cannot be
    # mistaken for the result of this one.
    WARNINGS_PATH.unlink(missing_ok=True)

print("\nStep 2 complete.")
print(f"Total pages processed: {len(processing_index)}")
print(f"SKUs extracted: {len(sku_index)}")
print(f"Warnings: {len(qa_warnings)}")

Processing page: https://www.sterlitech.com/50mm-disc-capsule-glass-fiber-5-um-prefilter-over-0-20um-pes-final-filter-1-8-barb-inlet-outlet.html
  ‚Üí SKU found: 1470016 (klaviyo_js)
Processing page: https://www.sterlitech.com/50mm-disc-capsule-glass-fiber-5-um-prefilter-over-0-45um-pes-final-filter-1-8-barb-inlet-outlet.html
  ‚Üí SKU found: 1470017 (klaviyo_js)
Processing page: https://www.sterlitech.com/50mm-disc-capsule-glass-fiber-5-um-prefilter-over-0-80um-pes-final-filter-1-8-barb-inlet-outlet.html
  ‚Üí SKU found: 1470018 (klaviyo_js)
Processing page: https://www.sterlitech.com/aluminum-oxide-membrane-filters-0-2-micron-13-mm-10-pk.html
  ‚Üí SKU found: 1360007 (klaviyo_js)
Processing page: https://www.sterlitech.com/aluminum-oxide-membrane-filters-1360001.html
  ‚Üí SKU found: 1360001 (klaviyo_js)
Processing page: https://www.sterlitech.com/aluminum-oxide-membrane-filters-1360002.html
  ‚Üí SKU found: 1360002 (klaviyo_js)
Processing page: https://www.sterlitech.com/aluminum-ox

In [None]:
import json
import re
from pathlib import Path
from copy import deepcopy
from html import unescape
from collections import OrderedDict

# ---------------------------------
# CONFIG
# ---------------------------------
# BASE_DIR must be defined in this cell: with only the commented-out stub the
# cell raised "NameError: name 'BASE_DIR' is not defined" on a fresh kernel.
BASE_DIR = Path("/content/drive/MyDrive/Upwork/david-grass/pilot_folder")  # project root
SKU_INDEX_PATH = BASE_DIR / "sku_index.json"        # input: Step 2 output
OUTPUT_PATH = BASE_DIR / "product_records.json"     # output: one record per SKU
WARNINGS_PATH = BASE_DIR / "step3_warnings.json"    # output: written only when warnings exist

# ---------------------------------
# LOAD INPUT
# ---------------------------------
# The SKU index is a UTF-8 JSON document.
sku_index = json.loads(SKU_INDEX_PATH.read_text(encoding="utf-8"))

# Accumulators filled by the main loop below.
product_records, warnings = [], []

# ---------------------------------
# REGEX UTILITIES
# ---------------------------------
# Anything that looks like a tag: "<", one or more non-">" characters, ">".
RE_HTML_TAGS = re.compile(r"<[^>]+>")

# Inner markup of an <h1> element (non-greedy, case-insensitive, may span
# lines); callers use .search(), i.e. the first <h1> in the document.
RE_H1 = re.compile(r"<h1[^>]*>(.*?)</h1>", re.I | re.S)

# A <label> whose double-quoted class contains "attribute-label", followed
# (whitespace only in between) by a <span> whose double-quoted class contains
# "attribute-value". Named groups "label" and "value" capture the inner
# markup of each. Compiled with re.VERBOSE, so the layout whitespace inside
# the pattern is not part of the match.
RE_ATTRIBUTE_BLOCK = re.compile(
    r"""
    <label[^>]*class="[^"]*attribute-label[^"]*"[^>]*>
        (?P<label>.*?)
    </label>
    \s*
    <span[^>]*class="[^"]*attribute-value[^"]*"[^>]*>
        (?P<value>.*?)
    </span>
    """,
    re.IGNORECASE | re.DOTALL | re.VERBOSE
)

# Double-quoted src value of every <img> tag (single-quoted or unquoted
# attributes are not matched).
RE_IMG = re.compile(
    r'<img[^>]+src="([^"]+)"',
    re.IGNORECASE
)

# Double-quoted href values that end in ".pdf" (case-insensitive). An href
# with anything after ".pdf", such as a query string, does not match.
RE_PDF = re.compile(
    r'href="([^"]+\.pdf)"',
    re.IGNORECASE
)

# ---------------------------------
# HELPERS
# ---------------------------------
def clean_text(text: str) -> str:
    """Turn an HTML fragment into plain, single-spaced text.

    Tags are removed *before* entities are decoded. Decoding first would turn
    escaped text such as ``&lt; 5 um and &gt; 2`` into ``< 5 um and > 2``,
    which the tag regex would then delete as if it were markup.

    Args:
        text: Raw HTML fragment (may contain tags and character entities).

    Returns:
        The text with tags stripped, entities decoded, runs of whitespace
        collapsed to one space, and leading/trailing whitespace removed.
    """
    text = RE_HTML_TAGS.sub("", text)
    text = unescape(text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def load_html(path: Path) -> str:
    """Read ``path`` as UTF-8 text, dropping undecodable bytes.

    Returns an empty string when ``path`` is falsy or is not a regular file.
    """
    if path and path.is_file():
        return path.read_text(encoding="utf-8", errors="ignore")
    return ""


def load_tables(path: Path) -> list:
    """Parse the JSON document stored at ``path``.

    Returns an empty list when ``path`` is falsy or is not a regular file;
    otherwise returns whatever the JSON decodes to.
    """
    if path and path.is_file():
        return json.loads(path.read_text(encoding="utf-8"))
    return []


def extract_product_name(html: str) -> str | None:
    m = RE_H1.search(html)
    return clean_text(m.group(1)) if m else None


def extract_attributes(html: str) -> dict:
    """Collect label/value pairs from the attribute blocks found in ``html``.

    Labels and values are cleaned with ``clean_text``; pairs where either side
    is empty are dropped, one trailing ":" is removed from the label, and the
    first value seen for a label is the one kept. Document order is preserved.
    """
    collected = OrderedDict()

    for block in RE_ATTRIBUTE_BLOCK.finditer(html):
        name = clean_text(block.group("label"))
        content = clean_text(block.group("value"))

        if not (name and content):
            continue

        if name.endswith(":"):
            name = name[:-1].strip()

        # First occurrence wins; later duplicates are ignored.
        collected.setdefault(name, content)

    return collected


def extract_images(html: str) -> list:
    """Return the distinct ``<img src>`` URLs in ``html``, in document order.

    Each captured value is entity-decoded first, because attribute values in
    raw HTML carry ``&amp;`` where the real URL has ``&``. Protocol-relative
    URLs (``//host/...``) get an ``https:`` scheme and root-relative URLs
    (``/path``) are prefixed with the Sterlitech site root. Anything else is
    returned as found.
    """
    # dict keys keep insertion order and make de-duplication O(1) per URL
    # instead of scanning a list on every iteration.
    ordered = {}
    for src in RE_IMG.findall(html):
        src = unescape(src)
        if src.startswith("//"):
            src = "https:" + src
        elif src.startswith("/"):
            src = "https://www.sterlitech.com" + src
        ordered.setdefault(src, None)
    return list(ordered)


def extract_pdfs(html: str) -> list:
    """Return the distinct PDF link URLs in ``html``, in document order.

    Each captured href is entity-decoded, then normalised the same way as
    image URLs: protocol-relative links (``//host/...``) get an ``https:``
    scheme and root-relative links (``/path``) are prefixed with the
    Sterlitech site root. The ``//`` case must be tested first; otherwise it
    would match the ``/`` rule and yield ``https://www.sterlitech.com//host/...``.
    """
    # dict keys keep insertion order and make de-duplication O(1) per URL.
    ordered = {}
    for href in RE_PDF.findall(html):
        href = unescape(href)
        if href.startswith("//"):
            href = "https:" + href
        elif href.startswith("/"):
            href = "https://www.sterlitech.com" + href
        ordered.setdefault(href, None)
    return list(ordered)


# ---------------------------------
# MAIN LOOP
# ---------------------------------
for entry in sku_index:
    record = {
        "sku": entry["sku"],
        "product_name": None,
        "source_url": entry["source_url"],
        "html_source_path": entry.get("page_html_path"),
        "category_path": None,

        "short_description_text": None,
        "specifications_text": None,
        "applications_text": None,

        "attributes": {},
        "tables": [],
        "images": [],
        "pdfs": [],
        "qa_flags": []
    }

    # "or ''" guards against keys that are present but null, which would make
    # Path() raise a TypeError.
    html_path = Path(entry.get("page_html_path") or "")
    tables_path = Path(entry.get("tables_json_path") or "")

    # Say explicitly when the page itself is missing; otherwise the record
    # only shows the knock-on flags (no name, no attributes, no images).
    if not html_path.is_file():
        record["qa_flags"].append("missing_page_html")

    html = load_html(html_path)
    tables = load_tables(tables_path)

    # -----------------------------
    # EXTRACTION
    # -----------------------------
    record["product_name"] = extract_product_name(html)
    if not record["product_name"]:
        record["qa_flags"].append("missing_product_name")

    record["attributes"] = extract_attributes(html)
    if not record["attributes"]:
        record["qa_flags"].append("no_attributes")

    record["tables"] = tables
    if not tables:
        record["qa_flags"].append("no_tables")

    record["images"] = extract_images(html)
    if not record["images"]:
        record["qa_flags"].append("no_images")

    record["pdfs"] = extract_pdfs(html)

    # Feed the warnings report; without this the list stays empty and the
    # warnings file below is never written.
    if record["qa_flags"]:
        warnings.append({
            "sku": record["sku"],
            "source_url": record["source_url"],
            "qa_flags": list(record["qa_flags"])
        })

    product_records.append(record)

# ---------------------------------
# WRITE OUTPUTS
# ---------------------------------
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(product_records, f, indent=2, ensure_ascii=False)

if warnings:
    with open(WARNINGS_PATH, "w", encoding="utf-8") as f:
        json.dump(warnings, f, indent=2, ensure_ascii=False)
else:
    # Remove a warnings file left over from an earlier run so it cannot be
    # mistaken for the result of this one.
    WARNINGS_PATH.unlink(missing_ok=True)

print("Step 3 complete.")
print(f"Total product records: {len(product_records)}")
print(f"Records with QA flags: {len(warnings)}")


NameError: name 'BASE_DIR' is not defined

In [None]:
import json
from pathlib import Path
from bs4 import BeautifulSoup
import re

# -----------------------------
# CONFIG
# -----------------------------
BASE_DIR = Path("/content/drive/MyDrive/Upwork/david-grass/pilot_folder")

# Input: SKU index from the previous step. Output: final product records.
SKU_INDEX_PATH = BASE_DIR.joinpath("sku_index.json")
OUTPUT_PATH = BASE_DIR.joinpath("product_records.json")

# -----------------------------
# HELPERS
# -----------------------------
def clean_text(text):
    """Collapse every run of whitespace in ``text`` to a single space and trim
    the ends. Falsy input (``None``, ``""``) yields ``""``."""
    return re.sub(r"\s+", " ", text).strip() if text else ""


def load_tables(path: Path) -> list:
    """Return the decoded JSON content of ``path``.

    A falsy ``path`` or one that is not a regular file yields an empty list.
    """
    readable = bool(path) and path.is_file()
    if not readable:
        return []
    return json.loads(path.read_text(encoding="utf-8"))

def extract_category_path(soup):
    """Build a " / "-joined category path from the breadcrumb list.

    The non-empty breadcrumb labels are read in order; the first and last are
    dropped (expected layout: Home > ...categories... > Product). Fewer than
    three labels means there is nothing in between, so "" is returned.
    """
    labels = [
        label
        for label in (
            clean_text(item.get_text())
            for item in soup.select(".breadcrumbs ul.items li")
        )
        if label
    ]

    if len(labels) >= 3:
        return " / ".join(labels[1:-1])
    return ""


def extract_short_description(soup):
    """Return the cleaned ``content`` of ``<meta name="description">``, or ""
    when the tag is absent or its content is empty."""
    meta_tag = soup.find("meta", attrs={"name": "description"})
    content = meta_tag.get("content") if meta_tag else None
    return clean_text(content) if content else ""


def extract_images(soup):
    """Collect one image URL per ``.fotorama__stage__frame`` element.

    For each frame the ``src`` of its first ``<img>`` is preferred; when that
    is missing, the frame's own ``href`` attribute is used. Frames with
    neither contribute nothing. Duplicates are removed, keeping first-seen
    order.
    """
    ordered = {}

    for frame in soup.select(".fotorama__stage__frame"):
        img = frame.find("img")
        url = img.get("src") if img else None
        if not url:
            url = frame.get("href")
        if url:
            ordered[url] = None

    return list(ordered)


def extract_tab_text(soup, tab_keyword, min_line_length=3):
    """Return the text of the first tab whose title contains ``tab_keyword``.

    Tab titles are the ``.data.item.title`` elements; the panel belonging to a
    title is looked up by the id in the title's ``aria-controls`` attribute.
    Titles without that attribute, or whose panel cannot be found, are passed
    over and the search continues with the next title.

    Note: ``<script>``, ``<style>`` and ``<noscript>`` elements inside the
    matched panel are removed from ``soup`` itself, not from a copy.

    Args:
        soup: Parsed document.
        tab_keyword: Case-insensitive substring to look for in the tab title.
        min_line_length: Shortest cleaned line that is kept. The default of 3
            reproduces the previous hard-coded filter, which also drops short
            values such as "PP" or "47"; pass 1 to keep every non-empty line.

    Returns:
        The kept lines joined with newlines, or "" when no tab matches.
    """
    keyword = tab_keyword.lower()  # loop-invariant, so computed once

    for title in soup.select(".data.item.title"):
        if keyword not in clean_text(title.get_text()).lower():
            continue

        content_id = title.get("aria-controls")
        if not content_id:
            continue

        content = soup.find(id=content_id)
        if not content:
            continue

        # Drop non-visible markup so it does not leak into the text.
        for tag in content(["script", "style", "noscript"]):
            tag.decompose()

        cleaned_lines = (
            clean_text(raw_line)
            for raw_line in content.get_text(separator="\n").splitlines()
        )
        return "\n".join(
            line for line in cleaned_lines
            if line and len(line) >= min_line_length
        )

    return ""


def extract_attachments(soup):
    """List the attachments linked from ``.am-attachments .am-fileline`` rows.

    Each row contributes its first ``<a>`` that has an ``href``; rows without
    one are ignored. Every result is ``{"title": <cleaned link text>,
    "url": <href as found>}``.
    """
    anchors = (
        row.find("a", href=True)
        for row in soup.select(".am-attachments .am-fileline")
    )
    return [
        {
            "title": clean_text(anchor.get_text()),
            "url": anchor["href"]
        }
        for anchor in anchors
        if anchor
    ]


# -----------------------------
# LOAD INPUT
# -----------------------------
# The SKU index is a UTF-8 JSON document.
sku_index = json.loads(SKU_INDEX_PATH.read_text(encoding="utf-8"))

# Filled by the main loop below, one record per SKU index entry.
product_records = []

# -----------------------------
# MAIN LOOP
# -----------------------------
for entry in sku_index:
    record = {
        "sku": entry["sku"],
        "source_url": entry["source_url"],
        "page_slug": entry.get("page_slug"),
        "sku_source": entry.get("sku_source"),
        "category_path": "",
        "short_description": "",
        "tables": [],
        "images": [],
        "specifications": "",
        "applications": "",
        "attachments": [],
        "qa_flags": []
    }

    # "or ''" guards against keys that are present but null, which would make
    # Path() raise a TypeError.
    html_path = Path(entry.get("page_html_path") or "")
    tables_path = Path(entry.get("tables_json_path") or "")

    # Path("") is Path("."), which is truthy and exists(), so an empty path
    # used to slip through and crash in read_text(). is_file() rejects it,
    # along with any other directory.
    if not html_path.is_file():
        record["qa_flags"].append("missing_page_html")
        product_records.append(record)
        continue

    # Loaded only once the page is known to be readable; records without a
    # page never used the tables.
    tables = load_tables(tables_path)

    soup = BeautifulSoup(
        html_path.read_text(encoding="utf-8", errors="ignore"),
        "lxml"
    )

    # Category path
    record["category_path"] = extract_category_path(soup)
    if not record["category_path"]:
        record["qa_flags"].append("missing_category_path")

    # Short description
    record["short_description"] = extract_short_description(soup)
    if not record["short_description"]:
        record["qa_flags"].append("missing_short_description")

    # Tables
    record["tables"] = tables
    if not tables:
        record["qa_flags"].append("no_tables")

    # Images
    record["images"] = extract_images(soup)
    if not record["images"]:
        record["qa_flags"].append("no_images")

    # Specifications
    record["specifications"] = extract_tab_text(soup, "Specifications")
    if not record["specifications"]:
        record["qa_flags"].append("no_specifications")

    # Applications
    record["applications"] = extract_tab_text(soup, "Applications")
    if not record["applications"]:
        record["qa_flags"].append("no_applications")

    # Attachments
    record["attachments"] = extract_attachments(soup)
    if not record["attachments"]:
        record["qa_flags"].append("no_attachments")

    product_records.append(record)

# -----------------------------
# WRITE OUTPUT
# -----------------------------
with open(OUTPUT_PATH, "w", encoding="utf-8") as f:
    json.dump(product_records, f, indent=2, ensure_ascii=False)

print("Step 3 complete.")
print(f"Total product records: {len(product_records)}")
print(
    "Records with QA flags:",
    sum(1 for r in product_records if r["qa_flags"])
)

Step 3 complete.
Total product records: 144
Records with QA flags: 119
