In [None]:
!pip install -q pandas numpy datasets transformers torch tqdm


In [6]:
import os
import json
import re
from typing import List, Dict, Tuple, Optional, Union
import pandas as pd
import numpy as np
from datasets import load_dataset, Dataset, DatasetDict
from transformers import AutoTokenizer
import torch
from tqdm.auto import tqdm

In [23]:
def load_and_filter_the_stack(limit: int = 100000) -> Dataset:
    """Загрузка и фильтрация данных из датасета The Stack"""
    dataset = load_dataset("bigcode/the-stack", split="train", streaming=True)
    
    filtered_data = []
    for sample in tqdm(dataset.take(limit * 2)):
        # Фильтруем только Python код с подходящей длиной
        if sample['lang'] == 'Python' and len(sample['text']) > 100 and len(sample['text']) < 8192:
            filtered_data.append({
                'code': sample['text'],
                'language': sample['lang'],
                'repo_name': sample['repo_name']
            })
            if len(filtered_data) >= limit:
                break
    
    return Dataset.from_pandas(pd.DataFrame(filtered_data))

In [8]:
def load_code_alpaca() -> Dataset:
    """Загрузка датасета CodeAlpaca"""
    return load_dataset("sahil2801/CodeAlpaca-20k")["train"]

In [9]:
def load_code_search_net() -> Dataset:
    """Загрузка датасета CodeSearchNet"""
    return load_dataset("code_search_net", "python")["train"]

In [11]:
def extract_function_docstring_pairs(dataset: Dataset) -> List[Dict]:
    """Извлечение пар функция-документация из кода
    
    Ищет в коде функции с документацией и извлекает:
    - Имя функции
    - Аргументы
    - Документацию (docstring)
    - Тело функции
    """
    pairs = []
    
    for item in tqdm(dataset):
        if 'code' in item and isinstance(item['code'], str):
            code = item['code']
            
            # Поиск функций с документацией с помощью регулярного выражения
            func_match = re.search(r'def\s+(\w+)\s*\((.*?)\)(?:\s*->.*?)?:\s*(?:"""|\'\'\')(.*?)(?:"""|\'\'\')(.+?)(?=def|\Z)', 
                                code, re.DOTALL)
            
            if func_match:
                func_name = func_match.group(1)
                func_args = func_match.group(2)
                docstring = func_match.group(3).strip()
                func_body = func_match.group(4).strip()
                
                # Фильтрация по минимальной длине документации и тела функции
                if len(docstring) > 10 and len(func_body) > 20:
                    pairs.append({
                        'prompt': f"Write a function named {func_name} that {docstring}",
                        'code': f"def {func_name}({func_args}):\n{func_body}"
                    })
    
    return pairs

In [12]:
def check_contamination(text: str, benchmark_problems: List[Dict]) -> bool:
    """Проверка на наличие контаминации с тестовыми датасетами
    
    Проверяет, не содержит ли текст примеры из тестовых наборов данных
    """
    for problem in benchmark_problems:
        if problem['prompt'] in text or problem['canonical_solution'] in text:
            return True
    
    return False

In [13]:
def load_humaneval_problems() -> List[Dict]:
    """Загрузка задач из датасета HumanEval"""
    dataset = load_dataset("openai/openai_humaneval")["test"]
    problems = []
    
    for item in dataset:
        problems.append({
            'prompt': item['prompt'],
            'canonical_solution': item['canonical_solution'],
            'task_id': item['task_id']
        })
    
    return problems

In [14]:
def load_mbpp_problems() -> List[Dict]:
    """Загрузка задач из датасета MBPP"""
    dataset = load_dataset("Muennighoff/mbpp")["test"]
    problems = []
    
    for item in dataset:
        problems.append({
            'prompt': item['text'],
            'canonical_solution': item['code'],
            'task_id': item['task_id']
        })
    
    return problems

In [15]:
def prepare_training_data(pairs: List[Dict], benchmark_problems: List[Dict]) -> List[Dict]:
    """Подготовка тренировочных данных
    
    Удаляет примеры, которые могут содержать контаминацию с тестовыми датасетами
    """
    clean_pairs = []
    
    for pair in tqdm(pairs):
        if not check_contamination(pair['prompt'], benchmark_problems) and  not check_contamination(pair['code'], benchmark_problems):
            clean_pairs.append(pair)
    
    return clean_pairs

In [16]:
def convert_to_chatml_format(pairs: List[Dict]) -> List[Dict]:
    """Конвертация данных в формат ChatML
    
    Преобразует пары промпт-код в формат диалога для обучения модели
    """
    formatted_data = []
    
    for pair in pairs:
        formatted_data.append({
            "type": "chatml",
            "messages": [
                {
                    "role": "system",
                    "content": "You are a helpful coding assistant."
                },
                {
                    "role": "user",
                    "content": pair['prompt']
                },
                {
                    "role": "assistant",
                    "content": pair['code']
                }
            ],
            "source": "curated_dataset"
        })
    
    return formatted_data

In [17]:
def save_dataset(data: List[Dict], filepath: str) -> None:
    """Сохранение датасета в формате JSONL"""
    with open(filepath, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item) + '\n')

In [18]:
def prepare_complete_dataset(output_file: str = "training_data.jsonl", limit: int = 100000):
    """Полный пайплайн подготовки датасета
    
    1. Загрузка тестовых датасетов для проверки контаминации
    2. Загрузка и фильтрация данных из The Stack
    3. Загрузка данных из CodeAlpaca
    4. Извлечение пар функция-документация
    5. Удаление примеров с контаминацией
    6. Конвертация в формат ChatML
    7. Сохранение результатов
    """
    print("Загрузка тестовых датасетов для проверки контаминации...")
    humaneval_problems = load_humaneval_problems()
    mbpp_problems = load_mbpp_problems()
    benchmark_problems = humaneval_problems + mbpp_problems
    
    print("Загрузка датасета The Stack...")
    stack_dataset = load_and_filter_the_stack(limit=limit // 2)
    
    print("Загрузка датасета CodeAlpaca...")
    code_alpaca = load_code_alpaca()
    
    print("Извлечение пар функция-документация...")
    stack_pairs = extract_function_docstring_pairs(stack_dataset)
    
    print("Подготовка данных из CodeAlpaca...")
    code_alpaca_pairs = [{'prompt': item['instruction'], 'code': item['output']} 
                          for item in code_alpaca if len(item['instruction']) > 0 and len(item['output']) > 0]
    
    print("Удаление контаминации...")
    all_pairs = stack_pairs + code_alpaca_pairs
    clean_pairs = prepare_training_data(all_pairs, benchmark_problems)
    
    print("Конвертация в формат ChatML...")
    formatted_data = convert_to_chatml_format(clean_pairs)
    
    print(f"Сохранение {len(formatted_data)} примеров в {output_file}...")
    save_dataset(formatted_data, output_file)
    
    print("Подготовка датасета завершена!")
    return formatted_data

In [24]:
prepare_complete_dataset()

Загрузка тестовых датасетов для проверки контаминации...
Загрузка датасета The Stack...


100000it [04:00, 415.26it/s]


Загрузка датасета CodeAlpaca...
Извлечение пар функция-документация...


0it [00:00, ?it/s]

Подготовка данных из CodeAlpaca...





Удаление контаминации...


100%|██████████| 20016/20016 [00:05<00:00, 3550.85it/s]


Конвертация в формат ChatML...
Сохранение 20009 примеров в training_data.jsonl...
Подготовка датасета завершена!


[{'type': 'chatml',
  'messages': [{'role': 'system',
    'content': 'You are a helpful coding assistant.'},
   {'role': 'user',
    'content': 'Create an array of length 5 which contains all even numbers between 1 and 10.'},
   {'role': 'assistant', 'content': 'arr = [2, 4, 6, 8, 10]'}],
  'source': 'curated_dataset'},
 {'type': 'chatml',
  'messages': [{'role': 'system',
    'content': 'You are a helpful coding assistant.'},
   {'role': 'user',
    'content': 'Formulate an equation to calculate the height of a triangle given the angle, side lengths and opposite side length.'},
   {'role': 'assistant',
    'content': 'Height of triangle = opposite side length * sin (angle) / side length'}],
  'source': 'curated_dataset'},
 {'type': 'chatml',
  'messages': [{'role': 'system',
    'content': 'You are a helpful coding assistant.'},
   {'role': 'user',
    'content': 'Write a replace method for a string class which replaces the given string with a given set of characters.'},
   {'role': '