In [2]:
from datasets import load_dataset
ds = load_dataset("teyler/epstein-files-20k")

In [3]:
ds

DatasetDict({
    train: Dataset({
        features: ['text'],
        num_rows: 2136420
    })
})

In [4]:
# Convert the train split to pandas
df = ds["train"].to_pandas()

# View
print(df.head())
print(df.shape)

                                                text
0                                      filename,text
1         IMAGES-005-HOUSE_OVERSIGHT_020367.txt,"215
2                                                   
3  The final choice he was made to board a non-st...
4  To remain in Hong Kong once a criminal complai...
(2136420, 1)


In [5]:
df.head()

Unnamed: 0,text
0,"filename,text"
1,"IMAGES-005-HOUSE_OVERSIGHT_020367.txt,""215"
2,
3,The final choice he was made to board a non-st...
4,To remain in Hong Kong once a criminal complai...


In [None]:
df.iloc[4]["text"]

'To remain in Hong Kong once a criminal complaint was leveled against him would have meant'

- Create Embeddings
- Similarity Explanation 
- Find most similar sentences 
- Create Knowledge Graph  

In [6]:
df.shape

(2136420, 1)

In [7]:
df.to_csv("main_df.csv",index=False)

In [14]:
# Benchmark with 10000 Rows 
df_sample = df.iloc[:10000,:]
df_sample.to_csv("sample_df_test.csv",index=False)

In [4]:
df_sample = df.iloc[:1000,:]
df_sample.to_csv("sample_df_test_1000.csv",index=False)

In [5]:
del df

### Optimized Pipeline

In [None]:
# ---- CONFIG ----
EMBED_MODEL = "qwen3-embedding"
LLM_MODEL = "llama3"
TEXT_COL = "text"
MAX_WORKERS = 4  # Start

In [17]:
import pandas as pd
import ollama

In [18]:
sample_df = pd.read_csv("sample_df_test_1000.csv")

In [19]:
sample_df.head()

Unnamed: 0,text
0,"filename,text"
1,"IMAGES-005-HOUSE_OVERSIGHT_020367.txt,""215"
2,
3,The final choice he was made to board a non-st...
4,To remain in Hong Kong once a criminal complai...


**V1 - Not Optimized**
1. Both models on GPU 
2. Single Pass
3. No Chunking Mechanism

In [None]:
import pandas as pd
import ollama
import json
import ast
import concurrent.futures
from tqdm import tqdm

# ---- CONFIG ----
EMBED_MODEL = "qwen3-embedding"
LLM_MODEL = "llama3"
TEXT_COL = "text"
MAX_WORKERS = 4  # Start with 4; adjust based on your GPU/CPU capabilities

# ---- FUNCTION: keyword extraction using LLM ----
def extract_keywords(text):
    prompt = f"""
    Extract concise, meaningful metadata keywords from the text below.
    Focus on entities, topics, domain terms.
    Return only a Python list.

    TEXT:
    {text}
    """
    
    try:
        response = ollama.chat(
            model=LLM_MODEL,
            messages=[{"role": "user", "content": prompt}]
        )
        # ast.literal_eval is much safer than standard eval()
        keywords = ast.literal_eval(response["message"]["content"])
    except Exception:
        keywords = []

    return keywords

# ---- FUNCTION: embedding ----
def get_embedding(text):
    try:
        response = ollama.embeddings(
            model=EMBED_MODEL,
            prompt=text
        )
        return response["embedding"]
    except Exception:
        return []

# ---- FUNCTION: Worker for a single row ----
def process_row(row_data):
    """Processes a single row to be used by the thread pool."""
    idx, text = row_data
    text = str(text)

    keywords = extract_keywords(text)
    embedding = get_embedding(text)

    return {
        "row_id": int(idx),
        "metadata_keywords": keywords,
        "embedding": embedding
    }

# ---- MAIN PIPELINE ----
if __name__ == "__main__":
    
    records = []
        
    # 1. Prepare data as a list of tuples (index, text)
    row_data_list = [(idx, row[TEXT_COL]) for idx, row in sample_df.iterrows()]

    # 2. Spin up the ThreadPoolExecutor
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        # Submit all tasks to the pool
        futures = {executor.submit(process_row, row): row for row in row_data_list}
        
        # 3. Use as_completed to update tqdm the moment a row finishes
        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            records.append(future.result())

    # 4. Optional: Sort records by row_id since threads finish out of order
    records = sorted(records, key=lambda x: x["row_id"])

    # ---- SAVE TO JSON ----
    with open("processed_records.json", "w") as f:
        json.dump(records, f, indent=2)

  2%|▏         | 19/1000 [01:14<1:00:36,  3.71s/it]

**Time Calculation**
- 3.7 Second per 4 row ~ Almost 1 second per row 
- 21 million seconds --> TOO MUCH

**V2 - Optimized**
1. Chunking 
2. 2 Pass Strategy 
3. Using SLM LLAMA 1B instead of LLAMA 8b
4. sqllite3 usage to save processed records

In [1]:
import pandas as pd
import ollama
import sqlite3
import ast
import concurrent.futures
from tqdm import tqdm
import os
from keybert import KeyBERT

# ---- CONFIG ----
LLM_MODEL = "llama3.2:1b"  # Downsized for speed and VRAM efficiency
TEXT_COL = "text"
DB_PATH = "processed_records.db"
CSV_PATH = "main_df.csv"
CHUNK_SIZE = 512
MAX_WORKERS = 8

TOP_N_KEYWORDS = 5 # Number of keywords to extract per row
P1_COMPUTE = "CPU"


def setup_database():
    """Creates the SQLite table if it doesn't exist."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    # Storing keywords as a string representation of a list to keep the schema simple
    cursor.execute('''
        CREATE TABLE IF NOT EXISTS records (
            row_id INTEGER PRIMARY KEY,
            text TEXT,
            metadata_keywords TEXT,
            status TEXT
        )
    ''')
    conn.commit()
    conn.close()

def process_chunk_keybert(chunk, kw_model):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Fault Tolerance: Check which row_ids in this chunk are already in the DB
    cursor.execute("SELECT row_id FROM records")
    existing_ids = set([row[0] for row in cursor.fetchall()])
    
    # Filter out rows we've already processed
    valid_rows = [(idx, str(row['text'])) for idx, row in chunk.iterrows() if idx not in existing_ids]
    
    if not valid_rows:
        conn.close()
        return
        
    row_ids = [row[0] for row in valid_rows]
    texts = [row[1] for row in valid_rows]
    
    # ---- THE BATCH INFERENCE ----
    # Passing the entire list of texts to KeyBERT at once is exponentially faster on CPU
    batch_results = kw_model.extract_keywords(
        texts, 
        keyphrase_ngram_range=(1, 2), # Extracts single words and 2-word phrases
        stop_words='english', 
        top_n=TOP_N_KEYWORDS,
        use_mmr =True,
        diversity = 0.2
    )
    
    records = []
    for idx, text_str, doc_kws in zip(row_ids, texts, batch_results):
        # KeyBERT returns a list of tuples: [('keyword', 0.85), ('other phrase', 0.71)]
        # We just want the words, so we strip out the confidence scores
        words_only = [kw_tuple[0] for kw_tuple in doc_kws]
        
        # Save as stringified list to match our SQLite schema
        records.append((idx, text_str, str(words_only), "PASS_1_COMPLETE"))

    # Bulk insert
    cursor.executemany('''
        INSERT OR REPLACE INTO records (row_id, text, metadata_keywords, status)
        VALUES (?, ?, ?, ?)
    ''', records)
    
    conn.commit()
    conn.close()

def extract_keywords(text):
    prompt = f"Extract concise metadata keywords. Return only a Python list.\nTEXT:\n{text}"
    try:
        response = ollama.chat(
            model=LLM_MODEL,
            messages=[{"role": "user", "content": prompt}]
        )
        return str(ast.literal_eval(response["message"]["content"]))
    except Exception:
        return "[]"

def process_row(row_data):
    idx, text = row_data
    text_str = str(text)
    keywords = extract_keywords(text_str)
    return (idx, text_str, keywords, "PASS_1_COMPLETE")

def process_chunk(chunk):
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Fault Tolerance: Check which row_ids in this chunk are already in the DB
    cursor.execute("SELECT row_id FROM records")
    existing_ids = set([row[0] for row in cursor.fetchall()])
    
    # Only process rows we haven't seen before
    row_data_list = [(idx, row[TEXT_COL]) for idx, row in chunk.iterrows() if idx not in existing_ids]
    
    if not row_data_list:
        return # Skip if the entire chunk is already processed
        
    records = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        futures = {executor.submit(process_row, row): row for row in row_data_list}
        for future in concurrent.futures.as_completed(futures):
            records.append(future.result())

    # Bulk insert the finished chunk to SQLite safely
    cursor.executemany('''
        INSERT OR REPLACE INTO records (row_id, text, metadata_keywords, status)
        VALUES (?, ?, ?, ?)
    ''', records)
    
    conn.commit()
    conn.close()

if __name__ == "__main__":
    setup_database()
    

    if P1_COMPUTE == 'CPU':
        print("Loading CPU-Optimized KeyBERT model...")
        # all-MiniLM-L6-v2 is the default: it is very small and fast for CPU execution
        cpu_kw_model = KeyBERT(model='all-MiniLM-L6-v2') 
        
        print("Starting Pass 1: CPU Keyword Extraction...")
        CHUNK_SIZE_CPU = 10000
        chunk_iterator = pd.read_csv(CSV_PATH, chunksize=CHUNK_SIZE_CPU)
        
        for chunk in tqdm(chunk_iterator, desc="Processing Batches"):
            process_chunk_keybert(chunk, cpu_kw_model)
            
        print("Pass 1 Complete! You can now run Pass 2 using the GPU.")
    

    else:     
        # GPU + LLM Model
        # Read the data file iteratively to save system RAM
        print("Starting Pass 1: Keyword Extraction...")
        chunk_iterator = pd.read_csv(CSV_PATH, chunksize=CHUNK_SIZE)
        
        for chunk in tqdm(chunk_iterator, desc="Processing Chunks"):
            process_chunk(chunk)
    
        print("Pass 1 Complete. Unloading LLM from VRAM...")    
        # Force unload model to free GPU for Pass 2
        ollama.chat(model=LLM_MODEL, messages=[], keep_alive=0) 
        print("VRAM cleared. Ready for Pass 2 (Embeddings).")

  from .autonotebook import tqdm as notebook_tqdm


Loading CPU-Optimized KeyBERT model...
Starting Pass 1: CPU Keyword Extraction...


Processing Batches: 214it [3:17:17, 55.32s/it]

Pass 1 Complete! You can now run Pass 2 using the GPU.





In [None]:
"""
**Time Calculation**

Worker 4 
Batch 100 

Starting Pass 1: Keyword Extraction...
Processing Chunks: 10it [06:41, 40.18s/it]
Pass 1 Complete. Unloading LLM from VRAM...
VRAM cleared. Ready for Pass 2 (Embeddings).
"""

### PASS 2 Embedding Generation

In [None]:
import sqlite3
import ollama
import json
from tqdm import tqdm

# ---- CONFIG ----
DB_PATH = "processed_records.db"
EMBED_MODEL = "qwen3-embedding"
BATCH_SIZE = 512  # Number of rows to send to the GPU at once

def setup_pass2_db():
    """Ensures the SQLite database is ready to accept vector data."""
    conn = sqlite3.connect(DB_PATH)
    cursor = conn.cursor()
    
    # Try to add the embedding column (will fail silently if it already exists)
    try:
        cursor.execute("ALTER TABLE records ADD COLUMN embedding TEXT")
    except sqlite3.OperationalError:
        pass 
        
    conn.commit()
    return conn

def run_pass2():
    print("Starting Pass 2: Bulk Embedding Generation...")
    
    # Optional: Pre-load the embedding model into your cleared VRAM
    # A negative keep_alive value keeps the model loaded in memory indefinitely
    try:
        ollama.chat(model=EMBED_MODEL, messages=[], keep_alive=-1)
    except Exception:
        pass

    conn = setup_pass2_db()
    cursor = conn.cursor()
    
    # Determine exactly how much work is left to do
    cursor.execute("SELECT COUNT(*) FROM records WHERE embedding IS NULL AND status = 'PASS_1_COMPLETE'")
    total_unprocessed = cursor.fetchone()[0]
    
    if total_unprocessed == 0:
        print("All rows have embeddings. Pipeline finished!")
        conn.close()
        return

    # Process chunks using Native Batching
    with tqdm(total=total_unprocessed, desc="Generating Vectors") as pbar:
        while True:
            # Fetch the next batch of rows missing embeddings
            cursor.execute('''
                SELECT row_id, text 
                FROM records 
                WHERE embedding IS NULL AND status = 'PASS_1_COMPLETE'
                LIMIT ?
            ''', (BATCH_SIZE,))
            
            batch = cursor.fetchall()
            if not batch:
                break
                
            row_ids = [row[0] for row in batch]
            texts = [str(row[1]) for row in batch]
            
            try:
                # Pass the entire list of strings to Ollama natively
                response = ollama.embed(model=EMBED_MODEL, input=texts)
                embeddings = response["embeddings"]
                
                # Pair up the new embeddings with their database row IDs
                update_data = []
                for idx, emb in zip(row_ids, embeddings):
                    update_data.append((json.dumps(emb), idx))
                    
                # Bulk update the SQLite database
                cursor.executemany("UPDATE records SET embedding = ? WHERE row_id = ?", update_data)
                conn.commit()
                
                pbar.update(len(batch))
                
            except Exception as e:
                print(f"Error processing batch at GPU level: {e}")
                break
                
    conn.close()
    print("Pass 2 Complete!")

if __name__ == "__main__":
    run_pass2()

Starting Pass 2: Bulk Embedding Generation...


Generating Vectors:   4%|▍         | 89600/2130607 [13:28:05<5794:05:21, 10.22s/it] 

### SQL Lite to Parquet

In [56]:
import sqlite3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import json
import ast
from tqdm import tqdm

# ---- CONFIG ----
DB_PATH = "processed_records.db"
PARQUET_PATH = "final_output.parquet"
CHUNK_SIZE = 250000  # Process a quarter-million rows at a time

def export_to_parquet():
    print("Starting Export: SQLite to Parquet...")
    conn = sqlite3.connect(DB_PATH)
    
    # 1. Count total completed rows for the progress bar
    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM records WHERE status = 'PASS_1_COMPLETE' AND embedding IS NOT NULL")
    total_rows = cursor.fetchone()[0]
    
    if total_rows == 0:
        print("No completed records found to export.")
        conn.close()
        return
        
    print(f"Found {total_rows} completed records. Exporting...")

    # 2. Setup the Parquet Writer
    writer = None
    offset = 0
    
    with tqdm(total=total_rows, desc="Writing Parquet") as pbar:
        while True:
            # 3. Read chunk from SQLite safely
            query = f"""
                SELECT row_id, text, metadata_keywords, embedding 
                FROM records 
                WHERE status = 'PASS_1_COMPLETE' AND embedding IS NOT NULL
                LIMIT {CHUNK_SIZE} OFFSET {offset}
            """
            chunk_df = pd.read_sql_query(query, conn)
            
            if chunk_df.empty:
                break
                
            # 4. Clean up stringified data into real Python/Arrow lists
            chunk_df['metadata_keywords'] = chunk_df['metadata_keywords'].apply(
                lambda x: ast.literal_eval(x) if pd.notnull(x) else []
            )
            chunk_df['embedding'] = chunk_df['embedding'].apply(
                lambda x: json.loads(x) if pd.notnull(x) else []
            )
            
            # 5. Convert DataFrame to PyArrow Table
            table = pa.Table.from_pandas(chunk_df)
            
            # 6. Initialize writer on the first pass (requires the table schema)
            if writer is None:
                writer = pq.ParquetWriter(PARQUET_PATH, table.schema, compression='snappy')
            
            # 7. Append chunk directly to the file
            writer.write_table(table)
            
            offset += CHUNK_SIZE
            pbar.update(len(chunk_df))
            
    if writer:
        writer.close()
    conn.close()
    
    print(f"Export complete! Highly compressed dataset saved to {PARQUET_PATH}")

if __name__ == "__main__":
    export_to_parquet()

Starting Export: SQLite to Parquet...
Found 1000 completed records. Exporting...


Writing Parquet: 100%|██████████| 1000/1000 [00:00<00:00, 1022.62it/s]

Export complete! Highly compressed dataset saved to final_output.parquet





In [57]:
df = pd.read_parquet('final_output.parquet')

In [58]:
df

Unnamed: 0,row_id,text,metadata_keywords,embedding
0,0,"filename,text","[filename text, filename, text]","[0.023052093, -0.028317599, 0.011062983, -0.02..."
1,1,"IMAGES-005-HOUSE_OVERSIGHT_020367.txt,""215","[house_oversight_020367 txt, 005 house_oversig...","[-0.0015994079, 0.0009793631, -0.010118909, -0..."
2,2,,[nan],"[0.031404983, 0.03075171, -0.02906024, -0.0345..."
3,3,The final choice he was made to board a non-st...,"[flight moscow, moscow june, stop flight, flig...","[0.027918369, 0.005494184, -0.0023703119, -0.0..."
4,4,To remain in Hong Kong once a criminal complai...,"[remain hong, kong criminal, hong kong, hong, ...","[-0.004669463, 0.009473486, -0.0022050955, 0.0..."
...,...,...,...,...
995,995,including cross-cultural analyses (similaritie...,"[differences dream, dream posting, cultural an...","[-0.00082270417, 0.017136006, 0.0028171376, 0...."
996,996,and structure of dream data sets. This enables...,"[dream data, studies dreams, structure dream, ...","[0.026132682, 0.004024183, -0.0052859643, -0.0..."
997,997,"Patterns are repeated. For example, in general...","[dreams males, dreams, general dreams, pattern...","[0.003786812, 0.031239565, 0.013577143, -0.028..."
998,998,Males will engage in fighting with other males...,"[fighting males, male dreamers, males engage, ...","[0.008204588, 0.031083258, 0.0074874726, -0.03..."


### Testing Code

In [None]:
def extract_keywords(text):
    print(text)
    prompt = f"""Extract concise metadata keywords from the text below. 
    You must respond ONLY in strict JSON format like this: {{"keywords": ["word1", "word2"]}}
    
    TEXT:
    {text}"""
    try:
        response = ollama.chat(
            model=LLM_MODEL,
            messages=[{"role": "user", "content": prompt}],
            format = "json",
                    options={
                "temperature": 0.0,  # 0.0 makes the output strictly factual and deterministic
                "top_p": 0.1,        # Restricts the model to only the highest-probability words
            }
        )
        return str(ast.literal_eval(response["message"]["content"]))
    except Exception as e:
        print(str(e))
        return "[]"

In [46]:
extract_keywords("vital part of maintaining normal brain and mental functioning. New neurons are being created, importantly, in ")

vital part of maintaining normal brain and mental functioning. New neurons are being created, importantly, in 


"{'keywords': ['brain', 'neurons', 'mental functioning', 'new neurons']}"