In [1]:
import ast
import os
import json
import tokenize
from io import BytesIO
from tqdm import tqdm

In [2]:
%load_ext autoreload
%autoreload 2

## Source code extraction

In [25]:
def extract_comments(source_code):
    """Collect the text of every ``#`` comment in a piece of Python source.

    Args:
        source_code: Python source code as a ``str``.

    Returns:
        A list with one string per comment token, in source order. The
        leading ``#`` marker(s) and the surrounding whitespace are removed;
        the rest of the comment text is kept as written.

    Raises:
        Errors raised by the ``tokenize`` module for source that cannot be
        tokenized are not caught here and propagate to the caller.
    """
    from io import StringIO  # local import: the file only imports BytesIO

    comments = []
    # Tokenize the text directly rather than re-encoding it to bytes. The
    # bytes API honours a "coding:" cookie on the first lines and would then
    # decode the UTF-8 bytes with that other codec, corrupting non-ASCII text.
    for token in tokenize.generate_tokens(StringIO(source_code).readline):
        if token.type == tokenize.COMMENT:
            # Strip the marker from the left only, so a trailing '#' that is
            # part of the comment (e.g. "use C#") is preserved.
            comments.append(token.string.lstrip("# ").strip())
    return comments

def extract_docstrings_and_defs(file_path):
    """Parse a Python file and describe every function and class it defines.

    Args:
        file_path: Path of a Python source file; it is read as UTF-8.

    Returns:
        A ``(results, comments)`` tuple.

        ``results`` holds one dict per function, async function or class
        found anywhere in the module (nested definitions included, in
        ``ast.walk`` order) with the keys:

        * ``type`` - ``"function"`` or ``"class"``
        * ``name`` - the definition's name
        * ``docstring`` - its docstring, or ``""`` when it has none
        * ``source_code`` - the source lines from the node's ``lineno``
          through its ``end_lineno``
        * ``file_docstring`` - the module docstring (``None`` when absent)

        ``comments`` is the list returned by ``extract_comments`` for the
        whole file.

    Raises:
        Errors from opening/decoding the file and ``SyntaxError`` from
        ``ast.parse`` are not caught here.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        source = f.read()

    tree = ast.parse(source)
    module_docstring = ast.get_docstring(tree)

    # Split once, outside the loop, and only on "\n": the file was read in
    # text mode so line endings are already normalised, whereas
    # str.splitlines() would also break on form feeds and Unicode separators
    # that the parser does not count as line breaks, shifting later slices.
    source_lines = source.split("\n")

    function_nodes = (ast.FunctionDef, ast.AsyncFunctionDef)
    definition_nodes = function_nodes + (ast.ClassDef,)

    results = []
    for node in ast.walk(tree):
        if not isinstance(node, definition_nodes):
            continue

        start_line = node.lineno - 1  # ast line numbers are 1-based, list indices 0-based
        # Without end_lineno fall back to the header line alone instead of
        # an empty slice.
        end_line = getattr(node, "end_lineno", None) or node.lineno

        results.append({
            "type": "function" if isinstance(node, function_nodes) else "class",
            "name": node.name,
            "docstring": ast.get_docstring(node) or "",
            "source_code": "\n".join(source_lines[start_line:end_line]),
            "file_docstring": module_docstring
        })

    comments = extract_comments(source)
    return results, comments

def generate_qa_from_entry(entry):
    """Turn one extracted definition into a dataset record.

    Args:
        entry: Mapping with at least the keys ``name``, ``docstring`` and
            ``type``; ``source_code`` and ``file_docstring`` are optional
            and default to ``""``.

    Returns:
        ``None`` when the entry has no docstring, otherwise a dict with the
        keys ``name``, ``docstring`` (whitespace-stripped),
        ``file_docstring``, ``source`` (always ``"source_code"``), ``type``
        and ``code``.
    """
    name, doc = entry["name"], entry["docstring"]

    # Undocumented definitions carry nothing to learn from, so skip them.
    if doc:
        return dict(
            name=name,
            docstring=doc.strip(),
            file_docstring=entry.get("file_docstring", ""),
            source="source_code",
            type=entry["type"],
            code=entry.get("source_code", ""),
        )
    return None

def process_directory(dir_path):
    """Walk a directory tree and build records for every ``.py`` file in it.

    Each file is passed to ``extract_docstrings_and_defs``; every entry that
    ``generate_qa_from_entry`` turns into a record is tagged with the file's
    path under the ``file`` key.

    Args:
        dir_path: Root directory to walk recursively.

    Returns:
        A list of record dicts collected from all files.
    """
    collected = []
    for folder, _, filenames in tqdm(os.walk(dir_path)):
        for filename in tqdm(filenames):
            if not filename.endswith(".py"):
                continue
            full_path = os.path.join(folder, filename)
            try:
                # The comment list is returned as well but is not used here.
                entries, _comments = extract_docstrings_and_defs(full_path)
                for record in filter(None, map(generate_qa_from_entry, entries)):
                    record["file"] = full_path
                    collected.append(record)
            except Exception as e:
                # Best effort: report the file and carry on with the rest.
                print(f"Failed to parse {full_path}: {e}")
    return collected

In [None]:
directory = "/home/cc/transformers/src/transformers"
qa_data = process_directory(directory)

# Save the results as JSON Lines: exactly one compact JSON object per line.
# No ``indent`` here - pretty-printing spreads each record over several
# lines, so the file could no longer be read back line by line.
with open("source_code_qa.jsonl", "w", encoding="utf-8") as f:
    for qa in qa_data:
        f.write(json.dumps(qa, ensure_ascii=False) + "\n")

print(f"Extracted {len(qa_data)} QA pairs.")

## Git commit extraction

In [22]:
from git import Repo
import os
import json

In [33]:
from git import NULL_TREE  # sentinel for diffing a root commit against the empty tree

# Repository path: replace with the path of your local transformers checkout
REPO_PATH = "/home/cc/transformers"
repo = Repo(REPO_PATH)

output = []

# Walk the most recent N commits on main (adjust max_count as needed)
for commit in repo.iter_commits('main', max_count=100):
    message = commit.message.strip()
    commit_data = {
        "commit_hash": commit.hexsha,
        "author": commit.author.name,
        "date": commit.committed_datetime.isoformat(),
        "message": message
    }

    # Build the patch in the forward direction (parent -> commit).
    # ``commit.diff(parent)`` would describe how to go from the commit back
    # to its parent, i.e. additions would show up as deletions.
    if commit.parents:
        diffs = commit.parents[0].diff(commit, create_patch=True)
    else:
        # A root commit has no parent; compare it with the empty tree.
        # Passing None instead would diff against the working tree.
        diffs = commit.diff(NULL_TREE, create_patch=True)

    diff_texts = []
    for diff in diffs:
        patch = diff.diff
        if isinstance(patch, bytes):
            # Patches may contain non-UTF-8 bytes; drop what cannot be decoded
            patch = patch.decode("utf-8", errors="ignore")
        if isinstance(patch, str):
            diff_texts.append(patch)
        # Anything else (e.g. no patch text at all) is skipped on purpose

    diff_summary = "\n".join(diff_texts)
    commit_data["diff_summary"] = diff_summary

    # Build the QA pair. The answer is the commit message only; the patch
    # text stays available under metadata["diff_summary"].
    qa_item = {
        "question": f"What changed in commit {commit.hexsha[:7]}?",
        "answer": message,
        "source": "git_commit",
        "metadata": commit_data
    }

    output.append(qa_item)

# Save as JSON Lines: one compact JSON object per line. The explicit UTF-8
# encoding is needed because ensure_ascii=False emits raw non-ASCII text.
with open("qa_from_commits.jsonl", "w", encoding="utf-8") as f:
    for item in output:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

## Github issue extraction

In [1]:
from getpass import getpass

from github import Github

# Prompt without echoing, so the token does not end up in the saved cell output
g_token = getpass("Enter your github token: ")

In [13]:
# Authenticate against the GitHub API with the token entered earlier
g = Github(g_token)

repo = g.get_repo("huggingface/transformers")
# Closed items only; further filters can be passed to get_issues()
issues = repo.get_issues(state="closed")

# Peek at the first two entries to see what the listing returns
for issue in issues[:2]:
    print(f"Title: {issue.title}")
    print(f"Body: {issue.body}")

Title: Check fork
Body: # What does this PR do?

<!--
Congratulations! You've made it this far! You're not quite done yet though.

Once merged, your PR is going to appear in the release notes with the title you set, so make sure it's a great title that fully reflects the extent of your awesome contribution.

Then, please replace this with a description of the change and which issue is fixed (if applicable). Please also include relevant motivation and context. List any dependencies (if any) that are required for this change.

Once you're done, someone will review your PR shortly (see the section "Who can review?" below to tag some potential reviewers). They may suggest changes to make the code even better. If no one reviewed your PR after a week has passed, don't hesitate to post a new comment @-mentioning the same persons---sometimes notifications get lost.
-->

<!-- Remove if not applicable -->

Fixes # (issue)


## Before submitting
- [ ] This PR fixes a typo or improves the docs (yo

In [23]:
count = 0
qa_pairs = []
for issue in issues:
    # The listing mixes pull requests with issues; keep real issues only
    if issue.pull_request:
        continue

    comments_list = [
        {
            # Guard against a comment whose author is no longer available
            "comment_author": comment.user.login if comment.user else None,
            "comment_body": comment.body
        }
        for comment in issue.get_comments()
    ]

    qa_pair = {
        "Issue Title": (issue.title or "").strip(),
        # An issue can be opened without a description, in which case the
        # body is None and calling .strip() on it would raise
        "Issue Body": (issue.body or "").strip(),
        "Issue Comments": comments_list,
        "source": "github_issue",
        "metadata": {
            "issue_number": issue.number,
            "url": issue.html_url,
            "created_at": str(issue.created_at)
        }
    }
    qa_pairs.append(qa_pair)
    count += 1
    # NOTE(review): this stops after 11 collected issues (count > 10);
    # switch to ``>=`` if exactly 10 were intended.
    if count > 10:
        break


# Save as JSON Lines: one compact JSON object per line. The explicit UTF-8
# encoding is needed because ensure_ascii=False emits raw non-ASCII text.
with open("qa_from_issues.jsonl", "w", encoding="utf-8") as f:
    for qa in qa_pairs:
        f.write(json.dumps(qa, ensure_ascii=False) + "\n")
