In [None]:
from sentence_transformers import SentenceTransformer
import lancedb
from openai import OpenAI
import re
import pandas as pd
import json
import time
from fuzzy_json import loads as fuzzy_loads

from google import genai
from google.genai import types
from dotenv import load_dotenv
from tqdm.notebook import tqdm
import os

import hashlib
from pathlib import Path
from datetime import datetime

from pydantic import BaseModel
from typing import List, Dict, Optional, Any

def citation_hash(source_string: str, n_digits: int = 6) -> str:
    """Derive a fixed-width numeric identifier from a string.

    The SHA-256 hex digest of ``source_string`` is read as one large integer,
    reduced modulo ``10 ** n_digits`` and zero-padded on the left.

    :param source_string: text to hash
    :param n_digits: number of decimal digits in the returned identifier
    :return: the identifier as a zero-padded decimal string
    """
    digest_as_int = int(hashlib.sha256(source_string.encode()).hexdigest(), 16)
    remainder = digest_as_int % (10**n_digits)
    # Same format spec as an f-string `{value:0{n_digits}d}`.
    return format(remainder, f"0{n_digits}d")

from sqlmodel import Field, Session, SQLModel, create_engine, select
from sqlalchemy import and_, or_
# Load environment variables from the dotenv file named 'env_var'
# (presumably supplies GEMINI_API_KEY and the *_FOLDER variables read later — confirm).
load_dotenv('env_var')

In [None]:
class ExtractionDBDocument(SQLModel, table=True):
    """Row model for the ``extractions`` table.

    String-only counterpart of ``ExtractionDocument``: rows are produced by
    ``prep_extraction_for_insert``, which joins the list-valued fields with
    '|' and serialises ``additional_fields`` with ``json.dumps``.
    """
    __tablename__ = "extractions"
    # Surrogate primary key; left as None on insert (presumably assigned by the database).
    p_key: Optional[int] = Field(default=None, primary_key=True)
    # Extraction identifier (a citation string such as "DOC_..." or "INST_...").
    # NOTE(review): no uniqueness constraint is declared on this column.
    id: str
    citation: str
    title: str
    text: str
    # Kind of record; the visible code writes "grounding" or "insight".
    type: str
    # '|'-joined element ids / source citations.
    elements: str
    # '|'-joined chunk positions.
    chunks: str
    # '|'-joined source document ids.
    document_ids: str
    # Timestamp-style id of the run that produced the row.
    run_id: str | None = ''
    # JSON-encoded extra payload.
    additional_fields: str | None = None

In [None]:
# OpenAI-compatible client aimed at a server on localhost:1234.
client = OpenAI(base_url="http://localhost:1234/v1", api_key="lm-studio")


def call_llm(query, temperature=0.35, seed=42, model="gemma-3-12b-it-qat"):
    """Send ``query`` as a single user message to the local chat endpoint.

    :param query: prompt text
    :param temperature: sampling temperature forwarded to the API
    :param seed: sampling seed forwarded to the API
    :param model: model name forwarded to the API
    :return: message content of the first returned choice
    """
    user_message = {"role": "user", "content": query}
    reply = client.chat.completions.create(
        model=model,
        messages=[user_message],
        temperature=temperature,
        seed=seed,
    )
    first_choice = reply.choices[0]
    return first_choice.message.content

# Gemini model name read by call_llm_flash. The first assignment is overwritten
# immediately by the second, so only the flash-lite preview model takes effect.
model = "gemini-2.0-flash"
model = "gemini-2.5-flash-lite-preview-06-17"
# Running log of token usage; call_llm_flash appends one dict per completed call.
total_tokens = list()

def call_llm_flash(query, temperature=0.1, seed=42, max_tokens=8193, retries=3, time_delay=15):
    """Send ``query`` to the Gemini model named by the module-level ``model``.

    The request is attempted up to ``retries`` times, sleeping ``time_delay``
    seconds between attempts. Token usage of the successful call is appended
    to the module-level ``total_tokens`` list.

    :param query: prompt text, sent as the only content item
    :param temperature: sampling temperature
    :param seed: sampling seed
    :param max_tokens: cap on generated tokens (``max_output_tokens``)
    :param retries: maximum number of attempts
    :param time_delay: seconds to wait after a failed attempt before retrying
    :return: ``response.text`` of the successful call
    :raises KeyError: if ``GEMINI_API_KEY`` is not set in the environment
    :raises RuntimeError: if every attempt failed; chained to the last error
    """
    client = genai.Client(api_key=os.environ['GEMINI_API_KEY'])
    # The config does not change between attempts, so build it once.
    request_config = types.GenerateContentConfig(
        max_output_tokens=max_tokens,
        temperature=temperature,
        seed=seed
    )

    response = None
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            response = client.models.generate_content(
                model=model,
                contents=[query],
                config=request_config,
            )
            break
        except Exception as e:
            # NOTE(review): every exception type is retried, including ones
            # that cannot succeed on retry (e.g. a bad request).
            last_error = e
            retries_left = retries - attempt
            print(e)
            print(f"Retries left: {retries_left}")
            # No point sleeping when there is no further attempt.
            if retries_left > 0:
                time.sleep(time_delay)

    if response is None:
        # Previously this path fell through and failed with UnboundLocalError.
        raise RuntimeError(f"Gemini request failed after {retries} attempts") from last_error

    # usage_metadata may be absent on some responses; record None counts then.
    usage = response.usage_metadata
    total_tokens.append({'prompt_tokens': getattr(usage, 'prompt_token_count', None),
                         'completion_tokens': getattr(usage, 'candidates_token_count', None),
                         'total_tokens': getattr(usage, 'total_token_count', None),
                         'timestamp': datetime.now().strftime("%Y_%m_%d_%H_%M_%S")})

    return response.text

In [None]:
# --- Folder locations, all taken from environment variables (KeyError if unset) ---
data_folder = Path(os.environ['DATA_FOLDER'])
project_folder = Path(os.environ['PROJECT_FOLDER'])
database_folder = Path(os.environ['DATABASE_FOLDER'])
index_folder = Path(os.environ['INDEX_FOLDER'])
research_json_folder = project_folder.joinpath('research_json')

# --- Database / table names ---
document_database_name = "documents.sqlite"
insight_db_sql = 'insights'
search_result_table_sql = 'search_results'
metadata_table_sql = 'documents'

# NOTE(review): the engine below uses a hard-coded absolute path instead of
# `database_folder` / `document_database_name` (the env-driven variant is the
# commented-out pair of lines) — confirm which location is intended.
# db_path = database_folder.joinpath(document_database_name).absolute()
# engine = create_engine(f'sqlite:///{db_path}')
database_location = Path('/Users/jameslittiebrant/Data/Mycroft/databases')
db_path = database_location.joinpath('documents.sqlite').absolute()
engine = create_engine(f'sqlite:///{db_path}')

# NOTE(review): this session is created without an engine and is not used by
# the visible code; later cells open their own `Session(...)` in `with` blocks.
# Session = sessionmaker(bind=engine)
session = Session()

# --- Vector index (LanceDB) ---
index = lancedb.connect(index_folder)

index_table = 'crs_reports'
table = index.open_table(index_table)

# index_sr_location = 'crs_reports'
# table_sr_name = 'sections'
# index_folder = project_folder.joinpath(f'indexes/{index_sr_location}')
#
# index_sr = lancedb.connect(index_folder)
# table_sr = index_sr.open_table(table_sr_name)
# --- Query encoder ---
encoder_model = 'nomic-ai/nomic-embed-text-v1.5'
# 'mps' targets Apple-silicon GPUs; this must be changed to run elsewhere.
device = 'mps'
encoder = SentenceTransformer(encoder_model, device=device, trust_remote_code=True)

# --- Citation settings ---
citation_hash_length = 6
insight_citation_prefix = 'INST_'

# --- Prompt templates, read from a JSON file relative to the working directory ---
with open('insight_prompts_update.json','r') as f:
    prompts = json.load(f)

insight_identification_prompt = prompts['insight_identification']
insight_extraction_prompt = prompts['insight_extraction_instructions']

# Timestamp identifying this notebook run; stamped onto the grounding documents later.
run_id = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")

In [None]:
# Show which prompt templates were loaded.
prompts.keys()

In [None]:
def convert_response_to_json(response_text: str) -> List[dict]:
    '''
    Parse the JSON payload of an LLM response.

    If the response contains a ```json ... ``` fenced block, only the content
    of the first such block is parsed; otherwise the whole response is parsed.

    :param response_text: string response from the llm that only has json data either formatted with ```json...```, or ONLY json data
    :return: whatever ``fuzzy_loads`` produces for the payload (expected: list of extracted items)
    '''
    fenced_block = re.search(r'```json(.+?)```', response_text, flags=re.DOTALL)
    payload = fenced_block.group(1) if fenced_block else response_text
    return fuzzy_loads(payload)

def create_citation(item: dict, fields: List[str], prefix='INST_', number_of_digits=6):
    """Build a citation id from selected fields of ``item``.

    Each field is rendered as `` name: value`` (leading space included), the
    pieces are concatenated in the order given and hashed with
    ``citation_hash``.

    :param item: mapping holding the values to hash
    :param fields: keys of ``item`` to include, in order
    :param prefix: text placed in front of the numeric hash
    :param number_of_digits: width of the numeric hash
    :return: ``prefix`` followed by the zero-padded hash
    :raises KeyError: if a requested field is missing from ``item``
    """
    labelled_values = [f" {field}: {item[field]}" for field in fields]
    digits = citation_hash("".join(labelled_values), n_digits=number_of_digits)
    return f"{prefix}{digits}"

In [None]:
class ExtractionDocument(BaseModel):
    """In-memory record for a grounding document or an extracted insight.

    Converted to a flat, string-only mapping by ``prep_extraction_for_insert``
    before being written to the ``extractions`` table.
    """
    # Identifier; the visible code always sets it equal to `citation`.
    id: str
    citation: str
    title: str
    text: str
    # Kind of record; the visible code uses "grounding" and "insight".
    type: str
    # Section ids (grounding documents) or source citations (insights).
    elements: List[str]
    # Chunk positions as strings; left empty for insights.
    chunks: List[str]
    document_ids: List[str]
    run_id: str | None = ''
    # Free-form payload; insights store the raw extracted dict here.
    additional_fields: Any | None = None

def format_sections_to_chunk(sections, separator='\n\n'):
    """Render sections as citation-tagged text wrapped in one chunk-level tag.

    Each section is emitted between two identical markers of the form
    ``<document_id_chunk_position__intra_chunk_position>`` (the closing marker
    has no slash; this is kept as-is because the text feeds prompts and
    hashes). Sections whose ``type`` contains ``'heading'`` are rendered as
    markdown headings, with the level taken from the number after the first
    underscore in ``type`` (e.g. ``heading_2`` -> ``##``).

    The whole text is wrapped in ``<document_id_chunk_position>`` /
    ``</document_id_chunk_position>`` built from the LAST section.

    :param sections: non-empty iterable of objects exposing ``type``,
        ``content``, ``document_id``, ``chunk_position`` and
        ``intra_chunk_position``
    :param separator: text placed between rendered sections
    :return: the formatted chunk text
    :raises ValueError: if ``sections`` is empty (previously an
        UnboundLocalError from the unset loop variable)
    """
    sections = list(sections)
    if not sections:
        raise ValueError("format_sections_to_chunk requires at least one section")

    text = list()
    for section in sections:
        if 'heading' in section.type:
            heading_value = int(section.type.split('_')[1])
            header_value = '#'*heading_value
            section_text = f"{header_value} {section.content.strip()}".strip()
        else:
            section_text = f"{section.content}".strip()
        citation = f"""<{section.document_id}_{section.chunk_position}__{section.intra_chunk_position}>"""
        section_text = f"{citation}\n{section_text}\n{citation}"
        text.append(section_text)
    text = separator.join(text)

    # The wrapper tags are derived from the last section, as before, but now
    # explicitly rather than through the leftover loop variable.
    last_section = sections[-1]
    text_total_citation_start = f"""<{last_section.document_id}_{last_section.chunk_position}>"""
    text_total_citation_end = f"""</{last_section.document_id}_{last_section.chunk_position}>"""

    text = f"{text_total_citation_start}\n{text}\n{text_total_citation_end}"
    return text

def create_grounding_documents_section_cite(db_documents, search_results):
    """Assemble a grounding ``ExtractionDocument`` from section rows.

    The sections are rendered with ``format_sections_to_chunk``; the document
    id/citation is ``DOC_`` plus a 10-digit ``citation_hash`` of that text.

    :param db_documents: section rows exposing ``id`` and ``chunk_position``
        (plus whatever ``format_sections_to_chunk`` reads)
    :param search_results: a single search hit exposing ``title`` and ``document_id``
    :return: an ``ExtractionDocument`` of type ``"grounding"``
    """
    grounded_text = format_sections_to_chunk(db_documents)
    doc_citation = f"DOC_{citation_hash(grounded_text, n_digits=10)}"

    # De-duplicated via sets; only the chunk positions are sorted.
    element_ids = list({row.id for row in db_documents})
    chunk_positions = sorted({str(row.chunk_position) for row in db_documents})

    return ExtractionDocument(
        id=doc_citation,
        citation=doc_citation,
        title=search_results.title,
        text=grounded_text,
        type="grounding",
        elements=element_ids,
        chunks=chunk_positions,
        document_ids=[str(search_results.document_id)],
        additional_fields={},
    )


In [None]:
def create_grounding_document(search_result):
    """Load the sections behind a search hit and build its grounding document.

    Selects ``SectionDBModel`` rows matching the hit's ``document_id`` whose id
    is among the hit's ``element_ids``; at most 100 rows are fetched.

    :param search_result: search hit exposing ``document_id`` and
        ``element_ids`` (plus ``title``, read downstream)
    :return: the ``ExtractionDocument`` built by
        ``create_grounding_documents_section_cite``
    """
    section_query = select(SectionDBModel).filter(
        and_(
            SectionDBModel.document_id == search_result.document_id,
            SectionDBModel.id.in_(search_result.element_ids),
        )
    ).limit(100)

    with Session(engine) as db_session:
        # Validate while the session is still open.
        sections = [SectionDBModel.model_validate(row) for row in db_session.exec(section_query)]

    return create_grounding_documents_section_cite(sections, search_result)

In [None]:
def create_batches(batch_items: List, batch_size: int, batch_flex: int = 1) -> List[dict]:
    '''
    Split a sequence into consecutive slices of at most ``batch_size`` items.

    :param batch_items: A list of items that you want to batch
    :param batch_size: How big the batch size is
    :param batch_flex: Reserved for future use and currently ignored. Intended as a divisor for stepping the
        batch size down so the last batch is not a single item,
        e.g. a batch size of 5 and the last batch is 2, then you will have two batches of 4 and 3. If 6 remain, then 3 and 3
    :return: list of batches, each a slice of ``batch_items``; the last one may be shorter
    '''
    return [
        batch_items[offset:offset + batch_size]
        for offset in range(0, len(batch_items), batch_size)
    ]

In [None]:
def format_evidence(insight):
    """Render the supporting and opposing evidence of an insight as text.

    ``insight['evidenceFor']`` and ``insight['evidenceAgainst']`` are each
    rendered only when they are lists; any other value (e.g. a placeholder
    string) is skipped. Supporting entries come first.

    :param insight: dict with ``evidenceFor`` / ``evidenceAgainst`` keys whose
        list entries carry ``description``, ``details``, ``methodology`` and ``source``
    :return: the evidence blocks joined by newlines, or ``''`` if there are none
    """
    rendered = []
    for key, stance in (('evidenceFor', 'For'), ('evidenceAgainst', 'Against')):
        entries = insight[key]
        if not isinstance(entries, list):
            continue
        for entry in entries:
            rendered.append(
                f"**Evidence {stance} Insight:**\n"
                f"    *Description:* {entry['description']}\n"
                f"    *Details:* {entry['details']}\n"
                f"    *Methodology:* {entry['methodology']}\n"
                f"    *Source:* {entry['source']}"
            )
    return '\n'.join(rendered).strip() if rendered else ''

def format_reasoning(insight):
    """Render the reasoning for/against an insight, skipping ``'N/A'`` entries.

    :param insight: dict with ``reasoningFor`` and ``reasoningAgainst`` keys
    :return: one line per reasoning that is not ``'N/A'``, or ``''`` if both are
    """
    labelled_keys = (
        ('reasoningFor', '*Reasoning For Insight:*'),
        ('reasoningAgainst', '*Reasoning Against Insight:*'),
    )
    lines = [
        f"{label} {insight[key]}"
        for key, label in labelled_keys
        if insight[key] != 'N/A'
    ]
    return '\n'.join(lines).strip() if lines else ''

def format_strength(insight):
    """Render the strength assessment of an insight as three lines.

    :param insight: dict whose ``strength`` entry carries ``assessment``,
        ``confidence`` and ``plausibility``
    :return: the assessment line followed by indented confidence and plausibility lines
    """
    strength = insight['strength']
    return '\n'.join([
        f"*Assessment:* {strength['assessment']}",
        f"    *Confidence:* {strength['confidence']}",
        f"    *Plausibility:* {strength['plausibility']}",
    ])

def format_implications(insight):
    """Render the implications of an insight as an indented block.

    The result starts with a newline so it can follow a label on its own line.

    :param insight: dict whose ``implications`` entry carries ``use``,
        ``ifTrue`` and ``ifFalse``
    :return: newline-led, indented three-line block
    """
    details = insight['implications']
    return (
        f"\n    *Insight Utility:* {details['use']}"
        f"\n        *If True:* {details['ifTrue']}"
        f"\n        *If False:* {details['ifFalse']}"
    )

def format_full_insight(insight):
    """Render a fully extracted insight as one text block.

    Combines the title, citations, statement and explanation with the output
    of ``format_reasoning``, ``format_evidence``, ``format_strength`` and
    ``format_implications``. Reasoning comes before evidence; when both are
    empty an empty line is left in their place.

    :param insight: extracted insight dict (``insight_title``,
        ``insight_citation``, ``citations``, ``statement``, ``explanation``
        plus the keys read by the helper formatters)
    :return: the formatted text with surrounding whitespace stripped
    """
    evidence = format_evidence(insight)
    reasoning = format_reasoning(insight)
    strength_text = format_strength(insight)
    implications_text = format_implications(insight)

    support = '\n'.join(part.strip() for part in (reasoning, evidence) if part)

    lines = [
        f"*Insight Title:* {insight['insight_title']}",
        f"*Insight Citation:* {insight['insight_citation']}",
        f"*Source Citations:* {', '.join(insight['citations'])}",
        f"*Insight Statement:* {insight['statement']}",
        f"*Insight Explanation:* {insight['explanation']}",
        support,
        f"*Insight Strength Assessment:* {strength_text}",
        f"*Implications:* {implications_text}",
    ]
    return '\n'.join(lines).strip()

In [None]:
def format_insight_text(insight):
    """Render an insight object as a labelled, multi-line summary.

    Statements and explanations from every entry of ``insight.insight_data``
    are joined with blank lines; related citations are comma-separated.

    :param insight: object exposing ``insight_data`` (list of dicts with
        ``statement`` and ``explanation``), ``related_citations``,
        ``insight_name``, ``citation``, ``insight_type`` and ``insight_synopsis``
    :return: one ``**Label:** value`` line per field
    """
    records = insight.insight_data
    statements = '\n\n'.join(record['statement'] for record in records)
    sources = ', '.join(insight.related_citations)
    explanations = '\n\n'.join(record['explanation'] for record in records)

    labelled_values = (
        ("Insight Name", insight.insight_name),
        ("Insight Citation", insight.citation),
        ("Insight Type", insight.insight_type),
        ("Insight Synopsis", insight.insight_synopsis),
        ("Insight Description", statements),
        ("Insight Explanation", explanations),
        ("Insight Source Citations", sources),
    )
    return '\n'.join(f"**{label}:** {value}" for label, value in labelled_values)

In [None]:
def extract_insights(insights: List,
                     document: ExtractionDocument,
                     batch_size: int = 5,
                     temperature: float = 0.2,
                     max_tokens: int = 10000) -> List[ExtractionDocument]:
    """Ask the LLM for full details of previously identified insights.

    The insights are split into batches with ``create_batches``. Each batch is
    rendered with ``format_insight``, inserted together with the document text
    into the ``insight_extraction_instructions`` prompt and sent through
    ``call_llm_flash``. Every item of the parsed response becomes an
    ``ExtractionDocument`` of type ``'insight'``.

    :param insights: identified insight dicts (as returned by ``process_identified_insights``)
    :param document: grounding document the insights were found in
    :param batch_size: number of insights per LLM call
    :param temperature: sampling temperature for the LLM call
    :param max_tokens: output token cap for the LLM call
    :return: one ``ExtractionDocument`` per extracted insight, over all batches
    :raises KeyError: if an extracted item lacks a field read here
        (``insight_citation``, ``insight_title``, ``citations``) or by
        ``format_full_insight``
    """
    # Reuse the shared batching helper rather than slicing by hand again.
    insight_extraction_batches = create_batches(insights, batch_size)
    print("Number of batches: ", len(insight_extraction_batches))

    all_insights = list()
    for batch in insight_extraction_batches:
        insights_formatted = '\n\n'.join(format_insight(insight) for insight in batch)
        insight_details_prompt = prompts['insight_extraction_instructions'].format(insights=insights_formatted,
                                                                                 document=document.text)

        insight_extraction = call_llm_flash(insight_details_prompt, temperature=temperature, max_tokens=max_tokens)
        extracted_insights = convert_response_to_json(insight_extraction)
        # A lone JSON object would otherwise be iterated key by key.
        if isinstance(extracted_insights, dict):
            extracted_insights = [extracted_insights]
        for extracted_insight in extracted_insights:
            insight = ExtractionDocument(
                id=extracted_insight['insight_citation'],
                title=extracted_insight['insight_title'],
                citation=extracted_insight['insight_citation'],
                elements=extracted_insight['citations'],
                text=format_full_insight(extracted_insight),
                type='insight',
                chunks=[],
                document_ids=document.document_ids,
                run_id=document.run_id,
                additional_fields=extracted_insight
            )
            all_insights.append(insight)
    return all_insights

In [None]:
def llm_identify_insights(document: ExtractionDocument,
                          identification_prompt: str,
                          temperature: float = 0.2,
                          max_tokens: int = 1000) -> str:
    """Ask the LLM which insights a grounding document contains.

    :param document: grounding document; its ``text`` fills the ``{document}``
        placeholder of the prompt
    :param identification_prompt: prompt template containing ``{document}``
    :param temperature: sampling temperature for the LLM call
    :param max_tokens: output token cap; raise it if the response is being cut
        off before the JSON is complete
    :return: raw response text from ``call_llm_flash``
    """
    identification_prompt_formatted = identification_prompt.format(document=document.text)
    return call_llm_flash(identification_prompt_formatted, temperature=temperature, max_tokens=max_tokens)

def process_identified_insights(identified_insights: str,
                                document: ExtractionDocument,
                                insight_citation_prefix: str = "INST_",
                                insight_citation_hash_length: int = 6) -> List:
    """Parse the identification response and tag each insight.

    Every parsed insight dict is updated in place with the document's
    ``run_id``, a ``citation`` hashed from its type, name and synopsis, and
    ``type`` set to ``'insight'``.

    :param identified_insights: raw LLM response text
    :param document: grounding document supplying ``run_id``
    :param insight_citation_prefix: prefix of the generated citation
    :param insight_citation_hash_length: number of digits in the citation hash
    :return: list of the tagged insight dicts
    """
    citation_fields = ['insight_type', 'insight_name', 'insight_synopsis']
    tagged = []
    for candidate in convert_response_to_json(identified_insights):
        candidate['run_id'] = document.run_id
        candidate['citation'] = create_citation(
            candidate,
            fields=citation_fields,
            prefix=insight_citation_prefix,
            number_of_digits=insight_citation_hash_length,
        )
        candidate['type'] = 'insight'
        tagged.append(candidate)
    return tagged

In [None]:
def format_insight(insight):
    """Render an identified insight as five labelled lines for the extraction prompt.

    :param insight: dict with ``citation``, ``insight_type``, ``insight_name``,
        ``insight_synopsis`` and ``related_citations`` (list of strings)
    :return: newline-joined summary of the insight
    """
    return '\n'.join((
        f"*Insight Citation:* {insight['citation']}",
        f"*Insight Type:* {insight['insight_type']}",
        f"*Insight Name:* {insight['insight_name']}",
        f"*Insight Description:* {insight['insight_synopsis']}",
        f"*Found in:* {', '.join(insight['related_citations'])}",
    ))

In [None]:
# from dataschemas import ChunkDBSchema, DocumentDBModel, SectionDBModel
from datamodels import ChunkLanceModel, ChunkDBModel, SectionDBModel

In [None]:
# Timestamp-style identifier of a research path.
# NOTE(review): not referenced by any other visible cell — confirm it is still needed.
research_path_id = '2025_07_12_13_18_05'

In [None]:
# Search query and its embedding.
# NOTE(review): the nomic-embed-text-v1.5 model card prescribes task prefixes
# (e.g. "search_query: "); none is added here — confirm this matches how the
# index was built.
query = "Rural Broadband Expansion for economic growth."
query_vec = encoder.encode(query)

In [None]:
# Take the top 5 hits from the vector search and wrap each raw hit in a ChunkLanceModel.
search_results = table.search(query_vec).limit(5).to_list()
search_results = [ChunkLanceModel(**result) for result in search_results]

In [None]:
# Build one grounding document per search hit, then stamp each with this run's id.
grounding_documents = [create_grounding_document(search_result) for search_result in search_results]
for document in grounding_documents:
    document.run_id = run_id

## Insight Extraction

In [None]:
# Accumulator for the insights extracted from all grounding documents.
search_insights = list()

In [None]:
# For each grounding document: identify insights, tag them with citations and
# the run id, then extract the details in batches of 6.
# Re-running this cell without re-running the accumulator cell above appends
# duplicates to `search_insights`.
for document in tqdm(grounding_documents):
    # `insights` first holds the raw LLM response text, then the parsed list of dicts.
    insights = llm_identify_insights(document, prompts['insight_identification'])
    insights = process_identified_insights(insights, document)
    print(len(insights))
    all_insights = extract_insights(insights, document, batch_size=6, temperature=0.2, max_tokens=10000)
    search_insights.extend(all_insights)

In [None]:
# Number of extracted insights, space-separated word count per grounding document, and their total.
len(search_insights), [len(_doc.text.split(' ')) for _doc in grounding_documents], sum([len(_doc.text.split(' ')) for _doc in grounding_documents])

In [None]:
# SQLite database that receives the extracted insights.
# NOTE(review): hard-coded absolute path; the folder variables read from the
# environment earlier in the notebook are not used here.
research_database_location = Path('/Users/jameslittiebrant/Data/Mycroft/databases')
research_db_path = research_database_location.joinpath('research.sqlite').absolute()
research_engine = create_engine(f'sqlite:///{research_db_path}')

# Creates every table registered on SQLModel.metadata, not only `extractions`
# (presumably including table models imported from `datamodels` — verify).
SQLModel.metadata.create_all(research_engine)

In [None]:
def prep_extraction_for_insert(extraction):
    """Flatten an extraction into a string-only mapping for database insertion.

    The list fields ``elements``, ``chunks`` and ``document_ids`` are joined
    with ``'|'``; ``additional_fields`` is serialised with ``json.dumps``.

    :param extraction: object exposing ``model_dump()`` (e.g. an ``ExtractionDocument``)
    :return: the dumped dict with the four fields above replaced by strings
    """
    row = extraction.model_dump()
    for list_field in ('elements', 'chunks', 'document_ids'):
        row[list_field] = '|'.join(row[list_field])
    row['additional_fields'] = json.dumps(row['additional_fields'])
    return row

In [None]:
# Space-separated word count of each extracted insight, and the total.
[len(_doc.text.split(' ')) for _doc in search_insights], sum([len(_doc.text.split(' ')) for _doc in search_insights])

In [None]:
# Flatten every extracted insight into a string-only row mapping.
to_insert = [prep_extraction_for_insert(_extraction) for _extraction in search_insights]

In [None]:
# Write all rows to the `extractions` table in one transaction.
# NOTE(review): the table model declares no uniqueness constraint on `id`, so
# re-running this cell appears to insert the same insights again — confirm.
with Session(research_engine) as session:
    session.bulk_insert_mappings(ExtractionDBDocument, to_insert)
    session.commit()

In [None]:
# Release the connection pool of the documents database engine.
engine.dispose()

In [None]:
# Release the connection pool of the research database engine.
research_engine.dispose()