<a href="https://colab.research.google.com/github/P-arjunie/SL-Legal-Act-Parser/blob/main/Act_extraction.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Mount Google Drive at /content/drive; the save cells later in this notebook
# write their JSON output under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# =========================================
# INSTALL DEPENDENCIES
# =========================================
!pip install pdfplumber pytesseract scikit-learn pillow beautifulsoup4 lxml > /dev/null

import pdfplumber
import pytesseract
import requests
from bs4 import BeautifulSoup
from PIL import Image
import numpy as np
from sklearn.cluster import KMeans
import io
import json
import re

In [3]:
def _get_act_identifier(act):
    """
    Generates a unique identifier for an act based on its official_name and year.
    Args:
        act (dict): A dictionary representing a parsed act, expected to have 'act_info' with 'official_name' and 'year'.
    Returns:
        str: A unique identifier string or None if essential information is missing.
    """
    official_name = act.get('act_info', {}).get('official_name')
    year = act.get('act_info', {}).get('year')

    if official_name and year:
        # Normalize name for consistent comparison (e.g., lowercase, remove extra spaces)
        normalized_name = re.sub(r'\s+', ' ', official_name).strip().lower()
        return f"{normalized_name}_{year}"
    return None

print("'_get_act_identifier' function defined.")

'_get_act_identifier' function defined.


In [10]:
##correct code


# =========================================
# PDF LAYOUT EXTRACTION FUNCTIONS
# =========================================

def pdf_page_is_scanned(page):
    """
    Decide whether a page needs OCR.

    A page is treated as scanned when its text layer yields nothing usable:
    ``extract_text()`` returns None, an empty string, or only whitespace.
    (Previously a whitespace-only text layer counted as "has text", so such
    pages skipped OCR and produced empty output.)

    Args:
        page: Object exposing ``extract_text()`` (a pdfplumber page in this notebook).
    Returns:
        bool: True when no non-whitespace text can be extracted.
    """
    text = page.extract_text()
    return not (text and text.strip())


def ocr_image_from_page(page):
    """
    OCR a page image and return one dict per recognised, non-blank token.

    The page is rendered at resolution=180 and passed to
    ``pytesseract.image_to_data``. Each returned dict carries the token text,
    its bounding box (x0/x1/y0/y1) and the box centre (cx/cy), all computed
    from the left/top/width/height values reported by pytesseract.
    """
    rendered = page.to_image(resolution=180).original
    data = pytesseract.image_to_data(rendered, output_type=pytesseract.Output.DICT)

    def _box(idx, token):
        # Translate pytesseract's left/top/width/height into corner + centre form.
        left, top = data['left'][idx], data['top'][idx]
        width, height = data['width'][idx], data['height'][idx]
        return {
            'text': token,
            'x0': left, 'x1': left + width,
            'y0': top, 'y1': top + height,
            'cx': left + width / 2,
            'cy': top + height / 2,
        }

    stripped = ((idx, raw.strip()) for idx, raw in enumerate(data['text']))
    # Entries whose text is blank after stripping are dropped.
    return [_box(idx, token) for idx, token in stripped if token]


def words_from_pdfplumber_page(page):
    """
    Convert the entries returned by ``page.extract_words()`` into the word-dict
    shape used by the layout helpers: text, x0/x1, y0/y1 (taken from
    "top"/"bottom") and the box centre cx/cy, with coordinates cast to float.
    """
    def _convert(raw):
        token = raw["text"]
        left, right = float(raw["x0"]), float(raw["x1"])
        upper, lower = float(raw["top"]), float(raw["bottom"])
        return {
            'text': token,
            'x0': left,
            'x1': right,
            'y0': upper,
            'y1': lower,
            'cx': (left + right) / 2,
            'cy': (upper + lower) / 2,
        }

    return [_convert(raw) for raw in page.extract_words()]


def detect_column_clusters(words, n_clusters=2):
    """
    Assign each word to a column cluster based on its horizontal centre.

    Args:
        words (list[dict]): Word dicts with a numeric 'cx' entry.
        n_clusters (int): Number of clusters requested from KMeans.
    Returns:
        numpy.ndarray: One integer label per word. All zeros (a single column)
        when the centres span less than 120 units or when there are fewer
        words than requested clusters; an empty array for empty input.
    """
    # Guard: max()/min() on an empty array raises ValueError.
    if not words:
        return np.zeros(0, dtype=int)

    xs = np.array([[w['cx']] for w in words])

    # KMeans cannot fit more clusters than samples, and a narrow horizontal
    # spread means there is only one column anyway.
    if len(words) < n_clusters or xs.max() - xs.min() < 120:
        return np.zeros(len(words), dtype=int)

    # Fixed random_state keeps the clustering reproducible between runs.
    return KMeans(n_clusters=n_clusters, random_state=0).fit(xs).labels_


def build_lines_by_column(words, labels):
    """
    Group words into text lines, one list of lines per column cluster.

    Args:
        words (list[dict]): Word dicts with 'text', 'cx', 'cy', 'y0', 'y1'.
        labels (sequence): Cluster label for each word, parallel to ``words``.
    Returns:
        list[list[dict]]: Columns ordered left to right (by mean 'cx'); each
        column is a list of ``{"y": mean centre-y, "text": joined words}``
        dicts in top-to-bottom order.
    """
    # Bucket words by their cluster label.
    members = {}
    for word, label in zip(words, labels):
        members.setdefault(label, []).append(word)

    # Cluster label ids carry no positional meaning, so order the columns by
    # their horizontal position instead of by label value (label breaks ties).
    ordered = sorted(
        members.items(),
        key=lambda item: (np.mean([w['cx'] for w in item[1]]), item[0]),
    )

    columns = []
    for _, cluster in ordered:
        cluster = sorted(cluster, key=lambda w: (w['cy'], w['cx']))

        # A word joins the current line while its centre-y is within one
        # word-height of the previous word's centre-y.
        lines, current = [], [cluster[0]]
        for w in cluster[1:]:
            prev = current[-1]
            if abs(w['cy'] - prev['cy']) < max(prev['y1'] - prev['y0'], w['y1'] - w['y0']):
                current.append(w)
            else:
                lines.append(current)
                current = [w]
        lines.append(current)

        blocks = []
        for line in lines:
            # Words were ordered by cy first, so slight vertical jitter can put
            # them out of horizontal order; restore left-to-right before joining.
            left_to_right = sorted(line, key=lambda w: w['cx'])
            blocks.append({
                "y": np.mean([t['cy'] for t in line]),
                "text": " ".join(t['text'] for t in left_to_right)
            })

        columns.append(blocks)

    return columns


def merge_columns_reading_order(columns):
    """
    Flatten per-column line blocks into one newline-joined string.

    Blocks from all columns are ordered by their 'y' value; ties are broken by
    the column's position in ``columns`` and then by the text itself.
    """
    entries = sorted(
        (block['y'], position, block['text'])
        for position, column in enumerate(columns)
        for block in column
    )
    return "\n".join(text for _, _, text in entries)


def extract_layout_text_from_pdf(path):
    """
    Extract text from every page of the PDF at ``path``.

    Pages reported as scanned by ``pdf_page_is_scanned`` go through OCR; the
    rest use the PDF's own words. Words are clustered into columns, grouped
    into lines and merged into a single string per page. Pages that yield no
    words contribute an empty string. Page texts are joined by a blank line.
    """
    page_texts = []
    with pdfplumber.open(path) as document:
        for page in document.pages:
            if pdf_page_is_scanned(page):
                words = ocr_image_from_page(page)
            else:
                words = words_from_pdfplumber_page(page)

            if words:
                labels = detect_column_clusters(words)
                columns = build_lines_by_column(words, labels)
                page_texts.append(merge_columns_reading_order(columns))
            else:
                # Keep a placeholder so page positions are preserved in the output.
                page_texts.append("")

    return "\n\n".join(page_texts)

def extract_short_title_from_text(text):
    """
    Extract the official short title from a citation clause such as:
    'This Act may be cited as the Finance (Amendment) Act, No. 35 of 1969.'

    Args:
        text (str): Full document text (may contain arbitrary whitespace/newlines).
    Returns:
        tuple: ``(title, year)`` where ``title`` is the cited title without the
        trailing punctuation and ``year`` is the first 18xx/19xx/20xx number in
        it as an int (None if absent). ``(None, None)`` when no citation
        clause is found.
    """

    # Collapse all whitespace (including line breaks) so a clause split across
    # lines still matches as one sentence.
    normalized = re.sub(r"\s+", " ", text)

    # The title runs up to the first full stop that is NOT the one in the
    # abbreviation "No." -- otherwise "Act, No. 35 of 1969." would be cut off
    # at "Act, No" and lose its number and year.
    pattern = re.compile(
        r"This\s+(?:Act|Law|Ordinance)\s+may\s+be\s+cited\s+as\s+(?:the\s+)?(.+?)(?<!\bNo)\.",
        re.IGNORECASE
    )

    match = pattern.search(normalized)
    if not match:
        return None, None

    raw_title = match.group(1).strip()

    # Trim trailing junk like "Short title"
    raw_title = re.sub(r"\bShort title\b.*", "", raw_title, flags=re.I).strip()

    # Extract the year separately from the cleaned title.
    year_match = re.search(r"\b(18|19|20)\d{2}\b", raw_title)
    year = int(year_match.group()) if year_match else None

    # Canonical cleanup
    title = re.sub(r"\s{2,}", " ", raw_title)
    title = title.rstrip(",.;")

    return title, year



# =========================================
# FIXED PARSER (Sri Lankan Act Format) - Modified to extract long_title and year
# =========================================

def parse_act_text(text: str) -> dict:
    """
    Parse the plain text of an act into a nested dict.

    Args:
        text: Raw document text; it is split on newlines and blank lines are dropped.
    Returns:
        dict of the form::

            {
              "act_info": {"official_name", "long_title", "year"},
              "chapters": [
                {"chapter_number", "chapter_title",
                 "sections": [
                   {"section_number", "section_heading", "section_body",
                    "subsections": [{"sub_number", "text"}]}
                 ]}
              ]
            }

    Notes:
        - ``official_name`` comes from ``extract_short_title_from_text`` when it
          finds a title, otherwise from the first non-empty line.
        - ``chapter_title`` is only ever set to None in this function.
        - A chapter is appended to the result only if it holds at least one section.
        - ``year`` is finally taken from the first 4-digit number found in the
          official name, else in the long title, else in the body of the first
          section of the first chapter; if none is found, the year returned with
          the short title (possibly None) is kept.
    """
    lines = [l.strip() for l in text.split("\n") if l.strip()]

    act = {
        "act_info": {
            "official_name": None,
            "long_title": None,
            "year": None
        },
        "chapters": []
    }

    # --- Extract official short title if available ---
    short_title, short_title_year = extract_short_title_from_text(text)

    if short_title:
        act["act_info"]["official_name"] = short_title
        act["act_info"]["year"] = short_title_year


    # Nothing but blank lines: return the (mostly empty) skeleton.
    if not lines:
        return act

    # Pre-processing step to merge split section numbers
    processed_lines = []
    i = 0
    while i < len(lines):
        current_line = lines[i]
        # Check for split section number pattern: "1" followed by ". This Act..."
        if re.fullmatch(r'\d+', current_line.strip()) and i + 1 < len(lines) and lines[i+1].strip().startswith('.'):
            # Merge the current line (number) with the next line (starts with .)
            merged_line = current_line.strip() + lines[i+1].strip()
            processed_lines.append(merged_line)
            i += 2 # Skip the next line as it's been merged
        else:
            processed_lines.append(current_line)
            i += 1
    lines = processed_lines

    # Fall back to the first non-empty line when no short title was found
    if not act["act_info"]["official_name"]:
      act["act_info"]["official_name"] = lines[0]

    line_idx = 1 # Start checking from the second line

    # Collect long title
    long_title_parts = []
    # Keywords indicating the end of the long title and start of operational text
    # 'AN ACT TO' should now be part of the long title, so it's removed from end patterns.
    long_title_end_patterns = [
        re.compile(r"^(BE IT ENACTED|THE PARLIAMENT OF|PROVIDING FOR)", re.IGNORECASE), # 'AN ACT TO' removed
        re.compile(r"^Short title and dates of operation\.$", re.IGNORECASE),
        re.compile(r"^\d+\.\s*"), # First section
        re.compile(r"CHAPTER\s+([A-Z]+)", re.I)
    ]

    # Every line from the second one onward belongs to the long title until a
    # line matches (at its start) one of the end patterns above.
    while line_idx < len(lines):
        line = lines[line_idx]
        should_break = False
        for pattern in long_title_end_patterns:
            if pattern.match(line):
                should_break = True
                break
        if should_break:
            break
        long_title_parts.append(line)
        line_idx += 1

    if long_title_parts:
        act["act_info"]["long_title"] = "\n".join(long_title_parts)

    # Sections seen before any CHAPTER line are collected in this unnumbered chapter.
    current_chapter = {
        "chapter_number": None,
        "chapter_title": None,
        "sections": []
    }

    # prev_line: the most recent plain text line, used as a candidate heading
    # for the next section. current_section: the section currently being filled.
    prev_line = None
    current_section = None

    # Continue parsing from where long title collection stopped
    for i in range(line_idx, len(lines)):
        line = lines[i]

        # Year extraction is deferred to post-processing

        # ---------- CHAPTER ----------
        # NOTE(review): re.match anchors at the line start only, so any line that
        # merely begins with "Chapter <letters>" is treated as a chapter break.
        chap = re.match(r"CHAPTER\s+([A-Z]+)", line, re.I)
        if chap:
            if current_section: # if there's an ongoing section, add it to current chapter
                current_chapter["sections"].append(current_section)
                current_section = None # reset current_section

            if current_chapter["sections"]: # if current chapter has sections, add it to act
                act["chapters"].append(current_chapter)

            current_chapter = {
                "chapter_number": chap.group(1),
                "chapter_title": None,
                "sections": []
            }
            prev_line = None
            continue

        # ---------- SECTION ----------
        # Regex for section: starts with digit(s) followed by a period and optional whitespace
        sec = re.match(r"^(\d+)\.\s*(.*)", line)
        if sec:
            if current_section:
                current_chapter["sections"].append(current_section)

            sec_num = sec.group(1)
            body_after = sec.group(2).strip()

            # Determine section heading: It's usually the line immediately preceding the section number,
            # but only if it's not a subsection or continuation of previous text.
            # NOTE(review): when a section is already open, that preceding line was
            # also appended to the previous section's body by the BODY TEXT branch
            # below, so it appears in both places.
            heading = None
            if prev_line and not prev_line.endswith(":"):
                # Ensure it's not a subsection or other structured element
                if not re.match(r"^(?:\(|[a-z]\))", prev_line) and not re.match(r"^(?:\d+|[a-z]|[ivx]+)\)\s*", prev_line):
                     heading = prev_line

            current_section = {
                "section_number": sec_num,
                "section_heading": heading,
                "section_body": body_after,
                "subsections": []
            }

            prev_line = None # Reset prev_line after a new section starts
            continue

        # ---------- SUBSECTIONS ----------
        # Lines starting with "(n)", "(a)" or a parenthesised roman numeral (case-insensitive).
        sub = re.match(r"^\((\d+|[a-z]|[ivx]+)\)\s*(.*)", line, re.I)
        if sub and current_section:
            current_section["subsections"].append({
                "sub_number": f"({sub.group(1)})",
                "text": sub.group(2)
            })
            prev_line = None # Reset prev_line for consistency
            continue

        # ---------- BODY TEXT ----------
        if current_section:
            # Append to section body if not a duplicate of prev_line (e.g., from double newlines)
            if line != prev_line: # prevent appending same line twice
                current_section["section_body"] += "\n" + line
            prev_line = line # Update prev_line
        else:
            # If we are outside a section/chapter and not collecting long_title,
            # this line might be an unparsed preamble or just noise.
            # It is still remembered as a possible heading for the next section.
            prev_line = line


    # finalize last section/chapter
    if current_section:
        current_chapter["sections"].append(current_section)

    if current_chapter["sections"]:
        act["chapters"].append(current_chapter)

    # Post-processing to extract year from collected data
    # More robust year extraction after all text is structured.
    # The first source that contains a 4-digit number wins and overwrites any
    # year set earlier from the short title.
    year_found = False

    # 1. Check official name
    year_match = re.search(r'(\d{4})', act["act_info"]["official_name"])
    if year_match:
        act["act_info"]["year"] = int(year_match.group(1))
        year_found = True

    # 2. Check long title
    if not year_found and act["act_info"]["long_title"]:
        year_match = re.search(r'(\d{4})', act["act_info"]["long_title"])
        if year_match:
            act["act_info"]["year"] = int(year_match.group(1))
            year_found = True

    # 3. Check first section's body (most common place for citation year)
    if not year_found and act["chapters"] and act["chapters"][0]["sections"]:
        first_section_body = act["chapters"][0]["sections"][0]["section_body"]
        # Look for patterns like "No. XX of YYYY" or just "YYYY"
        year_match = re.search(r'\b(?:No\.\s+\d+\s+of\s+)?(\d{4})\b', first_section_body)
        if year_match:
            act["act_info"]["year"] = int(year_match.group(1))
            year_found = True

    return act

# Helper function to generate a unique identifier for an act
def _get_act_identifier(act):
    """
    Generates a unique identifier for an act based on its official_name and year.
    Args:
        act (dict): A dictionary representing a parsed act, expected to have 'act_info' with 'official_name' and 'year'.
    Returns:
        str: A unique identifier string or None if essential information is missing.
    """
    official_name = act.get('act_info', {}).get('official_name')
    year = act.get('act_info', {}).get('year')

    if official_name and year:
        # Normalize name for consistent comparison (e.g., lowercase, remove extra spaces)
        normalized_name = re.sub(r'\s+', ' ', official_name).strip().lower()
        return f"{normalized_name}_{year}"
    return None

def read_urls_from_input():
    """
    Interactively read URLs from standard input.

    Lines are read until one is empty after stripping. Each line is split on
    whitespace and only tokens starting with "http" are kept.

    Returns:
        list[str]: The collected URLs with duplicates removed, first occurrence
        order preserved.
    """
    print("Paste URLs (one per line OR space-separated).")
    print("Press Enter on an empty line to finish:")

    collected = []
    # iter(callable, sentinel) keeps reading until a stripped line equals "".
    for entry in iter(lambda: input().strip(), ""):
        collected.extend(token for token in entry.split() if token.startswith("http"))

    # dict keys are unique and keep insertion order, which de-duplicates in place.
    return list(dict.fromkeys(collected))



# =========================================
# HTML EXTRACTION
# =========================================

def extract_html_text(url, timeout=30):
    """
    Download an HTML page and return its main text content.

    Args:
        url (str): Page to fetch.
        timeout (float): Seconds to wait for the server before giving up
            (defaults to 30, matching the PDF download in the main loop).
    Returns:
        str: Text of the first matching content container, one text node per
        line; an empty string when the document has no usable container.
    Raises:
        requests.HTTPError: When the server answers with a 4xx/5xx status.
        requests.RequestException: On connection problems or timeout.
    """
    response = requests.get(url, timeout=timeout)
    # Do not parse error pages (404/500 ...) as if they were acts.
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')

    # Drop non-content elements before extracting text.
    for bad in soup(['script', 'style', 'header', 'footer', 'nav']):
        bad.decompose()

    # Try progressively more generic containers, ending with <body>.
    content = (
        soup.find("div", class_="entry-content")
        or soup.find("div", class_="post-content")
        or soup.find("article")
        or soup.find("div", id="content")
        or soup.body
    )

    # A document without any of the containers (not even <body>) has no text to return.
    if content is None:
        return ""

    return content.get_text(separator="\n", strip=True)


# =========================================
# MAIN EXECUTION FLOW
# =========================================

# url = input("Enter the URL for the legal act (PDF or HTML): ").strip()
# all_parsed_acts = []

# if url:
#     print("Fetching document…")

#     if url.lower().endswith(".pdf"):
#         print("Downloading PDF…")
#         pdf_bytes = requests.get(url).content
#         temp_path = "/content/temp_law.pdf"
#         with open(temp_path, "wb") as f:
#             f.write(pdf_bytes)

#         print("Extracting layout-aware text (OCR if needed)…")
#         raw_text = extract_layout_text_from_pdf(temp_path)

#     else:
#         print("Extracting text from HTML…")
#         raw_text = extract_html_text(url)

#     print("\n=== Extracted Text (first 1500 chars) ===")
#     print(raw_text[:1500])
#     print("\n=========================================\n")

#     print("Parsing Act Structure…")
#     parsed = parse_act_text(raw_text)

#     all_parsed_acts.append(parsed)

#     print("=== Parsed JSON Preview (first part) ===")
#     print(json.dumps(parsed, indent=2, ensure_ascii=False)[:2000])

# else:
#     print("No URL entered.")

# Acts parsed in this run; consumed by the save step that follows.
all_parsed_acts = []

urls = read_urls_from_input()

if not urls:
    print("No URLs entered.")
else:
    print(f"Fetching {len(urls)} documents…")

    for url in urls:
        print("\n==============================")
        print("Processing:", url)

        try:
            # Decide on the URL path only, so "act.pdf?download=1" counts as a PDF
            # and "page.html?file=a.pdf" does not.
            url_path = url.split("#", 1)[0].split("?", 1)[0]
            if url_path.lower().endswith(".pdf"):
                print("Downloading PDF…")
                response = requests.get(url, timeout=30)
                # Fail on 4xx/5xx instead of saving an error page as a PDF.
                response.raise_for_status()
                pdf_bytes = response.content
                temp_path = "/content/temp_law.pdf"
                with open(temp_path, "wb") as f:
                    f.write(pdf_bytes)

                print("Extracting layout-aware text (OCR if needed)…")
                raw_text = extract_layout_text_from_pdf(temp_path)

            else:
                print("Extracting HTML text…")
                raw_text = extract_html_text(url)

            if not raw_text.strip():
                print("⚠️ Empty content, skipping.")
                continue

            print("Parsing Act Structure…")
            parsed = parse_act_text(raw_text)
            all_parsed_acts.append(parsed)

            # Preview is printed per document, truncated to 2000 characters.
            print("=== Parsed JSON Preview (first part) ===")
            print(json.dumps(parsed, indent=2, ensure_ascii=False)[:2000])

            print("✔ Parsed:",
                  parsed["act_info"]["official_name"],
                  parsed["act_info"]["year"])

        except Exception as e:
            # Best effort: report the failure and carry on with the next URL.
            print("❌ Failed to process:", e)



import os
import json

# Define the output file name, now pointing to Google Drive
output_filename = "/content/drive/MyDrive/parsed_act.json"

# Initialize an empty list for the final acts
final_acts_to_save = []

# Check if the output file exists and load its content.
# The file is written below as UTF-8 with ensure_ascii=False, so it must be
# read back as UTF-8 too rather than with the platform default encoding.
if os.path.exists(output_filename):
    with open(output_filename, 'r', encoding='utf-8') as f:
        final_acts_to_save = json.load(f)
    print(f"Loaded {len(final_acts_to_save)} existing acts from {output_filename}")
else:
    print(f"No existing acts found at {output_filename}. Initializing an empty list.")

print(f"Current state of final_acts_to_save: {len(final_acts_to_save)} items.")

unique_act_identifiers = set()
# Populate the set with identifiers from already loaded acts
for act in final_acts_to_save:
    identifier = _get_act_identifier(act)
    if identifier:
        unique_act_identifiers.add(identifier)

print(f"Initial unique identifiers from loaded acts: {len(unique_act_identifiers)}")

# Integrate newly parsed acts (from all_parsed_acts) with duplicate checking.
# Acts without an identifier (missing name or year) are reported and not saved.
newly_added_count = 0
for act in all_parsed_acts:
    identifier = _get_act_identifier(act)
    if identifier:
        if identifier not in unique_act_identifiers:
            final_acts_to_save.append(act)
            unique_act_identifiers.add(identifier)
            newly_added_count += 1
        else:
            print(f"Skipping duplicate act: {identifier}")
    else:
        print(f"Skipping act due to missing identifier info: {act.get('act_info', {})}")

print(f"Added {newly_added_count} new unique acts. Total acts in collection: {len(final_acts_to_save)}")



# Rewrite the whole collection (existing + newly added acts).
with open(output_filename, 'w', encoding='utf-8') as f:
    json.dump(final_acts_to_save, f, indent=2, ensure_ascii=False)

print(f"Successfully saved {len(final_acts_to_save)} unique acts to {output_filename}")

Paste URLs (one per line OR space-separated).
Press Enter on an empty line to finish:
https://lankalaw.net/wp-content/uploads/2025/02/1981Y3V62C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y15V508C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1983Y0V0C42A.html https://lankalaw.net/wp-content/uploads/2025/02/2000Y0V0C45A.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V295C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y13V353C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y13V353C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V287C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V297C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V287C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V297C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V297C-1.html https://lankalaw.net/wp-content/uploads/2025/02/1981Y11V299C-1.html https://lankalaw.net/wp-content/uplo

In [5]:
import os
import json

# Define the output file name, now pointing to Google Drive
output_filename = "/content/drive/MyDrive/parsed_act.json"

# Initialize an empty list for the final acts
final_acts_to_save = []

# Check if the output file exists and load its content.
# The file is written below as UTF-8 with ensure_ascii=False, so it must be
# read back as UTF-8 too rather than with the platform default encoding.
if os.path.exists(output_filename):
    with open(output_filename, 'r', encoding='utf-8') as f:
        final_acts_to_save = json.load(f)
    print(f"Loaded {len(final_acts_to_save)} existing acts from {output_filename}")
else:
    print(f"No existing acts found at {output_filename}. Initializing an empty list.")

print(f"Current state of final_acts_to_save: {len(final_acts_to_save)} items.")

unique_act_identifiers = set()
# Populate the set with identifiers from already loaded acts
for act in final_acts_to_save:
    identifier = _get_act_identifier(act)
    if identifier:
        unique_act_identifiers.add(identifier)

print(f"Initial unique identifiers from loaded acts: {len(unique_act_identifiers)}")

# Integrate newly parsed acts (from all_parsed_acts) with duplicate checking.
# NOTE: all_parsed_acts and _get_act_identifier must already exist in the
# kernel (they are defined by earlier cells) before this cell is run.
newly_added_count = 0
for act in all_parsed_acts:
    identifier = _get_act_identifier(act)
    if identifier:
        if identifier not in unique_act_identifiers:
            final_acts_to_save.append(act)
            unique_act_identifiers.add(identifier)
            newly_added_count += 1
        else:
            print(f"Skipping duplicate act: {identifier}")
    else:
        print(f"Skipping act due to missing identifier info: {act.get('act_info', {})}")

print(f"Added {newly_added_count} new unique acts. Total acts in collection: {len(final_acts_to_save)}")




# Rewrite the whole collection (existing + newly added acts).
with open(output_filename, 'w', encoding='utf-8') as f:
    json.dump(final_acts_to_save, f, indent=2, ensure_ascii=False)

print(f"Successfully saved {len(final_acts_to_save)} unique acts to {output_filename}")

Loaded 436 existing acts from /content/drive/MyDrive/parsed_act.json
Current state of final_acts_to_save: 436 items.
Initial unique identifiers from loaded acts: 436
Skipping duplicate act: food and drugs (amendment) act, no. 35 of 1968_1968
Skipping duplicate act: regulation of foreign fishing boats_1979
Skipping duplicate act: fisheries (regulation of foreign fishing boats) (amendment) act, no. 37 of 1982_1982
Skipping duplicate act: fiscal management (responsibility) act, no. 3 of 2003_2003
Skipping duplicate act: firing ranges and military training (amendment) act, no. 19 of 1986_1986
Skipping duplicate act: firearms (amendment)_1996
Skipping duplicate act: food act, no. 26 of 1980_1980
Skipping duplicate act: fugitive persons act, no. 29 of 1969_1969
Skipping duplicate act: fuel conservation-five day week_1978
Skipping duplicate act: foreign-going aircraft (exemption from customs duty)_1938
Skipping duplicate act: foreign probates_1973
Skipping duplicate act: foreign marriages_190