In [None]:
from elasticsearch import Elasticsearch
import urllib3
import json
from dotenv import load_dotenv
import os

# Load variables from a local .env file (if present) into the process environment
load_dotenv()

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

ELASTIC_PASSWORD = os.getenv('ELASTIC_PASSWORD')
if not ELASTIC_PASSWORD:
    # Fail here with a clear message instead of letting the client surface an
    # unrelated-looking authentication/transport error later on.
    raise RuntimeError(
        "ELASTIC_PASSWORD is not set; define it in the environment or in a .env file"
    )

# Create connection
# NOTE(review): verify_certs=False turns off TLS certificate verification.
# That is tolerable for a local cluster with a self-signed certificate, but
# should not be carried over to a remote or production cluster.
es = Elasticsearch(
    ['https://localhost:9200'],
    basic_auth=('elastic', ELASTIC_PASSWORD),
    verify_certs=False,
    ssl_show_warn=False,
    request_timeout=30
)

try:
    # Test connection
    if es.ping():
        print("Connected to Elasticsearch!")

        # Get cluster info
        info = es.info()
        print("\nCluster Information:")
        print(f"- Name: {info['cluster_name']}")
        print(f"- Version: {info['version']['number']}")

    else:
        print("Connection failed - ping returned False")

except Exception as e:
    # Best-effort connectivity check: report the problem rather than abort the cell
    print(f"Error: {e}")


Connected to Elasticsearch!

Cluster Information:
- Name: infollion-elastic-cluster
- Version: 8.16.0

Indices found: ['.kibana_usage_counters_8.16.0_001', '.internal.alerts-transform.health.alerts-default-000001', 'dynamic_expert_search_v1_0_9', '.slo-observability.sli-v3.3', 'dynamic_expert_search_v1_0_11', '.internal.alerts-ml.anomaly-detection.alerts-default-000001', '.internal.alerts-observability.slo.alerts-default-000001', '.kibana_security_session_1', 'dynamic_expert_search_v1_0_1', '.internal.alerts-observability.apm.alerts-default-000001', '.internal.alerts-default.alerts-default-000001', 'dynamic_expert_search_v1_0_8', '.internal.alerts-observability.metrics.alerts-default-000001', '.kibana_8.16.0_001', '.apm-custom-link', '.internal.alerts-ml.anomaly-detection-health.alerts-default-000001', '.security-profile-8', '.internal.alerts-security.alerts-default-000001', '.kibana_task_manager_8.16.0_001', '.internal.alerts-stack.alerts-default-000001', '.internal.alerts-observabili

  indices = es.indices.get_alias(index="*")


In [None]:
from elasticsearch import Elasticsearch
from sentence_transformers import SentenceTransformer
import numpy as np
import urllib3
from elasticsearch import helpers
import json
from typing import Dict, List
import logging

# Disable SSL warnings
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Connection: builds a fresh client with the same settings as the connectivity
# check cell. ELASTIC_PASSWORD must already be defined by that cell.
# request_timeout matches the first cell so that bulk indexing requests are not
# cut off by the client's shorter default timeout.
# NOTE(review): verify_certs=False disables TLS certificate verification;
# acceptable only for a local, self-signed cluster.
es = Elasticsearch(
    ['https://localhost:9200'],
    basic_auth=('elastic', ELASTIC_PASSWORD),
    verify_certs=False,
    ssl_show_warn=False,
    request_timeout=30
)

# Initialize the embedding model
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')  # 384 dimensions

# Define the new index name
profile_index = 'dynamic_expert_search_v1_0_12_with_embeddings'
source_index = 'dynamic_expert_search_v1_0_11'  # Your current index name

# Mapping for the new index: the original profile fields, one dense vector per
# embedded field, and work experiences stored as nested documents.
def _dense_vector_field():
    """Return a fresh mapping entry for a 384-dim, indexed, cosine-similarity vector."""
    return {
        "type": "dense_vector",
        "dims": 384,
        "index": True,
        "similarity": "cosine",
    }


# Original (non-vector) profile fields as (name, Elasticsearch type) pairs
_profile_scalar_fields = [
    ("base_location", "text"),
    ("bio", "text"),
    ("expertise_in_these_geographies", "text"),
    ("functions", "text"),
    ("headline", "text"),
    ("id", "integer"),
    ("internal_notes", "text"),
    ("total_years_of_experience", "integer"),
    ("type", "text"),
]

# Top-level vector fields:
#   bio / headline / functions  - one embedding per important field
#   profile_embedding           - complete profile (bio, headline, functions, work experiences)
#   combined_work_experiences_embedding - all work experiences together
_profile_vector_fields = [
    "bio_embedding",
    "headline_embedding",
    "functions_embedding",
    "profile_embedding",
    "combined_work_experiences_embedding",
]

# Fields of a single nested work experience; work_exp_text holds the combined
# text for that work experience.
_work_experience_scalar_fields = [
    ("company", "text"),
    ("currently_works_here", "boolean"),
    ("designation", "text"),
    ("division", "text"),
    ("end_date", "text"),
    ("job_description", "text"),
    ("location", "text"),
    ("start_date", "text"),
    ("work_exp_text", "text"),
]

_work_experience_properties = {
    name: {"type": es_type} for name, es_type in _work_experience_scalar_fields
}
# Each work experience carries its own embedding
_work_experience_properties["embedding"] = _dense_vector_field()

_profile_properties = {
    name: {"type": es_type} for name, es_type in _profile_scalar_fields
}
_profile_properties.update(
    {name: _dense_vector_field() for name in _profile_vector_fields}
)
_profile_properties["work_experiences"] = {
    "type": "nested",
    "properties": _work_experience_properties,
}

profile_mapping = {"mappings": {"properties": _profile_properties}}

# (Re)create the target index: any existing index with this name is dropped
# first, so the index always starts empty with the mapping in profile_mapping.
stale_index_present = es.indices.exists(index=profile_index)
if stale_index_present:
    print(f"Index {profile_index} already exists. Deleting...")
    es.indices.delete(index=profile_index)

es.indices.create(
    index=profile_index,
    body=profile_mapping,
)
print(f"Created new index: {profile_index}")



INFO:sentence_transformers.SentenceTransformer:Load pretrained SentenceTransformer: sentence-transformers/all-MiniLM-L6-v2
INFO:sentence_transformers.SentenceTransformer:Use pytorch device_name: cuda
INFO:elastic_transport.transport:HEAD https://localhost:9200/dynamic_expert_search_v1_0_12_with_embeddings [status:404 duration:0.368s]
INFO:elastic_transport.transport:PUT https://localhost:9200/dynamic_expert_search_v1_0_12_with_embeddings [status:200 duration:0.203s]


Created new index: dynamic_expert_search_v1_0_12_with_embeddings


In [None]:
#########################################
# Functions for generating embeddings and processing profiles
#########################################
def create_work_experience_text(work_exp: Dict) -> str:
    """
    Render one work experience dict as a sentence-like string for embedding.

    Reads the optional keys 'designation', 'company', 'division', 'location',
    'start_date', 'end_date', 'currently_works_here' and 'job_description'.
    Keys that are missing or falsy are skipped. The produced fragments are
    joined with ". "; an empty string is returned when nothing is usable.
    """
    role = work_exp.get('designation')
    employer = work_exp.get('company')
    fragments = []

    # Role and employer: use the richest phrasing the available data allows
    if role and employer:
        fragments.append(f"Worked as {role} at {employer}")
    elif role:
        fragments.append(f"Position: {role}")
    elif employer:
        fragments.append(f"Worked at {employer}")

    unit = work_exp.get('division')
    if unit:
        fragments.append(f"in {unit} division")

    place = work_exp.get('location')
    if place:
        fragments.append(f"located in {place}")

    # Time span is only emitted when a start date exists; a current job takes
    # precedence over any recorded end date.
    started = work_exp.get('start_date')
    if started:
        if work_exp.get('currently_works_here'):
            tail = " to present"
        elif work_exp.get('end_date'):
            tail = f" to {work_exp['end_date']}"
        else:
            tail = ""
        fragments.append(f"from {started}" + tail)

    # Job description - most important for semantic search
    summary = work_exp.get('job_description')
    if summary:
        summary = summary.strip()
        if summary:
            fragments.append(f"Responsibilities: {summary}")

    return ". ".join(fragments)

def create_combined_work_experiences_text(work_experiences: List[Dict]) -> str:
    """
    Combine all work experiences into a single text for embedding.

    Each dict entry is rendered with create_work_experience_text; entries that
    render to an empty string are left out and the rest are joined with " | ".
    Entries that are not dicts are skipped, matching how
    generate_profile_embeddings treats malformed work experience entries.

    Returns "" when work_experiences is empty/None or nothing usable remains.
    """
    if not work_experiences:
        return ""

    rendered = (
        create_work_experience_text(work_exp)
        for work_exp in work_experiences
        if isinstance(work_exp, dict)  # tolerate malformed entries instead of crashing
    )
    return " | ".join(text for text in rendered if text)

def generate_profile_embeddings(doc: Dict, model) -> Dict:
    """Generate embeddings for a professional profile document.

    Args:
        doc: Source profile. The keys read are 'bio', 'headline', 'functions'
            and 'work_experiences' (expected to be a list of dicts).
        model: Object with an ``encode(text)`` method returning a numeric
            vector (e.g. a numpy array).

    Returns:
        A shallow copy of ``doc`` with, where the source text is available:
          - 'bio_embedding', 'headline_embedding', 'functions_embedding'
          - 'work_experiences': copies of the dict entries, each with
            'work_exp_text' and (when that text is non-empty) 'embedding';
            non-dict entries are dropped
          - 'combined_work_experiences_embedding': embedding of all
            non-empty work experience texts joined with " | "
          - 'profile_embedding': unit-length weighted combination of the
            headline, bio, functions and combined work experience vectors
        The field names match the index mapping, so the vectors land in the
        declared dense_vector fields. ``doc`` itself is not modified.

    Encoding failures are logged as warnings and the affected embedding is
    simply left out; such a field then also does not contribute to
    'profile_embedding'.
    """
    enhanced_doc = doc.copy()

    # Vectors computed below, keyed by source field, so the weighted profile
    # embedding can reuse them instead of encoding the same text twice.
    field_vectors = {}

    # Fields to generate individual embeddings for
    embedding_fields = {
        'bio': 'bio_embedding',
        'headline': 'headline_embedding',
        'functions': 'functions_embedding'
    }

    # Generate individual embeddings for simple fields
    for field, embedding_field in embedding_fields.items():
        value = doc.get(field)
        if not value or not str(value).strip():
            continue
        try:
            vector = np.asarray(model.encode(str(value)))
            enhanced_doc[embedding_field] = vector.tolist()
            field_vectors[field] = vector
        except Exception as e:
            logger.warning(f"Error generating embedding for {field}: {e}")

    # Process work experiences
    work_experiences = doc.get('work_experiences')
    if work_experiences and isinstance(work_experiences, list):
        enhanced_work_experiences = []
        work_exp_texts = []

        for work_exp in work_experiences:
            if not isinstance(work_exp, dict):
                continue  # malformed entry: nothing to describe or embed

            # Create text representation for the work experience
            work_exp_text = create_work_experience_text(work_exp)

            # Copy so the caller's nested dicts are left untouched
            enhanced_work_exp = work_exp.copy()
            enhanced_work_exp['work_exp_text'] = work_exp_text

            # Only meaningful text is embedded or included in the combined text;
            # empty texts would otherwise add bare " | " separators.
            if work_exp_text.strip():
                work_exp_texts.append(work_exp_text)
                try:
                    enhanced_work_exp['embedding'] = model.encode(work_exp_text).tolist()
                except Exception as e:
                    logger.warning(f"Failed to create work experience embedding: {e}")

            enhanced_work_experiences.append(enhanced_work_exp)

        enhanced_doc['work_experiences'] = enhanced_work_experiences

        # Create combined work experiences embedding
        combined_work_exp_text = " | ".join(work_exp_texts)
        if combined_work_exp_text:
            try:
                vector = np.asarray(model.encode(combined_work_exp_text))
                enhanced_doc['combined_work_experiences_embedding'] = vector.tolist()
                field_vectors['work_experiences'] = vector
            except Exception as e:
                logger.warning(f"Error generating combined_work_experiences_embedding: {e}")

    ############################################################
    # Weights for different fields based on their importance
    # We can adjust these weights for better results via testing
    ############################################################
    field_weights = {
        'headline': 2.5,
        'bio': 2.0,
        'functions': 1.5,
        'work_experiences': 1.0
    }

    # Weighted combination of whichever field vectors are available
    weighted_embeddings = [
        field_vectors[field] * weight
        for field, weight in field_weights.items()
        if field in field_vectors
    ]

    if weighted_embeddings:
        combined_embedding = np.mean(weighted_embeddings, axis=0)
        norm = np.linalg.norm(combined_embedding)
        # A zero-length vector cannot be normalised (division by zero would
        # yield NaNs), so the field is omitted in that case.
        if norm > 0:
            enhanced_doc["profile_embedding"] = (combined_embedding / norm).tolist()

    return enhanced_doc

In [None]:
def reindex_profiles_with_embeddings(source_index, target_index, batch_size=50):
    """Reindex professional profiles with embeddings.

    Scans every document in ``source_index``, enriches it with
    generate_profile_embeddings and bulk-indexes the result into
    ``target_index`` under the same ``_id``. Uses the module-level ``es``
    client and ``model``.

    Args:
        source_index: Name of the index to read from. If it does not exist,
            the non-system indices are listed and the function returns.
        target_index: Name of the index to write to (must already exist with
            the desired mapping).
        batch_size: Number of documents sent per bulk request.

    Error handling:
        - A document whose embeddings cannot be generated is reported and
          skipped; the run continues.
        - Documents rejected by Elasticsearch during a bulk request are
          counted and reported, and are not retried.
        - Any other error (e.g. a lost connection) aborts the run and is
          printed together with a traceback.

    Returns:
        None. Progress and the final counts are printed.
    """
    total_processed = 0
    total_failed = 0
    actions = []

    # Define milestones for printing progress
    milestones = [2500, 5000, 7500, 10000, 15000, 20000, 25000, 30000, 40000, 50000]
    next_milestone_idx = 0

    def flush(batch):
        """Bulk-index ``batch`` and return ``(indexed, rejected)`` counts."""
        # raise_on_error=False: per-document rejections come back as a list
        # instead of an exception, so one bad document cannot poison the batch.
        indexed, errors = helpers.bulk(es, batch, raise_on_error=False)
        if errors:
            print(f"Bulk request rejected {len(errors)} document(s); first error: {errors[0]}")
        return indexed, len(errors)

    print(f"\nStarting reindexing from '{source_index}' to '{target_index}'...")

    try:
        # Check if source index exists
        if not es.indices.exists(index=source_index):
            print(f"Source index '{source_index}' not found!")
            # List available indices
            indices = es.indices.get_alias(index="*")
            print("\nAvailable indices:")
            for idx in sorted(indices.keys()):
                if not idx.startswith('.'):  # Skip system indices
                    print(f"  - {idx}")
            return

        # Get document count
        count = es.count(index=source_index)['count']
        print(f"Found {count} documents to process")

        for doc in helpers.scan(es, index=source_index, size=100):
            # Only per-document work is guarded here; bulk failures are handled
            # by flush() so they are neither misreported nor retried forever.
            try:
                action = {
                    "_index": target_index,
                    "_id": doc['_id'],
                    "_source": generate_profile_embeddings(doc['_source'], model)
                }
            except Exception as e:
                print(f"Error processing document {doc.get('_id', 'unknown')}: {e}")
                continue

            actions.append(action)
            if len(actions) < batch_size:
                continue

            # Hand the batch over and reset the buffer before sending, so a
            # failing request never leaves stale actions behind.
            batch, actions = actions, []
            indexed, rejected = flush(batch)
            total_processed += indexed
            total_failed += rejected

            # A large batch may cross several milestones at once
            while (next_milestone_idx < len(milestones)
                   and total_processed >= milestones[next_milestone_idx]):
                print(f"Indexed {milestones[next_milestone_idx]} documents")
                next_milestone_idx += 1

        # Index remaining documents
        if actions:
            batch, actions = actions, []
            indexed, rejected = flush(batch)
            total_processed += indexed
            total_failed += rejected

        print(f"\nReindexing complete! Total documents indexed: {total_processed}")
        if total_failed:
            print(f"Documents rejected by Elasticsearch: {total_failed}")

        # Refresh the index to make documents searchable
        es.indices.refresh(index=target_index)
        print(f"Index '{target_index}' refreshed and ready for search")

    except Exception as e:
        print(f"Error during reindexing: {e}")
        import traceback
        traceback.print_exc()


# Run the reindex: read from source_index, write enriched documents to profile_index
reindex_profiles_with_embeddings(
    source_index=source_index,
    target_index=profile_index,
)
