# load_ds: simple, medium, hard, each 5

In [None]:
!pip install datasets



In [None]:
import json
from datasets import Dataset, concatenate_datasets

In [None]:
with open("/content/LeetCodeDataset-v0.1.0-test.jsonl", encoding="utf-8") as f:
# with open("LeetCodeDataset-v0.1.0-test.jsonl", encoding="utf-8") as f:
    rows = [json.loads(line) for line in f]

ds = Dataset.from_list(rows)
print(ds)

Dataset({
    features: ['task_id', 'prompt', 'completion', 'entry_point', 'test', 'query', 'response', 'input_output', 'meta'],
    num_rows: 175
})


In [None]:
print(ds[0]['meta'])

{'difficulty': 'Medium', 'lang_code': 'class Solution:\n    def longestEqualSubarray(self, nums: List[int], k: int) -> int:', 'question_id': 2832, 'question_title': "You are given a 0-indexed integer array nums and an integer k.\nA subarray is called equal if all of its elements are equal. Note that the empty subarray is an equal subarray.\nReturn the length of the longest possible equal subarray after deleting at most k elements from nums.\nA subarray is a contiguous, possibly empty sequence of elements within an array.\n\xa0\nExample 1:\n\nInput: nums = [1,3,2,3,1,3], k = 3\nOutput: 3\nExplanation: It's optimal to delete the elements at index 2 and index 4.\nAfter deleting them, nums becomes equal to [1, 3, 3, 3].\nThe longest equal subarray starts at i = 1 and ends at j = 3 with length equal to 3.\nIt can be proven that no longer equal subarrays can be created.\n\nExample 2:\n\nInput: nums = [1,1,2,2,1,1], k = 2\nOutput: 4\nExplanation: It's optimal to delete the elements at index 2

In [None]:
def first_k_by_difficulty(dataset, difficulty, k=5):
    """
    Return the first k rows whose meta['difficulty'] matches `difficulty`.
    """
    subset = dataset.filter(lambda ex: ex["meta"]["difficulty"] == difficulty)
    return subset.select(range(min(k, len(subset))))   # protects against < k rows



In [None]:
easy_ds   = first_k_by_difficulty(ds, "Easy",   k=5)
medium_ds = first_k_by_difficulty(ds, "Medium", k=5)
hard_ds   = first_k_by_difficulty(ds, "Hard",   k=5)
example_ds = concatenate_datasets([easy_ds.select([0]), medium_ds.select([0]), hard_ds.select([0])]) # serve as few shot examples

Filter:   0%|          | 0/175 [00:00<?, ? examples/s]

Filter:   0%|          | 0/175 [00:00<?, ? examples/s]

Filter:   0%|          | 0/175 [00:00<?, ? examples/s]

  block_group = [InMemoryTable(cls._concat_blocks(list(block_group), axis=axis))]
  table = cls._concat_blocks(blocks, axis=0)


# build few shot examples:1S + 1M + 1H

In [None]:
from itertools import islice
from textwrap import dedent

import re

In [None]:
ONE_PROMPT_BLOCK_TEMPLATE = """<start>
{problem}
###
{examples}
###
<fim_prefix>
{prefix}
<fim_suffix>
<fim_middle>
{middle}
<end>""".strip()


# helper for row_to_block()
def extract_problem_and_prefixcode(query: str) -> tuple[str, str]:
    """
    take in field query, return problem text and prefix_code
    e.g.

    * `problem_text`  - up before the first ```python fence, w/o closing ```
    * `prefix_code`   - rest
    """
    #print(query)
    m_open  = re.search(r"```python\s*", query)
    if not m_open:
        raise ValueError("opening ```python fence not found")

    m_block = re.search(r"```python\s*([\s\S]*?)```", query)
    if not m_block:
        raise ValueError("closing ``` fence not found")

    problem = query[: m_open.start()].rstrip()
    code    = m_block.group(1).rstrip()

    # keep the opening fence so prefix *starts* with ```python
    prefix  = "```python\n" + code
    return problem, prefix


# build one prompt block
def row_to_block(q, k_io: int = 2, filled: bool = True) -> str:
    """
    Convert a q from ds to a prompt block, following SHOT_TEMPLATE
    k_io: # i/o examples

    * If `filled` is False i.e. the question to be answered, `{middle}` is left empty for the model to fill
    """
    #print(f"ROWTOBLOCK, q: {q['query']}")
    #print(f"ROWTOBLOCK, q[]:{q['query']}")
    problem, prefix = extract_problem_and_prefixcode(q["query"])

    # 2. Keep k_io I/O examples
    examples = "\n".join(
        f"Input:  {p['input']}\nOutput: {p['output']}"
        for p in q["input_output"][:k_io]
    )

    # 3. Middle — blank for the target task
    middle = q["completion"].strip() if filled else ""

    # 4. Assemble block
    return ONE_PROMPT_BLOCK_TEMPLATE.format(
        problem=problem,
        examples=examples,
        prefix=prefix,
        middle=middle,
    )


In [None]:
few_shot_part = "\n\n".join(
    row_to_block(r, filled=True) for r in example_ds
)

ROWTOBLOCK, You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
```
You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
```
ROWTOBLOCK, You are given a 0-indexed integer array nums and an integer k.
A subarray is called equal if all of its elements are equal. Note that the empty subarray is an equal subarray.
Return the length of the longest possible equal subarray after deleting at most k elements from nums.
A subarray is a contiguous,

In [None]:
print(few_shot_part)

<start>
You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
###
Input:  nums = [1,2,3,4]
Output: 21
Input:  nums = [2,7,1,19,18,3]
Output: 63
###
<fim_prefix>
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
<fim_suffix>
<fim_middle>
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
        n = len(nums)
        return sum(x * x for i, x in enumerate(nums, 1) if n % i == 0)
<end>

<start>
You are given a 0-indexed integer array nums and an integer k.
A subarray is called equal if all of its elements are equal. Note that the empty subarray is an equal subarray.
Return the length of the longest possible equal subarray after deleting at most k elements from nums.
A subarray is a contiguous, possibly empty sequence of elements within an array.
 

# load model

In [None]:
!pip install -q transformers

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch

In [None]:
if torch.backends.mps.is_built():
    device = "mps"
elif torch.backends.cuda.is_built():
    device = "cuda"
else:
    device = "cpu"

In [None]:
print(device)

cuda


In [None]:
# import gc, torch

# # 1. Delete old variables (models, big tensors)
# del model      # or whatever names you used
# # del other_big_tensor
# gc.collect()

# # 2. Empty the GPU cache & see how much frees up
# torch.cuda.empty_cache()
# print(torch.cuda.memory_summary())

In [None]:
MODEL_NAME = "bigcode/starcoder2-3b"
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model     = AutoModelForCausalLM.from_pretrained(MODEL_NAME).to(device).eval()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/958 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/700 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/12.1G [00:00<?, ?B/s]

# code gen helpers

In [None]:
from typing import Any, Dict

In [None]:
# ────────────────────────────────────────────────────────────────
# 1.  build the full prompt for a single question
# ────────────────────────────────────────────────────────────────
def make_prompt(
    few_shot_part: str,
    q        # one row from `ds`
             # formatter: row → str
) -> str:
    """
    Concatenate the system directive + few-shot examples + target question.
    """
    target_part = row_to_block(q, filled=False)

    return (
        "You are a Python programming assistant. "
        "Solve each new problem exactly as in previous examples. "
        "Do not output anything except the code that goes inside the provided "
        "class skeleton.\n"
        + few_shot_part
        + "\n\n"
        + target_part
        + "\n"
    )

# ────────────────────────────────────────────────────────────────
# 2.  send the prompt to the model and get back raw generation
# ────────────────────────────────────────────────────────────────
def generate_raw(
    prompt: str,
    tokenizer,
    model,
    device,
    max_new_tokens: int = 2048,
) -> str:
    """
    Encode → generate → decode → strip after <end>.
    Returns the model's raw textual response (still with both Solution classes).
    """
    def strip_after_end(text: str) -> str:
        return text.split("<end>", 1)[0].rstrip()

    # encode
    inputs = tokenizer(prompt, return_tensors="pt").to(device)

    # generate
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            temperature=0.2, # param from paper eval expermients
            top_p=0.95,
            do_sample=True,
            eos_token_id=tokenizer.convert_tokens_to_ids("<end>"),
        )

    # decode only the new tokens
    gen_text = tokenizer.decode(output_ids[0][inputs.input_ids.shape[-1]:])
    return strip_after_end(gen_text)

# ────────────────────────────────────────────────────────────────
# 3.  pull the final Solution class out of that text
# ────────────────────────────────────────────────────────────────
def extract_solution_code(text: str) -> str:
    """
    Return the slice from the SECOND 'class Solution' onward.
    Fallbacks:
      • if only one hit → return from the first hit
      • if none → return the full text
    """
    hits = list(re.finditer(r'\bclass\s+Solution\b', text))
    if len(hits) >= 2:
        return text[hits[1].start():].lstrip()
    elif hits:
        return text[hits[0].start():].lstrip()
    else:
        return text.strip()

## code gen pipeline (prepare prompt, gen, extract)

In [None]:
def gen_pipeline(q) -> str:
    #print("GENPIPE", q)
    prompt = make_prompt(few_shot_part, q)
    #print("[gen_pipeline] Prompt created (length:", len(prompt), "chars)")
    #print(prompt)
    raw    = generate_raw(prompt, tokenizer, model, device)
    #print("[gen_pipeline] Raw output generated")
    #print(raw)
    solution_code = extract_solution_code(raw)
    #print("[gen_pipeline] Extraction complete")
    #print(solution_code)
    return prompt, raw, solution_code



In [None]:
# note first q of each ds already serves as example in prompt
prompt, raw, solution_code = gen_pipeline(easy_ds.select([1])[0]) # since select returns a ds; to get the entry, index once more
# print each
print(f"[PROMPT]:{prompt}")
print(f"[RAW]:{raw}")
print(f"[SOLUTION_CODE]:{solution_code}")




Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


[PROMPT]:You are a Python programming assistant. Solve each new problem exactly as in previous examples. Do not output anything except the code that goes inside the provided class skeleton.
<start>
You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
###
Input:  nums = [1,2,3,4]
Output: 21
Input:  nums = [2,7,1,19,18,3]
Output: 63
###
<fim_prefix>
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
<fim_suffix>
<fim_middle>
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
        n = len(nums)
        return sum(x * x for i, x in enumerate(nums, 1) if n % i == 0)
<end>

<start>
You are given a 0-indexed integer array nums and an integer k.
A subarray is called equal if all of its elements are equal. Note that the empty subarray is an equal suba

In [None]:
prompt, raw, solution_code = gen_pipeline(medium_ds.select([1])[0])
# print each
print(f"[PROMPT]:{prompt}")
print(f"[RAW]:{raw}")
print(f"[SOLUTION_CODE]:{solution_code}")

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


[PROMPT]:You are a Python programming assistant. Solve each new problem exactly as in previous examples. Do not output anything except the code that goes inside the provided class skeleton.
<start>
You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
###
Input:  nums = [1,2,3,4]
Output: 21
Input:  nums = [2,7,1,19,18,3]
Output: 63
###
<fim_prefix>
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
<fim_suffix>
<fim_middle>
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
        n = len(nums)
        return sum(x * x for i, x in enumerate(nums, 1) if n % i == 0)
<end>

<start>
You are given a 0-indexed integer array nums and an integer k.
A subarray is called equal if all of its elements are equal. Note that the empty subarray is an equal suba

In [None]:
prompt, raw, solution_code = gen_pipeline(hard_ds.select([1])[0])
# print each
print(f"[PROMPT]:{prompt}")
print(f"[RAW]:{raw}")
print(f"[SOLUTION_CODE]:{solution_code}")

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


[PROMPT]:You are a Python programming assistant. Solve each new problem exactly as in previous examples. Do not output anything except the code that goes inside the provided class skeleton.
<start>
You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
###
Input:  nums = [1,2,3,4]
Output: 21
Input:  nums = [2,7,1,19,18,3]
Output: 63
###
<fim_prefix>
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
<fim_suffix>
<fim_middle>
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
        n = len(nums)
        return sum(x * x for i, x in enumerate(nums, 1) if n % i == 0)
<end>

<start>
You are given a 0-indexed integer array nums and an integer k.
A subarray is called equal if all of its elements are equal. Note that the empty subarray is an equal suba

# code test helper

In [None]:
! pip install -U pistonpy

Collecting pistonpy
  Downloading pistonpy-0.0.3-py3-none-any.whl.metadata (3.9 kB)
Collecting typing (from pistonpy)
  Downloading typing-3.7.4.3.tar.gz (78 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading pistonpy-0.0.3-py3-none-any.whl (9.4 kB)
Building wheels for collected packages: typing
  Building wheel for typing (setup.py) ... [?25l[?25hdone
  Created wheel for typing: filename=typing-3.7.4.3-py3-none-any.whl size=26304 sha256=0ac1d4f7908a975481bdc5556f1bf1cf205cf67cddf071bcb418359c3ddd02bc
  Stored in directory: /root/.cache/pip/wheels/9d/67/2f/53e3ef32ec48d11d7d60245255e2d71e908201d20c880c08ee
Successfully built typing
Installing collected packages: typing, pistonpy
Successfully installed pistonpy-0.0.3 typing-3.7.4.3


In [None]:
from pistonpy import PistonApp

In [None]:
# Initialize the client.
piston = PistonApp()

In [None]:
def test_pipe(
    q: Dict[str, str],   # ? in fact only needs fields 'prompt' (import statements), 'test', 'entry_point'
    sol: str,            # the extract code
    py_version: str = "3.10.0",
) -> str:
    """
    Build a single Python script (solution + tests + driver), execute it
    via Piston, and return the stderr text (empty string means the test passed).

    py_version : str
        Python version tag recognised by Piston (default "3.10.0").

    return empty str if all pass
    """

    merged_code = (
        q["prompt"].rstrip()
        + "\n\n"
        sol.rstrip()                    # candidate implementation
        + "\n\n"
        + q["test"].rstrip()            # assert-based tests
        + "\n\n"
        + f"check({q['entry_point']})\n"  # driver that triggers the tests
    )

    result = piston.run(
        language="python",
        version=py_version,
        code=merged_code,
    )
    #print("[test_pipeline] test complete")
    #print(result)

    return merged_code, result["run"]["stderr"]

# pipeline with 1 iter of gen code + run test

In [None]:
q = easy_ds[2]
def one_iter_pipe(q):
    prompt, raw, code= gen_pipeline(q)
    test_code, stderr_msg = test_pipe(q, code)
    print(f"[one_iter_pipe]: {stderr_msg}")
    return prompt, raw, code, test_code, stderr_msg

In [None]:
prompt, raw, code, test_code, stderr_msg  = one_iter_pipe(easy_ds[2])
print(f"[PROMPT]:{prompt}\n\n [RAW]:{raw} \n\n [CODE]:{code}, [TEST_CODE]:{test_code}, [STDERR_MSG]:{stderr_msg}")

Setting `pad_token_id` to `eos_token_id`:0 for open-end generation.


[gen_pipeline] Prompt created (length: 4710 chars)
You are a Python programming assistant. Solve each new problem exactly as in previous examples. Do not output anything except the code that goes inside the provided class skeleton.
<start>
You are given a 1-indexed integer array nums of length n.
An element nums[i] of nums is called special if i divides n, i.e. n % i == 0.
Return the sum of the squares of all special elements of nums.
 
Please complete the following python code precisely:
###
Input:  nums = [1,2,3,4]
Output: 21
Input:  nums = [2,7,1,19,18,3]
Output: 63
###
<fim_prefix>
```python
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
<fim_suffix>
<fim_middle>
class Solution:
    def sumOfSquares(self, nums: List[int]) -> int:
        n = len(nums)
        return sum(x * x for i, x in enumerate(nums, 1) if n % i == 0)
<end>

<start>
You are given a 0-indexed integer array nums and an integer k.
A subarray is called equal if all of its elements are equal. Not