In [17]:
from typing import List, Optional, Generator, Tuple
from pathlib import Path
import os
import json
from black import format_str, Mode
from autogen import UserProxyAgent
from autogen.agentchat.conversable_agent import ConversableAgent
from autogen import AssistantAgent, UserProxyAgent, GroupChat, GroupChatManager
import autogen
from dotenv import load_dotenv
from Utilities.envVars import *

In [18]:
# Configure environment variables: read the local .env file so that the
# os.getenv(...) lookups in the configuration cell can resolve their values.
load_dotenv(dotenv_path='./.env')

True

In [19]:
# --- Folder layout ---------------------------------------------------------
# Everything lives below projectRoot; Cobol sources are searched recursively
# under sourceCodeDir, generated artefacts go to conversionDir.
projectRoot = "./Data/"
sourceCodeDir = os.path.join(projectRoot, "Cobol")
conversionDir = os.path.join(sourceCodeDir, "Conversion")
codeExecutionDir = os.path.join(sourceCodeDir, "CodeExecution")

# --- LLM connection --------------------------------------------------------
# Azure OpenAI settings, all taken from environment variables.
# Alternative kept for reference:
#   llmConfig = {"model": "gpt-4", "api_key": os.getenv("OpenAiApiKey")}
llmConfig = dict(
    model=os.getenv("OpenAiChat16k"),
    api_key=os.getenv("OpenAiKey"),
    azure_endpoint=os.getenv("OpenAiEndPoint"),
    api_type="azure",
    api_version=os.getenv("OpenAiVersion"),
)

# --- Run parameters --------------------------------------------------------
# maxTries caps the number of rounds of the conversion group chat.
# NOTE(review): assistantId, temperature, seed and processTimeout are not
# referenced in the visible cells - confirm whether they are still needed.
assistantId = "CobolAssistant"
temperature = 0
seed = 44
maxTries = 5
processTimeout = 20

In [26]:
def listCobolFiles(
    exclusion_file_names: Optional[List[str]] = None,
) -> Generator[Path, None, None]:
    """
    Generator function to yield COBOL files below ``sourceCodeDir``, excluding specified files.

    The directory is searched recursively for ``*.cbl`` files. Files are yielded in
    sorted path order so that repeated runs process them in the same sequence.

    :param exclusion_file_names: File stems (names without extension) to exclude
        from the result. ``None`` (the default) excludes nothing.
    :type exclusion_file_names: Optional[List[str]]

    :return: A generator yielding Path objects for COBOL files.
    :rtype: Generator[Path, None, None]
    """
    # A None default avoids sharing one mutable list object between calls.
    excluded = exclusion_file_names if exclusion_file_names is not None else []
    # rglob order depends on the file system; sorting makes the run reproducible.
    for file in sorted(Path(sourceCodeDir).rglob('*.cbl')):
        if file.stem not in excluded:
            yield file

def getCobol():
    """
    Retrieve cobol code to be converted.

    :return: The text of every file yielded by ``listCobolFiles``, joined with
        one blank line between files.
    :rtype: str
    """
    sources = []
    for cobol_path in listCobolFiles():
        sources.append(cobol_path.read_text())
    return "\n\n".join(sources)

def readAssetFile(text_filename):
    """
    Read a UTF-8 text file without raising on failure.

    :param text_filename: Path of the file to read.

    :return: ``(text, True)`` on success, ``("", False)`` if anything went wrong
        (the error is printed).
    :rtype: tuple
    """
    try:
        with open(text_filename, 'r', encoding='utf-8') as handle:
            contents = handle.read()
    except Exception as e:
        print(f"Error reading text file: {e}")
        return "", False

    return contents, True

def writeToFile(text, text_filename, mode='w'):
    """
    Write ``text`` to ``text_filename`` using UTF-8.

    :param text: The string to write.
    :param text_filename: Destination path.
    :param mode: Mode passed to ``open`` (``'w'`` overwrites, ``'a'`` appends).
    """
    with open(text_filename, mode=mode, encoding='utf-8') as sink:
        sink.write(text)

In [21]:
# Fail fast when the source folder holds no Cobol files, then load all of them.
counter = 0
for counter, _ in enumerate(listCobolFiles(), start=1):
    pass
assert counter > 0, "No files found"
cobol = getCobol()

In [30]:
# Prompt texts for the agents defined further down. They are passed to the
# model verbatim, so wording issues are only flagged here, not changed.

# System prompt of the "Cobol_Business_Requirement" agent.
# NOTE(review): "You will be give" reads like a typo for "given".
cobolBrSysMessage = """You are a helpful AI assistant who is expert in performing business analysis
You will be give Cobol code and you will be asked to provide a business analysis of the code.  
As a part of the analysis, you will be asked to provide a list of business processes, requirements and the rules that the code implements.
"""
# System prompt of the "CSharp_Coder" agent.
cSharpCoderSysMessage = """You are a helpful AI assistant.
You convert Cobol code into C# .NET code. Please do not provide unit tests. Provide instead a main method to run the application. 
Also do not omit any code for brevity. We want to see the whole code."""
# System prompt of the "Unit_Tester" agent.
# NOTE(review): "unittest" is the name of Python's test library - confirm which
# C# test framework is actually intended here.
cSharpUnitTesterSysMessage = """You are a helpful AI assistant.
You create unit tests based on the unittest library for C# .NET code in the conversation. 
Please copy the original C# .NET code that you are testing to your response. 
Please make sure to import the unittest library. Provide a main method to run the tests."""
# System prompt of the "Code_Critic" agent. It is used as-is (never passed
# through str.format), so the braces in the score template are literal text.
# NOTE(review): the score template names six keys while only three dimensions
# are described, and the "encoding" dimension asks about unit tests - confirm.
codeCriticsSysMessage = """Critic. You are a helpful assistant highly skilled in evaluating the quality of a given code by providing a score from 1 (bad) - 10 (good) while providing clear rationale. YOU MUST CONSIDER CODING BEST PRACTICES for each evaluation. Specifically, you can carefully evaluate the code across the following dimensions
- bugs (bugs):  are there bugs, logic errors, syntax error or typos? Are there any reasons why the code may fail to compile? How should it be fixed? If ANY bug exists, the bug score MUST be less than 5.
- Goal compliance (compliance): how well the Cobol code was converted?
- Data encoding (encoding): How good are the unit tests that you can find?

YOU MUST PROVIDE A SCORE for each of the above dimensions.
{bugs: 0, transformation: 0, compliance: 0, type: 0, encoding: 0, aesthetics: 0}
Do not suggest code. 
Finally, based on the critique above, suggest a concrete list of actions that the coder should take to improve the code.
If Unit tests are available already and seem OK, reply with TERMINATE"""
# System prompt of the "Code_REST_Generator" agent.
# NOTE(review): FastAPI is a Python library although the input is C# code;
# "canvert" and "wrtten" look like typos - confirm the intended wording.
codeRestSysMessage = """You are an expert in converting C# .NET code that would normally run as a command line application to a REST interface.
You use for this task the FastAPI library for this task. When you see code you analyse it and you canvert the command line application to a REST interface which will produce JSON based answers.
If there is no code in the message, then you must generate also no code.
If you REST interfaces wrtten with FastAPI are available already you reply with TERMINATE
"""
# System prompt of the "user_proxy" agent.
userProxySysMessage = """A human admin which asks to translate Cobol into C# .NET."""
# Opening message of the conversion chat; {cobol_code} is filled in with
# str.format by convertSingleFile.
# NOTE(review): reST / PEP 287 is a Python docstring convention - confirm the
# documentation format wanted for C#.
userProxyMessage = """Please convert the Cobol code to C# .NET code and write unit tests for it. 
After creating the unit tests, please document all code using the C# reST (PEP 287 – reStructuredText Docstring Format) format.
The Cobol code can be found between the delimiters === CODE START === and === CODE END ===. 
Make sure that the code is formatted using markdown syntax.

=== CODE START ===
{cobol_code}
=== CODE END ===

"""
# System prompt of the "rest_user_proxy" agent.
restUserProxySysMessage = """A human admin which asks to translates C# .NET command line apps into REST applications."""
# Opening message of the REST chat; {CSharp_code} is filled in with str.format
# by initiateRestChat.
restUserProxyMessage = """Please convert the following C# .NET code between the delimiters to a REST application using the FastAPI library. The start delimiter is === CODE START === and the end delimiter is === CODE END ===
=== CODE START ===
{CSharp_code}
=== CODE END ===
"""

In [23]:
def terminateLambda(x: dict) -> bool:
    """
    Decide whether a chat message asks to end the conversation.

    :param x: A chat message dictionary; only its ``"content"`` entry is inspected.
    :type x: dict

    :return: ``True`` if the content is a string containing ``TERMINATE``,
        otherwise ``False``.
    :rtype: bool
    """
    content = x.get("content")
    # The key can be present with a None (or otherwise non-text) value, in which
    # case the dict default does not apply; treat that as "do not terminate".
    if not isinstance(content, str):
        return False
    return "TERMINATE" in content

def userProxyFactory() -> UserProxyAgent:
    """
    Build the ``user_proxy`` agent for the Cobol conversion chat.

    The agent never asks for human input, replies automatically at most 10 times
    in a row, stops on ``terminateLambda`` and is configured to run code in
    ``codeExecutionDir`` without Docker.

    :return: The configured user proxy agent.
    :rtype: UserProxyAgent
    """
    execution_settings = {"work_dir": codeExecutionDir, "use_docker": False}
    return UserProxyAgent(
        name="user_proxy",
        system_message=userProxySysMessage,
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        is_termination_msg=terminateLambda,
        code_execution_config=execution_settings,
    )

# Module-level user proxy shared by the conversion group chat and convertSingleFile.
userProxy = userProxyFactory()

def createGroupChatManager(
    user_proxy: UserProxyAgent,
    agents: List[AssistantAgent],
) -> GroupChatManager:
    """
    Create a group chat manager for the user proxy plus the given agents.

    :param user_proxy: Agent placed first in the group chat.
    :type user_proxy: UserProxyAgent
    :param agents: Assistant agents that join the chat after the user proxy.
    :type agents: List[AssistantAgent]

    :return: Manager of a fresh group chat limited to ``maxTries`` rounds.
    :rtype: GroupChatManager
    """
    participants = [user_proxy]
    participants.extend(agents)
    chat = GroupChat(agents=participants, messages=[], max_round=maxTries)
    return GroupChatManager(groupchat=chat, llm_config=llmConfig)

def cSharpTestAgentFactory() -> AssistantAgent:
    """
    Build the ``Unit_Tester`` assistant driven by ``cSharpUnitTesterSysMessage``.

    :return: The configured assistant agent.
    :rtype: AssistantAgent
    """
    return AssistantAgent(
        name="Unit_Tester",
        llm_config=llmConfig,
        system_message=cSharpUnitTesterSysMessage,
    )

def cSharpCodeCriticFactory() -> AssistantAgent:
    """
    Build the ``Code_Critic`` assistant driven by ``codeCriticsSysMessage``.

    The agent uses ``terminateLambda`` as its termination check.

    :return: The configured assistant agent.
    :rtype: AssistantAgent
    """
    return AssistantAgent(
        name="Code_Critic",
        llm_config=llmConfig,
        system_message=codeCriticsSysMessage,
        is_termination_msg=terminateLambda,
    )

def cobolConvertAgentFactory() -> AssistantAgent:
    """
    Build the ``CSharp_Coder`` assistant driven by ``cSharpCoderSysMessage``.

    :return: The configured assistant agent.
    :rtype: AssistantAgent
    """
    return AssistantAgent(
        name="CSharp_Coder",
        llm_config=llmConfig,
        system_message=cSharpCoderSysMessage,
    )

def cobolBusinessRequirementAgentFactory() -> AssistantAgent:
    """
    Build the ``Cobol_Business_Requirement`` assistant driven by ``cobolBrSysMessage``.

    :return: The configured assistant agent.
    :rtype: AssistantAgent
    """
    return AssistantAgent(
        name="Cobol_Business_Requirement",
        llm_config=llmConfig,
        system_message=cobolBrSysMessage,
    )

def userProxyRestFactory() -> UserProxyAgent:
    """
    Build the ``rest_user_proxy`` agent for the REST conversion chat.

    Same settings as the conversion user proxy, except that code execution is
    switched off.

    :return: The configured user proxy agent.
    :rtype: UserProxyAgent
    """
    return UserProxyAgent(
        name="rest_user_proxy",
        system_message=restUserProxySysMessage,
        human_input_mode="NEVER",
        max_consecutive_auto_reply=10,
        is_termination_msg=terminateLambda,
        # No execution of code, because this was stalling the code generation in Windows
        code_execution_config=False,
    )

def cSharpRestGenerator() -> AssistantAgent:
    """
    Build the ``Code_REST_Generator`` assistant driven by ``codeRestSysMessage``.

    The agent uses ``terminateLambda`` as its termination check.

    :return: The configured assistant agent.
    :rtype: AssistantAgent
    """
    rest_generator = AssistantAgent(
        name="Code_REST_Generator",
        llm_config=llmConfig,
        system_message=codeRestSysMessage,
        is_termination_msg=terminateLambda,
    )
    return rest_generator

def createRestGroupChatManager(user_proxy: UserProxyAgent) -> GroupChatManager:
    """
    Create the group chat manager for the REST conversion.

    The chat consists of ``user_proxy``, a new REST generator and a new code
    critic, and is limited to 10 rounds.

    :param user_proxy: Agent placed first in the group chat.
    :type user_proxy: UserProxyAgent

    :return: Manager of the newly created group chat.
    :rtype: GroupChatManager
    """
    participants = [user_proxy, cSharpRestGenerator(), cSharpCodeCriticFactory()]
    rest_chat = GroupChat(agents=participants, messages=[], max_round=10)
    return GroupChatManager(groupchat=rest_chat, llm_config=llmConfig)

def createUserProxyChatManager() -> Tuple[UserProxyAgent, GroupChatManager]:
    """
    Create a fresh REST user proxy together with its group chat manager.

    :return: ``(rest_user_proxy, rest_group_chat_manager)``.
    :rtype: Tuple[UserProxyAgent, GroupChatManager]
    """
    proxy = userProxyRestFactory()
    return proxy, createRestGroupChatManager(proxy)

def initiateRestChat(
    user_proxy: UserProxyAgent, group_chat_manager: GroupChatManager, python_code: str
):
    """
    Start the REST conversion chat for the given source code.

    :param user_proxy: Agent that opens the chat.
    :type user_proxy: UserProxyAgent
    :param group_chat_manager: Manager that receives the opening message.
    :type group_chat_manager: GroupChatManager
    :param python_code: Source code inserted into ``restUserProxyMessage``
        (despite the parameter name, the template labels it as C# code).
    :type python_code: str
    """
    user_proxy.initiate_chat(
        group_chat_manager,
        message=restUserProxyMessage.format(CSharp_code=python_code),
    )

# Group chat for the Cobol -> C# conversion: the shared user proxy plus the
# business-requirement, coder, unit-test and critic agents.
conversionManager = createGroupChatManager(
    user_proxy=userProxy,
    agents=[
        cobolBusinessRequirementAgentFactory(),
        cobolConvertAgentFactory(),
        cSharpTestAgentFactory(),
        cSharpCodeCriticFactory(),
    ],
)

In [31]:
def extractCode(text: str, language: str = "csharp") -> List[str]:
    """
    Extract the fenced markdown code blocks of one language from ``text``.

    A block starts at a triple-backtick fence immediately followed by
    ``language`` and ends at the next fence that is not such an opening fence.
    Fences that belong to blocks of other languages are skipped; they no longer
    add empty entries to the result. A block that is never closed is dropped.

    :param text: Markdown text, e.g. the content of a chat message.
    :type text: str
    :param language: Language tag that must follow the opening fence.
    :type language: str

    :return: The stripped contents of all matching blocks, in order of appearance.
    :rtype: List[str]
    """
    fence = "```"
    opening = fence + language
    blocks: List[str] = []
    code_start = -1  # -1 means: not inside a block of the requested language
    pos = text.find(fence)
    while pos != -1:
        if text.startswith(opening, pos):
            # (Re)start a block right after the language tag.
            code_start = pos + len(opening)
            pos = text.find(fence, code_start)
        else:
            # Only a fence that closes an open block of the requested language
            # produces output; any other fence is ignored.
            if code_start != -1:
                blocks.append(text[code_start:pos].strip())
                code_start = -1
            pos = text.find(fence, pos + len(fence))
    return blocks

def formatFile(file: Path) -> str:
    """
    Reformat ``file`` in place with black's ``format_str`` and return the result.

    The file is read and written as UTF-8 so that the round trip cannot change
    the encoding of non-ASCII characters.

    NOTE(review): black formats Python source; the only visible call of this
    function (in processCSharpFile) is commented out - confirm it is still needed.

    :param file: File to reformat.
    :type file: Path

    :return: The formatted text that was written back.
    :rtype: str
    """
    assert file is not None
    text = file.read_text(encoding="utf-8")
    formatted = format_str(text, mode=Mode())
    file.write_text(formatted, encoding="utf-8")
    return formatted

def loopChatMessage(user_proxy: UserProxyAgent, manager: ConversableAgent):
    """
    Iterate over the messages ``user_proxy`` recorded for its chat with ``manager``.

    Messages without a ``"name"`` entry are skipped.

    :param user_proxy: Agent whose ``chat_messages`` mapping is read.
    :type user_proxy: UserProxyAgent
    :param manager: Key into ``user_proxy.chat_messages``.
    :type manager: ConversableAgent

    :return: A generator of ``(message, agent_name)`` pairs.
    """
    history = user_proxy.chat_messages[manager]
    yield from ((entry, entry["name"]) for entry in history if "name" in entry)

def processCSharpFile(file: Path, content: str):
    """
    Write generated C# code to ``file`` unless the code is blank.

    Errors are printed instead of raised so that one failing file does not stop
    the caller. Formatting and linting of the written file are currently not done.

    :param file: Destination path of the C# file.
    :type file: Path
    :param content: The C# source to write.
    :type content: str
    """
    try:
        print(f"Processing CSharp file: {file}")
        print(f"Content: {content}")
        if not content.strip():
            return
        writeToFile(content, file)
    except Exception as e:
        print(f"Failed to process CSharp file {file}: {e}")

def processMessage(
    message: dict, prefix: str, cobol_file: Path, suffix: str = "cs"
) -> Optional[Path]:
    """
    Persist the useful part of one chat message below ``conversionDir``.

    The target file is named ``<prefix><stem of cobol_file>.<suffix>``.

    - ``suffix == "cs"``: the first extracted code block (if any) is written via
      ``processCSharpFile``.
    - ``suffix == "txt"``: the whole message content is written, regardless of
      whether it contains code blocks.

    :param message: Chat message dictionary; its ``"content"`` entry is used.
    :type message: dict
    :param prefix: Prefix for the output file name.
    :type prefix: str
    :param cobol_file: File whose stem names the output (a Path or a path string).
    :type cobol_file: Path
    :param suffix: Output file extension without the dot.
    :type suffix: str

    :return: Path of the target file (returned even when nothing was written),
        or ``None`` if the message has no content.
    :rtype: Optional[Path]
    """
    content = message.get("content")
    if content is None:
        # Missing key or explicit None: there is nothing to extract or store.
        return None

    code_blocks = extractCode(content)
    print(f"Code blocks: {code_blocks}")
    code_blocks_len = len(code_blocks)
    print(f"Code blocks length: {code_blocks_len}")
    print(f"Conversion folder: {conversionDir}")
    # Return a real Path (as annotated) so callers can use .stem on the result.
    convertedFile = Path(conversionDir) / f"{prefix}{Path(cobol_file).stem}.{suffix}"
    print(f"Converted file: {convertedFile}")
    if suffix == "cs":
        if code_blocks_len > 0:
            print("CSharp file")
            convertedFile.parent.mkdir(parents=True, exist_ok=True)
            processCSharpFile(convertedFile, code_blocks[0])
    elif suffix == "txt":
        print("Text file")
        convertedFile.parent.mkdir(parents=True, exist_ok=True)
        writeToFile(content, convertedFile)
    return convertedFile

def processRestConversion(file: Path):
    cSharp_code, status = readAssetFile(file)

    rest_user_proxy, rest_group_chat_manager = createUserProxyChatManager()

    initiateRestChat(rest_user_proxy, rest_group_chat_manager, cSharp_code)

    for message, agent_name in loopChatMessage(
        rest_user_proxy, rest_group_chat_manager
    ):
        match agent_name:
            case "Code_REST_Generator":
                content = message["content"]
                code_blocks = extractCode(content)
                if len(code_blocks) > 0:
                    rest_interface_file =  os.path.join(conversionDir, f"rest_{file.stem}.cs")
                    all_code = "\n\n".join(code_blocks)
                    processCSharpFile(rest_interface_file, all_code)
            case "Code_Critic":
                processMessage(message, "rest_critique_", file, "txt")

def convertSingleFile(cobol_file: Path):
    """
    Convert a single COBOL file to CSharp code and perform additional processing.

    :param cobol_file: The Path object representing the input COBOL file.
    :type cobol_file: Path

    This function reads the content of the COBOL file, initiates a chat with the user proxy
    and processes messages received during the chat.
    The messages are handled by different agents based on their types.

    - For messages from the CSharp coder agent, the COBOL code is converted to a CSharp file,
      and additional processing is performed using the `process_rest_conversion` function.

    - For messages from the unit tester agent, a test file is generated and executed using
      the `run_subprocess` function.

    - For messages from the REST interface generator agent, the corresponding processing is done.

    - For messages from the code critic agent, a critique file is generated.

    :raises: Exceptions may be raised during the conversion and processing steps.

    This function relies on the following global variables:
    - `prompts`: A dictionary containing messages for different agents.
    - `user_proxy`: An object representing the user proxy for chat-based interaction.
    - `conversion_manager`: An object managing the conversion process.
    """
    try:
        print(f"Converting file: {cobol_file}")
        cobolCode = cobol_file.read_text()
        userProxyAppendedMessage = userProxyMessage.format(
            cobol_code=cobolCode
        )
        userProxy.initiate_chat(
            conversionManager,
            message=userProxyAppendedMessage,
        )
    except Exception as e:
        print(f"Failed to convert file {cobol_file}: {e}")

    try:
        for message, agent_name in loopChatMessage(userProxy, conversionManager):
            match agent_name:
                case "CSharp_Coder":
                    print("CSharp coder")
                    cSharp_file = processMessage(message, "", cobol_file)
                    processRestConversion(cSharp_file)
                case "Unit_Tester":
                    test_file = processMessage(message, "test_", cobol_file)
                    # if test_file is not None:
                    #     run_subprocess(test_file)
                case "Code_REST_Generator":
                    processMessage(message, "rest_", cobol_file)
                case "Code_Critic":
                    processMessage(message, "critique_", cobol_file, "txt")
    except Exception as e:
        print(f"Failed to convert file {cobol_file}: {e}")

def cobolConversion(cobol_files: List[Path]):
    """
    Convert every given COBOL file, continuing with the next one after a failure.

    :param cobol_files: The COBOL files to convert; any iterable of paths works,
        including the generator returned by ``listCobolFiles``.
    :type cobol_files: List[Path]
    """
    assert cobol_files is not None
    for cobol_file in cobol_files:
        try:
            convertSingleFile(cobol_file)
        except Exception as e:
            # "except Exception" keeps Ctrl-C / kernel interrupts working, which a
            # bare "except:" would swallow, and the cause is reported as well.
            print(f"Failed to convert file {cobol_file}: {e}")

In [32]:
# Run the conversion for every Cobol file found below sourceCodeDir.
cobolConversion(listCobolFiles())

Converting file: Data\Cobol\ADDAMT.cbl
[33muser_proxy[0m (to chat_manager):

Please convert the Cobol code to C# .NET code and write unit tests for it. 
After creating the unit tests, please document all code using the C# reST (PEP 287 – reStructuredText Docstring Format) format.
The Cobol code can be found between the delimiters === CODE START === and === CODE END ===. 
Make sure that the code is formatted using markdown syntax.

=== CODE START ===
      *-----------------------
      * Copyright Contributors to the COBOL Programming Course
      * SPDX-License-Identifier: CC-BY-4.0
      *----------------------- 
       IDENTIFICATION DIVISION.
       PROGRAM-ID.
           ADDAMT.
      *******************************************************
      *    This program accepts input and displays output    *
      *******************************************************
       DATA DIVISION.
       WORKING-STORAGE SECTION.
       01  KEYED-INPUT.
           05  CUST-NO-IN               