In [9]:
# -------------------- standard libs --------------------
import os, re, json, csv, time, pathlib, hashlib, textwrap
from datetime import datetime
from typing import Optional, List

# -------------------- third‑party ----------------------
import requests
from bs4 import BeautifulSoup
from tqdm.auto import tqdm

# -------------------- paths & constants ----------------
# Locations, all relative to the notebook's working directory.
JSON_ROOT = pathlib.Path("Legal_doc_test_v2")    # input: source JSON files
OUT_DIR = pathlib.Path("scraped_laws_v7")        # output: snippet JSONs + TSV log
CACHE_DIR = OUT_DIR / ".cache"                   # downloaded HTML is cached here

# Scraper tuning knobs.
CHARS_AROUND = 2_000        # characters kept on each side of a citation hit
REQUEST_TIMEOUT = 15        # seconds passed to the HTTP request
SLEEP_BETWEEN = 1.0         # seconds to pause after a failed URL
MAX_HTML_SIZE_MB = 20       # pages above this size (MiB) are rejected

# Create the output tree up front so later cells can write freely.
OUT_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)

print(f"JSON root : {JSON_ROOT.resolve()}")
print(f"Output dir: {OUT_DIR.resolve()}")


JSON root : /Users/mannanxanand/Legal-Document-Discrepancy-Benchmark-Dataset/Legal_doc_test_v2
Output dir: /Users/mannanxanand/Legal-Document-Discrepancy-Benchmark-Dataset/scraped_laws_v7


In [10]:
# User-Agent header values; the fetch helper tries them in this order.
UA_STRINGS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7)",
    "curl/8.5.0",
    "python-requests/2.31",
]

# A single shared Session so the underlying connections can be reused.
session = requests.Session()


def resilient_get(url: str, ua: str) -> str:
    """
    Download `url` with the given User-Agent and return the body as text.

    The response is streamed and rejected if its (content-decoded) body is
    larger than MAX_HTML_SIZE_MB MiB.  There is no fallback source: an HTTP
    error status is raised to the caller.

    Parameters
    ----------
    url : address to fetch.
    ua  : value sent in the ``User-Agent`` header.

    Returns
    -------
    The page body decoded with the charset reported by the response, or
    UTF-8 when none is reported or the reported one is unknown to Python.
    Undecodable bytes are dropped.

    Raises
    ------
    requests.HTTPError : 4xx/5xx status (from ``raise_for_status``).
    RuntimeError       : body exceeds the size limit.
    Other ``requests`` exceptions (timeout, connection failure) propagate.
    """
    hdr = {"User-Agent": ua, "Referer": "https://google.com"}
    max_bytes = MAX_HTML_SIZE_MB * 1024 * 1024

    # With stream=True the connection stays checked out until the body is
    # fully consumed or the response is closed; the `with` block closes it on
    # every exit path, including the early raises below.
    with session.get(url, timeout=REQUEST_TIMEOUT, headers=hdr, stream=True) as r:
        r.raise_for_status()
        # read one byte past the limit so an oversized body is detectable
        content = r.raw.read(max_bytes + 1, decode_content=True)
        if len(content) > max_bytes:
            raise RuntimeError("HTML bigger than limit")
        encoding = r.encoding or "utf-8"

    try:
        return content.decode(encoding, errors="ignore")
    except LookupError:
        # the server advertised a charset name Python has no codec for
        return content.decode("utf-8", errors="ignore")


In [11]:
def section_candidates(citation: str) -> List[str]:
    """
    Pull the numeric tokens out of a citation, longest first.

    A token is either a dotted decimal (``672.719``) or a plain run of
    digits.  Repeated tokens are returned once.  Ordering is by descending
    length, ties broken lexicographically.

    e.g. '§ 672.719(3)'      -> ['672.719', '3']
         '15 U.S. Code § 45' -> ['15', '45']
         ''                  -> []
    """
    # dict.fromkeys drops repeats so callers never search the same number twice
    unique = dict.fromkeys(re.findall(r'\d+\.\d+|\d+', citation))
    # longest first
    return sorted(unique, key=lambda s: (-len(s), s))


def extract_relevant_section(text: str, citation: str,
                             context: int = CHARS_AROUND) -> str:
    """
    Return ~`context` chars before/after the first plausible match
    for a section number.  Fallback: first 2*context chars.

    Parameters
    ----------
    text     : plain text to search.
    citation : citation string; its numeric tokens (from
               section_candidates) are tried in the order returned.
    context  : number of characters kept on each side of the hit.

    A hit must be a stand-alone number: it may not touch another word
    character, and it may not be one component of a longer dotted number
    (so '45' does not match inside '3.45' or '45.2', but does match in
    'Sec. 45.' at the end of a sentence).
    """
    for sec in section_candidates(citation):
        # (?<!\w) / (?!\w) play the role of \b for an all-digit token;
        # (?<!\d\.) / (?!\.\d) additionally reject hits inside a decimal.
        pattern = rf'(?<!\w)(?<!\d\.){re.escape(sec)}(?!\w)(?!\.\d)'
        m = re.search(pattern, text)
        if m:
            start = max(0, m.start() - context)   # clamp at start of text
            end   = m.end() + context             # slicing clamps the end
            return text[start:end]

    # fallback – nothing matched
    return text[:context * 2]


In [12]:
def fetch_snippet_with_retry(url: str, citation: str) -> Optional[str]:
    """
    Produce a plain-text snippet of `url` sliced around `citation`.

    Every entry of UA_STRINGS is attempted in order.  Raw HTML is cached
    under CACHE_DIR in a file named by the SHA-1 of "<ua>|<url>".  The page
    is stripped of script/style/nav/footer/header elements, whitespace is
    collapsed, and the text is handed to extract_relevant_section.

    Any exception raised during an attempt is swallowed and the next
    User-Agent is tried; None is returned once every attempt has raised.
    """
    noise_tags = ["script", "style", "nav", "footer", "header"]

    for agent in UA_STRINGS:
        try:
            # ---------- cache lookup / download ----------
            digest = hashlib.sha1(f"{agent}|{url}".encode()).hexdigest()
            cache_file = CACHE_DIR / digest
            if not cache_file.exists():
                html = resilient_get(url, agent)
                cache_file.write_text(html, encoding="utf-8")
            else:
                html = cache_file.read_text(encoding="utf-8")

            # ---------- strip boilerplate ----------
            soup = BeautifulSoup(html, "html.parser")
            for element in soup(noise_tags):
                element.decompose()
            raw_text = soup.get_text(" ", strip=True)
            # collapse whitespace runs, then drop any byte-order marks
            flattened = re.sub(r"\s+", " ", raw_text).replace("\ufeff", "")

            # ---------- cut out the relevant window ----------
            return extract_relevant_section(flattened, citation)

        except Exception:
            # best effort: fall through to the next User-Agent
            continue

    # every User-Agent failed
    return None


In [13]:
def all_json_files(root: pathlib.Path):
    """
    Yield every *.json under `root` (recursively), in sorted path order.

    Skipped:
      * anything inside an ``.ipynb_checkpoints`` directory
      * files whose name ends in ``.snippet.json``
    """
    # sorted() makes the processing order reproducible across platforms
    for p in sorted(root.rglob("*.json")):
        # Path.suffix is only the last extension (".json"), so the compound
        # ".snippet.json" ending has to be tested on the full file name.
        if ".ipynb_checkpoints" in p.parts or p.name.endswith(".snippet.json"):
            continue
        yield p


def target_path(json_path: pathlib.Path) -> pathlib.Path:
    """
    Map a source JSON under JSON_ROOT to its output location under OUT_DIR.

    The directory layout relative to JSON_ROOT is kept and the file's last
    extension is replaced by ``.snippet.json``.
    """
    relative = json_path.relative_to(JSON_ROOT)
    renamed = relative.with_suffix(".snippet.json")
    return OUT_DIR / renamed


In [14]:
# Tab-separated scrape log, opened in append mode so earlier runs are kept.
log_path = OUT_DIR / "scrape_log.tsv"
log_fh = open(log_path, mode="a", encoding="utf-8", newline="")
log = csv.writer(log_fh, delimiter="\t")

# A position of 0 means the file is brand new: write the column header.
if not log_fh.tell():
    log.writerow(["timestamp", "json_file", "law_url", "status", "chars"])

def note(jfile, url, status, chars=0):
    """
    Append one row to the TSV scrape log and echo the outcome to stdout.

    jfile  : name of the JSON file being processed
    url    : URL that was attempted (or a placeholder label)
    status : outcome text, e.g. "OK", "EMPTY" or "ERROR: ..."
    chars  : length of the snippet obtained (0 when there is none)
    """
    from datetime import timezone   # local: the notebook imports only `datetime`

    # datetime.utcnow() is deprecated since Python 3.12.  Take an aware UTC
    # time and drop tzinfo so the logged string keeps its existing format.
    stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
    log.writerow([stamp, jfile, url, status, chars])
    # flush so rows already logged survive an interrupted run
    log_fh.flush()
    print(f"{jfile:80s}  ->  {status}")

In [15]:
errors = []      # (path, message) for every JSON file that failed


def _first_snippet(urls, citation, json_name):
    """
    Try each URL in `urls` until one yields a non-empty snippet.

    Blank and "n/a..." entries are skipped.  Every attempt is logged through
    note().  After a failed attempt the loop pauses SLEEP_BETWEEN seconds.
    Returns the result of the last attempt (None when nothing was attempted).
    """
    snippet = None
    for u in urls:
        if not u or u.lower().startswith("n/a"):
            continue
        snippet = fetch_snippet_with_retry(u, citation)
        note(json_name, u, "OK" if snippet else "EMPTY", len(snippet or ""))
        if snippet:
            break
        time.sleep(SLEEP_BETWEEN)
    return snippet


try:
    for jpath in tqdm(list(all_json_files(JSON_ROOT)), desc="scraping"):
        out_path = target_path(jpath)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        if out_path.exists():
            continue                               # already processed

        try:
            data = json.loads(jpath.read_text(encoding="utf-8"))

            # some datasets wrap in a list
            data_root = data if isinstance(data, list) else [data]

            # NOTE(review): only the first element is scraped; confirm that
            # list-shaped inputs never carry more than one document.
            for pert in data_root[0]["perturbation"]:
                citation = (pert.get("law_citation") or "").strip()

                # --- collect urls (old & new schemas) ---
                urls_1 = pert.get("law_url1") or pert.get("law_url") or []
                urls_2 = pert.get("law_url2") or []

                # normalise to lists
                urls_1 = urls_1 if isinstance(urls_1, list) else [urls_1]
                urls_2 = urls_2 if isinstance(urls_2, list) else [urls_2]

                snippet1 = _first_snippet(urls_1, citation, jpath.name)
                snippet2 = _first_snippet(urls_2, citation, jpath.name)

                # ---------- write back ----------
                pert["scraped_snippet_1"] = snippet1
                pert["scraped_snippet_2"] = snippet2
                success_count = int(bool(snippet1)) + int(bool(snippet2))
                pert["scrape_success"] = success_count  # 0 / 1 / 2

            # `data` (not data_root) is written, so the input's shape is kept
            out_path.write_text(json.dumps(data, indent=2, ensure_ascii=False),
                                encoding="utf-8")

        except Exception as exc:
            # one bad file must not stop the run: record it and move on
            errors.append((jpath, str(exc)))
            note(jpath.name, "⟨parsing⟩", f"ERROR: {exc}")
finally:
    # close the log even when the run is interrupted part-way
    log_fh.close()

print(f"\nFinished.  {len(errors)} errors logged → {log_path.name}")


scraping:   0%|          | 0/99 [00:00<?, ?it/s]

perturbed_DOMINIADVISORTRUST_02_18_2005-EX-99.(H)(2)-SPONSORSHIPAGREEMENT.txt.json  ->  OK
perturbed_DOMINIADVISORTRUST_02_18_2005-EX-99.(H)(2)-SPONSORSHIPAGREEMENT.txt.json  ->  OK
perturbed_DOMINIADVISORTRUST_02_18_2005-EX-99.(H)(2)-SPONSORSHIPAGREEMENT.txt.json  ->  OK
perturbed_DOMINIADVISORTRUST_02_18_2005-EX-99.(H)(2)-SPONSORSHIPAGREEMENT.txt.json  ->  OK
perturbed_DOMINIADVISORTRUST_02_18_2005-EX-99.(H)(2)-SPONSORSHIPAGREEMENT.txt.json  ->  OK
perturbed_DOMINIADVISORTRUST_02_18_2005-EX-99.(H)(2)-SPONSORSHIPAGREEMENT.txt.json  ->  EMPTY
perturbed_PACIRAPHARMACEUTICALS,INC.-A_RSTRATEGICLICENSING,DISTRIBUTIONANDMARKETINGAGREEMENT.txt.json  ->  OK
perturbed_PACIRAPHARMACEUTICALS,INC.-A_RSTRATEGICLICENSING,DISTRIBUTIONANDMARKETINGAGREEMENT.txt.json  ->  OK
perturbed_PACIRAPHARMACEUTICALS,INC.-A_RSTRATEGICLICENSING,DISTRIBUTIONANDMARKETINGAGREEMENT.txt.json  ->  OK
perturbed_PACIRAPHARMACEUTICALS,INC.-A_RSTRATEGICLICENSING,DISTRIBUTIONANDMARKETINGAGREEMENT.txt.json  ->  OK
perturbed_P

In [16]:
# Sanity check: display the first perturbation of one scraped output file.
example_json = next(OUT_DIR.rglob("*.snippet.json"), None)

if example_json is None:
    # nothing scraped yet – say so instead of raising StopIteration
    print("No *.snippet.json files found under", OUT_DIR)
else:
    loaded = json.loads(example_json.read_text(encoding="utf-8"))
    # the scraper writes back the shape it read: a bare object or a list
    example = loaded[0] if isinstance(loaded, list) else loaded
    first = example["perturbation"][0]

    print("→", example_json.relative_to(OUT_DIR))
    print("Law citation :", first.get("law_citation"))
    print("Success flag :", first.get("scrape_success"))
    for idx in (1, 2):
        print(f"\nSnippet {idx} :\n")
        print(textwrap.fill(first.get(f"scraped_snippet_{idx}") or "⟨nothing⟩", width=90))


→ ambiguity_legal/perturbed_WHITESMOKE,INC_11_08_2011-EX-10.26-PROMOTIONANDDISTRIBUTIONAGREEMENT.txt.snippet.json
Law citation : 15 U.S. Code § 45 - Unfair methods of competition unlawful; prevention by Commission
Success flag : 2

Snippet 1 :

Federal Trade Commission Act | Federal Trade Commission Skip to main content The .gov
means it’s official. Federal government websites often end in .gov or .mil. Before sharing
sensitive information, make sure you’re on a federal government site. The site is secure.
The https:// ensures that you are connecting to the official website and that any
information you provide is encrypted and transmitted securely. Español Report Fraud Get
Consumer Alerts Search the Legal Library Federal Trade Commission Act Tags: Consumer
Protection Competition Appliances Alcohol Automobiles Clothing and Textiles Finance
Franchises, Business Opportunities, and Investments Funerals Jewelry Real Estate and
Mortgages Tobacco Advertising and Marketing Children Endorsement