#### Imports

In [92]:
from dotenv import load_dotenv
import os
from llama_index.llms.mistralai import MistralAI
from llama_index.embeddings.mistralai import MistralAIEmbedding
from pathlib import Path
from llama_index.core import SimpleDirectoryReader
from llama_index.core import Document as LlamaDocument
from llama_index.core.node_parser import SentenceSplitter 
from llama_index.core.ingestion import IngestionPipeline
import pandas as pd
import numpy as np
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import asyncio
import nest_asyncio
from tqdm.asyncio import tqdm_asyncio
import pickle
import matplotlib.pyplot as plt
import datetime as datetime
import chromadb
from llama_index.vector_stores.chroma import ChromaVectorStore

#### Keys

In [93]:
load_dotenv()
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")

#### Open-source models (Mistral)

In [94]:
# LLM 
llm = MistralAI(api_key=MISTRAL_API_KEY, model="mistral-large-latest")

# # Embedings model
embed = MistralAIEmbedding(api_key=MISTRAL_API_KEY, model_name="mistral-embed")

# Instruct model
instruct = MistralAI(api_key=MISTRAL_API_KEY, model="codestral-latest")

#### Data ingestion

In [None]:
# Custom class for documents
# nodes creations expects get_content method

# class Document(LlamaDocument):
#     def __init__(self, content, metadata, *embedding):
#         self.content = content
#         self.metadata = metadata
#         self.embedding = embedding

#     def get_content(self, metadata_mode=None):
#         content = self.content
#         return str(content)
    
#     def get_metadata(self):
#         metadata = self.metadata
#         return str(metadata)
    
#     def to_dict(self):
#         return {"content": self.content, "metadata": self.metadata}
    
#     def get_type(self):
#         return type(self)
    
#     def get_embedding(self):
#         return self.embedding

In [None]:
reader = SimpleDirectoryReader(input_dir="data") # For text files

data_folder_path = Path("data")

all_documents = []

for csv_file in data_folder_path.glob("*.csv"):
    df = pd.read_csv(csv_file, low_memory=False)
    file_name = re.search(r'[^\\]+(?=\.\w+$)', str(csv_file)).group()


    documents = df.apply(lambda row: LlamaDocument(
        content=row.to_dict(),
        metadata={
            "table_name": file_name,
            "table_shape": str(df.shape),
            "source": file_name
        }
    ), axis=1).tolist()

    # Add documents to the list
    all_documents.extend(documents)

In [115]:
len(all_documents)

1728833

In [116]:
# Create a shorter documents list for testing 
all_documents = all_documents[:5]

In [118]:
for i in range(len(all_documents)):    
    print(all_documents[i].get_content())








In [119]:
type(all_documents[0])

llama_index.core.schema.Document

In [120]:
# Create a vector db

db = chromadb.PersistentClient(path="./sbm_chroma_db")
chroma_collection = db.get_or_create_collection("submariner")
vector_store = ChromaVectorStore(chroma_collection=chroma_collection)

In [121]:
model = embed

pipeline = IngestionPipeline(
    transformations=[
        embed,
    ],
    vector_store=vector_store
)

nodes = await pipeline.arun(documents=all_documents) # Single threaded approximate 48 hours to run

ValueError: Value for metadata table_shape must be one of (str, int, float, None)

In [109]:
print(len(nodes[0].embedding))

1024


In [104]:
# nest_asyncio.apply()

# # Define batch size
# batch_size = 250  # You may need to tune this based on your memory constraints and API rate limits

# # Create batches
# batches = [all_documents[i: i + batch_size] for i in range(0, len(all_documents), batch_size)]

# async def process_documents():
#     # Create pipeline
#     pipeline = IngestionPipeline(
#         transformations=[
#             MistralAIEmbedding()
#         ]
#     )
    
#     # Process batches concurrently with controlled concurrency
#     all_nodes = []
#     semaphore = asyncio.Semaphore(10)  # Limit concurrent requests to avoid API rate limits
    
#     async def process_batch_with_semaphore(batch_idx, batch):
#         async with semaphore:
#             try:
#                 result = await pipeline.arun(documents=batch)
#                 return result
#             except Exception as e:
#                 print(f"Error processing batch {batch_idx}: {e}")
#                 return []
    
#     # Create tasks for all batches
#     tasks = [process_batch_with_semaphore(i, batch) for i, batch in enumerate(batches)]
    
#     # Process results as they complete
#     for result in await tqdm_asyncio.gather(*tasks, desc="Processing batches"):
#         all_nodes.extend(result)
    
#     return all_nodes

# # Run the processing
# nodes = await process_documents()
# print(f"Processed {len(nodes)} nodes")

In [105]:
# model = embed

In [106]:
# nest_asyncio.apply()

# batch_size = 250

# batches = [all_documents[i: i + batch_size] for i in range(0, len(all_documents), batch_size)]

# save_dir = Path("intermediate_embeddings")
# save_dir.mkdir(exist_ok=True)

# async def process_batch(batch):
    
#     # Create pipeline for this batch
#     pipeline = IngestionPipeline(
#         transformations=[
#             MistralAI(model=embed)
#         ]
#     )
#     return await pipeline.arun(documents=batch)

# async def main():
#     nodes = []
    
#     # Create semaphore to limit concurrent API calls
#     semaphore = asyncio.Semaphore(3)  # Adjust based on API rate limits
    
#     async def bounded_process_batch(batch_idx, batch):
#         async with semaphore:
#             try:
#                 result = await process_batch(batch)
#                 print(f"Batch {batch_idx}/{len(batches)} completed with {len(result)} nodes")
#                 return result
#             except Exception as e:
#                 print(f"Error processing batch {batch_idx}: {e}")
#                 return []
    
#     # Create tasks
#     tasks = [bounded_process_batch(i, batch) for i, batch in enumerate(batches)]
    
#     # Use as_completed to process results as they finish
#     for i, future in enumerate(tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing batches")):
#         result = await future
#         nodes.extend(result)
        
#         # Save intermediate results every 10 batches (adjust as needed)
#         if i > 0 and i % 10 == 0:
#             print(f"Saving intermediate results ({i}/{len(tasks)} batches processed)...")
#             with open(save_dir / f"nodes_checkpoint_{i}.pkl", "wb") as f:
#                 pickle.dump(nodes, f)
    
#     # Save final results
#     with open(save_dir / "nodes_final.pkl", "wb") as f:
#         pickle.dump(nodes, f)
    
#     print(f"Processed {len(nodes)} nodes")
#     return nodes


In [107]:
# nodes = await main()

In [108]:
## Test with mistral documenations ressources

import nest_asyncio
import asyncio
from tqdm import tqdm
import pickle
from pathlib import Path
import os
from mistralai import Mistral
import concurrent.futures

# Apply nest_asyncio to allow nested event loops
nest_asyncio.apply()

# Load environment variables
MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY")
mistral_client = Mistral(api_key=MISTRAL_API_KEY)
model = "mistral-embed"

# Define batch size
batch_size = 100  # Adjust based on API constraints
save_dir = Path("intermediate_embeddings")
save_dir.mkdir(exist_ok=True)

# Function to extract content from documents for embedding
def get_document_texts(docs):
    return [doc.get_content() for doc in docs[:10]]

# Function to create embeddings using Mistral API
def create_embeddings_batch(batch_docs):
    try:
        batch_texts = get_document_texts(batch_docs)
        response = mistral_client.embeddings.create(
            model="mistral-embed",
            inputs=batch_texts,
        )
        
        # Create dictionary with document and its embedding
        results = []
        for i, doc in enumerate(batch_docs):
            embedding = response.data[i].embedding
            results.append({
                "document": doc,
                "embedding": embedding,
                "metadata": doc.get_metadata()
            })
        return results
    except Exception as e:
        print(f"Error creating embeddings: {e}")
        return []

batch_stats = []

# Modify your main() function to collect statistics
async def main():
    # Create batches
    batches = [all_documents[i: i + batch_size] for i in range(0, len(all_documents), batch_size)]
    all_results = []
    
    print(f"Total documents: {len(all_documents)}")
    print(f"Number of batches: {len(batches)}")
    print(f"Batch size: {batch_size}")
    
    # Use ThreadPoolExecutor for parallel processing
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Submit all batch processing tasks
        future_to_batch = {
            executor.submit(create_embeddings_batch, batch): i 
            for i, batch in enumerate(batches)
        }
        
        # Process results as they complete
        for future in tqdm(concurrent.futures.as_completed(future_to_batch), total=len(batches), desc="Processing batches"):
            batch_idx = future_to_batch[future]
            start_time = datetime.now()
            try:
                results = future.result()
                end_time = datetime.now()
                processing_time = (end_time - start_time).total_seconds()
                
                # Collect batch statistics
                doc_lengths = [len(get_document_texts([doc.document])[0]) for doc in results]
                batch_stats.append({
                    'batch_idx': batch_idx,
                    'batch_size': len(results),
                    'avg_doc_length': np.mean(doc_lengths) if doc_lengths else 0,
                    'max_doc_length': max(doc_lengths) if doc_lengths else 0,
                    'min_doc_length': min(doc_lengths) if doc_lengths else 0,
                    'processing_time': processing_time,
                    'timestamp': end_time
                })
                
                all_results.extend(results)
                print(f"Batch {batch_idx}/{len(batches)} completed with {len(results)} embeddings")
                
                # Save intermediate results every 10 batches
                if batch_idx > 0 and batch_idx % 10 == 0:
                    print(f"Saving intermediate results ({batch_idx}/{len(batches)} batches processed)...")
                    with open(save_dir / f"embeddings_checkpoint_{batch_idx}.pkl", "wb") as f:
                        pickle.dump(all_results, f)
            except Exception as e:
                print(f"Batch {batch_idx} generated an exception: {e}")
    
    # Save final results
    with open(save_dir / "embeddings_final.pkl", "wb") as f:
        pickle.dump(all_results, f)
    
    print(f"Created embeddings for {len(all_results)} documents")
    return all_results, batch_stats

# Run the main function
embedded_docs, batch_stats = await main()

Total documents: 5
Number of batches: 1
Batch size: 100


Processing batches:   0%|          | 0/1 [00:00<?, ?it/s]


AttributeError: module 'datetime' has no attribute 'now'

In [None]:
# # Create a DataFrame from batch statistics
# df = pd.DataFrame(batch_stats)

# # Set up the visualization
# fig, axs = plt.subplots(2, 2, figsize=(15, 10))
# fig.suptitle('API Batch Processing Visualization', fontsize=16)

# # Batch sizes
# axs[0, 0].bar(df['batch_idx'], df['batch_size'])
# axs[0, 0].set_title('Documents per Batch')
# axs[0, 0].set_xlabel('Batch Index')
# axs[0, 0].set_ylabel('Number of Documents')

# # Document lengths
# axs[0, 1].plot(df['batch_idx'], df['avg_doc_length'], 'b-', label='Avg Length')
# axs[0, 1].fill_between(df['batch_idx'], 
#                        df['min_doc_length'], 
#                        df['max_doc_length'], 
#                        alpha=0.2, 
#                        color='blue')
# axs[0, 1].set_title('Document Length per Batch')
# axs[0, 1].set_xlabel('Batch Index')
# axs[0, 1].set_ylabel('Character Count')
# axs[0, 1].legend()

# # Processing time
# axs[1, 0].bar(df['batch_idx'], df['processing_time'])
# axs[1, 0].set_title('Processing Time per Batch')
# axs[1, 0].set_xlabel('Batch Index')
# axs[1, 0].set_ylabel('Time (seconds)')

# # Cumulative documents processed over time
# if len(df) > 1:
#     df = df.sort_values('timestamp')
#     df['cumulative_docs'] = df['batch_size'].cumsum()
#     axs[1, 1].plot(range(len(df)), df['cumulative_docs'])
#     axs[1, 1].set_title('Cumulative Documents Processed')
#     axs[1, 1].set_xlabel('Batch (in order of completion)')
#     axs[1, 1].set_ylabel('Total Documents')

# plt.tight_layout()
# plt.subplots_adjust(top=0.9)
# plt.show()

# # Print summary statistics
# print("Summary Statistics:")
# print(f"Total batches processed: {len(df)}")
# print(f"Total documents processed: {df['batch_size'].sum()}")
# print(f"Average documents per batch: {df['batch_size'].mean():.2f}")
# print(f"Average processing time per batch: {df['processing_time'].mean():.2f} seconds")
# print(f"Total processing time: {df['processing_time'].sum():.2f} seconds")

In [None]:
# # Define the directory where the pickle files are saved
# save_dir = Path("intermediate_embeddings")

# # Read a specific checkpoint file
# def read_checkpoint(checkpoint_number):
#     checkpoint_path = save_dir / f"nodes_checkpoint_{checkpoint_number}.pkl"
#     try:
#         with open(checkpoint_path, "rb") as f:
#             nodes = pickle.load(f)
#         print(f"Loaded {len(nodes)} nodes from checkpoint {checkpoint_number}")
#         return nodes
#     except FileNotFoundError:
#         print(f"Checkpoint file not found: {checkpoint_path}")
#         return None
#     except Exception as e:
#         print(f"Error loading checkpoint {checkpoint_number}: {e}")
#         return None

# # Read the final results file
# def read_final_results():
#     final_path = save_dir / "nodes_final.pkl"
#     try:
#         with open(final_path, "rb") as f:
#             nodes = pickle.load(f)
#         print(f"Loaded {len(nodes)} nodes from final results")
#         return nodes
#     except FileNotFoundError:
#         print(f"Final results file not found: {final_path}")
#         return None
#     except Exception as e:
#         print(f"Error loading final results: {e}")
#         return None

# # Example usage:
# # To read a specific checkpoint (e.g., checkpoint 20):
# nodes_checkpoint_20 = read_checkpoint(20)

# # To read the final results:
# nodes_final = read_final_results()

In [None]:

# for batch, i in batches:
#     print(batch[i].get_content())