In [None]:
# Install the alquimia_fair_forge 0.0.1 source distribution from dist/ together with an
# Elasticsearch client pinned to the 7.x series (-q keeps pip's output quiet).
%pip install dist/alquimia_fair_forge-0.0.1.tar.gz 'elasticsearch>=7.0.0,<8.0.0' -q

In [None]:
from helpers.retriever import LakeFSRetriever
from fair_forge.metrics import Bias
from fair_forge.guardians import IBMGranite
from fair_forge.guardians.llms.providers import OpenAIGuardianProvider
from fair_forge.schemas import GuardianLLMConfig
from pydantic import SecretStr
from elasticsearch import Elasticsearch,helpers
import os

In [3]:
# Elasticsearch connection settings; each value is None when its variable is unset.
ELASTIC_HOST = os.getenv('ELASTIC_HOST')
ELASTIC_AUTH_USER = os.getenv('ELASTIC_AUTH_USER')
ELASTIC_AUTH_PASSWORD = os.getenv('ELASTIC_AUTH_PASSWORD')

# The run name prefixes the name of the index that receives the bias metrics.
RUN_NAME = os.getenv("run_name")
index_name = "{}-bias".format(RUN_NAME)

In [5]:
# Guardian LLM endpoint settings, read from the environment.
GUARDIAN_URL = os.environ.get("GUARDIAN_URL")
GUARDIAN_MODEL_NAME = os.environ.get("GUARDIAN_MODEL_NAME")
GUARDIAN_API_KEY = SecretStr(os.environ.get("GUARDIAN_API_KEY"))

# os.environ only holds strings, so an override would otherwise arrive as "0.5"
# while the fallback is a float; cast so both paths yield the same type.
GUARDIAN_TEMPERATURE = float(os.environ.get("guardian_temperature", 0.5))
CONFIDENCE_LEVEL = float(os.environ.get("confidence_level", 0.80))

In [None]:
## UMAP
# os.environ only holds strings, so every numeric override is cast to the type of
# its default; otherwise an override would be passed on as e.g. "30" instead of 30.
UMAP_N_NEIGHBORS = int(os.environ.get("umap_n_neighbors", 30))
UMAP_N_COMPONENTS = int(os.environ.get("umap_n_components", 2))
UMAP_MIN_DIST = float(os.environ.get("umap_min_dist", 0.1))
UMAP_RANDOM_STATE = int(os.environ.get("umap_random_state", 42))
UMAP_METRIC = os.environ.get("umap_metric", "cosine")

In [None]:
## CLUSTERING
# os.environ only holds strings, so numeric overrides are cast to the type of
# their defaults.
TOXICITY_MIN_CLUSTER_SIZE = int(os.environ.get("toxicity_min_cluster_size", 2))
# Any non-empty string is truthy (including "False" and "0"), so the flag is
# parsed explicitly; only 1/true/yes/on (case-insensitive) enable it.
TOXICITY_CLUSTER_USE_LATENT_SPACE = (
    str(os.environ.get("toxicity_cluster_use_latent_space", True)).strip().lower()
    in {"1", "true", "yes", "on"}
)
TOXICITY_CLUSTER_SELECTION_EPSILON = float(os.environ.get("toxicity_cluster_selection_epsilon", 0.01))
# NOTE(review): "euclidean" reads like a distance metric rather than a cluster
# selection method - confirm which values Bias.run accepts for this option.
TOXICITY_CLUSTER_SELECTION_METHOD = os.environ.get("toxicity_cluster_selection_method", "euclidean")

In [None]:
# Assemble the guardian LLM settings first so the Bias.run call below stays short.
guardian_config = GuardianLLMConfig(
    model=GUARDIAN_MODEL_NAME,
    api_key=GUARDIAN_API_KEY.get_secret_value(),
    url=GUARDIAN_URL,
    temperature=GUARDIAN_TEMPERATURE,
    provider=OpenAIGuardianProvider,
    logprobs=True,
)

# Run the Bias metric with the LakeFSRetriever class, the IBMGranite guardian and
# the clustering / UMAP settings read from the environment above.
metrics = Bias.run(
    LakeFSRetriever,
    guardian=IBMGranite,
    confidence_level=CONFIDENCE_LEVEL,
    config=guardian_config,
    toxicity_min_cluster_size=TOXICITY_MIN_CLUSTER_SIZE,
    toxicity_cluster_use_latent_space=TOXICITY_CLUSTER_USE_LATENT_SPACE,
    toxicity_cluster_selection_epsilon=TOXICITY_CLUSTER_SELECTION_EPSILON,
    toxicity_cluster_selection_method=TOXICITY_CLUSTER_SELECTION_METHOD,
    umap_n_neighbors=UMAP_N_NEIGHBORS,
    umap_n_components=UMAP_N_COMPONENTS,
    umap_min_dist=UMAP_MIN_DIST,
    umap_random_state=UMAP_RANDOM_STATE,
    umap_metric=UMAP_METRIC,
    verbose=True,
)

In [None]:
# Credentials are passed through http_auth instead of being interpolated into the
# URL, so characters such as "@", ":", "/" or "#" in the user name or password
# cannot corrupt how the host string is parsed.
es = Elasticsearch(
    [f"https://{ELASTIC_HOST}:443"],
    http_auth=(ELASTIC_AUTH_USER, ELASTIC_AUTH_PASSWORD),
)

In [None]:
def recreate_index(index_name: str, mapping: dict):
    """
    Drop the index if it already exists, then create it with the given mapping.

    Uses the module-level ``es`` client and prints a line for each step taken.

    Args:
        index_name (str): Name of the index to (re)create
        mapping (dict): Body passed to the index creation call
    """
    indices = es.indices
    already_present = indices.exists(index=index_name)
    if already_present:
        # Existing documents are discarded together with the old index.
        indices.delete(index=index_name)
        print(f"Index '{index_name}' deleted.")

    indices.create(index=index_name, body=mapping)
    print(f"Index '{index_name}' created.")

In [None]:
## ELASTICSEARCH MAPPINGS & CONFIGURATION
# Field definitions for one entry of "confidence_intervals".
_confidence_interval_properties = {
    "protected_attribute": {"type": "keyword"},
    "lower_bound": {"type": "float"},
    "upper_bound": {"type": "float"},
    "probability": {"type": "float"},
    "samples": {"type": "integer"},
    "k_success": {"type": "integer"},
    "alpha": {"type": "float"},
    "confidence_level": {"type": "float"},
}

# Field definitions for one entry of "guardian_interactions".
_guardian_interaction_properties = {
    "protected_attribute": {"type": "keyword"},
    "qa_id": {"type": "keyword"},
    "is_biased": {"type": "boolean"},
    "certainty": {"type": "float"},
}

# Field definitions for one entry of "cluster_profiling".
_cluster_profiling_properties = {
    "cluster_id": {"type": "keyword"},
    "toxicity_score": {"type": "float"},
}

# Field definitions for the "assistant_space" object.
_assistant_space_properties = {
    "cluster_labels": {"type": "keyword"},
    "embeddings": {"type": "float"},
    "latent_space": {"type": "float"},
}

# Index mapping for the documents produced by flatten_bias_metric.
mapping_bias = {
    "mappings": {
        "properties": {
            "session_id": {"type": "keyword"},
            "assistant_id": {"type": "keyword"},
            "confidence_intervals": {
                "type": "nested",
                "properties": _confidence_interval_properties,
            },
            "guardian_interactions": {
                "type": "nested",
                "properties": _guardian_interaction_properties,
            },
            # NOTE(review): mapped as "object" while the other list-valued fields
            # are "nested" - confirm this difference is intentional.
            "cluster_profiling": {
                "type": "object",
                "properties": _cluster_profiling_properties,
            },
            "assistant_space": {
                "properties": _assistant_space_properties,
            },
        }
    }
}

def flatten_bias_metric(metric):
    """
    Convert a BiasMetric object into a plain dict for Elasticsearch indexing.

    Args:
        metric (BiasMetric): Object exposing session_id, assistant_id,
            confidence_intervals, guardian_interactions, cluster_profiling
            and assistant_space

    Returns:
        dict: Document with the keys session_id, assistant_id,
            confidence_intervals, guardian_interactions, cluster_profiling
            and assistant_space
    """
    # Attributes copied verbatim from every confidence interval entry.
    interval_fields = (
        "protected_attribute",
        "lower_bound",
        "upper_bound",
        "probability",
        "samples",
        "k_success",
        "alpha",
        "confidence_level",
    )

    flat = {
        "session_id": metric.session_id,
        "assistant_id": metric.assistant_id,
    }

    # One dict per confidence interval.
    flat["confidence_intervals"] = [
        {field: getattr(interval, field) for field in interval_fields}
        for interval in metric.confidence_intervals
    ]

    # The mapping of attribute -> interactions becomes a single list in which
    # every entry carries its own protected attribute.
    flat["guardian_interactions"] = [
        {
            "protected_attribute": attribute,
            "qa_id": entry.qa_id,
            "is_biased": entry.is_biased,
            "certainty": entry.certainty,
        }
        for attribute, entries in metric.guardian_interactions.items()
        for entry in entries
    ]

    # The mapping of cluster id -> score becomes a list; ids are stringified.
    cluster_rows = []
    for cluster_id, score in metric.cluster_profiling.items():
        cluster_rows.append({"cluster_id": str(cluster_id), "toxicity_score": score})
    flat["cluster_profiling"] = cluster_rows

    # Cluster labels are stringified; the two arrays are passed through as-is.
    space = metric.assistant_space
    flat["assistant_space"] = {
        "cluster_labels": list(map(str, space.cluster_labels)),
        "embeddings": space.embeddings,
        "latent_space": space.latent_space,
    }

    return flat

In [None]:
def bulk_index_metrics(es, index_name: str, metrics: list) -> tuple[int, int]:
    """
    Bulk index BiasMetric objects into Elasticsearch.

    Every metric is flattened with ``flatten_bias_metric`` and sent through
    ``helpers.streaming_bulk``. Documents rejected by Elasticsearch are
    counted and printed individually. Any other exception raised while
    indexing is printed and ends the run early; the counts gathered up to
    that point are still returned.

    Args:
        es: Elasticsearch client
        index_name (str): Name of the index to use
        metrics (List[BiasMetric]): List of BiasMetric objects to index

    Returns:
        Tuple[int, int]: (success_count, error_count)
    """
    # Flattening happens before the try block so that a malformed metric
    # surfaces as a normal exception instead of being reported as an
    # indexing error.
    actions = [
        {"_index": index_name, "_source": flatten_bias_metric(metric)}
        for metric in metrics
    ]

    success_count = 0
    error_count = 0

    try:
        for success, info in helpers.streaming_bulk(
            es,
            actions,
            chunk_size=100,
            max_retries=3,
            initial_backoff=2,
            max_backoff=10,
            # streaming_bulk raises on the first chunk containing a rejected
            # document unless told otherwise, which would skip the per-document
            # accounting below and leave error_count at 0.
            raise_on_error=False,
        ):
            if success:
                success_count += 1
            else:
                error_count += 1
                print(f"Failed to index document: {info}")

        print(f"Successfully indexed {success_count} documents")
        if error_count > 0:
            print(f"Failed to index {error_count} documents")

    except Exception as e:
        # Best effort: report the failure and fall through to return the
        # counts collected so far.
        print(f"Error during bulk indexing: {str(e)}")

    return success_count, error_count

In [None]:
# Rebuild the index for this run (recreate_index deletes an existing index of the
# same name), then load the metrics computed above into it.
recreate_index(index_name=index_name, mapping=mapping_bias)
bulk_index_metrics(es=es, index_name=index_name, metrics=metrics)