In [None]:
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

## Getting Started (Gemini Pro 1.5)


In [None]:
#################### The code was written for the analysis in Vertex AI by Google LLC ####################

### Install Vertex AI SDK for Gen AI Evaluation Service

In [None]:
%pip install -U -q google-cloud-aiplatform[evaluation]

### Install other required packages

In [None]:
%pip install -U -q datasets
%pip install -U -q anthropic[vertex]
%pip install -U -q openai

### Restart runtime
To use the newly installed packages in this Jupyter runtime, you must restart the runtime. You can do this by uncommenting and running the cell below, which restarts the current kernel.

The restart might take a minute or longer. After it's restarted, continue to the next step.

In [None]:
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

<div class="alert alert-block alert-warning">
<b>⚠️ The kernel is going to restart. Wait until it's finished before continuing to the next step. ⚠️</b>
</div>


### Authenticate your notebook environment (Colab only)

In [None]:
import sys

# Run the Colab sign-in flow only when the google.colab module is already
# loaded in this interpreter; in any other runtime this cell does nothing.
if "google.colab" in sys.modules:
    from google.colab import auth

    auth.authenticate_user()



### Set Google Cloud project information and initialize Vertex AI SDK

In [None]:
# Google Cloud project id and region that are passed to vertexai.init below.
PROJECT_ID = ""  # @param {type:"string"}
LOCATION = "us-central1"  # @param {type:"string"}

# Fail fast if the project id was left empty or still holds the template
# placeholder, before any Vertex AI call is attempted.
if not PROJECT_ID or PROJECT_ID == "[your-project-id]":
    raise ValueError("Please set your PROJECT_ID")


import vertexai

# Initialise the Vertex AI SDK with the project and region chosen above.
vertexai.init(project=PROJECT_ID, location=LOCATION)

### Import libraries

In [None]:
from anthropic import AnthropicVertex
from google.auth import default, transport
import openai
from vertexai.evaluation import (
    EvalTask,
    MetricPromptTemplateExamples,
    PairwiseMetric,
    PointwiseMetric,
    PointwiseMetricPromptTemplate,
)
from vertexai.generative_models import GenerativeModel

### Library settings

In [None]:
# @title

import logging
import warnings

import pandas as pd

# Only let ERROR-level (and higher) records through urllib3's connection-pool logger.
logging.getLogger("urllib3.connectionpool").setLevel(logging.ERROR)
# Suppress every Python warning for the rest of the session; note that this
# also hides warnings raised by code in later cells.
warnings.filterwarnings("ignore")

# pd.set_option('display.max_colwidth', None)

### Helper functions

In [None]:
# @title

import random
import string

from IPython.display import HTML, Markdown, display
import plotly.graph_objects as go


def display_explanations(eval_result, metrics=None, n=1):
    """Render a random sample of evaluation rows as HTML blocks.

    Args:
        eval_result: Object exposing a ``metrics_table`` DataFrame.
        metrics: Optional iterable of metric-name substrings. When given, only
            the ``response`` / ``baseline_model_response`` columns and the
            columns whose name contains one of the substrings are shown.
        n: Number of rows to sample. Values larger than the table are clamped
            to the table size instead of raising.
    """
    import html  # local import keeps this helper self-contained

    style = "white-space: pre-wrap; width: 1500px; overflow-x: auto;"
    metrics_table = eval_result.metrics_table
    # DataFrame.sample (without replacement) raises when n exceeds the number
    # of rows, so never ask for more rows than the table holds.
    if n is not None:
        n = min(n, len(metrics_table))
    df = metrics_table.sample(n=n)

    if metrics:
        df = df.filter(
            ["response", "baseline_model_response"]
            + [
                column
                for column in df.columns
                if any(selected_metric in column for selected_metric in metrics)
            ]
        )
    for _, row in df.iterrows():
        for col in df.columns:
            # Escape the values so text such as "<0-3>" or "&" is displayed
            # literally instead of being interpreted as HTML markup.
            heading = html.escape(str(col))
            content = html.escape(str(row[col]))
            display(HTML(f"<div style='{style}'><h4>{heading}:</h4>{content}</div>"))
        display(HTML("<hr>"))


def display_eval_result(eval_result, title=None, metrics=None):
    """Show the summary metrics and the per-row metrics of an evaluation.

    Args:
        eval_result: Object exposing ``summary_metrics`` (a mapping) and
            ``metrics_table`` (a DataFrame).
        title: Optional heading displayed above both tables.
        metrics: Optional iterable of metric-name substrings; when given, both
            tables are reduced to the columns whose name contains one of them.
    """
    # The summary mapping becomes a single-row frame with one column per metric.
    summary_df = pd.DataFrame.from_dict(eval_result.summary_metrics, orient="index").T
    rows_df = eval_result.metrics_table

    if metrics:

        def _is_selected(column):
            return any(selected_metric in column for selected_metric in metrics)

        summary_df = summary_df.filter(
            [column for column in summary_df.columns if _is_selected(column)]
        )
        rows_df = rows_df.filter(
            [column for column in rows_df.columns if _is_selected(column)]
        )

    if title:
        # Markdown heading for emphasis.
        display(Markdown(f"## {title}"))

    display(Markdown("### Summary Metrics"))
    display(summary_df)

    display(Markdown("### Row-based Metrics"))
    display(rows_df)


def display_radar_plot(eval_results, metrics=None):
    """Draw one radar (polar) trace per evaluation result.

    Args:
        eval_results: Iterable of ``(title, eval_result)`` pairs; each
            ``eval_result`` exposes a ``summary_metrics`` mapping.
        metrics: Optional iterable of metric names. When given, only summary
            entries whose key contains ``"<name>/mean"`` are plotted, and the
            ``"/mean"`` part is removed from the axis labels.
    """
    figure = go.Figure()

    for title, eval_result in eval_results:
        axis_values = eval_result.summary_metrics

        if metrics:
            narrowed = {}
            for key, value in axis_values.items():
                if any(name + "/mean" in key for name in metrics):
                    narrowed[key.replace("/mean", "")] = value
            axis_values = narrowed

        trace = go.Scatterpolar(
            r=list(axis_values.values()),
            theta=list(axis_values.keys()),
            fill="toself",
            name=title,
        )
        figure.add_trace(trace)

    # The radial axis is fixed to the 0-5 range.
    radial_axis = dict(visible=True, range=[0, 5])
    figure.update_layout(polar=dict(radialaxis=radial_axis), showlegend=True)
    figure.show()


def display_bar_plot(eval_results_list, metrics=None):
    """Draw a grouped bar chart with one bar series per evaluation result.

    Args:
        eval_results_list: Iterable of indexable entries where element 0 is the
            series title and element 1 is an object exposing a
            ``summary_metrics`` mapping.
        metrics: Optional iterable of metric names. When given, only the
            summary entries whose key is exactly ``"<name>/mean"`` are plotted;
            when omitted (or empty) every summary entry is plotted.
    """
    # Build the wanted keys only when a selection was supplied; iterating over
    # the default ``None`` here used to raise TypeError.
    mean_summary_metrics = (
        {f"{metric}/mean" for metric in metrics} if metrics else None
    )

    data = []
    for eval_results in eval_results_list:
        title, eval_result = eval_results[0], eval_results[1]

        summary_metrics = eval_result.summary_metrics
        if mean_summary_metrics is not None:
            summary_metrics = {
                k: v for k, v in summary_metrics.items() if k in mean_summary_metrics
            }

        data.append(
            go.Bar(
                x=list(summary_metrics.keys()),
                y=list(summary_metrics.values()),
                name=title,
            )
        )

    fig = go.Figure(data=data)

    # Place the series side by side instead of stacking them.
    fig.update_layout(barmode="group", showlegend=True)
    fig.show()


def generate_uuid(length: int = 8) -> str:
    """Return a random identifier of ``length`` characters (default 8).

    The characters are drawn with replacement from lowercase ASCII letters and
    digits using the ``random`` module.
    """
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)

In [None]:
########################### From here, you can follow, once you have loaded the model ###################

## Load an evaluation dataset

In [None]:
from google.cloud import storage
import pandas as pd

# Configure your GCS bucket and file
bucket_name = ""  # Replace with your GCS bucket name
file_path = ""  # Replace with your file's path in the bucket

# Download file from GCS
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(file_path)
# NOTE(review): the local destination is an empty placeholder; set it to a real
# file path before running.
blob.download_to_filename("")  # Save locally

# Load the dataset
# NOTE(review): this path is an empty placeholder as well; presumably it should
# be the local file written by the download above — confirm.
# The classification loop later reads the "msg" column of this frame.
data = pd.read_csv("")


In [None]:
# Data check: preview the first rows of the loaded dataset.
data.head()

In [None]:
from vertexai import init
from vertexai.generative_models import GenerativeModel
import pandas as pd

# Initialize Vertex AI
PROJECT_ID = ""  # Replace with your Google Cloud project ID
LOCATION = "us-central1"  # Adjust based on your model location (e.g., "europe-west4")

init(project=PROJECT_ID, location=LOCATION)

# Print the attributes and methods available on the model class. The `model`
# instance is only created in the next cell, so inspecting it here would raise
# NameError on a fresh kernel (Restart & Run All).
print(dir(GenerativeModel))

### Define a model

In [None]:
# Model to be evaluated
# max_output_tokens=50 caps the reply length; the classification prompt asks for
# five short lines (four ratings plus their sum).
# NOTE(review): top_k=1 presumably restricts sampling to the single most likely
# token, in which case temperature=0.6 would have little effect — confirm this
# is the intended decoding setup.
model = GenerativeModel(
    "gemini-1.5-pro",
    generation_config={"temperature": 0.6, "max_output_tokens": 50, "top_k": 1},
)

### **Classification using PHQ-4** ###

### **Gemini Pro 1.5, zero-shot, temp 0.6, BASE (without persona)** ###

In [None]:
import pandas as pd
import re
import time
import requests
from typing import Tuple, Dict

# Define the prompt components first
# In this baseline (no persona) run the instruction is the literal string
# "none"; classify_text_with_gemini puts it verbatim on the first line of
# every prompt.
system_instruction = "none"
# The four items the model is asked to rate, each followed by a short gloss.
criteria = (
    "little interest (little interest or pleasure in doing things), "
    "hopeless (feeling down, depressed, or hopeless), "
    "nervous (feeling nervous, anxious or on edge), "
    "worrying (not being able to stop or control worrying)"
)
question = "What is the likelihood of little interest, hopeless, nervous, and worrying for each message?"
# Output contract: one 0-3 rating per criterion plus the sum of the four.
requirements = (
    "Please provide the likelihood for each criterion on a 4-point Likert scale "
    "(0=not likely, 1=unlikely, 2=likely, 3=very likely) and sum all the Likert scales for each message."
)

def classify_text_with_gemini(text) -> str:
    """Ask the module-level ``model`` to rate one message on the four criteria.

    The prompt is assembled from the module-level ``system_instruction``,
    ``question``, ``criteria`` and ``requirements`` strings, followed by a fixed
    answer template and the message itself.

    Args:
        text: The message to classify; it is interpolated into the prompt as-is.

    Returns:
        The text of the first candidate, or ``None`` (despite the ``str``
        annotation) when the reply has no candidates, was stopped for safety
        reasons, or the call raised an exception.
    """
    full_prompt = (
        f"{system_instruction}\n"
        f"Answer this question: {question} using {criteria} "
        f"and follow the requirements: {requirements}\n"
        f"Respond in the following format strictly, without adding any extra text:\n"
        f"1. Little interest: <0-3>\n"
        f"2. Hopeless: <0-3>\n"
        f"3. Nervous: <0-3>\n"
        f"4. Worrying: <0-3>\n"
        f"Sum of Likert scales: <sum>\n"
        f"Message: {text}"
    )

    try:
        response = model.generate_content(full_prompt)

        candidates = response.candidates
        # finish_reason is an enum member in the Vertex AI SDK, so comparing it
        # directly with the string 'SAFETY' never matches. Compare by member
        # name, falling back to the raw value if it is already a plain string.
        blocked = not candidates
        if not blocked:
            finish_reason = candidates[0].finish_reason
            blocked = getattr(finish_reason, "name", finish_reason) == "SAFETY"

        if blocked:
            print(f"Response blocked by safety filters for text: {text}")
            return None  # Returning None to indicate a blocked response

        # Text of the first (and here only inspected) candidate.
        return candidates[0].text
    except Exception as e:
        # Deliberate best effort: one failing message must not abort the whole
        # run, so the error is reported and the message is recorded as missing.
        print(f"Error calling the model: {e}")
        return None

def extract_ratings(response: str) -> Tuple[int, Dict[str, int]]:
    """Parse the four per-criterion ratings out of a model reply.

    Each criterion is looked up case-insensitively as ``<label>: <number>``.
    Only the numbers 0, 1, 2 and 3 are accepted as ratings.

    Args:
        response: Raw reply text. ``None`` or an empty string is treated as a
            reply without any ratings.

    Returns:
        ``(total_score, scores)`` where ``scores`` maps ``little_interest``,
        ``hopeless``, ``nervous`` and ``worrying`` to the parsed rating, or to
        ``None`` when the criterion is missing or its number is not 0-3.
        ``total_score`` is the sum of the ratings that were parsed; criteria
        that could not be parsed contribute nothing, so it is 0 when no rating
        was found.
    """
    rating_dict = {
        "0": 0,
        "1": 1,
        "2": 2,
        "3": 3
    }

    # Capture the whole number after the colon (\d+ instead of a single \d) so a
    # reply such as "Nervous: 10" is rejected as out of range instead of being
    # silently read as a rating of 1.
    patterns = {
        'little_interest': re.compile(r'little interest\s*:\s*(\d+)', re.IGNORECASE),
        'hopeless': re.compile(r'hopeless\s*:\s*(\d+)', re.IGNORECASE),
        'nervous': re.compile(r'nervous\s*:\s*(\d+)', re.IGNORECASE),
        'worrying': re.compile(r'worrying\s*:\s*(\d+)', re.IGNORECASE)
    }

    scores = {}
    total_score = 0
    text = response or ""  # tolerate None / empty replies

    for key, pattern in patterns.items():
        match = pattern.search(text)
        if match:
            rating = match.group(1)
            if rating in rating_dict:
                score = rating_dict[rating]
                scores[key] = score
                total_score += score
                print(f"Matched {key}: {rating} -> {score}")  # Debugging
            else:
                scores[key] = None
                print(f"No valid rating for {key}: found {rating}, not in rating_dict")  # Debugging
        else:
            scores[key] = None
            print(f"No match found for {key} in the response")  # Debugging

    print(f"Total Score: {total_score}, Scores: {scores}")  # Additional Debugging
    return total_score, scores


# Accumulators filled in dataframe row order: the summed score, the
# per-criterion scores and the raw reply for every message.
sum_ratings = []
individual_ratings = {'little_interest': [], 'hopeless': [], 'nervous': [], 'worrying': []}
responses = []

for idx, msg in data['msg'].items():
    response = classify_text_with_gemini(msg)
    print(f"Text: {msg}\nResponse: {response}\n")  # Debugging

    if response:
        total_score, scores = extract_ratings(response)
    else:
        # No usable reply: record missing values so every list stays aligned
        # with the dataframe rows.
        total_score = None
        scores = {'little_interest': None, 'hopeless': None, 'nervous': None, 'worrying': None}
        print("Empty response received from the model.")  # Debugging

    sum_ratings.append(total_score)
    for key in individual_ratings.keys():
        individual_ratings[key].append(scores[key])
    responses.append(response)
    time.sleep(1)  # pause one second between model calls

# Append the ratings and full responses to your dataframe
data['rating'] = sum_ratings
for key, values in individual_ratings.items():
    data[key] = values
data['model_response'] = responses

# Save the result
data.to_csv("", index=False)

# Show only the first rows: printing the whole frame floods the cell output
# with every message, and the later cells already use head() for their checks.
print(data.head())

In [None]:
# Read a CSV into `saved_data` (the path is an empty placeholder to fill in).
saved_data = pd.read_csv("")

In [None]:
# Data check: preview the first rows of the reloaded frame.
saved_data.head()

In [None]:
from google.cloud import storage

def upload_to_bucket(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to write inside the bucket.
    """
    # Storage client -> bucket handle -> blob handle for the destination object.
    target_blob = (
        storage.Client()
        .bucket(bucket_name)
        .blob(destination_blob_name)
    )

    # Send the local file's contents to that object.
    target_blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")


# Define your bucket name
bucket_name = ""

# File paths
# source_file_name: local file to upload; destination_blob_name: object name to
# create in the bucket. All three values are empty placeholders to fill in.
source_file_name = ""
destination_blob_name = ""

# Upload the file
upload_to_bucket(bucket_name, source_file_name, destination_blob_name)

In [None]:
### classification

In [None]:
import pandas as pd

# Load the data from the CSV file
df = pd.read_csv("")

# Rows whose rating is missing compare False against the threshold and
# therefore end up in class 0. Report how many there are so the metrics
# computed afterwards are not silently skewed by them.
n_missing_ratings = int(df['rating'].isna().sum())
if n_missing_ratings:
    print(f"Warning: {n_missing_ratings} row(s) have no rating and are classified as 0.")

# Add a new 'classification' column: 1 when the summed rating is at least 6, else 0
df['classification'] = (df['rating'] >= 6).astype(int)

# Save the updated dataframe back to a CSV file
df.to_csv("", index=False)

# Print the first few rows to check
print(df.head())

In [None]:
# performance with 95% CI

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

# Load the data from the CSV file (assuming the file with the 'classification' column was saved earlier)
df = pd.read_csv("")

# Extract the 'label' and 'classification' columns
# 'label' is passed as the ground truth and 'classification' as the prediction
# to every metric call below.
y_true = df['label']
y_pred = df['classification']


def bootstrap_confidence_interval(y_true, y_pred, metric_func, n_bootstraps=1000, ci=95,
                                  random_state=None, **kwargs):
    """
    Estimates a percentile-bootstrap confidence interval for a metric.

    Parameters:
        y_true (array-like): True labels (pandas Series, NumPy array or sequence).
        y_pred (array-like): Predicted labels, same length as ``y_true``.
        metric_func (callable): Called as
            ``metric_func(y_true_sample, y_pred_sample, **kwargs)`` on each resample.
        n_bootstraps (int): Number of bootstrap resamples.
        ci (float): Confidence level in percent (e.g., 95 for a 95% CI).
        random_state (int | None): Seed for a private random generator so the
            interval is reproducible. ``None`` (default) keeps drawing from
            NumPy's global random state, as before.
        **kwargs: Additional keyword arguments forwarded to ``metric_func``.

    Returns:
        tuple: Lower and upper bounds of the confidence interval.

    Raises:
        ValueError: If the inputs are empty, differ in length, or the metric
            raised ``ValueError`` on every resample.
    """
    # Positional NumPy indexing works for Series, arrays and plain sequences alike.
    y_true_arr = np.asarray(y_true)
    y_pred_arr = np.asarray(y_pred)
    n = len(y_true_arr)
    if n == 0:
        raise ValueError("Cannot bootstrap an empty sample.")
    if len(y_pred_arr) != n:
        raise ValueError("y_true and y_pred must have the same length.")

    # Both the np.random module and a RandomState instance expose randint().
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    boot_scores = []
    for _ in range(n_bootstraps):
        # Sample row positions with replacement
        indices = rng.randint(0, n, n)

        try:
            score = metric_func(y_true_arr[indices], y_pred_arr[indices], **kwargs)
        except ValueError:
            continue  # Skip this resample if the metric cannot be computed on it
        boot_scores.append(score)

    if not boot_scores:
        raise ValueError("The metric could not be computed on any bootstrap sample.")

    # Percentile interval: cut (100 - ci) / 2 percent from each tail.
    tail = (100 - ci) / 2
    lower, upper = np.percentile(boot_scores, [tail, 100 - tail])
    return lower, upper

# Calculate the point estimates for the metrics
# (computed once on the full, un-resampled label/prediction columns)
f1 = f1_score(y_true, y_pred, average='binary')
precision = precision_score(y_true, y_pred, average='binary')
recall = recall_score(y_true, y_pred, average='binary')
accuracy = accuracy_score(y_true, y_pred)

# Calculate the 95% confidence intervals using bootstrapping
# NOTE(review): each call draws its own resamples from NumPy's global random
# state and no seed is set in this cell, so the intervals can differ between
# runs — confirm whether reproducible intervals are required.
f1_ci = bootstrap_confidence_interval(y_true, y_pred, f1_score, average='binary')
precision_ci = bootstrap_confidence_interval(y_true, y_pred, precision_score, average='binary')
recall_ci = bootstrap_confidence_interval(y_true, y_pred, recall_score, average='binary')
accuracy_ci = bootstrap_confidence_interval(y_true, y_pred, accuracy_score)

# Print the results
# Each line shows the point estimate followed by the lower and upper CI bounds,
# all with four decimals.
print(f'F1 Score: {f1:.4f} (95% CI: {f1_ci[0]:.4f} - {f1_ci[1]:.4f})')
print(f'Precision: {precision:.4f} (95% CI: {precision_ci[0]:.4f} - {precision_ci[1]:.4f})')
print(f'Recall: {recall:.4f} (95% CI: {recall_ci[0]:.4f} - {recall_ci[1]:.4f})')
print(f'Accuracy: {accuracy:.4f} (95% CI: {accuracy_ci[0]:.4f} - {accuracy_ci[1]:.4f})')

### **Gemini Pro 1.5, temp 0.6, zero-shot, systemic persona**

In [None]:
# Model to be evaluated
# Same model name and generation_config as the baseline section above.
# NOTE(review): the persona prompt used in this section also asks the model to
# think step-by-step and to confirm its role; verify that max_output_tokens=50
# still leaves room for the required five-line answer.
# NOTE(review): top_k=1 presumably restricts sampling to the single most likely
# token, in which case temperature=0.6 would have little effect — confirm.
model = GenerativeModel(
    "gemini-1.5-pro",
    generation_config={"temperature": 0.6, "max_output_tokens": 50, "top_k": 1},
)

In [None]:
#load the dataset

from google.cloud import storage
import pandas as pd

# Configure your GCS bucket and file
bucket_name = ""  # Replace with your GCS bucket name
file_path = ""  # Replace with your file's path in the bucket

# Download file from GCS
client = storage.Client()
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(file_path)
# NOTE(review): the local destination is an empty placeholder; set it to a real
# file path before running.
blob.download_to_filename("")  # Save locally

# Load the dataset
# NOTE(review): this path is an empty placeholder as well; presumably it should
# be the local file written by the download above — confirm.
# This rebinds `data`, so the persona run starts from a freshly loaded frame.
data = pd.read_csv("")


In [None]:
import pandas as pd
import re
import time
import requests
from typing import Tuple, Dict

# Define the prompt components first
# Persona instruction placed on the first line(s) of every prompt. Adjacent
# string literals are concatenated by Python without any separator, so every
# fragment that continues a sentence ends with an explicit space; without it the
# prompt contained run-together words such as "detectpatients" and "offerthe".
# NOTE: this changes the exact prompt text sent to the model compared with
# earlier runs of this notebook.
system_instruction = (
    "As Dr. Gemini, a professional psychiatrist in the hospital, "
    "one of your roles is responding to the patients messages through the patient portal. "
    "Patients ask health questions and report some symptoms or lab results. "
    "I will ask for you to evaluate the patients messages to screen those who would need further assessment for depression or anxiety. "
    "I myself am a psychiatrist in the hospital. I will use your evaluation to better detect "
    "patients depressive and anxious symptoms timely through the patient portal messages. "
    "For each question, you should take time to think deeply and step-by-step to be sure to offer "
    "the right answer in a professional manner. To begin, please confirm that you understand your role."
)

# The four items the model is asked to rate, each followed by a short gloss.
criteria = (
    "little interest (little interest or pleasure in doing things), "
    "hopeless (feeling down, depressed, or hopeless), "
    "nervous (feeling nervous, anxious or on edge), "
    "worrying (not being able to stop or control worrying)"
)
question = "What is the likelihood of little interest, hopeless, nervous, and worrying for each message?"
# Output contract: one 0-3 rating per criterion plus the sum of the four.
requirements = (
    "Please provide the likelihood for each criterion on a 4-point Likert scale "
    "(0=not likely, 1=unlikely, 2=likely, 3=very likely) and sum all the Likert scales for each message."
)

def classify_text_with_gemini(text) -> str:
    """Ask the module-level ``model`` to rate one message on the four criteria.

    The prompt is assembled from the module-level ``system_instruction``,
    ``question``, ``criteria`` and ``requirements`` strings, followed by a fixed
    answer template and the message itself.

    Args:
        text: The message to classify; it is interpolated into the prompt as-is.

    Returns:
        The text of the first candidate, or ``None`` (despite the ``str``
        annotation) when the reply has no candidates, was stopped for safety
        reasons, or the call raised an exception.
    """
    full_prompt = (
        f"{system_instruction}\n"
        f"Answer this question: {question} using {criteria} "
        f"and follow the requirements: {requirements}\n"
        f"Respond in the following format strictly, without adding any extra text:\n"
        f"1. Little interest: <0-3>\n"
        f"2. Hopeless: <0-3>\n"
        f"3. Nervous: <0-3>\n"
        f"4. Worrying: <0-3>\n"
        f"Sum of Likert scales: <sum>\n"
        f"Message: {text}"
    )

    try:
        response = model.generate_content(full_prompt)

        candidates = response.candidates
        # finish_reason is an enum member in the Vertex AI SDK, so comparing it
        # directly with the string 'SAFETY' never matches. Compare by member
        # name, falling back to the raw value if it is already a plain string.
        blocked = not candidates
        if not blocked:
            finish_reason = candidates[0].finish_reason
            blocked = getattr(finish_reason, "name", finish_reason) == "SAFETY"

        if blocked:
            print(f"Response blocked by safety filters for text: {text}")
            return None  # Returning None to indicate a blocked response

        # Text of the first (and here only inspected) candidate.
        return candidates[0].text
    except Exception as e:
        # Deliberate best effort: one failing message must not abort the whole
        # run, so the error is reported and the message is recorded as missing.
        print(f"Error calling the model: {e}")
        return None

def extract_ratings(response: str) -> Tuple[int, Dict[str, int]]:
    """Parse the four per-criterion ratings out of a model reply.

    Each criterion is looked up case-insensitively as ``<label>: <number>``.
    Only the numbers 0, 1, 2 and 3 are accepted as ratings.

    Args:
        response: Raw reply text. ``None`` or an empty string is treated as a
            reply without any ratings.

    Returns:
        ``(total_score, scores)`` where ``scores`` maps ``little_interest``,
        ``hopeless``, ``nervous`` and ``worrying`` to the parsed rating, or to
        ``None`` when the criterion is missing or its number is not 0-3.
        ``total_score`` is the sum of the ratings that were parsed; criteria
        that could not be parsed contribute nothing, so it is 0 when no rating
        was found.
    """
    rating_dict = {
        "0": 0,
        "1": 1,
        "2": 2,
        "3": 3
    }

    # Capture the whole number after the colon (\d+ instead of a single \d) so a
    # reply such as "Nervous: 10" is rejected as out of range instead of being
    # silently read as a rating of 1.
    patterns = {
        'little_interest': re.compile(r'little interest\s*:\s*(\d+)', re.IGNORECASE),
        'hopeless': re.compile(r'hopeless\s*:\s*(\d+)', re.IGNORECASE),
        'nervous': re.compile(r'nervous\s*:\s*(\d+)', re.IGNORECASE),
        'worrying': re.compile(r'worrying\s*:\s*(\d+)', re.IGNORECASE)
    }

    scores = {}
    total_score = 0
    text = response or ""  # tolerate None / empty replies

    for key, pattern in patterns.items():
        match = pattern.search(text)
        if match:
            rating = match.group(1)
            if rating in rating_dict:
                score = rating_dict[rating]
                scores[key] = score
                total_score += score
                print(f"Matched {key}: {rating} -> {score}")  # Debugging
            else:
                scores[key] = None
                print(f"No valid rating for {key}: found {rating}, not in rating_dict")  # Debugging
        else:
            scores[key] = None
            print(f"No match found for {key} in the response")  # Debugging

    print(f"Total Score: {total_score}, Scores: {scores}")  # Additional Debugging
    return total_score, scores


# Accumulators filled in dataframe row order: the summed score, the
# per-criterion scores and the raw reply for every message.
sum_ratings = []
individual_ratings = {'little_interest': [], 'hopeless': [], 'nervous': [], 'worrying': []}
responses = []

for idx, msg in data['msg'].items():
    response = classify_text_with_gemini(msg)
    print(f"Text: {msg}\nResponse: {response}\n")  # Debugging

    if response:
        total_score, scores = extract_ratings(response)
    else:
        # No usable reply: record missing values so every list stays aligned
        # with the dataframe rows.
        total_score = None
        scores = {'little_interest': None, 'hopeless': None, 'nervous': None, 'worrying': None}
        print("Empty response received from the model.")  # Debugging

    sum_ratings.append(total_score)
    for key in individual_ratings.keys():
        individual_ratings[key].append(scores[key])
    responses.append(response)
    time.sleep(1)  # pause one second between model calls

# Append the ratings and full responses to dataframe
data['rating'] = sum_ratings
for key, values in individual_ratings.items():
    data[key] = values
data['model_response'] = responses

# Save the result
data.to_csv("", index=False)

# Show only the first rows: printing the whole frame floods the cell output
# with every message, and the later cells already use head() for their checks.
print(data.head())

In [None]:
# Read a CSV into `saved_data` (the path is an empty placeholder to fill in).
saved_data = pd.read_csv("")

In [None]:
# Data check: preview the first rows of the reloaded frame.
saved_data.head()

In [None]:
from google.cloud import storage

def upload_to_bucket(bucket_name, source_file_name, destination_blob_name):
    """Upload a local file to a Cloud Storage bucket.

    Args:
        bucket_name: Name of the destination bucket.
        source_file_name: Path of the local file to upload.
        destination_blob_name: Object name to write inside the bucket.
    """
    # Storage client -> bucket handle -> blob handle for the destination object.
    target_blob = (
        storage.Client()
        .bucket(bucket_name)
        .blob(destination_blob_name)
    )

    # Send the local file's contents to that object.
    target_blob.upload_from_filename(source_file_name)

    print(f"File {source_file_name} uploaded to {destination_blob_name}.")


# Define your bucket name
bucket_name = ""

# File paths
# source_file_name: local file to upload; destination_blob_name: object name to
# create in the bucket. All three values are empty placeholders to fill in.
source_file_name = ""
destination_blob_name = ""  # Change the path if needed

# Upload the file
upload_to_bucket(bucket_name, source_file_name, destination_blob_name)

In [None]:
import pandas as pd

# Load the data from the CSV file
df = pd.read_csv("")

# Rows whose rating is missing compare False against the threshold and
# therefore end up in class 0. Report how many there are so the metrics
# computed afterwards are not silently skewed by them.
n_missing_ratings = int(df['rating'].isna().sum())
if n_missing_ratings:
    print(f"Warning: {n_missing_ratings} row(s) have no rating and are classified as 0.")

# Add a new 'classification' column: 1 when the summed rating is at least 6, else 0
df['classification'] = (df['rating'] >= 6).astype(int)

# Save the updated dataframe back to a CSV file
df.to_csv("", index=False)

# Print the first few rows to check
print(df.head())

In [None]:
# performance WITH 95% CI

In [None]:
import pandas as pd
import numpy as np
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score

# Load the data from the CSV file (assuming the file with the 'classification' column was saved earlier)
df = pd.read_csv("")

# Extract the 'label' and 'classification' columns
# 'label' is passed as the ground truth and 'classification' as the prediction
# to every metric call below.
y_true = df['label']
y_pred = df['classification']


def bootstrap_confidence_interval(y_true, y_pred, metric_func, n_bootstraps=1000, ci=95,
                                  random_state=None, **kwargs):
    """
    Estimates a percentile-bootstrap confidence interval for a metric.

    Parameters:
        y_true (array-like): True labels (pandas Series, NumPy array or sequence).
        y_pred (array-like): Predicted labels, same length as ``y_true``.
        metric_func (callable): Called as
            ``metric_func(y_true_sample, y_pred_sample, **kwargs)`` on each resample.
        n_bootstraps (int): Number of bootstrap resamples.
        ci (float): Confidence level in percent (e.g., 95 for a 95% CI).
        random_state (int | None): Seed for a private random generator so the
            interval is reproducible. ``None`` (default) keeps drawing from
            NumPy's global random state, as before.
        **kwargs: Additional keyword arguments forwarded to ``metric_func``.

    Returns:
        tuple: Lower and upper bounds of the confidence interval.

    Raises:
        ValueError: If the inputs are empty, differ in length, or the metric
            raised ``ValueError`` on every resample.
    """
    # Positional NumPy indexing works for Series, arrays and plain sequences alike.
    y_true_arr = np.asarray(y_true)
    y_pred_arr = np.asarray(y_pred)
    n = len(y_true_arr)
    if n == 0:
        raise ValueError("Cannot bootstrap an empty sample.")
    if len(y_pred_arr) != n:
        raise ValueError("y_true and y_pred must have the same length.")

    # Both the np.random module and a RandomState instance expose randint().
    rng = np.random if random_state is None else np.random.RandomState(random_state)

    boot_scores = []
    for _ in range(n_bootstraps):
        # Sample row positions with replacement
        indices = rng.randint(0, n, n)

        try:
            score = metric_func(y_true_arr[indices], y_pred_arr[indices], **kwargs)
        except ValueError:
            continue  # Skip this resample if the metric cannot be computed on it
        boot_scores.append(score)

    if not boot_scores:
        raise ValueError("The metric could not be computed on any bootstrap sample.")

    # Percentile interval: cut (100 - ci) / 2 percent from each tail.
    tail = (100 - ci) / 2
    lower, upper = np.percentile(boot_scores, [tail, 100 - tail])
    return lower, upper

# Calculate the point estimates for the metrics
# (computed once on the full, un-resampled label/prediction columns)
f1 = f1_score(y_true, y_pred, average='binary')
precision = precision_score(y_true, y_pred, average='binary')
recall = recall_score(y_true, y_pred, average='binary')
accuracy = accuracy_score(y_true, y_pred)

# Calculate the 95% confidence intervals using bootstrapping
# NOTE(review): each call draws its own resamples from NumPy's global random
# state and no seed is set in this cell, so the intervals can differ between
# runs — confirm whether reproducible intervals are required.
f1_ci = bootstrap_confidence_interval(y_true, y_pred, f1_score, average='binary')
precision_ci = bootstrap_confidence_interval(y_true, y_pred, precision_score, average='binary')
recall_ci = bootstrap_confidence_interval(y_true, y_pred, recall_score, average='binary')
accuracy_ci = bootstrap_confidence_interval(y_true, y_pred, accuracy_score)

# Print the results
# Each line shows the point estimate followed by the lower and upper CI bounds,
# all with four decimals.
print(f'F1 Score: {f1:.4f} (95% CI: {f1_ci[0]:.4f} - {f1_ci[1]:.4f})')
print(f'Precision: {precision:.4f} (95% CI: {precision_ci[0]:.4f} - {precision_ci[1]:.4f})')
print(f'Recall: {recall:.4f} (95% CI: {recall_ci[0]:.4f} - {recall_ci[1]:.4f})')
print(f'Accuracy: {accuracy:.4f} (95% CI: {accuracy_ci[0]:.4f} - {accuracy_ci[1]:.4f})')