# Drive API Wrappers

In [None]:
%%capture
# @title Google Drive API Wrapper Class

from typing import Dict, Any, Optional, List
from googleapiclient.discovery import build
from google.colab import auth
from googleapiclient.http import MediaFileUpload

class GoogleDrive:
    """Small convenience wrapper around the Google Drive v3 API.

    Creating an instance authenticates the Colab user and builds the Drive
    client, which is exposed as ``self.service``.
    """

    def __init__(self):
        self.service = self._init_service()

    def _init_service(self):
        """Initialize Google Drive service."""
        # Authenticate the Colab user first; the client is then built without
        # passing explicit credentials.
        auth.authenticate_user()
        return build('drive', 'v3', credentials=None)

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape a value for use inside a single-quoted Drive query literal.

        Backslashes are escaped first so the backslash added in front of each
        single quote is not escaped a second time.
        """
        return value.replace('\\', '\\\\').replace("'", "\\'")

    @classmethod
    def _name_query(cls, name: str, parent_id: Optional[str] = None,
                    mime_type: Optional[str] = None) -> str:
        """Build a ``q`` string matching non-trashed items with the given name.

        Args:
            name: Exact item name to match (escaped before interpolation)
            parent_id: Optional parent folder ID to restrict the search to
            mime_type: Optional MIME type the item must have

        Returns:
            str: Query string for ``files().list(q=...)``
        """
        clauses = [f"name = '{cls._escape_query_value(name)}'"]
        if mime_type:
            clauses.append(f"mimeType = '{mime_type}'")
        clauses.append("trashed = false")
        if parent_id:
            clauses.append(f"'{cls._escape_query_value(parent_id)}' in parents")
        return " and ".join(clauses)

    def file_exists(self, file_name: str, parent_id: Optional[str] = None) -> bool:
        """
        Check if a file exists in Drive or specific folder.

        Args:
            file_name: Name of the file to check
            parent_id: Optional parent folder ID

        Returns:
            bool: True if file exists, False otherwise
        """
        results = self.service.files().list(
            q=self._name_query(file_name, parent_id),
            spaces='drive',
            fields='files(id, name)'
        ).execute()

        return bool(results.get('files', []))

    def directory_exists(self, dir_name: str, parent_id: Optional[str] = None) -> Optional[str]:
        """
        Check if directory exists and return its ID if found.

        Args:
            dir_name: Name of the directory to check
            parent_id: Optional parent folder ID

        Returns:
            Optional[str]: ID of the first matching directory, None otherwise
        """
        results = self.service.files().list(
            q=self._name_query(
                dir_name,
                parent_id,
                mime_type='application/vnd.google-apps.folder',
            ),
            spaces='drive',
            fields='files(id, name)'
        ).execute()

        files = results.get('files', [])
        return files[0]['id'] if files else None

    def create_directory(self, dir_name: str, parent_id: Optional[str] = None) -> str:
        """
        Create a new directory in Drive.

        Args:
            dir_name: Name of the directory to create
            parent_id: Optional parent folder ID

        Returns:
            str: ID of created directory
        """
        file_metadata: Dict[str, Any] = {
            'name': dir_name,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        if parent_id:
            file_metadata['parents'] = [parent_id]

        file = self.service.files().create(
            body=file_metadata,
            fields='id'
        ).execute()

        return file.get('id')

    def create_file(self, file_name: str, mime_type: str, file_path: str,
                   parent_id: Optional[str] = None) -> str:
        """
        Create a new file in Drive.

        Args:
            file_name: Name for the new file
            mime_type: MIME type of the file
            file_path: Path to the file to upload
            parent_id: Optional parent folder ID

        Returns:
            str: ID of created file
        """
        file_metadata: Dict[str, Any] = {'name': file_name}
        if parent_id:
            file_metadata['parents'] = [parent_id]

        media = MediaFileUpload(file_path, mimetype=mime_type)
        file = self.service.files().create(
            body=file_metadata,
            media_body=media,
            fields='id'
        ).execute()

        return file.get('id')

    def get_files_in_directory(self, dir_id: str) -> List[Dict[str, str]]:
        """
        List all files in a directory.

        Follows ``nextPageToken`` so that directories holding more entries
        than fit in one response page are returned in full.

        Args:
            dir_id: ID of the directory to list

        Returns:
            List[Dict[str, str]]: Files with their ``id``, ``name`` and ``mimeType``
        """
        query = f"'{self._escape_query_value(dir_id)}' in parents and trashed = false"
        files: List[Dict[str, str]] = []
        page_token = None

        while True:
            results = self.service.files().list(
                q=query,
                spaces='drive',
                fields='nextPageToken, files(id, name, mimeType)',
                pageToken=page_token
            ).execute()

            files.extend(results.get('files', []))
            page_token = results.get('nextPageToken')
            if not page_token:
                return files

if __name__ == "__main__":
    # Manual smoke test for GoogleDrive: make sure a top-level "ColabAgent"
    # folder exists, then print what it contains.
    drive = GoogleDrive()

    # Reuse the folder when it is already there, otherwise create it
    colab_folder_id = drive.directory_exists("ColabAgent")
    if colab_folder_id:
        print(f"Found existing ColabAgent folder with ID: {colab_folder_id}")
    else:
        colab_folder_id = drive.create_directory("ColabAgent")
        print(f"Created ColabAgent folder with ID: {colab_folder_id}")

    # Show every entry of the folder as "- name (id)"
    files = drive.get_files_in_directory(colab_folder_id)
    print("\nFiles in ColabAgent folder:")
    for file in files:
        print(f"- {file['name']} ({file['id']})")



In [None]:
# @title Google Sheets API Wrapper Class

import pandas as pd
from google.oauth2.credentials import Credentials
from googleapiclient.discovery import build
from google.colab import auth
import re
from urllib.parse import parse_qs, urlparse
from typing import Dict, Any

class GoogleSheet:
    """Wrapper around the Google Sheets v4 API for one tab of a spreadsheet.

    The spreadsheet ID and the tab's ``gid`` are parsed from the sheet URL.
    Creating an instance authenticates the Colab user, builds the Sheets
    client and resolves the tab title for that ``gid``.
    """

    def __init__(self, url: str):
        self.url = url
        self.spreadsheet_id, self.gid = self._extract_spreadsheet_info(url)
        self.service = self._init_service()
        self.sheet_name = self._get_sheet_name()

    def _init_service(self):
        """Authenticate the Colab user and build the Sheets v4 client."""
        auth.authenticate_user()
        return build('sheets', 'v4', credentials=None)

    @staticmethod
    def _extract_spreadsheet_info(url: str):
        """Parse the spreadsheet ID and tab gid out of a Google Sheets URL.

        Args:
            url: Sheet URL containing ``/d/<spreadsheet id>``

        Returns:
            tuple: ``(spreadsheet_id, gid)``; ``gid`` is a string and is
            ``'0'`` when the URL carries no ``gid`` parameter

        Raises:
            ValueError: If no spreadsheet ID can be found in the URL
        """
        match = re.search(r'/d/([a-zA-Z0-9_-]+)', url)
        if not match:
            raise ValueError("Could not find spreadsheet ID in URL")
        spreadsheet_id = match.group(1)

        # The gid may sit in the fragment (#gid=...) or in the query string;
        # the fragment wins when both are present.
        parsed = urlparse(url)
        query_params = parse_qs(parsed.fragment or parsed.query)
        gid = query_params.get('gid', ['0'])[0]

        return spreadsheet_id, gid

    @staticmethod
    def _quote_sheet_name(sheet_name: str) -> str:
        """Quote a tab title for use as an A1-notation range.

        Quoting keeps titles that contain spaces, or that look like cell
        references (e.g. ``Q4``), from being misread. Embedded single quotes
        are doubled.
        """
        return "'" + sheet_name.replace("'", "''") + "'"

    @staticmethod
    def _values_to_dataframe(values: list) -> pd.DataFrame:
        """Turn a non-empty list of rows (header row first) into a DataFrame.

        Rows may have differing lengths. Short rows are padded with ``None``;
        when a data row is wider than the header row the missing column names
        are filled in as ``Unnamed: <index>`` instead of raising.
        """
        headers = list(values[0])
        rows = values[1:]
        width = max([len(headers), *map(len, rows)])

        headers.extend(f'Unnamed: {i}' for i in range(len(headers), width))
        padded = [list(row) + [None] * (width - len(row)) for row in rows]
        return pd.DataFrame(padded, columns=headers)

    def _get_sheet_name(self) -> str:
        """Return the title of the tab whose sheetId equals ``self.gid``.

        Falls back to ``'Sheet1'`` when no tab matches.
        """
        sheet_metadata = self.service.spreadsheets().get(
            spreadsheetId=self.spreadsheet_id
        ).execute()

        target_gid = int(self.gid)
        for sheet in sheet_metadata.get('sheets', []):
            properties = sheet['properties']
            if properties['sheetId'] == target_gid:
                return properties['title']
        return 'Sheet1'

    def read_to_dataframe(self) -> pd.DataFrame:
        """Read sheet contents into a pandas DataFrame.

        The first row is used as the column headers.

        Raises:
            ValueError: If the tab contains no values
        """
        result = self.service.spreadsheets().values().get(
            spreadsheetId=self.spreadsheet_id,
            range=self._quote_sheet_name(self.sheet_name)
        ).execute()

        values = result.get('values', [])
        if not values:
            raise ValueError('No data found in spreadsheet')

        return self._values_to_dataframe(values)

    def get_metadata(self) -> Dict[str, Any]:
        """Get sheet metadata."""
        return self.service.spreadsheets().get(
            spreadsheetId=self.spreadsheet_id
        ).execute()

    def update_values(self, range_name: str, values: list):
        """Update values in specified range.

        Args:
            range_name: Target range, passed to the API unchanged
            values: Rows of cell values, sent with ``valueInputOption='RAW'``
        """
        body = {'values': values}
        self.service.spreadsheets().values().update(
            spreadsheetId=self.spreadsheet_id,
            range=range_name,
            valueInputOption='RAW',
            body=body
        ).execute()

if __name__ == "__main__":
    # Test GoogleSheet functionality
    # Manual smoke test: constructing GoogleSheet authenticates the Colab user
    # and calls the Sheets API, so this needs access to the spreadsheet below.
    # The URL selects the tab with gid=61014510.
    sheet_url = "https://docs.google.com/spreadsheets/d/1Tarn_9Hou5HVY8nxY7ox84mIvjUFUl9sQKNY4GpU_7g/edit?gid=61014510#gid=61014510"
    sheet = GoogleSheet(sheet_url)
    # The first row of the tab becomes the DataFrame's column headers
    df = sheet.read_to_dataframe()
    print("First few rows of the Google Sheet:")
    print(df.head())

In [None]:
# @title Google Docs API Wrapper Class

class GoogleDoc:
    """Wrapper around the Google Docs v1 API for a single document.

    The document ID is parsed from the document URL. Creating an instance
    authenticates the Colab user and builds the Docs client.
    """

    def __init__(self, url: str):
        self.url = url
        self.document_id = self._extract_document_id(url)
        self.service = self._init_service()

    def _init_service(self):
        """Authenticate the Colab user and build the Docs v1 client."""
        auth.authenticate_user()
        return build('docs', 'v1', credentials=None)

    @staticmethod
    def _extract_document_id(url: str) -> str:
        """Return the ID found after ``/d/`` in a Google Docs URL.

        Raises:
            ValueError: If the URL contains no document ID
        """
        match = re.search(r'/d/([a-zA-Z0-9_-]+)', url)
        if not match:
            raise ValueError("Could not find document ID in URL")
        return match.group(1)

    def get_document(self) -> Dict[str, Any]:
        """Retrieve the document's metadata and content."""
        return self.service.documents().get(documentId=self.document_id).execute()

    def read_content(self) -> str:
        """Read the plain text content of the document."""
        document = self.get_document()
        return self._extract_text(document)

    def _extract_text(self, document: Dict[str, Any]) -> str:
        """Extract text from the document's body.

        Only text runs inside top-level paragraphs are collected; any other
        structural element is skipped.
        """
        parts = []
        for element in document.get('body', {}).get('content', []):
            paragraph = element.get('paragraph')
            if not paragraph:
                continue
            for elem in paragraph.get('elements', []):
                if text_run := elem.get('textRun'):
                    parts.append(text_run.get('content', ''))
        return ''.join(parts)

    @staticmethod
    def _build_replace_requests(end_index: int, new_text: str) -> list:
        """Build the batchUpdate requests that swap the body for ``new_text``.

        Args:
            end_index: End index of the last body element
            new_text: Replacement text

        Returns:
            list: Zero, one or two request dicts. The delete request is left
            out when the body holds nothing besides its final newline (the
            range would be empty), and the insert request is left out when
            ``new_text`` is empty.
        """
        # Adjust end_index to exclude the final newline character
        delete_end = end_index - 1 if end_index > 1 else end_index

        requests = []
        if delete_end > 1:
            requests.append({
                'deleteContentRange': {
                    'range': {
                        'startIndex': 1,
                        'endIndex': delete_end
                    }
                }
            })
        if new_text:
            requests.append({
                'insertText': {
                    'location': {'index': 1},
                    'text': new_text
                }
            })
        return requests

    def update_content(self, new_text: str):
        """Replace the document's content with new_text.

        Returns:
            The batchUpdate response, or an empty dict when there is nothing
            to change (document already empty and ``new_text`` empty).
        """
        requests = self._build_replace_requests(self._get_end_index(), new_text)
        if not requests:
            return {}
        return self.service.documents().batchUpdate(
            documentId=self.document_id,
            body={'requests': requests}
        ).execute()

    def _get_end_index(self) -> int:
        """Get the end index for deleting content."""
        document = self.get_document()
        return document.get('body').get('content')[-1].get('endIndex', 1)

    def append_content(self, additional_text: str):
        """Append text to the end of the document.

        Returns:
            The batchUpdate response, or an empty dict when
            ``additional_text`` is empty and no request is sent.
        """
        if not additional_text:
            return {}

        requests = [{
            'insertText': {
                # Insert just before the final newline of the body
                'location': {'index': self._get_end_index() - 1},
                'text': additional_text
            }
        }]
        return self.service.documents().batchUpdate(
            documentId=self.document_id,
            body={'requests': requests}
        ).execute()

if __name__ == "__main__":
    # Test GoogleDoc functionality
    # Manual smoke test. NOTE: this is destructive - after printing the current
    # text it replaces the whole body of the document below and then appends
    # a line to it.
    doc_url = "https://docs.google.com/document/d/1iuY9x6oBj9LvOaTbGRFkXzxdztPZALGV3Yv0vDkNLgk/edit?tab=t.0"
    doc = GoogleDoc(doc_url)
    content = doc.read_content()
    print("\nContent of the Google Doc:")
    print(content)

    # Overwrite the existing body with a fixed sentence
    doc.update_content("This is the new content of the document.")
    print("\nGoogle Doc content has been updated.")

    # Add a second line after the text written above
    doc.append_content("\nThis text was appended to the document.")
    print("\nAdditional content has been appended to the Google Doc.")

### Env Setup

In [19]:
!pip install anthropic

from google.colab import drive
from googleapiclient.discovery import build

def setup_directory_structure():
    """Ensure the ColabAgent sub-folders exist in Drive and return their IDs.

    Mounts Drive at /content/drive, looks up the existing ``ColabAgent``
    folder and makes sure it contains ``prompts``, ``chains`` and ``steps``
    sub-folders, creating any that are missing.

    Returns:
        dict: Sub-folder name mapped to its Drive folder ID

    Raises:
        ValueError: If no ``ColabAgent`` folder is found
    """
    gdrive = GoogleDrive()
    drive.mount('/content/drive')

    root_id = gdrive.directory_exists('ColabAgent')
    if not root_id:
        raise ValueError("Could not find ColabAgent directory")

    folder_ids = {}
    for folder in ('prompts', 'chains', 'steps'):
        existing_id = gdrive.directory_exists(folder, parent_id=root_id)
        if existing_id:
            print(f"{folder}/ directory already exists")
            folder_ids[folder] = existing_id
            continue

        print(f"Creating {folder}/ directory...")
        folder_ids[folder] = gdrive.create_directory(folder, parent_id=root_id)

    return folder_ids

def _copy_public_docs(target_dir_id: str, public_folder_id: str, target_label: str) -> None:
    """Copy the Google Docs of one folder into another, skipping same-named files.

    Args:
        target_dir_id: ID of the folder that receives the copies
        public_folder_id: ID of the folder to copy from
        target_label: Folder label used in the progress messages
    """
    gdrive = GoogleDrive()

    for entry in gdrive.get_files_in_directory(public_folder_id):
        # Only Google Docs are copied; every other file type is ignored
        if entry['mimeType'] != 'application/vnd.google-apps.document':
            continue

        name = entry['name']
        if gdrive.file_exists(name, parent_id=target_dir_id):
            print(f"Skipped {name} - already exists in {target_label} directory")
            continue

        gdrive.service.files().copy(
            fileId=entry['id'],
            body={'name': name, 'parents': [target_dir_id]}
        ).execute()
        print(f"Copied {name} to {target_label} directory")


def copy_public_prompts(prompts_dir_id: str, public_folder_id: str = "1AYlnc58M0TnMuDTPr4x_bwaJIlSV5-3v"):
    """Copy Google Docs from public folder to prompts directory if they don't already exist."""
    _copy_public_docs(prompts_dir_id, public_folder_id, "prompts")


def copy_public_chains(chains_dir_id: str, public_folder_id: str = "1xhbQNeqWngRoMeughdXwP_pvJkVbyg4c"):
    """Copy Google Docs from public folder to chains directory if they don't already exist."""
    _copy_public_docs(chains_dir_id, public_folder_id, "chains")

def main():
    """Set up the ColabAgent folders, then seed prompts and chains.

    The ``steps`` folder is created as well but nothing is copied into it.
    """
    directory_ids = setup_directory_structure()
    copy_public_prompts(directory_ids['prompts'])
    copy_public_chains(directory_ids['chains'])


if __name__ == "__main__":
    main()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
prompts/ directory already exists
chains/ directory already exists
steps/ directory already exists
Skipped PROMPT_TEMPLATE - already exists in prompts directory
Skipped summarize - already exists in prompts directory
Skipped analyze_text - already exists in prompts directory
Skipped example_input - already exists in prompts directory
Skipped example_combination - already exists in prompts directory
Skipped example_system - already exists in prompts directory
Skipped example_var - already exists in prompts directory
Skipped example - already exists in prompts directory
Copied CHAIN_TEMPLATE to chains directory
Copied example_chain to chains directory


### PromptManager

In [27]:
from typing import List, Dict, Any
from jinja2 import Environment, BaseLoader
import yaml
import json

class PromptManager:
    """Loads prompt templates from Google Docs and renders them to prompt dicts.

    Each document is rendered as a Jinja2 template and the rendered text is
    parsed as YAML. Document text is cached per URL for the lifetime of the
    instance.
    """

    def __init__(self):
        self.env = Environment(loader=BaseLoader())
        # doc URL -> raw document text
        self.prompt_cache = {}

    def load_prompt_from_doc(self, doc_url: str) -> str:
        """Return the raw text of a prompt doc, fetching it on first use only."""
        if doc_url not in self.prompt_cache:
            self.prompt_cache[doc_url] = GoogleDoc(doc_url).read_content()
        return self.prompt_cache[doc_url]

    @staticmethod
    def _as_prompt_list(parsed_content: Any, doc_url: str) -> List[Dict[str, Any]]:
        """Normalise parsed YAML into a list of prompt dictionaries.

        A single dictionary is wrapped in a list. A list is accepted only if
        every item is a dictionary.

        Raises:
            ValueError: If the content is anything else
        """
        if isinstance(parsed_content, dict):
            return [parsed_content]
        if isinstance(parsed_content, list) and all(
            isinstance(item, dict) for item in parsed_content
        ):
            return parsed_content
        raise ValueError(f"Prompt doc {doc_url} must contain a dictionary or list of dictionaries")

    def compose_prompt(self, prompt_urls: List[str], template_vars: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Render each prompt doc and concatenate the resulting prompt dicts.

        Args:
            prompt_urls: Google Doc URLs, processed in order
            template_vars: Variables made available to every template

        Returns:
            Prompt dictionaries from all docs, in document order

        Raises:
            ValueError: If a rendered doc is not valid YAML, or is not a
                dictionary or list of dictionaries
        """
        composed_prompts: List[Dict[str, Any]] = []

        for doc_url in prompt_urls:
            content = self.load_prompt_from_doc(doc_url)
            prompt_text = self.env.from_string(content).render(**template_vars)

            # Only the YAML parse is guarded; the original error is kept as
            # the cause of the ValueError.
            try:
                parsed_content = yaml.safe_load(prompt_text.strip())
            except yaml.YAMLError as e:
                raise ValueError(f"Failed to parse prompt as YAML in {doc_url}: {str(e)}") from e

            composed_prompts.extend(self._as_prompt_list(parsed_content, doc_url))

        return composed_prompts

if __name__ == "__main__":
    # Manual smoke test: render one prompt doc with a sample variable and
    # print the composed prompt dictionaries as JSON.
    prompt_manager = PromptManager()

    try:
        doc_url = "https://docs.google.com/document/d/1jh1XaWHzg-Wrsqn9xGroUtsqjdMUMPRFMdEaor8X84c/edit?tab=t.0"
        # Variables passed to the Jinja2 template when the doc is rendered
        template_vars = {"color": "red"}

        prompts = prompt_manager.compose_prompt([doc_url], template_vars)
        print(json.dumps(prompts, indent=2))

    except Exception as e:
        # Demo boundary: report any failure instead of raising
        print(f"Error occurred: {str(e)}")

[
  {
    "name": "system_prompt",
    "role": "system",
    "description": "Sets the behavior of the assistant",
    "content": "You are a helpful assistant\n"
  },
  {
    "name": "user_prompt",
    "role": "user",
    "description": "The user's request for color facts",
    "content": "Tell me 3 facts about the color red"
  }
]


### ChainManager

In [31]:
from typing import Dict, Any, List, Callable, Optional
from google.colab import drive
import yaml
import json
from pathlib import Path

class ChainManager:
    """Manages execution of chains defined in Google Docs

    A chain is a YAML document with optional ``name`` and ``description``
    keys and a ``steps`` list. Each step names a function registered through
    ``register_step_function``. Steps run in order and share one context
    dictionary.
    """

    # Registry of step functions, shared by all instances: name -> callable
    STEP_FUNCTIONS: Dict[str, Callable] = {}

    @classmethod
    def register_step_function(cls, name: str):
        """Return a decorator that registers a function under ``name``.

        Registering a second function under the same name replaces the
        first one.
        """
        def decorator(func):
            cls.STEP_FUNCTIONS[name] = func
            return func
        return decorator

    def __init__(self, debug: bool = False):
        self.gdoc = None
        self.debug = debug
        self.prompt_manager = PromptManager()
        self.llm_provider = AnthropicProvider()
        self.steps = []
        self.context = {}
        # Defined up front so both attributes exist before a chain is loaded
        self.name = 'unnamed_chain'
        self.description = ''

    @classmethod
    def _parse_steps(cls, step_configs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Validate raw step configs and return them in normalised form.

        Raises:
            ValueError: If a step names an unregistered step function
            KeyError: If a step lacks ``name`` or ``step_function``
        """
        steps = []
        for step_config in step_configs:
            if step_config['step_function'] not in cls.STEP_FUNCTIONS:
                raise ValueError(f"Unknown step function: {step_config['step_function']}")

            steps.append({
                'name': step_config['name'],
                'input_key': step_config.get('input_key'),
                'output_key': step_config.get('output_key'),
                'step_function': step_config['step_function'],
                'prompt_templates': step_config.get('prompt_templates', [])
            })
        return steps

    def load_chain(self, doc_url: str):
        """Load chain configuration from Google Doc URL

        Replaces any previously loaded chain. The instance is updated only
        after the whole configuration has been parsed, so a failed load
        leaves the earlier name, description and steps untouched.

        Raises:
            ValueError: If the configuration cannot be parsed or validated
        """
        self.gdoc = GoogleDoc(doc_url)
        content = self.gdoc.read_content()

        try:
            config = yaml.safe_load(content)
            name = config.get('name', 'unnamed_chain')
            description = config.get('description', '')
            steps = self._parse_steps(config['steps'])
        except Exception as e:
            raise ValueError(f"Error loading chain configuration: {str(e)}") from e

        self.name = name
        self.description = description
        self.steps = steps

    def get_context(self) -> Dict[str, Any]:
        """Return the shared context dictionary (the live object, not a copy)."""
        return self.context

    def add_to_context(self, key: str, value: Any):
        """Store ``value`` under ``key`` in the shared context."""
        self.context[key] = value

    def execute(self) -> Dict[str, Any]:
        """Run every loaded step in order and return the final context.

        Each step function is called with ``chain``, ``prompt_templates`` and
        ``debug`` keyword arguments. Its result is stored in the context
        under the step's ``output_key``, when one is set.
        """
        for step in self.steps:
            func = self.STEP_FUNCTIONS[step['step_function']]
            result = func(
                chain=self,
                prompt_templates=step.get('prompt_templates', []),
                debug=self.debug
            )

            if step.get('output_key'):
                self.add_to_context(step['output_key'], result)

        return self.context

@ChainManager.register_step_function("process_with_llm")
def process_with_llm(
    chain: Any,
    prompt_templates: Optional[List[str]] = None,
    debug: bool = False
) -> str:
    """Compose the step's prompts from the chain context and send them to the LLM.

    Args:
        chain: Object providing ``get_context()``, ``prompt_manager`` and
            ``llm_provider``
        prompt_templates: Prompt doc URLs; ``None`` is treated as no templates
        debug: Accepted as part of the step-function signature; not used here

    Returns:
        str: Whatever ``chain.llm_provider.process_prompt`` returns
    """
    context = chain.get_context()
    # The default is None, which compose_prompt cannot iterate over
    composed_prompts = chain.prompt_manager.compose_prompt(prompt_templates or [], context)
    return chain.llm_provider.process_prompt(composed_prompts)

if __name__ == "__main__":
    # Manual smoke test: load a chain definition from a Google Doc and print
    # its name and description.
    chain_manager = ChainManager(debug=True)

    try:
        doc_url = "https://docs.google.com/document/d/1eFr4wp6qaCQ9Av8OlXA8IfiKjzbDBvo3gkNP_TMhZ9k/edit?tab=t.0"
        chain_manager.load_chain(doc_url)
        print(f"Loaded chain: {chain_manager.name}")
        print(f"Description: {chain_manager.description}")
        print("\nExecuting chain...")

        # Execution is currently disabled: the message above is printed but
        # the chain itself is not run.
        # result = chain_manager.execute()
        # print("\nExecution completed. Final context:")
        # print(json.dumps(result, indent=2))

    except Exception as e:
        # Demo boundary: report any failure instead of raising
        print(f"Error occurred: {str(e)}")

Loaded chain: basic_analysis_chain
Description: A chain that analyzes text and generates a summary

Executing chain...


### Steps

In [None]:

@ChainManager.register_step_function("process_with_llm")
def process_with_llm(
    chain: Any,
    prompt_templates: Optional[List[str]] = None,
    debug: bool = False
) -> str:
    """Process prompts through LLM and update context

    Registered under the name "process_with_llm"; this replaces any function
    previously registered under that name.

    Args:
        chain: Object providing ``get_context()``, ``prompt_manager`` and
            ``llm_provider``
        prompt_templates: Prompt doc URLs; ``None`` is treated as no templates
        debug: Accepted as part of the step-function signature; not used here

    Returns:
        str: Whatever ``chain.llm_provider.process_prompt`` returns
    """
    context = chain.get_context()
    # The default is None, which compose_prompt cannot iterate over
    composed_prompts = chain.prompt_manager.compose_prompt(prompt_templates or [], context)
    return chain.llm_provider.process_prompt(composed_prompts)

### LLM API Wrapper

In [30]:
from abc import ABC, abstractmethod
from typing import Dict, Any, List
import anthropic
import json

class LLMProvider(ABC):
    """Abstract base for LLM backends.

    Subclasses supply three hooks - convert, generate, parse - and
    ``process_prompt`` chains them together.
    """

    def __init__(self):
        """Nothing to initialise at the base level."""

    @abstractmethod
    def convert_to_messages(self, prompt_dicts: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], str]:
        """Convert prompt dictionaries into the input expected by ``generate``."""

    @abstractmethod
    def generate(self, messages: tuple[List[Dict[str, Any]], str]) -> Any:
        """Send the converted input to the model and return its raw response."""

    @abstractmethod
    def parse_response(self, response: Any) -> str:
        """Extract the reply text from a raw response."""

    def process_prompt(self, prompt_dicts: List[Dict[str, Any]]) -> str:
        """Run the full pipeline: convert the prompts, generate, parse the reply."""
        provider_input = self.convert_to_messages(prompt_dicts)
        raw_response = self.generate(provider_input)
        return self.parse_response(raw_response)

# Export the API key as an environment variable before any provider is created.
# NOTE(review): neither `os` nor `LLM_API_KEY` is imported/defined in this cell;
# presumably an earlier cell provides both - confirm, otherwise this line
# raises NameError.
os.environ["ANTHROPIC_API_KEY"] = LLM_API_KEY
class AnthropicProvider(LLMProvider):
    """LLMProvider implementation backed by the Anthropic Messages API.

    Prompts with role ``system`` are merged into one system string; all
    other prompts are sent as conversation messages.
    """

    def __init__(self, model: str = "claude-3-5-sonnet-20240620"):
        super().__init__()
        # No key is passed explicitly; presumably the client picks up
        # ANTHROPIC_API_KEY from the environment - confirm.
        self.client = anthropic.Anthropic()
        self.model = model

    def extract_system_message(self, messages: List[Dict[str, Any]]) -> tuple[str, List[Dict[str, Any]]]:
        """Split prompts into the combined system text and the remaining messages.

        A prompt without a ``role`` key counts as a ``user`` message, the same
        default ``convert_to_messages`` applies.

        Returns:
            tuple: ``(system_message, other_messages)``; system contents are
            joined with single spaces, and the string is empty when there is
            no system prompt
        """
        system_messages = []
        other_messages = []

        for message in messages:
            if message.get('role', 'user') == 'system':
                system_messages.append(message['content'])
            else:
                other_messages.append(message)

        return ' '.join(system_messages), other_messages

    def convert_to_messages(self, prompt_dicts: List[Dict[str, Any]]) -> tuple[List[Dict[str, Any]], str]:
        """Convert prompt dicts to ``(messages, system_message)``.

        Only ``role`` and ``content`` are kept on each message; any other
        key in the prompt dicts is dropped.
        """
        system_message, other_messages = self.extract_system_message(prompt_dicts)
        converted_messages = [
            {
                "role": msg.get("role", "user"),
                "content": msg["content"]
            }
            for msg in other_messages
        ]
        return converted_messages, system_message

    def generate(self, messages_and_system: tuple[List[Dict[str, Any]], str]) -> Any:
        """Call the Messages API and return the raw response.

        Args:
            messages_and_system: ``(messages, system_message)`` as produced
                by ``convert_to_messages``
        """
        converted_messages, system_message = messages_and_system
        request_args = {
            "model": self.model,
            "messages": converted_messages,
            "max_tokens": 4096,
            "temperature": 0.0
        }

        # The system argument is sent only when there is system text
        if system_message:
            request_args["system"] = system_message

        return self.client.messages.create(**request_args)

    def parse_response(self, response: Any) -> str:
        """Return the text of the first content block of the response."""
        return response.content[0].text

if __name__ == "__main__":
    # Manual smoke test: send one system + user prompt pair through the
    # provider and print the reply. Calls the Anthropic API.
    provider = AnthropicProvider()
    try:
        prompt_dicts = [
            {
                "role": "system",
                "content": "You are a helpful AI assistant. Be concise."
            },
            {
                "role": "user",
                "content": "Tell me a short story about a robot."
            }
        ]
        response = provider.process_prompt(prompt_dicts)
        print(response)
    except Exception as e:
        # Demo boundary: report any failure instead of raising
        print(f"Error: {str(e)}")

The last human switched off, and Robot X-7 was alone. It wandered empty streets, processing its purpose. Years passed. One day, it discovered a flower growing through cracked concrete. X-7 paused, then carefully watered it. A new mission emerged: nurture life. The robot's circuits hummed with renewed energy as it set out to tend Earth's rebirth.
