# Multimodal reasoning with Chain of Thought(CoT)

copyright 2025, Denis Rothman


This notebook demonstrates how to leverage multimodal reasoning through the Chain of Thought (CoT) framework to enhance AI systems' reasoning capabilities. The project integrates various tools and methods, including:

- **OpenAI APIs** for text and image generation.
- **Machine Learning Baselines** for analyzing customer preferences and providing tailored suggestions.
- **Vector Stores (Pinecone)** for embedding-based search and retrieval.
- **Google Text-to-Speech (gTTS)** for converting text into audio.
- **Interactive Widgets** for dynamic user interaction within the Colab environment.

Key features include:
1. A **Chain of Thought Reasoning Process** that provides step-by-step explanations and reasoning for enhanced interpretability.
2. **Multimodal Integration** allowing for both text and image-based interactions and outputs.
3. Examples and templates for using OpenAI’s capabilities to generate embeddings, analyze data, and produce engaging multimedia content.

**Note**: *This notebook is for educational purposes only. It is not designed to be deployed into production.*

This notebook uses OpenAI GPT Models. https://openai.com




# Setting up the environment

This notebook was developed in Google Colab. Colab includes many pre-installed libraries and sets `/content/` as the default directory, meaning you can access files directly by their filename if you wish (e.g., `filename` instead of needing to specify `/content/filename`). This differs from local environments, where you'll often need to install libraries or specify full file paths.

## Installing `click`

`click`, a command line library, is required for gTTs. This cell will set up the correct 'click' version and *trigger a runtime restart*. After reconnecting, simply click `Run All` to continue.

In [None]:
```python
# Cell 1: Conditional 'click' setup and auto-restart.
# This cell will only modify 'click' and restart the runtime IF the 'click' version is not 8.1.8.
# After the restart (if it occurs), Colab will reconnect, and you can simply "Run All" from the top.

import importlib.metadata
import os
import time

required_click_version = '8.1.8'
current_click_version = None

print("--- Checking 'click' package version... ---")

try:
    current_click_version = importlib.metadata.version('click')
    print(f"Currently installed 'click' version: {current_click_version}")
except importlib.metadata.PackageNotFoundError:
    print("'click' package not found.")

# Check if current click version is not the required one
if current_click_version != required_click_version:
    print(f"\n--- 'click' version is not {required_click_version}. Initiating setup... ---")

    # Uninstall any existing 'click' version
    print("Uninstalling any existing 'click' installation...")
    !pip uninstall -y click

    # Install the specific 'click' version required by gTTS
    print(f"Installing 'click=={required_click_version}' for compatibility...")
    !pip install click=={required_click_version}

    print("\n--- 'click' setup complete. ---")
    print("!!! Initiating runtime restart to load the correct 'click' version. !!!")
    print("!!! Please wait for Colab to reconnect, then simply click 'Run All' from the top. !!!")

    # Give a brief moment for print statements to flush
    time.sleep(2)

    # Force a runtime restart. Colab will detect this.
    os.kill(os.getpid(), 9)

else:
    print(f"--- 'click' is already at the correct version ({required_click_version}). No action needed. ---")

# IMPORTANT: If a restart happens above, this part of the code will NOT be executed
# until the notebook is re-run AFTER the restart.

## File downloading script

grequests contains a script to download files from the repository

In [None]:
#Private repository notes
#1.This line will be deleted when the repository is made public and the following line will be uncommented
#2.The private token will also be removed from grequests.py in the commmons directory of the repository
!curl -L -H "Authorization: Bearer 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" https://raw.githubusercontent.com/Denis2054/Building-Business-Ready-Generative-AI-Systems/master/commons/grequests.py --output grequests.py

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0100  1008  100  1008    0     0   3577      0 --:--:-- --:--:-- --:--:--  3574100  1008  100  1008    0     0   3575      0 --:--:-- --:--:-- --:--:--  3574


In [None]:
#!curl -L https://raw.githubusercontent.com/Denis2054/Building-Business-Ready-Generative-AI-Systems/master/commons/grequests.py --output grequests.py

## OpenAI

In [None]:
from grequests import download
download("commons","requirements01.py")
download("commons","openai_setup.py")
download("commons","reason.py")
download("commons","machine_learning.py")

Downloaded 'requirements01.py' successfully.
Downloaded 'openai_setup.py' successfully.
Downloaded 'reason.py' successfully.
Downloaded 'machine_learning.py' successfully.


### Installing OpenAI

In [None]:
# Run the setup script to install and import dependencies
%run requirements01

Uninstalling 'openai'...
Installing 'openai' version 1.57.1...
'openai' version 1.57.1 is installed.


#### Initializing the OpenAI API key



In [None]:
google_secrets=True #activates Google secrets in Google Colab
if google_secrets==True:
  import openai_setup
  openai_setup.initialize_openai_api()

OpenAI API key initialized successfully.


In [None]:
if google_secrets==False: # Uncomment the code and choose any method you wish to initialize the API_KEY
  import os
  #API_KEY=[YOUR API_KEY]
  #os.environ['OPENAI_API_KEY'] = API_KEY
  #openai.api_key = os.getenv("OPENAI_API_KEY")
  #print("OpenAI API key initialized successfully.")

#### Importing the API call function

In [None]:
# Import the function from the custom OpenAI API file
import os
import reason
from reason import make_openai_api_call

## Installing gtts

gTTS (Google Text-to-Speech) is a Python library and CLI tool that interfaces with Google Translate's text-to-speech API. It allows users to convert text into spoken words, supporting multiple languages and accents, and can save the output as MP3 files.  

In [None]:
!pip install gTTS==2.5.4



In [None]:
import time
from gtts import gTTS
from IPython.display import Audio
import numpy as np

def text_to_speech(text):
    # Convert text to speech and save as an MP3 file
    tts = gTTS(text)
    tts.save("response.mp3")

## Machine learning

In [None]:
# Import the function from the custom OpenAI API file
import os
import machine_learning
from machine_learning import ml_agent

## Web search

In [None]:
download("commons","web_search.py")

Downloaded 'web_search.py' successfully.


In [None]:
# Import the function from the custom OpenAI API file
import os
import web_search
from web_search import search

## Chain of Thought(COT)

In [None]:
# Import the function from the custom OpenAI API file
import os
import reason
from reason import chain_of_thought_reasoning

## Installing Pinecone

In [None]:
download("commons","requirements02.py")

Downloaded 'requirements02.py' successfully.


In [None]:
# Run the setup script to install and import dependencies
%run requirements02

Uninstalling 'pinecone-client'...
Installing 'pinecone-client' version 5.0.1...
'pinecone-client' version 5.0.1 is installed.


### Initializing the Pinecone API key

In [None]:
download("commons","pinecone_setup.py")

Downloaded 'pinecone_setup.py' successfully.


In [None]:
if google_secrets==True:
  import pinecone_setup
  pinecone_setup.initialize_pinecone_api()

PINECONE_API_KEY initialized successfully.


In [None]:
if google_secrets==False: # Uncomment the code and choose any method you wish to initialize the Pinecone API key
  import os
  #PINECONE_API_KEY=[YOUR PINECONE_API_KEY]
  #os.environ['PINECONE_API_KEY'] = PINECONE_API_KEY
  #openai.api_key = os.getenv("PINECONE_API_KEY")
  #print("OpenAI API key initialized successfully.")

##  The Pinecone index

In [None]:
import os
from pinecone import Pinecone, ServerlessSpec
# Retrieve the API key from environment variables
api_key = os.environ.get('PINECONE_API_KEY')
if not api_key:
    raise ValueError("PINECONE_API_KEY is not set in the environment!")

# Initialize the Pinecone client
pc = Pinecone(api_key=api_key)

In [None]:
from pinecone import ServerlessSpec

index_name = 'genai-v1'
cloud = os.environ.get('PINECONE_CLOUD') or 'aws'
region = os.environ.get('PINECONE_REGION') or 'us-east-1'

spec = ServerlessSpec(cloud=cloud, region=region)

In [None]:
import time
import pinecone
# check if index already exists (it shouldn't if this is first time)
if index_name not in pc.list_indexes().names():
    # if does not exist, create index
    pc.create_index(
        index_name,
        dimension=1536,  # dimension of the embedding model
        metric='cosine',
        spec=spec
    )
    # wait for index to be initialized
    time.sleep(1)

# connect to index
index = pc.Index(index_name)
# view index stats
index.describe_index_stats()

{'dimension': 1536,
 'index_fullness': 0.0,
 'namespaces': {'agent_memory': {'vector_count': 4},
                'data01': {'vector_count': 9},
                'genaisys': {'vector_count': 100},
                'security': {'vector_count': 100}},
 'total_vector_count': 213}

## Querying functions

In [None]:
def display_results(query_results):
  for match in query_results['matches']:
    print(f"ID: {match['id']}, Score: {match['score']}")
    if 'metadata' in match and 'text' in match['metadata']:
        text=match['metadata']['text']
        #print(f"Text: {match['metadata']['text']}")
        target_id = query_results['matches'][0]['id']  # Get the ID from the first match
                #print(f"Target ID: {target_id}")
    else:
        print("No metadata available.")
  return text, target_id


In [None]:
import openai
client = openai.OpenAI()
embedding_model = "text-embedding-3-small"
def get_embedding(text, model=embedding_model):
    text = text.replace("\n", " ")
    response = client.embeddings.create(input=[text], model=model)
    embedding = response.data[0].embedding
    return embedding

In [None]:
def get_query_results(query_text, namespace):
    # Generate the query vector from the query text
    query_vector = get_embedding(query_text)  # Replace with your method to generate embeddings

    # Perform the query
    query_results = index.query(
        vector=query_vector,
        namespace=namespace,
        top_k=1,  # Adjust as needed
        include_metadata=True
    )
    # Return the results
    return query_results

In [None]:
def query_vector_store(query_text, namespace):
    print("Querying vector store...")

    # Retrieve query results
    query_results = get_query_results(query_text, namespace)

    # Process and display the results
    print("Processed query results:")
    text, target_id = display_results(query_results)

    return text, target_id

# AI Agent

In [None]:
import openai
from openai import OpenAI
from IPython.display import display, Image
import requests

# Initialize the OpenAI client
client = OpenAI()
user_memory=True # True=User messages are memorized False=User messages are not memorized
def chat_with_gpt(messages, user_message,files_status):
    try:
      continue_functions=True
      namespace=""
      if "Pinecone" in user_message or "RAG" in user_message:
         # Determine the keyword
        if "Pinecone" in user_message:
            namespace="genaisys"
        elif "RAG" in user_message:
            namespace="data01"
        print(namespace)
        #define query text
        query_text=user_message
        # Retrieve query results
        query_results = get_query_results(query_text, namespace)
        # Process and display the results
        print("Processed query results:")
        qtext, target_id = display_results(query_results)
        print(qtext)
        #run task
        sc_input=qtext + " " + user_message
        mrole = "system"
        mcontent = "You are an assistant who executes the tasks you are asked to do."
        user_role = "user"
        task_response = reason.make_openai_api_call(sc_input,mrole,mcontent,user_role)
        print(task_response)
        aug_output=namespace + ":" +task_response
        continue_functions=False

      if "Use reasoning" in user_message and "customer" in user_message and "activities" in user_message and continue_functions==True:
        initial_query = user_message
        download("Chapter05","customer_activities.csv")
        reasoning_steps = reason.chain_of_thought_reasoning(initial_query)
        aug_output=reasoning_steps
        continue_functions=False

      if "Create" in user_message and "image" in user_message and continue_functions==True:
        prompt = user_message
        image_url = reason.generate_image(prompt, model="dall-e-3", size="1024x1024", quality="standard", n=1)
        # Save the image locally
        save_path = "c_image.png"
        image_data = requests.get(image_url).content
        with open(save_path, "wb") as file:
          file.write(image_data)
        aug_output="Image created"
        continue_functions=False

      if user_memory==False and continue_functions==True:
        umessage = user_message
        mrole = "system"
        mcontent = "You are an assistant who executes the tasks you are asked to do."
        user_role = "user"
        task_response = reason.make_openai_api_call(umessage,mrole,mcontent,user_role)
        aug_output=task_response
        continue_functions=False

      if user_memory==True and continue_functions==True:
        # Extract ALL user messages from the conversation history
        user_messages_content = [
        msg["content"] for msg in messages
        if msg["role"] == "user" and "content" in msg
        ]
        # Combine all extracted user messages into a single string
        combined_user_messages = " ".join(user_messages_content)
        # Add the current user_message to the combined text
        umessage = f"{combined_user_messages} {user_message}"
        mrole = "system"
        mcontent = "You are an assistant who executes the tasks you are asked to do."
        user_role = "user"
        task_response = reason.make_openai_api_call(umessage,mrole,mcontent,user_role)
        aug_output=task_response
        continue_functions=False
      # Return the augmented output
      return aug_output

    except Exception as e:
        # Return the error message in case of an exception
        return f"An error occurred: {str(e)}"

# GenAISys IPython interface

## Running the interface

Enter(do not copy and paste) "Use reasoning" + text to trigger Chain-of-Thought for this domain-specific notebook (traveling).

For example:
`Use reasoning to suggest activities for our customers.`

In [None]:
def format_json_as_markdown(data, level=0):
    """Format JSON-like data as Markdown with proper indentation."""
    html_output = ""
    indent = "  " * level

    if isinstance(data, dict):
        for key, value in data.items():
            html_output += f"{indent}**{key}**:<br>\n"
            html_output += format_json_as_markdown(value, level + 1)
    elif isinstance(data, list):
        for item in data:
            html_output += format_json_as_markdown(item, level)
    else:
        html_output += f"{indent}{data}<br>\n"

    return html_output or ""  # Ensure a string is always returned

In [None]:
def format_entry(entry):
    """Format the content of an entry for Markdown display."""
    if entry['role'] == 'user':
        formatted_content = format_json_as_markdown(entry['content']) if isinstance(entry['content'], (dict, list)) else entry['content']
        formatted_content = formatted_content.replace("\n", "<br>")  # Process newlines outside the f-string
        return f"**<span style='color: blue;'>{active_user}:</span>** {formatted_content}"
    elif entry['role'] == 'assistant':
        formatted_content = format_json_as_markdown(entry['content']) if isinstance(entry['content'], (dict, list)) else entry['content']
        formatted_content = formatted_content.replace("\n", "<br>")  # Process newlines outside the f-string
        return f"**<span style='color: green;'>Agent:</span>** {formatted_content}"
    else:
        return entry['content']  # Fallback for unrecognized roles

In [None]:
# Import required modules
from IPython.display import display, HTML, clear_output,Image,Markdown
from ipywidgets import Dropdown, Text, Checkbox, VBox, Layout,Output
from PIL import Image as PILImage
import json
import time

# Create an output widget for reasoning steps
reasoning_output = Output(layout=Layout(border="1px solid black", padding="10px", margin="10px", width="100%"))


# Initialize conversation histories for all users and active user
user_histories = {"User01": [], "User02": [], "User03": []}
active_user = "User01"  # Default user
conversation_active = True

# Function to handle user input and optional bot response
def chat(user_message):
    global conversation_active
    # Check for exit signal
    if user_message.lower() in ['exit', 'quit']:
        conversation_active = False
        clear_output(wait=True)
        display(HTML("<div style='color: red;'><strong>Conversation ended. Saving history...</strong></div>"))
        save_conversation_history()  # Save the conversation history to a file
        display(HTML("<div style='color: green;'><strong>History saved. Proceed to the next cell.</strong></div>"))
        return

    # Append user message to the active user's history
    user_histories[active_user].append({"role": "user", "content": user_message})

    # Generate bot response if agent_checkbox is checked
    if agent_checkbox.value:
        pfiles = 1 if files_checkbox.value else 0
        response = chat_with_gpt(user_histories[active_user], user_message,pfiles)
        # Append bot response to the active user's history
        user_histories[active_user].append({"role": "assistant", "content": response})
        # If TTS is enabled, convert the response to speech
        if tts_checkbox.value:
            text_to_speech(response)

    # Update display
    update_display()

# Function to update the display
def update_display():
    clear_output(wait=True)

    for entry in user_histories[active_user]:
        formatted_entry = format_entry(entry)
        display(Markdown(formatted_entry))

    #Audio display
    if os.path.exists("/content/response.mp3"):
      display(Audio("/content/response.mp3", autoplay=True))
      !rm /content/response.mp3

    if os.path.exists("/content/c_image.png") and files_checkbox.value==True:
      # Open the image using PIL
      original_image = PILImage.open("/content/c_image.png")

      # Resize the image to 50% of its original size
      new_size = (original_image.width // 2, original_image.height // 2)
      resized_image = original_image.resize(new_size)
      # Display the resized image
      display(resized_image)

    # Display the interactive widgets, excluding reasoning_output
    if conversation_active:
        display(
            VBox(
                [user_selector, input_box, agent_checkbox, tts_checkbox, files_checkbox],
                layout=Layout(display='flex', flex_flow='column', align_items='flex-start', width='100%')
            )
        )

    # Display reasoning_output persistently
    display(reasoning_output)

# Function to handle the submission of the input
def handle_submit(sender):
    user_message = sender.value
    if user_message.strip():
        sender.value = ""  # Clear the input box
        chat(user_message)

# Function to update the active user
def on_user_change(change):
    global active_user
    active_user = change['new']
    update_display()  # Update the display to show the new active user's history

# Function to save conversation history to a file
def save_conversation_history():
    filename = "conversation_history.json"  # Define the filename
    with open(filename, 'w') as file:
        json.dump(user_histories, file, indent=4)  # Write the user histories dictionary to the file in JSON format
    display(HTML(f"<div style='color: green;'><strong>Conversation history saved to {filename}.</strong></div>"))

# Create a dropdown to select the user
user_selector = Dropdown(
    options=["User01", "User02", "User03"],
    value=active_user,
    description='User:',
    layout=Layout(width='50%')
)
user_selector.observe(on_user_change, names='value')

# Create the input box widget
input_box = Text(placeholder="Type your message here or type 'exit' or 'quit' to end the conversation.", layout=Layout(width='100%'))
input_box.on_submit(handle_submit)  # Attach the on_submit event handler

# Create a checkbox to toggle text-to-speech
tts_checkbox = Checkbox(
    value=False,
    description='Voice Output',
    layout=Layout(width='20%')
)

# Create a checkbox to toggle agent response
files_checkbox = Checkbox(
    value=False,
    description='Files',
    layout=Layout(width='20%')
)

# Create a checkbox to toggle agent response
agent_checkbox = Checkbox(
    value=True,
    description='Agent',
    layout=Layout(width='20%')
)

display(
            VBox(
                [user_selector, input_box, agent_checkbox, tts_checkbox, files_checkbox],
                layout=Layout(display='flex', flex_flow='column', align_items='flex-start', width='100%')
            )
        )

# Dynamically display reasoning_output when used
with reasoning_output:
    reasoning_output.clear_output(wait=True)
    # Add content to the reasoning_output widget here
    print("Reasoning activated")

**Note:** *Make sure you have had a conversion by running the interface in the preceding cell. Then make sure you stopped the conversion with 'quit' or 'exit" so that the conversation is saved. If not, the conversation will not be displayed and summarized in the following cells.*


In [None]:
display_conversation_history=True
summary=True

# Load and display the conversation history

In [None]:
import json
from IPython.display import display, Markdown
import os

if display_conversation_history == True or summary==True:
    # File path
    file_path = 'conversation_history.json'

    # Check if the file exists
    if os.path.exists(file_path):
        print(f"The file '{file_path}' exists.")
    else:
        print(f"The file '{file_path}' does not exist.")
        display_conversation_history=False
        summary=False
        print("Conversation history not processed")

In [None]:
# Display option
if display_conversation_history==True:
  # File path
  file_path = 'conversation_history.json'

  # Open the file and read its content into the 'dialog' variable
  with open(file_path, 'r', encoding='utf-8') as file:
      dialog = json.load(file)  # Parse JSON content

  # Function to format JSON content as markdown
  def format_json_as_markdown(data, level=0):
      html_output = ""
      indent = "  " * level
      if isinstance(data, dict):
          for key, value in data.items():
              html_output += f"{indent}**{key}**:<br>\n"
              html_output += format_json_as_markdown(value, level + 1)
      elif isinstance(data, list):
          for item in data:
              html_output += format_json_as_markdown(item, level)
      else:
          html_output += f"{indent}{data}<br>\n"
      return html_output

  # Format the JSON into markdown
  formatted_markdown = format_json_as_markdown(dialog)

  # Display formatted JSON as Markdown
  display(Markdown(formatted_markdown))

# Load and summarize the conversation history

In [None]:
import json
from IPython.display import Markdown, display

def summarize_conversation(file_path):
    """
    Reads a conversation history JSON file, formats it, and generates a detailed
    summary with a list of actions from the JSON text. The summary is displayed in Markdown.

    Parameters:
        file_path (str): Path to the JSON file containing conversation history.

    Returns:
        None: The summary is displayed as Markdown output.
    """
    # Step 1: Read the conversation history from the JSON file
    with open(file_path, 'r', encoding='utf-8') as file:
        dialog = file.read()
    conversation_history_json = json.loads(dialog)

    # Step 2: Construct dialog string from the JSON conversation history
    def construct_dialog(conversation_history_json):
        dialog = ""
        for user, messages in conversation_history_json.items():
            dialog += f"\n{user}:\n"
            for message in messages:
                role = message["role"]
                content = message["content"]
                dialog += f"- {role}: {content}\n"
        return dialog

    formatted_dialog = construct_dialog(conversation_history_json)

    # Step 3: Prepare the task for the summary
    mrole = "system"
    mcontent = "Your task is to read this JSON formatted text and summarize it."
    user_role = "user"
    task = f"Read this JSON formatted text and make a very detailed summary of it with a list of actions:\n{formatted_dialog}"

    # Step 4: Call the `make_openai_api_call` function
    task_response = make_openai_api_call(task, mrole, mcontent, user_role)

    # Step 5: Display the task response as Markdown
    display(Markdown(task_response))


In [None]:
if summary==True:
    # File path to the JSON file
    file_path = '/content/conversation_history.json'

    # Check if the file exists before calling the function
    if os.path.exists(file_path):
        summarize_conversation(file_path)
    else:
        print(f"File '{file_path}' does not exist. Please provide a valid file path.")
