# Imports

In [1]:
import os
from openai import OpenAI
from dotenv import load_dotenv
from typing import List, Tuple
from transformers import AutoTokenizer

load_dotenv()

client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)

model = "gpt-4o-mini"

In [2]:
file_path = '/Users/renatoboemer/code/developer/luthor/data/Memo 2  - Crypto assets disposal - FINISHED .docx'

# Data processing

## Load

In [3]:
import os
import fitz
from docx import Document

def read_file(file_path):
    # Check if file exists
    if not os.path.isfile(file_path):
        raise FileNotFoundError(f"The file {file_path} does not exist.")

    # Get the file extension
    _, file_extension = os.path.splitext(file_path)

    # Handle .txt files
    if file_extension.lower() == '.txt':
        return read_txt(file_path)

    # Handle .docx files
    elif file_extension.lower() == '.docx':
        return read_docx(file_path)

    # Handle .pdf files
    elif file_extension.lower() == '.pdf':
        return read_pdf(file_path)

    else:
        raise ValueError(f"Unsupported file extension: {file_extension}")

In [4]:
def read_txt(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()

def read_docx(file_path):
    document = Document(file_path)
    full_text = []
    for paragraph in document.paragraphs:
        full_text.append(paragraph.text)
    return '\n'.join(full_text)


def read_pdf(file_path):
    document = fitz.open(file_path)
    all_text = []
    for page in document:
        text = page.get_text()
        all_text.append(text)
    return '\n'.join(all_text)

In [5]:
text = read_file(file_path)
print(text[:100])

Input:
I would like to seek expert advice regarding our company's upcoming transaction involving the


## Process

In [6]:
import re

In [7]:
def text_segmentation(text: str) -> List[str]:
    """
    Split text into smaller, manageable chunks with consideration for legal memo structures.

    Args:
        text (str): The full text to be segmented.

    Returns:
        List[str]: A list of segmented text chunks.
    """
    # Identify section breaks (e.g. double newlines, headings, and bullet points).
    pattern = r'(?<=\n)(?=\n)|(?<=\n)(?=\s*[\d-]+\s)|(?<=\n)(?=Section \d+|Article \d+)|(?<=\n)(?=\s*-\s)|(?<=\n)(?=\s*\*\s)'

    # Split the text using the defined pattern
    segments = re.split(pattern, text.strip())

    # Clean up the segments to remove any leading/trailing whitespace
    segmented_text = [segment.strip() for segment in segments if segment.strip()]

    return segmented_text


# segmented_text = text_segmentation(text)
# segmented_text

In [9]:
import re
from typing import List
import nltk
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Ensure nltk resources are downloaded during setup or first run
def setup_nltk():
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('wordnet', quiet=True)

setup_nltk()

def tokenize_text(text: str) -> List[str]:
    """
    Tokenize the text into words, considering legal-specific tokens and preprocessing.

    Args:
        text (str): The text to be tokenized.

    Returns:
        List[str]: A list of tokens (words).
    """
    # # Remove unwanted characters, retain alphanumeric and some punctuation relevant to legal text
    # cleaned_text = re.sub(r'[^\w\s.,;:()\'\"-]', ' ', text)

    # Tokenize using NLTK's word tokenizer, which handles punctuation better than simple regex
    tokens = word_tokenize(text)

    # Convert to lowercase to maintain consistency
    tokens = [token.lower() for token in tokens]

    # Remove stopwords specific to legal context if needed
    stop_words = set(stopwords.words('english'))
    tokens = [token for token in tokens if token not in stop_words]

    # Lemmatize tokens to reduce them to their base form
    lemmatizer = WordNetLemmatizer()
    tokens = [lemmatizer.lemmatize(token) for token in tokens]

    return tokens


In [10]:
def clean_special_characters(text: str) -> str:
    """
    Clean up non-informative special characters or artifacts.

    Args:
        text (str): The text from which to remove special characters.

    Returns:
        str: Cleaned text with unnecessary special characters removed.
    """
    # Remove characters not usually found in legal texts
    cleaned_text = re.sub(r'[^\w\s,.!?;:()-]', '', text)

    return cleaned_text

In [11]:
def preserve_structure(text: str) -> str:
    """
    Maintain the document's structural elements, such as headings.

    Args:
        text (str): The text to process for structural preservation.

    Returns:
        str: Text with preserved structure for headings and sections.
    """
    # Keep lines starting with capital words as headings
    structured_text = re.sub(r'(?m)^(?=[A-Z])(.+)$', r'## \1', text)

    return structured_text

In [12]:
def create_chunks(text: str, tokenizer, chunk_size=4096) -> List[str]:
    """
    Creates chunks of text for preprocessing, ensuring each chunk is within the specified size.

    Args:
        text (str): The full text to be chunked.
        tokenizer (object): The tokenizer object to encode and decode text.
        chunk_size (int): The desired size of each chunk.

    Returns:
        List[str]: A list of text chunks.
    """
    # Tokenize the text
    tokens = tokenizer.encode(text, add_special_tokens=True, truncation=True, max_length=chunk_size)

    # Split tokens into chunks of the specified size
    chunks = [tokens[i:i + chunk_size] for i in range(0, len(tokens), chunk_size)]

    # Decode each chunk back into text
    text_chunks = [tokenizer.decode(chunk, skip_special_tokens=True) for chunk in chunks]

    return text_chunks


In [13]:
def preprocess_doc(file_path: str, tokenizer, chunk_size=4096, overlap=0) -> Tuple[str, List[str], str, List[str], str, List[str]]:
    """
    Preprocess a legal document by executing a series of text processing steps, including chunking.

    Args:
        file_path (str): The path to the legal document text file.
        tokenizer (object): The tokenizer object to encode and decode text.
        chunk_size (int): The desired size of each chunk.
        overlap (int): The number of tokens to overlap between chunks.

    Returns:
        Tuple: A tuple containing:
            - Original text (str)
            - Segmented text chunks (List[str])
            - Cleaned text (str)
            - Tokenized words (List[str])
            - Structured text (str)
            - Chunks (List[str])
    """
    # Load text from the file
    text = read_file(file_path)

    # Split text into segments (paragraphs)
    segments = text_segmentation(text)

    # Clean up non-informative special characters
    cleaned_text = clean_special_characters(text)

    # Tokenize the text into words
    tokens = tokenize_text(cleaned_text)

    # Preserve structural elements (headings)
    structured_text = preserve_structure(cleaned_text)

    # Create chunks
    chunks = create_chunks(cleaned_text, tokenizer, chunk_size)

    return text, segments, cleaned_text, tokens, structured_text, chunks


In [14]:
# Using the preprocessor
from transformers import LongformerTokenizer

# Initialise the Longformer tokenizer
tokenizer = LongformerTokenizer.from_pretrained('allenai/longformer-base-4096')

# Call the preprocess_doc function
original_text, segments, cleaned_text, tokens, structured_text, chunks = preprocess_doc(file_path, tokenizer)

print("Original Text:", original_text[:50], "...")
print("---" * 25)
print("Segments:", segments[:1])
print("---" * 25)
print("Cleaned Text:", cleaned_text[:50], "...")
print("---" * 25)
print("Tokens:", tokens[:5])
print("---" * 25)
print("Structured Text:", structured_text[:50], "...")
print("---" * 25)
print("Chunks:", chunks[:1])

Original Text: Input:
I would like to seek expert advice regardin ...
---------------------------------------------------------------------------
Segments: ["Input:\nI would like to seek expert advice regarding our company's upcoming transaction involving the disposal of cryptocurrency assets. Specifically, we are planning to sell a substantial amount of Ethereum (ETH) and convert it into stablecoins due to market volatility. Please highlight the tax implications of that. Do we need to formalise the sale of ETH with a formal agreement?\nFollow up question: What is the volume of the transactions?\nFollow up question: What is the date of the transactions\nFollow up question: What was the price of purchased ETH?/ What was the value of ETH expressed in stablecoin?\nFollow up question: What is the stablecoin pegged to? \nFollow up question: Has this transaction will happen between dependent or independent parties?\nFollow up question: When is the transaction planned for?\nFollow up question

In [15]:
# Tokenize the cleaned_text to inspect the embedding size
sample_tokenized = tokenizer(cleaned_text, return_tensors="pt", max_length=4096, truncation=True)

# Print the size of the embedding
embedding_size = sample_tokenized['input_ids'].size(1)  # Get the number of tokens
print(f"Embedding size: {embedding_size}")

Embedding size: 1517


# Step by Step

In [16]:
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
import openai

# Load environment variables from a .env file
load_dotenv()

# Initialize Pinecone client
pinecone_api_key = os.getenv('PINECONE_API_KEY')
index_name = 'luthor-test-nb-0'
pinecone_client = Pinecone(api_key=pinecone_api_key)

# Check if the index exists; if not, create it
if index_name not in pinecone_client.list_indexes().names():
    pinecone_client.create_index(
        name=index_name,
        dimension=1536,  # Ensure this matches the output size of the embedding model
        metric='cosine',
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )
# Retrieve or create the index from Pinecone
index = pinecone_client.Index(index_name)

# Initialize OpenAI client
openai_api_key = os.getenv('OPENAI_API_KEY')
openai.api_key = openai_api_key

print(f"Pinecone client initialized: {pinecone_client is not None}")
print(f"OpenAI client initialized: {openai.api_key is not None}")
print(f"Index '{index_name}' exists or created: {index_name in pinecone_client.list_indexes().names()}")

Pinecone client initialized: True
OpenAI client initialized: True
Index 'luthor-test-nb-0' exists or created: True


In [17]:
def get_embedding(text: str, model: str = "text-embedding-3-small"):
    response = openai.embeddings.create(input=text, model=model)
    return response.data[0].embedding

# Assuming the chunks are obtained from the preprocessing step
chunk_embeddings = [get_embedding(chunk) for chunk in chunks]

# Debug: Print some embeddings to verify
for i, embedding in enumerate(chunk_embeddings):
    print(f"Embedding {i}: {embedding[:1]}...")


# Verify embeddings
print(f"Number of embeddings: {len(chunk_embeddings)}")
print(f"First embedding (first 5 dimensions): {chunk_embeddings[0][:5]}")

Embedding 0: [0.03321181610226631]...
Number of embeddings: 1
First embedding (first 5 dimensions): [0.03321181610226631, 0.00520020117983222, 0.02285817265510559, 0.01826249249279499, -0.0008028248557820916]


In [18]:
# Create vector dictionaries for upsertion
vectors = [
    {
        "id": f"doc_{i}",
        "values": chunk_embeddings[i],
        "metadata": {
            "text": chunks[i],
            "source": "database"
        }
    }
    for i in range(len(chunks))
]


# Upsert vectors into Pinecone index
index.upsert(vectors=vectors)

# Verify upsertion
print(f"Successfully upserted {len(vectors)} vectors to index {index_name}.")


Successfully upserted 1 vectors to index luthor-test-nb-0.


In [19]:
def query_pinecone(query_vector: list, top_k: int = 3):
    # Perform a query on the Pinecone index
    response = index.query(vector=query_vector, top_k=top_k, include_values=True, include_metadata=True)
    matches = response['matches']

    # Debugging print to verify structure
    # print(f"Query response structure: {matches}")

    return matches

# Define a question and generate its embedding
question = "What are the essential elements of a contract in English law?"
query_embedding = get_embedding(question)

# Query Pinecone index
matches = query_pinecone(query_embedding)

# Verify query results
print(f"Number of matches: {len(matches)}")
for i, match in enumerate(matches):
    # Check if metadata exists and is accessible
    if 'metadata' in match and 'text' in match['metadata']:
        print(f"Match {i}: {match['metadata']['text'][:1]}...")  # Print the first character
    else:
        print(f"Match {i} has no metadata or text")

Number of matches: 2
Match 0: I...
Match 1: I...


In [20]:
from openai import OpenAI

# Initialize the OpenAI client
client = OpenAI(api_key=openai_api_key)

def generate_answer(question: str, context: str, model: str = 'gpt-4o-mini'):
    if not context.strip():
        return "I don't know."

    # Identify context source
    if "database" in context:
        context_source = "from the database"
    else:
        context_source = "from my general knowledge"

    prompt = f"""You are an experienced lawyer specializing in extracting and interpreting information from legal
    documents, past memos, and other records to answer questions accurately. Your goal is to provide the most
    reliable and detailed advice possible based on the available information.

    Context (Source: {context_source}): {context}

    Question: {question}

    Guidelines:
    - Use the context provided to support your answer.
    - If the answer is not available, suggest potential research avenues or considerations that may help.
    - Structure your response to address the question clearly and logically.
    - If unable to answer, state 'I don't know,' but also indicate why (e.g., insufficient context, unclear question).
    """

    # Construct the message prompt for the chat completion
    messages = [
        {"role": "system", "content": "You are an experienced lawyer specializing in extracting and interpreting information from legal documents to provide accurate advice."},
        {"role": "user", "content": f"Context: {context}"},
        {"role": "user", "content": f"Question: {question}"}
    ]

    # Call the chat completions API
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=250,
        temperature=0
    )

    # Extract the assistant's message from the response
    generated_answer = response.choices[0].message.content.strip()
    return generated_answer

# Compile context from retrieved matches
retrieved_texts = [match['metadata']['text'] for match in matches if 'metadata' in match and 'text' in match['metadata']]
context = "\n".join(retrieved_texts)

# Generate an answer
answer = generate_answer(question, context)

# Verify generated answer
print(f"Generated Answer: {answer}")


Generated Answer: In English law, the essential elements of a contract are as follows:

1. **Offer**: One party must make a clear and definite offer to enter into an agreement.

2. **Acceptance**: The other party must accept the offer in its exact terms. Acceptance must be communicated to the offeror.

3. **Consideration**: There must be something of value exchanged between the parties. This can be money, services, goods, or a promise to do (or not do) something.

4. **Intention to Create Legal Relations**: The parties must intend for the agreement to be legally binding. In commercial agreements, this intention is usually presumed.

5. **Capacity**: The parties must have the legal capacity to enter into a contract. This generally means they must be of legal age (18 years or older in most cases) and of sound mind.

6. **Legality**: The purpose of the contract must be lawful. Contracts that involve illegal activities are not enforceable.

7. **Certainty**: The terms of the contract must 

In [21]:
def analyze_answer(answer: str, context: str):
    if any(chunk in answer for chunk in context.split("\n")):
        print("Answer derived from database context.")
    else:
        print("Answer likely generated by the language model.")

In [22]:
def query_pinecone_with_score(query_vector: list, top_k: int=3):
    response = index.query(vector=query_vector, top_k=top_k, include_values=True, include_metadata=True)
    matches = response['matches']

    # Determine if the highest score is above a certain threshold
    if matches and matches[0]['score'] > 0.9:
        print("High confidence in database-derived answer.")
    else:
        print("Answer may be more LLM-derived due to lower confidence.")

    return matches

In [28]:
# Analyse the answer
analyze_answer(answer, context)

Answer derived from database context.


In [29]:
# Query Pinecone
matches = query_pinecone_with_score(query_embedding)

# Compile context from matches
retrieved_texts = [match['metadata']['text'] for match in matches if 'metadata' in match and 'text' in match['metadata']]
context = "\n".join(retrieved_texts)

Answer may be more LLM-derived due to lower confidence.
