In [None]:
import gc
import json
import logging
import os
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import Optional, List, Dict, Any

import openai
from PyPDF2 import PdfReader
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, BarColumn, TextColumn, TimeElapsedColumn, TimeRemainingColumn
from rich.table import Table

In [None]:
# Pipeline settings. Every entry is read from the environment variable named in
# its getenv call and falls back to the default shown next to it.
_end_page_raw = os.getenv("END_PAGE")

CONFIG = dict(
    pdf_path=Path(os.getenv("PDF_PATH", "./books/Isfahan.pdf")),
    province=os.getenv("PROVINCE", "اصفهان"),
    start_page=int(os.getenv("START_PAGE", 11)),
    # An unset or empty END_PAGE is stored as None.
    end_page=int(_end_page_raw) if _end_page_raw not in (None, "") else None,
    chunk_size=int(os.getenv("CHUNK_SIZE", 2000)),
    overlap_size=int(os.getenv("OVERLAP_SIZE", 50)),
    model=os.getenv("GPT_MODEL", "gpt-4"),
    max_tokens=int(os.getenv("MAX_TOKENS", 1024)),
    temperature=float(os.getenv("TEMPERATURE", 1.0)),
    workers=int(os.getenv("WORKERS", 1)),
    log_file=os.getenv("LOG_FILE", "extraction.log"),
)

In [None]:
# Placeholder text that must never be sent to the API as a credential.
_API_KEY_PLACEHOLDER = "your_api_key_here"

# Read the key from the environment only. An empty default (instead of the
# placeholder) lets the guard below actually detect a missing key.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "").strip()

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        # Explicit UTF-8 so non-ASCII log content is written the same way on every platform.
        logging.FileHandler(CONFIG["log_file"], encoding="utf-8"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
console = Console()

# Fail fast: without a usable key every request would fail later on.
if not OPENAI_API_KEY or OPENAI_API_KEY == _API_KEY_PLACEHOLDER:
    raise RuntimeError("OPENAI_API_KEY is not set. Please set the OPENAI_API_KEY environment variable to your API key.")

openai.api_key = OPENAI_API_KEY

In [None]:
# Matches either a complete single-line JSON string literal (kept verbatim) or a
# line/block comment (dropped). Listing the string alternative first means that
# comment markers inside a string are consumed as part of the string.
_JSON_STRING_OR_COMMENT_RE = re.compile(
    r'"(?:\\.|[^"\\\n])*"|//[^\n]*|/\*[\s\S]*?\*/'
)

# Matches either a complete string literal (kept verbatim) or a comma that is
# followed only by whitespace and a closing brace/bracket (group 1 = what to keep).
_JSON_STRING_OR_TRAILING_COMMA_RE = re.compile(
    r'"(?:\\.|[^"\\\n])*"|,(\s*[}\]])'
)


def clean_json_string(s: str) -> str:
    """Strip JavaScript-style comments and trailing commas from JSON-like text.

    Only text outside of string literals is touched, so values such as
    "https://example.com" or ",}" survive unchanged.

    Args:
        s: JSON-like text, possibly containing line comments, block comments
            and trailing commas before a closing brace or bracket.

    Returns:
        The text with comments removed and trailing commas dropped.
    """
    def _drop_comment(match):
        token = match.group(0)
        # String literals start with a double quote; everything else is a comment.
        return token if token.startswith('"') else ""

    def _drop_trailing_comma(match):
        closing = match.group(1)
        # Group 1 is set only for the trailing-comma alternative.
        return closing if closing is not None else match.group(0)

    # Comments go first so that a comma separated from its closing bracket only
    # by a comment is still recognised as trailing in the second pass.
    s = _JSON_STRING_OR_COMMENT_RE.sub(_drop_comment, s)
    s = _JSON_STRING_OR_TRAILING_COMMA_RE.sub(_drop_trailing_comma, s)
    return s


def _longest_balanced_block(text: str, string_aware: bool) -> Optional[str]:
    """Return the longest top-level balanced {...} span in text, or None.

    With string_aware=True, braces that appear inside double-quoted strings
    (within a block) are not counted.
    """
    best = None
    start = None
    depth = 0
    in_string = False
    escaped = False
    for i, ch in enumerate(text):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            # Quotes in the prose around a block (depth 0) are not JSON strings.
            if string_aware and depth > 0:
                in_string = True
        elif ch == "{":
            if depth == 0:
                start = i
            depth += 1
        elif ch == "}" and depth > 0:
            depth -= 1
            if depth == 0:
                candidate = text[start:i + 1]
                # Strict '>' keeps the earliest block when lengths tie.
                if best is None or len(candidate) > len(best):
                    best = candidate
    return best


def extract_json_block(text: str) -> Optional[str]:
    """Return the longest balanced top-level {...} block found in text.

    Braces inside double-quoted strings are ignored while matching, so a value
    such as "}" does not terminate the block early. If the string-aware scan
    finds nothing (for example because of an unpaired quote), a plain
    brace-counting scan is used as a fallback.

    Args:
        text: Arbitrary text that may contain one or more JSON objects.

    Returns:
        The longest balanced block (the earliest one on ties), or None when
        the text contains no balanced block.
    """
    block = _longest_balanced_block(text, string_aware=True)
    if block is None:
        block = _longest_balanced_block(text, string_aware=False)
    return block


def clean_json_block(raw: str) -> str:
    """Trim surrounding whitespace and peel off a Markdown code fence, if any.

    When the trimmed text begins with a triple-backtick fence, its first line
    is dropped, and the last line is dropped too if it begins with a fence.
    Text that does not start with a fence is returned trimmed only.
    """
    fence = "```"
    text = raw.strip()
    if not text.startswith(fence):
        return text

    body = text.splitlines()
    if body[0].startswith(fence):
        del body[0]
    if body and body[-1].startswith(fence):
        body.pop()
    return "\n".join(body)


def extract_text_from_pdf(path: Path, start: int, end: Optional[int]) -> str:
    """Extract the text of a page range from a PDF.

    Args:
        path: Location of the PDF file.
        start: 1-based number of the first page to read. Values below 1 are
            treated as 1 so the slice never wraps around to the end of the book.
        end: 1-based number of the last page to read (inclusive). A falsy value
            (None or 0) means "through the last page".

    Returns:
        The page texts joined with newlines. Pages whose extraction raises are
        logged with their page number and skipped.
    """
    reader = PdfReader(str(path))
    # A start below 1 would give a negative slice index and select pages
    # counted from the end of the document.
    first = max(start, 1)
    pages = reader.pages[first - 1:end] if end else reader.pages[first - 1:]
    texts = []
    for page_no, page in enumerate(pages, start=first):
        try:
            texts.append(page.extract_text() or "")
        except Exception as e:
            logger.warning(f"Failed to extract page text (page {page_no}): {e}")
    return "\n".join(texts)


def normalize_text(text: str, strip_digits: bool = True) -> str:
    """Collapse whitespace and, by default, remove all digit runs.

    Digits are removed before whitespace is collapsed so that deleting a
    standalone number does not leave a double space behind.

    Args:
        text: Raw text to normalise.
        strip_digits: When True (the default), every run of decimal digits is
            deleted. The pattern is applied to str, so non-ASCII decimal digits
            such as Persian numerals are removed as well. Pass False to keep
            numbers (for example years) in the text.

    Returns:
        The text with each whitespace run replaced by a single space and with
        leading/trailing whitespace removed.
    """
    if strip_digits:
        text = re.sub(r"\d+", "", text)
    return re.sub(r"\s+", " ", text).strip()

In [None]:
def _overlap_words(chunk: str, overlap: int) -> List[str]:
    """Return the whole words contained in the last `overlap` characters of chunk."""
    if overlap <= 0 or len(chunk) <= overlap:
        return []
    tail = chunk[-overlap:]
    words = tail.split()
    # The cut is inside a word when neither side of the cut position is a
    # space; the leading fragment is then dropped instead of being carried.
    if not tail[0].isspace() and not chunk[-overlap - 1].isspace():
        words = words[1:]
    return words


def chunk_text(text: str, max_chars: int, overlap: int) -> List[str]:
    """Split text into whitespace-delimited chunks of roughly max_chars characters.

    Words are never split. When a chunk is closed, the whole words found in its
    last `overlap` characters are repeated at the start of the next chunk.

    Args:
        text: Text to split; any whitespace run is treated as a separator.
        max_chars: Size budget per chunk, counting each word plus one
            separator. A single word longer than the budget becomes a chunk of
            its own, and carried-over words can push a chunk past the budget.
        overlap: Number of trailing characters of a closed chunk to scan for
            words to carry over. Values <= 0 disable the overlap, and a chunk
            no longer than `overlap` carries nothing.

    Returns:
        The list of chunks (never containing an empty string); an empty list
        when text has no words.
    """
    words = text.split()
    if not words:
        return []
    chunks = []
    current = []
    length = 0
    for w in words:
        # `current` guard: never flush an empty buffer, which would emit an
        # empty chunk when the very first word already exceeds the budget.
        if current and length + len(w) + 1 > max_chars:
            chunk = " ".join(current)
            chunks.append(chunk)
            current = _overlap_words(chunk, overlap)
            length = sum(len(x) + 1 for x in current)
        current.append(w)
        length += len(w) + 1
    if current:
        chunks.append(" ".join(current))
    return chunks

In [None]:

# Schema description embedded verbatim into PROMPT_TEMPLATE. The first line is
# a Persian heading ("Sample of the exact JSON structure for each province:"),
# followed by a JSON skeleton in which every leaf is the placeholder "string".
# The string is a regular (non-raw) triple-quoted literal, so each \" below is
# just a double quote in the resulting text.
JSON_SCHEMA_EXACT = """
نمونه ساختار دقیق JSON برای هر استان:

{
  \"title\": \"string\",
  \"location\": {\"province\":\"string\",\"city\":\"string\"},
  \"geographical_features\":[{\"name\":\"string\",\"items\":[{\"name\":\"string\",\"images\":[\"string\"]}]}],
  \"natural_resources\":[{\"name\":\"string\",\"description\":[\"string\"]}],
  \"vegetation\":[\"string\"],
  \"topography\":[{\"name\":\"string\",\"description\":[\"string\"]}],
  \"tourist_attractions\":[{\"name\":\"string\",\"images\":[\"string\"],\"year_built\":\"string\",\"constructor\":\"string\",\"architect\":\"string\",\"description\":\"string\"}],
  \"climate_impacts\":[{\"impact\":\"string\",\"description\":[\"string\"]}],
  \"additional_info\":{\"books_source\":\"string\",\"other_sources\":[\"string\"]}
}
"""

# Worked example embedded verbatim into PROMPT_TEMPLATE: a filled-in object for
# Isfahan with two geographic categories (rivers and mountains, one item each)
# and every other list left empty.
# The string is a regular (non-raw) triple-quoted literal, so each \" below is
# just a double quote in the resulting text.
EXAMPLE_JSON = """
{
  \"title\": \"ویژگی‌های جغرافیایی اصفهان\",
  \"location\": {\"province\":\"اصفهان\",\"city\":\"اصفهان\"},
  \"geographical_features\":[
    {\"name\": \"رودخانه‌ها\", \"items\":[{\"name\":\"زاینده‌رود\",\"images\":[]}]} ,
    {\"name\": \"کوه‌ها\", \"items\":[{\"name\":\"کوه صفه\",\"images\":[]}]}
  ],
  \"natural_resources\":[],
  \"vegetation\":[],
  \"topography\":[],
  \"tourist_attractions\":[],
  \"climate_impacts\":[],
  \"additional_info\":{\"books_source\":\"\",\"other_sources\":[]}
}
"""

# Prompt prefix sent ahead of every text chunk. It is an f-string evaluated
# once at definition time: {JSON_SCHEMA_EXACT} and {EXAMPLE_JSON} are
# interpolated, and the doubled braces {{}} render as a literal {}.
#
# English summary of the Persian instructions:
#   Intro: you are a GPT model and must produce only one single, valid JSON object.
#   1. Keys and string values must use double quotes.
#   2. When data is missing use "" or []; never write an empty {}.
#   3. When confidence in a field is below 90% or there is no data, use "" or [].
#   4. Include at least one real item for every list extracted from the text.
#   5. Never wrap the JSON in a code fence or Markdown tag; return the bare JSON.
#   6. Pure JSON only, without explanations or comments.
#   7. Never return just the sample (EXAMPLE_JSON); show data extracted from
#      the input text.
#   8. If needed to complete the data, search Wikipedia and reputable online
#      sources and extract up-to-date information.
#   The template ends with a "--- source text ---" marker and a newline.
#
# NOTE(review): rule 4 appears to pull against rules 2-3 (empty values allowed
# vs. at least one item required) - confirm which behaviour is wanted.
# NOTE(review): rule 8 asks the model to search online, while the request in
# process_chunk sends a single user message only - confirm this is intended.
PROMPT_TEMPLATE = f"""
شما مدل GPT هستید و **تنها** باید یک شیء JSON یکتا و **معتبر** تولید کنید.
۱. کلیدها و مقدارهای رشته‌ای حتماً با گیومهٔ دوگانه (\"") باشند.
۲. اگر داده‌ای وجود ندارد، از \"\" یا [] استفاده کنید؛ **هرگز** {{}} خالی ننویسید.
۳. اگر برای فیلدی اطمینان کمتر از ۹۰٪ دارید یا داده نیست، آن را \"\" یا [] بگذارید.
۴. حتماً حداقل یک مورد واقعی برای هر لیست استخراج‌شده در متن بیاورید.
۵. **هرگز** JSON را داخل code fence (```…```) یا تگ Markdown قرار ندهید—فقط جسم خالص JSON را برگردانید!
۶. فقط JSON خالص، بدون توضیح یا کامنت.
۷. **هرگز** تنها مجموعهٔ نمونه (EXAMPLE_JSON) را به‌عنوان خروجی نهایی برنگردانید؛ حتماً داده‌های استخراج‌شده از متن ورودی را نمایش دهید.
۸. در صورت نیاز برای تکمیل داده‌ها، ویکی‌پدیا و منابع آنلاین معتبر را جست‌وجو کرده و اطلاعات به‌روز را استخراج کنید.

{JSON_SCHEMA_EXACT}

{EXAMPLE_JSON}

--- متن منبع ---\n"""

In [None]:
# Cross-chunk de-duplication registries: for each section, the identifiers that
# an earlier chunk has already contributed.
seen_entries = {
    "geographical_features": set(),
    "natural_resources": set(),
    "vegetation": set(),
    "topography": set(),
    "tourist_attractions": set(),
    "climate_impacts": set(),
}
# Names of geographic sub-items (shared across all categories) already emitted.
seen_subitems = set()

# process_chunk is submitted to a thread pool; this lock makes each
# "is it new? then record it" sequence on the registries above atomic.
_seen_lock = threading.Lock()

# Chunks with an index up to this value have their prompt, raw model output and
# de-duplicated result echoed to the console.
_DEBUG_CHUNK_LIMIT = 5

# Sections holding lists of objects, mapped to the key that identifies an
# entry. The schema in JSON_SCHEMA_EXACT names climate entries with "impact",
# every other section with "name".
_ENTRY_KEY_BY_SECTION = {
    "geographical_features": "name",
    "natural_resources": "name",
    "topography": "name",
    "tourist_attractions": "name",
    "climate_impacts": "impact",
}


def _as_list(value: Any) -> List[Any]:
    """Return value when it is a list, otherwise an empty list."""
    return value if isinstance(value, list) else []


def _dedupe_sections(parsed: Dict[str, Any]) -> None:
    """Remove, in place, every entry that an earlier chunk already contributed."""
    with _seen_lock:
        for section, key in _ENTRY_KEY_BY_SECTION.items():
            kept = []
            for item in _as_list(parsed.get(section)):
                if not isinstance(item, dict):
                    continue
                # Fall back to "name" so entries that use it are still kept.
                name = item.get(key) or item.get("name")
                if not isinstance(name, str) or not name:
                    continue

                if section == "geographical_features":
                    # The same category can appear in several chunks, so
                    # de-duplication happens on its sub-items rather than on
                    # the category name; a category is kept whenever it
                    # contributes at least one new sub-item.
                    fresh = []
                    for sub in _as_list(item.get("items")):
                        subname = sub.get("name") if isinstance(sub, dict) else None
                        if isinstance(subname, str) and subname and subname not in seen_subitems:
                            seen_subitems.add(subname)
                            fresh.append(sub)
                    item["items"] = fresh
                    if fresh:
                        seen_entries[section].add(name)
                        kept.append(item)
                elif name not in seen_entries[section]:
                    seen_entries[section].add(name)
                    kept.append(item)
            parsed[section] = kept

        # Vegetation is a flat list of values; the JSON text of each value is
        # used as its identity.
        kept = []
        for val in _as_list(parsed.get("vegetation")):
            if not val:
                continue
            val_key = json.dumps(val, sort_keys=True)
            if val_key not in seen_entries["vegetation"]:
                seen_entries["vegetation"].add(val_key)
                kept.append(val)
        parsed["vegetation"] = kept


def process_chunk(chunk: str, idx: int) -> Optional[Dict[str, Any]]:
    """Send one text chunk to the model and return its de-duplicated JSON result.

    Args:
        chunk: Source text appended to PROMPT_TEMPLATE.
        idx: 1-based chunk number, used for logging and for deciding whether
            to echo debug output to the console.

    Returns:
        The parsed JSON object with entries already contributed by earlier
        chunks removed, or None when the request fails, no JSON object is found
        in the reply, or the reply cannot be parsed. Failures are logged.

    Side effects:
        Updates the module-level seen_entries / seen_subitems registries.
    """
    debug = idx <= _DEBUG_CHUNK_LIMIT
    prompt = PROMPT_TEMPLATE + chunk + "\n"
    if debug:
        console.rule(f"[bold green]Chunk {idx} Prompt[/]")
        console.print(prompt)

    try:
        resp = openai.ChatCompletion.create(
            model=CONFIG['model'],
            messages=[{'role': 'user', 'content': prompt}],
            max_tokens=CONFIG['max_tokens'],
            temperature=CONFIG['temperature'],
            n=1,
        )
        content = resp.choices[0].message.content.strip()
    except Exception as exc:
        # Best effort per chunk: record the failure instead of hiding it.
        logger.warning("Chunk %d: OpenAI request failed: %s", idx, exc)
        return None

    if debug:
        console.rule(f"[bold blue]Chunk {idx} GPT Output[/]")
        console.print(content)

    raw = extract_json_block(content)
    if not raw:
        logger.warning("Chunk %d: no JSON object found in model output", idx)
        return None

    raw = clean_json_block(raw)
    raw = clean_json_string(raw)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        logger.warning("Chunk %d: model output is not valid JSON: %s", idx, exc)
        return None

    if not isinstance(parsed, dict):
        logger.warning("Chunk %d: model output is not a JSON object", idx)
        return None

    # An empty object is returned untouched (and is falsy for the caller).
    if parsed:
        _dedupe_sections(parsed)

    if debug:
        console.rule(f"[bold magenta]Chunk {idx} Parsed & Deduped JSON[/]")
        console.print(json.dumps(parsed, ensure_ascii=False, indent=2))
        console.rule()

    return parsed

In [None]:
def _fold_categories(entries: List[Any]) -> List[Any]:
    """Fold geographic category entries that share a name into one entry each."""
    folded = []
    by_name = {}
    for entry in entries:
        name = entry.get("name") if isinstance(entry, dict) else None
        if not isinstance(name, str) or name not in by_name:
            folded.append(entry)
            if isinstance(name, str):
                by_name[name] = entry
            continue
        # Same category seen again: append its items to the first occurrence.
        target = by_name[name]
        extra = entry.get("items")
        if isinstance(extra, list):
            if not isinstance(target.get("items"), list):
                target["items"] = []
            target["items"].extend(extra)
    return folded


def _merge_partials(province: str, partials: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Merge per-chunk results: lists accumulate, other fields keep the first non-empty value."""
    combined = {"province": province}
    for part in partials:
        for key, value in part.items():
            if key == "province":
                continue
            current = combined.get(key)
            if key not in combined or (not current and value):
                # Copy lists so later extends never modify a per-chunk result.
                combined[key] = list(value) if isinstance(value, list) else value
            elif isinstance(value, list) and isinstance(current, list):
                current.extend(value)
    features = combined.get("geographical_features")
    if isinstance(features, list):
        combined["geographical_features"] = _fold_categories(features)
    return combined


def main():
    """Run the whole pipeline: read the PDF, chunk it, query the model, merge and save.

    Reads its settings from CONFIG, writes "<province>_dataset.json" into the
    current directory and prints a summary to the console. A chunk whose
    processing raises is logged and skipped; it does not abort the run.
    """
    start = time.time()
    console.rule("[bold green]Starting Extraction[/]")
    text = normalize_text(extract_text_from_pdf(CONFIG['pdf_path'], CONFIG['start_page'], CONFIG['end_page']))
    chunks = chunk_text(text, CONFIG['chunk_size'], CONFIG['overlap_size'])
    console.print(f"[bold]Province:[/] {CONFIG['province']}")
    console.print(f"[bold]Total chunks:[/] {len(chunks)}\n")

    results_by_idx = {}
    with Progress(
        SpinnerColumn(style="bold green"),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(bar_width=None),
        TextColumn("[bold magenta]{task.completed}/{task.total} chunks"),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        task = progress.add_task("Extracting chunks", total=len(chunks))
        # ThreadPoolExecutor rejects max_workers <= 0, so clamp to at least one.
        with ThreadPoolExecutor(max_workers=max(1, CONFIG['workers'])) as executor:
            futures = {
                executor.submit(process_chunk, chunk, idx): idx
                for idx, chunk in enumerate(chunks, start=1)
            }
            for fut in as_completed(futures):
                idx = futures[fut]
                try:
                    res = fut.result()
                except Exception:
                    # result() re-raises whatever the worker raised; keep the
                    # results of the other chunks instead of aborting the run.
                    logger.exception("Chunk %d raised during processing", idx)
                    res = None
                if res:
                    results_by_idx[idx] = res
                progress.advance(task)

    # Merge in document order rather than completion order.
    partials = [results_by_idx[idx] for idx in sorted(results_by_idx)]
    if len(partials) < len(chunks):
        logger.warning("%d of %d chunks produced no data", len(chunks) - len(partials), len(chunks))
    combined = _merge_partials(CONFIG['province'], partials)

    out_file = Path(f"./{CONFIG['province']}_dataset.json")
    with open(out_file, "w", encoding="utf-8") as f:
        json.dump(combined, f, ensure_ascii=False, indent=2)
    elapsed = time.time() - start
    total_items = sum(len(v) if isinstance(v, list) else 1 for k, v in combined.items() if k != "province")
    console.rule("[bold green]Extraction Complete[/]")
    console.print(f"• Saved to: [bold]{out_file}[/]")
    console.print(f"• Items: [bold]{total_items}[/]")
    console.print(f"• Chunks: [bold]{len(chunks)}[/]")
    console.print(f"• Time: [bold]{elapsed:.2f}s[/]\n")
    table = Table(title="✅ Extraction Summary")
    table.add_column("Province", style="cyan")
    table.add_column("Items", justify="right", style="magenta")
    table.add_column("Chunks", justify="right", style="magenta")
    table.add_column("Time (s)", justify="right", style="magenta")
    table.add_row(CONFIG['province'], str(total_items), str(len(chunks)), f"{elapsed:.2f}")
    console.print(table)

In [None]:
# Entry point: start the extraction when this module runs as the main program.
if __name__ == "__main__":
    main()