In [2]:
import json
import pprint
from pathlib import Path

# Method 1: Basic JSON loading
def load_and_explore_json(file_path):
    """Load JSON file and explore its structure"""
    
    # Load the JSON file
    with open(file_path, 'r') as file:
        data = json.load(file)
    
    # Basic exploration
    print(f"File: {file_path}")
    print(f"Type: {type(data)}")
    print(f"Size: {len(data) if hasattr(data, '__len__') else 'N/A'}")
    
    return data

# Method 2: Pretty print structure
def explore_json_structure(data, max_depth=2, current_depth=0):
    """Recursively explore JSON structure"""
    
    if current_depth > max_depth:
        return "..."
    
    if isinstance(data, dict):
        result = {}
        for key, value in data.items():
            if isinstance(value, (dict, list)):
                result[key] = f"{type(value).__name__}({len(value)} items)"
            else:
                result[key] = f"{type(value).__name__}: {str(value)[:50]}..."
        return result
    
    elif isinstance(data, list):
        if len(data) == 0:
            return "Empty list"
        elif len(data) == 1:
            return [explore_json_structure(data[0], max_depth, current_depth + 1)]
        else:
            return [
                explore_json_structure(data[0], max_depth, current_depth + 1),
                f"... and {len(data) - 1} more items"
            ]
    
    else:
        return f"{type(data).__name__}: {str(data)[:50]}..."

# Example usage:
file_path = "data/conversations.json"

# Load the data
data = load_and_explore_json(file_path)

# Explore structure
print("\n=== JSON Structure ===")
structure = explore_json_structure(data)
pprint.pprint(structure, width=80, depth=3)

# If it's a list, show first few items
if isinstance(data, list):
    print(f"\n=== First item details ===")
    if len(data) > 0:
        pprint.pprint(data[0], width=80, depth=2)

# If it's a dict, show keys and sample values
elif isinstance(data, dict):
    print(f"\n=== Dictionary keys ===")
    for key, value in data.items():
        print(f"{key}: {type(value).__name__}")
        if isinstance(value, (list, dict)) and len(value) > 0:
            print(f"  Sample: {str(value)[:100]}...")

File: data/conversations.json
Type: <class 'list'>
Size: 690

=== JSON Structure ===
[{'account': 'dict(1 items)',
  'chat_messages': 'list(8 items)',
  'created_at': 'str: 2024-05-28T13:05:48.783430Z...',
  'name': 'str: Monetizing Creativity with Blockchain AGI...',
  'updated_at': 'str: 2024-05-28T13:12:45.115241Z...',
  'uuid': 'str: b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312...'},
 '... and 689 more items']

=== First item details ===
{'account': {'uuid': '2fc29045-9a0b-488f-af46-48e235f655ea'},
 'chat_messages': [{...}, {...}, {...}, {...}, {...}, {...}, {...}, {...}],
 'created_at': '2024-05-28T13:05:48.783430Z',
 'name': 'Monetizing Creativity with Blockchain AGI',
 'updated_at': '2024-05-28T13:12:45.115241Z',
 'uuid': 'b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312'}


# Parse into Llamaindex document

In [None]:
from typing import List, Dict, Any
from datetime import datetime
from llama_index.core import Document

def parse_timestamp_to_int(timestamp_str):
    """Convert ISO timestamp to integer"""
    if not timestamp_str:
        return 0
    try:
        dt = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
        return int(dt.timestamp() * 1000000)
    except:
        return 0

def create_documents_per_turn(conversations_data: List[Dict]) -> List[Document]:
    """
    Create one document for each turn/message in conversations.
    
    Args:
        conversations_data: List of conversation dictionaries from JSON
        
    Returns:
        List of LlamaIndex Document objects (one per message)
    """
    docs = []
    
    for conversation in conversations_data:
        thread_id = conversation.get('uuid', 'unknown')
        conversation_name = conversation.get('name', 'Untitled Conversation')
        conversation_created = conversation.get('created_at')
        
        # Process each message as a separate document
        for turn_idx, message in enumerate(conversation.get('chat_messages', [])):
            # Extract text content from message
            text_content = ""
            content = message.get('content', [])
            
            if isinstance(content, list):
                text_parts = []
                for content_item in content:
                    if isinstance(content_item, dict) and 'text' in content_item:
                        text_parts.append(content_item['text'])
                text_content = "\n".join(text_parts)
            elif isinstance(content, str):
                text_content = content
            elif isinstance(content, dict) and 'text' in content:
                text_content = content['text']
            
            # Only create document if there's actual text content
            if text_content.strip():
                 # Create deterministic ID
                created_at = message.get('created_at', '')
                timestamp_int = parse_timestamp_to_int(created_at)
                doc_id = f"{thread_id}-{turn_idx:03d}-{timestamp_int}"
                
                docs.append(
                    Document(
                        text=text_content,
                        metadata={
                            "thread": thread_id,
                            "thread_name": conversation_name,
                            "role": message.get('sender', 'unknown'),  # "human" / "assistant"
                            "turn_index": turn_idx,
                            "conversation_created": conversation_created,
                            "message_created": message.get('created_at'),
                            "source": "conversations.json",
                            "doc_id": doc_id
                        }
                    )
                )
    
    print(f"Created {len(docs)} documents from individual turns")
    return docs

# Create documents - one per turn/message
docs = create_documents_per_turn(data)

# Show statistics
if docs:
    print(f"\nDocument Statistics:")
    print(f"Total documents: {len(docs)}")
    
    # Count by role
    role_counts = {}
    for doc in docs:
        role = doc.metadata.get('role', 'unknown')
        role_counts[role] = role_counts.get(role, 0) + 1
    
    print(f"Messages by role: {role_counts}")
    
    # Show sample document
    print(f"\nSample document:")
    sample_doc = docs[0]
    print(f"Role: {sample_doc.metadata['role']}")
    print(f"Thread: {sample_doc.metadata['thread_name']}")
    print(f"Text length: {len(sample_doc.text)}")
    print(f"Text preview: {sample_doc.text[:200]}...")
    print(f"Full metadata: {sample_doc.metadata}")

In [10]:
data[0].keys()

dict_keys(['uuid', 'name', 'created_at', 'updated_at', 'account', 'chat_messages'])

In [None]:
data[0]

In [9]:
docs[0]

Document(id_='200033be-6255-4d84-ba83-25fb9634fd46', embedding=None, metadata={'thread': 'b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312', 'thread_name': 'Monetizing Creativity with Blockchain AGI', 'role': 'human', 'turn_index': 0, 'conversation_created': '2024-05-28T13:05:48.783430Z', 'message_created': '2024-05-28T13:07:32.757438Z', 'source': 'conversations.json'}, excluded_embed_metadata_keys=[], excluded_llm_metadata_keys=[], relationships={}, metadata_template='{key}: {value}', metadata_separator='\n', text_resource=MediaResource(embeddings=None, data=None, text='Right now can I scope this down or pick a single focused that’s in this area I can turn into a Soloprenuer or small startup.\n\nI was thinking exploring LLms for writers and using ai to define and own a writing niche.', path=None, url=None, mimetype=None), image_resource=None, audio_resource=None, video_resource=None, text_template='{metadata_str}\n\n{content}')

# Embed and store in DuckDB

In [19]:
import duckdb, datetime, uuid


db = duckdb.connect("augmcp_v0.duckdb")
db.execute("""
CREATE TABLE IF NOT EXISTS raw_chunks(
  doc_id  TEXT PRIMARY KEY,
  thread_id    TEXT,
  role      TEXT,
  ts_ingest TIMESTAMP,
  content   TEXT,
  embedding DOUBLE[]
)
""")


<duckdb.duckdb.DuckDBPyConnection at 0x11b9f42f0>

### More Robust
- resumes
- handles errors
- parallel

In [20]:
import datetime
import duckdb
from tqdm import tqdm
from dotenv import load_dotenv
from concurrent.futures import ThreadPoolExecutor, as_completed

from llama_index.embeddings.openai import OpenAIEmbedding

load_dotenv("/Users/chris/repos/openaugi/keys.env")
embedder = OpenAIEmbedding(model="text-embedding-3-small")

DB_PATH   = "augmcp_v0.duckdb"
TABLE_NAME = "raw_chunks"
MAX_TEXT_LEN = 8192     # truncate to first 8k chars
MAX_WORKERS  = 5        # tweak based on your notebook / rate limits

def get_existing_doc_ids(conn):
    """Fetch all doc_ids already in the DB so we can skip them."""
    try:
        rows = conn.execute(f"SELECT DISTINCT doc_id FROM {TABLE_NAME}").fetchall()
        return {row[0] for row in rows}
    except duckdb.CatalogException:
        # If table doesn’t exist yet
        return set()

def _embed_chunk(doc):
    """
    Truncate text, call the embedding endpoint, and return
    the full record ready for insertion.
    """
    text = doc.text[:MAX_TEXT_LEN]
    emb  = embedder.get_text_embedding(text)
    return (
        doc.metadata["doc_id"],
        doc.metadata["thread"],
        doc.metadata["role"],
        datetime.datetime.fromisoformat(doc.metadata["message_created"]),
        text,
        emb
    )

def process_documents_resumable(docs):
    # ——— 1. Open a single DuckDB connection on the main thread ———
    conn = duckdb.connect(DB_PATH)
    existing = get_existing_doc_ids(conn)
    print(f"Found {len(existing)} already-embedded docs; skipping them.")

    # ——— 2. Filter out docs that are already in the DB ———
    to_process = [d for d in docs if d.metadata["thread"] not in existing]
    print(f"Embedding {len(to_process)} new documents...")

    # ——— 3. Spin up threads to do only the embedding calls ———
    with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(_embed_chunk, d): d for d in to_process}

        # ——— 4. As each future completes, insert its result serially ———
        for future in tqdm(as_completed(futures), total=len(futures), desc="Embedding"):
            doc = futures[future]
            try:
                record = future.result()
                conn.execute(
                    f"INSERT INTO {TABLE_NAME} VALUES (?,?,?,?,?,?)",
                    record
                )
            except Exception as e:
                print(f"⚠️ Failed on doc {doc.metadata['thread']}: {e}")

    conn.close()
    print("Done.")

# ——— Usage ———
process_documents_resumable(docs)


Found 0 already-embedded docs; skipping them.
Embedding 11284 new documents...


Embedding: 100%|██████████| 11284/11284 [26:27<00:00,  7.11it/s]


Done.


# Data is embedded into DuckDB


In [7]:
import duckdb
import pandas as pd

conn = duckdb.connect('augmcp_v0.duckdb')

# Convert query to pandas DataFrame for nice display
df = conn.execute("SELECT * FROM raw_chunks LIMIT 10").df()
print(df)

                                              doc_id  \
0  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-003-17169...   
1  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-000-17169...   
2  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-004-17169...   
3  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-002-17169...   
4  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-001-17169...   
5  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-006-17169...   
6  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-007-17169...   
7  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312-005-17169...   
8  94040b8e-44fd-4854-9aa3-adc2b65baa5c-000-17168...   
9  94040b8e-44fd-4854-9aa3-adc2b65baa5c-005-17168...   

                              thread_id       role                  ts_ingest  \
0  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312  assistant 2024-05-28 09:07:32.757438   
1  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312      human 2024-05-28 09:07:32.757438   
2  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312      human 2024-05-28 09:11:21.515193   
3  b4fa5efd-9e1c-4e4b-a8fe-ae1dba7ef312  assistant 2024-05-

# Create Dim Threads

In [11]:
conn = duckdb.connect("augmcp_v0.duckdb")

# Read your conversations JSON into a DataFrame
convs = pd.read_json("data/conversations.json")
threads = convs[["uuid", "name"]].rename(
    columns={"uuid": "thread_id", "name": "thread_name"}
)

# Write it into DuckDB
conn.execute("""
    CREATE TABLE IF NOT EXISTS dim_thread (
    thread_id   TEXT PRIMARY KEY,
    thread_name TEXT
    )
""")
conn.register("threads_df", threads)        # temp table backed by pandas
conn.execute("INSERT OR REPLACE INTO dim_thread SELECT * FROM threads_df")

<duckdb.duckdb.DuckDBPyConnection at 0x127451c70>

In [18]:
import duckdb

# 1. Connect to your database
conn = duckdb.connect("augmcp_v0.duckdb")

# 2. Create (or replace) the view
conn.execute("""
CREATE OR REPLACE VIEW vw_chunks_with_name AS
SELECT
  rc.doc_id,
  rc.thread_id,
  dt.thread_name,
  rc.role,
  rc.ts_ingest,
  rc.content,
  rc.embedding
FROM raw_chunks AS rc
JOIN dim_thread AS dt
  ON rc.thread_id = dt.thread_id
""")


# 5. Sample some rows to confirm the join
print("\n=== SAMPLE DATA FROM VIEW ===")
out = conn.execute("""
SELECT
  doc_id,
  thread_id,
  thread_name,
  role,
  LEFT(content, 80) AS content_preview
FROM vw_chunks_with_name
LIMIT 5
""").df()

# print(out)

num_rows = conn.execute("SELECT COUNT(*) FROM vw_chunks_with_name").fetchone()[0]
print(f"Total rows in vw_chunks_with_name: {num_rows:,}")

conn.close()


=== SAMPLE DATA FROM VIEW ===
Total rows in vw_chunks_with_name: 11,284
