
# NYC ACRIS Legal Data Pipeline with AI Integration

I built an ELT pipeline to explore how public legal-property records can be structured and transformed for downstream analytics.

I used **NYC’s ACRIS dataset** ([real estate deed filings](https://data.cityofnewyork.us/City-Government/ACRIS-Real-Property-Legals/8h5j-fqxa/about_data)) as a proxy for litigation-adjacent legal data.

This notebook illustrates how to build a dbt‑managed ELT pipeline with a vector search layer and an AI model:

1. **Data pulling:** Download ACRIS property records through NYC’s public API.
2. **Model building:** Define a modular dbt pipeline to transform the data.
3. **Pipeline running:** Run the pipeline against a DuckDB database.
4. **Data ingestion:** A dbt model reads the raw ACRIS CSV and exposes a cleaned view.
5. **Preprocessing:** Convert each record into a human‑readable description, using the data dictionary to map codes to meaningful values.
6. **Vectorisation:** Train a TF‑IDF model to embed each description and build a nearest neighbour index.
7. **Retrieval & generation:** Use the index to retrieve context for a user query and pass this context to an LLM to generate an answer.

With dbt managing the ELT and data quality, this layered architecture provides a robust foundation for building question‑answering systems over structured open data.


Potential improvements include using a neural embedding model for better semantic matching, and exposing the functionality via an API or web interface.  


## dbt Data Pipeline

In [55]:
!pip install dbt-duckdb


In [56]:
import os

os.makedirs("/root/.dbt", exist_ok=True)

profile = """
legal_demo:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: /content/legal_demo/legal_demo.duckdb
"""

with open("/root/.dbt/profiles.yml", "w") as f:
    f.write(profile.strip())

print("DuckDB profile has been created")


DuckDB profile has been created


In [57]:
!dbt init legal_demo

17:03:16  Running with dbt=1.10.6
17:03:16  A project called legal_demo already exists here.


In [58]:
!pip install sodapy

Collecting sodapy
  Downloading sodapy-2.2.0-py2.py3-none-any.whl.metadata (15 kB)
Downloading sodapy-2.2.0-py2.py3-none-any.whl (15 kB)
Installing collected packages: sodapy
Successfully installed sodapy-2.2.0


In [None]:
import time
from sodapy import Socrata
import pandas as pd

client = Socrata("data.cityofnewyork.us", None, timeout=60)
dataset_id = "8h5j-fqxa"

limit = 50000
max_rows = 500_000
offset = 0
all_rows = []

while offset < max_rows:
    batch_limit = min(limit, max_rows - offset)
    # retry logic
    for attempt in range(5):
        try:
            batch = client.get(dataset_id, limit=batch_limit, offset=offset)
            break
        except Exception:
            time.sleep(5 * (attempt + 1))
    else:
        raise RuntimeError(f"Failed after 5 retries at offset {offset}")

    if not batch:
        break

    all_rows.extend(batch)
    offset += len(batch)
    print(f"fetched {offset} rows")

df = pd.DataFrame.from_records(all_rows)
df.to_csv("first_500k_acris.csv", index=False)
print("done, total rows:", len(df))
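The paging and retry loop in the cell above could be factored into a reusable helper. The following is a minimal sketch, not part of the original notebook: `fetch_all` and its `fetch_page` and `sleep` parameters are hypothetical names, and the backoff mirrors the linear delay used above.

```python
import time
from typing import Callable, Dict, List


def fetch_all(fetch_page: Callable[[int, int], List[Dict]],
              page_size: int,
              max_rows: int,
              retries: int = 5,
              base_delay: float = 5.0,
              sleep: Callable[[float], None] = time.sleep) -> List[Dict]:
    """Page through an offset-based API until max_rows or an empty page."""
    rows: List[Dict] = []
    while len(rows) < max_rows:
        want = min(page_size, max_rows - len(rows))
        for attempt in range(retries):
            try:
                # fetch_page(limit, offset) returns a list of records
                page = fetch_page(want, len(rows))
                break
            except Exception:
                # Linear backoff: wait longer after each failed attempt
                sleep(base_delay * (attempt + 1))
        else:
            raise RuntimeError(f"Failed after {retries} retries at offset {len(rows)}")
        if not page:
            break  # the API has no more rows
        rows.extend(page)
    return rows
```

With the Socrata client from the cell above, a call might look like `fetch_all(lambda limit, offset: client.get(dataset_id, limit=limit, offset=offset), 50_000, 500_000)`. Passing `sleep` as a parameter makes the helper testable without real delays.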


In [None]:
sql = """
SELECT
  document_id,
  record_type,
  borough,
  block,
  lot,
  property_type,
  street_number,
  street_name,
  good_through_date
FROM read_csv_auto('/content/first_500k_acris.csv')
"""
os.makedirs("/content/legal_demo/models/acris", exist_ok=True)
with open("/content/legal_demo/models/acris/acris_cases.sql", "w") as f:
    f.write(sql.strip())


In [None]:
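path = "/content/legal_demo/dbt_project.yml"
with open(path, "r") as f:
    lines = f.readlines()

acris_config = "    acris:\n      materialized: view\n"
for i, line in enumerate(lines):
    # Match the top-level key itself, not comments that mention "models:"
    if line.rstrip() == "models:":
        if i + 1 < len(lines) and lines[i + 1].rstrip() == "  legal_demo:":
            # dbt init already created the project key; reuse it to avoid a duplicate YAML key
            lines.insert(i + 2, acris_config)
        else:
            lines.insert(i + 1, "  legal_demo:\n" + acris_config)
        break

with open(path, "w") as f:
    f.writelines(lines)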


In [None]:
!cd /content/legal_demo && dbt run --select acris_cases


# Vector DB & AI Pipeline

This notebook demonstrates how to extend a traditional ELT pipeline built with **dbt** and **DuckDB** into a retrieval‑augmented AI workflow.

Using the data dictionary, each record is transformed into a natural‑language description.  

A simple **TF‑IDF** vector index is then built to support similarity search over these descriptions. The index is then connected to a large language model (LLM) so that it can answer questions using the retrieved records as context.



## 1. Load the dataset

The existing dbt pipeline ingests the raw CSV into DuckDB and exposes it as a view. Here, the same file is read directly into a pandas DataFrame.  

(*To read from DuckDB, use the Python `duckdb` package to run a SQL query against the dbt‑generated database instead of reading the CSV directly.*)


In [1]:

import pandas as pd

# Path to the CSV produced by the Socrata extract / dbt pipeline
csv_path = '/content/first_500k_acris.csv'

# Read every column as a string so IDs and codes keep their exact form
# (the dbt view selects a subset of these columns)
dtypes = {
    'document_id': str,
    'record_type': str,
    'borough': str,
    'block': str,
    'lot': str,
    'easement': str,
    'partial_lot': str,
    'air_rights': str,
    'subterranean_rights': str,
    'property_type': str,
    'street_number': str,
    'street_name': str,
    'good_through_date': str,
    'unit': str
}

# Read the dataset into a DataFrame
acris_df = pd.read_csv(csv_path, dtype=dtypes)

# Display first few rows
print('Dataset shape:', acris_df.shape)
acris_df.head()


Dataset shape: (500000, 14)


|  | document_id | record_type | borough | block | lot | easement | partial_lot | air_rights | subterranean_rights | property_type | street_number | street_name | good_through_date | unit |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 2025061800453002 | L | 1 | 914 | 1 | N | E | N | N | AP | 201 | EAST 33RD STREET | 2025-06-30T00:00:00.000 |  |
| 1 | 2025052100948004 | L | 1 | 1118 | 1305 | N | E | N | N | SC | 50 | WEST 66TH STREET | 2025-06-30T00:00:00.000 | 50N |
| 2 | 2025061300503004 | L | 3 | 3209 | 21 | N | E | N | N | D6 | 1134 | WILLOUGHBY AVENUE | 2025-06-30T00:00:00.000 |  |
| 3 | 2025061100086003 | L | 3 | 2539 | 1206 | N | E | N | N | SC | 69 | JAVA STREET | 2025-06-30T00:00:00.000 | 4A |
| 4 | 2025060900686005 | L | 1 | 1283 | 1002 | N | E | N | N | CC | 12 | EAST 48 STREET | 2025-06-30T00:00:00.000 | TIMEA |



## 2. Preprocess records into natural language

To build a vector index we need to convert each structured row into a piece of text.

The ACRIS Real Property Legals data dictionary and dataset guide indicate that `borough` is coded 1–5 for Manhattan, Bronx, Brooklyn, Queens and Staten Island.  

Each code is mapped to its human‑readable borough name, and a sentence describing the lot is built from the record's fields. The description can be enriched further with codes from the various control tables (document types, property type codes, etc.).

For this pipeline, only readily available fields are used.


In [2]:

# Define a mapping from borough code to name
borough_map = {
    '1': 'Manhattan',
    '2': 'Bronx',
    '3': 'Brooklyn',
    '4': 'Queens',
    '5': 'Staten Island'
}

# Function to convert a row into a natural‑language description
def record_to_text(row):
    parts = []
    parts.append(f"Document ID {row['document_id']}")
    parts.append(f"Record type {row['record_type']}")
    # Translate borough code if possible
    borough_name = borough_map.get(row['borough'], row['borough'])
    parts.append(f"Borough {borough_name}")
    parts.append(f"Block {row['block']}")
    parts.append(f"Lot {row['lot']}")
    if pd.notna(row.get('property_type')):
        parts.append(f"Property type {row['property_type']}")
    if pd.notna(row.get('street_number')):
        parts.append(f"Street number {row['street_number']}")
    if pd.notna(row.get('street_name')):
        parts.append(f"Street name {row['street_name']}")
    if pd.notna(row.get('unit')):
        parts.append(f"Unit {row['unit']}")
    # The good through date marks the last update for this record
    parts.append(f"Good through date {row['good_through_date']}")
    return ', '.join(parts)

# Create a new column with the natural‑language descriptions
acris_df['text'] = acris_df.apply(record_to_text, axis=1)

# Show a sample of the transformed text
acris_df[['text']].head()


|  | text |
| --- | --- |
| 0 | Document ID 2025061800453002, Record type L, B... |
| 1 | Document ID 2025052100948004, Record type L, B... |
| 2 | Document ID 2025061300503004, Record type L, B... |
| 3 | Document ID 2025061100086003, Record type L, B... |
| 4 | Document ID 2025060900686005, Record type L, B... |
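As noted earlier in this section, the descriptions could be enriched with labels from the ACRIS control tables. The sketch below shows one way to do that. `describe_code` is a hypothetical helper, and the lookup entry is a placeholder, not a real ACRIS value; real descriptions would have to be loaded from the control tables.

```python
def describe_code(label: str, code, lookup: dict) -> str:
    """Render 'label description (code)' when the code is known, else 'label code'."""
    description = lookup.get(code)
    if description is None:
        # Unknown codes fall back to the raw value, as borough_map.get does above
        return f"{label} {code}"
    return f"{label} {description} ({code})"


# Placeholder entry only; real values would come from the ACRIS control tables
property_type_lookup = {"XX": "example description"}

print(describe_code("Property type", "XX", property_type_lookup))
print(describe_code("Property type", "SC", property_type_lookup))
```

Inside `record_to_text`, the property type line could then become `parts.append(describe_code("Property type", row["property_type"], property_type_lookup))`.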



## 3. Build a vector index

A vector database stores high‑dimensional embeddings instead of raw text.  

Because this environment has no internet access, we use the TF‑IDF model from scikit‑learn to turn each description into a sparse vector.  

For larger projects, use a neural embedding model.

A `NearestNeighbors` model is then fitted to perform cosine‑similarity search over the TF‑IDF vectors.

In [4]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import NearestNeighbors

# Fit a TF‑IDF vectorizer on the text
vectorizer = TfidfVectorizer(stop_words='english')
X = vectorizer.fit_transform(acris_df['text'])

# Build a nearest neighbor index using cosine distance
nn_model = NearestNeighbors(n_neighbors=5, metric='cosine')
nn_model.fit(X)

print('Vector index built on', X.shape[0], 'documents with', X.shape[1], 'features')

Vector index built on 500000 documents with 322184 features
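To make the weighting concrete, here is a small pure-Python sketch of the calculation that `TfidfVectorizer` performs with its default settings: raw term counts, a smoothed inverse document frequency of `ln((1 + n) / (1 + df)) + 1`, and L2 normalisation. Stop-word removal is left out for brevity, and `tfidf_vectors` is a hypothetical name. The default tokeniser only keeps tokens of two or more characters, so single-character values such as record type `L` or lot `1` do not contribute to the vectors.

```python
import math
import re
from collections import Counter


def tfidf_vectors(docs):
    """Return one {term: weight} dict per document, L2-normalised."""
    # Lowercase and keep tokens of two or more word characters
    tokenised = [re.findall(r"\b\w\w+\b", d.lower()) for d in docs]
    n = len(docs)
    # Document frequency: number of documents containing each term
    df = Counter(term for tokens in tokenised for term in set(tokens))
    # Smoothed inverse document frequency
    idf = {t: math.log((1 + n) / (1 + c)) + 1 for t, c in df.items()}
    vectors = []
    for tokens in tokenised:
        weights = {t: tf * idf[t] for t, tf in Counter(tokens).items()}
        norm = math.sqrt(sum(w * w for w in weights.values())) or 1.0
        vectors.append({t: w / norm for t, w in weights.items()})
    return vectors


docs = [
    "Borough Manhattan, Street name 5TH AVENUE",
    "Borough Brooklyn, Street name 5TH AVENUE",
    "Borough Manhattan, Street name JAVA STREET",
]
vecs = tfidf_vectors(docs)
# Terms shared by every document (e.g. "borough") get the lowest weights
print(sorted(vecs[0].items(), key=lambda kv: -kv[1]))
```

Terms that appear in every description, such as the field labels, receive low weights, while rarer terms such as a street name dominate the vector.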



## 4. Query the vector index

To answer a question, we encode the query using the same TF‑IDF vectorizer and retrieve the nearest neighbours.  
The following helper function returns the top results along with their cosine distances.  

In [5]:

def search_acris(query, k=5):
    query_vec = vectorizer.transform([query])
    distances, indices = nn_model.kneighbors(query_vec, n_neighbors=k)
    results = acris_df.iloc[indices[0]].copy()
    results['distance'] = distances[0]
    return results

# Example query
query = 'Lot on 5th Avenue Manhattan'
results = search_acris(query, k=5)

# Display the results
results[['document_id','borough','block','lot','street_number','street_name','property_type','distance']]


|  | document_id | borough | block | lot | street_number | street_name | property_type | distance |
| --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 329122 | 2024082700166001 | 1 | 551 | 1 | 2 | 5TH AVENUE | SP | 0.615111 |
| 236726 | 2024122000498001 | 1 | 566 | 1 | 11 | 5TH AVENUE | SP | 0.620591 |
| 307323 | 2024102900827001 | 1 | 820 | 46 | 138 | 5TH AVENUE | CR | 0.624846 |
| 331490 | 2024090901028001 | 3 | 1005 | 6 | 431 | 5TH AVENUE | F1 | 0.627258 |
| 443463 | 2024062000513001 | 3 | 1101 | 5 | 445 | 5TH AVENUE | CR | 0.631706 |


*Distance 0 means identical while higher values are less similar.*
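The distance reported here is cosine distance, defined as one minus the cosine similarity of the two vectors. The following sketch computes it by hand for small dense vectors; `cosine_distance` is a hypothetical helper written for illustration, not the scikit-learn implementation.

```python
import math


def cosine_distance(a, b):
    """Return 1 - cos(angle between a and b) for two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


# Same direction, different length: distance is approximately 0
print(cosine_distance([1, 2, 0], [2, 4, 0]))
# No shared non-zero components: distance is 1
print(cosine_distance([1, 0], [0, 1]))
```

Because TF‑IDF weights are never negative, distances between these vectors fall between 0 and 1, so the values around 0.62 in the table correspond to a cosine similarity of roughly 0.38.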


## 5. Integrate with an AI model

After retrieving the most relevant records, the next step is a question‑answering system: the records are passed to a large language model (LLM), which is asked to generate a response to the user's question.

Most LLMs do not have direct access to the database, so the model must be provided with both the user question and the retrieved context.
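Combining the question and the retrieved context into a single prompt can be sketched independently of any particular LLM provider. `build_prompt` is a hypothetical helper and the instruction wording is only an example; the sample record is taken from the first row of the dataset shown earlier.

```python
from typing import List


def build_prompt(question: str, records: List[str]) -> str:
    """Combine instructions, retrieved records, and the question into one prompt."""
    instructions = (
        "Answer using only the context below. "
        "If the context does not contain the answer, say you don't know."
    )
    # One retrieved record per line so the model can tell them apart
    context = "\n".join(f"- {r}" for r in records)
    return f"{instructions}\n\nContext:\n{context}\n\nQuestion: {question}\nAnswer:"


prompt = build_prompt(
    "What borough is document 2025061800453002 in?",
    ["Document ID 2025061800453002, Record type L, Borough Manhattan, Block 914, Lot 1"],
)
print(prompt)
```

Keeping prompt construction in its own function makes it easy to inspect exactly what the model sees before any API call is made.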


### Integrating with Google's Generative AI models

This section uses Google's Generative AI models, such as Gemini, which are available through the `google.generativeai` library.

A Google Cloud account and an API key are needed. Store the key securely in Colab secrets under the name `GOOGLE_API_KEY`.

In [21]:
import google.generativeai as genai
from google.colab import userdata

# Get your API key from Colab secrets
try:
    GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')
    if not GOOGLE_API_KEY:
        raise ValueError("Google API key not found in Colab secrets.")
except Exception as e:
    print(f"Error getting API key from Colab secrets: {e}")
    print("Please store your Google API key in Colab secrets named 'GOOGLE_API_KEY'")
    GOOGLE_API_KEY = None

if GOOGLE_API_KEY:
    genai.configure(api_key=GOOGLE_API_KEY)

    # Initialize the Gemini model (Other models at: https://ai.google.dev/models/generative/0
    try:
        gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')
    except Exception as e:
        print(f"Error initializing the model: {e}")
        print("Please check if the model name is correct and available.")
        gemini_model = None
else:
    gemini_model = None

The following function, `ask_gemini`, uses the Gemini model to answer questions based on the retrieved context:

In [22]:
def ask_gemini(question: str, top_k: int = 5) -> str:
    """
    Retrieve top-k ACRIS records semantically related to the question,
    then ask a Gemini chat model to answer using that context.
    """
    if not gemini_model:
        return "Gemini model is not initialized. Please check API key and model name."

    # 1) Vector search (function already defined)
    retrieved = search_acris(question, k=top_k)
    context_text = "\n".join(retrieved["text"].tolist())

    # 2) Build the prompt for Gemini
    system_instruction = (
        "You are an assistant who answers questions about New York City real estate records.\n"
        "Use only the information in the provided context to answer factually.\n"
        "If the answer is not present in the context, say that you don’t know."
    )

    prompt_parts = [
        system_instruction,
        "Context:",
        context_text,
        f"Question: {question}",
        "Answer:"
    ]

    # 3) Call the Gemini API
    try:
        response = gemini_model.generate_content(prompt_parts)
        return response.text.strip()
    except Exception as e:
        return f"An error occurred while calling the Gemini API: {e}"

In [23]:
# Interact with the database
answer_gemini = ask_gemini("What borough is document 2025061800453002 in?")
print(answer_gemini)

Manhattan
