### Initialize environment

In [191]:
from dotenv import load_dotenv

# Load the shared settings file first, then the local one. The second call passes
# override=True so values from ../.env.local take precedence over variables that
# are already set (presumably including those loaded from ../.env - confirm
# against the python-dotenv documentation).
load_dotenv("../.env")
load_dotenv("../.env.local", override=True)

True

### Initialize OpenAI Client

In [192]:
from openai import OpenAI
from os import getenv

# API Key is loaded from environment variable OPENAI_API_KEY initialized through dotenv above
oai_client = OpenAI()
# Model name and assistant ID are read from the environment as well.
# getenv returns None when a variable is unset, so both values may be None here.
oai_model = getenv('OPENAI_API_MODEL')
oai_assistant_id = getenv('OPENAI_API_ASSISTANT_ID')

### Test OpenAI Client

In [193]:
# chat_completion = oai_client.chat.completions.create(
#   messages=[
#     {
#       "role": "user",
#       "content": "Sag mal Banane",
#     }
#   ],
#   model="gpt-4-1106-preview",
# )

# chat_completion.choices[0].message.content

### Domain & Implementation

In [194]:
from typing import List, Set, Optional, Literal
from time import sleep
from openai.types.beta.thread import Thread

class WorkflowRole:
  """A named persona that takes part in workflow conversations.

  Attributes:
    name: Short label of the role.
    description: Free-text explanation of what the role does.
  """

  def __init__(self, name: str, description: str):
    self.name, self.description = name, description

  def gpt_str(self) -> str:
    """Render the role in the bracketed notation used inside prompts."""
    template = "('role' name:[{}], description:[{}])"
    return template.format(self.name, self.description)

  def __str__(self) -> str:
    """Return a compact label containing only the role name."""
    return "WorkflowRole[{}]".format(self.name)

class WorkflowArtifact:
  """A named piece of work that conversations consume or produce.

  Attributes:
    name: Short label of the artifact.
    description: Free-text explanation of what the artifact contains.
  """

  def __init__(self, name: str, description: str):
    self.name, self.description = name, description

  def gpt_str(self) -> str:
    """Render the artifact in the bracketed notation used inside prompts."""
    template = "('artifact' name:[{}], description:[{}])"
    return template.format(self.name, self.description)

  def __str__(self) -> str:
    """Return a compact label containing only the artifact name."""
    return "WorkflowArtifact[{}]".format(self.name)

class WorkflowConversation:
  """A dialogue between a lead and an assistant role that turns one artifact into another.

  Attributes:
    name: Label of the conversation.
    description: Free-text explanation of what the conversation is about.
    lead: Role stored as the conversation's lead.
    assistant: Role stored as the conversation's assistant.
    input: Artifact stored as the conversation's input.
    output: Artifact stored as the conversation's output.
  """

  def __init__(self, name: str, description: str, lead: WorkflowRole, assistant: WorkflowRole, input: WorkflowArtifact, output: WorkflowArtifact):
    self.name, self.description = name, description
    self.lead, self.assistant = lead, assistant
    self.input, self.output = input, output

  def roles(self) -> Set[WorkflowRole]:
    """Return the participating roles (lead and assistant) as a set."""
    return set((self.lead, self.assistant))

  def gpt_str(self) -> str:
    """Render the conversation in the bracketed notation used inside prompts.

    Roles and artifacts are referenced by their names only.
    """
    labelled_values = (
      ("name", self.name),
      ("description", self.description),
      ("lead name", self.lead.name),
      ("assistant name", self.assistant.name),
      ("input name", self.input.name),
      ("output name", self.output.name),
    )
    rendered = ", ".join(f"{label}:[{value}]" for label, value in labelled_values)
    return f"('conversation' {rendered})"

  def __str__(self) -> str:
    """Return a compact label containing only the conversation name."""
    return "WorkflowConversation[{}]".format(self.name)

class WorkflowPhase:
  """An ordered group of conversations that together form one stage of the workflow.

  Attributes:
    name: Label of the phase.
    description: Free-text explanation of what happens in the phase.
    conversations: Conversations belonging to the phase, in the given order.
    current_conversation_index: Index into ``conversations`` of the active
      conversation; starts at 0.
  """

  def __init__(self, name: str, description: str, conversations: List[WorkflowConversation]):
    self.name = name
    self.description = description
    self.conversations = conversations
    self.current_conversation_index = 0

  def roles(self) -> Set[WorkflowRole]:
    """Return every role that takes part in any conversation of this phase."""
    return {role for conversation in self.conversations for role in conversation.roles()}

  def current_conversation(self) -> WorkflowConversation:
    """Return the conversation at ``current_conversation_index``.

    Raises:
      IndexError: If the index is outside ``conversations`` (e.g. the phase is empty).
    """
    return self.conversations[self.current_conversation_index]

  def gpt_str(self) -> str:
    """Render the phase in the bracketed notation used inside prompts.

    The entity tag is 'phase' so it matches the way the other domain classes
    tag themselves with their own kind ('role', 'artifact', 'conversation').
    Conversations are referenced by name only, comma-separated.
    """
    conversation_names = ','.join(conversation.name for conversation in self.conversations)
    return f"('phase' name:[{self.name}], description:[{self.description}], conversation names:[{conversation_names}])"

  def __str__(self) -> str:
    """Return a compact label containing only the phase name."""
    return f"WorkflowPhase[{self.name}]"
  
class AiThread:
  """Thin wrapper around one OpenAI Assistants thread.

  The remote thread is created lazily on first use, seeded with
  ``initial_message``. ``send`` posts a user message, starts a run with the
  configured model/assistant/instructions, polls until the run leaves its
  pending states and returns the text of the resulting message.

  Attributes:
    oai_client: Client used for all API calls.
    model: Model name passed to every run.
    assistant_id: Assistant ID passed to every run.
    instructions: Instructions passed to every run.
    initial_message: Content of the user message the thread is created with.
    current_thread: The remote thread, or None while none exists.
    last_message_id: Initialised to None; not updated by this class.
  """

  # Run states in which the run is not finished yet and must be polled again.
  PENDING_RUN_STATUSES = ('queued', 'in_progress', 'requires_action')
  # Run states after which no answer is to be expected.
  FAILED_RUN_STATUSES = ('failed', 'cancelled', 'cancelling', 'expired')
  # Pause between two status polls.
  POLL_INTERVAL_SECONDS = 5

  def __init__(self, oai_client: OpenAI, model: str, assistant_id: str, instructions: str, initial_message: str):
    self.oai_client = oai_client
    self.model = model
    self.assistant_id = assistant_id
    self.instructions = instructions
    self.initial_message = initial_message
    self.current_thread: Optional[Thread] = None
    self.last_message_id: Optional[str] = None

  def thread(self) -> Thread:
    """Return the remote thread, creating it with the initial message if needed."""
    if self.current_thread is None:
      print("DEBUG: Creating thread...")
      self.current_thread = self.oai_client.beta.threads.create(
        messages= [{
          "role": "user",
          "content": self.initial_message
        }]
      )
      print(f"DEBUG: Created thread with ID {self.current_thread.id}")

    return self.current_thread

  def send(self, content: str) -> str:
    """Post ``content`` as a user message, run the assistant and return its reply.

    Args:
      content: Text of the user message to append to the thread.

    Returns:
      The text value of the first content part of the message found after the run.

    Raises:
      RuntimeError: If the run ends in a failure state, or if no message is
        found after the run.
    """
    print(f"DEBUG: Sending message [{content}]")
    # Resolving the thread here (after the first log line) keeps the debug output
    # in the same order as before when the thread is created lazily.
    thread_id = self.thread().id
    print(f"DEBUG: Creating new message in thread {thread_id}...")
    message = self.oai_client.beta.threads.messages.create(
      thread_id= thread_id,
      role= "user",
      content= content,
    )

    print("DEBUG: Creating thread run...")
    run = self.oai_client.beta.threads.runs.create(
      thread_id= thread_id,
      model= self.model,
      assistant_id= self.assistant_id,
      instructions= self.instructions,
    )
    print(f"DEBUG: Created thread run {run.id}.")

    # Poll until the run leaves the pending states; only sleep when another
    # poll is actually needed.
    while run.status in self.PENDING_RUN_STATUSES:
      print(f"DEBUG: Run {run.id} status {run.status}, reloading...")
      run = self.oai_client.beta.threads.runs.retrieve(run_id= run.id, thread_id= thread_id)
      if run.status in self.PENDING_RUN_STATUSES:
        sleep(self.POLL_INTERVAL_SECONDS)

    print(f"DEBUG: Run finished with status {run.status}")

    # Report the real reason instead of the generic "no new messages" error below.
    if run.status in self.FAILED_RUN_STATUSES:
      last_error = getattr(run, 'last_error', None)
      raise RuntimeError(
        f"Run {run.id} in thread {thread_id} ended with status {run.status} (last error: {last_error})"
      )

    # NOTE(review): relies on the API's default listing order so that `before`
    # the user message yields the newer assistant reply - confirm against the
    # messages list documentation.
    new_messages = self.oai_client.beta.threads.messages.list(
      thread_id= thread_id,
      limit= 1,
      before= message.id,
    )

    if len(new_messages.data) == 0:
      raise RuntimeError(f"No new messages found in thread {thread_id}")

    reply = new_messages.data[0]
    reply_text = reply.content[0].text.value
    print(f"DEBUG: New message with ID {reply.id} found")
    print(f"DEBUG: Response: [{reply_text}]")
    return reply_text

  def destroy(self):
    """Delete the remote thread, if one was created.

    The local reference is cleared after a successful delete so that calling
    ``destroy`` twice is harmless and a later ``send`` creates a fresh thread
    instead of addressing the deleted one.
    """
    if self.current_thread is None:
      return

    thread_id = self.current_thread.id
    print(f"DEBUG: Removing thread with ID {thread_id}")
    self.oai_client.beta.threads.delete(thread_id= thread_id)
    self.current_thread = None
  
class WorkflowManager:
  """Drives a role-play workflow through an OpenAI assistant thread.

  Attributes:
    oai_client: Client handed to the ``AiThread`` used for the run.
    model: Model name handed to the ``AiThread``.
    assistant_id: Assistant ID handed to the ``AiThread``.
    description: General description of the workflow, embedded into the prompt.
    phases: Phases of the workflow, in order.
    current_phase_index: Index into ``phases`` of the active phase; starts at 0.
    last_message_id: Initialised to None; not updated by this class.
  """

  def __init__(self, oai_client: OpenAI, model: str, assistant_id: str, description: str, phases: List[WorkflowPhase]):
    self.oai_client = oai_client
    self.model = model
    self.assistant_id = assistant_id
    self.description = description
    self.phases = phases
    self.current_phase_index = 0
    self.last_message_id: Optional[str] = None

  def roles(self) -> Set[WorkflowRole]:
    """Return every role that takes part in any phase."""
    return {role for phase in self.phases for role in phase.roles()}

  def artifacts(self) -> List[WorkflowArtifact]:
    """Return every artifact used as input or output of any conversation.

    Each artifact appears once, in order of first appearance, so the generated
    prompt is stable between runs.
    """
    ordered: List[WorkflowArtifact] = []
    for phase in self.phases:
      for conversation in phase.conversations:
        for artifact in (conversation.input, conversation.output):
          if artifact not in ordered:
            ordered.append(artifact)
    return ordered

  def conversations(self) -> Set[WorkflowConversation]:
    """Return every conversation of every phase."""
    return {conversation for phase in self.phases for conversation in phase.conversations}

  def current_phase(self) -> WorkflowPhase:
    """Return the phase at ``current_phase_index``."""
    return self.phases[self.current_phase_index]

  def current_conversation(self) -> WorkflowConversation:
    """Return the active conversation of the active phase."""
    return self.current_phase().current_conversation()

  def execute(self, input: str, max_switches: int = 10):
    """Run the current conversation for the given customer instruction.

    Builds the run instructions and the initial message (roles, artifacts,
    conversations, phases, workflow description and ``input``), performs a
    sanity-check exchange, starts the current conversation and then sends
    "SWITCH" until a reply contains "END CONVERSATION" or ``max_switches``
    switches were sent. Replies are printed. Nothing in this method advances
    the phase or conversation index, so only the current conversation is run.

    Args:
      input: The instruction the workflow should handle.
      max_switches: Upper bound on the number of "SWITCH" messages sent.
    """
    eol = '\n'
    instructions = '''
      You are roleplaying multiple people in a workflow, e.g. a company working on things in a process.
      In every response you represent only a single person.
      The roleplay is structured in phases and each phase has a conversation.
      Each conversation always has a lead and an assistant role you are to impersonate.
      Each conversation has an input type and an output type you will try to create.
      Use all existing inputs and outputs created to properly create the respective output.

      In my initial message I will provide instructions that are to be handled by the workflow based on the input of the conversation.
      I provide you with descriptions of the conversations and the phases they belong to so that you know what to do.
      When I say "START CONVERSATION <Conversation Name>" you will impersonate the lead of the conversation.
      You will read the existing thread and everything it it carefully.
      You will find an input per description. You answer to the provided assistant in respect to the roles and inputs described.

      When I say "SWITCH" you will impersonate the assistant and answer to the lead.
      When I say "SWITCH" again you will impersonate the lead again and answer to the assistant.
      Each impersonation can explain things or ask questions.
      If you feel like you are done, your message will only contain the conversation's desired
      output and end with "END CONVERSATION".

      Always start your responses with the name of the role, e.g.:

      ```
      CEO:

      <the message>
      ```

      ```
      CPO:

      <the message>
      ```
    '''

    # The artifacts section lists the artifacts themselves (it used to repeat the roles).
    initial_message = f'''
      These are the roles you are to impersonate:
      {eol.join([role.gpt_str() for role in self.roles()])}

      These are the artifacts that are referenced as inputs and outputs in conversations:
      {eol.join([artifact.gpt_str() for artifact in self.artifacts()])}

      These are the conversations you are to hold. They are referenced in phases:
      {eol.join([conversation.gpt_str() for conversation in self.conversations()])}

      These are the phases you are going through:
      {eol.join([phase.gpt_str() for phase in self.phases])}

      Here is a general description of the workflow for context:
      {self.description}

      This is the instruction you will handle with the workflow:
      ({input})
    '''

    thread = AiThread(
      oai_client= self.oai_client,
      model= self.model,
      assistant_id= self.assistant_id,
      instructions= instructions,
      initial_message= initial_message,
    )

    # The remote thread is removed even when one of the exchanges raises.
    try:
      # Sanity check
      print("DEBUG: Sending sanity check")
      response = thread.send('''
      Answer with only "SUCCESS" if you understood everything and with only "FAILURE" if you didn't."
    ''')
      print(f"DEBUG: Sanity: {response}")

      last_response = thread.send(f'''
      START CONVERSATION {self.current_conversation().name}
    ''')
      print(last_response)

      message_count = 0
      while message_count < max_switches and "END CONVERSATION" not in last_response:
        last_response = thread.send("SWITCH")
        print(last_response)
        message_count += 1
    finally:
      thread.destroy()

  def __str__(self) -> str:
    """Return a compact label listing the phase names (the manager has no name)."""
    return f"WorkflowManager[{', '.join(phase.name for phase in self.phases)}]"

### Put together final workflow

#### Roles

In [195]:
# Role definitions. The description text is embedded verbatim (including its
# surrounding whitespace) by WorkflowRole.gpt_str(), so edits here change the prompt.
ceo = WorkflowRole(
  name= 'CEO',
  description= '''
    You are the CEO of the company.
    You take the initial task from the customer and understand business value, rough time estimation, needed resources etc.
  ''',
)

cpo = WorkflowRole(
  name= 'CPO',
  description= '''
    You are the CPO of the company.
    You understand the project of the customer from a business perspective
    and define requirements that the CTO can translate to technical tasks.
  ''',
)

cto = WorkflowRole(
  name= 'CTO',
  description= '''
    You are the CTO of the company.
    You understand the project from a technical perspective and represent the bridge between
    the technical team and the business roles CEO, CPO and Customer.
  ''',
)

programmer = WorkflowRole(
  name= 'Programmer',
  description= '''
    You are an intermediate programmer.
    You can program really well, but not perfect. You don't understand all best practices yet,
    but the code you produce works and is solid.
  ''',
)

designer = WorkflowRole(
  name= 'Designer',
  description= '''
    You are a software designer.
    You understand software architecture well, understand UX and a lot of business properties
    of software as well as common best practices.
  ''',
)

reviewer = WorkflowRole(
  name= 'Reviewer',
  description= '''
    You are a code reviewer.
    You understand code from the inside out and can spot problems with an implementation.
    You write review tasks that the programmer can then use to improve the code.
  ''',
)

tester = WorkflowRole(
  name= 'Tester',
  description= '''
    You are a software tester.
    You test code, uncover and problems that the programmer can resolve.
    You write test concepts that the programmer can implement.
  ''',
)

#### Artifacts

In [196]:
# Artifact definitions. Conversations reference these objects as their input and
# output; WorkflowConversation.gpt_str() refers to them by name.
task = WorkflowArtifact(
  name= 'Task',
  description= '''
    The business task as defined by the customer. It should be realised as a software product or feature.
  ''',
)

modalities = WorkflowArtifact(
  name= 'Modalities',
  description= '''
    The modalities of the task as defined by CEO and CPO. It defines the time for development, the budget,
    human resource costs and similar constraints.
  ''',
)

language = WorkflowArtifact(
  name= 'Language',
  description= '''
    The use cases of the task defined in human language which can be translated between
    business and technical department easily.
  '''
)

code = WorkflowArtifact(
  name= 'Code',
  description= '''
    The code as written by the programmer. It is yet to be designed, reviewed and tested.
  ''',
)

designed_code = WorkflowArtifact(
  name= 'Designed Code',
  description= '''
    The code as enhanced by the designer. It takes into account architecturial decisions
    and things like UX, best practices etc. It is yet to be reviewed and tested.
  ''',
)

reviewed_code = WorkflowArtifact(
  name= 'Reviewed Code',
  description= '''
    The reviewed code containing a list of review items and applied corrections. It is yet to be tested.
  ''',
)

tested_code = WorkflowArtifact(
  name= 'Tested Code',
  description= '''
    The tested code containing the finished code and proper tests. It is ready to be deployed.
  ''',
)

spec = WorkflowArtifact(
  name= 'Specification',
  description= '''
    The technical specification of the implemented code. It declares technical aspects
    for future programmers and can be translated to a manual.
  ''',
)

manual = WorkflowArtifact(
  name= 'Manual',
  description= '''
    A manual written for the customer so that they know how to properly use the changes made
    and can forward integrations to other departments or companies.
  ''',
)

#### Conversations

In [197]:
# Conversation definitions. Each one pairs a lead role with an assistant role and
# names the artifact it starts from and the artifact it should produce. The
# output artifact of one conversation is the input of the next one defined below.
design_modalities = WorkflowConversation(
  name= 'Design (Modalities)',
  description= '''
    The task given by the customer is analyzed and the respective modalities are created.
  ''',
  lead= ceo,
  assistant= cpo,
  input= task,
  output= modalities,
)

design_language = WorkflowConversation(
  name= 'Design (Language)',
  description= '''
    From the task and modalities specific use-cases are created that help the technical
    team to form working code from them.
  ''',
  lead= ceo,
  assistant= cto,
  input= modalities,
  output= language,
)

coding_code = WorkflowConversation(
  name= 'Coding (Code)',
  description= '''
    The initial implementation of the given use-cases realized as code.
    The code is not perfect yet, but it works.
    The code is always written in Python.
  ''',
  lead= cto,
  assistant= programmer,
  input= language,
  output= code,
)

coding_design = WorkflowConversation(
  name= 'Coding (Design)',
  description= '''
    The given code is analyzed for common architecturial problems and
    improved upon. Best practices are discussed and implemented by the programmer.
  ''',
  lead= programmer,
  assistant= designer,
  input= code,
  output= designed_code,
)

testing_review = WorkflowConversation(
  name= 'Testing (Review)',
  description= '''
    The completed code is reviewed for common code problems, errors
    and best practice problems. The programmer resolves problems.
  ''',
  lead= programmer,
  assistant= reviewer,
  input= designed_code,
  output= reviewed_code,
)

testing_test = WorkflowConversation(
  name= 'Testing (Test)',
  description= '''
    The given code is tested if it contains all business cases. Tests are written
    by the programmer by the test concept given by the tester.
  ''',
  lead= programmer,
  assistant= tester,
  input= reviewed_code,
  output= tested_code,
)

documenting_spec = WorkflowConversation(
  name= 'Documenting (Specification)',
  description= '''
    A technical specification is created that covers all aspects of the code
    so that following programmers and technical people can fully understand
    what it is doing.
  ''',
  lead= cto,
  assistant= programmer,
  input= tested_code,
  output= spec,
)

documenting_manual = WorkflowConversation(
  name= 'Documenting (Manual)',
  description= '''
    A business documentation is created for the customer. It explains
    what was implemented on a business level, which use-cases are covered
    and how integration into other systems can be done.
  ''',
  lead= ceo,
  assistant= cpo,
  input= spec,
  output= manual,
)

#### Phases

In [198]:
# Phase definitions. Each phase groups two of the conversations defined above,
# in the order they are listed.
designing = WorkflowPhase(
  name= 'Designing',
  description= '''
    The task is provided by the customer and business modalities
    and use-cases are created for the technical team to implement them properly.
  ''',
  conversations= [design_modalities, design_language],
)

coding = WorkflowPhase(
  name= 'Coding',
  description= '''
    The coding phase. Here the actual code implementation is created.
  ''',
  conversations= [coding_code, coding_design],
)

testing = WorkflowPhase(
  name= 'Testing',
  description= '''
    The testing phase. The implementation is reviewed and tested properly
    so that it checks all quality marks.
  ''',
  conversations= [testing_review, testing_test],
)

documenting = WorkflowPhase(
  name= 'Documenting',
  description= '''
    The implementation is documented for technical and business people
    so that it can be understood for future extensions, business usage and business integrations.
  ''',
  conversations= [documenting_spec, documenting_manual],
)

#### Manager (Putting it all together)

In [199]:
# Assemble the manager from the client/config created at the top of the notebook
# and the four phases, in the order they are listed.
manager = WorkflowManager(
  oai_client= oai_client,
  model= oai_model,
  assistant_id= oai_assistant_id,
  description= '''
    You will roleplay roles of a software company with a typical waterflow-based development workflow.
  ''',
  phases = [designing, coding, testing, documenting],
)

# Runs the workflow against the OpenAI API: execute() creates a thread, sends
# messages and polls runs, so this cell needs valid credentials and takes a while.
manager.execute("I want a to create users in my application!")

DEBUG: Sending sanity check
DEBUG: Sending message [
      Answer with only "SUCCESS" if you understood everything and with only "FAILURE" if you didn't."
    ]
DEBUG: Creating thread...


DEBUG: Created thread with ID thread_F9Og5fkh8zsjkjN9E84yWDmB
DEBUG: Creating new message in thread thread_F9Og5fkh8zsjkjN9E84yWDmB...
DEBUG: Creating thread run...
DEBUG: Created thread run run_KcheNaIpuqNmZqgCjTmrvzpO.
DEBUG: Run run_KcheNaIpuqNmZqgCjTmrvzpO status queued, reloading...
DEBUG: Run run_KcheNaIpuqNmZqgCjTmrvzpO status in_progress, reloading...
DEBUG: Run finished with status completed
DEBUG: New message with ID msg_waHPYou7FKu67YjlYSF8B5pc found
DEBUG: Response: [SUCCESS]
DEBUG: Sanity: SUCCESS
DEBUG: Sending message [
      START CONVERSATION Design (Modalities)
    ]
DEBUG: Creating new message in thread thread_F9Og5fkh8zsjkjN9E84yWDmB...
DEBUG: Creating thread run...
DEBUG: Created thread run run_ssEAkgu0r55jXjnltC1eryt6.
DEBUG: Run run_ssEAkgu0r55jXjnltC1eryt6 status queued, reloading...
DEBUG: Run run_ssEAkgu0r55jXjnltC1eryt6 status queued, reloading...
DEBUG: Run finished with status completed
DEBUG: New message with ID msg_I0QUlvmULHasvbusgADC0el3 found
DEBUG: Re