載入套件 (Load packages)

In [5]:
import fitz  # PyMuPDF
import re
import pandas as pd

In [2]:
def extract_counsel_from_pdf(pdf_file_path):
    """Return the "Counsel:" section of a PDF as a single line of text.

    The text of every page is joined with newlines and searched for
    ``Counsel:``. The capture runs up to the next line that starts with one
    of the listed section headers (headnotes, judges, opinion, core terms,
    history, disposition), or up to a newline that ends the text.

    Args:
        pdf_file_path: Path of the PDF to open with PyMuPDF.

    Returns:
        The captured text with newlines replaced by spaces and surrounding
        whitespace stripped, or "" when the pattern does not match.
    """
    with fitz.open(pdf_file_path) as doc:
        full_text = "\n".join(page.get_text("text") for page in doc)

    # For appellate decisions, Counsel may appear after the LexisNexis Headnotes.
    # The pattern is a fixed literal, so the search needs no exception guard.
    match = re.search(
        r'Counsel:\s*(.+?)(?=\n(?:HN\d\[|Headnotes|Judges?:|Opinion by:|Core Terms|Subsequent History:|Prior History:|Disposition:|$))',
        full_text,
        re.DOTALL
    )
    if not match:
        return ""
    return match.group(1).replace('\n', ' ').strip()


In [None]:
# Run the counsel extractor on one sample PDF and print the result.
pdf_path = "C:/Users/owoyi/Downloads/ÁõßÊïôÊéà/Â∞àÂà©ÂàÜÊûê/data/testing/cp13-38.pdf"
counsel_text = extract_counsel_from_pdf(pdf_path)
print("Counsel:", counsel_text)


Counsel: [*1] For Warner Bros Home Entertainment Inc, Plaintiff: James Andrew Coombs, Nicole L Drey, J  Andrew Coombs APC, Glendale, CA.


In [13]:
# Redefine the function with a fixed "Opinion by" extraction
# This function extracts structured case-level metadata from a Lexis-style PDF
def extract_case_metadata_fixed_opinion(pdf_file_path):
    """Extract case-level metadata from a Lexis-style PDF.

    The text of all pages is joined and searched with regular expressions
    for the "Core Terms", "Judges:", "Opinion by:", "Prior History:" and
    "Subsequent History:" sections. The party line is taken from the
    "Counsel:" section when it matches; otherwise the first text block on
    the first page that mentions both "Plaintiff" and "Defendant" is used.

    Args:
        pdf_file_path: Path of the PDF to open with PyMuPDF.

    Returns:
        A dict with the keys "core term" (list of str), "judges",
        "plaintiff_defendant", "opinion by", "prior history" and
        "subsequent history" (all str). A section that is not found is
        returned as "" (or [] for "core term").
    """
    with fitz.open(pdf_file_path) as doc:
        # Full text of every page, used for the regex-based global searches.
        full_text = "\n".join(page.get_text("text") for page in doc)

        # Block-level layout of the first page only, used as a fallback
        # source for the party names.
        first_page_blocks = doc[0].get_text("dict")["blocks"]

    def extract_with_regex(pattern, end_patterns, max_len=1000):
        """Return the text after `pattern`, up to the next line starting with one of `end_patterns`.

        Returns "" when nothing matches, when the assembled pattern is not a
        valid regular expression, or when the result exceeds `max_len`.
        """
        stop = "|".join(end_patterns)
        try:
            match = re.search(
                rf'{pattern}\s+([\s\S]+?)(?=\n(?:{stop}))',
                full_text
            )
        except re.error:
            # The pattern is assembled from arguments, so it may be invalid.
            return ""
        if not match:
            return ""
        # Normalize newlines and enforce the length limit.
        content = match.group(1).replace('\n', ' ').strip()
        return content if len(content) <= max_len else ""

    # --- Core Terms ---
    # Comma-separated list that ends at the next common section boundary.
    core_terms_raw = extract_with_regex(
        "Core Terms",
        ["Counsel:", "LexisNexis", "HN\\d\\[", "Headnotes", "Opinion by:", "Judges?:"]
    )
    core_terms = [term.strip() for term in core_terms_raw.split(',')] if core_terms_raw else []

    # --- Judges ---
    judges = extract_with_regex(
        "Judges?:",
        ["Opinion by:", "Core Terms", "Counsel:"],
        max_len=300
    )

    # --- Opinion by ---
    # A one-line match avoids over-capturing the sections that follow.
    op_match = re.search(r'Opinion by:\s*(.+)', full_text)
    opinion_by = op_match.group(1).strip() if op_match else ""

    # --- Prior History ---
    prior_history = extract_with_regex(
        "Prior History:",
        ["Disposition:", "Core Terms", "Judges?:", "Opinion by:"],
        max_len=1000
    )

    # --- Subsequent History ---
    subsequent_history = extract_with_regex(
        "Subsequent History:",
        ["Prior History:", "Disposition:", "Core Terms"],
        max_len=1000
    )

    # --- Plaintiff / Defendant ---
    # First attempt: the first "For ... Plaintiff ..." sentence of the
    # Counsel section.
    plaintiff_defendant = ""
    pd_counsel_match = re.search(
        r'Counsel:\s*\[\*?\d*\]\s*For (.+?Plaintiff.*?)\.\s*',
        full_text,
        re.DOTALL
    )
    if pd_counsel_match:
        plaintiff_defendant = pd_counsel_match.group(1).replace('\n', ' ').strip()

    # Fallback: the first text block on page one that mentions both parties.
    if not plaintiff_defendant:
        for block in first_page_blocks:
            if "lines" not in block:
                continue
            block_text = " ".join(
                span["text"]
                for line in block["lines"]
                for span in line["spans"]
            ).strip()
            if "Plaintiff" in block_text and "Defendant" in block_text:
                plaintiff_defendant = block_text
                break

    return {
        "core term": core_terms,
        "judges": judges,
        "plaintiff_defendant": plaintiff_defendant,
        "opinion by": opinion_by,
        "prior history": prior_history,
        "subsequent history": subsequent_history
    }


In [14]:
# Run the metadata extractor on one sample PDF and print the returned dictionary.
print(extract_case_metadata_fixed_opinion("C:/Users/owoyi/Downloads/ÁõßÊïôÊéà/Â∞àÂà©ÂàÜÊûê/data/testing/cp16-78.pdf"))

{'core term': ['Bui', 'infringement', 'cases', 'default judgment', 'statutory damages', "attorney's fees", 'hourly rate', 'injunction', 'costs', 'Courts', 'copyright infringement', 'factors', 'copied', 'permanent', 'default', 'allegations', 'distributed', 'subscriber', 'requests', 'skill', 'weigh'], 'judges': 'Honorable Richard A. Jones, United States District Judge.', 'plaintiff_defendant': 'Dallas Buyers Club, LLC, Plaintiff: David Allen Lowe, LOWE GRAHAM JONES, SEATTLE, WA', 'opinion by': 'Richard A. Jones', 'prior history': 'Dallas Buyers Club, LLC v. Doe, 2015 U.S. Dist. LEXIS 87450 (W.D. Wash., July 1, 2015)', 'subsequent history': 'Motion granted by, Judgment entered by Dallas Buyers Club, LLC v. Nydam, 2016 U.S. Dist.  LEXIS 184269 (W.D. Wash., Aug. 8, 2016)'}


In [None]:
# MongoDB settings read by process_index_collection.
# NOTE(review): "MONGO_URI" looks like a placeholder rather than a real
# connection string — confirm it is replaced before running.
MONGO_URI = "MONGO_URI"
DB_NAME = "copyright"
COLLECTION_NAME = "index_todo"

# === Metadata extraction from a single page ===
def extract_metadata_from_page(pdf_path, page_num):
    """Extract case metadata from one page of a PDF.

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF.
        page_num: Index of the page to read.

    Returns:
        A dict with the keys "core term" (list of str), "judges",
        "plaintiff_defendant", "opinion by", "prior history" and
        "subsequent history" (all str); fields that are not found are ""
        (or [] for "core term").

    Raises:
        RuntimeError: If anything fails while reading or parsing the page,
            including `page_num` being past the last page. The original
            exception is chained as the cause.
    """
    try:
        with fitz.open(pdf_path) as doc:
            if page_num >= len(doc):
                raise ValueError(f"Page {page_num} out of range for {pdf_path}")
            page = doc[page_num]
            page_text = page.get_text("text")
            blocks = page.get_text("dict")["blocks"]

        def extract_field(pattern):
            """Return the rest of the line after "<pattern>:", or ""."""
            match = re.search(rf"{pattern}:\s*(.+)", page_text)
            return match.group(1).strip() if match else ""

        def extract_section(pattern, end_patterns):
            """Return the text after `pattern`, up to the next line starting with one of `end_patterns`."""
            match = re.search(
                rf'{pattern}\s+([\s\S]+?)(?=\n(?:{"|".join(end_patterns)}))',
                page_text
            )
            if match:
                return match.group(1).replace('\n', ' ').strip()
            return ""

        core_raw = extract_section("Core Terms", ["Counsel:", "Opinion by:", "Judges?:"])
        core_terms = [x.strip() for x in core_raw.split(',')] if core_raw else []

        judges = extract_field("Judges?")
        opinion_by = extract_field("Opinion by")
        prior_history = extract_section("Prior History", ["Disposition:", "Core Terms"])
        subsequent_history = extract_section("Subsequent History", ["Prior History:", "Core Terms"])

        # Prefer the "For ... Plaintiff ..." sentence of the Counsel section;
        # otherwise use the first text block that mentions both parties.
        plaintiff_defendant = ""
        counsel_match = re.search(
            r'Counsel:\s*\[\*?\d*\]\s*For (.+?Plaintiff.*?)\.\s*',
            page_text, re.DOTALL
        )
        if counsel_match:
            plaintiff_defendant = counsel_match.group(1).replace('\n', ' ').strip()
        else:
            for block in blocks:
                if "lines" not in block:
                    continue
                text = " ".join(span["text"] for line in block["lines"] for span in line["spans"]).strip()
                if "Plaintiff" in text and "Defendant" in text:
                    plaintiff_defendant = text
                    break

        return {
            "core term": core_terms,
            "judges": judges,
            "plaintiff_defendant": plaintiff_defendant,
            "opinion by": opinion_by,
            "prior history": prior_history,
            "subsequent history": subsequent_history
        }

    except Exception as e:
        # Chain explicitly so the traceback shows the original failure as the cause.
        raise RuntimeError(f"Failed to extract from {pdf_path}, page {page_num}: {str(e)}") from e



In [None]:
# === Main process: update MongoDB ===
def process_index_collection():
    """Extract metadata for every document in the collection and write it back.

    Each document is expected to carry a "pdf" file name (resolved under
    the "data" directory) and a "page" value, which is passed unchanged to
    extract_metadata_from_page. The extracted fields are stored on the same
    document with ``$set``. A failure on one document is recorded and does
    not stop the run; all recorded failures are written to
    "error_log.json" at the end (an empty list when there were none).
    """
    errors = []
    client = MongoClient(MONGO_URI)
    try:
        col = client[DB_NAME][COLLECTION_NAME]

        for doc in col.find():
            pdf = doc.get("pdf")
            page = doc.get("page")
            _id = doc["_id"]

            try:
                if not pdf or page is None:
                    raise ValueError("Missing 'pdf' or 'page' field")

                pdf_path = os.path.join("data", pdf)
                if not os.path.exists(pdf_path):
                    raise FileNotFoundError(f"File not found: {pdf_path}")

                metadata = extract_metadata_from_page(pdf_path, page)
                col.update_one({"_id": _id}, {"$set": metadata})
                print(f"‚úÖ Updated {_id} ({pdf}, page {page})")

            except Exception as e:
                # Best-effort batch: record the failure and move on.
                error_entry = {
                    "id": str(_id),
                    "pdf": pdf,
                    "page": page,
                    "error": str(e)
                }
                print(f"‚ùå Error: {error_entry}")
                errors.append(error_entry)
    finally:
        # Release the connection even if the cursor or an update fails.
        client.close()

    with open("error_log.json", "w", encoding="utf-8") as f:
        json.dump(errors, f, indent=2, ensure_ascii=False)


In [None]:
# Run the batch update over the whole collection.
process_index_collection()

In [None]:
from pymongo import MongoClient
import fitz  # PyMuPDF
import re
import pandas as pd
import os
import json

# MongoDB config
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
MONGO_URI = "mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright"
DB_NAME = "copyright"
COLLECTION_NAME = "index_todo"

# Extract metadata from a single page
# Finalized robust extraction function for legacy-style PDFs
def extract_metadata_from_page_robust(pdf_path, page_num):
    """Extract case metadata from one page using mostly single-line patterns.

    Args:
        pdf_path: Path of the PDF to open with PyMuPDF.
        page_num: Index of the page to read.

    Returns:
        A dict with the keys "core term" (list of str), "judges",
        "plaintiff_defendant", "opinion by", "prior history" and
        "subsequent history" (all str); fields that are not found are ""
        (or [] for "core term").

    Raises:
        ValueError: If `page_num` is past the last page of the document.
    """
    import fitz
    import re

    with fitz.open(pdf_path) as doc:
        if page_num >= len(doc):
            raise ValueError(f"Page {page_num} out of range for {pdf_path}")
        page = doc[page_num]
        text = page.get_text("text")
        blocks = page.get_text("dict")["blocks"]
        # Text of the first span of every layout line; lines without spans
        # are skipped so the [0] lookup cannot fail.
        lines = [
            line["spans"][0]["text"].strip()
            for block in blocks
            if "lines" in block
            for line in block["lines"]
            if line["spans"]
        ]

    # --- 1. Prior History (single-line version) ---
    prior_history = ""
    match_prior = re.search(r'Prior History:\s*\[*\*?\d*\]*\s*(.+)', text)
    if match_prior:
        prior_history = match_prior.group(1).strip()

    # --- 2. Core Terms (a "Core Terms" line followed by the list on the next line) ---
    core_terms = []
    for i, line in enumerate(lines):
        if line.strip().startswith("Core Terms"):
            core_line = lines[i + 1].strip() if i + 1 < len(lines) else ""
            core_terms = [x.strip() for x in core_line.split(',') if x.strip()]
            break

    # --- 3. Counsel ---
    # NOTE(review): because one of the terminators is `\n+`, the lazy capture
    # ends at the first newline, so only the first line after "Counsel:" is
    # returned — confirm whether a multi-line capture was intended.
    plaintiff_defendant = ""
    match = re.search(r'Counsel:\s*(.+?)(?:\n[A-Z][^\n]*:|\nLexisNexis|\n+)', text, re.DOTALL)
    if match:
        block = match.group(1).replace('\n', ' ').strip()
        plaintiff_defendant = re.sub(r'\s+', ' ', block)

    # --- 4. Judges ---
    # Matches only when the entry ends with "Judge." on the same line.
    judges = ""
    match_judge = re.search(r'Judges?:\s*(.+?Judge\.)', text)
    if match_judge:
        judges = match_judge.group(1).strip()

    # --- 5. Opinion by ---
    opinion_by = ""
    match_op = re.search(r'Opinion by:\s*(.+)', text)
    if match_op:
        opinion_by = match_op.group(1).strip()

    # --- 6. Subsequent History ---
    subsequent_history = ""
    match_sub = re.search(r'Subsequent History:\s*(.+?)\n', text)
    if match_sub:
        subsequent_history = match_sub.group(1).strip()

    return {
        "core term": core_terms,
        "judges": judges,
        "plaintiff_defendant": plaintiff_defendant,
        "opinion by": opinion_by,
        "prior history": prior_history,
        "subsequent history": subsequent_history
    }

# Main routine: batch-extract metadata and write it to CSV
def extract_all_metadata_to_csv():
    """Extract metadata for every document in the collection into a CSV preview.

    Each document's "pdf" (resolved under the "data" directory) and "page"
    are passed to extract_metadata_from_page. Successful results, with the
    "pdf" and "page" values added, are written to "metadata_preview.csv".
    Failures are collected and, if there are any, written to
    "error_log_preview.json". Nothing is written back to MongoDB.
    """
    data = []
    errors = []

    client = MongoClient(MONGO_URI)
    try:
        col = client[DB_NAME][COLLECTION_NAME]

        for doc in col.find():
            pdf = doc.get("pdf")
            page = doc.get("page")
            _id = doc["_id"]

            try:
                if not pdf or page is None:
                    raise ValueError("Missing 'pdf' or 'page'")

                pdf_path = os.path.join("data", pdf)
                if not os.path.exists(pdf_path):
                    raise FileNotFoundError(f"File not found: {pdf_path}")

                # NOTE(review): this calls extract_metadata_from_page, not the
                # extract_metadata_from_page_robust variant — confirm which
                # one is intended.
                metadata = extract_metadata_from_page(pdf_path, page)
                metadata["pdf"] = pdf
                metadata["page"] = page
                data.append(metadata)

            except Exception as e:
                # Best-effort batch: record the failure and move on.
                errors.append({
                    "id": str(_id),
                    "pdf": pdf,
                    "page": page,
                    "error": str(e)
                })
    finally:
        # Release the connection even if iterating the cursor fails.
        client.close()

    # Write the successfully extracted rows
    df = pd.DataFrame(data)
    df.to_csv("metadata_preview.csv", index=False)
    print("‚úÖ Â∑≤Ëº∏Âá∫ metadata_preview.csv")

    # Error log
    if errors:
        with open("error_log_preview.json", "w", encoding="utf-8") as f:
            json.dump(errors, f, indent=2, ensure_ascii=False)
        print(f"‚ö† Êúâ {len(errors)} Á≠ÜÈåØË™§ÔºåÂ∑≤ÂÑ≤Â≠òËá≥ error_log_preview.json")

# Run the main routine
if __name__ == "__main__":
    extract_all_metadata_to_csv()


In [33]:
def extract_case_title_by_font(blocks):
    """Return the stripped text of the largest-font span that contains "v.".

    Blocks without a "lines" key are skipped. When several matching spans
    share the largest size, the first one encountered wins. Returns "" when
    no span with a size above zero contains "v.".
    """
    every_span = (
        span
        for block in blocks
        if "lines" in block
        for line in block["lines"]
        for span in line["spans"]
    )

    best_text = ""
    best_size = 0
    for span in every_span:
        stripped = span["text"].strip()
        if "v." not in stripped:
            continue
        size = span["size"]
        if size > best_size:
            best_text, best_size = stripped, size
    return best_text

In [55]:
# Final version with extended page scan for 'Counsel' and 'Opinion by' extraction

def extract_case_metadata_from_page_fixed(pdf_file_path, page_num):
    """Extract case metadata for the case that starts on `page_num`.

    Core terms, judges and both history fields are searched in the start
    page plus the following page. "Opinion by" and the Counsel-based party
    line are searched in a longer window of up to 13 pages, because they
    can appear well after the start page.

    Args:
        pdf_file_path: Path of the PDF to open with PyMuPDF.
        page_num: Zero-based index of the page on which the case starts.

    Returns:
        A dict with the keys "core term" (list of str), "judges",
        "plaintiff_defendant", "opinion by", "prior history" and
        "subsequent history" (all str); fields that are not found are ""
        (or [] for "core term").

    Raises:
        ValueError: If `page_num` is negative or past the last page. A
            negative index would otherwise silently read the wrong pages.
    """
    import fitz
    import re

    with fitz.open(pdf_file_path) as doc:
        if not 0 <= page_num < len(doc):
            raise ValueError(f"Page {page_num} out of range for {pdf_file_path}")

        # The start page and one more, for the local metadata blocks.
        pages_text = "\n".join([
            doc[p].get_text("text")
            for p in range(page_num, min(page_num + 2, len(doc)))
        ])

        # An extended range (up to 13 pages) for Counsel and Opinion by.
        extended_text = "\n".join([
            doc[p].get_text("text")
            for p in range(page_num, min(page_num + 13, len(doc)))
        ])

        first_page_blocks = doc[page_num].get_text("dict")["blocks"]

    def extract_with_regex(pattern, end_patterns, text, max_len=1000):
        """Return the text after `pattern`, up to the next line starting with one of `end_patterns`.

        Returns "" when nothing matches, when the assembled pattern is not a
        valid regular expression, or when the result exceeds `max_len`.
        """
        stop = "|".join(end_patterns)
        try:
            match = re.search(
                rf'{pattern}\s+([\s\S]+?)(?=\n(?:{stop}))',
                text
            )
        except re.error:
            # The pattern is assembled from arguments, so it may be invalid.
            return ""
        if not match:
            return ""
        content = match.group(1).replace('\n', ' ').strip()
        return content if len(content) <= max_len else ""

    # Core Terms from the 1-2 page window
    core_terms_raw = extract_with_regex("Core Terms", [
        "Counsel:", "LexisNexis", "HN\\d\\[", "Headnotes", "Opinion by:", "Judges?:"
    ], pages_text)
    core_terms = [term.strip() for term in core_terms_raw.split(',')] if core_terms_raw else []

    judges = extract_with_regex("Judges?:", ["Opinion by:", "Core Terms", "Counsel:"], pages_text, max_len=300)

    # Opinion by (extended scan, single line)
    op_match = re.search(r'Opinion by:\s*(.+)', extended_text)
    opinion_by = op_match.group(1).strip() if op_match else ""

    # Prior History (local, single line); a leading "[**N]" marker is removed.
    prior_history = ""
    match_prior = re.search(r'Prior History:\s*(.+)', pages_text)
    if match_prior:
        prior_history = re.sub(r'^\[\*\*\d+\]\s*', '', match_prior.group(1).strip())

    subsequent_history = extract_with_regex("Subsequent History:", ["Prior History:", "Disposition:", "Core Terms"], pages_text, 1000)

    # --- Plaintiff/Defendant Logic ---
    def extract_case_title_by_font(blocks):
        """Return the stripped text of the largest-font span that contains "v."."""
        candidate = ""
        max_size = 0
        for block in blocks:
            if "lines" not in block:
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    text = span["text"].strip()
                    if "v." in text and span["size"] > max_size:
                        candidate = text
                        max_size = span["size"]
        return candidate

    # 1) The "For ... Plaintiff ..." sentence of the Counsel section.
    plaintiff_defendant = ""
    match = re.search(
        r'Counsel:\s*\[\*?\d*\]\s*For (.+?Plaintiff.*?)\.\s*',
        extended_text, re.DOTALL
    )
    if match:
        plaintiff_defendant = match.group(1).replace('\n', ' ').strip()

    all_block_texts = []
    for block in first_page_blocks:
        if "lines" in block:
            block_text = " ".join(span["text"] for line in block["lines"] for span in line["spans"]).strip()
            all_block_texts.append(block_text)

    # 2) A block that starts with an upper-case "NAME v." caption.
    if not plaintiff_defendant:
        for block_text in all_block_texts:
            if re.match(r'^[A-Z0-9,\.\-\(\)\s]+v\.', block_text):
                plaintiff_defendant = block_text
                break

    # 3) A block that mentions both parties by role.
    if not plaintiff_defendant:
        for block_text in all_block_texts:
            if "Plaintiff" in block_text and "Defendant" in block_text:
                plaintiff_defendant = block_text
                break

    # 4) The largest-font span on the page that contains "v.".
    if not plaintiff_defendant:
        plaintiff_defendant = extract_case_title_by_font(first_page_blocks)

    return {
        "core term": core_terms,
        "judges": judges,
        "plaintiff_defendant": plaintiff_defendant,
        "opinion by": opinion_by,
        "prior history": prior_history,
        "subsequent history": subsequent_history
    }


In [66]:
# Extract metadata for the case starting at page index 37 of data/cp13.pdf.
metadata = extract_case_metadata_from_page_fixed("data/cp13.pdf", 37)

In [67]:
# Display the extracted dictionary as the cell result.
metadata

{'core term': ['Programs',
  'infringement',
  'statutory damages',
  'default',
  'default judgment',
  'advertising',
  'distributing',
  'copies',
  'copyright  infringement',
  'products',
  'unauthorized',
  'damages',
  'manufacturing',
  'Importing',
  'reproduce',
  'picture',
  'merits',
  'personal  jurisdiction',
  'permanent injunction',
  'offering',
  'factors',
  'selling',
  'notice',
  'injunction',
  'entry of default',
  'actual damage',
  'certificate',
  'counterfeit',
  'similarity',
  'considers'],
 'judges': 'Fernando M. Olguin, United States District Judge.',
 'plaintiff_defendant': 'Warner Bros Home Entertainment Inc, Plaintiff: James Andrew Coombs, Nicole L Drey, J  Andrew Coombs APC, Glendale, CA',
 'opinion by': 'Fernando M. Olguin',
 'prior history': "Warner Bros Home Entm't v. Jimenez, 2013 U.S. Dist. LEXIS 37212 (C.D. Cal., Mar. 18, 2013)",
 'subsequent history': ''}

In [None]:
import os
import fitz
import re
from pymongo import MongoClient
from tqdm import tqdm

# === Configure MongoDB ===
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
client = MongoClient("mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright")
db = client["copyright"]
collection = db["index_todo"]

# === Paste the complete extract_case_metadata_from_page_fixed function here ===
# It can be copied from the earlier cell

# === Batch-process and write back to MongoDB ===
errors = []

for doc in tqdm(collection.find()):
    pdf = doc.get("pdf")
    page = doc.get("page")
    _id = doc.get("_id")

    try:
        if not pdf or page is None:
            raise ValueError("Missing 'pdf' or 'page'")
        # The stored page value is reduced by one before it is used as a page index.
        corrected_page = page - 1
        pdf_path = os.path.join("data", pdf)
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF not found: {pdf_path}")

        metadata = extract_case_metadata_from_page_fixed(pdf_path, corrected_page)

        # === Write the extracted fields back to the same collection ===
        collection.update_one(
            {"_id": _id},
            {"$set": metadata}
        )
    except Exception as e:
        # Record the failure and continue with the next document.
        errors.append({"_id": str(_id), "pdf": pdf, "page": page, "error": str(e)})

# === Error log (optional) ===
if errors:
    import json
    with open("writeback_errors.json", "w", encoding="utf-8") as f:
        json.dump(errors, f, ensure_ascii=False, indent=2)
    print(f"‚ö†Ô∏è ÂÆåÊàêÔºå‰ΩÜÊúâ {len(errors)} Á≠ÜÈåØË™§ÔºåË´ãÊ™¢Êü• writeback_errors.json")
else:
    print("‚úÖ ÊâÄÊúâË≥áÊñôÂ∑≤ÊàêÂäüÂØ´ÂÖ• MongoDBÔºÅ")


In [4]:
import re

def extract_prior_history(text):
    """Extract the body of the "Prior History" section from page text.

    The section starts after "Prior History" (followed by a colon and/or
    whitespace) and runs to the line before the next "Disposition:" or
    "Core Terms" heading, so a multi-line history is not cut short.
    Newlines inside the section are replaced by spaces. Returns "" when the
    section, or a heading that closes it, is not present.
    """
    found = re.search(
        r"Prior History[:\s]+([\s\S]+?)(?=\n(?:Disposition:|Core Terms))",
        text,
    )
    if found is None:
        return ""
    flattened = found.group(1).replace('\n', ' ')
    return flattened.strip()


In [6]:
# Sample 1: "Prior History" heading without a colon, body on the following lines.
sample_text = """
Prior History
Appeal from the United States District Court for the Eastern District of Pennsylvania.
(D.C. Civil Action No. 2-10-cv-02680). District Judge: Honorable J. Curtis Joyner.
Am. Bd. of Internal Med. v. Muller, 2012 U.S. Dist. LEXIS 123481 (E.D. Pa., Aug. 29, 2012)

Disposition: Judgment of the district court affirmed.
"""

print(extract_prior_history(sample_text))


Appeal from the United States District Court for the Eastern District of Pennsylvania. (D.C. Civil Action No. 2-10-cv-02680). District Judge: Honorable J. Curtis Joyner. Am. Bd. of Internal Med. v. Muller, 2012 U.S. Dist. LEXIS 123481 (E.D. Pa., Aug. 29, 2012)


In [7]:
# Sample 2: heading with a colon, a "[**1]" marker and a body that wraps onto a second line.
sample_text = """
Prior History: [**1] APPEAL FROM THE UNITED STATES DISTRICT COURT FOR THE WESTERN DISTRICT
OF OKLAHOMA. (D.C. No. CV-93-1212-R). David L. Russell, District Judge.
Disposition: AFFIRMED.
"""

print(extract_prior_history(sample_text))


[**1] APPEAL FROM THE UNITED STATES DISTRICT COURT FOR THE WESTERN DISTRICT OF OKLAHOMA. (D.C. No. CV-93-1212-R). David L. Russell, District Judge.


In [8]:
# Sample 3: two-line body that names several judges before the Disposition line.
sample_text = """
Prior History: [**1] Appeals from the United States District Court for the District of Minnesota. CIV 4-87-454.
Honorable David S. Doty, District Judge. Honorable James M. Rosenbaum, District Judge.
Disposition: Affirmed.
"""

print(extract_prior_history(sample_text))

[**1] Appeals from the United States District Court for the District of Minnesota. CIV 4-87-454. Honorable David S. Doty, District Judge. Honorable James M. Rosenbaum, District Judge.


In [None]:
from pymongo import MongoClient
import fitz  # PyMuPDF
import re
import os


# === MongoDB config ===
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
MONGO_URI = "mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright"
DB_NAME = "copyright"
COLLECTION_NAME = "index_todo"

# === PDF folder path ===
PDF_DIR = "./data"

# === Function that extracts the Prior History section ===
def extract_prior_history(text):
    """Return the "Prior History" body as one line, or "" when it is not found.

    The body ends at the line before the next "Disposition:" or
    "Core Terms" heading.
    """
    section = re.compile(
        r"Prior History[:\s]+([\s\S]+?)(?=\n(?:Disposition:|Core Terms))"
    ).search(text)
    if not section:
        return ""
    body = section.group(1)
    return body.replace('\n', ' ').strip()

# === Connect to MongoDB ===
client = MongoClient(MONGO_URI)
collection = client[DB_NAME][COLLECTION_NAME]

# === Process every document ===
error_logs = []

# Only documents that have a "pdf" field and an integer "page" are processed.
docs = collection.find({
    "pdf": {"$exists": True},
    "page": {"$type": "int"}
})

for doc in docs:
    pdf_filename = doc["pdf"]
    start_page = doc["page"]
    # The stored page value is reduced by one before it is used as a page index.
    page_index = start_page - 1
    _id = doc["_id"]
    pdf_path = os.path.join(PDF_DIR, pdf_filename)

    try:
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"{pdf_path} not found")

        with fitz.open(pdf_path) as doc_pdf:
            # Reject negative indexes as well, so a stored page of 0 is
            # reported as an error rather than used as a from-the-end index.
            if not 0 <= page_index < len(doc_pdf):
                raise IndexError(f"Page {page_index} out of range in {pdf_filename}")
            text = doc_pdf[page_index].get_text("text")

        prior = extract_prior_history(text)

        # Update MongoDB
        collection.update_one(
            {"_id": _id},
            {"$set": {"prior history": prior}}
        )
        print(f"Updated {_id} with prior history: {prior[:50]}...")

    except Exception as e:
        print(f"‚ùå Error with {_id}: {e}")
        # Store the id as a string: the raw _id value is not JSON-serializable,
        # which would make the json.dump below fail.
        error_logs.append({"_id": str(_id), "error": str(e)})

# === Optional: write the error log ===
if error_logs:
    import json
    with open("prior_history_errors.json", "w", encoding="utf-8") as f:
        json.dump(error_logs, f, ensure_ascii=False, indent=2)
    print(f"\n‚ö†Ô∏è Saved error logs to prior_history_errors.json")

print("\n‚úÖ All done.")


In [13]:
from pymongo import MongoClient
import re

# === MongoDB config ===
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
MONGO_URI = "mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright"
DB_NAME = "copyright"
COLLECTION_NAME = "index_todo"

# Connect and select the collection that the cleaning pass below updates.
client = MongoClient(MONGO_URI)
collection = client[DB_NAME][COLLECTION_NAME]

# Regular expression that matches markers such as [*1] and [**12]
footnote_pattern = re.compile(r'\[\*+\d+\]')


def clean_text(text):
    """Remove every "[*N]" / "[**N]" marker from `text` and strip the ends."""
    without_markers = re.sub(footnote_pattern, '', text)
    return without_markers.strip()

def clean_field(field):
    """Apply clean_text to a string, or to each string item of a list.

    Items of a list that are not strings are left out of the result. Any
    other kind of value is returned unchanged.
    """
    if isinstance(field, list):
        return [clean_text(item) for item in field if isinstance(item, str)]
    if isinstance(field, str):
        return clean_text(field)
    return field

# Query the documents in which at least one of the four fields exists and is not null
docs = collection.find({
    "$or": [
        {"prior history": {"$exists": True, "$ne": None}},
        {"subsequent history": {"$exists": True, "$ne": None}},
        {"opinion by": {"$exists": True, "$ne": None}},
        {"judges": {"$exists": True, "$ne": None}},
    ]
})

updated = 0

for doc in docs:
    _id = doc["_id"]
    # A field that is missing from the document is read as "".
    old_fields = {
        "prior history": doc.get("prior history", ""),
        "subsequent history": doc.get("subsequent history", ""),
        "opinion by": doc.get("opinion by", ""),
        "judges": doc.get("judges", ""),
    }

    new_fields = {k: clean_field(v) for k, v in old_fields.items()}

    # Write back only when cleaning changed something
    # NOTE(review): the $set writes all four keys, so a field that was
    # missing is created as "" on any document that gets updated — confirm
    # this is acceptable.
    if new_fields != old_fields:
        collection.update_one(
            {"_id": _id},
            {"$set": new_fields}
        )
        updated += 1

print(f"\n‚úÖ Cleaned and updated {updated} documents.")



‚úÖ Cleaned and updated 301 documents.


In [15]:
from pymongo import MongoClient
import csv

# === MongoDB config ===
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
MONGO_URI = "mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright"
DB_NAME = "copyright"
COLLECTION_NAME = "index_todo"

# === Connect to MongoDB ===
client = MongoClient(MONGO_URI)
collection = client[DB_NAME][COLLECTION_NAME]

# === Set holding every judge name (avoids duplicates) ===
all_judges = set()

# === Query all documents that have an "opinion by" value ===
docs = collection.find({
    "opinion by": {"$exists": True, "$ne": None}
})

for doc in docs:
    raw = doc["opinion by"]

    # Normalize the field to a list of names: a list is used as is, a
    # non-blank string becomes a one-item list, anything else is ignored.
    if isinstance(raw, list):
        names = raw
    elif isinstance(raw, str):
        if raw.strip():
            names = [raw]
        else:
            names = []
    else:
        names = []

    # Unify the format: title() and strip surrounding whitespace
    formatted = [name.strip().title() for name in names if name.strip()]
    all_judges.update(formatted)

# === Write the result to CSV ===
unique_judges = sorted(all_judges)

csv_filename = "judges_raw.csv"
with open(csv_filename, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.writer(file)
    writer.writerow(["name"])  # header row
    for name in unique_judges:
        writer.writerow([name])

print(f"\n‚úÖ Total unique judges: {len(unique_judges)}")
print(f"üìÑ Saved to {csv_filename}")




‚úÖ Total unique judges: 1059
üìÑ Saved to judges_raw.csv


In [16]:
import requests
from bs4 import BeautifulSoup
import json

# Request headers (a desktop-browser User-Agent string) sent by
# scrape_ballotpedia_judge_info with each page request.
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/136.0.0.0 Safari/537.36'
}

def extract_court_names(text):
    """Return True when `text` contains "Court" or "Circuit" (case-sensitive)."""
    return "Court" in text or "Circuit" in text

def scrape_ballotpedia_judge_info(judge_name):
    """Scrape the Ballotpedia page of a judge into a dictionary.

    The page URL is built from the title-cased name with spaces replaced by
    underscores. Key/value rows of the "infobox person" box are copied into
    the result; education rows and court/circuit entries are collected
    under their own keys.

    Args:
        judge_name: Name of the judge as it should appear in the URL.

    Returns:
        A dict. When the page has no infobox it holds only an "error"
        message. Otherwise it may contain "Name", "Party", the remaining
        infobox key/value pairs, "Education", "Circuit", "content" (the
        lower-cased paragraph text), "Gender", "Ballotpedia URL" and
        "Aliases".

    Raises:
        requests.exceptions.RequestException: If the page request fails or
            does not complete within the timeout.
    """
    # Convert name to Ballotpedia URL format
    url_name = judge_name.strip().title().replace(" ", "_")
    url = f"https://ballotpedia.org/{url_name}"

    # A timeout keeps one unresponsive request from hanging a whole batch.
    res = requests.get(url, headers=headers, timeout=30)
    res.encoding = 'utf-8'
    soup = BeautifulSoup(res.text, "html.parser")

    box = soup.find('div', class_='infobox person')
    data = {}
    education = {}
    circuit_history = []
    aliases = []

    if not box:
        data["error"] = f"No infobox found for {judge_name}"
        return data

    rows = box.find_all('div', class_='widget-row')

    # Get the official name from the page title
    page_name_tag = soup.find('span', class_='mw-page-title-main')
    if page_name_tag:
        page_name = page_name_tag.get_text(strip=True)
        data["Name"] = page_name

        # Record the input as an alias when it differs from the page name.
        # The comparison is skipped when the page has no title element, as
        # there is then nothing to compare against.
        normalized_input = judge_name.strip().lower().replace("_", "").replace(" ", "")
        normalized_page = page_name.lower().replace(" ", "")
        if normalized_input != normalized_page:
            aliases.append(judge_name)

    # Party
    party_tag = box.find('a', href=lambda x: x and ("Democratic_Party" in x or "Republican_Party" in x or "Nonpartisan" in x))
    if party_tag:
        data["Party"] = party_tag.get_text(strip=True)

    # Main infobox parsing
    for row in rows:
        key_tag = row.find('div', class_='widget-key')
        value_tag = row.find('div', class_='widget-value')

        if key_tag and value_tag:
            key = key_tag.get_text(strip=True)
            val = value_tag.get_text(separator=' ', strip=True)

            if key in ["Bachelor's", "Law"]:
                education[key] = val
            elif extract_court_names(key):
                circuit_history.append(key)
            elif extract_court_names(val):
                circuit_history.append(val)
            else:
                data[key] = val

    # Bold label above, value below
    bold_divs = box.find_all('div', style=lambda x: x and 'font-weight: bold' in x)
    for div in bold_divs:
        key = div.get_text(strip=True)
        next_div = div.find_next_sibling('div')
        if next_div:
            val = next_div.get_text(strip=True)
            if extract_court_names(key):
                circuit_history.append(key)
            if extract_court_names(val):
                circuit_history.append(val)
            if not extract_court_names(key) and not extract_court_names(val):
                data[key] = val

    # Clean circuit entries: drop those containing ':' and de-duplicate
    cleaned_circuits = list({
        entry.strip() for entry in circuit_history if ':' not in entry
    })

    # Attach structured fields
    if education:
        data["Education"] = education
    if cleaned_circuits:
        data["Circuit"] = cleaned_circuits

    # Paragraphs and gender detection
    paragraphs = soup.find_all('p')
    all_text = ' '.join(p.get_text(strip=True) for p in paragraphs).lower()
    data["content"] = all_text

    # Pronoun-count heuristic; "Gender" is left unset when no pronoun occurs.
    she_count = all_text.count(' she ')
    her_count = all_text.count(' her ')
    he_count = all_text.count(' he ')
    his_count = all_text.count(' his ')

    if she_count + her_count > he_count + his_count:
        data["Gender"] = "Female"
    elif he_count + his_count > 0:
        data["Gender"] = "Male"

    data["Ballotpedia URL"] = url
    if aliases:
        data["Aliases"] = aliases

    return data


In [None]:
from pymongo import MongoClient
from datetime import datetime

# 1. Connect to MongoDB
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
client = MongoClient("mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright")  # If you use credentials or a remote URI, change it here
db = client["copyright"]  
collection = db["index_todo"]

# 2. Date-string conversion helper
def try_parse_date(date_str):
    """Parse a "YYYY/MM/DD" string into a naive datetime.

    Returns:
        The parsed datetime, or None when `date_str` is not a string in
        that format.
    """
    try:
        return datetime.strptime(date_str, "%Y/%m/%d")
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: the value is not a string.
        return None

# 3. Process every document
for doc in collection.find():
    updates = {}
    # Only string values are converted; a value that does not parse is left as it is.
    for field in ["Decided", "Others", "Argued"]:
        if field in doc and isinstance(doc[field], str):
            parsed_date = try_parse_date(doc[field])
            if parsed_date:
                updates[field] = parsed_date

    # Write back only when at least one field was converted.
    if updates:
        collection.update_one({"_id": doc["_id"]}, {"$set": updates})


In [3]:
from pymongo import MongoClient
from datetime import datetime
from zoneinfo import ZoneInfo  # Python 3.9+

# Connect and select the collection whose date fields are converted below.
# NOTE(review): the connection string embeds a username and password in
# source; consider loading it from an environment variable or a secrets store.
client = MongoClient("mongodb://yihua:Yh%40copyright@140.117.75.100:27017/?authSource=copyright")
db = client["copyright"]
collection = db["index_todo"]

def to_gmt8(date_str):
    """Parse a "YYYY/MM/DD" string into a datetime tagged with Asia/Taipei.

    The wall-clock value is kept as parsed (midnight); only the time-zone
    information is attached.

    Returns:
        The time-zone-aware datetime, or None when `date_str` is not a
        string in that format.

    Raises:
        zoneinfo.ZoneInfoNotFoundError: If the "Asia/Taipei" zone data is
            not available. This is deliberately not swallowed: returning
            None here would silently skip every document.
    """
    try:
        dt = datetime.strptime(date_str, "%Y/%m/%d")
    except (ValueError, TypeError):
        # ValueError: wrong format; TypeError: the value is not a string.
        return None
    return dt.replace(tzinfo=ZoneInfo("Asia/Taipei"))  # mark as GMT+8

# Convert the string date fields of every document to time-zone-aware datetimes.
for doc in collection.find():
    updates = {}
    # Only string values are converted; a value that does not parse is left as it is.
    for field in ["Decided", "Others", "Argued"]:
        if field in doc and isinstance(doc[field], str):
            dt = to_gmt8(doc[field])
            if dt:
                updates[field] = dt
    # Write back only when at least one field was converted.
    if updates:
        collection.update_one({"_id": doc["_id"]}, {"$set": updates})
