# Vigone Municipality Data Harvester

## Purpose
This notebook runs an end-to-end pipeline that extracts municipal indicators from PDFs published on the Comune di Vigone (TO) website.

## Prerequisites
- **Google Drive Mount**: Required for accessing CSV templates and storing outputs
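- **Templates**: The 6 CSV indicator templates listed in `required_templates` must exist in Drive under `/content/drive/MyDrive/dataset_dati_comuni` or `/content/drive/MyDrive/templates`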
- **System**: Runs on Google Colab with sufficient disk space for PDF processing

## Configuration
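- `year`: Target year for data extraction (set to 2024 in the Config cell)
- `max_pages`: Maximum number of pages the crawler visits (`run_pipeline` default: 300)
- `max_depth`: Maximum link depth from the seed URLs (`run_pipeline` default: 4)
- `max_pdfs`: Maximum PDFs to download (`run_pipeline` default: 120)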

## Output Structure
```
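/content/drive/MyDrive/estrazione_dati_comuni/Comuni/Vigone/2024/
├── docs/          # Downloaded PDFs
├── parsed/        # Markdown text and Marker JSON per document
├── cache/         # Temporary Marker output
├── output/        # Populated CSV templates and summary.csv
├── manifest.json  # Resume state tracking
└── report.json    # Run metrics and diagnostics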
```

**Workflow**: Web Discovery → PDF Download → Marker Conversion → Indicator Extraction → CSV Population → Reporting

In [None]:
# Environment
import importlib.util
import subprocess
import sys

from IPython.display import display, Markdown

def ensure_pip(pkg):
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q", pkg])

for pkg in ["requests", "beautifulsoup4", "pandas", "tenacity", "tqdm", "marker-pdf", "PyMuPDF", "pdf2image", "pytesseract"]:
    ensure_pip(pkg)

subprocess.check_call(["apt-get", "update", "-qq"])
subprocess.check_call(["apt-get", "install", "-y", "-qq", "poppler-utils", "tesseract-ocr", "tesseract-ocr-ita"])

print("✅ Dependencies installed")
subprocess.run(["marker_single", "--version"], check=False)
subprocess.run(["marker_single", "--help"], check=False)


In [None]:
# Mount Google Drive
from google.colab import drive

drive.mount('/content/drive', force_remount=False)


In [None]:
# Config
import os
from pathlib import Path
from datetime import datetime

comune_url = "https://www.comune.vigone.to.it/"
comune_name = "Vigone"
year = 2024

candidate_template_paths = [
    Path("/content/drive/MyDrive/dataset_dati_comuni"),
    Path("/content/drive/MyDrive/templates"),
]
template_source = next((p for p in candidate_template_paths if p.exists()), candidate_template_paths[0])

workspace = Path(f"/content/drive/MyDrive/estrazione_dati_comuni/Comuni/{comune_name}/{year}")
docs_dir = workspace / "docs"
parsed_dir = workspace / "parsed"
cache_dir = workspace / "cache"
output_dir = workspace / "output"
state_path = workspace / "manifest.json"

# Check the mount before creating any directories: mkdir(parents=True) would
# otherwise create /content/drive itself and make this check always pass.
if not Path('/content/drive').exists():
    raise RuntimeError("Google Drive does not appear to be mounted at /content/drive.")

for p in [workspace, docs_dir, parsed_dir, cache_dir, output_dir]:
    p.mkdir(parents=True, exist_ok=True)

required_templates = [
    "01_governo.csv",
    "02_territorio_popolazione.csv",
    "03_risultati_pillole.csv",
    "04_servizi_civici.csv",
    "05_rifiuti.csv",
    "06_progetti.csv",
]

missing = [x for x in required_templates if not (template_source / x).exists()]
if missing:
    raise RuntimeError(f"Missing templates in {template_source}: {missing}")

print(f"✅ Template source: {template_source}")
print(f"✅ Workspace: {workspace}")


In [None]:
# Imports
import hashlib
import io
import json
import re
import shutil
import subprocess
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from urllib.parse import urljoin, urlparse, urldefrag

import fitz
import pandas as pd
import pytesseract
import requests
from bs4 import BeautifulSoup
from pdf2image import convert_from_path
from tenacity import retry, stop_after_attempt, wait_exponential
from tqdm.auto import tqdm

USER_AGENT = "Mozilla/5.0 (compatible; VigoneDataBot/2.0)"


In [None]:
# Utilities

def sha256_text(value: str) -> str:
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def sha256_bytes(value: bytes) -> str:
    return hashlib.sha256(value).hexdigest()


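def now_iso() -> str:
    # datetime.utcnow() is deprecated since Python 3.12; build an aware UTC
    # timestamp and keep the same trailing "Z" format as before.
    from datetime import timezone
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")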


def canonicalize_for_crawl(base_url: str, href: str) -> Optional[str]:
    if not href:
        return None
    url = urldefrag(urljoin(base_url, href.strip()))[0]
    p = urlparse(url)
    if p.scheme not in {"http", "https"}:
        return None
    if p.netloc != urlparse(comune_url).netloc:
        return None
    if is_pdf_url(url):
        return None
    return url


def canonicalize_pdf_link(base_url: str, href: str) -> Optional[str]:
    if not href:
        return None
    url = urldefrag(urljoin(base_url, href.strip()))[0]
    p = urlparse(url)
    if p.scheme not in {"http", "https"}:
        return None
    if is_pdf_url(url):
        return url
    return None


def is_pdf_url(url: str) -> bool:
    return urlparse(url).path.lower().endswith(".pdf")


def pick_markdown_generated(folder: Path) -> Optional[Path]:
    mds = list(folder.rglob("*.md"))
    if not mds:
        return None
    return max(mds, key=lambda p: p.stat().st_size)


def extract_number_near_text(text: str) -> Optional[str]:
    m = re.search(r"(?<!\d)(\d{1,3}(?:[\.\s]\d{3})*(?:,\d+)?|\d+(?:[\.,]\d+)?)(?!\d)", text)
    return m.group(1) if m else None


def infer_unit(indicator: str, snippet: str) -> str:
    t = f"{indicator} {snippet}".lower()
    if "%" in t or "percent" in t:
        return "%"
    if "euro" in t or "€" in t:
        return "EUR"
    if "ton" in t:
        return "TON"
    return "COUNT"


In [None]:
# Manifest state

def read_manifest(path: Path) -> Dict:
    if path.exists():
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    return {
        "run": {},
        "url_catalog": {"items": [], "total": 0, "updated_at": None},
        "pdf_registry": {},
        "pdf_index": {},
        "conversion_log": [],
        "extractions": [],
        "metrics": {
            "urls_discovered": 0,
            "pdfs_acquired": 0,
            "pdfs_parsed_ok": 0,
            "pdfs_parsed_failed": 0,
            "templates_found": 0,
            "fields_total": 0,
            "fields_filled": 0,
        },
    }


def write_manifest(path: Path, manifest: Dict) -> None:
    manifest["updated_at"] = now_iso()
    with open(path, "w", encoding="utf-8") as f:
        json.dump(manifest, f, indent=2, ensure_ascii=False)


In [None]:
# Stage 1 - Web Discovery

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=8))
def fetch_url(url: str) -> requests.Response:
    r = requests.get(url, timeout=25, headers={"User-Agent": USER_AGENT})
    r.raise_for_status()
    return r


def discover_urls(seed_urls: List[str], max_pages: int = 300, max_depth: int = 4, sleep_s: float = 0.05) -> Tuple[List[str], Dict]:
    visited = set()
    queue = deque([(u, 0) for u in seed_urls])
    discovered_pdf = set()
    pdf_discarded = []
    visited_log = []

    while queue and len(visited) < max_pages:
        url, depth = queue.popleft()
        if depth > max_depth or url in visited:
            continue
        visited.add(url)
        visited_log.append(url)

        try:
            resp = fetch_url(url)
        except Exception as exc:
            continue

        ctype = (resp.headers.get("Content-Type") or "").lower()
        is_xml = "xml" in ctype or urlparse(url).path.lower().endswith(".xml")
        parser = "xml" if is_xml else "html.parser"
        soup = BeautifulSoup(resp.text, features=parser)

        if is_xml:
            for loc in soup.find_all("loc"):
                nxt = canonicalize_for_crawl(url, loc.get_text(strip=True))
                if nxt and nxt not in visited:
                    queue.append((nxt, depth + 1))
            continue

        for tag, attr in [("a", "href"), ("iframe", "src"), ("embed", "src"), ("object", "data"), ("link", "href")]:
            for node in soup.find_all(tag):
                raw = node.get(attr)
                if not raw:
                    continue
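                pdf_link = canonicalize_pdf_link(url, raw)
                if pdf_link:
                    discovered_pdf.add(pdf_link)
                elif ".pdf" in raw.lower():
                    # Log only rejected links that look like PDFs; logging every
                    # ordinary link would flood the diagnostics.
                    reason = "non_pdf_or_invalid"
                    pdf_discarded.append({"from": url, "href": raw, "reason": reason})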
                html_link = canonicalize_for_crawl(url, raw)
                if html_link and html_link not in visited:
                    queue.append((html_link, depth + 1))

        time.sleep(sleep_s)

    diag = {
        "visited_top10": visited_log[:10],
        "pdf_seen_discarded_top20": pdf_discarded[:20],
    }
    return sorted(discovered_pdf), diag


In [None]:
# Stage 2 - PDF Acquisition

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10))
def download_pdf(url: str) -> bytes:
    r = requests.get(url, timeout=45, headers={"User-Agent": USER_AGENT})
    r.raise_for_status()
    return r.content


def acquire_pdfs(pdf_urls: List[str], manifest: Dict, max_pdfs: int = 120) -> Dict:
    reg = manifest.setdefault("pdf_registry", {})
    idx = manifest.setdefault("pdf_index", {})

    def priority(u: str):
        s = u.lower()
        return 0 if str(year) in s or str(year - 1) in s else 1

    acquired = 0
    for url in sorted(pdf_urls, key=priority)[:max_pdfs]:
        url_key = sha256_text(url)
        existing = reg.get(url_key)
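        # Path("") resolves to the current directory, which always exists, so
        # require a non-empty local_path before testing for the file.
        if existing and existing.get("local_path") and Path(existing["local_path"]).exists():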
            continue
        try:
            content = download_pdf(url)
        except Exception:
            continue
        local = docs_dir / f"{url_key}.pdf"
        with open(local, "wb") as f:
            f.write(content)
        doc_id = sha256_bytes(content)

        if doc_id not in idx:
            idx[doc_id] = {
                "source_url": url,
                "local_path": str(local),
                "url_key": url_key,
                "filename_original": Path(urlparse(url).path).name,
            }
            acquired += 1
        reg[url_key] = {"doc_id": doc_id, "local_path": str(local), "source_url": url, "updated_at": now_iso()}

    manifest["metrics"]["pdfs_acquired"] = len(idx)
    return manifest


In [None]:
# Stage 3 - Marker + fallback

def parse_pdf_with_fallback(manifest: Dict) -> Dict:
    conversion_log = manifest.setdefault("conversion_log", [])
    idx = manifest.get("pdf_index", {})
    ok = 0
    failed = 0

    for doc_id, meta in tqdm(idx.items(), desc="Stage 3 parse"):
        pdf_path = Path(meta["local_path"])
        md_path = parsed_dir / f"{doc_id}.md"
        meta_json = parsed_dir / f"{doc_id}.meta.json"

        if md_path.exists():
            ok += 1
            continue

        method, status, stdout_head, stderr_head = "marker", "failed", "", ""

        try:
            tmp_out = cache_dir / f"marker_{doc_id}"
            if tmp_out.exists():
                shutil.rmtree(tmp_out)
            tmp_out.mkdir(parents=True, exist_ok=True)
            proc = subprocess.run(
                ["marker_single", str(pdf_path), "--output_dir", str(tmp_out)],
                capture_output=True,
                text=True,
            )
            stdout_head = proc.stdout[:2000]
            stderr_head = proc.stderr[:2000]
            generated = pick_markdown_generated(tmp_out)
            if proc.returncode == 0 and generated and generated.exists():
                shutil.copy2(generated, md_path)
                jsons = list(tmp_out.rglob("*.json"))
                if jsons:
                    shutil.copy2(jsons[0], meta_json)
                status = "ok"
            else:
                raise RuntimeError("marker failed")
        except Exception:
            method = "pymupdf"
            try:
                doc = fitz.open(pdf_path)
                text = "\n\n".join(page.get_text() for page in doc)
                doc.close()
                if len(text.strip()) < 300:
                    method = "ocr"
                    images = convert_from_path(str(pdf_path), first_page=1, last_page=2, dpi=170)
                    ocr_text = []
                    for im in images:
                        ocr_text.append(pytesseract.image_to_string(im, lang="ita+eng"))
                    text = text + "\n\n" + "\n".join(ocr_text)
                if len(text.strip()) >= 20:
                    md_path.write_text(text, encoding="utf-8")
                    status = "ok"
                else:
                    status = "failed"
            except Exception as exc:
                stderr_head = (stderr_head + "\n" + str(exc))[:2000]
                status = "failed"

        conversion_log.append({
            "doc_id": doc_id,
            "source_url": meta.get("source_url"),
            "method": method,
            "status": status,
            "stderr_head": stderr_head,
            "stdout_head": stdout_head,
            "md_path": str(md_path),
            "ts": now_iso(),
        })

        if status == "ok":
            ok += 1
        else:
            failed += 1

    manifest["metrics"]["pdfs_parsed_ok"] = ok
    manifest["metrics"]["pdfs_parsed_failed"] = failed
    return manifest


In [None]:
# Stage 4 - Indicator extraction (rule-based)

def load_templates() -> Dict[str, pd.DataFrame]:
    loaded = {}
    for t in required_templates:
        p = template_source / t
        if not p.exists():
            raise RuntimeError(f"Required template missing: {p}")
        loaded[t] = pd.read_csv(p)
    return loaded


def get_indicator_column(df: pd.DataFrame) -> str:
    candidates = [c for c in df.columns if any(k in c.lower() for k in ["indic", "voce", "descr", "nome", "item"]) ]
    return candidates[0] if candidates else df.columns[0]


def extract_from_corpus(templates: Dict[str, pd.DataFrame], manifest: Dict) -> Dict[str, List[Dict]]:
    corpus = []
    for doc_id, meta in manifest.get("pdf_index", {}).items():
        md_path = parsed_dir / f"{doc_id}.md"
        if md_path.exists():
            txt = md_path.read_text(encoding="utf-8", errors="ignore")
            lines = txt.splitlines()
            corpus.append((doc_id, meta.get("source_url"), lines))

    results = {}
    fields_total = 0
    fields_filled = 0

    for name, df in templates.items():
        ind_col = get_indicator_column(df)
        rows = []
        for _, row in df.iterrows():
            indicator = str(row.get(ind_col, "")).strip()
            if not indicator or indicator.lower() == "nan":
                continue
            fields_total += 1
            best = None
            terms = [w for w in re.findall(r"[a-zA-Zàèéìòù0-9]+", indicator.lower()) if len(w) > 2]
            for doc_id, source_url, lines in corpus:
                for i, line in enumerate(lines):
                    l = line.lower()
                    score = sum(1 for t in terms if t in l)
                    if score <= 0:
                        continue
                    snippet = "\n".join(lines[max(0, i-1): i+2])
                    num = extract_number_near_text(snippet)
                    cand = {
                        "indicator": indicator,
                        "value": num,
                        "unit": infer_unit(indicator, snippet),
                        "year": year,
                        "confidence": round(min(0.95, 0.35 + score * 0.1 + (0.25 if num else 0.0)), 2),
                        "snippet": snippet[:1200],
                        "source_url": source_url,
                        "doc_id": doc_id,
                    }
                    if best is None or cand["confidence"] > best["confidence"]:
                        best = cand
            if best is None:
                best = {
                    "indicator": indicator,
                    "value": None,
                    "unit": None,
                    "year": year,
                    "confidence": 0.0,
                    "snippet": "",
                    "source_url": None,
                    "doc_id": None,
                }
            else:
                fields_filled += 1 if best["value"] is not None else 0
            rows.append(best)
        results[name] = rows

    manifest["metrics"]["templates_found"] = len(templates)
    manifest["metrics"]["fields_total"] = fields_total
    manifest["metrics"]["fields_filled"] = fields_filled
    manifest["extractions"] = [{"template": k, "records": v} for k, v in results.items()]
    return results


In [None]:
# Stage 5 - CSV population

def populate_csvs(templates: Dict[str, pd.DataFrame], extracted: Dict[str, List[Dict]]) -> List[Path]:
    written = []
    summary_rows = []

    for template_name, df in templates.items():
        out_df = df.copy()
        ind_col = get_indicator_column(out_df)

        year_col = str(year) if str(year) in out_df.columns else None
        prev_col = str(year - 1) if str(year - 1) in out_df.columns else None

        if year_col is None:
            year_col = f"Anno_{year}"
            out_df[year_col] = pd.NA
        if prev_col is None and f"Anno_{year-1}" in out_df.columns:
            prev_col = f"Anno_{year-1}"

        map_ext = {r["indicator"]: r for r in extracted.get(template_name, [])}

        for idx, row in out_df.iterrows():
            indicator = str(row.get(ind_col, "")).strip()
            ext = map_ext.get(indicator)
            if not ext:
                continue
            out_df.at[idx, year_col] = ext.get("value")
            summary_rows.append({
                "template": template_name,
                "indicator": indicator,
                "year": year,
                "value": ext.get("value"),
                "unit": ext.get("unit"),
                "confidence": ext.get("confidence"),
                "source_url": ext.get("source_url"),
                "doc_id": ext.get("doc_id"),
            })

        out_name = template_name.replace(".csv", f"_{comune_name.lower()}_{year}.csv")
        out_path = output_dir / out_name
        out_df.to_csv(out_path, index=False)
        written.append(out_path)

    pd.DataFrame(summary_rows).to_csv(output_dir / "summary.csv", index=False)
    return written


In [None]:
# Stage 6 - report + manifest

def build_report(manifest: Dict, diagnostics: Dict) -> Dict:
    top_errors = [
        {
            "doc_id": x.get("doc_id"),
            "source_url": x.get("source_url"),
            "method": x.get("method"),
            "stderr_head": x.get("stderr_head", "")[:500],
        }
        for x in manifest.get("conversion_log", []) if x.get("status") == "failed"
    ][:10]

    report = {
        "run": {
            "comune": comune_name,
            "comune_url": comune_url,
            "year": year,
            "paths": {
                "workspace": str(workspace),
                "docs": str(docs_dir),
                "parsed": str(parsed_dir),
                "cache": str(cache_dir),
                "output": str(output_dir),
            },
        },
        "metrics": manifest.get("metrics", {}),
        "diagnostics": diagnostics,
        "top_errors": top_errors,
        "generated_at": now_iso(),
    }

    with open(workspace / "report.json", "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2, ensure_ascii=False)

    return report


In [None]:
# Main orchestrator

def run_pipeline(max_pages: int = 300, max_depth: int = 4, max_pdfs: int = 120) -> Dict:
    manifest = read_manifest(state_path)
    manifest["run"] = {"comune": comune_name, "url": comune_url, "year": year}

    seed_urls = [
        comune_url,
        "https://www.comune.vigone.to.it/ita/pnrr.aspx",
        "https://www.comune.vigone.to.it/ita/amministrazione_trasparente.aspx",
    ]

    pdf_urls, diagnostics = discover_urls(seed_urls, max_pages=max_pages, max_depth=max_depth)
    manifest["url_catalog"] = {"items": pdf_urls, "total": len(pdf_urls), "updated_at": now_iso()}
    manifest["metrics"]["urls_discovered"] = len(pdf_urls)

    manifest = acquire_pdfs(pdf_urls, manifest, max_pdfs=max_pdfs)
    manifest = parse_pdf_with_fallback(manifest)

    templates = load_templates()
    extracted = extract_from_corpus(templates, manifest)
    written_files = populate_csvs(templates, extracted)

    write_manifest(state_path, manifest)
    report = build_report(manifest, diagnostics)

    if manifest["metrics"]["templates_found"] == 0:
        raise RuntimeError("templates_found=0: check template_source and the preflight checks")
    if manifest["metrics"]["pdfs_parsed_ok"] == 0:
        raise RuntimeError("pdfs_parsed_ok=0: check conversion_log in the manifest")

    print("\n===== FINAL SUMMARY =====")
    m = manifest["metrics"]
    print(f"urls_discovered: {m['urls_discovered']}")
    print(f"pdfs_acquired: {m['pdfs_acquired']}")
    print(f"pdfs_parsed_ok/failed: {m['pdfs_parsed_ok']}/{m['pdfs_parsed_failed']}")
    print(f"templates_found: {m['templates_found']}")
    print(f"fields_total: {m['fields_total']}")
    print(f"fields_filled: {m['fields_filled']}")

    if m["urls_discovered"] <= 3:
        print("\n[DIAGNOSTICS] First 10 URLs visited:")
        for u in diagnostics.get("visited_top10", []):
            print(" -", u)
        print("\n[DIAGNOSTICS] First 20 PDF links seen/discarded:")
        for d in diagnostics.get("pdf_seen_discarded_top20", []):
            print(" -", d)

    return report


In [None]:
# Quick test run (Vigone)
report = run_pipeline(max_pages=300, max_depth=4, max_pdfs=120)
print(json.dumps(report["metrics"], indent=2, ensure_ascii=False))


## Next Steps

### Full Production Run
```python
# run_pipeline is the notebook's single entry point; the municipality, year,
# and storage paths come from the Config cell.
report = run_pipeline(max_pages=300, max_depth=4, max_pdfs=120)
```

### Troubleshooting

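**No PDFs discovered:**
- Verify that `comune_url` and the seed URLs in `run_pipeline` are reachable
- Increase the `max_pages` or `max_depth` arguments
- Check whether the website structure has changed
- Inspect `url_catalog` in `manifest.json` and `diagnostics` in `report.json` (the diagnostics are also printed when three or fewer PDF URLs are found)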

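**Low extraction coverage:**
- Review template indicator phrasing: matching counts how many indicator terms (longer than two characters) appear in a line
- Adjust the scoring in `extract_from_corpus`, where any line with `score > 0` is a candidate
- Check PDF quality (scanned vs. digital): the OCR fallback reads only the first two pages
- Verify Tesseract OCR is working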
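
One way to see where coverage is weak is to filter `summary.csv`, which `populate_csvs` writes with the columns `template`, `indicator`, `year`, `value`, `unit`, `confidence`, `source_url`, and `doc_id`. The sketch below is a minimal example under stated assumptions: the helper name `weak_rows` and the 0.6 cutoff are hypothetical, and the sample rows are made up for illustration.

```python
import csv
import io
from typing import Dict, List


def weak_rows(csv_text: str, min_confidence: float = 0.6) -> List[Dict[str, str]]:
    """Return summary rows with no value or a confidence below min_confidence."""
    weak = []
    for row in csv.DictReader(io.StringIO(csv_text)):
        value = (row.get("value") or "").strip()
        confidence = float(row.get("confidence") or 0.0)
        if not value or confidence < min_confidence:
            weak.append(row)
    return weak


# Made-up sample using the summary.csv column names; not real Vigone data.
SAMPLE = """template,indicator,year,value,unit,confidence,source_url,doc_id
05_rifiuti.csv,Indicator A,2024,1.250,TON,0.8,https://example.org/a.pdf,doc1
05_rifiuti.csv,Indicator B,2024,,COUNT,0.45,https://example.org/b.pdf,doc2
01_governo.csv,Indicator C,2024,,,0.0,,
"""

for row in weak_rows(SAMPLE):
    print(row["template"], row["indicator"], row["confidence"])
```

On a real run, the same helper could be fed `(output_dir / "summary.csv").read_text(encoding="utf-8")`.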

**Marker conversion errors:**
- Ensure sufficient disk space (>2GB free)
- Check PDF file integrity
- Install the Italian language pack used by the OCR fallback: `!apt-get install -y tesseract-ocr-ita`
- Review `stderr_head` in the `conversion_log` entries of `manifest.json`, or `top_errors` in `report.json`

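**Resume interrupted pipeline:**
- State is saved to `manifest.json` by `write_manifest` after Stage 5, so a run interrupted earlier does not update it
- Re-run the pipeline: PDFs already recorded in `pdf_registry` and documents with an existing Markdown file in `parsed/` are skipped, while crawling, extraction, and CSV population run again
- Check `conversion_log` and `pdf_registry` in the manifest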
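
Before resuming, it can help to summarize what the manifest already records. The following is a rough sketch, assuming the manifest layout produced by `read_manifest` and `write_manifest`; the helper names `summarize_manifest` and `load_manifest` are hypothetical, and the sample dictionary is illustrative only.

```python
import json
from collections import Counter
from pathlib import Path
from typing import Dict


def load_manifest(path: Path) -> Dict:
    """Read manifest.json from disk."""
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def summarize_manifest(manifest: Dict) -> Dict:
    """Count registered PDFs and conversion outcomes recorded in the manifest."""
    log = manifest.get("conversion_log", [])
    return {
        "pdfs_registered": len(manifest.get("pdf_registry", {})),
        "pdfs_indexed": len(manifest.get("pdf_index", {})),
        "conversions_by_status": dict(Counter(e.get("status") for e in log)),
        "conversions_by_method": dict(Counter(e.get("method") for e in log)),
        "updated_at": manifest.get("updated_at"),
    }


# Illustrative in-memory manifest with the same top-level keys.
sample = {
    "pdf_registry": {"u1": {}, "u2": {}},
    "pdf_index": {"d1": {}, "d2": {}},
    "conversion_log": [
        {"doc_id": "d1", "method": "marker", "status": "ok"},
        {"doc_id": "d2", "method": "ocr", "status": "failed"},
    ],
    "updated_at": None,
}
print(summarize_manifest(sample))
```

In the notebook, `summarize_manifest(load_manifest(state_path))` would report on the saved state. Note that `conversion_log` is appended to on every run, so a document retried across runs appears more than once.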

### Output Files Structure

```
Vigone/2024/
├── docs/
│   └── [sha256 of URL].pdf         # Downloaded PDFs, one file per source URL
├── parsed/
│   ├── [doc_id].md                 # Markdown text (Marker, PyMuPDF, or OCR)
│   └── [doc_id].meta.json          # JSON from Marker, when produced
├── cache/
│   └── marker_[doc_id]/            # Raw Marker output
├── output/
│   ├── [template]_vigone_2024.csv  # Populated CSV templates
│   └── summary.csv                 # Values, units, confidence, and sources
├── manifest.json                   # Pipeline state tracking
└── report.json                     # Metrics, diagnostics, top conversion errors
```

### Advanced Configuration

**Adjust the match threshold:**
```python
# In extract_from_corpus
if score <= 0:  # Raise (e.g. score <= 1) to require more matching terms
    continue
```
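
To judge the effect of a stricter threshold, it helps to see how `extract_from_corpus` turns a match into a confidence value: a base of 0.35, plus 0.1 per matching term, plus 0.25 when a number is found in the snippet, capped at 0.95. The sketch below restates that expression as a standalone function; the name `candidate_confidence` is hypothetical.

```python
def candidate_confidence(score: int, has_number: bool) -> float:
    """Mirror of the confidence expression used in extract_from_corpus."""
    bonus = 0.25 if has_number else 0.0
    return round(min(0.95, 0.35 + score * 0.1 + bonus), 2)


# One matching term, no number nearby.
print(candidate_confidence(1, False))
# Two matching terms and a number in the snippet.
print(candidate_confidence(2, True))
# Many matching terms: the result is capped.
print(candidate_confidence(7, True))
```

Because the number bonus outweighs two matching terms, a line with a single matching term and a number outranks a line with three matching terms and no number.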

**Modify crawl depth:**
```python
# max_depth is an argument of run_pipeline (default: 4)
report = run_pipeline(max_depth=6)  # Increase for deeper crawling
```

**Add custom unit patterns:**
```python
# In infer_unit, add a substring check before the final return
if "your_pattern" in t:
    return "your_unit"
```
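
The `infer_unit` function in the Utilities cell uses plain substring checks, so `"ton"` also matches inside unrelated words. If that becomes a problem, a rule table with regular expressions is one possible alternative. This is only a sketch: the name `infer_unit_with_rules`, the `UNIT_RULES` table, and the `KG` rule are assumptions, not part of the notebook.

```python
import re
from typing import List, Tuple

# Hypothetical rule table of (regex, unit); the first matching rule wins.
UNIT_RULES: List[Tuple[str, str]] = [
    (r"%|\bpercent", "%"),
    (r"€|\beuro\b", "EUR"),
    (r"\bton(?:nellate|nellata|s)?\b", "TON"),
    (r"\bkg\b", "KG"),
]


def infer_unit_with_rules(indicator: str, snippet: str, default: str = "COUNT") -> str:
    """Return the unit of the first rule matching the indicator or snippet."""
    text = f"{indicator} {snippet}".lower()
    for pattern, unit in UNIT_RULES:
        if re.search(pattern, text):
            return unit
    return default


print(infer_unit_with_rules("Rifiuti raccolti", "1.250 tonnellate"))
# "cantoni" contains "ton" but is not matched thanks to the word boundary.
print(infer_unit_with_rules("Cantoni", "12"))
```

Keeping the rules in a list means a new unit is one added tuple, and rule order makes precedence explicit.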