# Code Docstring Generator

Generate code Docstring of python code using both OpenAI and Qwen.
First of all, giving model the coding problem, asked to output the code that solved it the fastest for each model with Docstring.
And if press the Run Python button below, the code will run and the elapsed time will be printed.
My box lacks memory to use "Qwen/CodeQwen1.5-7B-Chat" as it is. So I'm going to use quantization first.


In [1]:
import torch
print(torch.cuda.is_available())  # If True, We can use GPU
print(torch.cuda.get_device_name(0))

True
NVIDIA GeForce RTX 3060 Ti


In [2]:
# imports
import os
import io
import sys
import json
import requests
from dotenv import load_dotenv
from openai import OpenAI
from IPython.display import Markdown, display, update_display
import gradio as gr
import subprocess

load_dotenv(override=True)
os.environ['OPENAI_API_KEY'] = os.getenv('OPENAI_API_KEY', 'your-key-if-not-using-env')
os.environ['HF_TOKEN'] = os.getenv('HF_TOKEN', 'your-key-if-not-using-env')

openai = OpenAI()
OPENAI_MODEL = "gpt-4o"

In [52]:
system_message_code = "You are an assistant that generates optimal solution of python code."
system_message_code += "Add Docstrings for analyzing the functions and classes and adding explanations of Python code based on the purpose or input/output information, except Test example."
system_message_code += "Respond only with Python code; do not provide any explanation outside of code."

In [32]:
def user_prompt_for_code(quesion, python):
    user_prompt = "Given the coding problem, Rewrite this Python code with time and memory optimalization that produces identical output in the least time. "
    user_prompt += "Remember to include all necessary Python library and packages such as import numpy.\n\n"
    user_prompt += question
    user_prompt += python
    return user_prompt

In [6]:
question = '''머쓱이는 태어난 지 6개월 된 조카를 돌보고 있습니다. 
조카는 아직 "aya", "ye", "woo", "ma" 네 가지 발음을 최대 한 번씩 사용해 조합한(이어 붙인) 발음밖에 하지 못합니다. 
문자열 배열 babbling이 매개변수로 주어질 때,머쓱이의 조카가 발음할 수 있는 단어의 개수를 return하도록 solution 함수를 완성해주세요.'''

python = '''
from itertools import permutations
def solution(babbling):
    able_ans=[]  
    for i in range(1,5):
        res = list(permutations(["aya", "ye", "woo", "ma"], i))
        able_ans.append(res)
        
    bab_array=[]
    for item in able_ans:
        if isinstance(item, tuple):
            result.append(''.join(item))
        elif isinstance(item, list):
            for subitem in item:
                if isinstance(subitem, tuple):
                    bab_array.append(''.join(subitem))
    for k in babbling:
        bab_array.append(k)

    return len(bab_array) - len(set(bab_array))
'''

In [7]:
def messages_for_code(question, python):
    return [
        {"role": "system", "content": system_message_code},
        {"role": "user", "content": user_prompt_for_code(question, python)}
    ]

In [8]:
# write to a file called optimized.cpp

def write_output(py):
    code = py.replace("```python","").replace("```","")
    with open("optimized.py", "w") as f:
        f.write(code)

In [9]:
def optimize_gpt(question, python):    
    stream = openai.chat.completions.create(model=OPENAI_MODEL, messages=messages_for_code(question, python), stream=True)
    reply = ""
    for chunk in stream:
        fragment = chunk.choices[0].delta.content or ""
        reply += fragment
        print(fragment, end='', flush=True)
    write_output(reply)

In [11]:
def stream_gpt(question, python):    
    stream = openai.chat.completions.create(model=OPENAI_MODEL, messages=messages_for_code(question, python), stream=True)
    reply = ""
    for chunk in stream:
        fragment = chunk.choices[0].delta.content or ""
        reply += fragment
        yield reply.replace('```python\n','').replace('```','')

In [15]:
from huggingface_hub import login, InferenceClient
from transformers import AutoTokenizer

In [16]:
hf_token = os.environ['HF_TOKEN']
login(hf_token, add_to_git_credential=True)

Note: Environment variable`HF_TOKEN` is set and is the current active token independently from the token you've just configured.


In [17]:
import torch
#del model  # 기존에 있던 모델 삭제
torch.cuda.empty_cache()
torch.cuda.reset_peak_memory_stats()

In [18]:
# local에서 Qwen/CodeQwen1.5-7B-Chat모델 사용하기위해 4bit 양자화. 기존 14GB인데 8GB스펙이라 4GB로 사용하기 위함. 위에께 수정한거 이게 원본

from transformers import AutoModelForCausalLM, AutoTokenizer
import torch
code_qwen = "Qwen/CodeQwen1.5-7B-Chat"
model_id = code_qwen

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    #device_map="auto",  # 자동으로 GPU 할당
    load_in_4bit=True,  # 4비트 양자화
    trust_remote_code=True
)

tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.
Sliding Window Attention is enabled but not implemented for `sdpa`; unexpected results may be encountered.


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [49]:
def qwen_manual_prompt(system_message_code, user_message):
    return (
        f"<|im_start|>system\n{system_message_code}<|im_end|>\n"
        f"<|im_start|>user\n{user_message}<|im_end|>\n"
        f"<|im_start|>assistant\n"
    )

In [50]:
from transformers import pipeline
import re
pipe = pipeline("text-generation", model=model, tokenizer=tokenizer)

def run_code_qwen(question, python):
    messages = messages_for_code(question, python)
    prompt = qwen_manual_prompt(system_message_code, user_prompt_for_code(question, python))
    #prompt = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = tokenizer(prompt, return_tensors="pt").to("cuda")
    if "token_type_ids" in inputs:
        del inputs["token_type_ids"]
    inputs = {k: v.to("cuda") for k, v in inputs.items()}
    output_ids = model.generate(**inputs, max_new_tokens=3000)
    response = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return response

def extract_python_code(response: str) -> str:
    # ```python과 ``` 사이 텍스트 추출 (```python 포함하지 않음)
    match = re.search(r"```python\s*(.*?)\s*```", response, re.DOTALL)
    if match:
        return match.group(1).strip()
    else:
        return "No Python code found."

Device set to use cuda:0


In [None]:
def optimize(question, python, selected_model):
    if selected_model == "GPT":
        generator = stream_gpt(question, python)
        for stream_so_far in generator:
            yield stream_so_far
        
    elif selected_model == "QWEN":
        response = run_code_qwen(question, python)
        generator = extract_python_code(response)
        yield generator
    else:
        raise ValueError("Unknown model")

## Adding execute time

In [67]:
import time

def execute_python(code):
    try:
        output = io.StringIO()
        sys.stdout = output
        start_time = time.time()
        exec(code)
        end_time = time.time()
        print("Execution Time: {:.6f} seconds".format(end_time - start_time))
    finally:
        sys.stdout = sys.__stdout__
    return output.getvalue()

In [68]:
css = """
.python {background-color: #306998;}
"""

In [69]:
import gradio as gr
#gradio with executation
with gr.Blocks() as ui:
    with gr.Row():
        que = gr.Textbox(label="Question:", lines=10 ,value=question)
        py = gr.Textbox(label="Original Python code:", lines=10, value=python)
        result = gr.Textbox(label="Optimized code:", lines=10)
    with gr.Row():
        selected_model = gr.Dropdown(["GPT", "QWEN"], label="Select model", value="GPT")
        convert = gr.Button("Convert code")
    with gr.Row():
        python_run = gr.Button("Run Python")
    with gr.Row():
        python_out = gr.TextArea(label="Python result:", elem_classes=["python"])
        
    python_run.click(execute_python, inputs=[result], outputs=[python_out])   
    convert.click(optimize, inputs=[que, py, selected_model], outputs=[result])
    
# gradio without excutation
# with gr.Blocks() as ui:
#     with gr.Row():
#         que = gr.Textbox(label="Question:", lines=10 ,value=question)
#         py = gr.Textbox(label="Original Python code:", lines=10, value=python)
#         result = gr.Textbox(label="Optimized code:", lines=10)
#     with gr.Row():
#         selected_model = gr.Dropdown(["GPT", "QWEN"], label="Select model", value="GPT")
#         convert = gr.Button("Convert code")
    
#     convert.click(optimize, inputs=[que, py, selected_model], outputs=[result])

#ui.launch(inbrowser=True)

In [26]:
ui.launch(inbrowser=True)

* Running on local URL:  http://127.0.0.1:7861

To create a public link, set `share=True` in `launch()`.




Traceback (most recent call last):
  File "C:\Users\82103\anaconda3\envs\llms\Lib\site-packages\gradio\queueing.py", line 625, in process_events
    response = await route_utils.call_process_api(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\82103\anaconda3\envs\llms\Lib\site-packages\gradio\route_utils.py", line 322, in call_process_api
    output = await app.get_blocks().process_api(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\82103\anaconda3\envs\llms\Lib\site-packages\gradio\blocks.py", line 2136, in process_api
    result = await self.call_function(
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\82103\anaconda3\envs\llms\Lib\site-packages\gradio\blocks.py", line 1674, in call_function
    prediction = await utils.async_iteration(iterator)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\82103\anaconda3\envs\llms\Lib\site-packages\gradio\utils.py", line 729, in async_iteration
    return await anext(iterato