# ViperGPT -- Quick Start

This notebook is meant to be a quick entry into ViperGPT. **Warning:** This notebook will execute arbitrary code on your machine. Proceed at your own risk.

Before running this code, modify any parameters at `configs/my_config.yaml`. For example, you may want to change the BLIP model (`blip_v2_model_type`) from XXL to XL if your GPU does not have enough memory.

In [None]:
# The CV import takes 30-60 seconds to load if it has been executed before, and longer on first execution
from main_simple_lib import *

In [1]:
# This box contains all the background functionality that was added for the Testing part of the Thesis

import os 
import datetime
from io import TextIOWrapper
import threading

# For timeout management
from contextlib import contextmanager
class TimeoutException(Exception):
    """Custom exception type intended for timeout management.

    NOTE(review): nothing visible in this notebook raises it; time_limit
    raises the builtin TimeoutError instead.
    """

# --- Code generator selection ---
# Only used to decide which AI entry of the my_config file is used
codeGeneratorAi = "lm_studio"  # LM-Studio AIs
#codeGeneratorAi = "codex" # The originally implemented gpt3/online AIs

# --- Mode switches ---
# 0: take codes from a single log file, 1: take codes from multiple log files,
# anything else disables code retrieval from log files
getCodeFromLog = 1

# When True, the image is displayed each time a code gets executed
useImageDisplay = False

# --- Input locations ---
queryTextPath = "C:/Users/Felix/viper/testing/query.txt"
imageFolderPath = "C:/Users/Felix/viper/testing/images"

# --- Log locations ---
logFolderPath = "C:/Users/Felix/viper/testing/logs"
logNoResultFolderPath = "C:/Users/Felix/viper/testing/logsNoResult"

# Source log used in single-log-file mode (getCodeFromLog == 0)
codeLogFilePath = logNoResultFolderPath + '/log_NR0_TheBloke_WizardLM-1.0-Uncensored-CodeLlama-34B-GGUF.txt'

# Multiple-log-file mode (getCodeFromLog == 1): every log file whose name contains
# codeLogPartName is used, which allows comparing the code blocks of several logs;
# the code blocks are then reduced to the unique ones
multipleLogsFolderPath = "C:/Users/Felix/viper/testing/logsNoResult"
codeLogPartName = 'gpt-4o'

#region Functions

def _count_log_files(folderPath):
    """Return the number of regular files located directly inside folderPath."""
    return sum(1 for name in os.listdir(folderPath) if os.path.isfile(os.path.join(folderPath, name)))


def _write_log_entry(f, index, imagePath, query, code, resultLine=None):
    """Write one separator-delimited image/query/(result)/code entry to the open log file f."""
    separator = '---------------' + str(index) + '---------------\n'
    f.write(separator)
    f.write('\n')
    f.write(f'Image: {imagePath}\n')
    f.write(f'Query: {query}\n')
    if resultLine is not None:
        f.write(resultLine)
    f.write('\n')
    f.write(f'Code: {code}\n')
    f.write('\n')
    f.write(separator)
    f.write('\n')


def log_everything(querys, codes, imagePaths, results=None):
    """Write queries, codes, image paths and optionally results to a new log file.

    The log file number is the count of files already present in the target
    folder. Three layouts are produced:

    * results is None: a "no result" log in logNoResultFolderPath with one
      entry per (query, code, image).
    * results given and getCodeFromLog != 1: a log in logFolderPath with one
      entry per (query, code, image, result).
    * results given and getCodeFromLog == 1: a log in logFolderPath where
      codes and results are lists of lists; every query gets an outer
      numbered section that contains one numbered entry per (code, result).

    Args:
        querys: Sequence of query strings.
        codes: Sequence of codes, or of code lists when getCodeFromLog == 1
            and results are given.
        imagePaths: Sequence of image paths, aligned with querys.
        results: Optional sequence of results (or of result lists), aligned
            with codes.

    Returns:
        None. The path of the written file is printed.
    """
    if results is None:
        count = _count_log_files(logNoResultFolderPath)
        # The path is built once so the printed message always names the file that was written
        logPath = f'{logNoResultFolderPath}/log_NR{count}_{get_code_generator_model()}.txt'
        with open(logPath, 'w') as f:
            write_config_to_log(f)

            # Write all queries/codes/images to log file
            for counter, (query, code, imagePath) in enumerate(zip(querys, codes, imagePaths), start=1):
                _write_log_entry(f, counter, imagePath, query, code)

        print(f'Logged query {count} to {logPath}')
        return

    count = _count_log_files(logFolderPath)

    if getCodeFromLog != 1:
        logPath = f'{logFolderPath}/log_{count}__{get_code_generator_model()}.txt'
        with open(logPath, 'w') as f:
            write_config_to_log(f)

            # Write all queries/codes/images/results to log file
            for counter, (query, code, imagePath, result) in enumerate(zip(querys, codes, imagePaths, results), start=1):
                _write_log_entry(f, counter, imagePath, query, code, f'Result: {result}\n')

        print(f'Logged query {count} to {logPath}')
        return

    logPath = f'{logFolderPath}/log_{count}__{codeLogPartName}.txt'
    with open(logPath, 'w') as f:
        write_config_to_log(f)

        # Write all queries/codes/images/results to log file, uses multiple result lists
        for counter, (query, codelists, imagePath, resultlists) in enumerate(zip(querys, codes, imagePaths, results), start=1):
            outerSeparator = '---------------' + str(counter) + '---------------\n'
            f.write(outerSeparator)
            f.write('\n')
            for partialCounter, (code, result) in enumerate(zip(codelists, resultlists), start=1):
                _write_log_entry(f, partialCounter, imagePath, query, code, f'Result: {result}\n')
            f.write(outerSeparator)
            f.write('\n')

    print(f'Logged query {count} to {logPath}')
    return
        

def write_config_to_log(f: TextIOWrapper):
    """Write the log header (configuration information and timestamp) to f.

    The header depends on the module-level getCodeFromLog switch:

    * 0: marks the log as a rerun, names codeLogFilePath and copies the
      header lines of that log file.
    * 1: marks the log as a rerun and, for every log file in
      multipleLogsFolderPath whose name contains codeLogPartName, writes a
      numbered section with the file path and that file's header lines.
    * anything else: writes the current config object and the code
      generator AI/model.

    In all cases a 'Time:' line with the current timestamp follows.

    Args:
        f: Open, writable text file.
    """
    if getCodeFromLog == 0:
        f.write('THIS IS A RERUN FROM A LOG FILE \n')
        f.write('Code Log File:\n')
        f.write(codeLogFilePath)
        f.write('\n')
        f.write('\n')

        # Copy the config header of the source log file.
        # The local is deliberately NOT named `config`: assigning that name here
        # would make it local to the whole function and break the else branch
        # below, which reads the global config object.
        for line in get_config_from_log(codeLogFilePath):
            f.write(line)
    elif getCodeFromLog == 1:
        f.write('THIS IS A RERUN FROM A LOG FILEs \n')
        f.write('Code Log Files:\n')
        # get_log_file_with_part_name returns None when nothing matches
        logFiles = get_log_file_with_part_name(multipleLogsFolderPath, codeLogPartName) or []
        for counter, file in enumerate(logFiles, start=1):
            separator = '---------------' + str(counter) + '---------------\n'
            f.write(separator)
            f.write(file)
            # Keep the path on its own line instead of gluing it to the first header line
            f.write('\n')
            # Copy the header of this particular source log, not of codeLogFilePath
            for line in get_config_from_log(file):
                f.write(line)
            f.write(separator)
            f.write('\n')
        f.write('\n')
        f.write('\n')

    else:
        # Write Config Variables/Time to log file
        f.write('Config Variables:\n')
        f.write(str(config))
        f.write('\n')
        f.write('Code Generator AI:\n')
        f.write(codeGeneratorAi + get_code_generator_model())
        f.write('\n')

    f.write('Time:\n')
    f.write(str(datetime.datetime.now()))
    f.write('\n')
    f.write('\n')

def get_code_generator_model():
    """Return the model name configured for the selected code generator AI.

    Reads the module-level codeGeneratorAi switch and the matching entry of
    the global config object. For "lm_studio" every '/' in the model name is
    replaced by '_'. Returns None for any other, unknown switch value.
    """
    if codeGeneratorAi == "lm_studio":
        # The model name is used inside log file names, so '/' is replaced
        return config.lm_studio.model.replace('/', '_')
    if codeGeneratorAi == "codex":
        return config.codex.model
    if codeGeneratorAi == "gpt3":
        return config.gpt3.model
    return None


# Limits time for a function to execute on windows without signal
@contextmanager
def time_limit(seconds):
    """Context manager yielding a caller that runs a function with a time limit.

    Usage::

        with time_limit(240) as limited:
            result = limited(func, *args, **kwargs)

    The function runs in a worker thread that is joined for at most
    ``seconds`` seconds. The worker cannot be killed: after a timeout it
    keeps running in the background. It is started as a daemon thread so it
    does not keep the interpreter alive.

    Args:
        seconds: Maximum time to wait for each call.

    Yields:
        limited_function(func, *args, **kwargs), which returns the result of
        func, re-raises any Exception func raised, raises ValueError when
        func is not callable, and raises TimeoutError when func did not
        finish within the limit.
    """

    def limited_function(func, *args, **kwargs):
        if not callable(func):
            raise ValueError(f"Expected a callable function, got {type(func)} instead.")

        # [result, completed, exception] -- created per call so that the outcome of
        # one call (or of a worker that finishes late) never leaks into the next one
        result_container = [None, False, None]

        def target_wrapper():
            try:
                result_container[0] = func(*args, **kwargs)
                result_container[1] = True
            except Exception as e:
                result_container[2] = e  # Store the exception

        thread = threading.Thread(target=target_wrapper, daemon=True)
        thread.start()
        thread.join(timeout=seconds)
        if result_container[2] is not None:  # If an exception was caught
            raise result_container[2]  # Raise the stored exception
        if not result_container[1]:
            raise TimeoutError(f"Function execution exceeded the time limit of {seconds} seconds")
        return result_container[0]

    yield limited_function


def execute_code_updated(code, image, syntax=None):
    """Compile and run a code string, then call execute_command on the image.

    The code is compiled and executed in the module globals; afterwards the
    global execute_command is called with the image, a new 4x4 figure,
    time_wait_between_lines and the syntax object.
    (Presumably the executed code is what defines execute_command - verify
    against the code generator output.)

    Args:
        code: Python source code as a string.
        image: Image passed through to execute_command.
        syntax: Optional Syntax object; when None, one is built from code.

    Returns:
        Whatever execute_command returns.
    """
    if syntax is None:
        syntax = Syntax(code, "python", theme="monokai", line_numbers=True, start_line=0)

    program = compile(code, '<codex>', 'exec')
    figure = plt.figure(figsize=(4, 4))

    # NOTE: this executes arbitrary (generated) code inside the notebook's global namespace
    exec(program, globals())

    # A time-limited variant is possible via:
    #   with time_limit(240) as limited:
    #       result = limited(execute_command, image, figure, time_wait_between_lines, syntax)
    return execute_command(image, figure, time_wait_between_lines, syntax)


# Use this to rerun code execution
def get_codes_queries_images_from_log(logFilePath, codeless=False):
    """Parse a log file into aligned lists of codes, queries, images and syntaxes.

    Parsing rules (line by line):

    * A line starting with 'Code:' opens a code block; its text from index 6
      on is the first line of the block.
    * While a block is open, lines are appended to it. Once a line
      containing "return" or starting with the '---------------' separator
      has been seen, the next line that does not start with four spaces
      closes the block; that closing line itself is discarded.
    * Outside a block, 'Query:' and 'Image:' lines set the query/image that
      is stored together with the next closed block.

    A block that is still open at the end of the file is not returned.

    Args:
        logFilePath: Path of the log file to read.
        codeless: When True, the codes list is left out of the result.

    Returns:
        (codes, queries, images, syntaxes), or (queries, images, syntaxes)
        when codeless is True.
    """
    codes, queries, images, syntaxes = [], [], [], []
    pendingCode = ""
    pendingQuery = ""
    pendingImage = ""
    sawReturn = False

    with open(logFilePath, 'r') as logFile:
        for row in logFile:
            if row.startswith('Code:'):
                pendingCode = row[6:]
            elif pendingCode and sawReturn and not row.startswith('    '):
                # Block is complete: store it with the query/image seen before it
                displayCode = pendingCode.replace("(image, my_fig, time_wait_between_lines, syntax)", "(image)")
                codes.append(pendingCode)
                syntaxes.append(Syntax(displayCode, "python", theme="monokai", line_numbers=True, start_line=0))
                queries.append(pendingQuery)
                images.append(pendingImage)
                pendingCode = ""
                sawReturn = False
            elif pendingCode:
                pendingCode += row
            elif row.startswith('Query:'):
                pendingQuery = row[7:].strip()
            elif row.startswith('Image:'):
                pendingImage = row[7:].strip()

            # Evaluated for every line, including the one that opened or extended the block
            if pendingCode and ("return" in row or row.startswith('---------------')):
                sawReturn = True

    if codeless:
        return queries, images, syntaxes
    return codes, queries, images, syntaxes

def get_config_from_log(logFilePath):
    """Return the header lines of a log file.

    The header consists of all lines before the first line that starts with
    'Time'. If no such line exists, every line of the file is returned.
    Lines keep their trailing newline.
    """
    headerLines = []
    with open(logFilePath, 'r') as logFile:
        for row in logFile:
            if row.startswith('Time'):
                break
            headerLines.append(row)
    return headerLines

def get_codes_log(logFilePath):
    """Return all code blocks found in a log file, in file order.

    A line starting with 'Code:' opens a block (its text from index 6 on is
    the first line). Following lines are appended to the block. Once a line
    containing "return" or starting with the '---------------' separator has
    been seen, the next line that does not start with four spaces closes the
    block; that line is not part of the block. A block still open at the end
    of the file is dropped.
    """
    collected = []
    pendingCode = ""
    sawReturn = False

    with open(logFilePath, 'r') as logFile:
        for row in logFile:
            if row.startswith('Code:'):
                pendingCode = row[6:]
            elif pendingCode and sawReturn and not row.startswith('    '):
                collected.append(pendingCode)
                pendingCode = ""
                sawReturn = False
            elif pendingCode:
                pendingCode += row

            # Evaluated for every line, including the one that opened or extended the block
            if pendingCode and ("return" in row or row.startswith('---------------')):
                sawReturn = True

    return collected



def get_log_file_with_part_name(logFileFolderPath, partName) -> list[str]:
    """Return the paths of all entries in a folder whose name contains partName.

    Each path is built as logFileFolderPath + "/" + entry name, in the order
    os.listdir reports the entries. Returns None (not an empty list) when no
    entry matches.
    """
    matches = []
    for entry in os.listdir(logFileFolderPath):
        if partName in entry:
            matches.append(logFileFolderPath + "/" + entry)

    return matches if matches else None

def compare_code_blocks_from_nr_logs(listOfCodesLogFilePaths):
    """Collect, per code position, the unique code blocks found across log files.

    Every log file is parsed with get_codes_log. For each position i of the
    first log file, the i-th code blocks of all log files are gathered and
    reduced to the distinct ones, kept in first-seen order so the result is
    reproducible between runs.

    The first log file determines the number of positions. Log files with
    fewer code blocks simply contribute nothing to the missing positions;
    additional blocks of longer log files are ignored.

    Args:
        listOfCodesLogFilePaths: Paths of the log files to compare. May be
            empty or None.

    Returns:
        A list with one list of unique code strings per position; an empty
        list when no log files are given.
    """
    if not listOfCodesLogFilePaths:
        return []

    # load all codeblocks from log files
    codeblockslist = [get_codes_log(file) for file in listOfCodesLogFilePaths]

    uniqueCodeBlocks = []
    for i in range(len(codeblockslist[0])):
        candidates = (blocks[i] for blocks in codeblockslist if i < len(blocks))
        # dict.fromkeys removes duplicates while preserving insertion order
        uniqueCodeBlocks.append(list(dict.fromkeys(candidates)))

    return uniqueCodeBlocks
        
        
            

            


#endregion
'''
# Test get_log_file_with_part_name
testfiles = get_log_file_with_part_name(multipleLogsFolderPath, codeLogPartName)
testcodeblocklist = compare_code_blocks_from_nr_logs(testfiles)
print(len(testcodeblocklist))
uniqueCodes = 0
for codeblock in testcodeblocklist:
    uniqueCodes += len(codeblock)
print(uniqueCodes)
'''



'\n# Test get_log_file_with_part_name\ntestfiles = get_log_file_with_part_name(multipleLogsFolderPath, codeLogPartName)\ntestcodeblocklist = compare_code_blocks_from_nr_logs(testfiles)\nprint(len(testcodeblocklist))\nuniqueCodes = 0\nfor codeblock in testcodeblocklist:\n    uniqueCodes += len(codeblock)\nprint(uniqueCodes)\n'

In [None]:
# This box contains the code for getting the queries and images from the image folder and the query text file

print("Starting query processing...")

# Initialize all queries and images.
# Every data line of the query file has the form "<image name>|<query>";
# empty lines and lines starting with '#' are skipped.
queries = []
images = []
with open(queryTextPath, 'r') as f:
    for lineNumber, line in enumerate(f, start=1):
        entry = line.strip()

        # If line is empty or a comment, skip
        if not entry or entry.startswith('#'):
            continue

        # Split at | to get the image name and query
        fields = entry.split('|')
        if len(fields) < 2:
            # Malformed line: report it instead of aborting with an IndexError
            print("Skipping line " + str(lineNumber) + " of " + queryTextPath + ": expected '<image>|<query>'.")
            continue

        # Strip surrounding whitespace so the query carries no trailing newline
        # and the image path has no stray blanks
        image = fields[0].strip()
        query = fields[1].strip()

        # Add the query to the list
        queries.append(query)

        # Add the image corresponding to the query to the list
        images.append(imageFolderPath + "/" + image)

#print(queries)
#print(images)



In [None]:
# This box contains the code generation loop, it can be repeated without needing to execute any code or change in settings

def code_generator(queries):
    """Generate one code per query with the configured code generator AI.

    Calls get_code(query, codeGeneratorAi) for every query. When get_code
    returns None, the marker string "Code generation failed. Skipping query."
    is stored as the code and None as the syntax, so both returned lists
    always stay aligned with queries.

    Args:
        queries: Iterable of query strings.

    Returns:
        (codes, syntaxes): two lists with one entry per query.
    """
    # Get all codes
    codes = []
    syntaxes = []
    code_counter = 1
    for query in queries:
        print("Starting code generation...")

        code_and_syntax = get_code(query, codeGeneratorAi)

        print("Starting code processing...")

        if code_and_syntax is None:
            print("Code generation failed. Skipping query.")
            codes.append("Code generation failed. Skipping query.")
            # Keep syntaxes index-aligned with codes; consumers zip both lists
            syntaxes.append(None)
            continue

        print("-----------------Code: "+str(code_counter)+" -----------------")
        print(code_and_syntax)
        print("------------------- "+str(code_counter)+" --------------------")

        code = code_and_syntax[0]
        syntax = code_and_syntax[1]

        # Add the code to the list
        codes.append(code)

        # Add the syntax to the list
        syntaxes.append(syntax)
        code_counter += 1
    return codes, syntaxes


# Number of code generation runs (only used when codes are not taken from log files)
runs = 4


if getCodeFromLog == 0:
    # Rerun: codes, queries and images all come from a single log file
    codes, queries, images, syntaxes = get_codes_queries_images_from_log(codeLogFilePath)
    print("Got codes from log file.")
    print("Code log file: "+codeLogFilePath)
    print("Code count: "+str(len(codes)))

elif getCodeFromLog == 1:
    # Rerun: unique codes per query position are collected from several log files.
    # The same folder/part name as in write_config_to_log is used so the log
    # header names exactly the files the codes were read from.
    files = get_log_file_with_part_name(multipleLogsFolderPath, codeLogPartName)
    if not files:
        raise FileNotFoundError("No log file containing '" + codeLogPartName + "' found in " + multipleLogsFolderPath)
    codes = compare_code_blocks_from_nr_logs(files)

    print("Got codes from log files.")
    print("Code log files: ")
    for file in files:
        print(file)
    print("Codelists count: "+str(len(codes)))
    uniqueCodes = sum(len(codeblock) for codeblock in codes)
    print("Unique code count: "+ str(uniqueCodes))

else:
    # Fresh generation: every run is logged exactly once, right after it finished
    for i in range(runs):
        print("Starting code generation, run "+str(i+1)+"/"+str(runs) +" ...")
        codes, syntaxes = code_generator(queries)
        log_everything(queries, codes, images)

In [None]:
# This part contains the code execution stage. As this is the most processing-intensive stage, it is recommended not to run it unless you want the code to be executed


executeCode = True

# Marker that code_generator stores in place of a code when generation failed
failedGenerationMarker = "Code generation failed. Skipping query."


def _run_code_on_image(code, image, syntax=None):
    """Execute one code on the image at path `image`; return its result or a failure message."""
    # Codes parsed back from log files carry trailing log text after the marker,
    # therefore the marker is matched as a prefix instead of by equality
    if isinstance(code, str) and code.startswith(failedGenerationMarker):
        return "Code execution failed. Skipped query."
    try:
        # Loading happens inside the try so that one unreadable image does not abort the batch
        loaded_image = load_image(image)
        result = execute_code_updated(code, loaded_image, syntax)
        print("Code execution done. Result: " + str(result))
        return result
    except Exception as e:
        # Batch boundary: record the failure and continue with the next code
        print("Code execution failed. Error: " + str(e))
        return "Code execution failed. Error: " + str(e)


if executeCode:
    if getCodeFromLog != 1:
        results = []
        # A failed generation can leave syntaxes shorter than codes; zipping them
        # would then pair codes with the wrong syntax and drop trailing codes.
        # Passing None makes execute_code_updated rebuild the syntax from the code.
        alignedSyntaxes = syntaxes if len(syntaxes) == len(codes) else [None] * len(codes)

        # Execute all codes
        print("Starting code execution...")
        for code_counter, (code, image, syntax) in enumerate(zip(codes, images, alignedSyntaxes), start=1):
            print("Executing code " + str(code_counter) + "/" + str(len(codes)) + "...")
            if useImageDisplay:
                show_single_image(image)
            results.append(_run_code_on_image(code, image, syntax))
        # Log all queries + codes + images + results
        log_everything(queries, codes, images, results)
    else:
        resultslists = []
        # Execute all codes; every query position has a list of candidate codes
        print("Starting code execution from multiple logs...")
        for code_counter, (codelist, image) in enumerate(zip(codes, images), start=1):
            results = []
            for partial_code_counter, code in enumerate(codelist, start=1):
                print("Executing code " + str(partial_code_counter) + "/" + str(len(codelist)) + "... of code " + str(code_counter) + "/" + str(len(codes)) + "...")
                results.append(_run_code_on_image(code, image))
            resultslists.append(results)
        # Log all queries + codes + images + results
        log_everything(queries, codes, images, resultslists)
print("Batch processing done.")
