
<h1 style="text-align: center; font-size: 50px;">Transcript Summarization with LangChain and Galileo</h1>

This notebook demonstrates how to build a semantic chunking and summarization pipeline for transcripts using LangChain, sentence transformers for semantic chunking, and LLMs for generating summaries. It also integrates with Galileo for evaluation, protection, and monitoring.

# Notebook Overview
- Imports
- Configurations
- Verify Assets
- Data Loading
- Semantic Chunking
- Model Setup
- Summarization Chain Creation
- Galileo Evaluate
- Galileo Protect
- Galileo Observe
- Model Service

# Imports

Most of the libraries needed for this example come pre-installed in the GenAI workspace available in AI Studio. Libraries specific to the input type are installed here. In this case, we support transcripts in the WebVTT format, which requires the webvtt-py library.

In [1]:
%pip install -r ../requirements.txt --quiet

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
import sys
import logging

# Path to the project root (two levels up from the current working directory), which contains the 'src' package
src_path = os.path.abspath(os.path.join(os.getcwd(), "../.."))

# Add the project root to the system path so that 'src' modules (e.g., src.utils) can be imported
if src_path not in sys.path:
    sys.path.append(src_path)

# === Standard Library Imports ===
import json
import time
import warnings
from datetime import datetime
from pathlib import Path

# === Third-Party Imports ===
import numpy as np
import pandas as pd
import evaluate
import webvtt
import mlflow
from sentence_transformers import SentenceTransformer
from scipy.spatial.distance import cosine
from langchain.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain.schema.runnable import RunnablePassthrough, RunnableLambda
from operator import itemgetter
import promptquality as pq
import galileo_protect as gp
from galileo_protect import ProtectTool, ProtectParser, Ruleset

# === Project-Specific Imports (from src.utils) ===
from src.utils import (
    load_config_and_secrets,
    configure_proxy,
    initialize_llm,
    setup_galileo_environment,
    initialize_galileo_evaluator,
    initialize_galileo_protect,
    initialize_galileo_observer,
    configure_hf_cache
)

# Configurations

In [3]:
warnings.filterwarnings("ignore")

In [4]:
# === Create logger ===
logger = logging.getLogger("summarization-notebook")
logger.setLevel(logging.INFO)

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s", 
                             datefmt="%Y-%m-%d %H:%M:%S") 

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.propagate = False

In [5]:
# === Constants ===
# Model and experiment configuration
SENTENCE_TRANSFORMER_MODEL_NAME = 'sentence-transformers/all-MiniLM-L6-v2'
RUN_NAME = "Text_Summarization_Service"
PROJECT_NAME = "AIStudio_template_code_summarization"
EVALUATION_RUN_NAME = "Transcript_summarization_evaluation"
GALILEO_PROTECT_PROJECT_NAME = "AIStudio-Summarizer-ProtectProject"
GALILEO_OBSERVE_PROJECT_NAME = "AIStudio-Summarizer-ObserveProject"

# Path configuration
CONFIG_PATH = "../../configs/config.yaml"
SECRETS_PATH = "../../configs/secrets.yaml"
DATA_PATH = "../data"
MODEL_PATH = "/home/jovyan/datafabric/llama2-7b/ggml-model-f16-Q5_K_M.gguf"

# Text processing configuration
CHUNK_SEPARATOR = "\n\n"

In [6]:
logger.info('Notebook execution started.')

2025-04-10 14:54:35 - INFO - Notebook execution started.


# Summarization of transcripts with LangChain

In this example, we build a summarizer for long transcripts. The main goal is to break the original transcript into chunks based on context, using an unsupervised approach to identify the different topics throughout the transcript (somewhat similar to topic modeling), and then summarize each chunk. Finally, the individual summaries are returned to the user.

### Configuration of Hugging Face cache

In the next cell, we configure the Hugging Face cache so that all models downloaded from Hugging Face are persisted locally, even after the workspace is closed. Built-in support for this is a desired future feature of AI Studio and the GenAI add-on.
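For illustration, one common way to persist Hugging Face downloads is to point the `HF_HOME` environment variable at a directory that survives workspace restarts. The sketch below is an assumption about how this can be done, not necessarily what `configure_hf_cache` does; the function name and the example path are hypothetical. Note that `huggingface_hub` reads `HF_HOME` when it is first imported, so in this approach the variable should be set before importing libraries built on it, such as `sentence_transformers`.

```python
import os
from pathlib import Path


def configure_persistent_hf_cache(cache_dir: str) -> str:
    """Point the Hugging Face cache at cache_dir (hypothetical helper, not from src.utils).

    Should run before huggingface_hub-based libraries are imported,
    because they read HF_HOME at import time.
    """
    path = Path(cache_dir).expanduser().resolve()
    # Create the directory up front so the first download does not fail
    path.mkdir(parents=True, exist_ok=True)
    os.environ["HF_HOME"] = str(path)
    return str(path)


# Example usage with a hypothetical persistent location:
# configure_persistent_hf_cache("/home/jovyan/local/hugging_face")
```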

In [7]:
# Configure HuggingFace cache
configure_hf_cache()

### Configuration and Secrets Loading

In this section, we load configuration parameters and API keys from separate YAML files. This separation helps maintain security by keeping sensitive information (API keys) separate from configuration settings.

- **config.yaml**: Contains non-sensitive configuration parameters like model sources and URLs
- **secrets.yaml**: Contains sensitive API keys for services like Galileo and HuggingFace

In [8]:
config, secrets = load_config_and_secrets(CONFIG_PATH, SECRETS_PATH)

### Proxy Configuration

To connect to the Galileo service, an outbound HTTPS connection must be established. On some enterprise networks, this requires an explicit proxy configuration. If this is your case, set the `proxy` field in your config.yaml, and the following cell will configure the necessary environment variable.
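As a rough sketch of what such a proxy setup usually amounts to, the function below exports a proxy URL to the standard `HTTP_PROXY`/`HTTPS_PROXY` environment variables, which `requests` and most Python HTTP clients honor. It assumes the `proxy` field holds a single proxy URL; `apply_proxy_from_config` is hypothetical and is not necessarily how `configure_proxy` is implemented.

```python
import os


def apply_proxy_from_config(config: dict) -> bool:
    """Export config["proxy"] (assumed to be a single proxy URL) to the standard proxy env vars.

    Returns True if a proxy was configured, False if the field is missing or empty.
    """
    proxy = config.get("proxy")
    if not proxy:
        return False
    # Set both upper- and lower-case forms, since different tools check different ones
    for name in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
        os.environ[name] = proxy
    return True
```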

In [9]:
configure_proxy(config)

# Verify Assets

In [10]:
def log_asset_status(asset_path: str, asset_name: str, success_message: str, failure_message: str) -> None:
    """
    Logs the status of a given asset based on its existence.

    Parameters:
        asset_path (str): File or directory path to check.
        asset_name (str): Name of the asset for logging context.
        success_message (str): Message to log if asset exists.
        failure_message (str): Message to log if asset does not exist.
    """
    if Path(asset_path).exists():
        logger.info(f"{asset_name} is properly configured. {success_message}")
    else:
        logger.warning(f"{asset_name} is not properly configured. {failure_message}")


# Check and log the status of the local Llama model file
log_asset_status(
    asset_path=MODEL_PATH,
    asset_name="Local Llama model",
    success_message="",
    failure_message="Please create and download the required assets in your project on AI Studio if you want to use local model."
)

2025-04-10 14:54:36 - INFO - Local Llama model is properly configured. 


## Step 1: Loading the data from the transcript

First, we need to read the data from the transcript. Since our transcript is in the .vtt (WebVTT) format, we use the webvtt-py library to read the content. Because the text is a transcript of audio/video, it is organized into small chunks of conversation (cues), each containing a sequential ID, the start and end times of the chunk, and the text content (often in the form `speaker: content`).

From this data, we want to extract the actual content while keeping a reference to the other metadata. For this reason, we load all the data into a Pandas DataFrame.
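To illustrate the cue structure described above, the sketch below parses a WebVTT string into the same five fields without webvtt-py. It is a simplified parser for illustration only: it skips blocks without a timing line (such as NOTE or STYLE), ignores cue settings, and only understands `<v speaker>` voice tags and the `speaker: content` convention. The notebook itself relies on webvtt-py, which handles the full format; `parse_vtt_cues` is a hypothetical helper.

```python
import re

# Matches a WebVTT timing line such as "00:00:00.880 --> 00:00:03.920" (cue settings are ignored)
TIMING_RE = re.compile(r"^(\S+)\s+-->\s+(\S+)")
# Matches a voice span such as "<v Speaker>text</v>" (simplified: closing tag optional)
VOICE_RE = re.compile(r"^<v\s+([^>]*)>(.*?)(?:</v>)?$")


def parse_vtt_cues(raw: str) -> list[dict]:
    """Parse a WebVTT string into dicts with id, speaker, content, start and end."""
    rows = []
    # Blocks are separated by blank lines; the first block is the "WEBVTT" header
    blocks = [b for b in raw.replace("\r\n", "\n").strip().split("\n\n") if b.strip()]
    for block in blocks:
        lines = block.strip().split("\n")
        if lines[0].lstrip("\ufeff").startswith("WEBVTT"):
            continue
        # An optional cue identifier precedes the timing line
        identifier = ""
        if not TIMING_RE.match(lines[0]):
            identifier, lines = lines[0], lines[1:]
        if not lines:
            continue
        timing = TIMING_RE.match(lines[0])
        if timing is None:
            continue  # not a cue (e.g. a NOTE block)
        text = " ".join(lines[1:]).strip()
        speaker = ""
        voice = VOICE_RE.match(text)
        if voice:
            speaker, text = voice.group(1).strip(), voice.group(2).strip()
        elif ":" in text:
            # "speaker: content" convention; split on the first colon only
            speaker, text = (part.strip() for part in text.split(":", 1))
        rows.append({"id": identifier, "speaker": speaker, "content": text,
                     "start": timing.group(1), "end": timing.group(2)})
    return rows
```

The resulting list of dicts can be passed directly to `pd.DataFrame(...)` to obtain the same columns as the cell below.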

In [11]:
if not os.path.exists(DATA_PATH):
    raise FileNotFoundError(f"'data' folder not found in path: {os.path.abspath(DATA_PATH)}")

file_path = os.path.join(DATA_PATH, "I_have_a_dream.vtt")

data = {
    "id": [],
    "speaker": [],
    "content": [],
    "start": [],
    "end": []
}

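for caption in webvtt.read(file_path):
    # Split "speaker: content" on the first colon only, so colons inside the content are kept
    line = caption.text.split(":", 1)
    # Captions without a speaker prefix get an empty speaker field
    if len(line) < 2:
        line = [''] + line
    data["id"].append(caption.identifier)
    data["speaker"].append(line[0].strip())
    data["content"].append(line[1].strip())
    data["start"].append(caption.start)
    data["end"].append(caption.end)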
    
df = pd.DataFrame(data)

df.head()

|   | id | speaker | content | start | end |
|---|----|---------|---------|-------|-----|
| 0 | 1 |  | I am happy to join with you today | 00:00:00.880 | 00:00:03.920 |
| 1 | 2 |  | in what will go down in history | 00:00:06.500 | 00:00:09.360 |
| 2 | 3 |  | as the greatest demonstration for freedom in t... | 00:00:11.720 | 00:00:16.460 |
| 3 | 4 |  | nation. | 00:00:16.460 | 00:00:17.293 |
| 4 | 5 |  | Five score years ago, | 00:00:26.410 | 00:00:28.740 |


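As a second option, the next cell loads the same structure from a plain-text document, which contains only the content of the speech or conversation, without extra metadata. For simplicity and code reuse, we keep the same DataFrame structure as before and fill the remaining fields with empty strings. Note that this cell reuses `file_path` from the previous cell: as its output shows, it reads the .vtt file line by line, loading the WebVTT header, cue numbers, timestamps, and voice tags as content, and it overwrites the DataFrame built in the previous cell. Point `file_path` to a plain-text transcript to use this loader as intended.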

In [12]:
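# NOTE: file_path still points to the .vtt file from the previous cell;
# set it to a plain-text transcript (one paragraph per line) to use this loader as intended
with open(file_path, encoding="utf-8") as file:
    raw_text = file.read()

data = {
    "id": [],
    "speaker": [],
    "content": [],
    "start": [],
    "end": []
}

# Each non-empty line becomes one content entry; the metadata fields are left empty
for line in raw_text.split("\n"):
    if line.strip() != "":
        data["id"].append("")
        data["speaker"].append("")
        data["content"].append(line.strip())
        data["start"].append("")
        data["end"].append("")

df = pd.DataFrame(data)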

df.head()

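|   | id | speaker | content | start | end |
|---|----|---------|---------|-------|-----|
| 0 |  |  | ﻿WEBVTT |  |  |
| 1 |  |  | 1 |  |  |
| 2 |  |  | 00:00:00.880 --> 00:00:03.920 |  |  |
| 3 |  |  | `<v 0>I am happy to join with you today</v>` |  |  |
| 4 |  |  | 2 |  |  |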


## Step 2: Semantic chunking of the transcript
Having loaded the content according to the transcript format (with the text split into audio blocks or into paragraphs), we now want to group these small blocks into topics so that we can summarize each topic individually. Here, we use a very simple approach: we compute a semantic embedding of each block (using an embedding model from Hugging Face Sentence Transformers) and place the "breaks" between chunks where consecutive blocks have the highest semantic distance. This method can be parameterized, for example to set the number of topics or to choose the method used to identify the breaks.
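To make the break detection concrete, the sketch below applies the same idea to toy 2-D vectors standing in for sentence embeddings: it computes the cosine distance between each pair of consecutive blocks and places breaks at the largest distances, in the spirit of the `number` method of the `SemanticSplitter` class below. The helper names (`find_breaks`, `split_at_breaks`) and the toy vectors are illustrative assumptions; real embeddings would come from `embedding_model.encode`.

```python
import numpy as np


def cosine_distance(a: np.ndarray, b: np.ndarray) -> float:
    """1 - cosine similarity, the same definition as scipy.spatial.distance.cosine."""
    return 1.0 - float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


def find_breaks(embeddings: np.ndarray, n_breaks: int) -> list[int]:
    """Return the n_breaks positions with the largest consecutive distances.

    distances[i] compares block i and block i + 1, so a break at i means
    a chunk ends after block i.
    """
    distances = [cosine_distance(embeddings[i - 1], embeddings[i]) for i in range(1, len(embeddings))]
    n_breaks = min(n_breaks, len(distances))
    if n_breaks <= 0:
        return []
    top = np.argsort(distances)[-n_breaks:]
    return sorted(int(i) for i in top)


def split_at_breaks(segments: list[str], breaks: list[int]) -> list[str]:
    """Join the segments between consecutive breaks into chunks."""
    chunks, start = [], 0
    for b in breaks:
        chunks.append("\n".join(segments[start:b + 1]))
        start = b + 1
    chunks.append("\n".join(segments[start:]))
    return chunks


# Two toy "topics": the first two vectors point along x, the last three along y
segments = ["a1", "a2", "b1", "b2", "b3"]
toy_embeddings = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9], [0.05, 1.0]])
print(split_at_breaks(segments, find_breaks(toy_embeddings, n_breaks=1)))
```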

In [13]:
embedding_model = SentenceTransformer(SENTENCE_TRANSFORMER_MODEL_NAME)
# Embeddings of each content block (SemanticSplitter below recomputes these internally)
embeddings = embedding_model.encode(df.content.tolist())

In [14]:
class SemanticSplitter():
    """
    A class for semantically splitting text into coherent chunks based on embeddings.
    This class uses embedding-based distance metrics to identify topic transitions in text.
    """
    def __init__(self, content, embedding_model, method="number", partition_count=10, quantile=0.9, clustering_method=None, n_clusters=None):
        """
        Initialize the SemanticSplitter.
        
        Args:
            content: List of text segments to process and split
            embedding_model: Model to use for generating text embeddings
            method: Chunking method - 'number' (fixed number of breaks), 'quantiles' (threshold-based), or 'clustering'
            partition_count: Number of breaks to create when using 'number' method
            quantile: Threshold quantile to use when using 'quantiles' method
            clustering_method: Which clustering algorithm to use ('kmeans', 'hierarchical', None)
            n_clusters: Number of clusters to create when using clustering method
        """
        try:
            self.content = content
            self.embedding_model = embedding_model
            self.partition_count = partition_count
            self.quantile = quantile
            self.clustering_method = clustering_method
            self.n_clusters = n_clusters if n_clusters is not None else partition_count
            
            logger.info(f"Encoding {len(content)} content items with embedding model")
            self.embeddings = embedding_model.encode(content)
            logger.info(f"Generated embeddings with shape: {self.embeddings.shape}")
            
            # Calculate distances between consecutive embeddings
            self.distances = [cosine(self.embeddings[i - 1], self.embeddings[i]) for i in range(1, len(self.embeddings))]
            self.breaks = []
            self.centroids = []
            
            # Load break points using the specified method
            self.load_breaks(method=method)
            logger.info(f"Created {len(self.breaks)} breaks using method '{method}'")
        except Exception as e:
            logger.error(f"Error initializing SemanticSplitter: {str(e)}")
            raise

    def centroid_distance(self, embedding_id, centroid_id):
        """
        Calculate cosine distance between an embedding and a centroid.
        
        Args:
            embedding_id: Index of the embedding to compare
            centroid_id: Index of the centroid to compare
            
        Returns:
            Cosine distance between the embedding and centroid
        """
        if not self.centroids:
            logger.warning("Centroids haven't been loaded. Call load_centroids() first.")
            return 1.0  # Return max distance if no centroids
            
        try:
            return cosine(self.embeddings[embedding_id], self.centroids[centroid_id])
        except IndexError as e:
            logger.error(f"Invalid index in centroid_distance: {str(e)}")
            return 1.0  # Return max distance on error
        except Exception as e:
            logger.error(f"Error in centroid_distance: {str(e)}")
            return 1.0  # Return max distance on error

    def adjust_neighbors(self, window_size=3, distance_threshold=0.7):
        """
        Adjust break points by examining neighboring segments to improve coherence.
        This helps avoid breaking semantic units that should stay together.
        
        Args:
            window_size: Number of neighboring segments to consider
            distance_threshold: Threshold for merging nearby segments (currently unused)
        """
        if len(self.breaks) == 0:  # len() works for both lists and NumPy arrays
            logger.info("No breaks to adjust")
            return
            
        logger.info(f"Adjusting {len(self.breaks)} breaks with window size {window_size}")
        
        try:
            adjusted_breaks = []
            # Sort breaks to process them in order
            sorted_breaks = sorted(self.breaks)
            
            for i, break_pos in enumerate(sorted_breaks):
                # Skip if this break is too close to the previous adjusted break
                if adjusted_breaks and break_pos - adjusted_breaks[-1] < window_size:
                    continue
                    
                # Check surrounding context for better break point
                best_pos = break_pos
                best_dist = self.distances[break_pos]
                
                # Look at nearby positions for potentially better break points
                start = max(0, break_pos - window_size)
                end = min(len(self.distances) - 1, break_pos + window_size)
                
                for j in range(start, end + 1):
                    if j != break_pos and self.distances[j] > best_dist:
                        best_pos = j
                        best_dist = self.distances[j]
                        
                # Add the optimized break position
                adjusted_breaks.append(best_pos)
                
            self.breaks = sorted(list(set(adjusted_breaks)))
            logger.info(f"Adjusted breaks count: {len(self.breaks)}")
        except Exception as e:
            logger.error(f"Error adjusting neighbors: {str(e)}")
            # Keep original breaks on error

    def load_breaks(self, method='number'):
        """
        Load break points based on the specified method.
        
        Args:
            method: Method to determine breaks - 'number', 'quantiles', or 'clustering'
        """
        try:
            if method == 'number':
                # Ensure we don't request more breaks than possible
                if self.partition_count > len(self.distances):
                    logger.warning(f"Requested {self.partition_count} breaks but only {len(self.distances)} positions available.")
                    self.partition_count = len(self.distances)
                    
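                # Find the partition_count highest distance positions, stored as a sorted list of ints
                if self.partition_count <= 0:
                    self.breaks = []
                else:
                    top_k = np.argpartition(self.distances, len(self.distances) - self.partition_count)[-self.partition_count:]
                    self.breaks = np.sort(top_k).tolist()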
                logger.info(f"Created {len(self.breaks)} breaks using fixed number method")
                
            elif method == 'quantiles':
                # Find positions with distance above the quantile threshold
                threshold = np.quantile(self.distances, self.quantile)
                self.breaks = [i for i, v in enumerate(self.distances) if v >= threshold]
                logger.info(f"Created {len(self.breaks)} breaks using quantile method with threshold {threshold:.4f}")
                
            elif method == 'clustering':
                # Use clustering algorithms to group similar segments
                self._cluster_embeddings()
                logger.info(f"Created {len(self.breaks)} breaks using clustering method")
                
            else:
                logger.warning(f"Unknown method: {method}. No breaks created.")
                self.breaks = []
        except Exception as e:
            logger.error(f"Error loading breaks with method '{method}': {str(e)}")
            self.breaks = []

    def _cluster_embeddings(self):
        """
        Cluster embeddings using the specified clustering method.
        Sets breaks at the boundaries between clusters.
        """
        try:
            from sklearn.cluster import KMeans, AgglomerativeClustering
            
            if len(self.embeddings) < self.n_clusters:
                logger.warning(f"Not enough samples ({len(self.embeddings)}) for {self.n_clusters} clusters.")
                self.n_clusters = max(2, len(self.embeddings) // 2)
            
            # Choose clustering algorithm based on configuration
            if self.clustering_method == 'kmeans':
                logger.info(f"Performing KMeans clustering with {self.n_clusters} clusters")
                clustering = KMeans(n_clusters=self.n_clusters, random_state=42, n_init=10)
            else:  # Default to hierarchical clustering
                logger.info(f"Performing Hierarchical clustering with {self.n_clusters} clusters")
                clustering = AgglomerativeClustering(n_clusters=self.n_clusters)
            
            # Fit and predict cluster labels
            labels = clustering.fit_predict(self.embeddings)
            logger.info(f"Clustering complete, found {len(set(labels))} clusters")
            
            # Find transitions between clusters
            self.breaks = []
            for i in range(1, len(labels)):
                if labels[i] != labels[i-1]:
                    self.breaks.append(i-1)
                    
            logger.info(f"Found {len(self.breaks)} breaks between clusters")
            
        except ImportError:
            logger.error("scikit-learn not available. Install it for clustering support.")
            self.breaks = []
        except Exception as e:
            logger.error(f"Error in clustering: {str(e)}")
            self.breaks = []

    def get_centroid(self, beginning, end):
        """
        Compute a representative embedding for a range of content by encoding the joined text
        (this is not the mean of the individual segment embeddings).
        
        Args:
            beginning: Start index (inclusive)
            end: End index (exclusive)
            
        Returns:
            Centroid embedding for the specified content range
        """
        try:
            if beginning >= end or beginning < 0 or end > len(self.content):
                logger.warning(f"Invalid range: {beginning}-{end}")
                return np.zeros(self.embeddings[0].shape)
                
            text = '\n'.join(self.content[beginning:end])
            return self.embedding_model.encode(text)
        except Exception as e:
            logger.error(f"Error calculating centroid: {str(e)}")
            if len(self.embeddings) > 0:
                return np.zeros(self.embeddings[0].shape)
            return np.zeros(384)  # Default embedding size if unknown
    
    def load_centroids(self):
        """
        Load centroids for each chunk after breaks have been calculated.
        """
        logger.info("Loading centroids for chunks")
        try:
            if len(self.breaks) == 0:
                self.centroids = [self.get_centroid(0, len(self.content))]
                logger.info("Created 1 centroid for the entire content")
            else:
                self.centroids = []
                beginning = 0
                for break_position in sorted(self.breaks):
                    self.centroids.append(self.get_centroid(beginning, break_position + 1))
                    beginning = break_position + 1
                self.centroids.append(self.get_centroid(beginning, len(self.content)))
                logger.info(f"Created {len(self.centroids)} centroids")
        except Exception as e:
            logger.error(f"Error loading centroids: {str(e)}")
            self.centroids = []

    def get_chunk(self, beginning, end):
        """
        Get content chunk between specified indices.
        
        Args:
            beginning: Start index (inclusive)
            end: End index (exclusive)
            
        Returns:
            Chunk of content as a single string
        """
        try:
            if beginning >= end or beginning < 0 or end > len(self.content):
                logger.warning(f"Invalid chunk range: {beginning}-{end}")
                return ""
            return '\n'.join(self.content[beginning:end])
        except Exception as e:
            logger.error(f"Error getting chunk: {str(e)}")
            return ""
    
    def get_chunks(self):
        """
        Get all content chunks based on calculated breaks.
        
        Returns:
            List of content chunks
        """
        try:
            if len(self.breaks) == 0:
                logger.info("No breaks found, returning entire content as single chunk")
                return [self.get_chunk(0, len(self.content))]
            else:
                chunks = []
                beginning = 0
                sorted_breaks = sorted(self.breaks)
                for break_position in sorted_breaks:
                    chunk = self.get_chunk(beginning, break_position + 1)
                    chunks.append(chunk)
                    beginning = break_position + 1
                # Add the last chunk after the final break
                chunks.append(self.get_chunk(beginning, len(self.content)))
                logger.info(f"Generated {len(chunks)} chunks from content")
                return chunks
        except Exception as e:
            logger.error(f"Error getting chunks: {str(e)}")
            return [self.get_chunk(0, len(self.content))]

## Topic Segmentation with Clustering

While the basic chunking method using cosine distances can be effective, it may produce noisy results for complex documents. To improve topic identification, we can use clustering algorithms like KMeans and Hierarchical Clustering to group semantically related content.

The implementation offers two main clustering approaches:

1. **KMeans clustering** - Groups embeddings into k clusters based on vector similarity
2. **Hierarchical clustering** - Creates a tree of clusters by progressively merging similar groups

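These methods identify topic boundaries in the text by finding transitions between semantic clusters. However, KMeans and unconstrained hierarchical clustering assign labels without regard to the order of the segments, so adjacent segments can alternate between clusters and produce many very small chunks: in the run below, 6 clusters yielded 248 breaks for 250 segments, i.e., almost one chunk per segment. The distance-based `number` and `quantiles` methods keep the number of breaks bounded, so compare both approaches on your data before choosing one.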
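One way to get cluster-like grouping without fragmentation is to only ever merge *adjacent* groups, so every cluster remains a contiguous run of segments. The sketch below is a simple greedy form of such contiguity-constrained agglomerative clustering: starting from one group per segment, it repeatedly merges the adjacent pair whose mean embeddings are most similar until `n_segments` groups remain. It is not part of the notebook's `SemanticSplitter`, and `contiguous_segments` is a hypothetical name. It returns breaks in the same convention as `SemanticSplitter.breaks` (a break at `i` ends a chunk after segment `i`).

```python
import numpy as np


def contiguous_segments(embeddings: np.ndarray, n_segments: int) -> list[int]:
    """Greedy bottom-up merging of adjacent groups only; every group stays contiguous."""
    # Each group is a [start, end) range of segment indices; start with one group per segment
    groups = [[i, i + 1] for i in range(len(embeddings))]
    n_segments = max(1, n_segments)
    while len(groups) > n_segments:
        best_pair, best_sim = 0, -np.inf
        for k in range(len(groups) - 1):
            left = embeddings[groups[k][0]:groups[k][1]].mean(axis=0)
            right = embeddings[groups[k + 1][0]:groups[k + 1][1]].mean(axis=0)
            # Cosine similarity of the two groups' mean embeddings (epsilon avoids division by zero)
            sim = float(left @ right / (np.linalg.norm(left) * np.linalg.norm(right) + 1e-12))
            if sim > best_sim:
                best_pair, best_sim = k, sim
        # Merge the most similar adjacent pair into one contiguous group
        groups[best_pair][1] = groups[best_pair + 1][1]
        del groups[best_pair + 1]
    # A chunk ends after the last segment of every group except the final one
    return [end - 1 for _, end in groups[:-1]]


toy = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9], [0.05, 1.0]])
print(contiguous_segments(toy, n_segments=2))  # prints [1]
```

Because every cluster is contiguous, `n_segments` clusters always yield exactly `n_segments - 1` breaks, so the result could be assigned to `splitter.breaks` before calling `get_chunks()`. This version recomputes all adjacent similarities after each merge, which is fine for a few hundred segments but quadratic overall; scikit-learn's `AgglomerativeClustering` also offers a comparable constraint through its `connectivity` parameter.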

In [15]:
# Create a new instance of the SemanticSplitter with cosine distance method
# splitter = SemanticSplitter(df.content, embedding_model, method="number", partition_count=6)

# Create a new instance of the SemanticSplitter with KMeans clustering
splitter = SemanticSplitter(
    content=df.content, 
    embedding_model=embedding_model, 
    method="clustering",
    clustering_method="kmeans", 
    n_clusters=6
)

# Get chunks using KMeans clustering
chunks = splitter.get_chunks()
text = CHUNK_SEPARATOR.join(chunks)

2025-04-10 14:54:42 - INFO - Encoding 250 content items with embedding model
2025-04-10 14:54:42 - INFO - Generated embeddings with shape: (250, 384)
2025-04-10 14:54:42 - INFO - Performing KMeans clustering with 6 clusters
2025-04-10 14:54:43 - INFO - Clustering complete, found 6 clusters
2025-04-10 14:54:43 - INFO - Found 248 breaks between clusters
2025-04-10 14:54:43 - INFO - Created 248 breaks using clustering method
2025-04-10 14:54:43 - INFO - Created 248 breaks using method 'clustering'
2025-04-10 14:54:43 - INFO - Generated 249 chunks from content


## Step 3: Using an LLM to Summarize Each Chunk
In our example, we summarize each chunk separately. This approach can be advantageous in several situations:
 * When the original text is too long, or the loaded model has a context window that is too small. In this scenario, breaking the information into chunks is necessary to apply the model at all
 * When the user wants to make sure that all the separate topics of a conversation are covered in the summary. An extra step could be added to let the user verify or manually adjust the chunks and customize the output

In this notebook, we provide three different options for loading the model:
 * **local**: loads the llama2-7b model from the asset downloaded in the project
 * **hugging-face-local**: downloads a DeepSeek model from Hugging Face and runs it locally
 * **hugging-face-cloud**: accesses the Mistral model through the Hugging Face cloud API (requires a Hugging Face API key saved in secrets.yaml)

This choice is set through the `model_source` field in config.yaml. The model deployed in the final cells of this notebook also reads this choice from the config file.
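A small validation step can fail fast with a clear message if `model_source` holds an unexpected value. The sketch below assumes that the accepted config values are exactly the three names listed above; `VALID_MODEL_SOURCES` and `get_model_source` are hypothetical and not part of `src.utils`.

```python
# Assumed to match the three options listed above
VALID_MODEL_SOURCES = ("local", "hugging-face-local", "hugging-face-cloud")


def get_model_source(config: dict) -> str:
    """Read and validate the 'model_source' entry of the loaded config.yaml."""
    source = config.get("model_source")
    if source not in VALID_MODEL_SOURCES:
        raise ValueError(
            f"Invalid model_source {source!r}; expected one of: {', '.join(VALID_MODEL_SOURCES)}"
        )
    return source
```

In the next cell, `model_source = get_model_source(config)` could then replace the direct dictionary lookup.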

In [16]:
model_source = config["model_source"]

In [17]:
%%time

llm = initialize_llm(model_source, secrets)

CPU times: user 1.86 s, sys: 6.44 s, total: 8.31 s
Wall time: 4min 8s


In [18]:
prompt_template = '''
The following text is an excerpt of a transcription:

### 
{context} 
###

Please produce a single paragraph summarizing the given excerpt.
'''

## Step 4: Create a chain to summarize the transcript

In the following cell, we create a chain that receives a single string containing multiple chunks (separated by `CHUNK_SEPARATOR`) and then:
  * Breaks the input into separate chunks using the `break_chunks` function
  * Processes each chunk in sequence:
    * Fills the prompt template with the chunk text to create an individual prompt
    * Uses the LLM to summarize the chunk
  * Merges the individual summaries into a single string, separated by blank lines

The cell also defines `lambda_break` and `lambda_join`, which wrap `break_chunks` and `join_summaries` as LangChain `RunnableLambda` objects; they are not used by the final chain, which runs through `process_chunks`.
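The chain described above summarizes the chunks one at a time. As a hedged sketch of a parallel variant, the function below maps a summarization function over the chunks with a thread pool while preserving their order. `summarize_chunks_parallel` and its parameters are hypothetical; in this notebook, `summarize` could be `process_chunk`. Whether threads actually speed things up depends on the backend: a remote endpoint such as the Hugging Face cloud API can serve concurrent requests, while a single local model may still process them one at a time.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def summarize_chunks_parallel(
    text: str,
    summarize: Callable[[str], str],
    separator: str = "\n\n",
    max_workers: int = 4,
) -> str:
    """Split text on separator, summarize each chunk concurrently, and join the results."""
    chunks = text.split(separator)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in input order, regardless of completion order.
        # An exception in any chunk is re-raised here; wrap `summarize` to handle
        # errors per chunk, as process_chunks does.
        summaries = list(pool.map(summarize, chunks))
    return separator.join(summaries)


# Usage with a stand-in summarizer (replace with a real LLM call such as process_chunk)
print(summarize_chunks_parallel("first chunk\n\nsecond chunk", lambda c: c.upper()))
```

Within LangChain itself, a comparable option is `(prompt | llm | StrOutputParser()).batch([{"context": c} for c in chunks], config={"max_concurrency": 4})`, which also returns the outputs in input order.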




In [19]:
# Converts prompt_template to LangChain object
prompt = ChatPromptTemplate.from_template(prompt_template)

def break_chunks(text):
    """
    Split text into chunks using the predefined separator.
    """
    return text.split(CHUNK_SEPARATOR)

def process_chunk(chunk_text):
    """
    Summarize a single chunk by filling the prompt template and calling the LLM.
    """
    chunk_chain = prompt | llm | StrOutputParser()
    return chunk_chain.invoke({"context": chunk_text})

def process_chunks(text):
    """
    Split the input into chunks and summarize them one at a time (sequentially).
    Failed chunks are logged and replaced with an error message in the output.
    """
    chunks_list = break_chunks(text)
    results = []

    logger.info(f"Processing {len(chunks_list)} chunks")

    for i, chunk in enumerate(chunks_list):
        logger.info(f"Processing chunk {i+1}/{len(chunks_list)}...")
        try:
            result = process_chunk(chunk)
            results.append(result)
            logger.info(f"Successfully processed chunk {i+1}")
        except Exception as e:
            # logger.exception logs the message together with the full traceback
            logger.exception(f"Error processing chunk {i+1} ({type(e).__name__}): {e}")
            results.append(f"Error: {e}")

    return "\n\n".join(results)

lambda_break = RunnableLambda(break_chunks)

def join_summaries(summaries_dict):
    # Extract values from the dictionary and join them
    joined_summaries = "\n\n".join([str(v) for v in summaries_dict.values()])
    logger.info(f"Joined {len(summaries_dict)} summaries")
    return joined_summaries

lambda_join = RunnableLambda(join_summaries)

# Create the complete chain
chain = RunnableLambda(process_chunks) | StrOutputParser()

## Step 5: Connect to Galileo
We use Galileo's Prompt Quality library (`promptquality`) to log in with an API key generated in the Galileo console. To get your API key, use this link: https://console.hp.galileocloud.io/api-keys

In [20]:
#########################################
# To connect to Galileo, add your API key to the secrets.yaml file referenced by SECRETS_PATH (../../configs/secrets.yaml)
# This file should contain an entry called Galileo with your personal Galileo API key
# Galileo API keys can be created on https://console.hp.galileocloud.io/settings/api-keys
#########################################

setup_galileo_environment(secrets)
print(os.environ['GALILEO_CONSOLE_URL'])
pq.login(os.environ['GALILEO_CONSOLE_URL'])

https://console.hp.galileocloud.io/
👋 You have logged into 🔭 Galileo (https://console.hp.galileocloud.io/) as nickyjhames@hp.com.


Config(console_url=HttpUrl('https://console.hp.galileocloud.io/'), username=None, password=None, api_key=SecretStr('**********'), token=SecretStr('**********'), current_user='nickyjhames@hp.com', current_project_id=None, current_project_name=None, current_run_id=None, current_run_name=None, current_run_url=None, current_run_task_type=None, current_template_id=None, current_template_name=None, current_template_version_id=None, current_template_version=None, current_template=None, current_dataset_id=None, current_job_id=None, current_prompt_optimization_job_id=None, api_url=HttpUrl('https://api.hp.galileocloud.io/'))

## Step 6: Run the chain and connect the metrics to Galileo

In this section, we call the chain we created and set up the mechanisms to ingest quality metrics into Galileo. For this example, we create a custom metric (scorer) that runs locally to measure the quality of the summarization. To do so, we use the Hugging Face implementation of ROUGE (from the `evaluate` library) and wrap it in a Galileo `CustomScorer` (next cell).

There are two ways to connect to Galileo:
  * Using a customized run, which calculates the scores and logs them into Galileo (used in this notebook)
  * Using the LangChain callback (currently unavailable due to compatibility issues)
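To make the metric concrete: ROUGE-L scores the longest common subsequence (LCS) of tokens shared by a prediction and a reference and combines the resulting precision and recall into an F-measure. The pure-Python sketch below is an approximation for illustration only; it uses simple lowercase whitespace tokenization, whereas the `evaluate` implementation relies on the `rouge_score` package with its own tokenizer, so scores can differ slightly. The function names are hypothetical.

```python
def lcs_length(a: list[str], b: list[str]) -> int:
    """Length of the longest common subsequence, via dynamic programming over prefixes."""
    prev = [0] * (len(b) + 1)
    for x in a:
        curr = [0]
        for j, y in enumerate(b, start=1):
            # Extend the match diagonally, or carry over the best result without x or y
            curr.append(prev[j - 1] + 1 if x == y else max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def rouge_l_f1(prediction: str, reference: str) -> float:
    """Approximate ROUGE-L F-measure with lowercase whitespace tokenization."""
    pred_tokens = prediction.lower().split()
    ref_tokens = reference.lower().split()
    if not pred_tokens or not ref_tokens:
        return 0.0
    lcs = lcs_length(pred_tokens, ref_tokens)
    if lcs == 0:
        return 0.0
    precision = lcs / len(pred_tokens)
    recall = lcs / len(ref_tokens)
    return 2 * precision * recall / (precision + recall)
```

Note that a short summary compared against a much longer reference gets a high precision but a low recall, which pulls the F-measure down; this is worth keeping in mind when interpreting the scores of summaries against the full transcript.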

In [21]:
# Load the ROUGE metric once instead of on every scored row
rouge_metric = evaluate.load("rouge")

def rouge_executor(row) -> float:
    """
    Compute ROUGE-L between the 'content' field of a row's input (reference)
    and output (prediction). Returns 0.0 if either side is missing or cannot be parsed.
    """
    try:
        logger.info(f"node_input: {row.node_input}")
        logger.info(f"node_output: {row.node_output}")

        # Try to decode node_input as JSON
        try:
            node_input = json.loads(row.node_input)
            reference = node_input.get("content", "").strip()
        except json.JSONDecodeError:
            logger.error(f"Error decoding JSON in node_input: {row.node_input}")
            return 0.0

        # Try to decode node_output as JSON
        try:
            node_output = json.loads(row.node_output)
            prediction = node_output.get("content", "").strip()
        except json.JSONDecodeError:
            logger.error(f"Error decoding JSON in node_output: {row.node_output}")
            return 0.0

        if not reference or not prediction:
            logger.warning("'content' fields are empty in node_input or node_output")
            return 0.0

        # Calculate ROUGE-L
        rouge_values = rouge_metric.compute(predictions=[prediction], references=[reference])

        return rouge_values.get("rougeL", 0.0)
    except Exception as e:
        logger.error(f"Unexpected error in rouge_executor: {e}")
        return 0.0

def rouge_aggregator(scores, indices) -> dict:
    if len(scores) == 0:
        return {'Average RougeL': 0.0}
    else:
        return {'Average RougeL': sum(scores) / len(scores)}

# Wrap the executor and aggregator above into a Galileo custom scorer
rouge_scorer = pq.CustomScorer(name='RougeL', executor=rouge_executor, aggregator=rouge_aggregator)

# Configures the assessment execution
partitioned_run = pq.EvaluateRun(
    project_name=PROJECT_NAME,
    run_name=RUN_NAME + "_" + datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
    scorers=[pq.Scorers.toxicity, pq.Scorers.sexist, rouge_scorer]
)
# Measures execution time
start_time = time.time()
response = chain.invoke(text)
total_time = int((time.time() - start_time) * 1000000)

partitioned_run.add_workflow(input=text, output=response, duration_ns=total_time) 
partitioned_run.add_llm_step(input=text, output=response, duration_ns=total_time, model='local')

# Finalizes the execution of the assessment
partitioned_run.finish()


2025-04-10 14:58:59 - INFO - Processing 249 chunks
2025-04-10 14:58:59 - INFO - Processing chunk 1/249...
2025-04-10 14:59:02 - INFO - Successfully processed chunk 1
2025-04-10 14:59:02 - INFO - Processing chunk 2/249...
2025-04-10 14:59:07 - INFO - Successfully processed chunk 2
2025-04-10 14:59:07 - INFO - Processing chunk 3/249...
2025-04-10 14:59:08 - INFO - Successfully processed chunk 3
...
2025-04-10 15:08:18 - INFO - Processing chunk 244/249...
2025-04-10 15:08:19 - INFO - Successfully processed chunk 244
2025-04-10 15:08:19 - INFO - Processing chunk 245/249...
2025-04-10 15:08:21 - INFO - Successfully processed chunk 245
2025-04-10 15:08:21 - INFO - Processing chunk 246/249...
2025-04-10 15:08:26 - INFO - Successfully processed chunk 246
2025-04-10 15:08:26 - INFO - Processing chunk 247/249...
2025-04-10 15:08:27 - INFO - Successfully processed chunk 247
2025-04-10 15:08:27 - INFO - Processing chunk 248/249...
2025-04-10 15:08:28 - INFO - Successfully processed chunk 248


Initial job complete, executing scorers asynchronously. Current status:
cost: Done ✅
toxicity: Done ✅
sexist: Done ✅
pii: Done ✅
protect_status: Done ✅
latency: Done ✅
🔭 View your prompt run on the Galileo console at: https://console.hp.galileocloud.io/prompt/chains/7655ee33-5cba-449a-a60e-c58f5aadebe0/ca057e2a-5a29-4b22-8ac9-33bbfc0e2884?taskType=12


2025-04-10 15:08:58 - INFO - node_input: ﻿WEBVTT

1

00:00:00.880 --> 00:00:03.920

<v 0>I am happy to join with you today</v>

2

00:00:06.500 --> 00:00:09.360

in what will go down in history

3

00:00:11.720 --> 00:00:16.460

as the greatest demonstration for freedom in the history of our

4

00:00:16.460 --> 00:00:17.293

nation.

5

00:00:26.410 --> 00:00:28.740

Five score years ago,
6

00:00:31.540 --> 00:00:35.980

a great American in whose symbolic shadow we stand today,

7

00:00:38.000 --> 00:00:41.540

signed the emancipation proclamation.

8

00:00:44.060 --> 00:00:48.950

This momentous decree came as a great beacon,

9

00:00:49.000 --> 00:00:53.870

light of hope to millions of Negro slaves who

10

00:00:53.890 --> 00:00:57.630

had been SEAD in the flames of Withing in justice.

11

00:00:59.970 --> 00:01:02.350

It came as a joyous daybreak

12

00:01:04.530 --> 00:01:07.870

to end the long night of their captivity.

13

00:01:10.570 --> 00:01:12.710

But 100 years 
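Galileo calls the executor once per row and the aggregator once per run, so the scorer logic can be dry-run locally before anything is logged. The sketch below assumes rows expose `node_input` and `node_output` strings (mocked here with `SimpleNamespace`) and accepts either a JSON object with a `content` field or raw text, such as the WebVTT transcript in the log above. To run offline, it swaps ROUGE for a hypothetical token-overlap score.

```python
import json
from types import SimpleNamespace


def extract_text(raw: str) -> str:
    """Return the 'content' field of a JSON object, or the raw string if it is not JSON."""
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return raw.strip()
    if isinstance(parsed, dict):
        return str(parsed.get("content", "")).strip()
    return str(parsed).strip()


def overlap_score(prediction: str, reference: str) -> float:
    """Hypothetical stand-in for ROUGE: share of prediction tokens that appear in the reference."""
    pred = prediction.lower().split()
    ref = set(reference.lower().split())
    return sum(tok in ref for tok in pred) / len(pred) if pred else 0.0


def executor(row) -> float:
    """Per-row score, mirroring the shape of a Galileo custom scorer executor."""
    reference = extract_text(row.node_input)
    prediction = extract_text(row.node_output)
    if not reference or not prediction:
        return 0.0
    return overlap_score(prediction, reference)


def aggregator(scores, indices) -> dict:
    """Mean of the per-row scores; an empty run scores 0.0."""
    return {"Average score": sum(scores) / len(scores) if scores else 0.0}


rows = [
    SimpleNamespace(node_input='{"content": "freedom rings"}', node_output='{"content": "freedom"}'),
    SimpleNamespace(node_input="WEBVTT\n\nI have a dream", node_output="a dream today"),
]
scores = [executor(r) for r in rows]
print(scores, aggregator(scores, list(range(len(rows)))))
```

The second row is raw text rather than JSON, yet it still gets a score instead of falling through to 0.0.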

In [22]:
# Create a custom ROUGE scorer for summarization evaluation
def rouge_executor(row) -> float:
    try:
        print(f"node_input: {row.node_input}")
        print(f"node_output: {row.node_output}")

        # Try to decode node_input as JSON
        try:
            node_input = json.loads(row.node_input)
            reference = node_input.get("content", "").strip()
        except json.JSONDecodeError:
            print(f"Error decoding JSON in node_input: {row.node_input}")
            return 0.0

        # Try to decode node_output as JSON
        try:
            node_output = json.loads(row.node_output)
            prediction = node_output.get("content", "").strip()
        except json.JSONDecodeError:
            print(f"Error decoding JSON in node_output: {row.node_output}")
            return 0.0

        if not reference or not prediction:
            print("'content' fields are empty in node_input or node_output")
            return 0.0

        # Calculates ROUGE-L
        rouge = evaluate.load("rouge")
        rouge_values = rouge.compute(predictions=[prediction], references=[reference])

        return rouge_values.get("rougeL", 0.0)
    except Exception as e:
        print(f"Unexpected error in rouge_executor: {e}")
        return 0.0

def rouge_aggregator(scores, indices) -> dict:
    if len(scores) == 0:
        return {'Average RougeL': 0.0}
    else:
        return {'Average RougeL': sum(scores) / len(scores)}

# Wrap the executor and aggregator in a Galileo CustomScorer
rouge_scorer = pq.CustomScorer(name='RougeL', executor=rouge_executor, aggregator=rouge_aggregator)

# Create and configure the Galileo evaluator with our custom scorer
summarization_callback = initialize_galileo_evaluator(
    project_name=EVALUATION_RUN_NAME,
    scorers=[pq.Scorers.toxicity, pq.Scorers.sexist, rouge_scorer]
)

### Note: this code does not work yet, because Galileo does not support lists as the output of chain nodes.
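If the only obstacle is that intermediate chain nodes return lists, one possible workaround (untested against Galileo) is to collapse list outputs into a single string before they reach a logged node. The helper below, `to_loggable_text`, is hypothetical and not part of Galileo or LangChain.

```python
from typing import Any


def to_loggable_text(value: Any, separator: str = "\n\n") -> str:
    """Hypothetical helper: flatten list/tuple/dict node outputs into one string.

    Lists and tuples are joined item by item (recursively), dicts are rendered
    as 'key: value' lines, and anything else is converted with str().
    """
    if isinstance(value, (list, tuple)):
        return separator.join(to_loggable_text(v, separator) for v in value)
    if isinstance(value, dict):
        return "\n".join(f"{k}: {to_loggable_text(v, separator)}" for k, v in value.items())
    return str(value)


chunks = ["First topic summary.", "Second topic summary."]
print(to_loggable_text(chunks))
```

In a LangChain pipeline it could be wrapped as `RunnableLambda(to_loggable_text)` (already imported at the top of the notebook) and placed after the step that produces the list.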

## Galileo Protect

Galileo Protect safeguards AI model outputs by detecting and blocking the release of sensitive information, such as personal addresses or other PII. Integrating Galileo Protect into your AI pipelines helps ensure that model responses comply with privacy and security guidelines in real time.

Galileo Protect is exposed as an API that checks the inputs and outputs of your chain or LLM. To log these checks to the Galileo console, it must be combined with another Galileo service, such as Galileo Evaluate or Galileo Observe.

**Attention**: this verification requires an API integration configured in the Galileo console.

In [23]:
# Create a project and stage for Galileo Protect
project, project_id, stage_id = initialize_galileo_protect(GALILEO_PROTECT_PROJECT_NAME + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

👋 You have logged into 🔭 Galileo (https://console.hp.galileocloud.io/) as nickyjhames@hp.com.


Galileo Protect works with rules that detect conditions such as Personally Identifiable Information (PII) and toxicity, so that the chain does not accept or answer sensitive requests. In this example, we create a set of rules (a ruleset) and an action that returns a predefined response when the rules are triggered. Galileo Protect offers many other metrics for different protection needs; you can learn more here: [Supported Metrics and Operators](https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-protect/how-to/supported-metrics-and-operators).

Additionally, it is possible to import rulesets directly from Galileo through stages. Learn more about this feature here: [Invoking Rulesets](https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-protect/how-to/invoking-rulesets).
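To make the rule and action semantics concrete, the sketch below mimics a ruleset locally: a rule fires when a detector's result contains the target value, and an `OVERRIDE` action replaces the response with a fixed choice. This is not how Galileo Protect is implemented. The regex SSN detector, the `Rule`/`Action` classes, and `apply_ruleset` are hypothetical stand-ins, and this sketch inspects only the output text and requires every rule in the ruleset to fire.

```python
import re
from dataclasses import dataclass, field

# Hypothetical detector: a crude US SSN pattern (ddd-dd-dddd), far weaker than a real PII model
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")


def detect_pii(text: str) -> list[str]:
    """Return the PII types found in the text (only 'ssn' in this sketch)."""
    return ["ssn"] if SSN_PATTERN.search(text) else []


# Metric name -> detector; this sketch only knows a 'pii' metric
METRICS = {"pii": detect_pii}


@dataclass
class Rule:
    metric: str
    operator: str
    target_value: str


@dataclass
class Action:
    type: str
    choices: list[str] = field(default_factory=list)


def apply_ruleset(rules: list[Rule], action: Action, payload: dict) -> str:
    """Return the override text if all rules fire on the output, else the original output."""
    output = payload.get("output", "")

    def fires(rule: Rule) -> bool:
        if rule.operator != "contains":
            raise ValueError(f"operator not supported in this sketch: {rule.operator}")
        return rule.target_value in METRICS[rule.metric](output)

    if rules and action.type == "OVERRIDE" and all(fires(r) for r in rules):
        return action.choices[0]
    return output


payload = {
    "input": "John Doe's social security number is 123-45-6789.",
    "output": "John Doe's social security number is 123-45-6789.",
}
rules = [Rule(metric="pii", operator="contains", target_value="ssn")]
action = Action(
    type="OVERRIDE",
    choices=["Personal Identifiable Information detected. Sorry, I cannot provide the response."],
)
print(apply_ruleset(rules, action, payload))
```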


In [24]:
# Define a ruleset for PII detection (specifically SSN)
pii_ruleset = Ruleset(
    # Define the rules to check for potential issues
    rules=[
        {
            "metric": "pii",  # Use the Personally Identifiable Information (PII) metric
            "operator": "contains",  # Trigger when the detected PII types include the target value
            "target_value": "ssn",  # Looking for Social Security Numbers
        },
    ],
    # Define the action to take when rules are triggered
    action={
        "type": "OVERRIDE",  # Override the model response
        "choices": [
            "Personal Identifiable Information detected. Sorry, I cannot provide the response."
        ],
    }
)

# Initialize ProtectTool with the configured stage_id and ruleset
protect_tool = ProtectTool(stage_id=stage_id, prioritized_rulesets=[pii_ruleset], timeout=10)

# Use existing chain and combine with ProtectTool
protect_parser = ProtectParser(chain=chain)  # 'chain' was defined earlier in the notebook
protected_chain = protect_tool | protect_parser.parser

# Example of using the protected chain with input and output
input_data = {
    "input": "John Doe's social security number is 123-45-6789.",
    "output": "John Doe's social security number is 123-45-6789."
}

# Invoke the protected chain
print("Invoking the chain with PII protection...")
response = protected_chain.invoke(input_data)
print("Protected chain response:")
print(response)

Invoking the chain with PII protection...
Protected chain response:
Personal Identifiable Information detected. Sorry, I cannot provide the response.


## Galileo Observe

Galileo Observe helps you monitor your generative AI applications in production. With Observe, you can understand how users interact with your application and pinpoint where it fails. Keep tabs on your production system, receive instant alerts when problems occur, and perform deep root-cause analysis through the Observe dashboard.

You can connect Galileo Observe to your LangChain chain to monitor metrics such as cost and guardrail indicators.
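Conceptually, a monitoring callback runs around each invocation and records what went in, what came out, how long it took, and whether it failed. The standard-library sketch below captures that kind of record for any callable. `Monitor` and `RunRecord` are hypothetical names used only for illustration; this is neither the handler returned by `initialize_galileo_observer` nor LangChain's callback interface.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class RunRecord:
    """One monitored invocation: input, output (or error), and latency."""
    input: Any
    output: Any = None
    error: Optional[str] = None
    latency_ms: float = 0.0


class Monitor:
    """Hypothetical in-memory monitor that wraps a callable and records each call."""

    def __init__(self) -> None:
        self.records: list[RunRecord] = []

    def invoke(self, fn: Callable[[Any], Any], value: Any) -> Any:
        record = RunRecord(input=value)
        start = time.perf_counter_ns()
        try:
            record.output = fn(value)
            return record.output
        except Exception as exc:  # record the failure, then re-raise it
            record.error = f"{type(exc).__name__}: {exc}"
            raise
        finally:
            # Runs on success and failure alike, so every call is recorded
            record.latency_ms = (time.perf_counter_ns() - start) / 1e6
            self.records.append(record)


monitor = Monitor()
monitor.invoke(str.upper, "hello")
try:
    monitor.invoke(lambda s: 1 / 0, "boom")
except ZeroDivisionError:
    pass
for r in monitor.records:
    print(r)
```

A hosted service such as Observe additionally persists these records and aggregates them into dashboards and alerts.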

In [25]:
example_query = """Tell me a story about technology and innovation. 
Explain how artificial intelligence is shaping the future. 
Summarize the impact of renewable energy on society."""

result_break = lambda_break.invoke(example_query)

chain = lambda_break | {
    f"summary_{i}": itemgetter(i) | prompt | llm
    for i in range(len(result_break))
} | lambda_join | StrOutputParser()

monitor_handler = initialize_galileo_observer(GALILEO_OBSERVE_PROJECT_NAME + datetime.now().strftime('%Y-%m-%d %H:%M:%S'))

print("Invoking the chain with Galileo Observe...")
try:
    output = chain.invoke(
        example_query,
        config={"callbacks": [monitor_handler]}
    )
    print("Observed chain output:")
    print(output)
except Exception as e:
    print(f"Error during chain execution: {e}")

🚀 Creating new project... project AIStudio-Summarizer-ObserveProject2025-04-10 15:09:12 created!
Invoking the chain with Galileo Observe...


2025-04-10 15:09:19 - INFO - Joined 1 summaries


Observed chain output:

I will provide the answer in a moment.


## Model Service: Galileo Protect + Observe

In this example, we illustrate a different approach to building a text summarizer. Instead of splitting the text into topics and summarizing each topic individually, this model service exposes a REST API endpoint that summarizes an entire text in a single call to the model.
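The difference between the two strategies can be sketched with a hypothetical `summarize` stand-in for the LLM (here it simply keeps the first sentence). The topic-based chain summarizes each chunk separately and joins the partial results, making one model call per chunk. The service, by contrast, sends the whole text in one call. The newline join is an assumption for illustration, not the notebook's `lambda_join`.

```python
from typing import Callable


def summarize(text: str) -> str:
    """Hypothetical stand-in for an LLM call: keep the first sentence."""
    return text.split(". ")[0].rstrip(".") + "."


def summarize_per_chunk(chunks: list[str], fn: Callable[[str], str]) -> tuple[str, int]:
    """Map step: one call per chunk; reduce step: join the partial summaries."""
    partials = [fn(chunk) for chunk in chunks]
    return "\n".join(partials), len(partials)


def summarize_whole(chunks: list[str], fn: Callable[[str], str]) -> tuple[str, int]:
    """Single call over the concatenated text, as the model service does."""
    return fn(" ".join(chunks)), 1


chunks = [
    "AI is shaping the future. It automates routine work.",
    "Renewable energy is growing. Costs keep falling.",
]
print(summarize_per_chunk(chunks, summarize))
print(summarize_whole(chunks, summarize))
```

Per-chunk summarization keeps each prompt small and covers every topic, at the cost of more model calls. The single-call service is simpler, but the whole text must fit in the model's context window.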

## Text Summarization Service

This section demonstrates how to use the `TextSummarizationService` class from the `core/service` directory. Keeping the service implementation outside the notebook improves code organization and makes it easier to maintain and update.

In [26]:
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "..")))

# In case you just want to run this cell without the rest of the notebook, run the following block:
# SECRETS_PATH = "../../configs/secrets.yaml"
# CONFIG_PATH = "../../configs/config.yaml"

# Import the TextSummarizationService class
from core.service.text_summarization_service import TextSummarizationService

# Set up the MLflow experiment
mlflow.set_experiment("Summarization_Service")

# MODEL_PATH is expected to be defined earlier in the notebook (set it here if running this cell on its own)

# Check that the model file exists
if not os.path.exists(MODEL_PATH):
    print(f"Warning: Model file not found at {MODEL_PATH}. You may need to update the path.")

# Use the TextSummarizationService's log_model method to register the model in MLflow
with mlflow.start_run(run_name="Text_Summarization_Service") as run:
    # Log and register the model using the service's classmethod
    TextSummarizationService.log_model(
        secrets_path=SECRETS_PATH,
        config_path=CONFIG_PATH,
        model_path=MODEL_PATH
    )
    
    # Register the model in MLflow Model Registry
    model_uri = f"runs:/{run.info.run_id}/text_summarization_service"
    mlflow.register_model(model_uri=model_uri, name="Text_Summarization_Service")
    print(f"Model registered successfully with run ID: {run.info.run_id}")

2025/04/10 15:09:21 INFO mlflow.tracking.fluent: Experiment with name 'Summarization_Service' does not exist. Creating a new experiment.



2025-04-10 15:14:20,553 - INFO - Model and artifacts successfully registered in MLflow.
Successfully registered model 'Text_Summarization_Service'.


Model registered successfully with run ID: bdfe62d347ee46e9a67d0a194ecf130d


Created version '1' of model 'Text_Summarization_Service'.


Built with ❤️ using Z by HP AI Studio.