### S.A.G.E.

Sentiment & \
Agent-based \
Guidance \
Engine

In [None]:
from dotenv import load_dotenv
import os
from openai import OpenAI
from pydantic import BaseModel
import json

load_dotenv()

In [None]:
# Credentials and endpoints are read from the environment (see load_dotenv()).
OPENAI_KEY = os.getenv("OPENAI_KEY")
LOCAL_OPENAI_KEY = os.getenv("LOCAL_OPENAI_KEY")
BASE_URL = os.getenv("BASE_URL")

# Client for the hosted OpenAI API.
client = OpenAI(api_key=OPENAI_KEY)

# Client for the local OpenAI-compatible server. It uses LOCAL_OPENAI_KEY and
# BASE_URL when they are configured, so the hosted-API secret is only sent to
# the local endpoint when no local key is set. The previous hard-coded address
# and key remain the fallbacks, so an unconfigured environment behaves as before.
local_client = OpenAI(
    api_key=LOCAL_OPENAI_KEY or OPENAI_KEY,
    base_url=BASE_URL or "http://192.168.1.186:11434/v1",
)


In [None]:
# Schema the model's answer is parsed into. Documented with `#` comments on
# purpose: a class docstring would become part of the pydantic JSON schema.
class CalendarEvent(BaseModel):
    name: str
    date: str
    participants: list[str]

# Ask the hosted model to extract an event from one sentence and parse the
# reply into a CalendarEvent via `text_format`.
response = client.responses.parse(
    model="gpt-4o-2024-08-06",
    input=[
        {"role": "system", "content": "Extract the event information."},
        {
            "role": "user",
            "content": "Alice and Bob are going to a science fair on Friday.",
        },
    ],
    text_format=CalendarEvent,
)
# Prints the whole response object; the lines below would show only the parsed event.
print(response)
# event = response.output_parsed
# event

In [None]:
# Same event schema as the hosted-API cell, redefined so this cell runs on its
# own. `#` comments are used because a class docstring would enter the schema.
class CalendarEvent(BaseModel):
    name: str
    date: str
    participants: list[str]


# Run the same extraction against the local OpenAI-compatible server through
# the chat-completions structured-output helper.
completion = local_client.chat.completions.parse(
    model="gemma3:12b",
    messages=[
        {"role": "system", "content": "Extract the event information."},
        {
            "role": "user",
            "content": "Alice and Bob are going to a science fair on Friday.",
        },
    ],
    response_format=CalendarEvent,
)
print(completion)

In [None]:

# Tool schema passed to the Responses API calls below: a single function,
# `get_horoscope`, taking one required string argument named `sign`.
tools = [
    {
        "type": "function",
        "name": "get_horoscope",
        "description": "Get today's horoscope for an astrological sign.",
        "parameters": {
            "type": "object",
            "properties": {
                "sign": {
                    "type": "string",
                    "description": "An astrological sign like Taurus or Aquarius",
                },
            },
            "required": ["sign"],
        },
    },
]

def get_horoscope(sign):
    """Return a fixed horoscope sentence prefixed with the given sign."""
    prediction = "Next Tuesday you will befriend a baby otter."
    return "{}: {}".format(sign, prediction)

# 1. Create a running input list we will add to over time
input_list = [
    {"role": "user", "content": "What is my horoscope? I am an Aquarius."}
]

# 2. Prompt the model with tools defined
response = client.responses.create(
    model="gpt-5",
    tools=tools,
    input=input_list,
)

# Keep the model's output items (including any function_call items) in the
# conversation so the follow-up request can pair each call with its output.
input_list += response.output

for item in response.output:
    if item.type == "function_call" and item.name == "get_horoscope":
        # 3. Execute the function logic for get_horoscope. The arguments
        # arrive as a JSON object string; pass only the "sign" value, not the
        # whole decoded dict, so the horoscope starts with the sign itself.
        horoscope = get_horoscope(json.loads(item.arguments)["sign"])

        # 4. Provide function call results to the model
        input_list.append({
            "type": "function_call_output",
            "call_id": item.call_id,
            "output": json.dumps({
                "horoscope": horoscope
            })
        })

print("Final input:")
print(input_list)

response = client.responses.create(
    model="gpt-5",
    instructions="Respond only with a horoscope generated by a tool.",
    tools=tools,
    input=input_list,
)

# 5. The model should be able to give a response!
print("Final output:")
print(response.model_dump_json(indent=2))
print("\n" + response.output_text)

In [None]:
# Download the source text (cached locally by keras) and split it into
# overlapping chunks for embedding.
# NOTE(review): `keras` and `RecursiveCharacterTextSplitter` are not imported
# in the cells above this one — confirm they are in scope when this runs.
data_url = 'https://www.gutenberg.org/cache/epub/1112/pg1112.txt'
filepath = keras.utils.get_file('romeo', data_url)
# Decode explicitly as UTF-8 so the text does not depend on the platform's
# default encoding (assumes the Gutenberg plain-text file is UTF-8).
with open(filepath, encoding='utf-8') as f:
    raw_text = f.read()

my_splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,
    chunk_overlap=200,
    length_function=len
)
chunks = my_splitter.split_text(raw_text)

def generate_ids(number, size):
    """Return `number` distinct random IDs, each `size` ASCII letters long.

    Rebinding the loop variable of a `for` loop does not repeat an iteration,
    so the earlier collision handling silently returned fewer IDs than asked
    for. This version keeps drawing until enough unique IDs exist.

    Raises:
        ValueError: if more IDs are requested than distinct strings of that
            length exist, which could never be satisfied.
    """
    import random
    import string

    alphabet = string.ascii_letters
    if number > len(alphabet) ** size:
        raise ValueError(
            f"Cannot generate {number} unique IDs of length {size}"
        )

    ids = []
    seen = set()  # O(1) duplicate checks; `ids` preserves generation order
    while len(ids) < number:
        candidate = ''.join(random.choices(alphabet, k=size))
        if candidate not in seen:
            seen.add(candidate)
            ids.append(candidate)
    return ids

def get_embeddings(text, model="text-embedding-3-small"):
    """Return the embedding of `text` from the module-level OpenAI `client`.

    Newlines are replaced with spaces before the request; the first embedding
    in the response is returned.
    """
    flattened = " ".join(text.split("\n"))
    result = client.embeddings.create(input=flattened, model=model)
    return result.data[0].embedding

# Empty frame with the three columns that load_chunks fills in.
pre_upsert_df = pd.DataFrame(columns=['id', 'values', 'metadata'])


def load_chunks(df, split_text):
    """Fill `df` with one row per text chunk and return it.

    Each row holds a random 7-letter id, the chunk's embedding, and a metadata
    dict carrying the chunk text. Rows are written at labels 0..n-1 of `df`.
    """
    chunk_ids = generate_ids(len(split_text), 7)
    for position, chunk in enumerate(split_text):
        chunk_id = chunk_ids[position]
        vector = get_embeddings(chunk, model='text-embedding-3-small')
        df.loc[position] = [chunk_id, vector, {'text': chunk}]
    return df


my_df = load_chunks(pre_upsert_df, chunks)

def prepare_DF(df):
    """Normalise the 'values' and 'metadata' columns of an upsert frame.

    Accepts a frame re-read from CSV (vectors and metadata stored as their
    string representations, optionally with a stray 'Unnamed: 0' index column)
    as well as one built in memory, where the values are already sequences of
    floats and the metadata already dicts. The earlier version only handled
    strings and failed on the in-memory frame it is called with in this file.

    Returns the frame with 'values' as float numpy arrays and 'metadata' as
    dicts. When no column is dropped, the caller's frame is modified in place.
    """
    import ast

    import numpy as np

    def _to_vector(raw):
        # A CSV round-trip stores the vector as text such as "[0.1, 0.2]".
        if isinstance(raw, str):
            stripped = raw.replace("[", '').replace("]", '').strip()
            if not stripped:
                return np.array([], dtype=float)
            return np.array([float(part) for part in stripped.split(',')])
        return np.asarray(raw, dtype=float)

    def _to_metadata(raw):
        # Only text needs parsing; already-parsed metadata passes through.
        return ast.literal_eval(raw) if isinstance(raw, str) else raw

    # Explicit column check instead of a bare `except` that hid every error.
    if 'Unnamed: 0' in df.columns:
        df = df.drop('Unnamed: 0', axis=1)
    else:
        print('Unnamed Not Found')

    df['values'] = df['values'].apply(_to_vector)
    df['metadata'] = df['metadata'].apply(_to_metadata)
    return df

# Normalise the embedded chunks, then open the 'my-rag' Pinecone index.
# NOTE(review): `pc` is not defined in the cells above this one — presumably a
# Pinecone client created elsewhere; confirm before running.
my_index_df = prepare_DF(my_df)
index = pc.Index('my-rag')

def convert_data(chunk):
    """Return the rows of a DataFrame slice as a list of dicts, one per row.

    Each dict maps column name to cell value; the result is what this file
    passes to the Pinecone client's `upsert()`.
    """
    return list(chunk.to_dict('records'))

def load_chunker(seq, size):
    """Yield consecutive positional slices of `seq`, each at most `size` rows."""
    offsets = range(0, len(seq), size)
    for begin in offsets:
        end = begin + size
        yield seq.iloc[begin:end]

# Upload in batches of 100 records, the size the later cells of this file also
# use. The previous batch of 800 embedding vectors per request is expected to
# exceed Pinecone's per-request size limit and be rejected.
for load_chunk in load_chunker(my_index_df, 100):
    vectors = convert_data(load_chunk)
    index.upsert(vectors)

In [None]:
import os
import requests
import uuid
from io import BytesIO
from urllib.parse import urlparse
from openai import OpenAI
from pinecone import Pinecone
from pypdf import PdfReader
from langchain.text_splitter import RecursiveCharacterTextSplitter

def process_and_upload_to_pinecone(url: str, index_name: str):
    """
    Download a PDF or TXT file, embed its text, and upsert it into Pinecone.

    The file type is taken from the extension of the URL's path. The extracted
    text is split into overlapping chunks, embedded in batches with the
    "text-embedding-3-small" model, and upserted with the chunk text and the
    source URL as metadata. Progress and failures are reported with print();
    connection, download, embedding and upsert errors are printed rather than
    raised. API keys are read from the PINECONE_API_KEY and OPENAI_API_KEY
    environment variables.

    Args:
        url (str): The URL of the .txt or .pdf file.
        index_name (str): The name of the Pinecone index.

    Returns:
        None.
    """
    try:
        pc = Pinecone(api_key=os.environ.get("PINECONE_API_KEY"))
        openai_client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        print("Successfully connected to Pinecone and OpenAI.")
    except Exception as e:
        print(f"Connection Error: {e}")
        return

    print(f"Downloading data from {url}...")
    try:
        # A timeout keeps a stalled server from blocking the call forever; it
        # surfaces as a RequestException and is reported below.
        download = requests.get(url, timeout=30)
        download.raise_for_status()
        file_path = urlparse(url).path.lower()

        if file_path.endswith('.pdf'):
            pdf_reader = PdfReader(BytesIO(download.content))
            print(f"Parsing PDF file with {len(pdf_reader.pages)} pages...")
            # Extract each page once (extraction is the expensive step) and
            # join the pieces instead of growing a string in a loop.
            raw_text = "".join(
                page.extract_text() or "" for page in pdf_reader.pages
            )
        elif file_path.endswith('.txt'):
            print("Parsing TXT file...")
            raw_text = download.text
        else:
            print(f"Unsupported file type from URL: {url}")
            return

        print(f"Successfully extracted {len(raw_text)} characters of text.")

    except requests.exceptions.RequestException as e:
        print(f"Failed to download data from URL. Error: {e}")
        return

    print("Splitting text into manageable chunks...")
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=800,
        chunk_overlap=100,
        length_function=len,
    )
    chunks = text_splitter.split_text(raw_text)
    print(f"Created {len(chunks)} text chunks.")

    try:
        index = pc.Index(index_name)
        print(f"Successfully connected to Pinecone index '{index_name}'.")
    except Exception as e:
        print(f"Could not connect to Pinecone index '{index_name}'. Error: {e}")
        return

    batch_size = 100
    print(f"Generating embeddings and uploading to Pinecone in batches of {batch_size}...")

    # Count failed batches so the closing message does not claim success when
    # part of the document never reached the index.
    failed_batches = 0
    for i in range(0, len(chunks), batch_size):
        batch_chunks = chunks[i:i + batch_size]

        try:
            embedding_response = openai_client.embeddings.create(
                model="text-embedding-3-small",
                input=[chunk.replace("\n", " ") for chunk in batch_chunks]
            )
            embeddings = [item.embedding for item in embedding_response.data]
        except Exception as e:
            print(f"Failed to generate embeddings. Error: {e}")
            failed_batches += 1
            continue

        vectors_to_upsert = [
            {
                "id": str(uuid.uuid4()),
                "values": embedding,
                "metadata": {"text": chunk, "source_url": url},
            }
            for chunk, embedding in zip(batch_chunks, embeddings)
        ]
        try:
            index.upsert(vectors=vectors_to_upsert)
            print(f"  -> Successfully upserted batch {i//batch_size + 1}")
        except Exception as e:
            print(f"Failed to upsert batch. Error: {e}")
            failed_batches += 1

    if failed_batches:
        print(f"\nFinished with errors: {failed_batches} batch(es) were not uploaded to Pinecone.")
    else:
        print("\nAll chunks have been processed and uploaded to Pinecone!")


if __name__ == '__main__':
    # Example sources; only the PDF URL is passed to the uploader below.
    txt_data_url = 'https://www.gutenberg.org/cache/epub/1112/pg1112.txt'
    
    pdf_data_url = 'https://arxiv.org/pdf/1706.03762.pdf'
    
    # Download, embed, and upsert the PDF into the 'sage' index.
    my_pinecone_index = 'sage'
    process_and_upload_to_pinecone(url=pdf_data_url, index_name=my_pinecone_index)

In [None]:
# NOTE(review): this bare string is not attached to any function. Its text and
# arguments (url, index_name) match process_and_upload_to_pinecone, so it is
# presumably that function's intended docstring — confirm and move it there.
"""
Downloads data from a URL (PDF or TXT), parses it, creates embeddings,
and uploads it to a Pinecone index.

Args:
url (str): The URL of the .txt or .pdf file.
index_name (str): The name of the Pinecone index.
"""    

In [None]:
def get_context(query, embed_model='text-embedding-3-small', k=5, index=index):
    """Return the texts of the `k` nearest index matches for `query`, plus the query.

    The query is embedded with `embed_model` and looked up in `index`, whose
    default is the value of the global `index` at the time this function is
    defined. Returns a `(contexts, query)` tuple.
    """
    vector = get_embeddings(query, model=embed_model)
    result = index.query(vector=vector, top_k=k, include_metadata=True)
    texts = []
    for match in result['matches']:
        texts.append(match['metadata']['text'])
    return texts, query

In [None]:
import string
import random
import requests
import pandas as pd
import fitz  # PyMuPDF
from typing import Optional, List, Dict, Any, Generator
from langchain_text_splitters import RecursiveCharacterTextSplitter

# --- Helper Functions (from your code, slightly modified) ---

def generate_ids(number: int, size: int) -> List[str]:
    """Generate `number` distinct random IDs of `size` ASCII letters each.

    Uniqueness is tracked in a set, so each duplicate check is constant time
    instead of a scan of the whole list.

    Raises:
        ValueError: if more IDs are requested than distinct strings of that
            length exist; the retry loop would otherwise never terminate.
    """
    alphabet = string.ascii_letters
    if number > len(alphabet) ** size:
        raise ValueError(
            f"Cannot generate {number} unique IDs of length {size}"
        )

    ids: List[str] = []
    seen = set()
    while len(ids) < number:
        candidate = ''.join(random.choices(alphabet, k=size))
        if candidate not in seen:  # redraw on collision
            seen.add(candidate)
            ids.append(candidate)
    return ids

def get_embeddings(text: str, model: str) -> List[float]:
    """Return the embedding vector for `text`, with newlines flattened to spaces."""
    # Uses the module-level OpenAI `client`. It is only read here, never
    # rebound, so no `global` declaration is needed.
    single_line = " ".join(text.split("\n"))
    reply = client.embeddings.create(input=single_line, model=model)
    return reply.data[0].embedding

def load_chunks(split_text: List[str], model: str) -> pd.DataFrame:
    """Build a frame with one row per chunk: random id, embedding, text metadata."""
    frame = pd.DataFrame(columns=['id', 'values', 'metadata'])
    chunk_ids = generate_ids(len(split_text), 7)
    for position, chunk in enumerate(split_text):
        chunk_id = chunk_ids[position]
        vector = get_embeddings(chunk, model=model)
        frame.loc[position] = [chunk_id, vector, {'text': chunk}]
    return frame

def convert_data(chunk: pd.DataFrame) -> List[Dict[str, Any]]:
    """Return the rows of `chunk` as a list of column-name -> value dicts."""
    return list(chunk.to_dict('records'))

def load_chunker(seq: pd.DataFrame, size: int) -> Generator[pd.DataFrame, None, None]:
    """Yield consecutive positional slices of `seq` with at most `size` rows each."""
    offsets = range(0, len(seq), size)
    for begin in offsets:
        end = begin + size
        yield seq.iloc[begin:end]

# --- Main Tool Function ---

def embed_and_upload_to_pinecone(
    index_name: str,
    url: Optional[str] = None,
    text: Optional[str] = None,
    chunk_size: int = 800,
    chunk_overlap: int = 200,
    embedding_model: str = "text-embedding-3-small"
) -> Dict[str, Any]:
    """
    Processes text from a URL (PDF/TXT) or raw string, chunks it,
    creates embeddings, and upserts to a Pinecone index.

    Assumes 'client' (OpenAI) and 'pc' (Pinecone) are initialized globally.

    Args:
        index_name: Name of the Pinecone index to upsert into.
        url: Location of a .pdf or .txt file. When given, `text` is ignored.
        text: Raw text to process when no `url` is given.
        chunk_size: Maximum chunk length handed to the splitter.
        chunk_overlap: Overlap between neighbouring chunks.
        embedding_model: OpenAI embedding model name.

    Returns:
        A dict whose "status" is "success" (with chunk and vector counts and
        the index name) or "error" (with a "message"). Failures are reported
        through this dict rather than raised.
    """
    from urllib.parse import urlparse

    raw_text = ""

    # 1. Get Raw Text
    if url:
        # Decide the file type from the URL *path* so a query string or
        # fragment ("paper.pdf?download=1") does not hide the extension; fall
        # back to the whole URL for links that merely end in the extension.
        location = urlparse(url).path.lower()
        if not location.endswith(('.pdf', '.txt')):
            location = url.lower()
        # Reject unsupported types before downloading anything.
        if not location.endswith(('.pdf', '.txt')):
            return {"status": "error", "message": "Unsupported file type. URL must end in .pdf or .txt"}

        try:
            # The timeout stops a stalled server from blocking the tool call.
            response = requests.get(url, timeout=30)
            response.raise_for_status()  # Raise an exception for bad status codes
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Failed to download or access URL: {e}"}

        if location.endswith('.pdf'):
            # A corrupt or non-PDF payload must produce an error dict like
            # every other failure, not an unhandled exception.
            try:
                with fitz.open(stream=response.content, filetype="pdf") as doc:
                    raw_text = "".join(page.get_text() for page in doc)
            except Exception as e:
                return {"status": "error", "message": f"Failed to parse PDF: {e}"}
        else:
            raw_text = response.text

    elif text:
        raw_text = text

    else:
        return {"status": "error", "message": "No input provided. You must specify either a 'url' or 'text'."}

    if not raw_text:
        return {"status": "error", "message": "Extracted text is empty. Nothing to process."}

    # 2. Split Text
    my_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )
    chunks = my_splitter.split_text(raw_text)

    if not chunks:
        return {"status": "error", "message": "Text splitting resulted in zero chunks."}

    # 3. Load Chunks into DataFrame with Embeddings
    try:
        my_df = load_chunks(chunks, model=embedding_model)
    except Exception as e:
        return {"status": "error", "message": f"Failed to generate embeddings: {e}"}

    # 4. Connect to Pinecone Index (reads the module-level `pc` client)
    try:
        index = pc.Index(index_name)
    except Exception as e:
        return {"status": "error", "message": f"Failed to connect to Pinecone index '{index_name}': {e}"}

    # 5. Upsert in Batches
    total_upserted = 0
    batch_size = 100
    try:
        for load_chunk in load_chunker(my_df, batch_size):
            vectors = convert_data(load_chunk)
            index.upsert(vectors)
            total_upserted += len(vectors)
    except Exception as e:
        return {"status": "error", "message": f"Failed during Pinecone upsert: {e}"}

    # 6. Return Success
    return {
        "status": "success",
        "total_chunks_processed": len(chunks),
        "total_vectors_upserted": total_upserted,
        "index_name": index_name
    }

In [None]:
import os
import string
import random
import requests
import pandas as pd
import fitz
import json
from openai import OpenAI
from pinecone import Pinecone
from typing import Optional, List, Dict, Any, Generator
from langchain_text_splitters import RecursiveCharacterTextSplitter
from dotenv import load_dotenv

# Load variables from a local .env file into the process environment.
load_dotenv()


# API credentials come from the environment rather than being hard-coded.
OPENAI_KEY = os.getenv("OPENAI_KEY")
PINE_KEY = os.getenv('PINE_KEY')
# Module-level clients read by the helper functions defined below.
pc = Pinecone(api_key=PINE_KEY)
client = OpenAI(api_key=OPENAI_KEY)

def get_embeddings(text: str, model: str):
    """Return the OpenAI embedding for `text`, with newlines replaced by spaces."""
    single_line = " ".join(text.split("\n"))
    reply = client.embeddings.create(input=single_line, model=model)
    return reply.data[0].embedding

def generate_ids(number: int, size: int) -> List[str]:
    """Generate `number` distinct random IDs of `size` ASCII letters each.

    Uniqueness is tracked in a set, so each duplicate check is constant time
    instead of a scan of the whole list.

    Raises:
        ValueError: if more IDs are requested than distinct strings of that
            length exist; the retry loop would otherwise never terminate.
    """
    alphabet = string.ascii_letters
    if number > len(alphabet) ** size:
        raise ValueError(
            f"Cannot generate {number} unique IDs of length {size}"
        )

    ids: List[str] = []
    seen = set()
    while len(ids) < number:
        candidate = ''.join(random.choices(alphabet, k=size))
        if candidate not in seen:  # redraw on collision
            seen.add(candidate)
            ids.append(candidate)
    return ids

def load_chunks(split_text: List[str], model: str) -> pd.DataFrame:
    """Return a frame with one row per chunk: random id, embedding, text metadata."""
    frame = pd.DataFrame(columns=['id', 'values', 'metadata'])
    chunk_ids = generate_ids(len(split_text), 7)
    for position, chunk in enumerate(split_text):
        chunk_id = chunk_ids[position]
        vector = get_embeddings(chunk, model=model)
        frame.loc[position] = [chunk_id, vector, {'text': chunk}]
    return frame

def convert_data(chunk: pd.DataFrame) -> List[Dict[str, Any]]:
    """Return the rows of `chunk` as a list of column-name -> value dicts."""
    records = chunk.to_dict(orient='records')
    return records

def load_chunker(seq: pd.DataFrame, size: int) -> Generator[pd.DataFrame, None, None]:
    """Yield consecutive positional slices of `seq` with at most `size` rows each."""
    offsets = range(0, len(seq), size)
    for begin in offsets:
        end = begin + size
        yield seq.iloc[begin:end]

def embed_and_upload_to_pinecone(
    url: Optional[str] = None,
    text: Optional[str] = None,
    chunk_size: int = 800,
    chunk_overlap: int = 200,
    embedding_model: str = "text-embedding-3-small"
) -> Dict[str, Any]:
    """
    Processes text from a URL (PDF/TXT) or raw string, chunks it,
    creates embeddings, and upserts to the 'sage' Pinecone index.

    Args:
        url: Location of a .pdf or .txt file. When given, `text` is ignored.
        text: Raw text to process when no `url` is given.
        chunk_size: Maximum chunk length handed to the splitter.
        chunk_overlap: Overlap between neighbouring chunks.
        embedding_model: OpenAI embedding model name.

    Returns:
        A dict whose "status" is "success" (with chunk and vector counts and
        the index name) or "error" (with a "message"). Failures are reported
        through this dict rather than raised, so the result can be handed
        straight back to the model as a tool response.
    """
    from urllib.parse import urlparse

    # The index handle is created only once it is needed (after embedding),
    # inside a try block, rather than up front before input validation.
    index_name = 'sage'
    raw_text = ""

    if url:
        # Decide the file type from the URL *path* so a query string or
        # fragment ("paper.pdf?download=1") does not hide the extension; fall
        # back to the whole URL for links that merely end in the extension.
        location = urlparse(url).path.lower()
        if not location.endswith(('.pdf', '.txt')):
            location = url.lower()
        # Reject unsupported types before downloading anything.
        if not location.endswith(('.pdf', '.txt')):
            return {"status": "error", "message": "Unsupported file type. URL must end in .pdf or .txt"}

        try:
            # The timeout stops a stalled server from blocking the chat loop.
            response = requests.get(url, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            return {"status": "error", "message": f"Failed to download or access URL: {e}"}

        if location.endswith('.pdf'):
            # A corrupt or non-PDF payload must produce an error dict like
            # every other failure, not an unhandled exception.
            try:
                with fitz.open(stream=response.content, filetype="pdf") as doc:
                    raw_text = "".join(page.get_text() for page in doc)
            except Exception as e:
                return {"status": "error", "message": f"Failed to parse PDF: {e}"}
        else:
            raw_text = response.text
    elif text:
        raw_text = text
    else:
        return {"status": "error", "message": "No input provided. You must specify either 'url' or 'text'."}

    if not raw_text:
        return {"status": "error", "message": "Extracted text is empty. Nothing to process."}

    my_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len
    )
    chunks = my_splitter.split_text(raw_text)
    if not chunks:
        return {"status": "error", "message": "Text splitting resulted in zero chunks."}

    try:
        my_df = load_chunks(chunks, model=embedding_model)
    except Exception as e:
        return {"status": "error", "message": f"Failed to generate embeddings: {e}"}

    try:
        target_index = pc.Index(index_name)
    except Exception as e:
        return {"status": "error", "message": f"Failed to connect to Pinecone index '{index_name}': {e}"}

    total_upserted = 0
    batch_size = 100
    try:
        for load_chunk in load_chunker(my_df, batch_size):
            vectors = convert_data(load_chunk)
            target_index.upsert(vectors)
            total_upserted += len(vectors)
    except Exception as e:
        return {"status": "error", "message": f"Failed during Pinecone upsert: {e}"}

    return {
        "status": "success",
        "total_chunks_processed": len(chunks),
        "total_vectors_upserted": total_upserted,
        "index_name": index_name
    }

def get_context(query: str, embed_model: str = 'text-embedding-3-small', k: int = 5) -> Dict[str, Any]:
    """
    Retrieves relevant text contexts from the Pinecone index
    based on a user's search query.

    Args:
        query: The search text to embed and look up.
        embed_model: OpenAI embedding model used for the query.
        k: Number of nearest matches to request.

    Returns:
        A dict with "status" of "success" and either "contexts_found" (a list
        of matched chunk texts) or a "message" when nothing matched; or
        "status" of "error" with a "message" describing the failure.
    """
    try:
        # Open the same 'sage' index that embed_and_upload_to_pinecone writes
        # to. The function previously read a global `index` that this cell
        # never defines, so it either failed with a NameError or queried a
        # handle left over from another cell.
        index = pc.Index('sage')
        query_embeddings = get_embeddings(query, model=embed_model)
        pinecone_response = index.query(
            vector=query_embeddings,
            top_k=k,
            include_metadata=True
        )
        contexts = [item['metadata']['text'] for item in pinecone_response['matches']]

        if not contexts:
            return {"status": "success", "message": "Query successful, but no matching contexts were found."}

        return {"status": "success", "contexts_found": contexts}
    except Exception as e:
        return {"status": "error", "message": f"Failed to retrieve context: {e}"}


# Chat Completions tool schemas, one entry per Python function the model may call.
tools = [
    {
        "type": "function",
        "function": {
            "name": "embed_and_upload_to_pinecone",
            "description": "Processes text from a URL (PDF/TXT) or raw string, chunks it, creates embeddings, and upserts to Pinecone. One of 'url' or 'text' must be provided.",
            "parameters": {
                "type": "object",
                "properties": {
                    "url": {"type": "string", "description": "The URL of the PDF or TXT file to process. If provided, the 'text' parameter is ignored."},
                    "text": {"type": "string", "description": "A string of raw text to process. This is used only if the 'url' parameter is not provided."},
                    "chunk_size": {"type": "integer", "default": 800},
                    "chunk_overlap": {"type": "integer", "default": 200},
                    "embedding_model": {"type": "string", "default": "text-embedding-3-small"}
                },
                # No "required" list: either 'url' or 'text' may be supplied.
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "get_context",
            "description": "Retrieves relevant text contexts from the Pinecone index based on a user's search query.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "The search query to find relevant context for."},
                    "k": {"type": "integer", "default": 5},
                    "embed_model": {"type": "string", "default": "text-embedding-3-small"}
                },
                "required": ["query"]
            }
        }
    }
]

# Maps each tool name in the schemas above to the Python callable that runs it.
available_tools = {
    "embed_and_upload_to_pinecone": embed_and_upload_to_pinecone,
    "get_context": get_context,
}

def main():
    """
    Main loop to run the chat-with-tools.

    Reads prompts from stdin until the user types 'quit' (or sends EOF or
    Ctrl-C). Each prompt is sent to the model together with the tool schemas.
    When the model requests tool calls, every call is executed and answered
    with a tool message, and the model is asked again for the final reply.
    Any other unexpected error is printed and ends the chat.
    """
    print("Starting chat... (type 'quit' to exit)")
    messages = [
        {"role": "system", "content": "You are a helpful assistant. You have two tools: one to upload documents to a Pinecone index, and one to retrieve context from it to answer questions."}
    ]

    while True:
        try:
            # Get user input; treat EOF / Ctrl-C as a normal way to leave.
            try:
                user_prompt = input("You: ")
            except (EOFError, KeyboardInterrupt):
                print("\nEnding chat. Goodbye!")
                break
            if user_prompt.strip().lower() == 'quit':
                print("Ending chat. Goodbye!")
                break

            messages.append({"role": "user", "content": user_prompt})

            # --- First API Call: Get model response or tool call ---
            response = client.chat.completions.create(
                model="gpt-4o",  # Or your preferred model
                messages=messages,
                tools=tools,
                tool_choice="auto",
            )
            response_message = response.choices[0].message
            tool_calls = response_message.tool_calls

            # --- Check if the model wants to call a tool ---
            if tool_calls:
                # Append the assistant's request to the message history
                messages.append(response_message)

                # --- Execute all tool calls ---
                for tool_call in tool_calls:
                    function_name = tool_call.function.name
                    function_to_call = available_tools.get(function_name)

                    if function_to_call is None:
                        # Every tool_call_id needs a matching tool message,
                        # otherwise the follow-up request is invalid; answer
                        # unknown tools with an error instead of skipping them.
                        print(f"Error: Model tried to call unknown function '{function_name}'")
                        content = json.dumps({
                            "status": "error",
                            "message": f"Unknown function '{function_name}'",
                        })
                    else:
                        try:
                            # Parse the JSON arguments
                            function_args = json.loads(tool_call.function.arguments)

                            print(f"--- Calling Tool: {function_name}({function_args}) ---")

                            # Call the corresponding Python function
                            function_response = function_to_call(**function_args)

                            print(f"--- Tool Response: {function_response} ---")

                            # Convert response to JSON string
                            content = json.dumps(function_response)
                        except Exception as e:
                            print(f"Error executing tool {function_name}: {e}")
                            content = json.dumps({"status": "error", "message": str(e)})

                    # Append the tool's output (or error) to the message history
                    messages.append(
                        {
                            "tool_call_id": tool_call.id,
                            "role": "tool",
                            "name": function_name,
                            "content": content,
                        }
                    )

                final_response = client.chat.completions.create(
                    model="gpt-4o",
                    messages=messages,
                )
                final_answer = final_response.choices[0].message.content
                print(f"Assistant: {final_answer}")
                messages.append({"role": "assistant", "content": final_answer})

            else:
                # --- No tool call, just a direct answer ---
                assistant_response = response_message.content
                print(f"Assistant: {assistant_response}")
                messages.append({"role": "assistant", "content": assistant_response})

        except Exception as e:
            print(f"An unexpected error occurred: {e}")
            break

# Start the interactive chat loop when this code runs as the main module.
if __name__ == "__main__":
    main()