In [None]:
from langchain.utilities.duckduckgo_search import DuckDuckGoSearchAPIWrapper
import yfinance
import json


def get_ticker(inputs):
    ddg = DuckDuckGoSearchAPIWrapper()
    company_name = inputs["company_name"]
    return ddg.run(f"Ticker symbol of {company_name}")


def get_income_statement(inputs):
    ticker = inputs["ticker"]
    stock = yfinance.Ticker(ticker)
    return json.dumps(stock.income_stmt.to_json())


def get_balance_sheet(inputs):
    ticker = inputs["ticker"]
    stock = yfinance.Ticker(ticker)
    return json.dumps(stock.balance_sheet.to_json())


def get_daily_stock_performance(inputs):
    ticker = inputs["ticker"]
    stock = yfinance.Ticker(ticker)
    return json.dumps(stock.history(period="3mo").to_json())


functions_map = {
    "get_ticker": get_ticker,
    "get_income_statement": get_income_statement,
    "get_balance_sheet": get_balance_sheet,
    "get_daily_stock_performance": get_daily_stock_performance,
}

functions = [
    {
        "type": "function",
        "function": {
            "name": "get_ticker",
            "description": "Given the name of a company returns its ticker symbol",
            "parameters": {
                "type": "object",
                "properties": {
                    "company_name": {
                        "type": "string",
                        "description": "The name of the company",
                    }
                },
                "required": ["company_name"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_income_statement",
            "description": "Given a ticker symbol (i.e AAPL) returns the company's income statement.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Ticker symbol of the company",
                    },
                },
                "required": ["ticker"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_balance_sheet",
            "description": "Given a ticker symbol (i.e AAPL) returns the company's balance sheet.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Ticker symbol of the company",
                    },
                },
                "required": ["ticker"],
            },
        },
    },
    {
        "type": "function",
        "function": {
            "name": "get_daily_stock_performance",
            "description": "Given a ticker symbol (i.e AAPL) returns the performance of the stock for the last 100 days.",
            "parameters": {
                "type": "object",
                "properties": {
                    "ticker": {
                        "type": "string",
                        "description": "Ticker symbol of the company",
                    },
                },
                "required": ["ticker"],
            },
        },
    },
]

get_income_statement({"ticker": "AAPL"})

In [8]:
import openai as client

# assistant = client.beta.assistants.create(
#     name="Investor Assistant",
#     instructions="You help user do research on publicly traded companies and you help them decide if they should buy the stock or not.",
#     model="gpt-3.5-turbo-0125",
#     tools = functions
# )

assistant_id = "asst_W2n1XTFiMov4kmyAk1i7rmKM"

In [65]:
thread = client.beta.threads.create()

thread_id = thread.id

message = client.beta.threads.messages.create(
    thread_id=thread_id,
    role= "user",
    content="I want to know if Health Salesforce is a good buy."
)

In [None]:
run = client.beta.threads.runs.create(
    thread_id=thread_id,
    assistant_id=assistant_id,
)

run_id = run.id

In [60]:
import json

def get_run(run_id, thread_id):
    return client.beta.threads.runs.retrieve(
        run_id=run_id,
        thread_id=thread_id
    )

def get_messages(thread_id):
    messages = client.beta.threads.messages.list(thread_id=thread_id)
    messages = list(messages)
    messages.reverse()
    for message in messages:
        print(f"{message.role}:{message.content[0].text.value}")


def get_tool_output(run_id, thread_id):
    run = get_run(run_id, thread_id)
    outputs = []
    for action in run.required_action.submit_tool_outputs.tool_calls:
        action_id = action.id
        function = action.function
        # print(f"Calling function {function.name} with arg {function.arguments}")
        outputs.append(
            {
                "output": functions_map[function.name](json.loads(function.arguments)),
                "tool_call_id": action_id,
            }
        )
    return outputs 

def submit_tool_output(run_id, thread_id):
    outputs = get_tool_output(run_id, thread_id)
    return client.beta.threads.runs.submit_tool_outputs(
        run_id=run_id,
        thread_id=thread_id,
        tool_outputs=outputs
    )

def send_message(thread_id, content):
    return client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=content,
    )

In [66]:
from typing_extensions import override
from openai import AssistantEventHandler
from openai.types.beta.threads import Message, MessageDelta
from openai.types.beta.threads.runs import ToolCall, RunStep
from openai.types.beta import AssistantStreamEvent

# messages = []

# First, we create a EventHandler class to define
# how we want to handle the events in the response stream.


class EventHandler(AssistantEventHandler):
    def __init__(self, thread_id, assistant_id):
        super().__init__()
        self.output = None
        self.tool_id = None
        self.thread_id = thread_id
        self.assistant_id = assistant_id
        self.run_id = None
        self.run_step = None
        self.function_name = ""
        self.arguments = ""

    @override
    def on_text_created(self, text) -> None:
        print(f"\nassistant on_text_created > ", end="", flush=True)

    @override
    def on_text_delta(self, delta, snapshot):
        # text가 만들어질때마다 작성하도록
        # print(f"\nassistant on_text_delta > {delta.value}", end="", flush=True)
        print(f"{delta.value}")

    @override
    def on_end(self):
        # 각 stream이 끝날 때때
        print(
            f"\n end assistant > ", self.current_run_step_snapshot, end="", flush=True
        )

    @override
    def on_exception(self, exception: Exception) -> None:
        """Fired whenever an exception happens during streaming"""
        print(f"\nassistant > {exception}\n", end="", flush=True)

    @override
    def on_message_created(self, message: Message) -> None:
        print(f"\nassistant on_message_created > {message}\n", end="", flush=True)

    @override
    def on_message_done(self, message: Message) -> None:
        print(f"\nassistant on_message_done > {message}\n", end="", flush=True)

    @override
    def on_message_delta(self, delta: MessageDelta, snapshot: Message) -> None:
        # print(f"\nassistant on_message_delta > {delta}\n", end="", flush=True)
        pass

    def on_tool_call_created(self, tool_call):
        # 4
        print(f"\nassistant on_tool_call_created > {tool_call}")
        self.function_name = tool_call.function.name
        self.tool_id = tool_call.id
        print(f"\on_tool_call_created > run_step.status > {self.run_step.status}")

        print(f"\nassistant > {tool_call.type} {self.function_name}\n", flush=True)

        keep_retrieving_run = client.beta.threads.runs.retrieve(
            thread_id=self.thread_id, run_id=self.run_id
        )

        while keep_retrieving_run.status in ["queued", "in_progress"]:
            keep_retrieving_run = client.beta.threads.runs.retrieve(
                thread_id=self.thread_id, run_id=self.run_id
            )

            print(f"\nSTATUS: {keep_retrieving_run.status}")

    @override
    def on_tool_call_done(self, tool_call: ToolCall) -> None:
        # tool_call이 끝났을 때
        keep_retrieving_run = client.beta.threads.runs.retrieve(
            thread_id=self.thread_id, run_id=self.run_id
        )
        # 현재 retrieving_run 정보
        print(f"\nDONE STATUS: {keep_retrieving_run.status}")

        if keep_retrieving_run.status == "completed":
            all_messages = client.beta.threads.messages.list(thread_id=thread_id.id)

            print(all_messages.data[0].content[0].text.value, "", "")
            return

        elif keep_retrieving_run.status == "requires_action":
            # keep_retrieving_run.status가 actoin을 요구하면
            print("here you would call your function")

            if self.function_name in functoins_map:
                # 해당 요구하는 action이 내가 정한 함수 안에 있을 때
                print(self.arguments)

                with client.beta.threads.runs.submit_tool_outputs_stream(
                    thread_id=self.thread_id,
                    run_id=self.run_id,
                    tool_outputs=[
                        {
                            "tool_call_id": self.tool_id,
                            "output": functoins_map[self.function_name](
                                json.loads(self.arguments)
                            ),
                        }
                    ],
                    event_handler=EventHandler(self.thread_id, self.assistant_id),
                ) as stream:
                    stream.until_done()
                # 해당 action에 대한 stream을 만듦
            else:
                print("unknown function")
                return

    @override
    def on_run_step_created(self, run_step: RunStep) -> None:
        # 2
        print(f"on_run_step_created")
        self.run_id = run_step.run_id
        self.run_step = run_step
        print("The type ofrun_step run step is ", type(run_step), flush=True)
        print(f"\n run step created assistant > {run_step}\n", flush=True)

    @override
    def on_run_step_done(self, run_step: RunStep) -> None:
        print(f"\n run step done assistant > {run_step}\n", flush=True)

    def on_tool_call_delta(self, delta, snapshot):
        if delta.type == "function":
            print("Get function argument")
            # the arguments stream thorugh here and then you get the requires action event
            print(delta.function.arguments, end="", flush=True)
            self.arguments += delta.function.arguments
        elif delta.type == "code_interpreter":
            print(f"on_tool_call_delta > code_interpreter")
            if delta.code_interpreter.input:
                print(delta.code_interpreter.input, end="", flush=True)
            if delta.code_interpreter.outputs:
                print(f"\n\noutput >", flush=True)
                for output in delta.code_interpreter.outputs:
                    if output.type == "logs":
                        print(f"\n{output.logs}", flush=True)
        else:
            print("ELSE")
            print(delta, end="", flush=True)

    @override
    def on_event(self, event: AssistantStreamEvent) -> None:
        # print("In on_event of event is ", event.event, flush=True)

        if event.event == "thread.run.requires_action":
            print("\nthread.run.requires_action > submit tool call")
            print(f"ARGS: {self.arguments}")


# Then, we use the `stream` SDK helper
# with the `EventHandler` class to create the Run
# and stream the response.

with client.beta.threads.runs.stream(
    thread_id=thread_id,
    assistant_id=assistant_id,
    instructions="You help user do research on publicly traded companies and you help them decide if they should buy the stock or not.",
    event_handler=EventHandler(thread_id, assistant_id),
) as stream:
    stream.until_done()

# print(stream.current_run_step_snapshot.run_id)

on_run_step_created
The type ofrun_step run step is  <class 'openai.types.beta.threads.runs.run_step.RunStep'>

 run step created assistant > RunStep(id='step_3zoLykEuqy2FkX2LrIN7fQT6', assistant_id='asst_W2n1XTFiMov4kmyAk1i7rmKM', cancelled_at=None, completed_at=None, created_at=1735291113, expired_at=None, failed_at=None, last_error=None, metadata=None, object='thread.run.step', run_id='run_WPiA5aElkeIMImCXPSEPDdaJ', status='in_progress', step_details=ToolCallsStepDetails(tool_calls=[], type='tool_calls'), thread_id='thread_2e5ZrtLTlwjYCTCMNGHHfHd5', type='tool_calls', usage=None, expires_at=1735291711)


assistant on_tool_call_created > FunctionToolCall(id='call_GH0n110cyGt7TbFdGXsEWnxP', function=Function(arguments='', name='get_ticker', output=None), type='function', index=0)
\on_tool_call_created > run_step.status > in_progress

assistant > function get_ticker

Get function argument
{"Get function argument
companyGet function argument
_nameGet function argument
":"Get function ar

In [None]:
get_messages(thread_id)

In [None]:
get_run(run_id, thread_id).status

In [None]:
submit_tool_output(run_id, thread_id)

In [None]:
send_message(thread_id, "Now I wnat to know if Cloudflare is a good buy")

In [None]:
stream.pirnt_messages()