In [None]:
import os
import requests
from dotenv import load_dotenv
from huggingface_hub import login
from transformers.utils.quantization_config import BitsAndBytesConfig
from transformers.models.auto.modeling_auto import AutoModelForCausalLM
from transformers.models.auto.tokenization_auto import AutoTokenizer
import gc
import torch


In [None]:
# Read variables from a local .env file (letting them override values already
# present in the environment), then log in to the Hugging Face Hub with HF_TOKEN.
load_dotenv(override=True)
hf_token = os.environ.get('HF_TOKEN')
login(token=hf_token)

In [None]:
# model_name = 'cosmo3769/starcoderbase-1b-GGUF'
model_name = 'bigcode/starcoder2-3b'

# Instruction text placed at the start of every prompt sent to the model.
# Built from adjacent string literals so the three sentences stay readable.
system_message = (
    "You are an assistant that reimplements Python code in high performance C++ for a windows machine. "
    "Respond only with C++ code; use comments sparingly and do not provide any explanations other than occasional comments. "
    "The C++ response needs to produce an identical output in the fastest possible time."
)

In [None]:
def user_prompt_for(python):
    """Build the user-turn prompt asking for a C++ rewrite of `python`.

    Args:
        python: Python source code to be translated, as a string.

    Returns:
        The fixed instruction paragraph, a blank line, and then the Python
        source appended verbatim.
    """
    instructions = (
        "Rewrite this python code in C++ with the fastest possible implementation "
        "that produces identical output in the least time. "
        "Respond only with C++ code; do not explain your work other than a few comments. "
        "Pay attention to number types to ensure no int overflows. "
        "Remember to #include all necessary C++ packages such as iomanip.\n\n"
    )
    return instructions + python

In [None]:
def message_for(python):
    """Return the chat transcript (system turn, then user turn) for `python`.

    The system turn carries the module-level `system_message`; the user turn
    carries the prompt produced by `user_prompt_for(python)`.
    """
    system_turn = dict(role="system", content=system_message)
    user_turn = dict(role="user", content=user_prompt_for(python))
    return [system_turn, user_turn]

In [None]:
# write to a file called optimized.cpp
def write_output(cpp):
    """Write generated C++ to ``optimized.cpp`` in the current directory.

    Markdown code-fence markers (```` ```cpp ```` and ```` ``` ````) are removed
    first so the file contains only compilable source.

    Args:
        cpp: Text returned by the model, possibly wrapped in markdown fences.
    """
    code = cpp.replace("```cpp", "").replace("```", "")
    # Write the cleaned text (not the raw argument), and pin the encoding so
    # non-ASCII characters in comments do not depend on the platform default.
    with open("optimized.cpp", "w", encoding="utf-8") as f:
        f.write(code)

In [None]:
# 4-bit quantization settings passed to from_pretrained so the model
# occupies less memory once loaded.

quant_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_use_double_quant=True,
    bnb_4bit_compute_dtype=torch.bfloat16,
)

In [None]:
from transformers import StoppingCriteria

class EarlyStoppingCriteria(StoppingCriteria):
    """Stopping criterion that fires when the output ends with a marker.

    The markers are the token-id sequences the tokenizer produces for
    ``"return 0;"`` and ``"}"``.

    NOTE(review): a bare ``"}"`` ends every C++ block, so this criterion would
    fire on the first closing brace generated -- confirm that is intended
    before wiring it into ``generate``.
    """

    def __init__(self, tokenizer):
        self.tokenizer = tokenizer
        # Encode each marker once, without special tokens, for direct comparison.
        self.completion_indicators = [
            tokenizer.encode(marker, add_special_tokens=False)
            for marker in ("return 0;", "}")
        ]

    def __call__(self, input_ids, scores, **kwargs):
        # Only the first sequence in the batch is inspected, and only its last 10 ids.
        tail = input_ids[0][-10:].tolist()
        return any(
            len(pattern) <= len(tail) and tail[-len(pattern):] == pattern
            for pattern in self.completion_indicators
        )

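# To use it, pass stopping_criteria=StoppingCriteriaList([EarlyStoppingCriteria(tokenizer)]) in the kwargs given to model.generate.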


In [None]:
from transformers.generation.streamers import TextIteratorStreamer
import threading
import torch
import gc

def _is_complete_cpp_program(code):
    """Heuristically decide whether `code` already holds a whole C++ program.

    True only when the text is longer than 200 characters, contains
    ``#include``, ``int main(`` and ``return 0;``, and has at least one ``{``
    with an equal number of ``}``.
    """
    if len(code) <= 200:
        return False
    if "#include" not in code or "int main(" not in code or "return 0;" not in code:
        return False
    opening = code.count("{")
    return opening > 0 and opening == code.count("}")


def optimize_starcoder(python):
    """Translate Python source to C++ with the configured model, streaming output.

    Loads the tokenizer and the 4-bit quantized model named by `model_name`
    onto CUDA, generates up to 512 new tokens with greedy decoding, and prints
    the text as it arrives. Generation is cut short as soon as the text looks
    like a complete program. The result is written to ``optimized.cpp`` via
    `write_output`, and the model is released before returning.

    Args:
        python: Python source code to translate, as a string.

    Returns:
        The generated C++ text with markdown fences removed and surrounding
        whitespace stripped.
    """
    # Imported locally so this function does not depend on another cell's imports.
    from transformers import StoppingCriteria, StoppingCriteriaList

    print(f"Using model: {model_name}")

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side="left")
    tokenizer.pad_token = tokenizer.eos_token

    # Completion-style prompt: the model is expected to continue after the
    # trailing "// C++ implementation:" line.
    prompt = f"{system_message}\n\nRewrite this Python code in high-performance C++:\n\n{python}\n\n// C++ implementation:\n"

    # Tokenize input
    inputs = tokenizer(
        prompt,
        return_tensors="pt",
        padding=True,
        return_attention_mask=True
    ).to("cuda")

    # Initialize streamer
    streamer = TextIteratorStreamer(tokenizer, skip_prompt=True, skip_special_tokens=True)

    # Breaking out of the streamer loop does not stop generate(); this event
    # lets the consumer tell the generation thread to finish early.
    stop_requested = threading.Event()

    class _StopOnRequest(StoppingCriteria):
        """Ends generation for every sequence once `stop_requested` is set."""

        def __call__(self, input_ids, scores, **kwargs):
            return torch.full(
                (input_ids.shape[0],),
                stop_requested.is_set(),
                dtype=torch.bool,
                device=input_ids.device,
            )

    # Load model
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        device_map="cuda",
        quantization_config=quant_config,
        torch_dtype=torch.float16,
    )

    # Generation parameters. Greedy decoding only: sampling knobs such as
    # temperature/top_p have no effect when do_sample is False, so they are omitted.
    generation_kwargs = {
        "input_ids": inputs.input_ids,
        # Use the tokenizer's own mask; pad and EOS share an id, so rebuilding
        # the mask from pad_token_id would also hide real EOS tokens.
        "attention_mask": inputs.attention_mask,
        "max_new_tokens": 512,
        "pad_token_id": tokenizer.pad_token_id,
        "eos_token_id": tokenizer.eos_token_id,
        "streamer": streamer,
        "stopping_criteria": StoppingCriteriaList([_StopOnRequest()]),
        "do_sample": False,
    }

    reply = ""
    last_check_length = 0

    try:
        # Start generation in separate thread so the streamer can be consumed here.
        thread = threading.Thread(target=model.generate, kwargs=generation_kwargs)
        thread.start()
        try:
            print("Generated C++ code:")
            print("-" * 50)

            for text in streamer:
                print(text, end="", flush=True)
                reply += text

                # Re-run the completeness check only after more than 75 new characters.
                if len(reply) - last_check_length > 75:
                    last_check_length = len(reply)
                    if _is_complete_cpp_program(reply):
                        print("\n" + "-" * 50)
                        print("[Stopping: Complete C++ program detected]")
                        break
        finally:
            # Always signal the worker to stop before joining, so an early break
            # or an error here does not wait for all remaining tokens.
            stop_requested.set()
            thread.join()
    finally:
        # Drop every reference to GPU tensors first, then collect, then return
        # the cached blocks to the driver.
        del model, inputs, generation_kwargs
        gc.collect()
        torch.cuda.empty_cache()

    # Clean the generated code and write to file
    cleaned_reply = reply.replace("```cpp", "").replace("```", "").strip()
    write_output(cleaned_reply)

    print(f"\nGenerated {len(cleaned_reply)} characters")
    print("Code saved to optimized.cpp")

    return cleaned_reply

In [None]:
# Sample Python program used as conversion input. It sums the alternating
# series 1 - 1/3 + 1/5 - 1/7 + ... over 100,000,000 iterations, multiplies by 4,
# and prints the result together with the elapsed wall-clock time.
pi = """
import time

def calculate(iterations, param1, param2):
    result = 1.0
    for i in range(1, iterations+1):
        j = i * param1 - param2
        result -= (1/j)
        j = i * param1 + param2
        result += (1/j)
    return result

start_time = time.time()
result = calculate(100_000_000, 4, 1) * 4
end_time = time.time()

print(f"Result: {result:.12f}")
print(f"Execution Time: {(end_time - start_time):.6f} seconds")
"""

In [None]:
# Run the sample program in this kernel; it prints its own result and timing.
exec(pi)

In [None]:
# Convert the sample to C++; the text is streamed to stdout and saved to optimized.cpp.
optimize_starcoder(pi)

In [None]:
# Compile the generated file with g++ (C++17, -O3, tuned for the local CPU), then run the binary.
!g++ -O3 -std=c++17 -march=native -o optimized.exe optimized.cpp
!optimized.exe

In [None]:
import gradio as gr

def gen_prompt(python):
    """Return the completion prompt for `python`.

    The prompt is `system_message`, a rewrite instruction, the Python source,
    and a trailing ``// C++ implementation:`` line.
    """
    header = f"{system_message}\n\nRewrite this Python code in high-performance C++:\n\n"
    footer = "\n\n// C++ implementation:\n"
    return f"{header}{python}{footer}"

# Gradio front end for the converter. Components are created inside the
# Blocks/Row context managers, so statement order here defines the layout.
with gr.Blocks() as ui:
    # First row: Python source (pre-filled with the `pi` sample) next to the C++ output box.
    with gr.Row():
        python_code = gr.Textbox(label="Python code: ", lines=10, value=pi)
        cpp = gr.Textbox(label="C++ code:", lines=10)
    # Second row: model selector (single choice, `model_name`) and the convert button.
    with gr.Row():
        model = gr.Dropdown([model_name], label="Select model", value=model_name)
        convert = gr.Button("Convert code")

    # NOTE(review): only the Python textbox is passed as input; the dropdown
    # selection is not forwarded to optimize_starcoder.
    convert.click(optimize_starcoder, inputs=[python_code], outputs=[cpp])

# Start the app; inbrowser=True is passed so Gradio opens it in a browser.
ui.launch(inbrowser=True)