In [85]:

!curl -fsSL https://ollama.com/install.sh | sh
import subprocess
process = subprocess.Popen("ollama serve", shell=True) #runs on a different thread
!ollama pull 'codellama-13b'
!pip install ollama
import ollama

>>> Cleaning up old version at /usr/local/lib/ollama
>>> Installing ollama to /usr/local
>>> Downloading Linux amd64 bundle
############################################################################################# 100.0%                                                           14.9%##############################                                                       43.9%
>>> Adding ollama user to video group...
>>> Adding current user to ollama group...
>>> Creating ollama systemd service...
>>> The Ollama API is now available at 127.0.0.1:11434.
>>> Install complete. Run "ollama" from the command line.
[?2026h[?25l[1Gpulling manifest ⠋ [K[?25h[?2026l[?2026h[?25l[1Gpulling manifest ⠙ [K[?25h[?2026l[?2026h[?25l[1Gpulling manifest [K[?25h[?2026l
Error: pull model manifest: file does not exist


In [19]:
import subprocess
import json
import radon.metrics
import tempfile

In [90]:
def run_ollama_locally(model, prompt):

    command = ["ollama", "run", model, prompt]
    result = subprocess.run(command, capture_output=True, text=True)
    print("result   ",result.stdout.strip())
    return result.stdout.strip()

def extract_code_from_json(response):

    try:
        data = json.loads(response)
        print(data)
        return data.get("code", "")
    except json.JSONDecodeError:
        return ""

def calculate_maintainability_index(code):

    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
            temp_file.write(code.encode())
            temp_file_path = temp_file.name

        with open(temp_file_path, "r") as f:
            code_content = f.read()

        mi = radon.metrics.mi_visit(code_content, True)  
        return mi
    except Exception as e:
        return f"Error calculating MI: {e}"



In [91]:
def generate_cot_refactoring_prompt(code):
    return f"""You are a senior software engineer tasked with refactoring the following Python code.
Use Chain of Thought reasoning to refactor step by step:

Original code:
```python
{code}
```

1. First, analyze the code structure and identify issues:
2. Measure code quality metrics (conceptually):
3. List specific refactoring opportunities:
4. Apply these refactorings one by one:
5. Verify improvements (conceptually):

Please provide the fully refactored code at the end.
```

Please return the final refactored code inside the following JSON structure:
<output>
{{"code":  $refactored_code}}
</output>
"""

def generate_tot_refactoring_prompt(code):
    return  f"""you are a senior software engineer refactoring Python code.
    Use Tree of Thought reasoning by exploring multiple refactoring paths:
    
    Original code:
    ```python
    {code}
    ```
    
    Path 1: Style Improvements
    - Do PEP 8 style fixes
    - Do naming improvements
    - Evaluate on readability?
    
    Path 2: Structural Improvements
    - identify  functions that can be extracted
    - eliminate repeated code
    - consider data structure organisation
    - evaluate based on maintainability
    
    Path 3: Algorithm Improvements
    - identify big complexity algorithms
    - consider modern approaches rather than old approaches
    - run code and check if it improves performance
    
    Now, select the best improvement path or combine them based on maintainability index. Select the one with the higher maintainability index. if index>90. good otherwise, refine. refinement should happen only once. do not fall in loop.
    
   Please return the final refactored code inside the following JSON structure:
<output>
{{"code": $refactored_code}}
</output>
    """
    
  

def generate_zero_shot_refactoring_prompt(code):
    return f"""Refactor this Python code to improve maintainability, readability, and efficiency:

            ```python
            {code}
            ```
            
            Return the refactored code inside a JSON object:
            {{"code": "<refactored_code>"}}
            """

def generate_few_shot_refactoring_prompt(code):
    return f"""Refactor the Python code following the patterns in these examples:

            Example - Before:
            ```python
            def sum_list(lst):
                s = 0
                for i in range(len(lst)):
                    s += lst[i]
                return s
            ```
            
            Example - After:
            ```python
            def sum_list(numbers):
                return sum(numbers)
            ```
            
            Now refactor this code following similar patterns:
            ```python
            {code}
            ```
            
            Return the refactored code inside a JSON object:
            {{"code": <refactored_code>}}
            """

In [92]:

def refactor_code(code, method, model="codellama"):
    if method == "cot":
        prompt = generate_cot_refactoring_prompt(code)
    elif method == "tot":
        prompt = generate_tot_refactoring_prompt(code)
    elif method == "zero_shot":
        prompt = generate_zero_shot_refactoring_prompt(code)
    elif method == "few_shot":
        prompt = generate_few_shot_refactoring_prompt(code)
    else:
        raise ValueError("Invalid refactoring method")
    
    response = run_ollama_locally(model, prompt)
    return extract_code_from_json(response)

# Example usage
if __name__ == "__main__":
    sample_code = """

def generate_monthly_report(year, month, department):
    db_host = "localhost"
    db_user = "admin"
    db_pass = "password123"
    db_name = "company_data"
    
    print("Connecting to database...")
    
    
    if department == "sales":
        query = "SELECT * FROM sales WHERE YEAR(date) = " + str(year) + " AND MONTH(date) = " + str(month)
    elif department == "marketing":
        query = "SELECT * FROM marketing_campaigns WHERE YEAR(date) = " + str(year) + " AND MONTH(date) = " + str(month)
    elif department == "hr":
        query = "SELECT * FROM employee_records WHERE YEAR(hire_date) = " + str(year) + " AND MONTH(hire_date) = " + str(month)
    else:
        print("Error: Unknown department")
        return None
    

    print("Executing query: " + query)

    

    if department == "sales":
        results = [
            {"id": 1, "date": "2023-01-15", "amount": 1500, "customer": "ABC Corp"},
            {"id": 2, "date": "2023-01-22", "amount": 2200, "customer": "XYZ Inc"}
        ]
    elif department == "marketing":
        results = [
            {"id": 1, "date": "2023-01-10", "campaign": "Social Media", "cost": 5000, "leads": 120},
            {"id": 2, "date": "2023-01-20", "campaign": "Email", "cost": 1000, "leads": 45}
        ]
    else:
        results = [
            {"id": 1, "hire_date": "2023-01-05", "employee": "John Doe", "position": "Developer"},
            {"id": 2, "hire_date": "2023-01-12", "employee": "Jane Smith", "position": "Designer"}
        ]
    

    print("Generating report...")
    

    report = "========= Monthly Report =========\n"
    report += "Department: " + department + "\n"
    report += "Period: " + str(month) + "/" + str(year) + "\n"
    report += "================================\n\n"
    

    if department == "sales":
        total_sales = 0
        for sale in results:
            total_sales += sale["amount"]
            report += "Sale ID: " + str(sale["id"]) + "\n"
            report += "Date: " + sale["date"] + "\n"
            report += "Customer: " + sale["customer"] + "\n"
            report += "Amount: $" + str(sale["amount"]) + "\n"
            report += "-----------------\n"
        
        report += "\nTotal Sales: $" + str(total_sales) + "\n"
        

        if total_sales < 5000:
            commission_rate = 0.05
        elif total_sales < 10000:
            commission_rate = 0.07
        else:
            commission_rate = 0.1
        
        commission = total_sales * commission_rate
        report += "Commission (Rate: " + str(commission_rate * 100) + "%): $" + str(commission) + "\n"
    
    elif department == "marketing":
        total_cost = 0
        total_leads = 0
        for campaign in results:
            total_cost += campaign["cost"]
            total_leads += campaign["leads"]
            report += "Campaign: " + campaign["campaign"] + "\n"
            report += "Date: " + campaign["date"] + "\n"
            report += "Cost: $" + str(campaign["cost"]) + "\n"
            report += "Leads Generated: " + str(campaign["leads"]) + "\n"
            report += "-----------------\n"
        
        report += "\nTotal Cost: $" + str(total_cost) + "\n"
        report += "Total Leads: " + str(total_leads) + "\n"
        
        if total_leads > 0:
            cost_per_lead = total_cost / total_leads
            report += "Cost per Lead: $" + str(cost_per_lead) + "\n"
    
    else:
        for record in results:
            report += "Employee: " + record["employee"] + "\n"
            report += "Hire Date: " + record["hire_date"] + "\n"
            report += "Position: " + record["position"] + "\n"
            report += "-----------------\n"
        
        report += "\nTotal New Hires: " + str(len(results)) + "\n"
    

    filename = department + "_report_" + str(year) + "_" + str(month) + ".txt"
    print("Saving report to file: " + filename)

    print("\nReport Preview:")
    print(report)
    
    return report
"""
    
    methods = ["cot", "tot", "zero_shot", "few_shot"]
    
    for method in methods:
        print(f"\n{method.upper()} Refactoring:")
        refactored_code = refactor_code(sample_code, method)
        print(refactored_code)
        
        if refactored_code:
            mi_score = calculate_maintainability_index(refactored_code)
            print(f"Maintainability Index: {mi_score}")


COT Refactoring:
result    


TOT Refactoring:
result    


ZERO_SHOT Refactoring:
result    


FEW_SHOT Refactoring:
result    



In [72]:
mi = calculate_maintainability_index('''
import numpy as np

def calculate_stats(numbers: List[int]) -> Tuple[float, float, float]:
    total = np.sum(numbers)
    mean = np.mean(numbers)

    variance = calculate_variance(numbers, mean)
    std_dev = variance ** 0.5

    return mean, variance, std_dev

def calculate_variance(numbers: List[int], mean: float) -> float:
    variance = np.sum((numbers - mean) ** 2) / len(numbers)
    return variance
    ''')

In [65]:
mi

66.72097341308248

In [73]:
import timeit
from memory_profiler import memory_usage
from radon.complexity import cc_visit
from radon.metrics import h_visit
import ast

def evaluate_code_refactoring(code):
    exec_time = measure_execution_time(code)
    memory_used = measure_memory_usage(code)
    readability_score = evaluate_readability(code)
    maintainability_score = evaluate_maintainability(code)

    result = {
        "execution_time_seconds": exec_time,
        "memory_used_bytes": memory_used,
        "readability_score": readability_score,
        "maintainability_score": maintainability_score
    }

    return result


def measure_execution_time(code):
    try:
        timer = timeit.Timer(code)
        exec_time = timer.timeit(number=1)
        return exec_time
    except:
        return -1  # Return negative number if error occurs


def measure_memory_usage(code):
    try:
        mem_usage = memory_usage((exec, (), {'code': code}))
        return max(mem_usage) * 1024 * 1024  # Convert to bytes
    except:
        return -1  # Return negative number if error occurs


def evaluate_readability(code):
    try:
        score = 0
        if "def " in code:
            score += 2
        if "import " in code:
            score += 1
        lines = len(code.split('\n'))
        if lines < 50:
            score += 2
        tree = ast.parse(code)
        docstrings = [node for node in ast.walk(tree) if isinstance(node, ast.FunctionDef) and ast.get_docstring(node)]
        if docstrings:
            score += 2
        complexity = cc_visit(code)
        if complexity:
            score += max([item.complexity for item in complexity])
        return score
    except:
        return -1  # Return negative number if error occurs


def evaluate_maintainability(code):

    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".py") as temp_file:
            temp_file.write(code.encode())
            temp_file_path = temp_file.name

        with open(temp_file_path, "r") as f:
            code_content = f.read()

        mi = radon.metrics.mi_visit(code_content, True)  # Calculate Maintainability Index
        return mi
    except Exception as e:
        return f"Error calculating MI: {e}"


# Example code to evaluate
code = """
def calculate_stats(numbers):
    total = sum(numbers)
    mean = total / len(numbers)
    
    variance = sum((x - mean) ** 2 for x in numbers) / len(numbers)
    std_dev = variance ** 0.5
    
    return mean, variance, std_dev
"""

evaluation = evaluate_code_refactoring(code)
print(evaluation)


{'execution_time_seconds': 7.440003173542209e-07, 'memory_used_bytes': -1, 'readability_score': 6, 'maintainability_score': 70.54250698541472}


In [61]:
!pip install memory_profiler

Collecting memory_profiler
  Downloading memory_profiler-0.61.0-py3-none-any.whl.metadata (20 kB)
Downloading memory_profiler-0.61.0-py3-none-any.whl (31 kB)
Installing collected packages: memory_profiler
Successfully installed memory_profiler-0.61.0


In [10]:
!pip install radon

Collecting radon
  Downloading radon-6.0.1-py2.py3-none-any.whl.metadata (8.2 kB)
Collecting mando<0.8,>=0.6 (from radon)
  Downloading mando-0.7.1-py2.py3-none-any.whl.metadata (7.4 kB)
Downloading radon-6.0.1-py2.py3-none-any.whl (52 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m52.8/52.8 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading mando-0.7.1-py2.py3-none-any.whl (28 kB)
Installing collected packages: mando, radon
Successfully installed mando-0.7.1 radon-6.0.1
