In [1]:
import os
import logging
from typing import List, Dict
import asyncio
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import voyageai

# Load those environment variables (because who doesn't love a good secret?)
load_dotenv()


class EmbeddingModel:
    """Base class for embedding generation using Voyage AI"""
    def __init__(self):
        self.client = voyageai.Client(
            api_key=os.getenv("VOYAGE_API_KEY")
        )
        self.batch_size = 128  # Voyage's maximum batch size
    
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for multiple texts using Voyage AI"""
        try:
            response = self.client.embed(
                texts=texts,
                model="voyage-3",
                input_type="document",
                output_dimension=1024
            )
            return response.embeddings
            
        except Exception as e:
            logging.error(f"Error in Voyage AI batch embedding generation: {str(e)}")
            raise

    async def embed(self, text: str) -> List[float]:
        """Legacy method for single text embedding"""
        embeddings = await self.embed_batch([text])
        return embeddings[0]

    async def embed_many(self, texts: List[str]) -> List[List[float]]:
        """Process a large number of texts in optimal batches"""
        all_embeddings = []
        for i in range(0, len(texts), self.batch_size):
            batch = texts[i:i + self.batch_size]
            batch_embeddings = await self.embed_batch(batch)
            all_embeddings.extend(batch_embeddings)
        return all_embeddings

class QdrantMerger:
    def __init__(self):
        self.client = QdrantClient(
            url=os.getenv("QDRANT_URL"),
            api_key=os.getenv("QDRANT_API_KEY")
        )
        self.embedding_model = EmbeddingModel()
        
    async def fetch_collection_data(self, collection_name: str) -> Dict[str, Dict]:
        """Fetch all points from a collection and index them by filename"""
        result = {}
        offset = None
        duplicates = set()  # To keep track of our overeager duplicates
        
        while True:
            batch, offset = self.client.scroll(
                collection_name=collection_name,
                with_payload=True,
                with_vectors=False,
                offset=offset
            )
            
            # Process each point in the batch
            for point in batch:
                filename = point.payload.get('filename')
                if filename:
                    if filename in result:
                        # Oops, looks like we found a duplicate!
                        duplicates.add(filename)
                    result[filename] = point.payload
            
            # If no more offset, we've reached the end of our dating pool
            if offset is None:
                break
        
        # Let's be responsible and warn about any duplicate profiles we found
        if duplicates:
            logging.warning(f"Found duplicate entries for filenames: {duplicates}")
        
        return result
    
    async def create_merged_collection(self):
        """Create and populate the merged collection"""
        # First, let's get our lonely hearts from both collections
        personality_data = await self.fetch_collection_data("Personality")
        storyteller_data = await self.fetch_collection_data("Storyteller")
        
        # Create our new love nest (collection)
        self.client.recreate_collection(
            collection_name="Full_Texts",
            vectors_config=VectorParams(size=1024, distance=Distance.COSINE)
        )
        
        # Time to play matchmaker!
        merged_points = []
        for filename in personality_data.keys():
            if filename in storyteller_data:
                # Found a match! Let's bring them together
                merged_payload = {
                    "filename": filename,
                    "raw_text": personality_data[filename]["raw_text"],
                    "personality": personality_data[filename]["personality"],
                    "story": storyteller_data[filename]["story"]
                }
                
                # Generate new embedding for the combined text
                combined_text = "\n\n".join([
                    f"📄 RESUME:\n{merged_payload['raw_text']}",
                    f"🧠 PERSONALITY:\n{merged_payload['personality']}",
                    f"📚 STORY:\n{merged_payload['story']}"
                ])
                embedding = await self.embedding_model.embed_many([combined_text])
                
                # Create the happy couple (point)
                merged_points.append(PointStruct(
                    id=hash(filename),  # Using filename hash as ID
                    payload=merged_payload,
                    vector=embedding[0]
                ))
        
        # Upload our happy couples in batches (because even Cupid needs breaks)
        batch_size = 100
        for i in range(0, len(merged_points), batch_size):
            batch = merged_points[i:i + batch_size]
            self.client.upsert(
                collection_name="Full_Texts",
                points=batch
            )
            logging.info(f"Uploaded batch {i//batch_size + 1} of {(len(merged_points)-1)//batch_size + 1}")

async def main():
    logging.basicConfig(level=logging.INFO)
    merger = QdrantMerger()
    await merger.create_merged_collection()
    logging.info("Collection merger completed successfully! 🎉")

if __name__ == "__main__":
    main()

RuntimeError: asyncio.run() cannot be called from a running event loop

In [6]:
import os
import logging
from typing import List, Dict, Tuple
import asyncio
from hashlib import md5
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import voyageai

load_dotenv()  # Loading secrets like they're your DMs

class EmbeddingModel:
    """Base class for embedding generation using Voyage AI"""
    def __init__(self):
        self.client = voyageai.Client(
            api_key=os.getenv("VOYAGE_API_KEY")
        )
        self.batch_size = 128  # Voyage's party size limit
    
    async def embed_batch(self, texts: List[str]) -> List[List[float]]:
        """Generate embeddings for a batch of texts using Voyage AI"""
        try:
            response = self.client.embed(
                texts=texts,
                model="voyage-3",
                input_type="document",
                output_dimension=1024
            )
            return response.embeddings
            
        except Exception as e:
            logging.error(f"Voyage AI said 'no' with error: {str(e)}")
            raise

class QdrantMerger:
    def __init__(self):
        self.client = QdrantClient(
            url=os.getenv("QDRANT_URL"),
            api_key=os.getenv("QDRANT_API_KEY")
        )
        self.embedding_model = EmbeddingModel()
        
    async def fetch_collection_data(self, collection_name: str) -> Dict[str, Dict]:
        """Fetch all points from a collection and index them by filename"""
        result = {}
        offset = None
        duplicates = set()
        
        while True:
            batch, offset = self.client.scroll(
                collection_name=collection_name,
                with_payload=True,
                with_vectors=False,
                offset=offset
            )
            
            for point in batch:
                filename = point.payload.get('filename')
                if filename:
                    if filename in result:
                        duplicates.add(filename)
                    result[filename] = point.payload
            
            if offset is None:
                break
        
        if duplicates:
            logging.warning(f"Found some duplicate attention seekers: {duplicates}")
        
        return result

    def _prepare_text(self, raw_text: str, personality: str, story: str) -> str:
        """Combine texts with formatting that would make your English teacher proud"""
        return "\n\n".join([
            f"📄 RESUME:\n{raw_text}",
            f"🧠 PERSONALITY:\n{personality}",
            f"📚 STORY:\n{story}"
        ])

    def _create_point(self, filename: str, payload: Dict, embedding: List[float]) -> PointStruct:
        """Create a point that's more stable than your ex's promises"""
        return PointStruct(
            id=int(md5(filename.encode()).hexdigest()[:16], 16),
            payload=payload,
            vector=embedding
        )
    
    async def create_merged_collection(self):
        """Create and populate the merged collection, like a dating app but for data"""
        # Fetch the lonely data points looking for love
        personality_data = await self.fetch_collection_data("personality")
        storyteller_data = await self.fetch_collection_data("storyteller")
        
        # Create our new collection (swipe right to match!)
        self.client.recreate_collection(
            collection_name="Full_Texts",
            vectors_config=VectorParams(size=1024, distance=Distance.COSINE)
        )
        
        # Prepare all our potential matches
        texts_to_embed = []
        file_order = []
        payloads = []
        
        for filename in personality_data:
            if filename in storyteller_data:  # If it's a match!
                payload = {
                    "filename": filename,
                    "raw_text": personality_data[filename]["raw_text"],
                    "personality": personality_data[filename]["personality"],
                    "story": storyteller_data[filename]["story"]
                }
                payloads.append(payload)
                file_order.append(filename)
                texts_to_embed.append(self._prepare_text(
                    payload["raw_text"],
                    payload["personality"],
                    payload["story"]
                ))
        
        # Process embeddings in bite-sized chunks (because nobody likes a choking hazard)
        all_embeddings = []
        for i in range(0, len(texts_to_embed), self.embedding_model.batch_size):
            chunk = texts_to_embed[i:i + self.embedding_model.batch_size]
            embeddings = await self.embedding_model.embed_batch(chunk)
            all_embeddings.extend(embeddings)
            logging.info(f"Processed chunk {i//self.embedding_model.batch_size + 1} of "
                        f"{(len(texts_to_embed) + self.embedding_model.batch_size - 1)//self.embedding_model.batch_size}")
        
        # Create our happy couples
        points = [
            self._create_point(filename, payload, embedding)
            for filename, payload, embedding in zip(file_order, payloads, all_embeddings)
        ]
        
        # Upload in batches (because even Cupid needs a break)
        upload_batch_size = 100
        for i in range(0, len(points), upload_batch_size):
            batch = points[i:i + upload_batch_size]
            self.client.upsert(collection_name="Full_Texts", points=batch)
            logging.info(f"Uploaded batch {i//upload_batch_size + 1} of {(len(points)-1)//upload_batch_size + 1}")

async def main():
    logging.basicConfig(level=logging.INFO)
    merger = QdrantMerger()
    await merger.create_merged_collection()
    logging.info("Collection merger completed successfully! 🎉 Time to delete the evidence!")

if __name__ == "__main__":
    await main()

INFO:httpx:HTTP Request: POST https://daf5172e-cb38-4794-9a1e-935ac3be2a22.us-east4-0.gcp.cloud.qdrant.io:6333/collections/personality/points/scroll "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://daf5172e-cb38-4794-9a1e-935ac3be2a22.us-east4-0.gcp.cloud.qdrant.io:6333/collections/personality/points/scroll "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://daf5172e-cb38-4794-9a1e-935ac3be2a22.us-east4-0.gcp.cloud.qdrant.io:6333/collections/personality/points/scroll "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://daf5172e-cb38-4794-9a1e-935ac3be2a22.us-east4-0.gcp.cloud.qdrant.io:6333/collections/personality/points/scroll "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://daf5172e-cb38-4794-9a1e-935ac3be2a22.us-east4-0.gcp.cloud.qdrant.io:6333/collections/personality/points/scroll "HTTP/1.1 200 OK"
INFO:httpx:HTTP Request: POST https://daf5172e-cb38-4794-9a1e-935ac3be2a22.us-east4-0.gcp.cloud.qdrant.io:6333/collections/personality/points/scroll "HTTP/1.1 200 OK