# Model Assessment

This notebook contains the 2nd stage of the model assessment. The 1st stage, the evaluation metrics, was produced by the training process during the final training. For the 3rd stage, the qualitative analysis, individual samples created in this notebook were inspected and analyzed.

## Notebook-Setup

In [None]:
import transformers
import torch
import tqdm
import json


## Sample Generation

### Model Import

In [None]:
## Name of the run whose artifacts are assessed
run_name = "finalTraining_v1"
# run_name = "MLPC-2048-StarCoderBase7B"

# Model identifier used for loading the tokenizer
model_name = "codellama/CodeLlama-7b-hf"
# model_name = "bigcode/starcoderbase-7b"

# Run-specific folders, all derived from run_name:
# dataset folder, saved model and training checkpoints
export_folder = f"./dataset/{run_name}/"
model_save_path = f"./models/{run_name}/"
model_checkpoint_path = f"./checkpoints/{run_name}/"

In [None]:
## Load the saved model so that it can be used for inference

# 4bit quantization settings -> must be the same as in training
quantization_config = transformers.BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)

# Read the model from model_save_path, applying the quantization settings above
model = transformers.AutoModelForCausalLM.from_pretrained(
    model_save_path,
    quantization_config=quantization_config,
    low_cpu_mem_usage=True,
)

In [None]:
# Tokenizer belonging to model_name
tokenizer = transformers.AutoTokenizer.from_pretrained(model_name)

# Register "[PAD]" as the padding token of the tokenizer
tokenizer.add_special_tokens(
    {"pad_token": "[PAD]"}
)

### Sample Generator

In [None]:
def generate_samples(model, tokenizer, prompt, seed=42, custom_eos_token="<END>", max_length=400):
    """Generate one text sample for ``prompt`` and return it without ``<START>``.

    Args:
        model: Model object providing ``generate``. If it exposes a ``device``
            attribute, the encoded prompt is moved to that device; otherwise
            ``"cuda"`` is used.
        tokenizer: Tokenizer providing ``encode`` and ``decode``.
        prompt: Text the generation starts from.
        seed: Value passed to ``torch.manual_seed`` before sampling.
        custom_eos_token: Text marking the end of a sample. Only the id of its
            first token (encoded without special tokens) is passed to
            ``generate`` as ``eos_token_id``.
        max_length: Forwarded to ``generate`` as ``max_length``.

    Returns:
        The decoded first returned sequence with every ``<START>`` removed.
        The text is also printed.
    """
    # set a seed for generation
    torch.manual_seed(seed)

    # Follow the model's own device instead of assuming CUDA; "cuda" remains
    # the fallback for model objects that do not expose a device.
    device = getattr(model, "device", "cuda")

    input_ids = None
    output = None
    try:
        input_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
        end_token_id = tokenizer.encode(custom_eos_token, add_special_tokens=False)[0]

        output = model.generate(
            input_ids,
            eos_token_id=end_token_id,
            temperature=0.1,
            max_length=max_length,
            do_sample=True,
            num_return_sequences=1,
        )
        generated_text = tokenizer.decode(output[0], skip_special_tokens=True)
    finally:
        # cleanup: drop the tensor references first, then empty the CUDA cache.
        # Done in ``finally`` so it also happens when generation raises.
        del output, input_ids
        torch.cuda.empty_cache()

    # delete <START> from generated text
    generated_text = generated_text.replace("<START>", "")
    print(generated_text)

    return generated_text

In [None]:
### DEPRECATED SAMPLE GENERATOR 1 - provide input in the confidence levels ###

# import torch.nn.functional as F

# # Encode the prompt
# input_ids = tokenizer.encode(prompt, return_tensors='pt')

# # Initialize the output as the input
# output = input_ids

# # Loop until the end token is generated
# while True:
#     # Predict the probabilities of the next token
#     with torch.no_grad():
#         outputs = model(output)
#     predictions = outputs.logits[:, -1, :]
#     probabilities = F.softmax(predictions, dim=-1)

#     # Get the top 5 tokens and their probabilities
#     top_probs, top_token_ids = torch.topk(probabilities, 5)

#     # Print the top 5 tokens and their probabilities
#     previous_text = tokenizer.decode(output[0], skip_special_tokens=True)
#     print(f"\nPrevious text: {previous_text} add tokens:")
#     for i in range(5):
#         token = tokenizer.decode(top_token_ids[0][i])
#         prob = top_probs[0][i].item()
#         print(f"Token: {token}, Confidence: {prob}")

#     # Get the token with the highest probability
#     max_prob, max_token_id = torch.max(probabilities, dim=-1)

#     # Check if the confidence is over 50%
#     if max_prob.item() < 0.50:
#         break

#     # Append the token to the output
#     output = torch.cat([output, max_token_id.unsqueeze(0)], dim=-1)

# # Decode the output
# generated_text = tokenizer.decode(output[0], skip_special_tokens=True)

In [None]:
### DEPRECATED SAMPLE GENERATOR 2 ###
# def deprecated_sample_generator2(model, tokenizer, prompt, use_custom_eos=False, custom_eos_token="}", max_length=3000, confidence_threshold=0.01):
#     """ Generate code completions for a given prompt using the model and tokenizer.
#     """


#     # Encode the prompt
#     input_ids = tokenizer.encode(prompt, return_tensors='pt')

#     # Initialize the output as the input
#     output = input_ids
    
#     # Loop until the end token is generated or counter is at max_length
#     for i in tqdm.tqdm(range(max_length)):
#         # Predict the probabilities of the next token
#         with torch.no_grad():
#             outputs = model(output)
#         predictions = outputs.logits[:, -1, :]
#         probabilities = torch.nn.functional.softmax(predictions, dim=-1)

#         # Get the token with the highest probability
#         max_prob, max_token_id = torch.max(probabilities, dim=-1)

#         # Check if the confidence is over the threshold
#         if max_prob.item() < confidence_threshold:
#             break

#         # Append the token to the output
#         output = torch.cat([output, max_token_id.unsqueeze(0)], dim=-1)

#         if len(output[0]) > 3 + len(custom_eos_token):
#             evtl_end = tokenizer.decode(output[0][-3:], skip_special_tokens=True)
#             if use_custom_eos:
#                 if custom_eos_token in evtl_end:
#                     break
#             # check for <EOS> in evtl_end
#             if "<EOS>" in evtl_end:
#                 break
        
#         # print progress statistics whenever the output length is a multiple of 50 tokens
#         if len(output[0]) % 50 == 0:
#             # print(tokenizer.decode(output[0], skip_special_tokens=True))
#             print("Length of output: ", len(output[0]))
#             print("Max prob: ", max_prob.item())
#             print("Max token: ", max_token_id.item())
#             print("Counter: ", i)
#             print("")

        
#     # Decode the output
#     generated_text = tokenizer.decode(output[0], skip_special_tokens=True)

#     # delete <START> from generated text
#     generated_text = generated_text.replace("<START>", "")

#     print(generated_text)

#     # cleanup
#     del output
#     del input_ids
#     torch.cuda.empty_cache()

#     return generated_text

In [None]:
# smoke test of generate_samples: generate a single sample from a minimal prompt with seed 0
prompt_test = '<START> {'
test_output_0 = generate_samples(model, tokenizer, prompt_test, seed=0)

# show the returned text
print(test_output_0)

In [None]:
def _model_type_from_prompt(prompt, marker="nitrox.dlc.mirror.model.", fallback="unknown"):
    """Return the model type named after ``marker`` in ``prompt``, or ``fallback`` if the marker is absent."""
    pieces = prompt.split(marker)
    if len(pieces) < 2:
        # Partial prompts such as '<START>' or '' do not name a model type.
        return fallback
    return pieces[1].split(",")[0].replace("\"", "")


def generate_multi_prompts(prompts, model, tokenizer, custom_eos_token="<END>",
                           max_length=6000, export_path="gen_json"):
    """Generate one sample per prompt and write each one to its own file.

    The files are named ``<index>_<model type>.json`` inside ``export_path``,
    where the model type is taken from the prompt ("unknown" for prompts that
    do not contain a full model class name).

    Args:
        prompts: Iterable of prompt texts.
        model: Forwarded to ``generate_samples``.
        tokenizer: Forwarded to ``generate_samples``.
        custom_eos_token: Forwarded to ``generate_samples``.
        max_length: Forwarded to ``generate_samples``.
        export_path: Folder the files are written to; created if missing.
    """
    import os  # local import: this function does not rely on another cell importing os

    os.makedirs(export_path, exist_ok=True)

    for counter, prompt in enumerate(tqdm.tqdm(prompts)):
        # Use the sample index as seed so that a prompt occurring several
        # times in ``prompts`` is not sampled with the same seed every time.
        generated_text = generate_samples(
            model,
            tokenizer,
            prompt,
            seed=counter,
            custom_eos_token=custom_eos_token,
            max_length=max_length,
        )

        # save generated text as file named after the model type
        file_name = str(counter) + "_" + _model_type_from_prompt(prompt) + ".json"
        with open(export_path + "/" + file_name, "w") as f:
            f.write(generated_text)

### Generate JSON Samples

In [None]:
# Prompt set for the sample generation. The first ten prompts name a complete
# model class, the remaining ones are shorter and shorter partial prompts.
prompts = [
    f'<START> {{ "@class" : "nitrox.dlc.mirror.model.{model_type}"'
    for model_type in (
        "EntityModel",
        "ValueObjectModel",
        "AggregateRootModel",
        "IdentityModel",
        "EnumModel",
        "DomainServiceModel",
        "RepositoryModel",
        "ApplicationServiceModel",
        "DomainEventModel",
        "DomainCommandModel",
    )
]

prompts.extend([
    '<START>',
    '<START> { ',
    '<START> { "',
    '<START> { "@class" ',
    '<START> { "@class" : ',
    '<START> { "@class" : "nitrox.',
    '<START> { "@class" : "nitrox.dlc.',
    '<START> { "@class" : "nitrox.dlc.mirror.',
    '<START> { "@class" : "nitrox.dlc.mirror.model.',
    '',
])

# Every prompt should be used 5 times for inference (with a different seed),
# so the whole list is repeated five times.
prompts = prompts * 5

In [None]:
# custom end token of the generated samples
eos_token = "<END>"

# Maximum length of the generated json samples.
# For the 11GB VRAM 2080 Ti, the max token length is 4000.
max_token_length = 4000

# folder the generated samples are exported to
export_path = "gen_json"


In [None]:
# run the generation for the whole prompt list and export the samples
generate_multi_prompts(
    prompts,
    model,
    tokenizer,
    custom_eos_token=eos_token,
    max_length=max_token_length,
    export_path=export_path,
)

## Parsability Assessment

### Post-Processing

In [None]:
def close_json(json_str):
    """Close a json string with missing closing brackets.

    The text is scanned once. Every ``{`` or ``[`` that has not been closed by
    the end of the text gets its matching closer appended, innermost first.
    Brackets inside string literals (including strings containing escaped
    quotes) are ignored, so they neither open nor close a level. A closing
    bracket closes the most recently opened level regardless of its type.

    An unterminated string literal is not repaired.

    Args:
        json_str: The (possibly truncated) JSON text.

    Returns:
        ``json_str`` with the missing closing brackets appended.
    """
    closer_for = {"{": "}", "[": "]"}
    open_brackets = []
    in_string = False
    escaped = False

    for char in json_str:
        if in_string:
            if escaped:
                # the character after a backslash never ends the string
                escaped = False
            elif char == "\\":
                escaped = True
            elif char == '"':
                in_string = False
        elif char == '"':
            in_string = True
        elif char in closer_for:
            open_brackets.append(char)
        elif char in "}]":
            if open_brackets:
                open_brackets.pop()

    # close the innermost level first
    return json_str + "".join(closer_for[char] for char in reversed(open_brackets))

In [None]:
def _is_valid_json(text):
    """Return True if ``text`` can be parsed by ``json.loads``."""
    try:
        json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError
        return False
    return True


def complete_json(json_file):
    """
    Postprocessing function to complete and clean the JSON text.

    Text that already parses as JSON is returned unchanged, so no complete
    key/value pair is thrown away and applying the function twice is harmless.
    Otherwise the text is cut at its last comma (if there is one), the missing
    closing brackets are appended and doubled double quotes are collapsed.

    Args:
        json_file: The JSON text to repair (the content, not a path).

    Returns:
        The post-processed JSON text. It is not guaranteed to be valid JSON.
    """
    if _is_valid_json(json_file):
        return json_file

    # 1. delete the last incomplete key/value pair: cut at the last comma ",".
    #    Without any comma there is nothing to cut (rfind returns -1, which
    #    used as a slice end would silently drop the last character).
    last_comma = json_file.rfind(",")
    if last_comma != -1:
        json_file = json_file[:last_comma]

    # 2. append the closing brackets for everything that is still open
    json_file = close_json(json_file)

    # 3. replace doubled double quotes by a single one
    #    NOTE(review): this also turns an empty string value "" into a single
    #    quote - confirm this is acceptable for the repaired samples.
    json_file = json_file.replace('""', '"')

    return json_file

In [None]:
# read all json_files in gen_json folder
import os

json_files = []
# os.listdir returns the entries in arbitrary order. Sort them so that the
# position of a text in json_files (used as file name when the completed
# jsons are saved) is the same on every run.
for file in sorted(os.listdir(export_path)):
    if file.endswith(".json"):
        with open(export_path + "/" + file, "r") as f:
            json_files.append(f.read())

In [None]:
# run the post-processing on every generated json text
completed_jsons = [complete_json(raw_json) for raw_json in json_files]

In [None]:
# save all jsons to gen_json/completed folder
completed_dir = export_path + "/completed"
# create the folder on the first run instead of failing on open()
os.makedirs(completed_dir, exist_ok=True)

for i, completed_json in enumerate(completed_jsons):
    with open(completed_dir + "/" + str(i) + ".json", "w") as f:
        # completed_jsons already holds the post-processed texts, so they are
        # written as-is instead of being post-processed a second time
        f.write(completed_json)

### Parse Processed JSON Samples

In [None]:
def parse_json(test_json):
    """Parse ``test_json`` with ``json.loads`` and report the outcome.

    Returns:
        The string "No error" when parsing succeeds, otherwise the exception
        object that was raised.
    """
    try:
        json.loads(test_json)
    except Exception as error:
        return error
    else:
        return "No error"

In [None]:
# collect the texts of all jsons in gen_json/completed together with their file names
jsons_to_parse = []
path_to_json = []
for entry in os.listdir(f"{export_path}/completed"):
    if not entry.endswith(".json"):
        continue
    with open(f"{export_path}/completed/{entry}", "r") as handle:
        content = handle.read()
    # both lists are filled in lockstep: same index, same file
    jsons_to_parse.append(content)
    path_to_json.append(entry)

In [None]:
# test all jsons: parse every post-processed text and keep the outcome under its index
results = {}
for number, json_file in enumerate(jsons_to_parse):
    # parse_json returns "No error" or the raised exception
    # (export_path is the folder name, a string, and cannot be called)
    results[number] = parse_json(json_file)

# save results to parsibility_results.txt, one line per sample
with open(export_path + "/parsibility_results.txt", "w") as f:
    for key, outcome in results.items():
        f.write(str(key) + ": " + str(outcome) + "; " + path_to_json[key] + "\n")

# write the results in a csv file
import csv
# newline="" lets the csv module control the line endings itself, which
# avoids blank rows between the records on some platforms
with open(export_path + "/parsibility_results.csv", "w", newline="") as f:
    writer = csv.writer(f)
    for key, outcome in results.items():
        writer.writerow([key, outcome, path_to_json[key]])

The parsability results for the samples are saved in the following files:
- [gen_json/parsibility_results.csv](gen_json/parsibility_results.csv)
- [gen_json/parsibility_results.txt](gen_json/parsibility_results.txt)