## Calculate code properties for a random 10% sample of the original data and save them

In [None]:
import os
import random
from datasets import Dataset
from utils.utils import get_code_style_score, get_code_modularity_score, read_jsonl_to_dict, write_dict_to_jsonl


def compute_code_score(example):
    """Attach style and modularity scores to a single example.

    Reads ``example['code']``, scores it with ``get_code_style_score`` and
    ``get_code_modularity_score``, and stores the results under the
    ``'score_style'`` and ``'score_modularity'`` keys of the same mapping.

    If either scorer raises, BOTH entries are replaced by -1.0 sentinels so
    that the failed example can be recognised (and dropped) afterwards.

    Returns the same ``example`` object, mutated in place.
    """
    source = example['code']
    try:
        # evaluated left to right: style first, then modularity
        scores = (get_code_style_score(source), get_code_modularity_score(source))
    except Exception:
        # scoring failed: mark every metric as invalid
        scores = (
            {
                'score_var': -1.0,
                'score_pep8': -1.0,
                'score_style': -1.0,
            },
            -1.0,
        )

    example['score_style'], example['score_modularity'] = scores
    return example


def check_code_score(example):
    """Tell whether an example carries valid (non-negative) scores.

    Checks, in order, ``score_style['score_var']``,
    ``score_style['score_pep8']`` and ``score_modularity``; stops at the
    first one that is not ``>= 0`` and returns the result of the last
    comparison that was evaluated.
    """
    style_scores = example['score_style']

    verdict = style_scores['score_var'] >= 0
    if verdict:
        verdict = style_scores['score_pep8'] >= 0
    if verdict:
        verdict = example['score_modularity'] >= 0
    return verdict


data_dir = os.path.join(os.getcwd(), 'data')
dataset = read_jsonl_to_dict(os.path.join(data_dir, 'my_code_contests_train.jsonl'))

# aggregate demonstration code: one entry per solution flagged as passed
# keys for dataset: dict_keys(['name', 'description', 'public_tests', 'private_tests', 'generated_tests', 'source', 'difficulty', 'solutions', 'incorrect_solutions', 'cf_contest_id', 'cf_index', 'cf_points', 'cf_rating', 'cf_tags', 'is_description_translated', 'untranslated_description', 'time_limit', 'memory_limit_bytes', 'input_file', 'output_file'])
# keys for solutions: dict_keys(['cc', 'modules', 'passed', 'solution'])
demonstration = [
    {
        'description': problem['description'],
        'code': solution_code,
        # more information?
    }
    for problem in dataset
    for position, solution_code in enumerate(problem['solutions']['solution'])
    if problem['solutions']['passed'][position]
]

# calculate code metrics on a reproducible 10% subset of the pool
random.seed(42)
subset_size = len(demonstration) // 10 # 10% of total data
sampled_pool = random.sample(demonstration, subset_size)
demonstration = (
    Dataset.from_list(sampled_pool)
    .map(compute_code_score, num_proc=16)      # attach style / modularity scores
    .filter(check_code_score, num_proc=16)     # drop examples whose scoring failed
)

# save
# demonstration.save_to_disk(os.path.join(os.getcwd(), 'data', 'demonstration'))
write_dict_to_jsonl(list(demonstration), os.path.join(data_dir, 'demonstration.jsonl'))

## Get 100 demonstrations spread evenly over the range of a particular code property

In [None]:
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from utils.utils import read_jsonl_to_dict, write_dict_to_jsonl, get_code_style_score, get_code_modularity_score, get_average_length_of_variables


# Select demonstrations whose property values are spread evenly over [0, 1]:
# the property is split into equal-width bins and `num_sample` points are drawn
# from every bin. Only the variable-length property is active; the style and
# modularity variants are kept as commented-out alternatives.
random.seed(42) # for reproducibility
num_sample = 10 # number of samples to be sampled from each bin

# load demonstration pool
# each data consists of (problem description, code, style score, modularity score)
file_name = 'demonstration'
# NOTE(review): absolute, machine-specific path, whereas the outputs below are
# written under os.getcwd()/data -- confirm both refer to the same directory
path = f'/home/kdy20401/Workspace/Proj-Code-Generation/MC/data/{file_name}.jsonl'
demonstration = read_jsonl_to_dict(path)
print(f'number of codes in demonstration pool: {len(demonstration)}')

# per-example property values, all in the same order as `demonstration`
code = [data['code'] for data in demonstration]
style = [data['score_style']['score_pep8'] for data in demonstration] # score_pep8
modularity = [data['score_modularity'] for data in demonstration] # score_modularity
var_len = [get_average_length_of_variables(data['code']) for data in demonstration]

style_df = pd.DataFrame({'style': np.array(style)})
modularity_df = pd.DataFrame({'modularity': np.array(modularity)})
var_len_df = pd.DataFrame({'var_len': np.array(var_len)})

# bins: 0~0.1, 0.1~0.2, ..., 0.9~1.0
num_bin = 10
bins = np.linspace(0, 1, num_bin + 1)

# find the grid cell to which each data point belongs
# include_lowest=True makes 0 style or modularity value included in the first bin
# NOTE(review): values outside [0, 1] get a NaN bin from pd.cut and are then
# dropped by groupby -- confirm get_average_length_of_variables returns values
# in that range
# style_df['style_bin'] = pd.cut(style_df['style'], bins=bins, labels=False, include_lowest=True)
# modularity_df['modularity_bin'] = pd.cut(modularity_df['modularity'], bins=bins, labels=False, include_lowest=True)
var_len_df['var_len_bin'] = pd.cut(var_len_df['var_len'], bins=bins, labels=False, include_lowest=True)

# sample data points from each bin
# if the number of data points in the bin is less than num_sample, duplication can occur
# style_sampled_points = style_df.groupby(['style_bin']).apply(lambda x: x.sample(num_sample, replace=True if len(x) < num_sample else False))
# modularity_sampled_points = modularity_df.groupby(['modularity_bin']).apply(lambda x: x.sample(num_sample, replace=True if len(x) < num_sample else False))
var_len_sampled_points = var_len_df.groupby(['var_len_bin']).apply(
    lambda x: x.sample(num_sample, replace=len(x) < num_sample)
)

# style_sampled_points.index => (style_bin, code_index)
# (deduplicated) index of sampled data points
# style_index = list(set([e[1] for e in style_sampled_points.index]))
# modularity_index = list(set([e[1] for e in modularity_sampled_points.index]))
var_len_index = list({e[1] for e in var_len_sampled_points.index})

# fails when the number of samples is less than expected
# (an empty bin, or duplicates removed after sampling with replacement)
# assert len(style_index) == num_bin * num_sample and len(modularity_index) == num_bin * num_sample
assert len(var_len_index) == num_bin * num_sample, (
    f'expected {num_bin * num_sample} distinct samples, got {len(var_len_index)}'
)

# style / modularity sampling is disabled above, so `style_index` and
# `modularity_index` are never defined; building these two lists raised a
# NameError before the var_len selection could be created and saved
# selected_demonstration_by_style = [demonstration[i] for i in style_index]
# selected_demonstration_by_modularity = [demonstration[i] for i in modularity_index]
selected_demonstration_by_var_len = [demonstration[i] for i in var_len_index]

# save each demonstration which has high coverage of style or modularity
# write_dict_to_jsonl(selected_demonstration_by_style, os.path.join(os.getcwd(), 'data', 'style_demonstration.jsonl'))
# write_dict_to_jsonl(selected_demonstration_by_modularity, os.path.join(os.getcwd(), 'data', 'modularity_demonstration.jsonl'))
write_dict_to_jsonl(selected_demonstration_by_var_len, os.path.join(os.getcwd(), 'data', 'var_len_demonstration.jsonl'))

# for visualization
# plt.scatter(style_sampled_points['style'], np.array([0.5] * len(style_sampled_points)), color='red', label='Sampled Data')
# plt.scatter(modularity_sampled_points['modularity'], np.array([0.5] * len(modularity_sampled_points)), color='blue', label='Sampled Data')
# plt.xlabel('Style')
# plt.ylabel('Modularity (tmp)')
# plt.legend()
# plt.show()