# Подготовка датасета - 2 часть, генерация по файлам

### Импорты и сетап окружения

In [1]:
import datasets
import os
import pandas as pd
import subprocess
import re
from tqdm.notebook import tqdm
from pathlib import Path
import asyncio
import aiopath
import sys
import time

# Resolve the project root only once per kernel. This cell changes the working
# directory to the root, so recomputing "parent of the current directory" on a
# re-run would climb one directory higher every time.
# pathlib is sufficient here: nothing in this cell is awaited.
if 'ROOT_DIR' not in globals():
    ROOT_DIR = str(Path.cwd().parent)
os.chdir(ROOT_DIR)
DATA_DIR = ROOT_DIR+'/data'
REPOS_DIR = DATA_DIR+'/repos'
# Make the project packages importable from the notebook.
if ROOT_DIR not in sys.path:
    sys.path.append(ROOT_DIR)

from evaluating.generator import make_model_with_tokenizer, generate_completions
from scoring.scoring import Scorer

# GVM_ROOT must be set in the environment; a missing value raises KeyError here.
gvm_root = os.environ['GVM_ROOT']
go_bin_dirs = [
    f"{gvm_root}/bin",
    f"{gvm_root}/pkgsets/go1.24.2/global/bin",
    f"{gvm_root}/gos/go1.24.2/bin",
    f"{gvm_root}/pkgsets/go1.24.2/global/overlay/bin",
]
# Put the Go toolchain directories first, dropping copies that are already
# present so that re-running the cell does not keep lengthening PATH.
inherited_dirs = [d for d in os.environ['PATH'].split(':') if d not in go_bin_dirs]
os.environ['PATH'] = ':'.join(go_bin_dirs + inherited_dirs)

В части 1 ([dataset_0](./dataset_0.ipynb)) подготовлен датасет `test_candidates_ds` с отфильтрованными файлами.

In [2]:
# Load the filtered candidate files saved under DATA_DIR.
test_candidates_ds = datasets.load_from_disk(
    dataset_path=DATA_DIR+'/test_candidates_ds',
)

### Записываем промпты в датасет

In [3]:
# System prompt used as the first ("system") message of every prompt built by
# get_prompt. It asks for a test file only and requires each subtest to be
# wrapped in t.Run.
# NOTE(review): the text mentions additional dependency files, but get_prompt
# sends a single file as the user message — confirm this wording is intended.
system_message = """
You are an expert programmer. 
You should only return output test file containing working code.
The user is going to give you code and would like to have unit tests for the first file.
All the other files are just dependencies to give you context of all the possible test cases to produce.
Cover all possible inputs and their respective outputs using tests.
Each subtest must be wrapped into t.Run
"""

def get_prompt(row) -> list:
    """Build the chat prompt for one source file.

    Reads the file at REPOS_DIR/<project_path><relative_project_path> and
    returns a two-message conversation: the shared system message followed by
    a user message holding the file's text.

    Args:
        row: mapping with 'project_path' and 'relative_project_path'. The two
            values are concatenated as-is, so 'project_path' is expected to
            end with a path separator.

    Returns:
        A list of two {"role", "content"} dicts (system first, then user).

    Raises:
        OSError: if the file cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    # Anchor on REPOS_DIR instead of a path relative to the working directory,
    # so the lookup does not depend on where the kernel currently is.
    file_path = f"{REPOS_DIR}/{row['project_path']}{row['relative_project_path']}"
    # Explicit encoding keeps the read independent of the machine's locale;
    # the context manager closes the handle even if the read fails.
    with open(file_path, 'r', encoding='utf-8') as source_file:
        file_content = source_file.read()

    return [
        {"role": "system", "content": system_message},
        {"role": "user", "content": file_content},
    ]

def finalize_row(row) -> dict:
    """Project a candidate row onto the output columns and attach its prompt.

    Copies 'project_path', exposes 'relative_go_package' as
    'relative_package_path' and 'relative_project_path' as
    'relative_file_path', then adds the prompt built by get_prompt.
    """
    # (output column, input column) pairs, in output order.
    column_sources = (
        ('project_path', 'project_path'),
        ('relative_package_path', 'relative_go_package'),
        ('relative_file_path', 'relative_project_path'),
    )
    finalized = {target: row[source] for target, source in column_sources}
    finalized['prompt'] = get_prompt(row)
    return finalized

# Build one prompt per candidate file and keep only the columns used later.
by_file_ds = test_candidates_ds.map(finalize_row, num_proc=32).select_columns(['project_path', 'relative_package_path', 'relative_file_path', 'prompt'])

# Save under DATA_DIR, the location the next section loads `by_file_ds` from.
# A path relative to the working directory would point elsewhere once the
# setup cell has changed directory.
by_file_ds.save_to_disk(DATA_DIR+'/by_file_ds')

print(by_file_ds)
by_file_ds[0]

Saving the dataset (0/1 shards):   0%|          | 0/33251 [00:00<?, ? examples/s]

Dataset({
    features: ['project_path', 'relative_package_path', 'relative_file_path', 'prompt'],
    num_rows: 33251
})


{'project_path': '766dc882d779f07821bde740ce49802f67ae42b3/backend/',
 'relative_package_path': 'controllers/',
 'relative_file_path': 'controllers/controllers.go',
 'prompt': [{'content': '\nYou are an expert programmer. \nYou should only return output test file containing working code.\nThe user is going to give you code and would like to have unit tests for the first file.\nAll the other files are just dependencies to give you context of all the possible test cases to produce.\nCover all possible inputs and their respective outputs using tests.\nEach subtest must be wrapped into t.Run\n',
   'role': 'system'},
  {'content': 'package controllers\n\nimport (\n\t"encoding/json"\n\t"fmt"\n\techo "github.com/labstack/echo/v4"\n\t"golang.org/x/net/websocket"\n\t"stream/models"\n)\n\n// Controller interface has two methods\ntype Controller interface {\n\t// Homecontroller renders initial home page\n\tHomeController(e echo.Context) error\n\n\t// StreamController responds with live cpu statu

### Генерируем completions исходной моделью `deepseek-ai/deepseek-coder-1.3b-instruct`

In [4]:
# Load the per-file prompt dataset stored under DATA_DIR.
by_file_ds = datasets.load_from_disk(
    dataset_path=DATA_DIR+'/by_file_ds',
)

In [5]:
# Fixed sample size and seed so the evaluation subset is the same on every run.
TEST_SAMPLE_SIZE = 500
TEST_SAMPLE_SEED = 42
by_file_test_ds = by_file_ds.shuffle(seed=TEST_SAMPLE_SEED).take(TEST_SAMPLE_SIZE)

# Save under DATA_DIR, the location later cells load `by_file_test_ds` from.
by_file_test_ds.save_to_disk(DATA_DIR+'/by_file_test_ds')

print(by_file_test_ds)
by_file_test_ds[0]

Saving the dataset (0/1 shards):   0%|          | 0/500 [00:00<?, ? examples/s]

Dataset({
    features: ['project_path', 'relative_package_path', 'relative_file_path', 'prompt'],
    num_rows: 500
})


{'project_path': '8844bcf1e44c80eb6c94e96c1c0466177f3bba94/',
 'relative_package_path': 'packages/testutil/testchain/',
 'relative_file_path': 'packages/testutil/testchain/mock_nodeconn.go',
 'prompt': [{'content': '\nYou are an expert programmer. \nYou should only return output test file containing working code.\nThe user is going to give you code and would like to have unit tests for the first file.\nAll the other files are just dependencies to give you context of all the possible test cases to produce.\nCover all possible inputs and their respective outputs using tests.\nEach subtest must be wrapped into t.Run\n',
   'role': 'system'},
  {'content': 'package testchain\n\nimport (\n\t"github.com/iotaledger/goshimmer/packages/ledgerstate"\n)\n\ntype MockedNodeConn struct {\n\tid                              string\n\tonPullBacklog                   func(addr *ledgerstate.AliasAddress)\n\tonPullState                     func(addr *ledgerstate.AliasAddress)\n\tonPullConfirmedTransaction

In [6]:
# Load the sampled evaluation subset stored under DATA_DIR.
by_file_test_ds = datasets.load_from_disk(
    dataset_path=DATA_DIR+'/by_file_test_ds',
)

In [7]:
# Load the tokenizer and model for the 'original' variant; per the section
# heading this is deepseek-ai/deepseek-coder-1.3b-instruct.
tokenizer, model = make_model_with_tokenizer('original')

def generate_completion_for_row(row: dict) -> dict:
    """Generate one completion for a row's prompt and time the call.

    Uses the module-level `tokenizer` and `model`. The prompt is sent as a
    batch of one and the first returned completion is kept.

    Args:
        row: mapping with a 'prompt' entry.

    Returns:
        {'generate_time': elapsed seconds as a float,
         'completion': the first item returned by generate_completions}.
    """
    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments the way a time.time() difference can.
    started_at = time.perf_counter()
    completion = generate_completions(tokenizer, model, [row['prompt']])[0]
    elapsed = time.perf_counter() - started_at
    return {
        'generate_time': elapsed,
        'completion': completion,
    }

# Generate a completion for every sampled row in a single process.
by_file_test_with_completions_ds = by_file_test_ds.map(
    function=generate_completion_for_row,
    num_proc=1,
)

# Persist the results, then drop the notebook's references to the model objects.
by_file_test_with_completions_ds.save_to_disk(
    DATA_DIR+'/by_file_test_with_completions_ds'
)
del model, tokenizer

print(by_file_test_with_completions_ds)
by_file_test_with_completions_ds[0]

2025-05-19 03:26:59.243822: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1747614419.373649    7231 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1747614419.411928    7231 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1747614419.701020    7231 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1747614419.701056    7231 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking the same target more than once.
W0000 00:00:1747614419.701060    7231 computation_placer.cc:177] computation placer alr

Map:   0%|          | 0/500 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/500 [00:00<?, ? examples/s]

Dataset({
    features: ['project_path', 'relative_package_path', 'relative_file_path', 'prompt', 'generate_time', 'completion'],
    num_rows: 500
})


{'project_path': '8844bcf1e44c80eb6c94e96c1c0466177f3bba94/',
 'relative_package_path': 'packages/testutil/testchain/',
 'relative_file_path': 'packages/testutil/testchain/mock_nodeconn.go',
 'prompt': [{'content': '\nYou are an expert programmer. \nYou should only return output test file containing working code.\nThe user is going to give you code and would like to have unit tests for the first file.\nAll the other files are just dependencies to give you context of all the possible test cases to produce.\nCover all possible inputs and their respective outputs using tests.\nEach subtest must be wrapped into t.Run\n',
   'role': 'system'},
  {'content': 'package testchain\n\nimport (\n\t"github.com/iotaledger/goshimmer/packages/ledgerstate"\n)\n\ntype MockedNodeConn struct {\n\tid                              string\n\tonPullBacklog                   func(addr *ledgerstate.AliasAddress)\n\tonPullState                     func(addr *ledgerstate.AliasAddress)\n\tonPullConfirmedTransaction

### Скорим результат

In [3]:
# Load the rows with generated completions stored under DATA_DIR.
by_file_test_with_completions_ds = datasets.load_from_disk(
    dataset_path=DATA_DIR+'/by_file_test_with_completions_ds',
)

In [4]:
async def score_row(row: dict) -> dict:
    """Score one generated completion and time the evaluation.

    Builds a Scorer from the row's project, package and file paths, awaits
    its `score` on the completion, and derives a reward from the result.

    Args:
        row: mapping with 'project_path', 'relative_package_path',
            'relative_file_path' and 'completion'.

    Returns:
        {'score_time': elapsed seconds as a float,
         'result': the value returned by Scorer.score,
         'reward': the value returned by Scorer.calculate_reward}.
    """
    # perf_counter is monotonic, so the measured duration cannot be distorted
    # by system clock adjustments the way a time.time() difference can.
    started_at = time.perf_counter()
    scorer = Scorer(row['project_path'], row['relative_package_path'], None, relative_file_path=row['relative_file_path'])

    evaluation_result = await scorer.score(row['completion'])

    reward = scorer.calculate_reward(evaluation_result)

    return {
        'score_time': time.perf_counter()-started_at,
        'result': evaluation_result,
        'reward': reward,
    }

results = {}
for i, row in tqdm(enumerate(by_file_test_with_completions_ds), total=len(by_file_test_with_completions_ds)):
    results[(row['project_path'], row['relative_package_path'], row['relative_file_path'])] = await score_row(row)

by_file_test_scored_ds = by_file_test_with_completions_ds.map(lambda row: results[(row['project_path'], row['relative_package_path'], row['relative_file_path'])], num_proc=1)

by_file_test_scored_ds.save_to_disk(DATA_DIR+'/by_file_test_scored_ds')

print(by_file_test_scored_ds)
by_file_test_scored_ds[0]               

  0%|          | 0/500 [00:00<?, ?it/s]

Map:   0%|          | 0/500 [00:00<?, ? examples/s]

Saving the dataset (0/1 shards):   0%|          | 0/500 [00:00<?, ? examples/s]

Dataset({
    features: ['project_path', 'relative_package_path', 'relative_file_path', 'prompt', 'generate_time', 'completion', 'score_time', 'result', 'reward'],
    num_rows: 500
})


{'project_path': '8844bcf1e44c80eb6c94e96c1c0466177f3bba94/',
 'relative_package_path': 'packages/testutil/testchain/',
 'relative_file_path': 'packages/testutil/testchain/mock_nodeconn.go',
 'prompt': [{'content': '\nYou are an expert programmer. \nYou should only return output test file containing working code.\nThe user is going to give you code and would like to have unit tests for the first file.\nAll the other files are just dependencies to give you context of all the possible test cases to produce.\nCover all possible inputs and their respective outputs using tests.\nEach subtest must be wrapped into t.Run\n',
   'role': 'system'},
  {'content': 'package testchain\n\nimport (\n\t"github.com/iotaledger/goshimmer/packages/ledgerstate"\n)\n\ntype MockedNodeConn struct {\n\tid                              string\n\tonPullBacklog                   func(addr *ledgerstate.AliasAddress)\n\tonPullState                     func(addr *ledgerstate.AliasAddress)\n\tonPullConfirmedTransaction

### Анализ результатов

In [5]:
# Load the scored rows stored under DATA_DIR and convert them to a DataFrame.
df = (
    datasets
    .load_from_disk(dataset_path=DATA_DIR+'/by_file_test_scored_ds')
    .to_pandas()
)

In [6]:
# Flatten the fields of the nested `result` column into top-level columns.
df['coverage'] = df['result'].apply(lambda x: float(x['coverage']))
df['mutation_score'] = df['result'].apply(lambda x: float(x['mutation_score']))
# A missing error_type (None) must count as "no error"; str(None) would yield
# the non-empty string 'None' and mark the row as failed.
df['error_type'] = df['result'].apply(lambda x: str(x['error_type'] or ''))
df['all_passed'] = df['result'].apply(lambda x: int(x['test_results']['all_passed']))
# A row is an error exactly when it carries a non-empty error type.
df['is_error'] = df['error_type'] != ''

In [7]:
# Select the failed rows once, then report their count and the per-type split.
error_rows = df.query('is_error')
print('error count', len(error_rows))

error_rows['error_type'].value_counts()

error count 370


error_type
test_build_failed    170
completion_parse     133
go_tool_cover         34
other                 21
get_deps               7
goimports              5
Name: count, dtype: int64

In [9]:
# Coverage is averaged over rows without an error; mutation score over rows
# where all tests passed.
coverage_without_errors = df.query('is_error == False')['coverage']
mutation_when_passed = df.query('all_passed == 1')['mutation_score']

summary_lines = [
    ('reward', df['reward'].mean()),
    ('coverage', coverage_without_errors.mean()),
    ('mutation_score', mutation_when_passed.mean()),
    ('all_passed_count', len(mutation_when_passed)),
]
for metric_name, metric_value in summary_lines:
    print(metric_name, metric_value)

reward 0.17719959840000002
coverage 21.313846153846157
mutation_score 0.29743661666666665
all_passed_count 60
