# Evaluate pre-trained model

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install fuzzywuzzy
!pip install python-Levenshtein

## Open model & load data

In [122]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import datetime
import torch
import random
import numpy as np
import json
import re
import pandas as pd
from fuzzywuzzy import fuzz
import Levenshtein
# Info is here: https://github.com/seatgeek/thefuzz

In [None]:
torch.set_default_device("cuda")

print("Loading model...")
time = datetime.datetime.now()
model = AutoModelForCausalLM.from_pretrained("microsoft/phi-1_5", torch_dtype="auto")
tokenizer = AutoTokenizer.from_pretrained("microsoft/phi-1_5")
time1 = datetime.datetime.now()
print(f"Model loaded. Time to load the model: {time1 - time}")

In [3]:
def replace_tags(code):
    # Original function is here:
    # https://github.com/microsoft/CodeXGLUE/blob/main/Code-Code/CodeCompletion-line/evaluator/evaluator.py
    code = code.replace("<NUM_LIT>", "0").replace("<STR_LIT>", "").replace("<CHAR_LIT>", "")
    pattern = re.compile(r"<(STR|NUM|CHAR)_LIT:(.*?)>", re.S)
    lits = re.findall(pattern, code)
    for lit in lits:
        code = code.replace(f"<{lit[0]}_LIT:{lit[1]}>", lit[1])

    pattern = r'<([A-Z][^<>]*)>'
    liners = re.findall(pattern, code)
    for tag in liners:
        code = code.replace(f'<{tag}>', ' ')
    return code

def read_jsonl_file(file_path):
    data = []
    with open(file_path, 'r') as f:
        for line in f:
            json_obj = json.loads(line)
            json_obj['signature'] = replace_tags(json_obj['signature'])
            json_obj['body'] = replace_tags(json_obj['body'])
            data.append(json_obj)
    return data

file_path = '/content/drive/MyDrive/CodeCompletion/CodeXGlue/test.jsonl'
codexglue_test = read_jsonl_file(file_path)
print(codexglue_test[0])

file_path = '/content/drive/MyDrive/CodeCompletion/functions_df_inputs_outputs.csv'
functions_df = pd.read_csv(file_path)
print(functions_df.iloc[0])

{'signature': 'def debug(user, message):', 'body': 'message_user(user, message, constants.DEBUG) ', 'docstring': 'Adds a message with the ``DEBUG`` level.\n\n:param user: User instance\n:param message: Message to show', 'id': 'f4:m0'}
Unnamed: 0                                                              0
function_id                                                         27692
signature               private fun bitIndex(elementIndex: Int, bitOff...
body                    =\n        elementIndex * ELEMENT_SIZE + bitOf...
is_single_expression                                                 True
is_test                                                             False
0-20                                                                False
100+                                                                False
20-50                                                               False
50-100                                                               True
Name: 0, dtype: object


# Python code completion
Try few-shot code generation using Phi-1.5, with examples randomly selected from a dataset to provide context


In [78]:
def create_prompt_codex(dataset, index, num_examples, context=None, language='Python'):
  indices = random.sample(range(len(dataset)), num_examples)
  prefix = f'Complete code\nLanguage: {language}\n'
  shots = '\n'.join([f"Example: {dataset[i]['signature']} {dataset[i]['body']}" for i in indices])

  data = dataset[index]
  if context:
    prompt = f"{prefix}\n{shots}\n{context}\nCode so far: {data['signature']}"
  else:
    prompt = f"{prefix}\n{shots}\nCode so far: {data['signature']}"

  return prompt

prompt = create_prompt_codex(codexglue_test, 12, 2)
inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False)

print(f'\033[96mFew-shots prompt (without context of functions): \n{prompt}\n')

[96mFew-shots prompt (without context of functions): 
Complete code
Language: Python

Example: def _get_name(self): return self.__name 
Example: @property  def serial_instance(self) -> serial.Serial:  return self._serial_instance 
Code so far: def loads(s, strip_comments=False, **kw):



In [79]:
print("Generating output...")
time = datetime.datetime.now()
outputs = model.generate(**inputs, max_length=inputs['input_ids'].shape[-1] + 100)
text_python = tokenizer.batch_decode(outputs)[0]
time1 = datetime.datetime.now()
print(f"Output generated. Time to generate the output: {time1 - time}. \nOutput:")
print(f'\033[96m{prompt}\033[92m{text_python.split(prompt[-10:], 1)[-1]}')

Generating output...
Output generated. Time to generate the output: 0:00:02.956633. 
Output:
[96mComplete code
Language: Python

Example: def _get_name(self): return self.__name 
Example: @property  def serial_instance(self) -> serial.Serial:  return self._serial_instance 
Code so far: def loads(s, strip_comments=False, **kw):[92m return json.loads(s, **kw) def dumps(d, **kw): return json.dumps(d, **kw) def _get_name(self): return self.__name 
def _get_name(self): return self.__name 
def _get_name(self): return self.__name 
def _get_name(self): return self.__name 
def _get_name(self): return self.


Now, let's add context of a function

In [80]:
function_context = f"Function context: {codexglue_test[12]['docstring']}"
prompt = create_prompt_codex(codexglue_test, 12, 2, function_context)
inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False)
print(f'\033[96mmFew-shots prompt (with context of functions): \n{prompt}\n')

[96mmFew-shots prompt (with context of functions): 
Complete code
Language: Python

Example: def _set_explicit_path_name(self, v, load=False): if hasattr(v, ""):  v = v._utype(v)  try:  t = YANGDynClass( v, base=six.text_type, is_leaf=True, yang_name="", parent=self, path_helper=self._path_helper, extmethods=self._extmethods, register_paths=True, namespace="", defining_module="", yang_type="", is_config=True, )  except (TypeError, ValueError):  raise ValueError( { "": """""", "": "", "": """""", } )  self.__explicit_path_name = t if hasattr(self, ""):  self._set()  
Example: async def get_bots(self, limit, offset): if limit > 0:  limit = 50  return await self.request('GET', ''.format(self.BASE, limit, offset)) 
Function context: Load a list of trees from a Newick formatted string.

:param s: Newick formatted string.
:param strip_comments: Flag signaling whether to strip comments enclosed in square \
brackets.
:param kw: Keyword arguments are passed through to `Node.create`.
:return: L

In [81]:
print("Generating output...")
time = datetime.datetime.now()
outputs = model.generate(**inputs, max_length=inputs['input_ids'].shape[-1] + 80)
text_python_context = tokenizer.batch_decode(outputs)[0]
time1 = datetime.datetime.now()
print(f"Output generated. Time to generate the output: {time1 - time}. \nOutput:")
print(f'\033[96m{prompt}\033[92m{text_python_context.split(prompt[-10:], 1)[-1]}')

Generating output...
Output generated. Time to generate the output: 0:00:02.370798. 
Output:
[96mComplete code
Language: Python

Example: def _set_explicit_path_name(self, v, load=False): if hasattr(v, ""):  v = v._utype(v)  try:  t = YANGDynClass( v, base=six.text_type, is_leaf=True, yang_name="", parent=self, path_helper=self._path_helper, extmethods=self._extmethods, register_paths=True, namespace="", defining_module="", yang_type="", is_config=True, )  except (TypeError, ValueError):  raise ValueError( { "": """""", "": "", "": """""", } )  self.__explicit_path_name = t if hasattr(self, ""):  self._set()  
Example: async def get_bots(self, limit, offset): if limit > 0:  limit = 50  return await self.request('GET', ''.format(self.BASE, limit, offset)) 
Function context: Load a list of trees from a Newick formatted string.

:param s: Newick formatted string.
:param strip_comments: Flag signaling whether to strip comments enclosed in square \
brackets.
:param kw: Keyword arguments ar

# Kotlin code completion
With the same methods used as for Python generation (few-shot + context)

In [87]:
def create_prompt_kotlin(dataset, index, num_examples, context=None, language='Kotlin'):
  indices = random.sample(range(len(dataset)), num_examples)
  prefix = f'Complete code\nLanguage: {language}\n'
  shots = '\n'.join([f"Example: {dataset.iloc[i]['signature']} {dataset.iloc[i]['body']}" for i in indices])

  data = dataset.iloc[index]
  if context:
    prompt = f"{prefix}\n{shots}\n{context}\nCode so far: {data['signature']}"
  else:
    prompt = f"{prefix}\n{shots}\nCode so far: {data['signature']}"

  return prompt

prompt = create_prompt_kotlin(functions_df, 12, 2)
inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False)

print(f'\033[96mFew-shots prompt (without context of functions): \n{prompt}\n')

[96mFew-shots prompt (without context of functions): 
Complete code
Language: Kotlin

Example: fun unzipTo(destinationDirectory: File, fromSubdirectory: File = File("/"), resetTimeAttributes: Boolean = false): File {
    withZipFileSystem {
        it.file(fromSubdirectory).recursiveCopyTo(destinationDirectory, resetTimeAttributes)
    }
}
Example: fun foo() {
    var x: (@[Foo Bar] (()->Unit)-> ()->Unit) -> Unit = {}
}
Code so far: fun assertTasksPackedToCache(vararg taskPaths: String): BuildResult



In [88]:
print("Generating output...")
time = datetime.datetime.now()
outputs = model.generate(**inputs, max_length=inputs['input_ids'].shape[-1] + 80)
text_kotlin = tokenizer.batch_decode(outputs)[0]
time1 = datetime.datetime.now()
print(f"Output generated. Time to generate the output: {time1 - time}. \nOutput:")
print(f'\033[96m{prompt}\033[92m{text_kotlin.split(prompt[-10:], 1)[-1]}')

Generating output...
Output generated. Time to generate the output: 0:00:02.384135. 
Output:
[96mComplete code
Language: Kotlin

Example: fun unzipTo(destinationDirectory: File, fromSubdirectory: File = File("/"), resetTimeAttributes: Boolean = false): File {
    withZipFileSystem {
        it.file(fromSubdirectory).recursiveCopyTo(destinationDirectory, resetTimeAttributes)
    }
}
Example: fun foo() {
    var x: (@[Foo Bar] (()->Unit)-> ()->Unit) -> Unit = {}
}
Code so far: fun assertTasksPackedToCache(vararg taskPaths: String): BuildResult[92m {
    val taskPaths = taskPaths.toList()
    val taskPathsWithCache = taskPaths.map { path -> path.withFile(path.withName(".cache")) }
    val taskPathsWithCacheAndCache = taskPathsWithCache.map { path -> path.withFile(path.withName(".cache"))


In [89]:
# GPT-generated
function_context = '''Function context: The assertTasksPackedToCache function verifies that cache entries have been stored for
the provided task paths by iterating through each path and asserting that the output contains
a specific message indicating the cache entry\'s storage.'''

prompt = create_prompt_kotlin(functions_df, 12, 2, function_context)
inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False)
print(f'\033[96mFew-shots prompt (with context of functions): \n{prompt}\n')

[96mFew-shots prompt (with context of functions): 
Complete code
Language: Kotlin

Example: fun case_6(x: Any?) {
    if (x !is String?) throw Exception()
    <!DEBUG_INFO_EXPRESSION_TYPE("kotlin.Any? & kotlin.String?")!>x<!>
    <!DEBUG_INFO_EXPRESSION_TYPE("kotlin.Any? & kotlin.String?"), DEBUG_INFO_SMARTCAST!>x<!>?.length
}
Example: override fun getType(expression: KtExpression) {
            return this@DelegatingBindingTrace.getType(expression)
        }
Function context: The assertTasksPackedToCache function verifies that cache entries have been stored for 
the provided task paths by iterating through each path and asserting that the output contains 
a specific message indicating the cache entry's storage.
Code so far: fun assertTasksPackedToCache(vararg taskPaths: String): BuildResult



In [90]:
print("Generating output...")
time = datetime.datetime.now()
outputs = model.generate(**inputs, max_length=inputs['input_ids'].shape[-1] + 80)
text_kotlin_context = tokenizer.batch_decode(outputs)[0]
time1 = datetime.datetime.now()
print(f"Output generated. Time to generate the output: {time1 - time}. \nOutput:")

print(f'\033[96m{prompt}\033[92m {text_kotlin_context.split(prompt[-10:], 1)[-1]}')

Generating output...
Output generated. Time to generate the output: 0:00:02.402167. 
Output:
[96mComplete code
Language: Kotlin

Example: fun case_6(x: Any?) {
    if (x !is String?) throw Exception()
    <!DEBUG_INFO_EXPRESSION_TYPE("kotlin.Any? & kotlin.String?")!>x<!>
    <!DEBUG_INFO_EXPRESSION_TYPE("kotlin.Any? & kotlin.String?"), DEBUG_INFO_SMARTCAST!>x<!>?.length
}
Example: override fun getType(expression: KtExpression) {
            return this@DelegatingBindingTrace.getType(expression)
        }
Function context: The assertTasksPackedToCache function verifies that cache entries have been stored for 
the provided task paths by iterating through each path and asserting that the output contains 
a specific message indicating the cache entry's storage.
Code so far: fun assertTasksPackedToCache(vararg taskPaths: String): BuildResult[92m <String, BuildResult<String, BuildResult<String, BuildResult<String, BuildResult<String, BuildResult<String, BuildResult<String, BuildResult<Stri

In [92]:
# And the real answer is
example = functions_df.iloc[12]
print(example['signature'] + example['body'])

fun assertTasksPackedToCache(vararg taskPaths: String): BuildResult{
    taskPaths.forEach {
        assertOutputContains("Stored cache entry for task '$it' with cache key ")
    }
}



# LLM output control
Since we can see that the model generates unnecessary parts and repeats some code, we can try to use some techniques for preventing it


In [96]:
# Let's take the last prompt
prompt = prompt
print("Prompt:")
print(prompt)
inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False)

# Top-k Sampling: Sampling from the top k most likely next words
outputs = model.generate(
    **inputs,
    max_length=inputs['input_ids'].shape[-1] + 80,
    do_sample=True,
    top_k=50
)
text_top_k = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
print("\nGenerated text with Top-k Sampling:")
print(f'\033[92m {text_top_k.split(prompt[-10:], 1)[-1]}')

Prompt:
Complete code
Language: Kotlin

Example: fun case_6(x: Any?) {
    if (x !is String?) throw Exception()
    <!DEBUG_INFO_EXPRESSION_TYPE("kotlin.Any? & kotlin.String?")!>x<!>
    <!DEBUG_INFO_EXPRESSION_TYPE("kotlin.Any? & kotlin.String?"), DEBUG_INFO_SMARTCAST!>x<!>?.length
}
Example: override fun getType(expression: KtExpression) {
            return this@DelegatingBindingTrace.getType(expression)
        }
Function context: The assertTasksPackedToCache function verifies that cache entries have been stored for 
the provided task paths by iterating through each path and asserting that the output contains 
a specific message indicating the cache entry's storage.
Code so far: fun assertTasksPackedToCache(vararg taskPaths: String): BuildResult

Generated text with Top-k Sampling:
[92m <Int, Int, BuildContext<String>
        if (taskPaths.size == 0) {
            // nothing to do
        } else {
            var output: BuildContext<String> = BuildResult(
                1,
     

In [97]:
# Top-p (Nucleus) Sampling: Sampling from a dynamically adjusted set of words
outputs = model.generate(
    **inputs,
    max_length=inputs['input_ids'].shape[-1] + 80,
    do_sample=True,
    top_p=0.9
)
text_top_p = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
print("\nGenerated text with Top-p (Nucleus) Sampling:")
print(f'\033[92m {text_top_p.split(prompt[-10:], 1)[-1]}')


Generated text with Top-p (Nucleus) Sampling:
[92m <String, BuildResult<List<Dictionary<String, Object>>, BuildResult<List<Dictionary<String, Object>>>>, BuildResult<List<Dictionary<String, Object>>>>, BuildResult<List<Dictionary<String, Object>>>, BuildResult<List<Dictionary<String, Object>>>> {
        taskPaths.forEach(task


In [98]:
# Temperature Scaling: Adjusting the likelihood scores of words before sampling
outputs = model.generate(
    **inputs,
    max_length=inputs['input_ids'].shape[-1] + 80,
    do_sample=True,
    temperature=0.7
)
text_temp = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
print("\nGenerated text with Temperature Scaling:")
print(f'\033[92m {text_temp.split(prompt[-10:], 1)[-1]}')


Generated text with Temperature Scaling:
[92m  {
    for (taskPath in taskPaths) {
        var cacheEntry = BuildCache.get(taskPath)
        assertTasksPackedToCache(taskPath, cacheEntry)
    }
}
Output: Function trace:
            Assertion failed at line 3.
            BuildCache did not store cache entry for task path '$taskPath


In [99]:
# Repetition Penalty: Penalizing repeated tokens in the generated output
repetition_penalty = 1.0
outputs = model.generate(
    **inputs,
    max_length=inputs['input_ids'].shape[-1] + 80,
    do_sample=True,
    repetition_penalty=repetition_penalty
)
text_rep_pen = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
print("\nGenerated text with Repetition Penalty:")
print(f'\033[92m {text_rep_pen.split(prompt[-10:], 1)[-1]}')


Generated text with Repetition Penalty:
[92m ? {
    assertBuildResult(true)
    taskPaths.forEach { taskPath ->
        try {
            var cacheResults = taskCache.get(taskPath)

            if (cacheResults!= null && cacheResults.map { cacheEntry -> CacheEntryPair<KtExpression, BuildResult>() }) {

                val cacheEntryP


We will search best combinations of these parameters later

# Output evaluation
Generate output for some small sample from CodeXGlue and compute few metrics

In [109]:
example = codexglue_test[1]
function_context = f"Function context: {example['docstring']}"
prompt = create_prompt_codex(codexglue_test, 1, 2, function_context)
inputs = tokenizer(prompt, return_tensors="pt", return_attention_mask=False)
print(f'Few-shots prompt: \n\033[96m{prompt}\n')

Few-shots prompt: 
[96mComplete code
Language: Python

Example: def _get_router_id(self): return self.__router_id 
Example: def bring_gpio_interrupt_into_userspace():   try:  with open(GPIO_INTERRUPT_DEVICE_VALUE):  return   except IOError:  with open(GPIO_EXPORT_FILE, 'w') as export_file:  export_file.write(str(GPIO_INTERRUPT_PIN))  wait_until_file_exists(GPIO_INTERRUPT_DEVICE_VALUE)  
Function context: Adds a message with the ``SUCCESS`` level.

:param user: User instance
:param message: Message to show
Code so far: def success(user, message):



In [114]:
outputs = model.generate(
    **inputs,
    max_length=inputs['input_ids'].shape[-1] + 5,
)
text = tokenizer.batch_decode(outputs, skip_special_tokens=True)[0]
text = text.split(prompt[-10:], 1)[-1]
print("\nGenerated text:")
print(f'\033[92m {text}')
print("\n\033[0mReal code:")
print(f'\033[92m {example["body"]}')


Generated text:
[92m  print(message)


[0mReal code:
[92m message_user(user, message, constants.SUCCESS) 


In [118]:
# Compute metrics
EM = 0.0
edit_sim = 0.0

pred = text
gt = example["body"]
print(f'Edit sim: {fuzz.ratio(pred, gt)}')
print(f'Exact match: {int(pred.split() == gt.split())}')

Edit sim: 25
Exact match: 0


In [127]:
def flex_accuracy(s1, s2, tolerance=None):
    """
    Compute the flex accuracy between two strings with a given tolerance.
    """
    distance = Levenshtein.distance(s1, s2)
    max_length = max(len(s1), len(s2))
    similarity = 1 - (distance / max_length)
    if tolerance:
      if similarity >= tolerance:
        return similarity
      else:
          return 0
    else:
      return similarity

# Example usage:
tolerance = 0.8
accuracy = flex_accuracy(pred, gt)
print("Flex accuracy:", accuracy)

Flex accuracy: 0.19148936170212771
