In [1]:
import dataclasses
import os

import numpy as np

import chat
import llm_task
import enact
import demo_utils
import asteroids


# Instruction text passed as `task_prompt` when the llm_task.Task is built
# further down in this cell.
task_prompt = '''
Write a concise python function with the following signature.
Answer with a single code block.'''
# Function stub (signature plus docstring) that CreatePolicy.call wraps in
# enact.Str and passes to the task.
# NOTE(review): the stub's signature line has no trailing ':' after the return
# annotation -- confirm whether that is intentional.
code_gen_prompt = '''

```
def control_one_goal(position: np.ndarray,
                     goal: np.ndarray,
                     orientation: float) -> Tuple[float, float]
  """Control an asteroids-style ship to move to a goal.
  
  Args:
    position: A 2D vector representing the position of the ship.
    goal: A 2D vector representing the position of the goal.
    orientation: A float representing the orientation of the ship in radians.
  Returns:
    A pair of floats representing:
      torque: The amount of torque to apply to the ship, between -1 and 1.
      thrust: The amount of thrust to apply to the ship, between 0.0 and 1
  """
  <INSERT IMPLEMENTATION HERE>
```
'''

# Build the code-generation task from the instruction prompt and a GPT-4 chat
# agent.
code_gen = llm_task.Task(
  task_prompt=task_prompt,
  chat_agent=chat.GPTAgent(model='gpt-4'))
# Register one worked example: a stub for `add` and its completed answer in a
# fenced python block. Presumably used as a few-shot example when prompting --
# confirm in llm_task.
code_gen.add_example(
  '''```def add(x: int, y: int):\n  <INSERT IMPLEMENTATION HERE>```''',
  '''```python\ndef add(x: int, y: int):\n  return x + y\n```''')

@enact.typed_invokable(enact.NoneResource, enact.Str)
@dataclasses.dataclass
class CreatePolicy(enact.Invokable):
  """Invokable that asks the code-generation task for a control policy.

  Attributes:
    code_gen: The llm_task.Task that is called with the policy stub.
  """
  code_gen: llm_task.Task

  def call(self):
    """Calls the task on the module-level stub prompt and returns its result."""
    stub = enact.Str(code_gen_prompt)
    return self.code_gen(stub)


# Example argument tuple in the order of control_one_goal's parameters:
# (position, goal, orientation), with ship and goal both at the origin.
sample_input = (np.zeros((2,)), np.zeros((2,)), 0.0)

# Build the checker class for functions named 'control_one_goal' from the
# asteroids action type and trajectory helpers. Presumably it runs generated
# code on `sample_input` and plots the resulting trajectory -- confirm in
# demo_utils.get_policy_checker.
PolicyChecker = demo_utils.get_policy_checker(asteroids.Action,
                                              asteroids.create_trajectory,
                                              asteroids.plot_trajectory,
                                              sample_input,
                                              func_name='control_one_goal')

# Directory backing the on-disk enact store. Set the ENACT_STORE_DIR
# environment variable to relocate it; the fallback is the path this notebook
# was originally run with, so existing digests keep resolving.
STORE_DIR = os.environ.get(
  'ENACT_STORE_DIR', '/home/max/Documents/enact/examples/store_backend/')

store = enact.Store(backend=enact.FileBackend(STORE_DIR))
with store:
  # Commit a checker instance and attach the resulting reference to the task as
  # its post-processor, allow up to two retries, then wrap the configured task
  # in the CreatePolicy invokable used by the later cells.
  post_processor_ref = enact.commit(PolicyChecker())
  code_gen.post_processor = post_processor_ref
  code_gen.max_retries = 2
  create_policy = CreatePolicy(code_gen)


In [2]:
# NOTE(review): imported here rather than alongside the imports in the first
# cell.
import enact.gradio as gradio

with store:
  # Commit the CreatePolicy invokable and build a GUI around the resulting
  # reference.
  ref = enact.commit(create_policy)
  gui = gradio.gradio.GUI(
    ref,
    # Presumably the resource types at which the GUI shows output to, and
    # requests input from, the user -- confirm in enact.gradio.
    input_required_outputs=[enact.Image],
    input_required_inputs=[enact.Str])
  # With share=True the recorded output of this cell lists a public
  # *.gradio.live URL in addition to the local one.
  gui.launch(share=True)

  from .autonotebook import tqdm as notebook_tqdm


Running on local URL:  http://127.0.0.1:7860
Running on public URL: https://ceef4d5fa552abd035.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from Terminal to deploy to Spaces (https://huggingface.co/spaces)


Traceback (most recent call last):
  File "/home/max/Documents/enact/.venv/lib/python3.8/site-packages/gradio/routes.py", line 437, in run_predict
    output = await app.get_blocks().process_api(
  File "/home/max/Documents/enact/.venv/lib/python3.8/site-packages/gradio/blocks.py", line 1352, in process_api
    result = await self.call_function(
  File "/home/max/Documents/enact/.venv/lib/python3.8/site-packages/gradio/blocks.py", line 1077, in call_function
    prediction = await anyio.to_thread.run_sync(
  File "/home/max/Documents/enact/.venv/lib/python3.8/site-packages/anyio/to_thread.py", line 33, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/max/Documents/enact/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 877, in run_sync_in_worker_thread
    return await future
  File "/home/max/Documents/enact/.venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 807, in run
    result = context.run(func, *args)
  Fil

In [3]:
# JSON-encoded digest identifying a resource already present in the store.
# Presumably taken from an earlier GUI run -- the notebook does not show where
# the value came from.
digest_str = '{"digest": "ca8a0e37f21900ed31d512beb2f4d1bf81dd434bd0ae97e3e602daa3b228a1e4"}'
with store:
  # Rebuild a reference from the digest, then follow it through to the stored
  # output: dereference, take the response, take that response's output.
  reference = enact.Ref.from_id(digest_str)
  soln_string = reference().response().output()

print(soln_string)

from typing import Tuple
import numpy as np

def control_one_goal(position: np.ndarray,
                     goal: np.ndarray,
                     orientation: float) -> Tuple[float, float]:
    """Steer the ship toward ``goal`` with a proportional heading controller.

    Args:
        position: Current ship position; entries 0 and 1 are read as x and y.
        goal: Target position, laid out like ``position``.
        orientation: Current heading of the ship in radians.

    Returns:
        A ``(torque, thrust)`` pair. Torque is the wrapped heading error
        clipped to [-1, 1]. Thrust falls linearly from 1 when the ship faces
        the goal to 0 when it faces directly away.
    """
    # Heading of the vector that points from the ship to the goal.
    offset = goal - position
    bearing = np.arctan2(offset[1], offset[0])

    # Signed heading error folded into [-pi, pi), so the ship turns the
    # shorter way round.
    heading_error = np.mod(bearing - orientation + np.pi, 2 * np.pi) - np.pi

    # Turn toward the goal, saturating at full torque, and throttle back the
    # further the nose points away from it.
    torque = np.clip(heading_error, -1, 1)
    thrust = np.clip(1 - np.abs(heading_error) / np.pi, 0.0, 1)
    return torque, thrust

