## Package Installs

In [1]:
%%capture
%pip install sentence-transformers
%pip install langchain_community
%pip install openai
%pip install faiss-cpu
%pip install langchain_openai



### Import Modules

In [2]:
import os
from openai import AzureOpenAI
from langchain_community.document_loaders import TextLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import CharacterTextSplitter
from langchain.prompts import PromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain.chains import RetrievalQA
from langchain.text_splitter import RecursiveCharacterTextSplitter
import re

## Connect to LLM

In [3]:
def connect():
    client = AzureOpenAI(
      azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT"), 
      api_key=os.getenv("AZURE_OPENAI_API_KEY"),  
      api_version="2024-02-01"
    )
    assert client is not None, "Failed to create AzureOpenAI client"
    return client


## File and Database Paths

In [4]:
# dirty_log_file = "C:/Users/kr4193/Desktop/Log_error_reporter/Prep_work/clean_Geiger_for_LLMs.log"
dirty_log_file = "C:/Users/kr4193/Desktop/Log_error_reporter/Prep_work/SMOKE-ZSB-DP12-002.log"
# dirty_log_file = "C:/Users/kr4193/Desktop/Log_error_reporter/Prep_work/cmdexec.log"
# dirty_log_file = "C:/Users/kr4193/Desktop/Log_error_reporter/Prep_work/zt3.log"
# dirty_log_file = "C:/Users/kr4193/Desktop/Log_error_reporter/Prep_work/data_for_karthik.txt"
log_file_path = "C:/Users/kr4193/Desktop/Log_error_reporter/Prototype/clean.log"
embedding_model = "all-MiniLM-L6-v2"
database_name = "sample_db"

## Data Cleaning

In [5]:
with open(dirty_log_file, 'r') as file:
    lines = file.readlines()

def process_line(line):
    # Match and capture everything after 'D:' or 'I:'
    match = re.search(r'[DI]:\s*(.*)', line)
    if match:
        return match.group(1).strip()  # Return the captured group, stripped of leading/trailing whitespace
    return line.strip()  # If no match, return the line as is

current_suite = None
current_module = None
current_test = None
ans = None
suites = []
inbetween = []
failures = []
to_retrieve = []
log = []
previous_line = None  # Initialize previous_line variable
suite_test_cases = {}  # Dictionary to store suite name as key and failed/error test cases as values

with open(log_file_path, 'w') as output_file:
    for line in lines:
        line = process_line(line)

        # Check for suite start
        suite_match = re.search(r'Entering suite: (\w+)', line)
        if suite_match:
            current_suite = suite_match.group(1)
            suites.append(current_suite)
            suite_test_cases[current_suite] = []  # Initialize list for test cases
            continue

        # Check for module start
        module_match = re.search(r' Execute module', line)
        if module_match:
            current_module = module_match.group(1)
            continue

        # Check for test start
        test_start_match = re.search(r'Running test: (\w+)', line)
        if test_start_match:
            current_test = test_start_match.group(1)
            continue

        # Check for test outcome
        test_outcome_match = re.search(r'test outcome\s*:\s*(\w+)', line, re.IGNORECASE)
        if test_outcome_match:
            result = test_outcome_match.group(1).lower()
            if result in ['failed', 'error']:
                if current_suite and current_test:
                    suite_test_cases[current_suite].append(current_test)
                    for line in inbetween:
                        output_file.write(line + '\n')
                    log.append(inbetween)
                    inbetween.clear()
            current_test = None  # Reset current test
            continue

        # Check for failure
        failure_match = re.search(r'(failed)', line, re.IGNORECASE)
        if failure_match:
            if current_suite and current_module:
                failures.append((suite_match, module_match, line.strip()))

        # Check for the end of the suite and the result
        if current_suite:
            suite_end_match = re.search(r'^Suite', line)
            if suite_end_match:
                result, suite = line.split(" ")[-1], line.split(" ")[-2]
                suite_end_match = None
                if result == 'failed' or result == 'error':
                    to_retrieve.append(suite[:-1])
                    # for line in inbetween:
                    #     output_file.write(line + '\n')
                    # log.append(inbetween)
                    suites.remove(suite[:-1])
                else:
                    suites.remove(suite[:-1])
            if len(suites) == 0:
                inbetween.clear()

        # Check if the current line is the same as the previous line
        if line != previous_line:
            inbetween.append(line)
        previous_line = line  # Update previous_line

# Filter out suites with no failed/error test cases
suite_test_cases = {
    suite: list(set(test_cases))  # Convert to set and back to list to ensure uniqueness
    for suite, test_cases in suite_test_cases.items()
    if test_cases  # Ensure the test_cases list is not empty
}


print(suite_test_cases)


{'Enterprise8021x': ['client_required_capable_positive', 'client_enabled_required_positive', 'client_enabled_capable_positive', 'client_disabled_disabled_positive', 'client_enabled_disabled_positive', 'client_disabled_capable_positive', 'client_required_required_positive'], 'PmfStates': ['client_enabled_wpa2_psk_pmf_required_ess_positive', 'client_disabled_wpa2_psk_pmf_disabled_ess_positive', 'client_disabled_wpa2_psk_pmf_optional_ess_positive', 'client_required_wpa2_psk_pmf_required_ess_positive', 'client_enabled_wpa2_psk_pmf_optional_ess_positive', 'client_enabled_wpa2_psk_pmf_disabled_ess_positive'], 'RoamSweep': ['wpa2', 'wpa3', 'open_5GHz_ps', 'open_2'], 'SecuritySweep': ['owe_open', 'wpa3_eap_ttls_cisco_ise', 'wpa2_psk_pmf_required', 'wpa2_eap_tls', 'open_hidden_dfs', 'wpa3_eap_tls_cisco_ise', 'wpa2_eap_ttls_pap', 'wpa3_eap_tls_aruba_clearpass', 'wpa3_peap_aruba_clearpass', 'wpa3_eap_ttls_pap', 'wpa_mixed_mode_12_eap_tls', 'wpa3_psk_11r', 'wpa2_psk_pmf_optional', 'wpa_mixed_mode_

### Generate Chunks and splitting the documents

In [6]:
def generate_chunks(filename):
    data = []
    final_chunk = []

    loader = TextLoader(filename, encoding="utf-8")
    rawdata = loader.load()
    if rawdata:
        text_splitter = CharacterTextSplitter(separator = "\n",length_function = len, chunk_size=5000, chunk_overlap=100)
        
        data = text_splitter.split_documents(rawdata)
        print(len(data))
        if data:
            print(f"generate_chunks - {filename}")
            final_chunk += data
        else:
            print(f"generate_chunks - data is None")
    else:
        print(f"generate_chunks - rawdata is None")

    print(f"{filename} data chunks ready for embedding")

        # Add more conditions for other file types if needed
    print("prepare_data_chunks: finished")
    return final_chunk

### Create and Load the Vector DB

In [7]:
def create_vectordb(filepath, databasename):
    data = generate_chunks(filepath)
    if data:
        print(f"Starting to create {filepath} ...")
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model)
        print(f"embedding : {embeddings}")
        if embeddings:
            vdatabase = FAISS.from_documents(data, embeddings)
            vdatabase.save_local(databasename)
            print(f" vectordatabase {databasename} ready...")
        else:
            print(f"Empty Embeddings")
        return vdatabase
    else:
        print("chunk data received is null, exiting database creation")
        return None

In [8]:
def load_vector_db(embedding_model_name, vector_db_name):
    print(f" Loading {vector_db_name} ...")
    if os.path.exists(vector_db_name):
        print(f" database {vector_db_name} present!!")
        embeddings = HuggingFaceEmbeddings(model_name=embedding_model_name)
        if embeddings:
            vector_db = FAISS.load_local(vector_db_name, embeddings, allow_dangerous_deserialization=True)
            print(f" Loading {vector_db_name} Done!!")
        else:
            print(f"Empty Embeddings")
    else:
        print(f" No file path found for {vector_db_name}..")
    return vector_db

### Retriever

file_path = 
def read_python_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()

## Prompt template

In [9]:
prompt_template = PromptTemplate(
    input_variables=["context", "question"],
    template="""
    You are a helpful assistant. Use the following context to answer the question.
    Context: {context}
    Question: {question}
    """
)

### Chain

In [10]:
def qa_bot():
    vectordb = get_retriever()
    retriever = vectordb.as_retriver(search_kwargs = {"k":10})
    query = "What is the error in the log file?"
    while query != "quit":
        query = input("Enter your query: ")
        output = chain.invoke(query)
        print(output)
    qa_bot()
    


## Main function

In [11]:
# Press the green button in the gutter to run the script.
# if __name__ == '__main__':
def retriever_fn(test_suite,test_case, vector_db):
    
    
    # Initialze the retriver with retrieval method
    #retriever = jira_vector_db.as_retriever(search_type="similarity_score_threshold", search_kwargs={"score_threshold": 0.1})

    #search_type - mmr :- 
    retriever = vector_db.as_retriever( search_type = "mmr", search_kwargs={"k":1,"score_threshold": 0.4})
    # search_type="mmr",
    # Query the database to get symantical search output
    retrieved_output = retriever.invoke(f"Information on test case {test_case}")
    # print(f"retrieved_output: {retrieved_output}")
    # print(f"vectordb ready")
    return retrieved_output

In [12]:
def final(retrieved_output,test_suite,test_case):
    client = connect()
    # print(retrieved_output)
    response = client.chat.completions.create(
            model="gpt-4o", # model = "deployment_name".
            messages = [
            {
                "role": "system",
                "content": (
                    "You are a helpful assistant. If you are not sure about the answer simply reply with I dont know"
                    "Always identify and classify failed and error tests into one of the three categories: Product issue, ATF Script Issues, and Setup issues. "
                    "Classify with the help of the log entry that is provided"
                    "The format of tests is as follows: Test Suites -> Test Cases. Test cases are the lowest level of granularity. "
                    #"Test Suites start with the line 'Entering suites: <suitename>' and end with 'Suite <suitename> <result>'. "
                    #"Test Modules start with 'Execute module: <module_name>'.Always ignore test suites with result of  passed or skipped.Always ignore test cases with result of  passed or skipped.  "
                    "Test Cases start with the line 'Running test: <test_name>' and end with the line '<test_name>: <result>'."
                    "You will be given the name of the test suite and test case name first. Then you will be given the relevant logs"
                    "Provide the output in the following structure without using triple quotes or the word 'json':\
                    { \"test_suite\": \"\", \"test_case\": \"\", \"test_description\": \"\", \"classification\": \"\", \"reasoning\": \"\" }. "
                    "List your reasoning for each test case in less than 100 words about why you classified the test case into the category."
                    # "Product issue would be "
                )
            },
            # {
            #     "role": "user",
            #     "content": f"Here is the script that was used to run the test: {retrieved_output}"
            # },
            {
                "role": "user",
                "content": (
                    f'the test suit is {test_suite}'
                    f'the test case is {test_case}'
                    f'Here is a log entry for analysis:\n{retrieved_output}\n\n'
                    
                )
            },
        ])
        
    print(response.choices[0].message.content)
    with open("output_context.txt", "a") as output_file:
        output_file.write(response.choices[0].message.content)

In [13]:
# Create vector data base
local_vectordb = create_vectordb(log_file_path, database_name)

# Load existing database using db name
vector_db = load_vector_db(embedding_model, database_name)
# for suite in to_retrieve:
#     final(retriever_fn(suite,vector_db))
with open("output_context.txt", "w") as output_file:
    pass


Created a chunk of size 14160, which is longer than the specified 5000
Created a chunk of size 14160, which is longer than the specified 5000
Created a chunk of size 14159, which is longer than the specified 5000
Created a chunk of size 14159, which is longer than the specified 5000
Created a chunk of size 5315, which is longer than the specified 5000
Created a chunk of size 5325, which is longer than the specified 5000
Created a chunk of size 5315, which is longer than the specified 5000
Created a chunk of size 5325, which is longer than the specified 5000
Created a chunk of size 5315, which is longer than the specified 5000
Created a chunk of size 5325, which is longer than the specified 5000
Created a chunk of size 5315, which is longer than the specified 5000
Created a chunk of size 5325, which is longer than the specified 5000
Created a chunk of size 5315, which is longer than the specified 5000
Created a chunk of size 5325, which is longer than the specified 5000
Created a chunk 

2091
generate_chunks - C:/Users/kr4193/Desktop/Log_error_reporter/Prototype/clean.log
C:/Users/kr4193/Desktop/Log_error_reporter/Prototype/clean.log data chunks ready for embedding
prepare_data_chunks: finished
Starting to create C:/Users/kr4193/Desktop/Log_error_reporter/Prototype/clean.log ...
embedding : model_name='all-MiniLM-L6-v2' cache_folder=None model_kwargs={} encode_kwargs={} multi_process=False show_progress=False
 vectordatabase sample_db ready...
 Loading sample_db ...
 database sample_db present!!
 Loading sample_db Done!!


In [14]:
for key in suite_test_cases:
    for i in range(len(suite_test_cases[key])):
        final(retriever_fn(key,suite_test_cases[key][i],vector_db),key,suite_test_cases[key][i])


{
    "test_suite": "Enterprise8021x",
    "test_case": "client_required_capable_positive",
    "test_description": "The test validates that the WLAN security settings are correctly set and retrieved as 'wpa sae', 'wpa eap-tls', 'wpa eap-ttls', 'wpa eap-fast', and 'wpa peap' as expected.",
    "classification": "Product issue",
    "reasoning": "The repeated assertions indicate that the retrieved WLAN security setting is 'wpa psk' instead of the expected values like 'wpa sae' and others. This discrepancy suggests a potential product issue related to WLAN security settings not being applied correctly."
}
{
    "test_suite": "Enterprise8021x",
    "test_case": "client_enabled_required_positive",
    "test_description": "Test to verify if the client can enable and connect to a WPA2-PSK network with PMF required.",
    "classification": "Setup issues",
    "reasoning": "The log entries do not show any critical errors but multiple repetitions of network association and configuration changes

In [15]:
%run app.py

UnicodeDecodeError: 'utf-8' codec can't decode byte 0x92 in position 1671: invalid start byte