In [3]:
# Used to track processing time
import time

# Used to write a file with the current datetime
import datetime
import os

from textwrap import dedent

# Used to import LLM and set up multi-agent crews
from langchain_community.llms import Ollama
from crewai import Crew, Agent, Task

## Test Inputs

In [4]:
import csv  # For Function 6
import sqlite3  # For Function 7
import smtplib  # For Function 8
import requests  # For Function 9
import os  # For Function 10
import re  # For Function 10


# (1) Calculate Fibonacci sequence
def f(n):
    """
    Return the Fibonacci number at position n, computed iteratively.

    NOTE(review): a second ``def f`` (the factorial) appears later in this
    same cell and shadows this definition once the whole cell has run.

    Parameters:
        n (int): Zero-based position in the sequence; ``f(0)`` is 0.

    Returns:
        int: The Fibonacci number at position n. No steps are taken when n
        is zero or negative, so 0 is returned.
    """
    current = 0
    following = 1
    for _step in range(n):
        advanced = current + following
        current = following
        following = advanced
    return current

# (2) Merge two sorted lists
def m(a, b):
    """
    Merge two already-sorted sequences into a single sorted list.

    When the two front elements compare equal, the one from ``b`` is
    emitted first.

    Parameters:
        a (list): The first sorted sequence.
        b (list): The second sorted sequence.

    Returns:
        list: A new list holding every element of ``a`` and ``b`` in order.
    """
    merged = []
    left_pos = right_pos = 0
    while left_pos < len(a) and right_pos < len(b):
        left_item, right_item = a[left_pos], b[right_pos]
        if left_item < right_item:
            merged.append(left_item)
            left_pos += 1
            continue
        merged.append(right_item)
        right_pos += 1
    # At most one of these tails is non-empty.
    merged += a[left_pos:]
    merged += b[right_pos:]
    return merged

# (3) Find the greatest common divisor using Euclid's algorithm
def g(x, y):
    """
    Compute the greatest common divisor of two numbers with Euclid's algorithm.

    Parameters:
        x (int): The first number.
        y (int): The second number.

    Returns:
        int: The last non-zero remainder of the division chain; ``x`` itself
        when ``y`` is 0.
    """
    while y != 0:
        remainder = x % y
        x = y
        y = remainder
    return x

# (4) Find the maximum subarray sum (Kadane's Algorithm)
def k(l):
    """
    Find the largest sum of any contiguous run of values (Kadane's algorithm).

    Parameters:
        l (iterable of numbers): The values to scan.

    Returns:
        number: The maximum contiguous sum, or ``float('-inf')`` when ``l``
        is empty.
    """
    best = float('-inf')
    running = 0
    for value in l:
        # Either extend the current run or start a fresh one at this value.
        extended = running + value
        running = extended if extended > value else value
        if running > best:
            best = running
    return best

# (5) Calculate factorial recursively
def f(n):
    """
    Calculate the factorial of a non-negative integer recursively.

    Parameters:
        n (int): The number to calculate the factorial of.

    Returns:
        int: The factorial of n (``f(0)`` is 1).

    Raises:
        ValueError: If the recursion reaches a negative value, which happens
        for negative or non-integer input. Without this guard such input
        never hits the ``n == 0`` base case and recurses until Python
        raises RecursionError.
    """
    if n < 0:
        raise ValueError('factorial is not defined for negative or non-integer values')
    return 1 if n == 0 else n * f(n - 1)

# (6) Reading and processing a CSV file
def r(f):
    """
    Read a CSV file and return its rows as dictionaries keyed by the header.

    Parameters:
        f (str): The path to the CSV file.

    Returns:
        list of dict: One dictionary per data row, pairing each header cell
        with the value in the same column. An empty file yields ``[]``.
    """
    # newline='' hands line-ending handling to the csv module, so newlines
    # embedded in quoted fields are returned unchanged.
    with open(f, mode='r', newline='') as handle:
        reader = csv.reader(handle)
        header = next(reader, None)
        if header is None:
            # No header row at all: nothing to key the rows by.
            return []
        return [dict(zip(header, row)) for row in reader]

# (7) Connecting to a database and executing a query
def q(d, s, params=()):
    """
    Execute a SQL statement against a SQLite database and return all rows.

    Parameters:
        d (str): The path to the SQLite database file.
        s (str): The SQL statement, optionally containing placeholders.
        params (sequence or dict): Values bound to the placeholders in ``s``.
            Defaults to no parameters. Prefer this over building SQL with
            string formatting.

    Returns:
        list of tuple: Every row produced by the statement.
    """
    connection = sqlite3.connect(d)
    try:
        # The connection's context manager only commits or rolls back the
        # transaction; it does not close the connection.
        with connection:
            cursor = connection.cursor()
            cursor.execute(s, params)
            rows = cursor.fetchall()
    finally:
        connection.close()
    return rows

# (8) Sending an email using SMTP
def e(u, p, t, m, host='smtp.example.com', port=587):
    """
    Send an email over SMTP, upgrading the connection with STARTTLS first.

    Parameters:
        u (str): The SMTP username, also used as the sender address.
        p (str): The SMTP password.
        t (str): The recipient's email address.
        m (str): The email message to send.
        host (str): The SMTP server host name.
        port (int): The SMTP server port.

    Raises:
        smtplib.SMTPException: Propagated unchanged when the TLS upgrade,
        login, or send fails.
    """
    # The context manager closes the session even when a step raises, so a
    # failed login or send no longer leaves the connection open.
    with smtplib.SMTP(host, port) as server:
        server.starttls()
        server.login(u, p)
        server.sendmail(u, t, m)

# (9) Fetching JSON data from a web API
def j(u, timeout=10):
    """
    Fetch JSON data from a URL using an HTTP GET request.

    Parameters:
        u (str): The URL to fetch data from.
        timeout (float or tuple): Seconds to wait for the server before
            giving up, passed straight to ``requests.get``. Without a
            timeout the request can block indefinitely.

    Returns:
        dict or list: The parsed JSON body when the response status is 200,
        otherwise an empty dictionary.
    """
    response = requests.get(u, timeout=timeout)
    if response.status_code != 200:
        return {}
    return response.json()

# (10) Performing a file search with regex
def s(p, r):
    """
    Walk a directory tree and collect files whose names match a regex.

    Parameters:
        p (str): The root directory to walk.
        r (str): The regular expression searched for in each file name
            (the bare name, not the full path).

    Returns:
        list of str: Paths (directory joined with file name) of every
        matching file, in the order ``os.walk`` yields them.
    """
    pattern = re.compile(r)
    return [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(p)
        for filename in filenames
        if pattern.search(filename)
    ]


## Initialize LLM

In [5]:
# Language model handed to every agent created by CrewAgents below.
# NOTE(review): presumably requires a reachable Ollama server with this
# model already pulled — confirm before running the notebook.
llm = Ollama(model="Llama3")

## Creating Agents

In [None]:
class CrewAgents:
    """
    Factory for the four agents used to refactor each function.

    Every agent is created with delegation disabled, verbose output enabled,
    and the module-level ``llm`` as its language model.
    """

    def _build_agent(self, role, goal, backstory):
        """Create an Agent with the settings shared by all four roles."""
        shared_settings = {
            'allow_delegation': False,
            'verbose': True,
            'llm': llm,
        }
        return Agent(role=role, goal=goal, backstory=backstory, **shared_settings)

    def variable_name_agent(self):
        """Return the agent responsible for improving variable names."""
        goal = dedent(
            '''
            To review Python functions and ensure that variable names are
            following best practices. Variable names are concise, descriptive,
            and easy to understand.
            '''
        )
        backstory = dedent(
            '''
            You're a Python developer who's been programming for decades. You're
            renowned for writing excellent code, and particularly for writing
            high-quality variable names.
            '''
        )
        return self._build_agent('Senior Python Developer', goal, backstory)

    def docstring_agent(self):
        """Return the agent responsible for writing docstrings."""
        backstory = dedent(
            '''
            You're a renowned code documentation expert, specifically in Python.
            You know how to write clean, concise docstrings that include all the
            information relevant to developers trying to understand a function.
            '''
        )
        return self._build_agent(
            'Senior Code Documentation Expert',
            'Write docstrings for Python functions.',
            backstory,
        )

    def quality_assurance_agent(self):
        """Return the agent that reviews names and docstrings for quality."""
        goal = dedent(
            '''
            To review Python code and ensure that it follows best practices.
            Double-check that variable names are accurate and concise and that
            the docstrings accurately reflect what the function actually does.
            '''
        )
        backstory = dedent(
            '''
            You're a renowned quality assurance expert. You've been reviewing code
            for decades and ensure that code is flawless, with excellent variable
            names and docstrings.
            '''
        )
        return self._build_agent(
            "Senior Python Quality Assurance Tester", goal, backstory
        )

    def code_writer_agent(self):
        """Return the agent that strips everything except code and docstrings."""
        goal = dedent(
            '''
            Take code and strip out everything except the code
            and docstrings.
            '''
        )
        backstory = dedent(
            '''
            You do an amazing job of getting code and stripping it
            of everything that's not code or documentation.
            '''
        )
        return self._build_agent("Senior Code Writer", goal, backstory)

## Create Tasks

In [None]:
class CrewTasks:
    """
    Factory for the four tasks that refactor a single function.

    Each public method accepts ``provided_code`` as either the source text
    to work on or an earlier ``Task``. When a ``Task`` is given, it is wired
    in as the new task's ``context`` instead of being formatted into the
    prompt, where it would only contribute the object's repr.
    """

    def _build_task(self, agent, instructions, expected_output, provided_code):
        """Assemble a Task from the fixed prompt text and the code source."""
        # Dedent the fixed prompt text before the code is inserted. If the
        # code were interpolated first, its unindented lines would leave the
        # prompt indented and misalign the first line of the code.
        instructions = dedent(instructions).strip('\n')
        extra_settings = {}
        if isinstance(provided_code, Task):
            code_section = (
                'Use the code returned by the previous task, '
                'which is supplied as context.'
            )
            extra_settings['context'] = [provided_code]
        else:
            code_section = str(provided_code)
        description = (
            f'\n{instructions}\n\n'
            'Python Function\n'
            '---------------\n'
            f'{code_section}\n'
        )
        return Task(
            description=description,
            expected_output=dedent(expected_output),
            agent=agent,
            **extra_settings
        )

    def improve_variable_names(self, agent, provided_code):
        """
        Build the task that renames variables without changing anything else.

        Parameters:
            agent (Agent): The agent that will perform the task.
            provided_code (str or Task): The code, or the task producing it.

        Returns:
            Task: The configured task.
        """
        return self._build_task(
            agent,
            instructions='''
                Review the variable names in the Python code. Update
                the variable names so that they're accurate, concise, and
                easy to read and understand. Update the variable names only;
                don't change anything else about the functions, and do not
                add comments anywhere in the code.
                ''',
            expected_output='''
                A Python function with variable names that are
                accurate, concise, and easy to understand.
                ''',
            provided_code=provided_code,
        )

    def add_docstring(self, agent, provided_code):
        """
        Build the task that adds docstrings to the code.

        Parameters:
            agent (Agent): The agent that will perform the task.
            provided_code (str or Task): The code, or the task producing it.

        Returns:
            Task: The configured task.
        """
        return self._build_task(
            agent,
            instructions='''
                Provide docstrings to the Python code provided below.
                Ensure the documentation is concise, accurate, and easy
                to read.
                ''',
            expected_output='''
                A Python function with a well-written, concise, and
                easy to understand docstring.
                ''',
            provided_code=provided_code,
        )

    def review_code(self, agent, provided_code):
        """
        Build the task that reviews variable names and docstrings.

        Parameters:
            agent (Agent): The agent that will perform the task.
            provided_code (str or Task): The code, or the task producing it.

        Returns:
            Task: The configured task.
        """
        return self._build_task(
            agent,
            instructions='''
                Review code and documentation and ensure it meets the
                highest standards of quality. The variable names are
                clear, concise, and accurate; the docstrings are
                accurate and descriptive, so that any developer reading
                them, even a junior developer, would understand exactly
                what they do and how to use them.
                ''',
            expected_output='''
                A Python function with excellent variable names and
                well-written, concise, and easy to understand docstring.
                ''',
            provided_code=provided_code,
        )

    def write_code(self, agent, provided_code):
        """
        Build the task that strips everything except code and docstrings.

        Parameters:
            agent (Agent): The agent that will perform the task.
            provided_code (str or Task): The code, or the task producing it.

        Returns:
            Task: The configured task.
        """
        return self._build_task(
            agent,
            instructions='''
                Review the provided code and remove anything that
                is not code or docstrings. This information will
                then be written to a file that can be immediately used
                within a Python script.
                ''',
            expected_output='''
                Code that consists ONLY of functions and their
                docstrings. All other information is stripped out.
                ''',
            provided_code=provided_code,
        )

## Read File
Reads a file of functions, each immediately preceded by a comment.

These functions are read line by line and added to the `functions` list
for further processing.

In [None]:
# Define relative file path
test_suite_file_path = 'rhys_research/test_suite/test_functions_raw_practice.py'

# Save all captured functions to a list
functions = []

with open(test_suite_file_path, 'r', encoding='utf-8') as file:

    # Lines of the function currently being collected; an empty list
    # means no function is in progress.
    pending_lines = []

    for line in file:

        # Any non-blank line belongs to the current function
        if line.strip():
            pending_lines.append(line)
            continue

        # A blank line closes the function in progress, if there is one;
        # consecutive blank lines are simply skipped
        if pending_lines:
            functions.append(''.join(pending_lines).strip())
            pending_lines = []

    # If the file doesn't end in a blank line,
    # the last function is still pending; save it.
    if pending_lines:
        functions.append(''.join(pending_lines).strip())

## Process Functions

In [None]:
# Initialize Agents & Tasks
agents = CrewAgents()
tasks  = CrewTasks()

# Create Agents (created once and reused by the crew built for each function)
variable_name_agent = agents.variable_name_agent()
docstring_agent     = agents.docstring_agent()
qa_agent            = agents.quality_assurance_agent()
code_writer_agent   = agents.code_writer_agent()

# Process Functions
updated_functions = []

for function in functions:

    # Create tasks; each one after the first is handed the previous task
    # rather than source text.
    # NOTE(review): confirm CrewTasks accepts a Task as `provided_code`.
    improve_variable_names = tasks.improve_variable_names(variable_name_agent, function)
    add_docstrings         = tasks.add_docstring(docstring_agent, improve_variable_names)
    review_code            = tasks.review_code(qa_agent, add_docstrings)
    write_code_to_file     = tasks.write_code(code_writer_agent, review_code)

    # Create crew & assign tasks
    crew = Crew(
        agents = [
            variable_name_agent,
            docstring_agent,
            qa_agent,
            code_writer_agent
        ],
        tasks = [
            improve_variable_names,
            add_docstrings,
            review_code,
            write_code_to_file
        ],
        verbose=True
    )

    # Start code-improvement process
    start_time = time.perf_counter()
    output = crew.kickoff()
    end_time = time.perf_counter()

    # kickoff() returns a plain string in some crewAI releases and a result
    # object in others; str() yields the final text in both cases.
    refactored_function = str(output).strip()
    updated_functions.append(refactored_function)

    print("################################")
    print("## Refactored Function Output ##")
    print("################################")
    print(refactored_function)
    print(f">>> Time taken: {end_time - start_time:.2f} seconds <<<\n")

## Write updated functions to file
We may not need to write output to a file, hence the commenting out.

In [None]:
# def remove_triple_backticks(func_str):
#     """Remove triple backticks from the beginning and end of a string."""
#     return func_str.strip('`').strip()
    
# # Check if directory exists; if not, create it
# # This directory is relative to the location of this Jupyter Notebook
# save_location_file_path = 'rhys_research/test_outputs'  # Removed leading slash for relative path
# if not os.path.exists(save_location_file_path):
#     os.makedirs(save_location_file_path)

# # Create custom datetime output, used for filename
# current_datetime = datetime.datetime.now().strftime('%y-%m-%d_%H-%M-%S') + '.txt'
# output_filename = 'function_output_' + current_datetime
# full_file_path = os.path.join(save_location_file_path, output_filename)

# # Write to the file
# with open(full_file_path, 'w', encoding='utf-8') as f:
#     for func in updated_functions:
#         cleaned_func = remove_triple_backticks(func)
#         f.write(cleaned_func + "\n")

# print(f'Functions successfully written to file "{full_file_path}".')

## Ideal Outputs
Below are ideal versions of the test inputs.

In [6]:
# import csv
# import os
# import re
# import smtplib
# import sqlite3

# import requests


# def fibonacci(n):
#     """
#     Calculate the nth Fibonacci number using an iterative approach.

#     Parameters:
#         n (int): The position in the Fibonacci sequence to compute.

#     Returns:
#         int: The nth Fibonacci number.
#     """
#     current, next_value = 0, 1
#     for _ in range(n):
#         current, next_value = next_value, current + next_value
#     return current

# def merge(sorted_list1, sorted_list2):
#     """
#     Merge two sorted lists into a single sorted list.

#     Parameters:
#         sorted_list1 (list): The first sorted list.
#         sorted_list2 (list): The second sorted list.

#     Returns:
#         list: A merged and sorted list containing all elements from both input lists.
#     """
#     merged_list = []
#     index1, index2 = 0, 0
#     while index1 < len(sorted_list1) and index2 < len(sorted_list2):
#         if sorted_list1[index1] < sorted_list2[index2]:
#             merged_list.append(sorted_list1[index1])
#             index1 += 1
#         else:
#             merged_list.append(sorted_list2[index2])
#             index2 += 1
#     merged_list.extend(sorted_list1[index1:])
#     merged_list.extend(sorted_list2[index2:])
#     return merged_list

# def gcd(a, b):
#     """
#     Compute the greatest common divisor (GCD) of two numbers using Euclid's algorithm.

#     Parameters:
#         a (int): The first number.
#         b (int): The second number.

#     Returns:
#         int: The greatest common divisor of a and b.
#     """
#     while b != 0:
#         a, b = b, a % b
#     return a

# def max_subarray_sum(array):
#     """
#     Find the maximum sum of any contiguous subarray within the given array using Kadane's algorithm.

#     Parameters:
#         array (list of int): List of integers, possibly including negative numbers.

#     Returns:
#         int: The maximum sum of any contiguous subarray.
#     """
#     max_sum, current_sum = float('-inf'), 0
#     for number in array:
#         current_sum = max(number, current_sum + number)
#         max_sum = max(max_sum, current_sum)
#     return max_sum

# def factorial(n):
#     """
#     Calculate the factorial of a non-negative integer recursively.

#     Parameters:
#         n (int): The number to calculate the factorial of.

#     Returns:
#         int: The factorial of n.
#     """
#     return 1 if n == 0 else n * factorial(n - 1)

# def read_csv(filepath):
#     """
#     Read a CSV file and return a list of dictionaries, each representing a row.

#     Parameters:
#         filepath (str): The path to the CSV file.

#     Returns:
#         list of dict: A list of dictionaries keyed by the headers of the CSV with corresponding row values.
#     """
#     with open(filepath, mode='r') as file:
#         reader = csv.reader(file)
#         headers = next(reader)
#         data = [dict(zip(headers, row)) for row in reader]
#     return data

# def execute_query(database, sql):
#     """
#     Execute a SQL query on a SQLite database and return the results.

#     Parameters:
#         database (str): The path to the SQLite database file.
#         sql (str): The SQL query string.

#     Returns:
#         list of tuple: The results of the SQL query.
#     """
#     with sqlite3.connect(database) as conn:
#         cursor = conn.cursor()
#         cursor.execute(sql)
#         result = cursor.fetchall()
#     return result

# def send_email(user, password, to_address, message):
#     """
#     Send an email using SMTP protocol.

#     Parameters:
#         user (str): The SMTP username (email address).
#         password (str): The SMTP password.
#         to_address (str): The recipient's email address.
#         message (str): The email message to send.
#     """
#     server = smtplib.SMTP('smtp.example.com', 587)
#     server.starttls()
#     server.login(user, password)
#     server.sendmail(user, to_address, message)
#     server.quit()

# def fetch_json(url):
#     """
#     Fetch JSON data from a URL using an HTTP GET request.

#     Parameters:
#         url (str): The URL to fetch data from.

#     Returns:
#         dict or list: The JSON parsed response, or an empty dictionary if the request fails.
#     """
#     response = requests.get(url)
#     if response.status_code == 200:
#         return response.json()
#     else:
#         return {}

# def search_files(path, regex):
#     """
#     Search for files in a given directory that match a regular expression.

#     Parameters:
#         path (str): The path to the directory where files are searched.
#         regex (str): The regular expression pattern to match filenames against.

#     Returns:
#         list of str: A list of file paths that match the regex pattern.
#     """
#     pattern = re.compile(regex)
#     matched_files = []
#     for dirpath, _, filenames in os.walk(path):
#         for filename in filenames:
#             if pattern.search(filename):
#                 matched_files.append(os.path.join(dirpath, filename))
#     return matched_files
