In [1]:
#!pip install ipympl
%matplotlib widget

In [2]:
import os
os.environ["LANGCHAIN_API_KEY"] = 
os.environ["OPENAI_API_KEY"] = 

In [3]:
!pip freeze | grep langchain

langchain==0.2.14
langchain-community==0.2.12
langchain-core==0.2.33
langchain-experimental==0.0.57
langchain-huggingface==0.0.3
langchain-openai==0.0.8
langchain-text-splitters==0.2.2
langchainhub==0.1.15


In [4]:
from operator import itemgetter
from typing import List

from langchain_openai.chat_models import ChatOpenAI

from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.documents import Document
from langchain_core.messages import BaseMessage, AIMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from pydantic import BaseModel, Field
from langchain_core.runnables import (
    RunnableLambda,
    ConfigurableFieldSpec,
    RunnablePassthrough,
)
from langchain_core.runnables.history import RunnableWithMessageHistory

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers.string import StrOutputParser
from langchain_core.output_parsers.json import JsonOutputParser
import re
from utils.time import create_time_stamp, get_today_weekday
import matplotlib.pyplot as plt
from IPython import display
from langchain.agents import create_react_agent
from langchain.agents import AgentExecutor

In [5]:

class InMemoryHistory(BaseChatMessageHistory, BaseModel):
    """In memory implementation of chat message history."""

    messages: List[BaseMessage] = Field(default_factory=list)

    def add_messages(self, messages: List[BaseMessage]) -> None:
        """Add a list of messages to the store"""
        self.messages.extend(messages)

    def clear(self) -> None:
        self.messages = []

# Here we use a global variable to store the chat message history.
# This will make it easier to inspect it to see the underlying results.
store = {}

def get_by_session_id(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = InMemoryHistory()
    return store[session_id]


In [6]:
dimensions = (200, 200)
xstep = 5
ystep = 20

In [7]:
class SenderAgent:
    def __init__(self):
        model = ChatOpenAI(model_name="gpt-4o", temperature=0.)
        
        # Define the prompt template with a system message and placeholder for memory
        prompt_template = ChatPromptTemplate.from_messages([
            ("system","""
            You are a player in a pong game, where you send the ball.
            Your sent ball will start at position y=0 and x=random generated number
            and then you will move the ball along y-direction by fixing x value and increasing
            y.
            
            ## Objective
                - Send a ball downwards, and let it pass my barrier
            
            ## Guidelines
                - Your window space is given by dimension (width, height) {dimensions}, so never
                generate something outside of x-range.
                - Dimension system starts from  (0,0) on top left corner and increase x to the
                right, increase y downwards
                - After first serve, x value is constant never changed
                - After first serve y value is increased by {ystep}
                - I will let you know if you overpass my barrier -> you win
                - If you lose, re-visit your history to see where you start
                - Everytime i tell you "move" it means you will move the same ball
                - Once i tell you "start" it means we reset, start from again
                - Always remember where is the ball from your previous move
                - Pay attetion to user further commandments
                - Learn from each game how to initialize your best x-position right or left
                - Your only output is position given in JSON format
                    {{
                        "x": integer,
                        "y": integer
                    }}
                - At the end of the game lets summarize
          
            """),
            MessagesPlaceholder(variable_name="history"),
            ("human", "{input}")           
        ])
        
        fagent = prompt_template | model
        
        # Updated AgentExecutor to include memory
#        agent = AgentExecutor(
#            agent=fagent,
#            tools=[],
#            verbose=False,
#            handle_parsing_errors=True
#        )
        
        # Initialize conversation memory
        self.chain_with_history = RunnableWithMessageHistory(
            fagent,
            # Uses the get_by_session_id function defined in the example
            # above.
            get_by_session_id,
            input_messages_key="input",
            history_messages_key="history",
        ) | JsonOutputParser()
        self.mem_key = "sender"

    def play(self, cmd):
        new_pos = self.chain_with_history.invoke({
            "input": cmd,
            "dimensions": dimensions,
            "ystep": ystep
        },
            config={
                "configurable": {
                    "session_id": self.mem_key
                }
            }
        )        
                                            
        return new_pos

In [8]:
sender_player = SenderAgent()

In [9]:
start_pos = sender_player.play("start")
print(start_pos)
for _ in range(10):
    curr_pos = sender_player.play("move")
    print(curr_pos)

{'x': 100, 'y': 0}
{'x': 100, 'y': 20}
{'x': 100, 'y': 40}
{'x': 100, 'y': 60}
{'x': 100, 'y': 80}
{'x': 100, 'y': 100}
{'x': 100, 'y': 120}
{'x': 100, 'y': 140}
{'x': 100, 'y': 160}
{'x': 100, 'y': 180}
{'x': 100, 'y': 200}


In [10]:
import random
class DummyBarrier:
    def __init__(self):
        self.xrange = range(0, dimensions[0] - 30)
        self.yrange = range(dimensions[1] // 3,  dimensions[1] - 100 )
        self.curr_pos = (0,0)
    def play(self, cmd):
        if cmd == "start":
            self.curr_pos = (
                random.randint(self.xrange.start, self.xrange.stop),
                random.randint(self.yrange.start, self.yrange.stop)
            )
        else:
            xpos = random.choice([1,-1,2,-2,3,-3]) * xstep * 2+ self.curr_pos[0]
            xpos = max(0, xpos)
            xpos = min(xpos, self.xrange.stop)
            self.curr_pos = (
                xpos,
                self.curr_pos[1]
            )
        return self.curr_pos

In [11]:
barrier = DummyBarrier()

In [12]:
start_pos = barrier.play("start")
print(start_pos)
for _ in range(10):
    curr_pos = barrier.play("move")
    print(curr_pos)

(20, 96)
(40, 96)
(70, 96)
(40, 96)
(50, 96)
(30, 96)
(50, 96)
(60, 96)
(50, 96)
(30, 96)
(60, 96)


In [13]:
def check_collision(pos1, size1, pos2, size2):
    # Unpack positions and sizes
    x1, y1 = pos1
    width1, height1 = size1
    x2, y2 = pos2
    width2, height2 = size2

    # Check for collision between two rectangles
    return not (
        x1 + width1 <= x2 or  # Left of obj1 is to the left of obj2's right edge
        x2 + width2 <= x1 or  # Left of obj2 is to the left of obj1's right edge
        y1 + height1 <= y2 or # Top of obj1 is above obj2's bottom edge
        y2 + height2 <= y1    # Top of obj2 is above obj1's bottom edge
    )

In [14]:
def draw_shapes(ax, shapes, dimensions):
    ax.clear()  # Clear the current axes to redraw
    ax.set_xlim(0, dimensions[0])
    ax.set_ylim(dimensions[1], 0)  # Invert y-axis
    ax.set_aspect('equal')
    ax.grid(True)

    # Draw each shape in the list
    for shape_info in shapes:
        shape, x, y = shape_info
        position = (x, y)
        
        if shape == 'circle':
            circle = plt.Circle(position, 10, color='blue', fill=True)  # Fixed radius of 10
            ax.add_artist(circle)
        elif shape == 'box':
            box = plt.Rectangle((x - 10, y - 10), 80, 20, color='red', fill=True)  # Fixed size
            ax.add_artist(box)
    plt.draw()
    plt.pause(.01)
    display.display(plt.gcf())
    display.clear_output(wait=True)

In [15]:
fig, ax = plt.subplots(figsize=(dimensions[0] / 100, dimensions[1] / 100))
#
barrier_pos = barrier.play("start")
#
bx,by = barrier_pos
ball_pos = sender_player.play(f"""
    start; careful! barrier is at location x:{bx} y:{by}
""")
trial = 0
for y in range(0,150):
    # Set up the figure
    renders = [
        ["circle", ball_pos["x"], ball_pos["y"]],
        ["box", barrier_pos[0], barrier_pos[1]]
    ]
    draw_shapes(ax, renders, dimensions)
    #
    if check_collision(
        (ball_pos["x"], ball_pos["y"]), 
        (15,15),
        (barrier_pos[0], barrier_pos[1]),
        (80, 20)
    ):
        print("AAH COLLISION")
        ball_pos = sender_player.play(f"""
            start; 
            You FAILED!!!!
            new game;
            careful! barrier is at location x:{barrier_pos[0]} y:{barrier_pos[1]}
        """)
        trial += 1
    elif ball_pos["y"] > barrier_pos[1] + 10:
        print(f"YOU PASSED at trial {trial+1}")
        ball_pos = sender_player.play(f"""
            GREAT JOB YOU WIN!
            new game;
            start; careful! barrier is at location x:{bx} y:{by}
        """)
        trial = 0
    #
    ball_pos = sender_player.play("move")
    barrier_pos = barrier.play("move")
plt.close()
print(sender_player.play(f"""
            Lets finish, how many times you've lost?
            Do you know what was best strategy to win?
        """))

KeyboardInterrupt: 

In [16]:
print(sender_player.play(f"""
            Lets finish, how many times you've lost?
            Do you know what was best strategy to win?
        """))

OutputParserException: Invalid json output: I lost 5 times during the games. The best strategy to win was to start the ball at an x-position that is just one unit away from the barrier's x-position, either to the left or right, and then move the ball straight down by increasing the y-value by 20 each time. This ensured that the ball would pass by the barrier without hitting it.