2 changes: 2 additions & 0 deletions python/pyproject.toml
@@ -53,6 +53,8 @@ dependencies = [
"lxml>=4.9.0",
"markdownify>=0.11.0",
"tqdm>=4.66.0",
"toml>=0.10.2",
"jsonlines>=4.0.0",
]

[project.optional-dependencies]
236 changes: 236 additions & 0 deletions python/src/scripts/cairocoder_trace_extraction.py
@@ -0,0 +1,236 @@
#!/usr/bin/env python3
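"""Extract question-answer pairs from Cairo Coder trace dumps (cc-dataset.jsonl)."""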

import argparse
import ast
import json
import os
import re
import sys
from collections.abc import Iterator
from typing import Optional

import jsonlines


def _read_records_jsonl(path: str) -> Iterator[dict]:
"""Read objects using jsonlines, skipping invalid entries."""
with jsonlines.open(os.path.expanduser(path), mode="r") as reader:
for obj in reader.iter(skip_invalid=True):
if isinstance(obj, dict):
yield obj


def _read_records_json_stream(path: str) -> Iterator[dict]:
"""Fallback for files with concatenated pretty-printed JSON objects."""
p = os.path.expanduser(path)
with open(p, encoding="utf-8") as f:
data = f.read()
decoder = json.JSONDecoder()
idx = 0
n = len(data)
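    # json.JSONDecoder.raw_decode parses one JSON value starting at data[idx:]
    # and reports where it ended, which lets us walk concatenated objects.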
while True:
while idx < n and data[idx].isspace():
idx += 1
if idx >= n:
break
obj, end = decoder.raw_decode(data, idx)
if isinstance(obj, dict):
yield obj
idx = end


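# Each trace's output is a stringified dspy Prediction repr; these patterns pull
# out the quoted answer=... payload, whether single- or double-quoted.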
ANSWER_RE_SQ = re.compile(r"answer\s*=\s*'((?:\\'|[^'])*)'")
ANSWER_RE_DQ = re.compile(r'answer\s*=\s*"((?:\\"|[^"])*)"')
HAS_REASONING_RE = re.compile(r"reasoning=")


def extract_answer_fragment(s: str) -> Optional[str]:
"""Extract the quoted answer=... string from the Prediction-like output.

Returns the unescaped string content if found, otherwise None.
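
    Example (hypothetical trace string):
        >>> extract_answer_fragment("Prediction(answer='Hi')")
        'Hi'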
"""
m = ANSWER_RE_SQ.search(s)
if m:
raw = "'" + m.group(1) + "'" # re-wrap for literal_eval
else:
m = ANSWER_RE_DQ.search(s)
if not m:
return None
raw = '"' + m.group(1) + '"'

    try:
        return ast.literal_eval(raw)
except Exception:
# Fallback: interpret common escapes
try:
return raw[1:-1].encode("utf-8").decode("unicode_escape")
except Exception:
return raw[1:-1]


def is_single_output(outputs: object) -> tuple[bool, Optional[str]]:
"""Check that outputs is a dict with a single key 'output' string.

Returns (ok, output_string_or_None).
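
    Example:
        >>> is_single_output({"output": "hi"})
        (True, 'hi')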
"""
if not isinstance(outputs, dict):
return False, None
if set(outputs.keys()) != {"output"}:
return False, None
val = outputs.get("output")
if not isinstance(val, str):
return False, None
return True, val


def main() -> None:
parser = argparse.ArgumentParser(
description=(
"Extract QA pairs from cc-dataset.jsonl. "
"Use --only-mcp or --only-generated-answers to filter."
)
)
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument(
"--only-mcp",
action="store_true",
help=(
"Extract traces with a single output whose string looks like "
"'Prediction(\n answer=...)' and does NOT contain 'reasoning='"
),
)
group.add_argument(
"--only-generated-answers",
action="store_true",
help=(
"Extract traces with a single output that DOES contain 'reasoning='"
),
)
parser.add_argument(
"--input",
default="cc-dataset.jsonl",
help="Path to input JSONL file (default: cc-dataset.jsonl)",
)
parser.add_argument(
"--output",
default="qa_pairs_cairo_coder.json",
help="Path to output JSON file (default: qa_pairs_cairo_coder.json)",
)

args = parser.parse_args()

input_path = args.input
if not os.path.exists(input_path):
sys.stderr.write(f"Input file not found: {input_path}\n")
sys.exit(1)

results: list[dict] = []
total = 0
matched = 0
skipped = 0

    # First try strict JSONL via jsonlines; if that fails or yields nothing,
    # fall back to the concatenated-JSON stream parser. Records are materialized
    # up front so a mid-iteration parse failure cannot leave the counters and
    # results half-populated before the fallback re-reads the file.
    try:
        records = list(_read_records_jsonl(input_path))
        if not records:
            raise RuntimeError("jsonlines yielded no records; trying stream parser")
    except Exception:
        records = list(_read_records_json_stream(input_path))

    for rec in records:
        total += 1
        ok, out_str = is_single_output(rec.get("outputs"))
        if not ok or out_str is None:
            skipped += 1
            continue

        # Heuristic: MCP traces are bare "Prediction(answer=...)" strings, while
        # generated answers additionally carry a "reasoning=" field.
        has_reasoning = bool(HAS_REASONING_RE.search(out_str))
        looks_like_prediction = out_str.startswith("Prediction(") and ("answer=" in out_str)

        if args.only_mcp:
            if not looks_like_prediction or has_reasoning:
                continue
        elif args.only_generated_answers and not has_reasoning:
            continue

        inputs = rec.get("inputs")
        query = inputs.get("query") if isinstance(inputs, dict) else None
        if not isinstance(query, str) or not query:
            skipped += 1
            continue

        answer = extract_answer_fragment(out_str)
        if not answer:
            skipped += 1
            continue

        results.append({"query": query, "answer": answer})
        matched += 1

# Write output JSON array
with open(args.output, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False)

print(
json.dumps(
{
"input": input_path,
"output": args.output,
"total": total,
"matched": matched,
"skipped": skipped,
},
indent=2,
)
)


if __name__ == "__main__":
main()
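
# Example input line this script expects (a minimal sketch inferred from the
# parsing logic above; real traces may carry extra fields):
#   {"inputs": {"query": "How do I ...?"},
#    "outputs": {"output": "Prediction(\n    answer='...'\n)"}}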
64 changes: 64 additions & 0 deletions python/src/scripts/llm_dataset_analysis.py
@@ -0,0 +1,64 @@
import json

import dspy
from dspy.adapters.baml_adapter import BAMLAdapter


class DatasetAnalyzer(dspy.Signature):
"""
You are provided a dataset of question-answer pairs.
This dataset is related to the Starknet blockchain and the Cairo programming language, and contains
mostly technical questions about code, infrastructure, and the overall Starknet ecosystem.
Your task is to analyze the dataset and provide valuable insights.
"""

dataset: list[dict] = dspy.InputField(
desc="The dataset of question-answer pairs."
)
languages: list[str] = dspy.OutputField(
desc="The list of all languages users have asked queries with."
)
topics: list[tuple[str, int]] = dspy.OutputField(
desc="""The list of all topics users have asked queries about. Try to group similar queries under the same topic. For each topic, provide the approximative percentage of queries that belong to that topic.
For example:
- "how to read from a byte array string? How to read a word from it?" would be -> "Corelib features questions"
- "convert a felt252 enoded string into a byterarray encoded string" would be -> "Corelib features questions"
- "how to run specific test function" -> "writing tests questions"
- "how do I get the current block time i.e block.timestamp in cairo smart contract" -> "APIs for interaction with the starknet state questions"
- "When im importing stuff from a file in my smart contract, what is the difference between super:: and crate:: ?" -> "Cairo language questions"
- "how to use the `assert!` macro in my smart contract" -> "Cairo language questions"
- "I am writing a function in my smart contract. I need to be sure the caller has enough balance or it reverts. how do I do this?" -> "Starknet smart contracts questions"
- "what does this error mean :\n```\n Account validation failed: \"StarknetError { code: KnownErrorCode(ValidateFailure), message: 'The 'validate' entry point panicked with: nError in contract (contract address: 0x0762c126b2655bc371c1075e2914edd42ba40fc2c485b5e8772f05c7e09fec26, class hash: 0x036078334509b514626504edc9fb252328d1a240e4e948bef8d0c08dff45927f, selector: 0x0289da278a8dc833409cabfdad1581e8e7d40e42dcaed693fa4008dcdb4963b3): n0x617267656e742f696e76616c69642d7369676e61747572652d6c656e677468 ('argent invalid signature length'). n' }```" -> "Debugging errors questions"
- "How to declare and deploy a contract with constructor to sepolia or mainnet using starkli?" -> "Starknet network interactions questions"
"""
)
analysis: str = dspy.OutputField(
desc="""A global analysis of the dataset. This field is free-form and can contain all the insights you can gather from the dataset and think are valuable.
Focus on the following aspects to provide a well-rounded analysis that covers all data that could be relevant, including:
- Most common topics and the types of questions asked about them
        - Are users' queries mostly answered properly? Does the dataset show that users double down on answers that they feel are not satisfying?
- What are the most common instances of users not being able to get the answer they want?
- What's the overall quality of the answers?
"""
)


def main():
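    # Configure the LM once globally; the BAMLAdapter controls how the signature
    # is rendered into the prompt and how the structured outputs are parsed back.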
    dspy.configure(
        lm=dspy.LM("openrouter/x-ai/grok-4-fast:free", max_tokens=30000, cache=False),
        adapter=BAMLAdapter(),
    )
with open("qa_pairs.json") as f:
dataset = json.load(f)
analyzer = dspy.ChainOfThought(DatasetAnalyzer)
response = analyzer(dataset=dataset)
response_dict = {
"languages": response.languages,
"topics": response.topics,
"analysis": response.analysis
}

with open("analysis.json", "w") as f:
json.dump(response_dict, f, indent=4)




if __name__ == "__main__":
main()
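
# analysis.json ends up holding the three output fields, roughly:
#   {"languages": [...],
#    "topics": [["Cairo language questions", 25], ...],
#    "analysis": "..."}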