## Step 0: Configuring the environment

Most of the libraries needed for this example are built into the GenAI workspace available in AI Studio. Libraries specific to the input format are installed here. In this case, we support transcripts in the WebVTT format, which requires the webvtt-py library.

In [None]:
!pip install -r ../requirements.txt

# Summarization of transcripts with LangChain

In this example, we build a summarizer for long transcripts. The main idea is to break the original transcript into chunks based on context, i.e., to use an unsupervised approach to identify the different topics throughout the transcript (somewhat similar to topic modelling), and then summarize each chunk. In the end, the individual summaries are returned to the user.
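The overall flow can be outlined in plain Python as a split step followed by a map over topics. This is a minimal sketch: `summarize_transcript`, `pairs` and `first_line` are hypothetical names, and the toy splitter and summarizer only stand in for the semantic splitter and the LLM used later in this notebook.

```python
from typing import Callable, List, Sequence


def summarize_transcript(
    blocks: Sequence[str],
    split_into_topics: Callable[[Sequence[str]], List[List[str]]],
    summarize: Callable[[str], str],
) -> List[str]:
    """Hypothetical outline: group blocks into topics, then summarize each topic on its own."""
    topics = split_into_topics(blocks)
    # One summary per topic, in the order the topics appear in the transcript
    return [summarize("\n".join(topic)) for topic in topics]


# Toy stand-ins: fixed pairs of blocks as "topics", first line as the "summary"
def pairs(blocks: Sequence[str]) -> List[List[str]]:
    return [list(blocks[i:i + 2]) for i in range(0, len(blocks), 2)]


def first_line(text: str) -> str:
    return text.split("\n")[0]


print(summarize_transcript(["a", "b", "c"], pairs, first_line))  # ['a', 'c']
```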

### Configuration of Hugging Face caches

In the next cell, we configure the Hugging Face cache so that all models downloaded from the Hub are persisted locally, even after the workspace is closed. Native support for this is a desired future feature of AI Studio and the GenAI add-on.
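`configure_hf_cache` lives in `src.utils` and its code is not shown here. A minimal sketch of what such a helper could do, assuming it relies on the standard `HF_HOME` environment variable, is below; `configure_hf_cache_sketch` is a hypothetical name. `HF_HOME` generally needs to be set before Hugging Face libraries are imported, since they read it at import time. The demo writes to a plain dict rather than `os.environ`, so running it does not redirect the notebook's real cache.

```python
import os
import tempfile
from pathlib import Path
from typing import MutableMapping


def configure_hf_cache_sketch(cache_dir: str, env: MutableMapping[str, str] = os.environ) -> str:
    """Hypothetical helper: create cache_dir and point HF_HOME at it."""
    path = Path(cache_dir).expanduser().resolve()
    # Create the directory up front so later downloads have somewhere to land
    path.mkdir(parents=True, exist_ok=True)
    # Hugging Face libraries derive their cache locations from HF_HOME
    env["HF_HOME"] = str(path)
    return str(path)


# Demo on a plain dict so the notebook's real environment is left untouched
hf_demo_env = {}
hf_demo_dir = configure_hf_cache_sketch(os.path.join(tempfile.gettempdir(), "hf_cache_demo"), env=hf_demo_env)
print(hf_demo_env["HF_HOME"])
```

In real use you would pass a persistent directory and keep the default `env`.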

In [None]:
import os
import sys

# Add the src directory to the path to import utils
sys.path.append(os.path.abspath(os.path.join(os.getcwd(), "../..")))
from src.utils import configure_hf_cache

# Configure HuggingFace cache
configure_hf_cache()

### Configuration and Secrets Loading

In this section, we load configuration parameters and API keys from separate YAML files. This separation helps maintain security by keeping sensitive information (API keys) separate from configuration settings.

- **config.yaml**: Contains non-sensitive configuration parameters like model sources and URLs
- **secrets.yaml**: Contains sensitive API keys for services like Galileo and HuggingFace

In [None]:
from src.utils import load_config_and_secrets

config_path = "../../configs/config.yaml"
secrets_path = "../../configs/secrets.yaml"

config, secrets = load_config_and_secrets(config_path, secrets_path)

### Proxy Configuration

To connect to the Galileo service, an SSH connection needs to be established. On some enterprise networks, this may require an explicit proxy configuration. If this applies to you, set the "proxy" field in your config.yaml, and the following cell will configure the necessary environment variable.
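A minimal sketch of what `configure_proxy` might do, assuming the `proxy` field holds a single proxy URL that is exported as the conventional `HTTP_PROXY`/`HTTPS_PROXY` variables (which `requests` and most HTTP clients honor). `configure_proxy_sketch` and the example URL are hypothetical; the demo writes to a dict instead of the real environment.

```python
import os
from typing import MutableMapping


def configure_proxy_sketch(config: dict, env: MutableMapping[str, str] = os.environ) -> None:
    """Hypothetical helper: export config["proxy"] as the standard proxy variables."""
    proxy = config.get("proxy")
    if not proxy:
        return  # no proxy configured: leave the environment untouched
    # Both spellings are set because different HTTP clients check different cases
    for var in ("HTTP_PROXY", "HTTPS_PROXY", "http_proxy", "https_proxy"):
        env[var] = proxy


proxy_demo_env = {}
configure_proxy_sketch({"proxy": "http://proxy.example.com:8080"}, env=proxy_demo_env)
print(proxy_demo_env["HTTPS_PROXY"])  # http://proxy.example.com:8080
```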

In [None]:
from src.utils import configure_proxy

configure_proxy(config)

## Step 1: Loading the data from the transcript

First, we need to read the data from the transcript. Since our transcript is in the .vtt format, we use the webvtt-py library to read it. As the text is a transcript of audio/video, it is organized into small blocks of conversation, each containing a sequential id, the start and end times of the block, and the text content (often in the form `speaker: content`).

From this data, we want to extract the actual content while keeping a reference to the other metadata. For this reason, we load all the data into a pandas DataFrame.
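The speaker/content split applied to each caption can be sketched with `str.partition`, which splits on the first colon only, so colons inside the spoken text survive. `split_speaker` is a hypothetical helper; note that it assumes the `speaker: content` convention, so a caption with no speaker but a colon in its text would still be split.

```python
from typing import Tuple


def split_speaker(text: str) -> Tuple[str, str]:
    """Hypothetical helper: split 'Speaker: content' into (speaker, content)."""
    # partition splits on the first colon only, so colons inside the content survive
    speaker, sep, content = text.partition(":")
    if not sep:
        # No colon at all: treat the whole caption as content with an unknown speaker
        return "", text.strip()
    return speaker.strip(), content.strip()


print(split_speaker("Alice: The meeting starts at 10:30"))  # ('Alice', 'The meeting starts at 10:30')
print(split_speaker("No speaker here"))  # ('', 'No speaker here')
```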

In [None]:
import webvtt
import pandas as pd

data_path = "../data"

if not os.path.exists(data_path):
    raise FileNotFoundError(f"'data' folder not found in path: {os.path.abspath(data_path)}")

file_path = os.path.join(data_path, "I_have_a_dream.vtt")

data = {
    "id": [],
    "speaker": [],
    "content": [],
    "start": [],
    "end": []
}

for caption in webvtt.read(file_path):
    # Split on the first colon only, so colons inside the content (e.g. times) are kept
    line = caption.text.split(":", 1)
    # Captions without a "speaker:" prefix get an empty speaker
    while len(line) < 2:
        line = [''] + line
    data["id"].append(caption.identifier)
    data["speaker"].append(line[0].strip())
    data["content"].append(line[1].strip())
    data["start"].append(caption.start)
    data["end"].append(caption.end)

df = pd.DataFrame(data)

df.head()

As a second option, we provide code to load the same structure from a plain-text document, which contains only the actual content of the speech/conversation, without any extra metadata. For simplicity and code reuse, we keep the same DataFrame structure as before and fill the remaining fields with empty strings.

In [None]:
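# Plain-text alternative: run this cell *instead of* the previous one, with file_path pointing
# to a plain-text transcript. Running it on the .vtt file would overwrite df with raw WebVTT
# lines (header, cue timings, and so on).
with open(file_path, encoding="utf-8") as file:
    text = file.read()

data = {
    "id": [],
    "speaker": [],
    "content": [],
    "start": [],
    "end": []
}

# Each non-empty line becomes one content block; the metadata columns stay empty
for line in text.split("\n"):
    if line.strip() != "":
        data["id"].append("")
        data["speaker"].append("")
        data["content"].append(line.strip())
        data["start"].append("")
        data["end"].append("")
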
df = pd.DataFrame(data)

df.head()

## Step 2: Semantic chunking of the transcript
With the content loaded according to the transcript format (the text split into audio blocks or into paragraphs), we now want to group these small blocks into relevant topics so that we can summarize each topic individually. We use a very simple approach: we compute a semantic embedding of each block (using an embedding model from Hugging Face Sentence Transformers) and place the "breaks" between chunks where consecutive blocks have the largest semantic distance. This method can be parameterized, either with the desired number of topics or with the method used to identify the breaks.
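The break-selection idea can be illustrated with NumPy alone on toy vectors. This sketch assumes the embeddings are already computed as a 2-D array with one row per block; `semantic_breaks` and `split_blocks` are hypothetical helpers that mirror the "number" method of the splitter below, not part of Sentence Transformers.

```python
from typing import List, Sequence

import numpy as np


def semantic_breaks(embeddings: np.ndarray, n_topics: int) -> List[int]:
    """Hypothetical helper: indices i such that a topic ends right after block i."""
    # Normalize rows so that a dot product equals cosine similarity
    normed = embeddings / np.linalg.norm(embeddings, axis=1, keepdims=True)
    # Cosine distance between each block and the next one
    distances = 1.0 - np.sum(normed[:-1] * normed[1:], axis=1)
    # n_topics topics need n_topics - 1 breaks, capped by the number of gaps
    n_breaks = min(max(n_topics - 1, 0), len(distances))
    if n_breaks == 0:
        return []
    # Positions of the n_breaks largest distances, returned in transcript order
    return sorted(int(i) for i in np.argsort(distances)[-n_breaks:])


def split_blocks(blocks: Sequence[str], breaks: List[int]) -> List[List[str]]:
    segments, start = [], 0
    for b in breaks:
        segments.append(list(blocks[start:b + 1]))
        start = b + 1
    segments.append(list(blocks[start:]))
    return segments


# Toy 2-D "embeddings": two blocks about one topic, then two about another
toy = np.array([[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.1, 0.9]])
breaks = semantic_breaks(toy, n_topics=2)
print(breaks)  # [1]
print(split_blocks(["a", "b", "c", "d"], breaks))  # [['a', 'b'], ['c', 'd']]
```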

In [None]:
import numpy as np
from sentence_transformers import SentenceTransformer
from scipy.spatial.distance import cosine

embedding_model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
embeddings = embedding_model.encode(df.content)

In [None]:
class SemanticSplitter:
    """Split a sequence of text blocks into topics at the largest semantic jumps."""

    def __init__(self, content, embedding_model, method="number", partition_count=10, quantile=0.9):
        # Keep a plain list so slicing is positional regardless of the input type (e.g. a pandas Series)
        self.content = list(content)
        self.embedding_model = embedding_model
        self.partition_count = partition_count
        self.quantile = quantile
        self.embeddings = embedding_model.encode(self.content)
        # Cosine distance between each block and the next one
        self.distances = [cosine(self.embeddings[i - 1], self.embeddings[i]) for i in range(1, len(self.embeddings))]
        self.breaks = []
        self.centroids = []
        self.load_breaks(method=method)

    def centroid_distance(self, embedding_id, centroid_id):
        # Requires load_centroids() to have been called first
        return cosine(self.embeddings[embedding_id], self.centroids[centroid_id])

    def adjust_neighbors(self):
        # Placeholder: currently only clears the breaks
        self.breaks = []

    def load_breaks(self, method='number'):
        if method == 'number':
            # N partitions need N - 1 breaks, capped by the number of gaps between blocks
            n_breaks = min(self.partition_count - 1, len(self.distances))
            if n_breaks <= 0:
                self.breaks = []
            else:
                # argpartition moves the n_breaks largest distances to the end of the index array
                self.breaks = np.sort(np.argpartition(self.distances, -n_breaks)[-n_breaks:])
        elif method == 'quantiles':
            # Break wherever the distance reaches the given quantile
            threshold = np.quantile(self.distances, self.quantile)
            self.breaks = [i for i, v in enumerate(self.distances) if v >= threshold]
        else:
            self.breaks = []

    def _segments(self):
        # (beginning, end) index pairs; a break at position i ends a segment after block i
        beginning = 0
        for break_position in self.breaks:
            yield beginning, break_position + 1
            beginning = break_position + 1
        yield beginning, len(self.content)

    def get_centroid(self, beginning, end):
        # Embed the concatenated text of the segment
        return self.embedding_model.encode('\n'.join(self.content[beginning:end]))

    def load_centroids(self):
        self.centroids = [self.get_centroid(b, e) for b, e in self._segments()]

    def get_chunk(self, beginning, end):
        return '\n'.join(self.content[beginning:end])

    def get_chunks(self):
        return [self.get_chunk(b, e) for b, e in self._segments()]

In [None]:
chunk_separator = "\n *-* \n"

splitter = SemanticSplitter(df.content, embedding_model, method="number", partition_count=6)
chunks = chunk_separator.join(splitter.get_chunks())

## Step 3: Using an LLM to summarize each chunk
In our example, we summarize each chunk separately. This approach can be advantageous in several situations:
 * When the original text is too long, or the loaded model has too small a context window. In this scenario, breaking the information into chunks is necessary for the model to be applied at all
 * When the user wants to make sure that all the separate topics of a conversation are covered in the summary. An extra step could be added to allow verification or manual adjustment of the chunks, letting the user customize the output

To achieve this, we load an LLM and use a summarization prompt. We provide two options for loading the model:
 * **local**: loads the llama2-7b model from the asset downloaded in the project
 * **hugging-face-cloud**: accesses the Mistral model through the Hugging Face cloud API (requires a Hugging Face API key saved in secrets.yaml)
 
This choice can be set in the variable model_source below or as an entry in the config.yaml file. The model service deployed in the last cells of this notebook reads the choice from the config file. The hugging-face-local option, while available in other examples, is not yet available for this one.
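The selection logic described above, with `local` as the default and an explicit error for the unsupported option, can be sketched as a small dispatch table. `initialize_llm` is defined in `src.utils` and may work differently; `build_llm`, `demo_factories` and `demo_config` are hypothetical, and the stub factories return strings instead of real models.

```python
from typing import Callable, Dict


def build_llm(model_source: str, factories: Dict[str, Callable[[], object]]) -> object:
    """Hypothetical helper: validate model_source and call the matching factory."""
    if model_source == "hugging-face-local":
        # Mirrors the note above: this option is not available for this example yet
        raise NotImplementedError("hugging-face-local is not available for this example yet")
    if model_source not in factories:
        raise ValueError(f"Unknown model_source {model_source!r}; expected one of {sorted(factories)}")
    return factories[model_source]()


# Stub factories standing in for the real loaders behind initialize_llm
demo_factories = {
    "local": lambda: "llama2-7b (local)",
    "hugging-face-cloud": lambda: "Mistral (Hugging Face cloud)",
}
demo_config = {"model_source": "hugging-face-cloud"}  # stands in for config.yaml
print(build_llm(demo_config.get("model_source", "local"), demo_factories))  # Mistral (Hugging Face cloud)
```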

In [None]:
from src.utils import initialize_llm

model_source = "local"
if "model_source" in config:
    model_source = config["model_source"]

llm = initialize_llm(model_source, secrets)

In [None]:
prompt_template = '''
The following text is an excerpt of a transcription:

### 
{context} 
###

Please, produce a single paragraph summarizing the given excerpt.
'''

## Step 4: Create a chain to summarize the transcript

In the following cell, we create a chain that receives a single string with multiple chunks (separated by the declared separator) and then:
  * Breaks the input into separate chunks, using the break_chunks function
  * For each chunk:
    * Fills in the prompt template to create an individual prompt for the chunk
    * Uses the LLM to summarize the chunk
  * Merges the individual summaries into a single text

In this cell, `process_chunks` handles the chunks sequentially in a plain loop. The `lambda_break` and `lambda_join` runnables defined here are used later, in the Galileo Observe section, to build a variant that runs one branch per chunk in parallel.
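The break, summarize and merge flow can also be sketched with the standard library's `ThreadPoolExecutor`, which summarizes chunks concurrently while `pool.map` keeps the results in input order. This is a minimal sketch: `summarize_in_parallel` is a hypothetical helper and `str.upper` stands in for the LLM call. Concurrency mostly pays off with remote endpoints such as the hugging-face-cloud option; a single local model may still process requests one at a time. LangChain runnables also offer a `batch` method for running one chain over several inputs.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

SEPARATOR = "\n *-* \n"  # same value as chunk_separator in Step 2


def summarize_in_parallel(text: str, summarize: Callable[[str], str], max_workers: int = 4) -> str:
    """Hypothetical helper: break, summarize concurrently, then merge."""
    chunks = text.split(SEPARATOR)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map runs summarize on several chunks at once but yields results in input order
        summaries = list(pool.map(summarize, chunks))
    return "\n\n".join(summaries)


print(repr(summarize_in_parallel("first topic\n *-* \nsecond topic", str.upper)))
# 'FIRST TOPIC\n\nSECOND TOPIC'
```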




In [None]:
from langchain_core.runnables import RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate

# Converts prompt_template to a LangChain prompt object
prompt = ChatPromptTemplate.from_template(prompt_template)

# Per-chunk summarization chain: fill the prompt, call the LLM, return plain text
summary_chain = prompt | llm | StrOutputParser()

def break_chunks(text):
    """
    Split text into chunks using the predefined separator.
    """
    return text.split(chunk_separator)

def process_chunk(chunk_text):
    """Summarize a single chunk."""
    return summary_chain.invoke({"context": chunk_text})

def process_chunks(text):
    """Summarize every chunk in order and join the summaries."""
    chunks_list = break_chunks(text)
    results = []
    for i, chunk in enumerate(chunks_list):
        print(f"Processing chunk {i+1}/{len(chunks_list)}...")
        try:
            results.append(process_chunk(chunk))
        except Exception as e:
            # Keep going so that one failing chunk does not discard the other summaries
            print(f"Error processing chunk {i+1}: {e}")
            results.append(f"Error: {str(e)}")
    return "\n\n".join(results)

# Runnables reused later to build a parallel variant of the chain
lambda_break = RunnableLambda(break_chunks)

def join_summaries(summaries_dict):
    # Extract values from the dictionary and join them
    return "\n\n".join([str(v) for v in summaries_dict.values()])

lambda_join = RunnableLambda(join_summaries)

chain = RunnableLambda(process_chunks) | StrOutputParser()

## Step 5: Connect to Galileo
Through the Galileo library called Prompt Quality (promptquality), we log in using an API key generated in the Galileo console. To get your API key, use this link: https://console.hp.galileocloud.io/api-keys
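`setup_galileo_environment` is defined in `src.utils` and its internals are not shown. A plausible minimal sketch follows, assuming it copies the `Galileo` entry of secrets.yaml into environment variables; `setup_galileo_env_sketch` is hypothetical and the `GALILEO_API_KEY` variable name is an assumption. The demo writes to a plain dict so it cannot overwrite the notebook's real credentials.

```python
import os
from typing import Mapping, MutableMapping


def setup_galileo_env_sketch(
    secrets: Mapping[str, str],
    console_url: str,
    env: MutableMapping[str, str] = os.environ,
) -> None:
    """Hypothetical helper: expose the Galileo API key and console URL as environment variables."""
    api_key = secrets.get("Galileo")
    if not api_key:
        raise KeyError("secrets.yaml has no 'Galileo' entry with an API key")
    env["GALILEO_API_KEY"] = api_key  # assumed variable name
    env["GALILEO_CONSOLE_URL"] = console_url  # the next cell passes this value to pq.login


galileo_demo_env = {}
setup_galileo_env_sketch({"Galileo": "<your-api-key>"}, "https://console.hp.galileocloud.io", env=galileo_demo_env)
print(sorted(galileo_demo_env))  # ['GALILEO_API_KEY', 'GALILEO_CONSOLE_URL']
```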

In [None]:
import promptquality as pq
from src.utils import setup_galileo_environment

#########################################
# To connect to Galileo, add an entry called Galileo with your personal Galileo API key
# to the secrets.yaml file loaded earlier (../../configs/secrets.yaml)
# Galileo API keys can be created on https://console.hp.galileocloud.io/settings/api-keys
#########################################

setup_galileo_environment(secrets)
pq.login(os.environ['GALILEO_CONSOLE_URL'])

## Step 6: Run the chain and connect the metrics to Galileo

In this section, we call the chain created above and set up the mechanisms to ingest quality metrics into Galileo. For this example, we create a custom metric (scorer) that runs locally to measure the quality of the summarization. To do so, we use the Hugging Face implementation of ROUGE (from the evaluate library) and wrap it in a Galileo CustomScorer (next cell). Because no reference summary is available, the scorer compares each summary against its own input text.
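ROUGE-L scores the longest common subsequence (LCS) of words shared by a summary and a reference, combining LCS-based precision and recall into an F-measure. The pure-Python sketch below shows the computation. It uses simple lowercase whitespace tokenization, whereas the `evaluate` implementation tokenizes differently (for example, it drops punctuation) and can optionally apply stemming, so its scores may differ slightly. `rouge_l_f1` and `average` are hypothetical names; `average` mirrors what `rouge_aggregator` does in the next cell.

```python
from typing import List


def lcs_length(a: List[str], b: List[str]) -> int:
    # Dynamic programming over prefixes, keeping only the previous row
    prev = [0] * (len(b) + 1)
    for x in a:
        curr = [0]
        for j, y in enumerate(b, start=1):
            curr.append(prev[j - 1] + 1 if x == y else max(prev[j], curr[j - 1]))
        prev = curr
    return prev[-1]


def rouge_l_f1(prediction: str, reference: str) -> float:
    pred, ref = prediction.lower().split(), reference.lower().split()
    if not pred or not ref:
        return 0.0
    lcs = lcs_length(pred, ref)
    if lcs == 0:
        return 0.0
    precision, recall = lcs / len(pred), lcs / len(ref)
    return 2 * precision * recall / (precision + recall)


def average(scores: List[float]) -> float:
    return sum(scores) / len(scores) if scores else 0.0


print(round(rouge_l_f1("the cat is on the mat", "the cat sat on the mat"), 3))  # 0.833
```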

Below, we illustrate two alternative ways to connect to Galileo:
  * Using a customized run, which calculates the scores and logs into Galileo
  * Using the langchain callback (currently unavailable due to compatibility issues)

In [None]:
import evaluate
import time
import json
import promptquality as pq

# Load the ROUGE metric once instead of reloading it for every scored row
rouge = evaluate.load("rouge")

def rouge_executor(row) -> float:
    """ROUGE-L between a node's input (used as reference) and its output (the summary)."""
    try:
        print(f"node_input: {row.node_input}")
        print(f"node_output: {row.node_output}")

        # Try to decode node_input as JSON
        try:
            node_input = json.loads(row.node_input)
            reference = node_input.get("content", "").strip()
        except json.JSONDecodeError:
            print(f"Error decoding JSON in node_input: {row.node_input}")
            return 0.0

        # Try to decode node_output as JSON
        try:
            node_output = json.loads(row.node_output)
            prediction = node_output.get("content", "").strip()
        except json.JSONDecodeError:
            print(f"Error decoding JSON in node_output: {row.node_output}")
            return 0.0

        if not reference or not prediction:
            print("'content' fields are empty in node_input or node_output")
            return 0.0

        # Calculates ROUGE-L
        rouge_values = rouge.compute(predictions=[prediction], references=[reference])

        return rouge_values.get("rougeL", 0.0)
    except Exception as e:
        print(f"Unexpected error in rouge_executor: {e}")
        return 0.0

def rouge_aggregator(scores, indices) -> dict:
    # Average the per-row ROUGE-L scores (0.0 when nothing was scored)
    if len(scores) == 0:
        return {'Average RougeL': 0.0}
    else:
        return {'Average RougeL': sum(scores) / len(scores)}

# Wrap the executor and aggregator in a Galileo custom scorer
rouge_scorer = pq.CustomScorer(name='RougeL', executor=rouge_executor, aggregator=rouge_aggregator)

# Configures the evaluation run
partitioned_run = pq.EvaluateRun(
    project_name="AIStudio_template_code_summarization",
    run_name="Test4 partitioned script",
    scorers=[pq.Scorers.toxicity, pq.Scorers.sexist, rouge_scorer]
)

# Measures execution time in nanoseconds, as expected by duration_ns
start_time = time.perf_counter_ns()
response = chain.invoke(chunks)
total_time = time.perf_counter_ns() - start_time

partitioned_run.add_workflow(input=chunks, output=response, duration_ns=total_time)
# Log the model source selected in Step 3 ("local" or "hugging-face-cloud")
partitioned_run.add_llm_step(input=chunks, output=response, duration_ns=total_time, model=model_source)

# Finalizes the evaluation run
partitioned_run.finish()


In [None]:
from src.utils import initialize_galileo_evaluator

# Reuse rouge_executor, rouge_aggregator and rouge_scorer defined in the previous cell

# Create and configure the Galileo evaluator with our custom scorer
summarization_callback = initialize_galileo_evaluator(
    project_name="AIStudio_template_code_summarization",
    scorers=[pq.Scorers.toxicity, pq.Scorers.sexist, rouge_scorer]
)

### THIS CODE IS NOT WORKING YET, AS GALILEO DOES NOT SUPPORT LISTS AS THE OUTPUT OF CHAIN NODES 

#summaries = chain.invoke(chunks, config={"callbacks": [summarization_callback]})

## Galileo Protect

Galileo Protect safeguards AI model outputs by detecting and preventing the release of sensitive information, such as personal addresses or other PII. By integrating Galileo Protect into your AI pipelines, you can help ensure that model responses comply with privacy and security guidelines in real time.

Galileo Protect works as an API that checks the inputs and outputs of your chain/LLM. To log its results in the Galileo console, it must be integrated with another service, such as Galileo Evaluate or Galileo Observe.

**Attention**: an integrated API within the Galileo console is required to perform this verification.

In [None]:
import galileo_protect as gp
from src.utils import initialize_galileo_protect

# Create a project and stage for Galileo Protect
project, project_id, stage_id = initialize_galileo_protect('validate_protect')

Galileo Protect works by creating rules that identify conditions such as Personally Identifiable Information (PII) and toxicity, so that the application does not receive or respond to sensitive requests. In this example, we create a set of rules (ruleset) and an action that returns a pre-programmed response if a rule is triggered. Galileo Protect also offers a variety of other metrics to suit different protection needs. You can learn more about the available metrics here: [Supported Metrics and Operators](https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-protect/how-to/supported-metrics-and-operators).

Additionally, it is possible to import rulesets directly from Galileo through stages. Learn more about this feature here: [Invoking Rulesets](https://docs.rungalileo.io/galileo/gen-ai-studio-products/galileo-protect/how-to/invoking-rulesets).
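The rule-plus-action idea can be illustrated locally. This sketch swaps in a plain regular expression for US Social Security numbers in the `ddd-dd-dddd` form; it is not how Galileo's PII metric detects PII, and `apply_ssn_rule` is a hypothetical helper. The rule corresponds to "contains an SSN" and the action to an OVERRIDE with a fixed response, matching the ruleset configured in the next cell.

```python
import re

# Swapped-in technique: a regex for SSNs in the ddd-dd-dddd form, not Galileo's PII detector
SSN_PATTERN = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")
OVERRIDE_CHOICE = "Personal Identifiable Information detected. Sorry, I cannot provide the response."


def apply_ssn_rule(response: str) -> str:
    """Hypothetical helper: if the rule triggers, override the response; otherwise pass it through."""
    if SSN_PATTERN.search(response):
        return OVERRIDE_CHOICE
    return response


print(apply_ssn_rule("John Doe's social security number is 123-45-6789."))
print(apply_ssn_rule("The meeting is on Monday."))
```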


In [None]:
from galileo_protect import ProtectTool, ProtectParser, Ruleset

# Define a ruleset for PII detection (specifically SSN)
pii_ruleset = Ruleset(
    # Define the rules to check for potential issues
    rules=[
        {
            "metric": "pii",  # Using the Personally Identifiable Information (PII) metric
            "operator": "contains",  # Check if PII contains specific type
            "target_value": "ssn",  # Looking for Social Security Numbers
        },
    ],
    # Define the action to take when rules are triggered
    action={
        "type": "OVERRIDE",  # Override the model response
        "choices": [
            "Personal Identifiable Information detected. Sorry, I cannot provide the response."
        ],
    }
)

# Initialize ProtectTool with the configured stage_id and ruleset
protect_tool = ProtectTool(stage_id=stage_id, prioritized_rulesets=[pii_ruleset], timeout=10)

# Use existing chain and combine with ProtectTool
protect_parser = ProtectParser(chain=chain)  # 'chain' has already been defined previously
protected_chain = protect_tool | protect_parser.parser

# Example of using the protected chain with input and output
input_data = {
    "input": "John Doe's social security number is 123-45-6789.",
    "output": "John Doe's social security number is 123-45-6789."
}

# Invoke the protected chain
print("Invoking the chain with PII protection...")
response = protected_chain.invoke(input_data)
print("Protected chain response:")
print(response)

## Galileo Observe

Galileo Observe helps you monitor your generative AI applications in production. With Observe, you can understand how users interact with your application and identify where things go wrong. Keep tabs on your production system, receive alerts when problems occur, and perform root cause analysis through the Observe dashboard.

You can connect Galileo Observe to your LangChain chain to monitor metrics such as cost and guardrail indicators.
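Galileo Observe collects its data through the LangChain callback handler passed in the next cell. The toy decorator below only illustrates the kind of per-call records such monitoring produces (status, input and output sizes, latency, errors); it is not Galileo's API, and `monitored` and `fake_summarize` are hypothetical names.

```python
import time
from functools import wraps
from typing import Callable, Dict, List


def monitored(records: List[Dict]) -> Callable:
    """Hypothetical decorator: append status, sizes and latency of every call to `records`."""
    def decorator(fn: Callable[[str], str]) -> Callable[[str], str]:
        @wraps(fn)
        def wrapper(text: str) -> str:
            start = time.perf_counter()
            try:
                output = fn(text)
            except Exception as exc:
                # Record the failure, then let the caller see the original exception
                records.append({"name": fn.__name__, "status": "error", "error": str(exc),
                                "latency_s": time.perf_counter() - start})
                raise
            records.append({"name": fn.__name__, "status": "ok", "input_chars": len(text),
                            "output_chars": len(output), "latency_s": time.perf_counter() - start})
            return output
        return wrapper
    return decorator


calls: List[Dict] = []


@monitored(calls)
def fake_summarize(text: str) -> str:
    return text[:10]  # stand-in for the LLM call


fake_summarize("A long transcript excerpt")
print(calls[0]["status"], calls[0]["input_chars"], calls[0]["output_chars"])  # ok 25 10
```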

In [None]:
from operator import itemgetter
from src.utils import initialize_galileo_observer

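# Three short texts joined with chunk_separator, so that break_chunks yields three chunks
example_query = chunk_separator.join([
    "Tell me a story about technology and innovation.",
    "Explain how artificial intelligence is shaping the future.",
    "Summarize the impact of renewable energy on society.",
])

result_break = lambda_break.invoke(example_query)

# One branch per chunk: the dict is coerced into a RunnableParallel whose outputs lambda_join merges.
# The number of branches is fixed by the chunks in example_query.
# Named parallel_chain so that the sequential chain from Step 4 is not overwritten.
parallel_chain = lambda_break | {
    f"summary_{i}": itemgetter(i) | prompt | llm
    for i in range(len(result_break))
} | lambda_join | StrOutputParser()

monitor_handler = initialize_galileo_observer("validate_galileo_observe")

print("Invoking the chain with Galileo Observe...")
try:
    output = parallel_chain.invoke(
        example_query,
        config={"callbacks": [monitor_handler]}
    )
    print("Observed chain output:")
    print(output)
except Exception as e:
    print(f"Error during chain execution: {e}")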

## Model service Galileo Protect + Observe

In this example, we illustrate a different approach to building a text summarizer. Instead of splitting the text into topics and summarizing each topic individually, this model service provides a REST API endpoint that summarizes an entire text in a single call to the model.
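Once the model is served, a client sends the text to summarize as JSON. The sketch below assumes the standard MLflow scoring protocol (a POST to `/invocations` with a `dataframe_records` body, as used by `mlflow models serve`, which listens on port 5000 by default); the endpoint URL of a deployment in AI Studio may differ. `build_scoring_payload` and `post_payload` are hypothetical helpers, and the column name `text` matches the input schema registered below.

```python
import json
import urllib.request
from typing import List


def build_scoring_payload(texts: List[str]) -> str:
    """Hypothetical helper: one record per text, using the 'text' column of the model's input schema."""
    return json.dumps({"dataframe_records": [{"text": t} for t in texts]})


def post_payload(url: str, payload: str) -> dict:
    """Hypothetical helper: POST the payload, e.g. to http://localhost:5000/invocations, and decode the reply."""
    request = urllib.request.Request(
        url,
        data=payload.encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))


payload = build_scoring_payload(["Full transcript text goes here."])
print(payload)  # {"dataframe_records": [{"text": "Full transcript text goes here."}]}
```

`post_payload` is not called here because it needs a running endpoint.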

## Text Summarization Service

This section demonstrates how to use our TextSummarizationService from the src/service directory. This approach improves code organization by separating the service implementation from the notebook, making it easier to maintain and update.

In [None]:
import os
import mlflow
from mlflow.types.schema import Schema, ColSpec
from mlflow.models import ModelSignature

# Import the TextSummarizationService class
from src.service import TextSummarizationService

# Set up the MLflow experiment
mlflow.set_experiment("Summarization_Service")

# Define paths
model_path = "/home/jovyan/datafabric/llama2-7b/ggml-model-f16-Q5_K_M.gguf"

# Check if the model file exists
if not os.path.exists(model_path):
    print(f"Warning: Model file not found at {model_path}. You may need to update the path.")

# Define the input and output schema
input_schema = Schema([ColSpec("string", "text")])
output_schema = Schema([ColSpec("string", "summary")])
signature = ModelSignature(inputs=input_schema, outputs=output_schema)

# Log and register the model using MLflow
with mlflow.start_run(run_name="Text_Summarization_Service") as run:
    # Log the model
    mlflow.pyfunc.log_model(
        artifact_path="text_summarization_service",
        python_model=TextSummarizationService(),
        artifacts={"secrets": secrets_path, "config": config_path, "model": model_path},
        signature=signature,
        pip_requirements=[
            "galileo-protect==0.15.1",
            "galileo-observe==1.13.2",
            "pyyaml",
            "pandas",
            "sentence-transformers",
            "langchain_core",
            "langchain_huggingface"
        ]
    )
    
    # Register the model in MLflow
    model_uri = f"runs:/{run.info.run_id}/text_summarization_service"
    mlflow.register_model(model_uri=model_uri, name="Text_Summarization_Service")
    print(f"Model registered successfully with run ID: {run.info.run_id}")