In [1]:
!pip install -q python-dotenv requests gradio IPython huggingface_hub transformers bitsandbytes



In [2]:
# imports
import os
import io
import sys
import json
import requests
from dotenv import load_dotenv
from IPython.display import Markdown, display, update_display
import gradio as gr
import subprocess
from huggingface_hub import login, InferenceClient
from transformers import AutoTokenizer
from google.colab import userdata
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import torch
import gc
import traceback

In [3]:
# Sign in to HuggingFace Hub
# The token is read from the Colab secret named 'HF_TOKEN' (google.colab.userdata),
# so it is never hard-coded in the notebook.

hf_token = userdata.get('HF_TOKEN')
login(hf_token, add_to_git_credential=True)

In [4]:
# Hugging Face model id that stream_code_qwen loads for the Python -> C++ conversion.
code_qwen = "Qwen/CodeQwen1.5-7B-Chat"

In [48]:
# System prompt for the chat model: one instruction per entry, joined with single
# spaces; the trailing space after the final sentence is kept on purpose.
system_message = " ".join((
    "You are an assistant that strictly outputs high-performance C++ code from Python.",
    "Do not include any explanations, markdown formatting, or text other than the C++ code itself.",
    "Respond only with functional C++ code.",
    "Ensure the code works on x86 systems and includes all necessary headers like <iostream>, <iomanip>, and <chrono>.",
    "The C++ response must match the Python implementation's output in both precision and functionality.",
    "Pay attention to type conversions, floating-point precision, and overflow handling.",
)) + " "


In [49]:
def user_prompt_for(python):
    """
    Build the user turn asking the model to port a Python program to C++.

    Args:
        python (str): The Python source code to convert.

    Returns:
        str: The instructions, a blank line, then the Python source.
    """
    instructions = (
        "Rewrite this Python code in C++ with the fastest possible implementation that produces identical output in the least time. "
        "Respond only with C++ code; do not explain your work other than a few comments. "
        "Pay attention to number types to ensure no int overflows. Remember to #include all necessary C++ packages such as iomanip. "
        "Ensure the C++ code produces identical output, avoids any int overflows, and uses appropriate types "
        "like double for floating-point operations."
    )
    # The blank line belongs between the instructions and the code; previously it
    # sat mid-instructions and the code was glued directly onto the last sentence.
    return f"{instructions}\n\n{python}"

In [8]:
def messages_for(python):
    """
    Assemble the chat transcript sent to the model.

    Args:
        python (str): The Python source code to convert.

    Returns:
        list[dict]: A system message followed by the user request, each a dict
        with "role" and "content" keys.
    """
    system_turn = dict(role="system", content=system_message)
    user_turn = dict(role="user", content=user_prompt_for(python))
    return [system_turn, user_turn]

In [9]:
# Sample program used as the default content of the UI's "Python code" box.
# It evaluates the Leibniz series for pi over 100,000,000 iterations and prints
# the result to 12 decimals together with the elapsed time.
pi = """
import time

def calculate(iterations, param1, param2):
    result = 1.0
    for i in range(1, iterations+1):
        j = i * param1 - param2
        result -= (1/j)
        j = i * param1 + param2
        result += (1/j)
    return result

start_time = time.time()
result = calculate(100_000_000, 4, 1) * 4
end_time = time.time()

print(f"Result: {result:.12f}")
print(f"Execution Time: {(end_time - start_time):.6f} seconds")
"""

In [10]:
# A heavier sample program: it draws numbers from a linear congruential generator
# and brute-forces the maximum subarray sum (nested loops over all start/end
# pairs), repeated for 20 seeds, then prints the total and the elapsed time.
python_hard = """
def lcg(seed, a=1664525, c=1013904223, m=2**32):
    value = seed
    while True:
        value = (a * value + c) % m
        yield value

def max_subarray_sum(n, seed, min_val, max_val):
    lcg_gen = lcg(seed)
    random_numbers = [next(lcg_gen) % (max_val - min_val + 1) + min_val for _ in range(n)]
    max_sum = float('-inf')
    for i in range(n):
        current_sum = 0
        for j in range(i, n):
            current_sum += random_numbers[j]
            if current_sum > max_sum:
                max_sum = current_sum
    return max_sum

def total_max_subarray_sum(n, initial_seed, min_val, max_val):
    total_sum = 0
    lcg_gen = lcg(initial_seed)
    for _ in range(20):
        seed = next(lcg_gen)
        total_sum += max_subarray_sum(n, seed, min_val, max_val)
    return total_sum

# Parameters
n = 10000         # Number of random numbers
initial_seed = 42 # Initial seed for the LCG
min_val = -10     # Minimum value of random numbers
max_val = 10      # Maximum value of random numbers

# Timing the function
import time
start_time = time.time()
result = total_max_subarray_sum(n, initial_seed, min_val, max_val)
end_time = time.time()

print("Total Maximum Subarray Sum (20 runs):", result)
print("Execution Time: {:.6f} seconds".format(end_time - start_time))
"""

In [11]:
# Quantization settings passed to AutoModelForCausalLM.from_pretrained:
# 4-bit NF4 weights with double quantization, computing in bfloat16.
quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

In [35]:
def write_output(cpp):
    """
    Extract the C++ source from a model response and save it to 'optimized.cpp'.

    Fence handling:
      * If a ```cpp fence is present, the content after the LAST one is used.
      * Otherwise, if the text contains a fenced block with another (or no)
        language tag such as ```c++ or a bare ```, the content of the first
        such block is used.
      * A lone ``` that is not at the start is treated as a closing fence and
        everything before it is kept.

    Blank lines and lines starting with "This C++ code" or "Note that" are
    dropped before writing.

    Args:
        cpp (str): The raw output from the model, which may contain explanations or markdown.
    """
    fence = "```"
    language_tags = ("", "c", "c++", "cc", "cxx", "cpp")

    if "```cpp" in cpp:
        cpp = cpp.split("```cpp")[-1]
    elif cpp.count(fence) >= 2 or cpp.lstrip().startswith(fence):
        # Generic opening fence: splitting on ``` and taking element 0 would
        # return the preamble *before* the code, so skip past the fence instead.
        after_fence = cpp.split(fence, 1)[1]
        tag_line, newline, remainder = after_fence.partition("\n")
        if newline and tag_line.strip().lower() in language_tags:
            cpp = remainder  # drop the language-tag line
        else:
            cpp = after_fence
    # Stop at the closing fence, if any.
    cpp = cpp.split(fence)[0]

    # Drop blank lines and the explanatory sentences the model tends to append.
    kept_lines = [
        line
        for line in cpp.splitlines()
        if line.strip() and not line.startswith(("This C++ code", "Note that"))
    ]

    with open("optimized.cpp", "w", encoding="utf-8") as f:
        f.write("\n".join(kept_lines).strip())


In [36]:
def execute_cpp(code):
    """
    Compiles and executes C++ code on a PC with x86 architecture.

    The code is first saved to 'optimized.cpp' via write_output, compiled with
    clang++ into './optimized', and the resulting binary is run.

    Args:
        code (str): The C++ code to compile and execute.

    Returns:
        str: Output from the compiled C++ program or an error message. Failures
        (non-zero exit from the compiler or the program, or the compiler/binary
        not being launchable) are returned as text rather than raised.
    """
    write_output(code)  # Save the C++ code to a file

    # Adjust compilation flags for x86 architecture
    compile_cmd = ["clang++", "-O3", "-std=c++17", "-march=native", "-o", "optimized", "optimized.cpp"]
    run_cmd = ["./optimized"]

    try:
        subprocess.run(compile_cmd, check=True, text=True, capture_output=True)
        run_result = subprocess.run(run_cmd, check=True, text=True, capture_output=True)
        return run_result.stdout

    except subprocess.CalledProcessError as e:
        # Compilation or execution exited with a non-zero status
        return f"An error occurred:\n{e.stderr}"

    except OSError as e:
        # e.g. FileNotFoundError when clang++ is not installed on this machine;
        # report it in the UI instead of letting the callback crash.
        return f"An error occurred:\n{e}"


In [51]:
def post_process_cpp(cpp_code):
    """
    Cleans the raw C++ code by removing unwanted explanations, markdown formatting, and extraneous text.

    Fence handling:
      * If a ```cpp fence is present, the content after the LAST one is used.
      * Otherwise, if the text contains a fenced block with another (or no)
        language tag such as ```c++ or a bare ```, the content of the first
        such block is used.
      * A lone ``` that is not at the start is treated as a closing fence and
        everything before it is kept.

    Afterwards blank lines, `//` comment lines, and lines starting with "note"
    (case-insensitive) are removed.

    Args:
        cpp_code (str): Raw C++ code generated by the model.

    Returns:
        str: Cleaned and usable C++ code.
    """
    fence = "```"
    language_tags = ("", "c", "c++", "cc", "cxx", "cpp")

    if "```cpp" in cpp_code:
        cpp_code = cpp_code.split("```cpp")[-1]
    elif cpp_code.count(fence) >= 2 or cpp_code.lstrip().startswith(fence):
        # Generic opening fence: splitting on ``` and taking element 0 would
        # return the preamble *before* the code, so skip past the fence instead.
        after_fence = cpp_code.split(fence, 1)[1]
        tag_line, newline, remainder = after_fence.partition("\n")
        if newline and tag_line.strip().lower() in language_tags:
            cpp_code = remainder  # drop the language-tag line
        else:
            cpp_code = after_fence
    # Stop at the closing fence, if any.
    cpp_code = cpp_code.split(fence)[0]

    valid_cpp_lines = [
        line
        for line in cpp_code.splitlines()
        if line.strip()
        and not line.strip().startswith("//")
        and not line.lower().startswith("note")
    ]

    return "\n".join(valid_cpp_lines).strip()


In [52]:
def stream_code_qwen(python):
    """
    Converts Python code to high-performance C++ and cleans the output.
    Writes the cleaned C++ code to a file and returns the cleaned code and logs.

    The model and tokenizer are loaded on every call and released afterwards.
    Any exception is caught and recorded in the logs; in that case the returned
    code is an empty string.

    Args:
        python (str): The Python code to convert.

    Returns:
        tuple: (Cleaned C++ code, Logs for the operation)
    """
    logs = ""
    cleaned_result = ""
    # Bound up front so the cleanup in `finally` cannot raise UnboundLocalError
    # (and mask the logged error) when loading or tokenizing fails.
    model = None
    inputs = None
    outputs = None

    try:
        # Logs to track progress
        logs += "Clearing CUDA cache and garbage collecting...\n"

        # Clear CUDA cache and garbage collection
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
        logs += "CUDA cache cleared and garbage collection completed.\n"

        # Load model and tokenizer
        model_name = code_qwen
        logs += f"Loading model: {model_name}...\n"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=quant_config,  # Apply quantization configuration here
            device_map="auto"  # Automatically map model to available GPUs/CPUs
        )
        logs += "Model and tokenizer loaded successfully.\n"

        # Prepare input text
        messages = messages_for(python)
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        logs += f"Prepared input text: {text[:100]}...\n"  # Log the first 100 characters

        # Tokenize input
        device = "cuda" if torch.cuda.is_available() else "cpu"
        logs += f"Using device: {device}\n"
        inputs = tokenizer(text, return_tensors="pt").to(device)
        logs += "Input tokenized and sent to the device.\n"

        # Generate text output
        logs += "Starting code generation...\n"
        outputs = model.generate(inputs.input_ids, max_new_tokens=3000, do_sample=True)
        # The returned sequence starts with the prompt tokens; decode only the
        # newly generated part so the prompt is not mistaken for C++ code.
        prompt_length = inputs.input_ids.shape[1]
        result = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        logs += "Code generation completed successfully.\n"

        # Post-process the output to clean the C++ code
        logs += "Post-processing the generated C++ code...\n"
        cleaned_result = post_process_cpp(result)
        logs += "Post-processing completed successfully.\n"

        # Write the cleaned C++ code to a file
        logs += "Writing the cleaned C++ code to 'optimized.cpp'...\n"
        with open("optimized.cpp", "w") as f:
            f.write(cleaned_result)
        logs += "Cleaned C++ code written to 'optimized.cpp'.\n"

    except Exception:
        # Log error details
        logs += f"An error occurred:\n{traceback.format_exc()}"

    finally:
        # Clean up resources
        logs += "Cleaning up resources...\n"
        del model, inputs, outputs
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
        logs += "Resources cleaned up successfully.\n"

    return cleaned_result, logs


In [46]:
def stream_code_qwen(python):
    """
    Converts Python code to C++ with the CodeQwen model and returns the raw completion.

    NOTE(review): this notebook defines `stream_code_qwen` twice. When the cells
    run top to bottom, this definition replaces the earlier one; unlike that
    one, it neither post-processes the output nor writes 'optimized.cpp'.
    One of the two definitions should be removed.

    The model and tokenizer are loaded on every call and released afterwards.
    Any exception is caught and recorded in the logs; in that case the returned
    text is an empty string.

    Args:
        python (str): The Python code to convert.

    Returns:
        tuple: (Generated text, Logs for the operation)
    """
    logs = ""
    result = ""
    # Bound up front so the cleanup in `finally` cannot raise UnboundLocalError
    # (and mask the logged error) when loading or tokenizing fails.
    model = None
    inputs = None
    outputs = None

    try:
        # Logs to track progress
        logs += "Clearing CUDA cache and garbage collecting...\n"

        # Clear CUDA cache and garbage collection
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
        logs += "CUDA cache cleared and garbage collection completed.\n"

        # Load model and tokenizer
        model_name = code_qwen
        logs += f"Loading model: {model_name}...\n"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            quantization_config=quant_config,  # Apply quantization configuration here
            device_map="auto"  # Automatically map model to available GPUs/CPUs
        )
        logs += "Model and tokenizer loaded successfully.\n"

        # Prepare input text
        messages = messages_for(python)
        text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        logs += f"Prepared input text: {text[:100]}...\n"  # Log the first 100 characters

        # Tokenize input
        device = "cuda" if torch.cuda.is_available() else "cpu"
        logs += f"Using device: {device}\n"
        inputs = tokenizer(text, return_tensors="pt").to(device)
        logs += "Input tokenized and sent to the device.\n"

        # Generate text output
        logs += "Starting code generation...\n"
        outputs = model.generate(inputs.input_ids, max_new_tokens=3000, do_sample=True)
        # The returned sequence starts with the prompt tokens; decode only the
        # newly generated part so the prompt is not echoed into the C++ box.
        prompt_length = inputs.input_ids.shape[1]
        result = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
        logs += "Code generation completed successfully.\n"

    except Exception:
        # Log error details
        logs += f"An error occurred:\n{traceback.format_exc()}"

    finally:
        # Clean up resources
        logs += "Cleaning up resources...\n"
        del model, inputs, outputs
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        gc.collect()
        logs += "Resources cleaned up successfully.\n"

    return result, logs


In [16]:

def optimize(python, model):
    """
    Generator wrapper that converts Python code with the selected model.

    Args:
        python (str): The Python code to convert.
        model (str): Name of the model to use; only "CodeQwen" is supported.

    Yields:
        str: The converted C++ code.

    Raises:
        ValueError: If `model` is not a known model name (raised on first
        iteration, since this is a generator).
    """
    if model == "CodeQwen":
        # stream_code_qwen returns a (code, logs) pair. Iterating over that pair
        # would yield the logs as if they were a later code update, so unpack it.
        code, _logs = stream_code_qwen(python)
    else:
        raise ValueError("Unknown model")
    yield code

In [17]:
# Stylesheet handed to gr.Blocks: background colours for the components tagged
# with the "python" and "cpp" element classes (the two result areas).
css = """
.python {background-color: #306998;}
.cpp {background-color: #050;}
"""

In [53]:
def execute_python(code):
    """
    Run Python source and return everything it printed.

    NOTE(security): this uses exec() on the contents of the text box, i.e. it
    runs arbitrary code in the notebook's process. Only use it with code you
    trust, and be careful about who can reach the UI.

    Args:
        code (str): The Python source to execute.

    Returns:
        str: Captured standard output, followed by the traceback if the code raised.
    """
    buffer = io.StringIO()
    previous_stdout = sys.stdout  # restore whatever the notebook had installed
    sys.stdout = buffer
    try:
        exec(code, {"__name__": "__main__"})
    except Exception:
        buffer.write(f"An error occurred:\n{traceback.format_exc()}")
    finally:
        sys.stdout = previous_stdout
    return buffer.getvalue()


with gr.Blocks(css=css) as ui:
    gr.Markdown("## Convert code from Python to C++ using CodeQwen")
    with gr.Row():
        python = gr.Textbox(label="Python code:", value=pi, lines=10)  # Input for Python code
        cpp = gr.Textbox(label="C++ code:", lines=10)  # Output for converted C++ code
    with gr.Row():
        convert = gr.Button("Convert code")  # Button to trigger code conversion
    with gr.Row():
        python_run = gr.Button("Run Python")  # Button to execute the Python code
        cpp_run = gr.Button("Run C++")  # Button to execute the C++ code
    with gr.Row():
        python_out = gr.TextArea(label="Python result:", elem_classes=["python"])  # Display Python execution result
        cpp_out = gr.TextArea(label="C++ result:", elem_classes=["cpp"])  # Display C++ execution result
    with gr.Row():
        logs = gr.TextArea(label="Logs", lines=15)  # TextArea to display logs and errors

    # Link the 'convert' button to stream_code_qwen, which returns (code, logs)
    convert.click(stream_code_qwen, inputs=[python], outputs=[cpp, logs])

    # Link the Python execution button
    python_run.click(execute_python, inputs=[python], outputs=[python_out])

    # Link the C++ execution button
    cpp_run.click(execute_cpp, inputs=[cpp], outputs=[cpp_out])

# NOTE(review): share=True presumably publishes a public link; combined with the
# "Run Python" / "Run C++" buttons that means remote code execution for anyone
# holding the URL. Confirm this is intended before sharing the link.
ui.launch(inbrowser=True, debug=True, share=True)


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

