## 1.数据抓取

In [1]:
import os
import git
from pathlib import Path

def clone_repos(repo_list, target_dir):
    for repo_url in repo_list:
        repo_name = repo_url.split('/')[-1].replace('.git', '')
        repo_path = Path(target_dir) / repo_name
        if not repo_path.exists():
            git.Repo.clone_from(repo_url, repo_path)
        else:
            repo = git.Repo(repo_path)
            repo.remotes.origin.pull()

# 使用示例
repo_list = ['https://github.com/LGRY/self-llm.git']
clone_repos(repo_list, './raw_data')

## 2.数据清洗

In [None]:
import ast
import astroid
from typing import List

def clean_python_code(code: str) -> str:
    # 移除注释
    tree = ast.parse(code)
    for node in ast.walk(tree):
        if isinstance(node, ast.Expr) and isinstance(node.value, ast.Str):
            node.value.s = ""

    # 移除空行
    cleaned_code = ast.unparse(tree)
    cleaned_code = "\n".join([line for line in cleaned_code.split("\n") if line.strip()])

    return cleaned_code

def remove_sensitive_info(code: str, sensitive_patterns: List[str]) -> str:
    # 从给定的代码字符串中移除敏感信息
    # 遍历敏感信息模式列表
    for pattern in sensitive_patterns:
        # 使用空字符串替换代码中的敏感信息模式
        code = code.replace(pattern, "[REDACTED]")
    # 返回处理后的代码字符串
    return code

# 使用示例
raw_code = """
# This is a comment
def hello_world():
    print("Hello, World!")  # Another comment

API_KEY = "very_secret_key"
"""

sensitive_patterns = ["very_secret_key"]
cleaned_code = clean_python_code(raw_code)
safe_code = remove_sensitive_info(cleaned_code, sensitive_patterns)
print(safe_code)

## 3. 数据标准化

### 3.1 代码格式化
使用工具如black（Python）或prettier（JavaScript）来标准化代码格式：

In [None]:
import black

def format_python_code(code: str) -> str:
    return black.format_str(code, mode=black.FileMode())

# 使用示例
formatted_code = format_python_code(safe_code)
print(formatted_code)

### 3.2 命名规范化
使用正则表达式统一命名风格

In [None]:
import re
def standardize_naming(code: str, style: str = 'snake_case') -> str:
    """
    将给定的代码字符串标准化为指定的命名风格。

    参数:
        code (str): 需要标准化的代码字符串。
        style (str, 可选): 目标命名风格，默认为'snake_case'。
                           可选值: 'snake_case', 'camelCase'。

    返回:
        str: 标准化后的代码字符串。
    """
    if style == 'snake_case':
        # 定义匹配模式，用于查找小写字母或数字后跟大写字母的情况
        pattern = r'([a-z0-9])([A-Z])'
        # 定义替换模式，在匹配的两个字符之间添加下划线
        replacement = r'\1_\2'
    elif style == 'camelCase':
        def camel_case(match):
            """
            将匹配到的下划线和其后的字符转换为驼峰命名法。

            参数:
                match (re.Match): 正则表达式匹配对象。

            返回:
                str: 转换后的字符串。
            """
            # 返回第一个匹配组（下划线之前的字符）和第二个匹配组（下划线之后的字符）的大写形式
            return match.group(1) + match.group(2).upper()
        # 定义匹配模式，用于查找下划线后跟字母的情况
        pattern = r'(_)([a-zA-Z])'
        # 使用自定义的camel_case函数作为替换模式
        replacement = camel_case

    # 使用正则表达式替换函数将代码字符串中的匹配模式替换为指定的替换模式
    return re.sub(pattern, replacement, code)

# 使用示例
# 假设formatted_code是一个已经格式化的代码字符串
standardized_code = standardize_naming(formatted_code, 'snake_case')
print(standardized_code)

## 4. 知识图谱构建
### 4.1 实体提取
使用AST（抽象语法树）分析代码结构，提取关键实体

In [None]:
import ast

def extract_entities(code: str):
    tree = ast.parse(code)
    entities = {
        'functions': [],
        'classes': [],
        'imports': []
    }

    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef):
            entities['functions'].append(node.name)
        elif isinstance(node, ast.ClassDef):
            entities['classes'].append(node.name)
        elif isinstance(node, ast.Import):
            entities['imports'].extend(alias.name for alias in node.names)

    return entities

# 使用示例
entities = extract_entities(standardized_code)
print(entities)

### 4.2 关系建模
使用NetworkX库构建和可视化知识图谱

In [None]:
import networkx as nx
import matplotlib.pyplot as plt

def build_knowledge_graph(entities):
    """
    根据提取的实体构建知识图谱。

    参数:
        entities (dict): 包含提取的实体的字典，格式为:
              {
                  'functions': [函数名列表],
                  'classes': [类名列表],
                  'imports': [导入的模块名列表]
              }

    返回:
        networkx.Graph: 构建的知识图谱。
    """
    # 初始化一个无向图
    G = nx.Graph()

    # 遍历实体字典中的每个实体类型和对应的实体列表
    for entity_type, items in entities.items():
        # 遍历实体列表中的每个实体
        for item in items:
            # 将实体添加为图的节点，并标记其类型
            G.add_node(item, type=entity_type)

    # 添加关系（这里简化处理，实际应根据代码分析确定关系）
    for func in entities['functions']:
        for cls in entities['classes']:
            # 在函数和类之间添加一条边，表示函数属于类
            G.add_edge(func, cls, relation="belongs_to")

    # 返回构建的知识图谱
    return G

def visualize_graph(G):
    """
    使用matplotlib可视化知识图谱。

    参数:
        G (networkx.Graph): 需要可视化的知识图谱。
    """
    # 使用Spring布局算法计算节点的位置
    pos = nx.spring_layout(G)
    # 创建一个新的图形，设置图形的大小
    plt.figure(figsize=(12, 8))
    # 绘制知识图谱，设置节点的标签、颜色、大小、字体大小和字体粗细
    nx.draw(G, pos, with_labels=True, node_color='lightblue', node_size=500, font_size=8, font_weight='bold')
    # 获取图中边的关系标签
    edge_labels = nx.get_edge_attributes(G, 'relation')
    # 绘制边的关系标签
    nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
    # 设置图形的标题
    plt.title("Code Knowledge Graph")
    # 关闭坐标轴
    plt.axis('off')
    # 自动调整子图参数，使之填充整个图像区域
    plt.tight_layout()
    # 显示图形
    plt.show()

# 使用示例
G = build_knowledge_graph(entities)
visualize_graph(G)

## 5. RAG系统实现
### 5.1 文本嵌入
使用Sentence Transformers生成文本嵌入

In [None]:
from sentence_transformers import SentenceTransformer

def generate_embeddings(texts):
    model = SentenceTransformer('all-MiniLM-L6-v2')
    embeddings = model.encode(texts)
    return embeddings

# 使用示例
code_snippets = [standardized_code]  # 实际应用中这里会是多段代码
embeddings = generate_embeddings(code_snippets)

### 5.2 向量索引
使用FAISS构建向量索引

In [8]:
import faiss
import numpy as np

def build_faiss_index(embeddings):
    dimension = embeddings.shape[1]
    index = faiss.IndexFlatL2(dimension)
    index.add(embeddings)
    return index

# 使用示例
index = build_faiss_index(np.array(embeddings))

### 5.3 检索实现

In [9]:
def retrieve_similar_codes(query, index, embeddings, k=5):
    query_embedding = generate_embeddings([query])[0]
    distances, indices = index.search(np.array([query_embedding]), k)
    return [(distances[0][i], embeddings[indices[0][i]]) for i in range(k)]

# 使用示例
query = "How to implement a binary search tree?"
similar_codes = retrieve_similar_codes(query, index, embeddings)

## 6. 代码生成模型训练
使用Hugging Face的Transformers库微调代码生成模型

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM, TrainingArguments, Trainer
import torch

def fine_tune_code_model(train_data, model_name="microsoft/CodeGPT-small-py"):
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)

    def tokenize_function(examples):
        return tokenizer(examples["code"], truncation=True, padding="max_length", max_length=512)

    tokenized_data = train_data.map(tokenize_function, batched=True)

    training_args = TrainingArguments(
        output_dir="./results",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        warmup_steps=500,
        weight_decay=0.01,
        logging_dir='./logs',
    )

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_data,
    )

    trainer.train()
    return model, tokenizer

# 使用示例（需要准备训练数据）
fine_tuned_model, tokenizer = fine_tune_code_model(train_data)

## 7. 工程化实现
### 7.1 API设计
使用FastAPI构建API

In [11]:
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class CodeQuery(BaseModel):
    query: str

@app.post("/generate_code/")
async def generate_code(query: CodeQuery):
    # 1. 检索相关代码
    similar_codes = retrieve_similar_codes(query.query, index, embeddings)

    # 2. 使用微调后的模型生成代码
    # （这里假设我们已经有了fine_tuned_model和tokenizer）
    input_text = f"Query: {query.query}\nSimilar code: {similar_codes[0][1]}\nGenerate:"
    input_ids = tokenizer.encode(input_text, return_tensors="pt")
    output = fine_tuned_model.generate(input_ids, max_length=200, num_return_sequences=1)
    generated_code = tokenizer.decode(output[0], skip_special_tokens=True)

    return {"generated_code": generated_code}

# 运行服务器
# uvicorn main:app --reload