# Step 1: Read all the Directories

In [31]:
#use the python_files, file_read, util_multi

# Step 2: Setup LLM's and install requirements

In [32]:
import getpass
import os


def _set_if_undefined(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}")

_set_if_undefined("LANGCHAIN_API_KEY")
_set_if_undefined("TAVILY_API_KEY")
_set_if_undefined("OPENAI_API_KEY")



In [33]:
available_models = ["mixtral-8x7b-instruct-v01", "llamaguard-7b", "gemma-7b-it", "mistral-7b-instruct-v02", "phi-2",
                    "llama-2-70b-chat", "phi-3-mini-128k-instruct", "llama-3-8b-instruct"]

# Let's select the model from the available list
model_selected = available_models[0]

In [34]:
from openai import OpenAI
import httpx
import os
from dotenv import load_dotenv    
load_dotenv()
# As mentioned in the above Note try to setup Dell certificate in your environment to avoid verify=False(SSL verification is disabled).
http_client=httpx.Client(verify=False)
client = OpenAI(
    base_url=os.environ["API_URL"],
    http_client=http_client,
    api_key=os.environ["API_KEY"],
)

## If you are using custom llm

In [35]:
from typing import Any, Dict, Iterator, List, Optional

from pydantic import Field
from langchain_core.callbacks.manager import CallbackManagerForLLMRun
from langchain_core.language_models.llms import LLM
from langchain_core.outputs import GenerationChunk
import httpx
from openai import OpenAI

class CustomOpenAILLM(LLM):
    """A custom LLM using OpenAI API for text generation.

    This class integrates with OpenAI's API to provide text generation capabilities.
    The number of characters from the prompt that the model echoes can be controlled
    through the initialization parameters.

    Example:

        .. code-block:: python

            model = CustomOpenAILLM(api_url="https://api.openai.com/v1", api_key="your_api_key", model="text-davinci-003")
            result = model._call("Hello, how are you?")
            result = model._stream("Hello, how are you?")
    """

    api_url: str
    """The base URL for the OpenAI API."""
    
    api_key: str
    """The API key for authenticating with OpenAI."""
    
    model: str
    """The model name to use for the OpenAI API."""

    client: OpenAI = Field(None)

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, **data):
        super().__init__(**data)
        http_client = httpx.Client(verify=False)
        self.client = OpenAI(base_url=self.api_url, http_client=http_client, api_key=self.api_key)

    def _call(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> str:
        """Run the LLM on the given input.

        Args:
            prompt: The prompt to generate from.
            stop: Stop words to use when generating. Model output is cut off at the
                first occurrence of any of the stop substrings.
            run_manager: Callback manager for the run.
            **kwargs: Arbitrary additional keyword arguments.

        Returns:
            The model output as a string.
        """
        if stop is not None:
            raise ValueError("stop kwargs are not permitted.")
        
        completion = self.client.completions.create(
            model=self.model,
            prompt=prompt,
            max_tokens=1000,
        )
        output = completion.choices[0].text

        # Debugging: Log the completion
        if run_manager:
            run_manager.on_llm_end(completion)

        return output

    def _stream(
        self,
        prompt: str,
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[GenerationChunk]:
        """Stream the LLM on the given prompt.

        Args:
            prompt: The prompt to generate from.
            stop: Stop words to use when generating. Model output is cut off at the
                first occurrence of any of these substrings.
            run_manager: Callback manager for the run.
            **kwargs: Arbitrary additional keyword arguments.

        Returns:
            An iterator of GenerationChunks.
        """
        completion = self.client.completions.create(
            model=self.model,
            prompt=prompt,
            max_tokens=1000,
        )
        for char in completion.choices[0].text:
            chunk = GenerationChunk(text=char)
            if run_manager:
                run_manager.on_llm_new_token(chunk.text, chunk=chunk)
            yield chunk

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return a dictionary of identifying parameters."""
        return {
            "model_name": "CustomOpenAILLM",
            "api_url": self.api_url,
            "model": self.model,
        }

    @property
    def _llm_type(self) -> str:
        """Get the type of language model used by this chat model."""
        return "custom_openai"

In [36]:
custom_llm = CustomOpenAILLM(api_url=api_url, api_key=api_key, model=available_models[6])

print(custom_llm)

NameError: name 'api_url' is not defined

## In-built langchain-openAi llm

### Importing Necessary Libraries for Conversational Buffer Memory

In [None]:
from langchain.memory import ConversationBufferWindowMemory
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_openai import ChatOpenAI
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.schema import HumanMessage, AIMessage


### Simple Langchain ChatOpenAI for invoke functions

In [None]:
langchain_llm = ChatOpenAI(base_url=os.environ['API_URL'],model = available_models[6], http_client=http_client, api_key=os.environ['API_KEY'])

In [None]:
langchain_llm1 = ChatOpenAI(base_url=os.environ['API_URL'],model = available_models[3], http_client=http_client, api_key=os.environ['API_KEY'])

# Step 3: Some Preprocessing

## Splitting into paths and contents for relevant information in a chunk

In [None]:
import re
def parse_output_file(filepath):
    text =''
    paths,contents = [],[]
    encodings = ['utf-8', 'iso-8859-1', 'windows-1252', 'ascii']
    for encoding in encodings:
        try:
            with open(filepath, 'r', encoding=encoding) as file:
                text =file.read()
        except UnicodeDecodeError:
            continue
    matches = re.finditer(r'C:/.*',text)
    matches1 = [match for match in matches]# make this the last section of the codebase
    for match in matches1:
        paths.append(match.group(0))
    for match, match_next in zip(matches1[:-1],matches1[1:]):
        ending_index = match.span()[1]
        starting_index = match_next.span()[0]
        contents.append(text[ending_index:starting_index])
    
    #appending the last file
    contents.append(text[matches1[-1].span()[1]:])
    return paths,contents

paths,contents =parse_output_file("output2.txt")
print(len(paths), len(contents))

3192 3192


## Split it into context-aware chunks

In [None]:
from typing import List
def chunk_code(content: str, max_chunk_size: int = 2048) -> List[str]:
    """
    Split the code into context-aware chunks.
    """
    # Split the content into lines
    lines = content.split('\n')
    chunks = []
    current_chunk = []
    current_chunk_size = 0

    for line in lines:
        line_size = len(line) + 1  # +1 for the newline character
        if current_chunk_size + line_size > max_chunk_size and current_chunk:
            chunks.append('\n'.join(current_chunk))
            current_chunk = []
            current_chunk_size = 0

        current_chunk.append(line)
        current_chunk_size += line_size

    if current_chunk:
        chunks.append('\n'.join(current_chunk))

    return chunks

In [43]:
chunk_of_contents =[]
for content in contents:
    chunk_of_contents.append(chunk_code(content,4096))
print(len(chunk_of_contents))
#chunk of contents is a list[list]

3192


# Step 4: Langgraph Architecture

## Prompt Template

In [44]:
from langchain_core.prompts import PromptTemplate

template1 = '''Answer the following questions as best you can. You have access to the following tools:

{tools}

Use the following format:

Question: the input you must take out only the sections of code that are useful for generation of test automation scripts
Thought: you should always think about what to do
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}'''

prompt1 = PromptTemplate.from_template(template1)
template2 = '''Generate the Testing scripts. You have access to the following tools:

{tools}

Use the following format:

Question: Write PYTHON code and test it on the generate test automation scripts
Thought:First check if you have enough information to perform the task
Action: the action to take, should be one of [{tool_names}]
Action Input: the input to the action
Observation: the result of the action
... (this Thought/Action/Action Input/Observation can repeat N times)
Thought: I now know the final answer, If I can't generate a good enough final answer I will say,"I dont have enough information" or "I need more information"
Final Answer: the final answer to the original input question

Begin!

Question: {input}
Thought:{agent_scratchpad}'''

prompt2 = PromptTemplate.from_template(template2)

## Tools

In [45]:
def get_current_time(*args,**kwargs):
    """ returns date time in H:MM format"""
    import datetime
    now  = datetime.datetime.now()
    return now.strftime("%I:%M %p")


repl = PythonREPL()
def python_repl(
    code: Annotated[str, "The python code to execute to generate your chart."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    result_str = f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
    return (
        result_str + "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
    )
tools = [
    Tool(
        name = 'Time',
        func = get_current_time,
        description = "When you need to get the current time"),
    Tool(
        name='pythonrepl',
        func = python_repl,
        description = "To try and run your python code",
    ),
    # Tool(
    #     name ="Search",
    #     func = TavilySearchResults(max_results=1),
    #     description = "To search for information"
    # )
]

In [46]:
from langchain.memory import ConversationBufferMemory
# Create a ConversationBufferMemory
memory = ConversationBufferMemory(return_messages=True)


## Chain

Agents

In [47]:
from langchain import hub
from langchain.agents import AgentExecutor, create_react_agent
from langchain_core.tools import Tool
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_experimental.utilities import PythonREPL
from typing import Annotated



llm = langchain_llm
agent1 = create_react_agent(
    llm = llm,
    tools = tools,
    prompt = prompt1,
    stop_sequence = True
)
agent_executor1 = AgentExecutor.from_agent_and_tools(
    agent = agent1,
    tools = tools,
    verbose = True,
    handle_parsing_errors =True,
    memory = memory
)
agent2 = create_react_agent(
    llm = langchain_llm1,
    tools = tools,
    prompt = prompt2,
    stop_sequence = True
)
agent_executor2 = AgentExecutor.from_agent_and_tools(
    agent = agent2,
    tools = tools,
    verbose = True,
    handle_parsing_errors =True,
    memory = memory
)

In [49]:
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
store = {}


def get_session_history(session_id: str) -> BaseChatMessageHistory:
    if session_id not in store:
        store[session_id] = ChatMessageHistory()
    return store[session_id]

def format_output(output):
    return {"input": output["output"]}

chain = (
    agent_executor1
    | {'input':RunnablePassthrough()}
    | agent_executor2
    | StrOutputParser()
)

with_message_history = RunnableWithMessageHistory(
    chain,
    get_session_history,
    input_messages_key="input",
    history_messages_key="history",
)
with_message_history.invoke(
    {"input": chunk_of_contents[0][0]},
    config={"configurable": {"session_id": "abc123"}},
)



[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m Thought: To answer the question, I need to analyze the provided code which appears to be a representation of binary data, likely from the 'AuthenticodeWithTS' structure, which contains RFC3161 timestamped signature information.

Action: pythonrepl
Action Input: 
import binascii

# Convert the bytes to a string representation for readability
auth_data_str = binascii.hexlify(AuthenticodeWithTS).decode('utf-8')
print(auth_data_str)
[0m[33;1m[1;3mSuccessfully executed:
```python
import binascii

# Convert the bytes to a string representation for readability
auth_data_str = binascii.hexlify(AuthenticodeWithTS).decode('utf-8')
print(auth_data_str)

```
Stdout: NameError("name 'AuthenticodeWithTS' is not defined")

If you have completed all tasks, respond with FINAL ANSWER.[0m[32;1m[1;3m Thought: It seems there was an error in the Action Input as 'AuthenticodeWithTS' was not previously defined within the scope of the provided

KeyboardInterrupt: 

In [26]:

# result = agent_executor.invoke({"input":"Write code to make a graph of stock_prices over time"})
# print(result)
def composed_chain(input_dict):
    # Execute the first agent
    result1 = agent_executor1.invoke({
        "input": input_dict["input"],
    })
    print("The summarized code snippets:: ", result1['output'])
    print("------------------------------------------------------------------------")
    
    # Execute the second agent, passing the result from the first
    result2 = agent_executor2.invoke({
        "input": result1["output"],
    })
    
    # Check if agent_executor2 has enough information
    if "I don't have enough information" in result2["output"] or "I need more details" in result2["output"]:
        return
    else:
        return result2

# Wrap the function in a LangChain-compatible chain
prompt=PromptTemplate(template="{input}", input_variables=["input"])
for content in chunk_of_contents[600:]:
    for chunk in content:
        result = composed_chain({'input': chunk})
        if result == None:
            continue
        else:
            print(result)
        
        # Update memory
        memory.save_context({"input": chunk}, {"output": result["output"]})

    



[1m> Entering new AgentExecutor chain...[0m


KeyboardInterrupt: 