#1 General Steps

In [None]:
#1: Install Google Cloud AI and Storage Libraries
!pip install google-cloud-aiplatform google-cloud-storage google-cloud-datalabeling



In [None]:
#2: Authenticate with Google Cloud
from google.colab import auth
auth.authenticate_user()

# Now you are authenticated and ready to interact with your Google Cloud project


In [None]:
#3: Import the Necessary Libraries and Initialize AI Platform
from google.cloud import aiplatform

# Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

In [None]:
#4: Create a new Bucket
from google.cloud import storage

# Specify your Google Cloud project ID
project_id = 'book-examples-2024'  # Your actual project ID

# Initialize the Google Cloud Storage client with your project
client = storage.Client(project=project_id)

# Define a globally unique bucket name
bucket_name = 'my-image-data-bucket-unique'  # Change this to something unique

# Create the bucket, or reuse it when it already exists. Looking it up first
# keeps the cell re-runnable (Restart & Run All): a second run no longer ends
# in a 409 Conflict with `bucket` left undefined.
try:
    bucket = client.lookup_bucket(bucket_name)  # None when the bucket does not exist
    if bucket is None:
        bucket = client.create_bucket(bucket_name, location='us-central1')  # Specify the location
        print(f'Bucket {bucket.name} created successfully.')
    else:
        print(f'Bucket {bucket.name} already exists; reusing it.')
except Exception as e:
    # Best-effort by design: report the problem instead of aborting the notebook.
    print(f'Error creating bucket: {e}')



Bucket my-image-data-bucket-unique created successfully.


#Example 7.2.2 Execution
#1. Collecting Data with Google Cloud Data Labeling Service

In [None]:
#1: Create a Dataset for Image Classification
# Create an ImageDataset for single-label image classification and import its
# data from Cloud Storage. The call blocks while the dataset is created and the
# import runs (see the logged long-running operations in the cell output).
# NOTE(review): gcs_source is presumably expected to be an import file (CSV/JSONL
# listing image URIs and labels) rather than a folder of images - confirm.
dataset = aiplatform.ImageDataset.create(
    display_name='HITL-dataset',
    gcs_source='gs://my-image-data-bucket-unique/images',  # Update with the correct path to your images
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification
)



INFO:google.cloud.aiplatform.datasets.dataset:Creating ImageDataset
INFO:google.cloud.aiplatform.datasets.dataset:Create ImageDataset backing LRO: projects/518684982184/locations/us-central1/datasets/7436657395381567488/operations/6891515304082407424
INFO:google.cloud.aiplatform.datasets.dataset:ImageDataset created. Resource name: projects/518684982184/locations/us-central1/datasets/7436657395381567488
INFO:google.cloud.aiplatform.datasets.dataset:To use this ImageDataset in another session:
INFO:google.cloud.aiplatform.datasets.dataset:ds = aiplatform.ImageDataset('projects/518684982184/locations/us-central1/datasets/7436657395381567488')
INFO:google.cloud.aiplatform.datasets.dataset:Importing ImageDataset data: projects/518684982184/locations/us-central1/datasets/7436657395381567488
INFO:google.cloud.aiplatform.datasets.dataset:Import ImageDataset data backing LRO: projects/518684982184/locations/us-central1/datasets/7436657395381567488/operations/3674256320279609344
INFO:google.clo

In [None]:
#2 Create a Labeling Job
from google.cloud import datalabeling_v1beta1 as datalabeling

# Initialize the Data Labeling API client.
# Note: this rebinds `client`, which the bucket-creation cell used for the
# Cloud Storage client.
client = datalabeling.DataLabelingServiceClient()

# Define the project and the GCS bucket holding your images and instructions
project_id = 'book-examples-2024'
bucket_name = 'my-image-data-bucket-unique'

# Full project path, derived from project_id so the two cannot drift apart
project_path = f'projects/{project_id}'

# Step 1: Describe the Annotation Spec Set (cats and dogs labels)
annotation_spec_set = datalabeling.types.AnnotationSpecSet(
    display_name='Animal Classification',
    description='Classify images as either cats or dogs',
    annotation_specs=[
        datalabeling.types.AnnotationSpec(display_name='cats'),
        datalabeling.types.AnnotationSpec(display_name='dogs'),
    ]
)

# Step 2: Create the Annotation Spec Set in Data Labeling Service
annotation_spec_set_request = datalabeling.CreateAnnotationSpecSetRequest(
    parent=project_path,
    annotation_spec_set=annotation_spec_set
)

created_annotation_spec_set = client.create_annotation_spec_set(request=annotation_spec_set_request)

# Step 3: Set up Human Annotation Instructions (ensure it's a valid PDF in GCS)
instruction = datalabeling.types.Instruction(
    display_name='Animal Classification Instructions',
    description='Instructions for labeling images as cats or dogs',
    data_type=datalabeling.DataType.IMAGE,
    pdf_instruction=datalabeling.types.PdfInstruction(
        gcs_file_uri=f'gs://{bucket_name}/labeling-instructions.pdf'  # Ensure the PDF exists in this location
    )
)

instruction_request = datalabeling.CreateInstructionRequest(
    parent=project_path,
    instruction=instruction
)

# create_instruction returns a long-running operation; wait for it so that
# `created_instruction` is the Instruction resource (whose .name is used below).
created_instruction = client.create_instruction(request=instruction_request).result()

# Step 4: Define the GcsSource for the dataset.
# The import source is a CSV file listing one image URI per row, not a folder.
gcs_source = datalabeling.types.GcsSource(
    input_uri=f'gs://{bucket_name}/images/image_uris.csv',  # Update: CSV listing the gs:// URIs of your images
    mime_type='text/csv'
)

# Step 5: Define Input Configuration for the dataset
input_config = datalabeling.types.InputConfig(
    data_type=datalabeling.DataType.IMAGE,
    gcs_source=gcs_source
)

# Step 6: Create the (empty) Dataset, then import the images into it.
# Dataset.input_configs is populated by the service on import, so it is not
# set at creation time.
dataset_request = datalabeling.CreateDatasetRequest(
    parent=project_path,
    dataset=datalabeling.types.Dataset(
        display_name='Animal Image Dataset',
        description='Dataset for labeling images of cats and dogs'
    )
)

created_dataset = client.create_dataset(request=dataset_request)

import_request = datalabeling.ImportDataRequest(
    name=created_dataset.name,
    input_config=input_config
)
client.import_data(request=import_request).result()  # Block until the import finishes

# Step 7: Submit the Labeling Task.
# The parent of a labeling request is the dataset (not the project), and an
# image classification task needs the feature type plus the label set to use.
labeling_task_request = datalabeling.LabelImageRequest(
    parent=created_dataset.name,
    basic_config=datalabeling.types.HumanAnnotationConfig(
        instruction=created_instruction.name,
        annotated_dataset_display_name='HITL-Labeling-Job',
        language_code='en'
    ),
    feature=datalabeling.LabelImageRequest.Feature.CLASSIFICATION,
    image_classification_config=datalabeling.types.ImageClassificationConfig(
        annotation_spec_set=created_annotation_spec_set.name,
        allow_multi_label=False
    )
)

# label_image also returns a long-running operation; labeling itself is done
# by humans, so the cell does not wait for it to complete.
labeling_operation = client.label_image(request=labeling_task_request)

print("Labeling job created successfully.")


#2. Active Learning Implementation:

In [None]:
# Step 1: Install the Vertex AI SDK for Python (google-cloud-aiplatform) if necessary (Colab environment specific)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Define an active learning strategy
def active_learning_strategy(predictions, uncertainty_threshold=0.5):
    """
    Pick the predictions the model is least sure about.

    :param predictions: Sequence of dicts, each carrying an 'uncertainty' value
    :param uncertainty_threshold: Samples strictly above this value are selected
    :return: Positions (indices into `predictions`) of the selected samples
    """
    selected_indices = []
    for index, prediction in enumerate(predictions):
        # Strict comparison: a sample sitting exactly on the threshold is not selected
        if prediction['uncertainty'] > uncertainty_threshold:
            selected_indices.append(index)
    return selected_indices

# Step 5: Load model and dataset
# Both resource names contain placeholders ('your-model-id', 'your-dataset-id')
# that must be replaced before this cell can run.
model = aiplatform.Model('projects/book-examples-2024/locations/us-central1/models/your-model-id')
dataset = aiplatform.ImageDataset('projects/book-examples-2024/locations/us-central1/datasets/your-dataset-id')

# Step 6: Predict on unlabeled data
# Use batch_predict for generating predictions from unlabeled dataset (Colab execution may need smaller dataset for testing)
# The result is stored in `unlabeled_predictions` but is not read again in this cell.
# NOTE(review): gcs_source presumably has to point at input file(s) (e.g. JSONL)
# rather than a folder - confirm against the batch prediction docs.
unlabeled_predictions = model.batch_predict(
    gcs_source='gs://my-image-data-bucket-unique/unlabeled-images/',
    gcs_destination_prefix='gs://my-image-data-bucket-unique/batch-predictions-output/'
)

# Step 7: Use active learning strategy to select uncertain samples
# Here we simulate predictions. In practice, you'd obtain predictions from the batch_predict output.
# This is an example simulation:
simulated_predictions = [{'uncertainty': 0.6}, {'uncertainty': 0.3}, {'uncertainty': 0.8}]  # Simulated data
# active_learning_strategy returns list positions within simulated_predictions,
# not identifiers of items in `dataset`.
uncertain_samples = active_learning_strategy(simulated_predictions)

# Step 8: Submit uncertain samples for human labeling
# NOTE(review): `aiplatform.LabelingJob` does not look like a class exposed by the
# high-level Vertex AI SDK, and `sample_ids` receives list indices (see Step 7) -
# verify this call against the SDK before relying on it.
labeling_job = aiplatform.LabelingJob.create(
    display_name='Active-Learning-Labeling-Job',
    dataset=dataset,
    labeler_count=5,  # Number of labelers assigned
    instruction_uri='gs://my-image-data-bucket-unique/labeling-instructions.pdf',
    sample_ids=uncertain_samples  # IDs of uncertain samples to be labeled
)

print("Labeling job submitted for uncertain samples.")


#3. Expert Annotation:


In [None]:
# Step 1: Install the Vertex AI SDK for Python (google-cloud-aiplatform) if necessary (Colab specific)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Create a dataset for expert annotation
# Ensure your GCS bucket path is correct and images are accessible
# 'path-to-images' is a placeholder that must be replaced.
expert_dataset = aiplatform.ImageDataset.create(
    display_name='Expert-Annotated-Dataset',
    gcs_source='gs://my-image-data-bucket-unique/path-to-images',  # Update with the correct GCS path
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
)

# Step 5: Assign expert annotators to the labeling task
# Provide the correct GCS path for the instruction PDF
# NOTE(review): `aiplatform.LabelingJob` does not look like a class exposed by the
# high-level Vertex AI SDK - verify this call before relying on it.
expert_labeling_job = aiplatform.LabelingJob.create(
    display_name='Expert-Labeling-Job',
    dataset=expert_dataset,
    labeler_count=3,  # Number of expert labelers
    instruction_uri='gs://my-image-data-bucket-unique/expert-labeling-instructions.pdf',  # Make sure the instructions PDF is accessible in GCS
)

print("Expert labeling job created successfully.")


#4. Refining Labels with Google Cloud Functions:

In [None]:
# Step 1: Install the necessary libraries
!pip install google-cloud-storage

# Step 2: Import necessary libraries
from google.cloud import storage

# Step 3: Define the function to refine labels based on human feedback
def refine_labels(event, context):
    """
    Refine labels based on human feedback stored in a Cloud Storage file.

    The function reads the feedback file, passes its text through
    `process_feedback`, and writes the result next to it in the same bucket.

    Events that carry an object name other than the feedback file are ignored.
    Without this filter the function's own write of the refined-labels file
    (or any unrelated upload) would trigger a full reprocess again.

    :param event: Event payload from Cloud Storage (file creation trigger);
                  its 'name' key, when present, is the object that changed.
                  A missing payload or missing 'name' processes the feedback file.
    :param context: Metadata for the event (unused)
    :return: None
    """
    bucket_name = 'my-image-data-bucket-unique'  # Replace with your bucket name
    feedback_path = 'path-to-human-feedback.json'  # Replace with the correct path to your JSON file
    refined_path = 'path-to-refined-labels.json'  # Path to save refined labels

    # Only react to the feedback file itself
    triggering_object = (event or {}).get('name', feedback_path)
    if triggering_object != feedback_path:
        print(f"Ignoring event for '{triggering_object}': not the feedback file.")
        return

    # Initialize the Google Cloud Storage client
    client = storage.Client()

    # Access the bucket where the human feedback is stored
    bucket = client.bucket(bucket_name)

    # Download the feedback data (e.g., JSON format) as text
    feedback_data = bucket.blob(feedback_path).download_as_text()

    # Process and refine labels
    refined_labels = process_feedback(feedback_data)

    # Save the refined labels back to Cloud Storage
    bucket.blob(refined_path).upload_from_string(refined_labels)

    print("Labels refined and saved successfully.")

# Step 4: Function to process feedback (Implement based on your specific needs)
def process_feedback(feedback_data):
    """
    Turn raw human feedback into refined labels.

    Placeholder implementation: the feedback is handed back untouched.
    Replace the body with the real refinement logic.

    :param feedback_data: The feedback data in JSON or another format
    :return: Refined labels (currently the input, unchanged)
    """
    # Identity pass-through until real processing is implemented
    return feedback_data

# Step 5: You can test the function locally in Colab by calling `refine_labels` (with simulated events)
# Example of testing the function locally with event and context simulation
# Uncomment the next two lines to test the function locally in Colab
# event = {'name': 'path-to-human-feedback.json'}  # Simulated event
# refine_labels(event, None)


#Example 7.2.3 Augmentation

#1. Data Augmentation Based on Human Feedback:

In [None]:
# Step 1: Install the necessary libraries (for Colab)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Define a function to generate augmented data based on feedback
def augment_data(original_data, feedback):
    """
    Extend the original data with variations derived from human feedback.

    NOTE(review): `feedback_variations` is not defined anywhere in the visible
    notebook; it must be supplied before this function can be called.

    :param original_data: Original dataset; must support `+` with whatever
                          `feedback_variations` returns
    :param feedback: Human feedback (e.g., additional annotations or corrections)
    :return: `original_data + feedback_variations(feedback)`
    """
    variations = feedback_variations(feedback)  # Define feedback_variations based on your use case
    return original_data + variations

# Step 5: Load the original dataset from Google Cloud AI Platform
# 'your-dataset-id' is a placeholder that must be replaced with a real dataset ID.
dataset = aiplatform.ImageDataset('projects/book-examples-2024/locations/us-central1/datasets/your-dataset-id')

# Step 6: Simulate human feedback (in a real scenario, you'll have feedback from labeling tasks)
def get_human_feedback():
    """Return simulated human feedback: a list of label corrections."""
    # (original label, corrected label) pairs standing in for real labeling results
    simulated_corrections = (('cat', 'dog'), ('dog', 'cat'))
    return [
        {'label': original, 'correction': corrected}
        for original, corrected in simulated_corrections
    ]

# Step 7: Generate augmented data using the original dataset and human feedback
feedback = get_human_feedback()  # Replace with actual feedback
# NOTE(review): augment_data applies `+` to its first argument; here that is the
# ImageDataset handle loaded in Step 5, which presumably does not support `+` - confirm.
augmented_data = augment_data(dataset, feedback)

# Step 8: Save the augmented data to a new dataset in Google Cloud AI Platform
# The new dataset is imported from the GCS path below; `augmented_data` from
# Step 7 is not passed to this call.
augmented_dataset = aiplatform.ImageDataset.create(
    display_name='Augmented-Dataset',
    gcs_source='gs://my-image-data-bucket-unique/path-to-augmented-images',  # Update with the correct GCS path
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
)

print("Augmented dataset created successfully.")

#2. Continuous Improvement and Retraining:

In [None]:
# Step 1: Install the necessary libraries (for Colab)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Define the model training function with augmented dataset
def retrain_model(augmented_dataset):
    """
    Retrain a model on an augmented dataset with a Vertex AI custom training job.

    :param augmented_dataset: The augmented dataset created from previous steps
    :return: Whatever `CustomTrainingJob.run` returns (the retrained model when
             the job uploads one), so callers can deploy or evaluate it
    """
    # The model display name is given to run() only; it was previously also
    # passed to the constructor, which configures the job rather than the model.
    # NOTE(review): producing a managed model presumably also requires
    # `model_serving_container_image_uri` here, and packaging `script_path`
    # presumably requires a staging bucket (constructor argument or
    # aiplatform.init(staging_bucket=...)) - confirm against the SDK docs.
    training_job = aiplatform.CustomTrainingJob(
        display_name='Retrained-Model',
        script_path='train_script.py',  # Update this to the path where your training script is stored
        container_uri='us-docker.pkg.dev/vertex-ai/training/tf-gpu.2-3:latest'  # TensorFlow container with GPU
    )

    # Launch training with the augmented dataset
    retrained_model = training_job.run(
        dataset=augmented_dataset,  # Using the augmented dataset
        model_display_name='Augmented-Model',
        training_pipeline_display_name='Augmented-Training-Pipeline',
        replica_count=1,
        machine_type='n1-standard-4',  # Machine type for training
        accelerator_type='NVIDIA_TESLA_T4',  # GPU type
        accelerator_count=1  # Number of GPUs
    )

    print("Model retraining job submitted.")
    return retrained_model

# Step 5: Ensure that the augmented dataset is loaded or created
# Assuming that 'augmented_dataset' is already created in a previous step
# This rebinds `augmented_dataset` to a handle looked up by resource name;
# 'your-augmented-dataset-id' is a placeholder that must be replaced.
augmented_dataset = aiplatform.ImageDataset(
    'projects/book-examples-2024/locations/us-central1/datasets/your-augmented-dataset-id'
)

# Step 6: Retrain the model with the augmented dataset
retrain_model(augmented_dataset)


#7.3 Leveraging GCP for Human Annotations

#1: Preparing Your Dataset

In [None]:
# Execute this on the command prompt for uploading a dataset to Google Cloud Storage
gsutil cp -r /local-path-to-your-dataset gs://your-bucket/path-to-dataset


In [None]:
#2: Creating an Annotation Dataset
# Step 1: Install the necessary libraries (for Colab)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Create an image dataset for annotation from a GCS source
# gcs_source is passed as a one-element list here (other cells pass a plain string).
# This rebinds the notebook-level name `dataset` used by the following cells.
dataset = aiplatform.ImageDataset.create(
    display_name='Image-Annotation-Dataset',
    gcs_source=['gs://my-image-data-bucket-unique/path-to-dataset'],  # Update with your bucket and dataset path
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
)

print("Image dataset created successfully.")


In [None]:
#3: Setting Up a Labeling Task
# Step 1: Install the necessary libraries (for Colab)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Set up a labeling job for the dataset
# Relies on `dataset` from the "Creating an Annotation Dataset" cell.
# NOTE(review): `aiplatform.DataLabelingJob` does not look like a class exposed by
# the high-level Vertex AI SDK - verify this call before relying on it.
labeling_job = aiplatform.DataLabelingJob.create(
    display_name='Image-Labeling-Job',
    dataset=dataset,  # Reference to the dataset created earlier
    labeler_count=5,  # Number of human labelers
    instruction_uri='gs://my-image-data-bucket-unique/labeling-instructions.pdf',  # Path to the instructions PDF in GCS
    annotation_specs=['Cat', 'Dog', 'Bird'],  # Example labels to be used in the annotation
)

print("Labeling job created successfully.")


In [None]:
#4: Monitoring the Labeling Process
# Step 1: Install the necessary libraries (for Colab)
!pip install google-cloud-aiplatform

# Step 2: Import necessary libraries
from google.cloud import aiplatform

# Step 3: Initialize the AI Platform
aiplatform.init(project='book-examples-2024', location='us-central1')

# Step 4: Set up a labeling job for the dataset
# This repeats the job creation from the "Setting Up a Labeling Task" cell with
# identical arguments, so running both cells issues the create call twice and
# rebinds `labeling_job` to the second result.
# NOTE(review): `aiplatform.DataLabelingJob` does not look like a class exposed by
# the high-level Vertex AI SDK - verify this call (and the wait/state/label_stats
# members used below) before relying on it.
labeling_job = aiplatform.DataLabelingJob.create(
    display_name='Image-Labeling-Job',
    dataset=dataset,  # Reference to the dataset created earlier
    labeler_count=5,  # Number of human labelers
    instruction_uri='gs://my-image-data-bucket-unique/labeling-instructions.pdf',  # Path to the instructions PDF in GCS
    annotation_specs=['Cat', 'Dog', 'Bird'],  # Example labels to be used in the annotation
)

# Step 5: Monitor the progress of the labeling job
labeling_job.wait()  # This will block until the job is complete

# Step 6: Retrieve the status and details of the labeling job
status = labeling_job.state
details = labeling_job.label_stats

print(f"Labeling Job Status: {status}")
print(f"Labeling Job Details: {details}")


In [None]:
#5: Reviewing and Refining Annotations
# Export the labeled dataset to Cloud Storage for review.
# ImageDataset.export_data takes the destination as `output_dir`, blocks until
# the export has finished, and returns the list of exported file paths - there
# is no export-format argument and no separate operation object to wait on.
labeled_dataset = dataset.export_data(
    output_dir='gs://my-image-data-bucket-unique/path-to-labeled-dataset'  # GCS destination for the export
)

print("Labeled dataset export complete. Review the dataset in the GCS bucket.")


#7.4 Training a Custom Reward Model and Preparing Data

In [None]:
#Splitting Data:
# Step 1: Install necessary libraries (for Colab)
!pip install pandas scikit-learn

# Step 2: Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split

# Step 3: Load your dataset into a pandas DataFrame
# Assuming your dataset is already in a pandas DataFrame 'df'
# Example:
# df = pd.read_csv('/path/to/your/data.csv')

# Step 4: Split the dataset into training, validation, and test sets
# `df` must already exist: the loading example in Step 3 is commented out, so on
# a fresh kernel this cell raises NameError until `df` is defined.
# Overall proportions: 64% train / 16% validation / 20% test.
train_data, test_data = train_test_split(df, test_size=0.2, random_state=42)  # 20% test set
# `train_data` is rebound here to the part that remains after carving out validation.
train_data, val_data = train_test_split(train_data, test_size=0.2, random_state=42)  # 20% validation from remaining 80%

# Step 5: Print the sizes of the splits
print(f"Training data size: {len(train_data)}")
print(f"Validation data size: {len(val_data)}")
print(f"Test data size: {len(test_data)}")

In [None]:
#Model Training:
# Step 1: Install necessary libraries (for Colab)
!pip install pandas scikit-learn

# Step 2: Import necessary libraries
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# Step 3: Load your dataset into a pandas DataFrame
# Assuming your dataset is already in a pandas DataFrame 'df'
# Example:
# df = pd.read_csv('/path/to/your/data.csv')

# Step 4: Train a Random Forest model as the reward model
# Uses `train_data` / `val_data` produced by the "Splitting Data" cell.
# Every column except 'reward' is used as a feature; 'reward' is the target.
# Note: this rebinds `model`, which an earlier cell used for a Vertex AI model handle.
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(train_data.drop(columns=['reward']), train_data['reward'])

# Step 5: Validate the model
# Predict on the held-out validation features and score with mean squared error
# (lower is better).
val_predictions = model.predict(val_data.drop(columns=['reward']))
validation_score = mean_squared_error(val_data['reward'], val_predictions)

# Step 6: Print the validation score (MSE)
print(f"Validation MSE: {validation_score}")


In [None]:
#Deployment and Integration:
# Step 1: Install necessary libraries (if not already installed)
!pip install joblib

# Step 2: Import the joblib library
import joblib

# Step 3: Save the trained model for later use in the RL framework
# 'model' is the trained RandomForestRegressor from the previous steps
# Written to the current working directory of the runtime.
joblib.dump(model, 'custom_reward_model.pkl')

# Step 4: Save the model to Google Drive (optional) or Google Cloud Storage (GCS)
# Example for saving to Google Drive
# `google.colab` is only importable inside Colab; mounting prompts for Drive access.
from google.colab import drive
drive.mount('/content/drive')
# Second copy of the same model, this time on the mounted Drive.
joblib.dump(model, '/content/drive/MyDrive/custom_reward_model.pkl')

print("Model saved successfully.")


#7.5 Utilizing Reward Models, including Existing Ones

In [None]:
#Integration and Testing:
# Step 1: Assuming the RL agent and reward model are already set up
# Example of an RL agent integrating the fine-tuned reward model
# Pre-trained model is assumed to be loaded from previous steps

# Load the pre-trained reward model from a file
# Only load model files you created or trust: joblib files are pickle-based and
# loading one can execute arbitrary code.
import joblib
pretrained_model = joblib.load('custom_reward_model.pkl')  # Adjust path if necessary

# Step 2: Train the RL agent using the reward model
# Replace 'rl_agent' with your actual RL agent instance
# `rl_agent` is not defined anywhere in this notebook; `train_with_reward_model`
# and `monitor_performance` are placeholders for your RL framework's own API.
rl_agent.train_with_reward_model(pretrained_model)

# Step 3: Monitor the agent's performance to ensure reward signals lead to desired behavior
# This would vary depending on your RL framework; the following is an example placeholder
performance_metrics = rl_agent.monitor_performance()
print(f"Agent's performance metrics: {performance_metrics}")

# Save or log the agent's performance
# Example of logging or storing the performance metrics
# Mode 'w' overwrites any previous log file.
with open('agent_performance_log.txt', 'w') as f:
    f.write(str(performance_metrics))

print("RL agent's performance monitoring complete.")


#7.6 Implementing Reinforcement Learning with Proximal Policy Optimization

In [None]:
#Setting Up the Environment
# Step 1: Install necessary libraries (for Colab)
!pip install gym

# Step 2: Import the gym library
import gym

# Step 3: Create the CartPole environment
env = gym.make('CartPole-v1')

# Step 4: Reset the environment to its initial state
# NOTE(review): the return shape of reset() depends on the installed gym version
# (newer releases presumably return an (observation, info) tuple) - `state` is
# not used further in this cell, so it runs either way.
state = env.reset()

# Step 5: Print the state space and action space
print(f"State space: {env.observation_space}")
print(f"Action space: {env.action_space}")


In [None]:
#Defining the PPO Agent
# Step 1: Install PyTorch (for Colab)
!pip install torch

# Step 2: Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim

# Step 3: Define the policy (actor) network
class PolicyNetwork(nn.Module):
    """Actor network: maps a state to a probability distribution over actions.

    Two hidden layers of 128 units with ReLU, followed by a linear layer and a
    softmax over the last dimension. The forward pass returns a tensor of
    probabilities (one row per input state), not a distribution object.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        hidden_dim = 128
        self.fc1 = nn.Linear(input_dim, hidden_dim)   # state -> hidden
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden -> hidden
        self.fc3 = nn.Linear(hidden_dim, output_dim)  # hidden -> one logit per action
        self.softmax = nn.Softmax(dim=-1)             # logits -> probabilities

    def forward(self, x):
        hidden = torch.relu(self.fc2(torch.relu(self.fc1(x))))
        logits = self.fc3(hidden)  # no activation before the softmax
        return self.softmax(logits)

# Step 4: Define the value (critic) network
class ValueNetwork(nn.Module):
    """Critic network: maps a state to a single scalar value estimate.

    Two hidden layers of 128 units with ReLU, followed by a linear output with
    no activation. The output keeps a trailing dimension of size 1.
    """

    def __init__(self, input_dim):
        super().__init__()
        hidden_dim = 128
        self.fc1 = nn.Linear(input_dim, hidden_dim)   # state -> hidden
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # hidden -> hidden
        self.fc3 = nn.Linear(hidden_dim, 1)           # hidden -> value estimate

    def forward(self, x):
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        value = self.fc3(hidden)  # unbounded: values can be any real number
        return value

# Step 5: Initialize the environment
# Re-creates `env` so this cell does not depend on the environment setup cell.
import gym
env = gym.make('CartPole-v1')  # Create CartPole environment

# Step 6: Initialize the policy and value networks
# Input size is the length of the observation vector; the policy emits one
# probability per discrete action.
policy_net = PolicyNetwork(input_dim=env.observation_space.shape[0], output_dim=env.action_space.n)
value_net = ValueNetwork(input_dim=env.observation_space.shape[0])

# Step 7: Define optimizers for the networks
# Actor and critic are optimized independently, each with its own Adam instance.
policy_optimizer = optim.Adam(policy_net.parameters(), lr=3e-4)
value_optimizer = optim.Adam(value_net.parameters(), lr=3e-4)

# Step 8: Verify initialization
print(f"Policy Network: {policy_net}")
print(f"Value Network: {value_net}")



In [None]:
#Implementing the PPO Update Function
import torch

# Define PPO update function
def ppo_update(policy_net, value_net, policy_optimizer, value_optimizer, states, actions, rewards, old_log_probs, advantages, clip_param=0.2):
    """
    Perform one PPO update step on the policy and value networks.

    Args:
    - policy_net: The policy network (actor); called on `states` it must return
      action probabilities of shape (N, num_actions).
    - value_net: The value network (critic).
    - policy_optimizer: Optimizer for the policy network.
    - value_optimizer: Optimizer for the value network.
    - states: Batch of states, shape (N, state_dim).
    - actions: Integer action indices, shape (N,).
    - rewards: Regression targets for the value network, shape (N,).
    - old_log_probs: Log probabilities of `actions` under the policy that
      collected the data, shape (N,).
    - advantages: The advantage estimates, shape (N,).
    - clip_param: The PPO clipping parameter.

    Returns:
    None (performs in-place updates to the policy and value networks).
    """
    # Rollout statistics are constants for this update. If `old_log_probs` were
    # still attached to the policy's graph, its gradient would cancel the
    # gradient of the new log-probs and the policy would never change.
    old_log_probs = old_log_probs.detach()
    advantages = advantages.detach()

    # Log probabilities of the taken actions under the current policy.
    # squeeze(1) removes only the gathered column, so a batch of one stays 1-D.
    action_probs = policy_net(states).gather(1, actions.unsqueeze(1)).squeeze(1)
    log_probs = torch.log(action_probs)

    # Ratio of new probabilities to old probabilities
    ratios = torch.exp(log_probs - old_log_probs)

    # Clipped surrogate objective (negated because optimizers minimize)
    surr1 = ratios * advantages
    surr2 = torch.clamp(ratios, 1.0 - clip_param, 1.0 + clip_param) * advantages
    policy_loss = -torch.min(surr1, surr2).mean()

    # Value loss: mean squared error against the supplied targets
    values = value_net(states).squeeze(-1)
    value_loss = (rewards - values).pow(2).mean()

    # Update policy network
    policy_optimizer.zero_grad()
    policy_loss.backward()
    policy_optimizer.step()

    # Update value network
    value_optimizer.zero_grad()
    value_loss.backward()
    value_optimizer.step()

    print(f"Policy Loss: {policy_loss.item()}, Value Loss: {value_loss.item()}")


In [None]:
#Training the PPO Agent
import torch

# Function to train PPO agent
def train_ppo(env, policy_net, value_net, policy_optimizer, value_optimizer, epochs=1000, gamma=0.99):
    """
    Trains the PPO agent using the given environment, policy, and value networks.

    Each epoch collects one episode (at most 200 steps), computes discounted
    returns and advantages, and performs a single `ppo_update`.

    Args:
    - env: The Gym environment (both the classic and the Gym >= 0.26
      reset/step return conventions are handled).
    - policy_net: The policy network (actor); returns action probabilities.
    - value_net: The value network (critic).
    - policy_optimizer: Optimizer for the policy network.
    - value_optimizer: Optimizer for the value network.
    - epochs: Number of epochs to train for.
    - gamma: Discount factor for rewards.

    Returns:
    None (trains the networks in-place).
    """
    for epoch in range(epochs):
        # Reset environment for each epoch.
        # Newer Gym returns (observation, info); older Gym returns the observation.
        reset_result = env.reset()
        state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        rewards, states, actions, log_probs = [], [], [], []

        # Rollout episode
        for t in range(200):  # Adjust the range for longer/shorter episodes
            state_tensor = torch.as_tensor(state, dtype=torch.float32).unsqueeze(0)

            # Sample action from policy. The network outputs probabilities, so
            # wrap them in a Categorical distribution to sample and score.
            # no_grad: the rollout log-probs are the fixed "old policy" values.
            with torch.no_grad():
                dist = torch.distributions.Categorical(probs=policy_net(state_tensor))
                action = dist.sample()
                log_prob = dist.log_prob(action)

            # Take action in the environment.
            # Newer Gym returns 5 values (terminated and truncated separately).
            step_result = env.step(action.item())
            if len(step_result) == 5:
                next_state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                next_state, reward, done, _ = step_result

            # Store episode information
            rewards.append(reward)
            states.append(state_tensor)
            actions.append(action)
            log_probs.append(log_prob)

            state = next_state
            if done:
                break

        # Convert lists to tensors
        rewards = torch.tensor(rewards, dtype=torch.float32)
        batch_states = torch.cat(states)

        # Discounted returns: G_t = r_t + gamma * G_{t+1}, accumulated backwards
        returns = torch.zeros_like(rewards)
        running_return = 0.0
        for step in reversed(range(len(rewards))):
            running_return = rewards[step].item() + gamma * running_return
            returns[step] = running_return

        # Advantages: returns minus value predictions. squeeze(-1) turns the
        # critic's (T, 1) output into (T,) so the subtraction is element-wise
        # instead of broadcasting to (T, T).
        advantages = returns - value_net(batch_states).squeeze(-1).detach()

        # Perform PPO update; the critic regresses onto the discounted returns
        ppo_update(policy_net, value_net, policy_optimizer, value_optimizer,
                   batch_states, torch.cat(actions), returns, torch.cat(log_probs), advantages)

        # Print progress every 100 epochs
        if epoch % 100 == 0:
            print(f"Epoch {epoch}, Total Reward: {rewards.sum().item()}")

# Initialize the environment (e.g., CartPole)
import gym
env = gym.make('CartPole-v1')

# Assuming 'policy_net' and 'value_net' are already defined
# (`policy_optimizer` and `value_optimizer` must exist as well; all four are
# created in the "Defining the PPO Agent" cell.)
# Example: Training the agent with PPO
# Uses the defaults of train_ppo: epochs=1000, gamma=0.99.
train_ppo(env, policy_net, value_net, policy_optimizer, value_optimizer)


In [None]:
#Testing the Trained PPO Agent
import torch

# Reset the environment to start a new episode.
# Newer Gym returns (observation, info); older Gym returns the observation.
reset_result = env.reset()
state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
total_reward = 0

# Run the environment for 200 steps (or until 'done' signal is received)
for _ in range(200):
    # Convert the state to a tensor to feed into the policy network
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)

    # The policy network outputs action probabilities; wrap them in a
    # Categorical distribution to sample an action. No gradients are needed
    # during evaluation.
    with torch.no_grad():
        action = torch.distributions.Categorical(probs=policy_net(state_tensor)).sample()

    # Step in the environment using the sampled action.
    # Newer Gym returns 5 values (terminated and truncated separately).
    step_result = env.step(action.item())
    if len(step_result) == 5:
        next_state, reward, terminated, truncated, _ = step_result
        done = terminated or truncated
    else:
        next_state, reward, done, _ = step_result

    # Accumulate rewards
    total_reward += reward

    # Update the state for the next iteration
    state = next_state

    # If the episode is done, break the loop
    if done:
        break

# Print the total reward after completing the episode
print(f"Total reward after training: {total_reward}")


#7.7 Fine-Tuning and Mitigating Reward Hacking Risks

In [None]:
# Example of a Shaped Reward Function with PyTorch tensors (if needed) for use in Colab
def shaped_reward(state, action, next_state):
    """
    Compute a shaped reward for one transition.

    The result is the sum of three terms, each computed from the same
    (state, action, next_state) triple: the base reward, a safety bonus and an
    efficiency bonus.

    Args:
    - state: The current state of the environment.
    - action: The action taken by the agent.
    - next_state: The resulting state after the action.

    Returns:
    - base reward + safety bonus + efficiency bonus.
    """
    transition = (state, action, next_state)

    # Each component sees the full transition
    base = get_base_reward(*transition)
    safety = calculate_safety_bonus(*transition)
    efficiency = calculate_efficiency_bonus(*transition)

    return base + safety + efficiency


# Sample utility functions (these are placeholders, you should define these)
def get_base_reward(state, action, next_state):
    """Return the base reward for a transition (placeholder: always 1.0)."""
    # Flat reward; the arguments are ignored until real logic is added
    flat_reward = 1.0
    return flat_reward

def calculate_safety_bonus(state, action, next_state):
    """Return the safety bonus for a transition (placeholder: always 0.1)."""
    # Small constant bonus; replace with logic based on the environment dynamics
    small_safety_bonus = 0.1
    return small_safety_bonus

def calculate_efficiency_bonus(state, action, next_state):
    """Return the bonus awarded for efficient behaviour (placeholder: a constant)."""
    # Placeholder: replace with logic driven by the environment dynamics.
    small_bonus = 0.2
    return small_bonus


In [None]:
#Penalizing Undesirable Behaviors:
# Example of a Penalizing Reward Function
def penalizing_reward(state, action, next_state):
    """
    Compute a reward that deducts points for collisions and for time spent.

    Args:
    - state: The state the agent was in before acting.
    - action: The action the agent selected.
    - next_state: The state reached after the action.

    Returns:
    - base reward, minus 10 if has_collision reports a collision, minus the
      value returned by time_taken for this transition.
    """
    reward = get_base_reward(state, action, next_state)

    # A collision costs a fixed 10 points; otherwise nothing is deducted.
    if has_collision(state, action, next_state):
        collision_penalty = -10
    else:
        collision_penalty = 0

    # Each unit of time reported by time_taken costs one point.
    elapsed = time_taken(state, action, next_state)
    time_penalty = -1 * elapsed

    return reward + collision_penalty + time_penalty


# Sample utility functions (you should define the specific logic)
def get_base_reward(state, action, next_state):
    """Return the unpenalized reward for a transition (placeholder: a constant)."""
    # Placeholder: the same flat value is used for every transition.
    flat_reward = 1.0
    return flat_reward

def has_collision(state, action, next_state):
    """Report whether the transition ended in a collision (placeholder: never)."""
    # Placeholder: plug real collision detection in here.
    collided = False
    return collided

def time_taken(state, action, next_state):
    """Return the time cost charged for the action (placeholder: always 1)."""
    # Placeholder: replace with a measure of how long the action really took.
    constant_cost = 1
    return constant_cost


In [None]:
#Constrained Optimization:
# Example of a Constrained Optimization Reward Function
def constrained_reward(state, action, next_state):
    """
    Compute a reward that is heavily penalized when a safety constraint breaks.

    Args:
    - state: The state the agent was in before acting.
    - action: The action the agent selected.
    - next_state: The state reached after the action.

    Returns:
    - base reward, minus 100 if violates_safety_constraint flags this
      transition; the base reward otherwise.
    """
    reward = get_base_reward(state, action, next_state)

    # The large fixed penalty is meant to dominate the base reward.
    if violates_safety_constraint(state, action, next_state):
        safety_constraint_penalty = -100
    else:
        safety_constraint_penalty = 0

    return reward + safety_constraint_penalty


# Sample utility functions (you should define the specific logic)
def get_base_reward(state, action, next_state):
    """Return the unconstrained reward for a transition (placeholder: a constant)."""
    # Placeholder: a flat reward regardless of the transition.
    flat_reward = 1.0
    return flat_reward

def violates_safety_constraint(state, action, next_state):
    """Report whether the transition breaks the safety constraint (placeholder: never)."""
    # Placeholder: plug real violation detection in here.
    violated = False
    return violated



# 7.7 Incorporating PEFT in Reinforcement Learning

In [None]:
#Identifying Tunable Parameters
import torch
import torch.nn as nn

# Example of selecting tunable parameters in the policy network
class PolicyNetwork(nn.Module):
    """Three-layer MLP policy in which only the final layer (fc3) is trainable."""

    def __init__(self, input_dim, output_dim):
        """
        Build the layers and freeze the first two.

        Args:
        - input_dim: Number of input features (dimensions).
        - output_dim: Number of output actions (dimensions).
        """
        super().__init__()
        hidden_dim = 128
        self.fc1 = nn.Linear(input_dim, hidden_dim)   # frozen below
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)  # frozen below
        self.fc3 = nn.Linear(hidden_dim, output_dim)  # stays trainable
        self.softmax = nn.Softmax(dim=-1)

        # Turn off gradients for every parameter of fc1 and fc2.
        for frozen_layer in (self.fc1, self.fc2):
            frozen_layer.requires_grad_(False)

    def forward(self, x):
        """
        Map an input state to action probabilities.

        Args:
        - x: Input state.

        Returns:
        - Softmax over the last dimension of the fc3 output.
        """
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        logits = self.fc3(hidden)
        return self.softmax(logits)


# Example usage: 4 state features in, 2 actions out
input_dim, output_dim = 4, 2

# Build the policy network with those sizes
policy_net = PolicyNetwork(input_dim, output_dim)

# List every parameter tensor together with its requires_grad flag
for name, param in policy_net.named_parameters():
    print(f"{name} - requires_grad: {param.requires_grad}")


In [None]:
#Implementing Fine-Tuning with PEFT Techniques:
import torch
import torch.nn as nn
import loralib as lora  # Make sure you have LoRA library installed

# Define a Policy Network
class PolicyNetwork(nn.Module):
    """Three-layer MLP policy; unlike the earlier frozen variant, every layer here is trainable."""

    def __init__(self, input_dim, output_dim):
        """
        Args:
        - input_dim: Number of input features.
        - output_dim: Number of output actions.
        """
        super().__init__()
        hidden_dim = 128
        self.fc1 = nn.Linear(input_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, output_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        """Return the softmax (over the last dimension) of the fc3 output for input x."""
        hidden = torch.relu(self.fc1(x))
        hidden = torch.relu(self.fc2(hidden))
        logits = self.fc3(hidden)
        return self.softmax(logits)

# Initialize the Policy Network
input_dim = 4  # Example input dimension (e.g., state space)
output_dim = 2  # Example output dimension (e.g., action space)
policy_network = PolicyNetwork(input_dim, output_dim)

# Apply LoRA to the selected layer (fc3).
# loralib has no `apply_lora` helper: LoRA is enabled by replacing a layer with
# its loralib counterpart (`lora.Linear`), which adds trainable rank-`r`
# factors (lora_A / lora_B) next to the base weight.
base_fc3 = policy_network.fc3
lora_fc3 = lora.Linear(base_fc3.in_features, base_fc3.out_features, r=8)

# Carry the existing fc3 weight and bias over into the LoRA layer.
# strict=False because a plain nn.Linear state dict has no lora_A / lora_B keys.
lora_fc3.load_state_dict(base_fc3.state_dict(), strict=False)
policy_network.fc3 = lora_fc3

# Define Adapter for additional transformation
class Adapter(nn.Module):
    """Bottleneck adapter: project down, apply ReLU, project back up, add the input."""

    def __init__(self, input_dim, adapter_dim):
        """
        Build the down-projection / up-projection pair.

        Args:
        - input_dim: Input dimension of the adapter (matches the layer size).
        - adapter_dim: The internal dimension for down-projection.
        """
        super().__init__()
        down_projection = nn.Linear(input_dim, adapter_dim)
        up_projection = nn.Linear(adapter_dim, input_dim)
        self.adapter = nn.Sequential(down_projection, nn.ReLU(), up_projection)

    def forward(self, x):
        """Return x plus the adapter's transformation of x (skip connection)."""
        residual = self.adapter(x)
        return x + residual

# Attach the adapter to the selected layer (fc3).
# The adapter works on 128-dim features, which is what fc3 *consumes*, so it
# must sit in front of fc3. Placing it after fc3 fails with a shape mismatch,
# because fc3 emits `output_dim` features rather than 128.
hidden_dim = policy_network.fc2.out_features
policy_network.fc3 = nn.Sequential(Adapter(hidden_dim, 32), policy_network.fc3)

# Example forward pass
example_state = torch.rand((1, input_dim))  # Example input state
output = policy_network(example_state)
print(output)


In [None]:
#Fine-Tuning the Model:
import torch

# Assuming policy_network is already initialized and modified with LoRA and Adapter layers
# LoRA applied to policy_network.fc3 in earlier code
# Adapter applied to policy_network.fc3 in earlier code

# Define optimizer for the fc3 layer (with LoRA and Adapter layers applied).
# Only parameters that still require gradients are handed to the optimizer, so
# any frozen base weights inside fc3 are left out explicitly.
trainable_params = [p for p in policy_network.fc3.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=1e-4)

# Define loss function.
# policy_network ends in a softmax, so its outputs are already probabilities.
# CrossEntropyLoss expects raw logits and applies log-softmax itself, which
# would apply softmax twice here. NLLLoss on log-probabilities is the matching
# loss for a network that outputs probabilities.
criterion = torch.nn.NLLLoss()

# Number of epochs
num_epochs = 10  # Example number of epochs

# Synthetic stand-in data so this cell runs end to end.
# Replace `dataloader` with your own DataLoader; each batch must be a mapping
# with a float 'state' tensor and an integer class-index 'action' tensor.
state_dim = policy_network.fc1.in_features
with torch.no_grad():
    # Probe the network once to learn how many actions it outputs.
    num_actions = policy_network(torch.zeros(1, state_dim)).shape[-1]
dataloader = [
    {
        'state': torch.rand(8, state_dim),
        'action': torch.randint(0, num_actions, (8,)),
    }
    for _ in range(4)
]

# Fine-tuning loop
for epoch in range(num_epochs):
    epoch_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        optimizer.zero_grad()

        # Forward pass through the policy network
        states = batch['state']  # Input states from batch
        actions = batch['action']  # True actions from batch

        outputs = policy_network(states)  # Action probabilities

        # Negative log-likelihood of the true actions; the clamp avoids log(0)
        loss = criterion(torch.log(outputs.clamp_min(1e-8)), actions)

        # Backpropagation
        loss.backward()

        # Update the network parameters
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # Without this guard the print below would fail on an undefined loss.
        raise ValueError("dataloader produced no batches")

    # Report the mean loss over the epoch's batches
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss / num_batches}")



# 7.8 Evaluating Model Performance Qualitatively and Quantitatively

# 7.8.1 Quantitative Evaluation

In [None]:
#Accuracy, Precision, Recall, and F1 Score:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# True labels and predicted labels (binary classification example)
y_true = [0, 1, 1, 1, 0, 1, 0, 0, 1, 0]  # Ground truth (actual labels)
y_pred = [0, 1, 0, 1, 0, 1, 0, 1, 1, 0]  # Model predictions

# Calculate Accuracy, Precision, Recall, and F1 score.
# Accuracy is included because this cell's heading lists it alongside the others.
accuracy = accuracy_score(y_true, y_pred)  # Fraction of labels predicted correctly
precision = precision_score(y_true, y_pred)  # Precision calculation
recall = recall_score(y_true, y_pred)  # Recall calculation
f1 = f1_score(y_true, y_pred)  # F1 Score calculation

# Display results
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")
print(f"F1 Score: {f1:.2f}")


In [None]:
#Confusion Matrix:
from sklearn.metrics import confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt

# True labels and predicted labels (same as before)
y_true = [0, 1, 1, 1, 0, 1, 0, 0, 1, 0]
y_pred = [0, 1, 0, 1, 0, 1, 0, 1, 1, 0]

# Generate confusion matrix.
# Passing `labels` pins the row/column order to [0, 1] and keeps the matrix
# 2x2 even if one class is missing from the data.
class_labels = [0, 1]
conf_matrix = confusion_matrix(y_true, y_pred, labels=class_labels)

# Plot confusion matrix as a heatmap on an explicit Axes, rather than relying
# on pyplot's implicit "current axes" state.
fig, ax = plt.subplots()
sns.heatmap(
    conf_matrix,
    annot=True,
    fmt='d',
    cmap='Blues',
    xticklabels=class_labels,
    yticklabels=class_labels,
    ax=ax,
)

# Label the plot so it can be read on its own
ax.set_xlabel('Predicted')
ax.set_ylabel('Actual')
ax.set_title('Confusion Matrix')

# Display the plot
plt.show()


In [None]:
#ROC-AUC Score:
from sklearn.metrics import roc_auc_score

# True labels and predicted scores (binary classification example).
# ROC-AUC measures how well the model *ranks* positives above negatives, so it
# should be given continuous scores (e.g. predicted probabilities of class 1).
# Hard 0/1 predictions collapse the ROC curve to a single threshold.
y_true = [0, 1, 1, 1, 0, 1, 0, 0, 1, 0]
y_score = [0.10, 0.90, 0.40, 0.80, 0.20, 0.70, 0.30, 0.60, 0.95, 0.05]

# Hard predictions at a 0.5 threshold (same labels as in the previous cells)
y_pred = [1 if score >= 0.5 else 0 for score in y_score]

# Calculate ROC-AUC score from the scores, not from the thresholded labels
roc_auc = roc_auc_score(y_true, y_score)

# Print the ROC-AUC Score
print(f"ROC-AUC Score: {roc_auc:.2f}")

In [None]:
#Mean Squared Error (MSE) and R-squared:
from sklearn.metrics import mean_squared_error, r2_score

# Regression example: observed targets alongside the model's estimates
y_true, y_pred = [3.0, -0.5, 2.0, 7.0], [2.5, 0.0, 2.1, 7.8]

# Mean Squared Error (MSE) and R-squared (R²) for the same pair of sequences
mse, r_squared = mean_squared_error(y_true, y_pred), r2_score(y_true, y_pred)

# Report both metrics rounded to two decimals
print(f"Mean Squared Error: {mse:.2f}")
print(f"R-squared: {r_squared:.2f}")


# 7.9 Loading Evaluation Model and Aggregating Evaluation Metrics for Comparison

In [None]:
#Loading the Evaluation Model:
import joblib

# Location of the serialized model; point this at your own saved artifact.
MODEL_PATH = 'path_to_model.pkl'

# Load the saved model from a pickle file.
# SECURITY: joblib.load unpickles the file, and unpickling can execute
# arbitrary code. Only load model files from a source you trust.
model = joblib.load(MODEL_PATH)

# Example: Prepare the evaluation dataset (Replace with actual function or code)
def load_test_data():
    """Return a tiny hard-coded (features, labels) pair used as a stand-in test set."""
    # Dummy (feature vector, label) rows; replace with real evaluation data.
    labelled_rows = [
        ([1.5, 2.0], 0),
        ([3.0, 4.5], 1),
        ([5.2, 6.1], 1),
    ]
    X_test = [features for features, _ in labelled_rows]
    y_test = [label for _, label in labelled_rows]
    return X_test, y_test

# Fetch the evaluation features and their labels
X_test, y_test = load_test_data()

# Ask the loaded model for a prediction per test row
y_pred = model.predict(X_test)

# Show the predictions next to the labels they should match
print(f"Predictions: {y_pred}")
print(f"True Labels: {y_test}")


In [None]:
#Evaluating the Model:
from sklearn.metrics import accuracy_score, precision_score, recall_score

# Predict on the test set with the model loaded earlier
y_pred = model.predict(X_test)

# Score the predictions and collect the results under their metric names
metrics = {
    'accuracy': accuracy_score(y_test, y_pred),
    'precision': precision_score(y_test, y_pred),
    'recall': recall_score(y_test, y_pred),
}

# Keep the individual values available as plain variables as well
accuracy = metrics['accuracy']
precision = metrics['precision']
recall = metrics['recall']

# Report each metric rounded to two decimals
print(f"Accuracy: {accuracy:.2f}")
print(f"Precision: {precision:.2f}")
print(f"Recall: {recall:.2f}")


In [None]:
#Aggregating Evaluation Metrics:
import pandas as pd

# One record per model: its name plus the three scores being compared
metrics_1 = dict(model='Model 1', accuracy=0.95, precision=0.93, recall=0.92)
metrics_2 = dict(model='Model 2', accuracy=0.94, precision=0.92, recall=0.91)

# Stack the records so each model becomes a row and each metric a column
comparison_rows = [metrics_1, metrics_2]
df_metrics = pd.DataFrame(comparison_rows)

# Show the side-by-side comparison
print(df_metrics)
