In [1]:
from pathlib import Path
import nltk

# -----------------------------------------------------------------------------
# Paths
# -----------------------------------------------------------------------------
# Use the working directory as the project root when it contains `cap_data`;
# otherwise use its parent if `cap_data` is found there (e.g. when the notebook
# is started from a sub-folder). If neither has it, stay on the working directory.
PROJECT_ROOT = next(
    (
        candidate
        for candidate in (Path.cwd(), Path.cwd().parent)
        if (candidate / "cap_data").exists()
    ),
    Path.cwd(),
)
CAP_DIR = PROJECT_ROOT / "cap_data"

# -----------------------------------------------------------------------------
# NLTK resources (safe if already downloaded)
# -----------------------------------------------------------------------------
# `punkt_tab` is fetched alongside `punkt`: recent NLTK releases load the
# tokenizer tables from `punkt_tab`, so `word_tokenize` raises LookupError on a
# fresh environment when only `punkt` is present.
for resource in ("punkt", "punkt_tab", "stopwords"):
    nltk.download(resource)

# -----------------------------------------------------------------------------
# spaCy model: never downloaded from inside the notebook (that would make runs
# non-reproducible). Install it in the environment beforehand; if loading
# fails, `nlp` is left as None and a hint is printed instead of raising.
# -----------------------------------------------------------------------------
import spacy

nlp = None
try:
    nlp = spacy.load("en_core_web_sm")
except Exception as load_error:
    print(
        "WARNING: spaCy model 'en_core_web_sm' not available.",
        "Install with: python -m spacy download en_core_web_sm",
        sep="\n",
    )
    print("Underlying error:", load_error)



[nltk_data] Downloading package punkt to /Users/araj/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /Users/araj/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [2]:
import json
import pandas as pd

# Both CAP metadata files sit in `cap_data/metadata/`.
metadata_dir = CAP_DIR / "metadata"
volume_metadata_path = metadata_dir / "VolumeMetadata.json"  # a single dict
cases_metadata_path = metadata_dir / "CasesMetadata.json"    # a *list* of dicts in this repo

# Every row carries a `__source__` tag naming the file it was read from.
metadata_rows = []

# Volume metadata: the whole dict becomes one row.
if volume_metadata_path.exists():
    with volume_metadata_path.open("r", encoding="utf-8") as f:
        volume_meta = json.load(f)
    volume_row = {"__source__": "VolumeMetadata.json", **volume_meta}
    metadata_rows.append(volume_row)

# Cases metadata: one row per dict entry; non-dict list entries are skipped.
if cases_metadata_path.exists():
    with cases_metadata_path.open("r", encoding="utf-8") as f:
        cases_meta = json.load(f)

    if isinstance(cases_meta, dict):
        metadata_rows.append({"__source__": "CasesMetadata.json", **cases_meta})
    elif isinstance(cases_meta, list):
        metadata_rows.extend(
            {"__source__": "CasesMetadata.json", **row}
            for row in cases_meta
            if isinstance(row, dict)
        )
    else:
        raise TypeError(f"Unexpected JSON type for {cases_metadata_path}: {type(cases_meta)}")

print(f"Loaded metadata rows: {len(metadata_rows)}")
first_row_preview = metadata_rows[0] if metadata_rows else "<no metadata loaded>"
print(first_row_preview)

# Flatten nested dicts into columns; fall back to an empty frame when nothing was loaded.
if not metadata_rows:
    df_metadata = pd.DataFrame()
else:
    df_metadata = pd.json_normalize(metadata_rows)
    print(df_metadata.head(3))
    print("Metadata columns:", len(df_metadata.columns))



Loaded metadata rows: 70
{'__source__': 'VolumeMetadata.json', 'volume_number': '219', 'title': None, 'publisher': None, 'publication_year': None, 'start_year': 2017, 'end_year': 2017, 'series_volume_number': None, 'jurisdictions': [{'id': 30, 'name': 'Cal.', 'name_long': 'California'}], 'id': 'CalRptr3d_219', 'harvard_hollis_id': None, 'spine_start_year': 2017, 'spine_end_year': 2017, 'publication_city': None, 'second_part_of_id': None, 'redacted': False, 'nominative_reporter': None}
            __source__ volume_number  title  publisher  publication_year  \
0  VolumeMetadata.json           219    NaN        NaN               NaN   
1   CasesMetadata.json           NaN    NaN        NaN               NaN   
2   CasesMetadata.json           NaN    NaN        NaN               NaN   

   start_year  end_year  series_volume_number  \
0      2017.0    2017.0                   NaN   
1         NaN       NaN                   NaN   
2         NaN       NaN                   NaN   

        

In [None]:
from bs4 import BeautifulSoup

# Directory scanned below for `*.html` case files (one file per case).
html_dir = CAP_DIR / "html"

def extract_case_text_from_cap_html(html: str) -> str:
    """Return the paragraph text of a CAP case page, one paragraph per line.

    The text is taken from ``<article class="opinion">`` when present and from
    ``<section class="casebody">`` otherwise. ``<aside class="footnote">``
    elements inside that container are dropped before the ``<p>`` elements are
    read, and paragraphs that are empty after stripping are omitted.

    Args:
        html: Raw HTML of one case file.

    Returns:
        The paragraph texts joined with newlines, or ``""`` when neither
        container is found.
    """
    document = BeautifulSoup(html, "lxml")

    # Opinion article first; whole casebody only as a fallback.
    container = document.select_one("article.opinion")
    if container is None:
        container = document.select_one("section.casebody")
    if container is None:
        return ""

    # Footnotes are treated as noise and removed from the tree.
    for footnote in container.select("aside.footnote"):
        footnote.decompose()

    paragraph_texts = (p.get_text(" ", strip=True) for p in container.select("p"))
    return "\n".join(filter(None, paragraph_texts))

rows = []

if not html_dir.exists():
    raise FileNotFoundError(f"CAP HTML dir not found: {html_dir}")

# One record per HTML file, visited in sorted filename order.
for path in sorted(html_dir.glob("*.html")):
    html = path.read_text(encoding="utf-8", errors="replace")

    # Case title from the head-matter "parties" paragraph, when the page has one.
    page = BeautifulSoup(html, "lxml")
    parties_tag = page.select_one("section.head-matter p.parties")
    if parties_tag:
        title = parties_tag.get_text(" ", strip=True)
    else:
        title = None

    opinion = extract_case_text_from_cap_html(html)

    rows.append(
        dict(
            case_id=path.stem,  # file stem, e.g. 0065-01
            title=title,
            opinion_text=opinion,
            opinion_char_len=len(opinion),
        )
    )

df_text = pd.DataFrame(rows)
print(df_text.head(3))
print("Extracted cases:", len(df_text))
empty_text_count = (df_text["opinion_char_len"] == 0).sum()
print("Empty texts:", empty_text_count)



In [None]:
# -----------------------------------------------------------------------------
# Merge CAP JSON metadata onto extracted text + export `data/legal_text_data.csv`
# -----------------------------------------------------------------------------
import json

json_dir = CAP_DIR / "json"

# Fail early with a clear message, mirroring the HTML directory check. Without
# it a missing directory yields zero rows and the merge below fails with an
# opaque `KeyError: 'case_id'`.
if not json_dir.exists():
    raise FileNotFoundError(f"CAP JSON dir not found: {json_dir}")

# Compact per-case schema. Passing it to the DataFrame constructor guarantees
# the `case_id` join column exists even when the directory holds no JSON files.
CAP_META_COLUMNS = [
    "case_id",
    "cap_case_numeric_id",
    "name",
    "name_abbreviation",
    "decision_date",
    "docket_number",
    "first_page",
    "last_page",
    "official_citation",
    "court_name",
    "court_abbrev",
    "jurisdiction",
]

meta_rows = []
for path in sorted(json_dir.glob("*.json")):
    case_id = path.stem  # matches html stem
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        obj = json.load(f)

    # Cite of the first citation entry flagged as "official", if there is one.
    citations = obj.get("citations") or []
    official_cite = next(
        (
            c.get("cite")
            for c in citations
            if isinstance(c, dict) and c.get("type") == "official"
        ),
        None,
    )

    # Missing or null nested objects are treated as empty dicts.
    court = obj.get("court") or {}
    juris = obj.get("jurisdiction") or {}

    meta_rows.append(
        {
            "case_id": case_id,
            "cap_case_numeric_id": obj.get("id"),
            "name": obj.get("name"),
            "name_abbreviation": obj.get("name_abbreviation"),
            "decision_date": obj.get("decision_date"),
            "docket_number": obj.get("docket_number"),
            "first_page": obj.get("first_page"),
            "last_page": obj.get("last_page"),
            "official_citation": official_cite,
            "court_name": court.get("name"),
            "court_abbrev": court.get("name_abbreviation"),
            "jurisdiction": juris.get("name_long") or juris.get("name"),
        }
    )

df_cap_meta = pd.DataFrame(meta_rows, columns=CAP_META_COLUMNS)
print("CAP JSON meta rows:", len(df_cap_meta))
print(df_cap_meta.head(3))

# Join meta onto text (left join: every extracted case is kept, with or without metadata)
legal_df = df_text.merge(df_cap_meta, on="case_id", how="left")

# -----------------------------------------------------------------------------
# Bias proxy features (NOT ground-truth labels)
# -----------------------------------------------------------------------------
# This is a *weak heuristic* to give you something to iterate on.
# Treat it as a feature, not a definitive "bias" label.
import re  # imported here because `re` is otherwise only imported by a later cell

BIAS_TERMS = [
    # race/ethnicity
    "black",
    "white",
    "hispanic",
    "latino",
    "asian",
    "native",
    "indian",
    "race",
    "racial",
    "ethnicity",
    # gender
    "male",
    "female",
    "woman",
    "women",
    "man",
    "men",
    "gender",
    # citizenship/immigration
    "alien",
    "immigrant",
    "immigration",
    "citizen",
    "citizenship",
    "deport",
    "deportation",
]

# Whole-word matcher over BIAS_TERMS. Plain substring counting would hit "men"
# inside "judgment", "race" inside "embrace" or "native" inside "alternative",
# and would count "woman" twice (once as "woman", once as "man").
_BIAS_TERM_RE = re.compile(r"\b(?:" + "|".join(re.escape(term) for term in BIAS_TERMS) + r")\b")

def bias_proxy_hits(text: str) -> int:
    """Count whole-word, case-insensitive occurrences of BIAS_TERMS in ``text``.

    Each word in the text contributes at most one hit. Only the exact forms
    listed in BIAS_TERMS are counted; inflected forms (e.g. "citizens",
    "deported") are not, unless they are added to the list.

    Args:
        text: Text to scan. ``None`` or an empty string gives 0.

    Returns:
        Total number of matching words.
    """
    return len(_BIAS_TERM_RE.findall((text or "").lower()))

# Per-case hit count, plus a binary flag that is 1 when at least one term was found.
legal_df["bias_proxy_term_hits"] = legal_df["opinion_text"].astype(str).apply(bias_proxy_hits)
legal_df["BIAS_LABEL"] = (legal_df["bias_proxy_term_hits"] > 0).astype(int)

# Export
out_path = PROJECT_ROOT / "data" / "legal_text_data.csv"
# `to_csv` does not create missing parent directories, so make sure `data/` exists.
out_path.parent.mkdir(parents=True, exist_ok=True)
legal_df.to_csv(out_path, index=False)
print(f"Wrote: {out_path} | shape={legal_df.shape}")

# Preview
print(legal_df[["case_id", "decision_date", "court_abbrev", "opinion_char_len", "bias_proxy_term_hits", "BIAS_LABEL"]].head(10))



In [None]:
import re
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

# English stop words, held in a set for fast membership tests.
stop_words = set(stopwords.words("english"))

def preprocess_text(text: str) -> str:
    """Normalize opinion text into a space-separated string of content words.

    Steps, in order: strip bracketed/parenthesised numeric markers and ``*123``
    style markers, lowercase, replace remaining punctuation with spaces,
    tokenize, and drop English stop words.

    Args:
        text: Raw opinion text.

    Returns:
        The remaining tokens joined by single spaces.
    """
    # Remove citations and special characters
    text = re.sub(r"\[\d+\]", "", text)  # Remove [1], [2], etc.
    text = re.sub(r"\(\d+\)", "", text)  # Remove (1), (2), etc.
    text = re.sub(r"\*\d+", "", text)  # Remove *123

    text = text.lower()
    # Replace punctuation with a space instead of deleting it: deletion fuses
    # neighbouring words ("plaintiff/defendant" -> "plaintiffdefendant") and turns
    # "don't" into "dont", which then slips past the stop-word filter.
    text = re.sub(r"[^\w\s]", " ", text)

    words = word_tokenize(text)
    words = [word for word in words if word not in stop_words]
    return " ".join(words)

# Store the normalized text next to the raw opinion and preview the first rows.
opinion_as_str = df_text["opinion_text"].astype(str)
df_text["clean_opinion"] = opinion_as_str.map(preprocess_text)
print(df_text.loc[:, ["case_id", "clean_opinion"]].head())


In [None]:
from nltk.sentiment.vader import SentimentIntensityAnalyzer

nltk.download("vader_lexicon")

sid = SentimentIntensityAnalyzer()

def sentiment_score(text: str) -> float:
    """Return VADER's ``compound`` polarity score for ``text``.

    Args:
        text: Text to score.

    Returns:
        The ``compound`` entry of ``sid.polarity_scores(text)``.
    """
    return sid.polarity_scores(text)["compound"]

# Score the raw opinion text, not `clean_opinion`: the cleaning step lowercases,
# strips punctuation and removes stop words such as "not" and "no", all of which
# VADER uses as cues (capitalisation, punctuation emphasis, negation).
# NOTE(review): compound scores of very long documents tend to sit near +/-1;
# consider averaging sentence-level scores if more resolution is needed.
df_text["sentiment_score"] = df_text["opinion_text"].astype(str).apply(sentiment_score)
print(df_text[["case_id", "sentiment_score"]].head())


In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation

# Vectorize the text: drop terms found in more than 95% of documents or in
# fewer than 2 documents, plus scikit-learn's built-in English stop words.
vectorizer = CountVectorizer(max_df=0.95, min_df=2, stop_words='english')
dtm = vectorizer.fit_transform(df_text['clean_opinion'])

# Define number of topics
num_topics = 5
# Number of highest-weighted words printed per topic
num_top_words = 10

# Initialize LDA (fixed seed so topics are reproducible between runs)
lda = LatentDirichletAllocation(n_components=num_topics, random_state=42)
lda.fit(dtm)

# Display topics. The vocabulary array is fetched once rather than once per
# printed word; words are listed in ascending order of topic weight.
feature_names = vectorizer.get_feature_names_out()
for index, topic in enumerate(lda.components_):
    print(f'Topic #{index + 1}:')
    print([feature_names[i] for i in topic.argsort()[-num_top_words:]])
    print('\n')
