In [5]:
import transformers
from transformers import BloomForCausalLM
from transformers import BloomForTokenClassification
from transformers import BloomForTokenClassification
from transformers import BloomTokenizerFast
import torch
import json
import random
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import pickle

#! pip install sentence_transformers

# Code Summarization Evaluation Dataset Selection

## [CodeSearchNet](https://github.com/github/CodeSearchNet#data-details)

Quality is catastrophically poor for this purpose. The "docstrings" are literally the docstrings that developers wrote for the associated functions, so their quality varies massively, as is to be expected from GitHub. Many docstrings don't actually describe what the code does.

## [code-docstring-corpus](https://github.com/EdinburghNLP/code-docstring-corpus)

Without even looking at the data, this will likely suffer from the same problem. Code is scraped from GitHub and docstrings are extracted. Pass.

## Initial Hypothesis

It appears that the type and quality of "code summarization" that would be useful to auditors (likely higher-level and more abstract, but still correct and informative) is largely absent from popular code summarization benchmark datasets.

# Summary Evaluation Functions

In [2]:
# Load the 'all-mpnet-base-v2' sentence-embedding model once; evaluate() below calls
# sentence_model.encode() to embed the summaries it compares.
sentence_model = SentenceTransformer('all-mpnet-base-v2')

In [3]:
# Toy input for evaluate(): one pair of strings per row. evaluate() treats column 0 as the
# predicted summary and column 1 as the ground-truth summary.
test = np.array([['alpha', 'ALPHA-t'],
        ['Beta', 'epsilon'],
        ['That There Man', 'Who There man'],
        ['Lets have cheese', 'Can we have cheese?']])

def evaluate(y):
    ''' Calculates the cosine similarity between paired code summaries using MPNet-base encodings.

        Args:
          y: a NumPy array (or nested list) of shape (n, 2). Column 0 holds the predicted code
             summaries and column 1 holds the corresponding ground-truth summaries.

        Output:
          A NumPy array of shape (1, n) with one cosine similarity per row of y, ranging from
          -1 (opposite direction) to 1 (same direction). An empty input yields shape (1, 0).
    '''
    pairs = np.asarray(y)
    if pairs.size == 0:
        # Nothing to encode; keep the (1, n) output contract with n == 0.
        return np.empty((1, 0), dtype=np.float32)

    # Encode each column in a single batch; plain Python lists of str are passed to the encoder.
    pred_embed = np.asarray(sentence_model.encode(pairs[:, 0].tolist()))
    truth_embed = np.asarray(sentence_model.encode(pairs[:, 1].tolist()))

    pred_norm = np.linalg.norm(pred_embed, axis=1)
    truth_norm = np.linalg.norm(truth_embed, axis=1)
    # Guard against division by zero: an all-zero embedding scores 0 instead of NaN.
    pred_norm[pred_norm == 0] = 1
    truth_norm[truth_norm == 0] = 1

    # Row-wise cosine similarity: dot(pred_i, truth_i) / (|pred_i| * |truth_i|).
    similarities = np.sum(pred_embed * truth_embed, axis=1) / (pred_norm * truth_norm)

    return similarities.reshape((1, len(similarities)))

evaluate(test)

array([[0.68946874, 0.3345313 , 0.39983743, 0.8875231 ]], dtype=float32)

# Bloom for Causal Language Modeling

In [5]:
# Use only to re-download the models and re-store (pickle) them if the existing files are deleted

#tokenizer = BloomTokenizerFast.from_pretrained("bigscience/bloom-1b3")
#model = BloomForCausalLM.from_pretrained("bigscience/bloom-1b3")

# Model Sizes (serialized byte stream)
# 1b3: 6.4G 
# 2b5: #.#G (runs out of memory loading CausalLM)

#pickle.dump(model, open('CausalLM_bloom-1b3.pkl', 'wb'))
#pickle.dump(tokenizer, open('Tokenizer_bloom-1b3.pkl', 'wb'))
#del model
#del tokenizer

In [6]:
# Load the locally pickled tokenizer and model.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file; only load .pkl
# files that were produced locally by the (commented-out) dump cell in this notebook.
# The `with` blocks make sure the file handles are closed once loading finishes.
with open('Tokenizer_bloom-1b3.pkl', 'rb') as tokenizer_file:
    tokenizer = pickle.load(tokenizer_file)

with open('CausalLM_bloom-1b3.pkl', 'rb') as model_file:
    model = pickle.load(model_file)

| x | y | generated_text | novel_portion |
|---|---|----------------|---------------|
|   |   |                |               |

In [7]:
# outline for functions

def _strip_first_docstring(code):
    '''Returns code with its first triple-quoted string removed (unchanged if none is found or it is unterminated).'''
    for quote in ('"""', "'''"):
        start = code.find(quote)
        if start == -1:
            continue
        end = code.find(quote, start + 3)
        if end == -1:
            # Opening quotes without a closing match: leave the code untouched rather than fail.
            return code
        return code[:start] + code[end + 3:]
    return code


def draw_sample(corpus, k, n, code_len_max, docstring_len_max):
    '''
        Draws a random sample of [code, docstring] pairs from a JSON-lines corpus.

        corpus: filename of data corpus (one JSON object per line with 'code' and 'docstring' keys)
        k: int. sample size (number of distinct observations drawn before filtering;
           capped at the number of observations in the corpus)
        n: tuple. min and max number of observations that meet other args' criteria to keep
        code_len_max: filter sample for observations whose code len is below code_len_max
        docstring_len_max: filter sample for observations whose docstring len is below docstring_len_max

        Returns a list of [clean_code, docstring] pairs, where clean_code is the code with its
        first triple-quoted string removed, or None if the number of pairs that pass the length
        filters falls outside [n[0], n[1]]. Also prints the number of pairs that passed.
    '''

    with open(corpus, 'r', encoding='utf-8') as json_file:
        # Skip blank lines so a trailing newline in the corpus cannot break json.loads.
        json_list = [line for line in json_file if line.strip()]

    # Sample without replacement so the same observation cannot appear twice in one sample
    # (e.g. as both a few-shot example and the prediction target).
    sample_indices = random.sample(range(len(json_list)), min(k, len(json_list)))

    sample_tmp = []
    for i in sample_indices:
        record = json.loads(json_list[i])
        docstring = record['docstring']
        code = record['code']
        if len(code) < code_len_max and len(docstring) < docstring_len_max:
            sample_tmp.append([_strip_first_docstring(code), docstring])

    print(len(sample_tmp))

    if n[0] <= len(sample_tmp) <= n[1]:
        return sample_tmp
    return None
    
def promptize(examples):
    '''
        Converts a sequence of [code, docstring] rows into the pieces of one few-shot LLM prompt.

        Every row except the last becomes code + '\n\n##' + docstring. The last row becomes
        code + '\n##' with its docstring left out, since that is the text to be predicted.

        Returns a NumPy array of shape (1, len(examples)) holding the pieces in order.
    '''
    final_idx = len(examples) - 1
    pieces = [
        row[0] + '\n##' if idx >= final_idx else row[0] + '\n\n##' + row[1]
        for idx, row in enumerate(examples)
    ]
    return np.array(pieces).reshape((1, len(examples)))

def generate(seed, output_max_tokens):
    '''
        Builds a few-shot prompt from seed, tokenizes it, calls Bloom and returns the decoded text.

        seed: sequence of [code, docstring] examples, passed to promptize(); the last row's
              docstring is the one to be predicted.
        output_max_tokens: int. maximum number of tokens to generate beyond the prompt.

        Returns the decoded output sequence as a string (the prompt followed by the generated
        continuation), with special tokens removed.
    '''
    # Each prompt piece is followed by a blank line.
    prompt = ''.join(piece + '\n\n' for piece in promptize(seed)[0])

    inputs = tokenizer(prompt, return_tensors="pt")
    result_length = inputs["input_ids"].shape[1] + output_max_tokens

    # **inputs forwards everything the tokenizer produced (input ids and, when present, the
    # attention mask). early_stopping is omitted because it only affects beam search and no
    # beams are requested here.
    output_ids = model.generate(**inputs,
                                max_length=result_length,
                                no_repeat_ngram_size=4)

    # skip_special_tokens is a decoding option, so it belongs on tokenizer.decode rather
    # than on model.generate.
    return tokenizer.decode(output_ids[0], skip_special_tokens=True)
    

In [10]:
sample_tmp = draw_sample(corpus='./python_valid_0.jsonl',
                         k=20,
                         n=(2, 6),
                         code_len_max=300,
                         docstring_len_max=100)

if sample_tmp is None:
    print('Sampling criteria not met.')
else:
    # Generate a prediction (decoded text = prompt + generated continuation)
    y_hat = generate(sample_tmp, 25)

    threshold = 0.35

    # TEMP NEED TO REMOVE
    # Rebuild the prompt the same way generate() does so it can be sliced off the output.
    prompt = ""
    for p in promptize(sample_tmp)[0]:
        prompt += p + '\n\n'

    ground_truth = sample_tmp[-1][1]
    generated_lines = y_hat[len(prompt):].split('\n')

    # Find best match across all predicted lines: score every line against the ground
    # truth in a single evaluate() call, then take the highest-scoring line.
    sims = evaluate(np.array([[line, ground_truth] for line in generated_lines]))[0]
    max_similarity_index = int(np.argmax(sims))
    max_similarity = sims[max_similarity_index]

    #if max_similarity >= threshold:
    print('y :', ground_truth)
    print('y\u0302 :', generated_lines[max_similarity_index].strip())
    print('Cosine similarity:', max_similarity)
    #else:
    #    print('No match above threshold.')

3
y : Get Existing Message

        http://dev.wheniwork.com/#get-existing-message
ŷ : def get_messages(self, start, end):
Cosine similarity: 0.55403036


**Debugging**

In [13]:
# Prediction (prompt + generated text)
print(y_hat)

def decode(cls, decrypted):
        
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]

##删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文

def signal_receive(self, fd):
        
        connections = self.connections
        if connections(fd) and self.twait[connections(fd)]:
            procid = random.sample(self.twait[connections(fd)], 1)[0]
            self.awake(procid)

##Awake one process waiting to receive data on fd

def empty(self):
        
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n
##

class Process(threading.Thread):
    
    def __init__(self, pid, name, args, kwargs):
        
       


In [14]:
# Prompt
print(prompt)

def decode(cls, decrypted):
        
        pad = ord(decrypted[-1])
        if pad < 1 or pad > 32:
            pad = 0
        return decrypted[:-pad]

##删除解密后明文的补位字符
        @param decrypted: 解密后的明文
        @return: 删除补位字符后的明文

def signal_receive(self, fd):
        
        connections = self.connections
        if connections(fd) and self.twait[connections(fd)]:
            procid = random.sample(self.twait[connections(fd)], 1)[0]
            self.awake(procid)

##Awake one process waiting to receive data on fd

def empty(self):
        
        self.mutex.acquire()
        n = not self._qsize()
        self.mutex.release()
        return n
##




In [15]:
# Ground truth comment
print(sample_tmp[-1][1])

Return True if the queue is empty, False otherwise (not reliable!).


In [None]:
# TO DELETE
#prompt = ""

#for p in promptize(sample_tmp)[0]:
#    prompt += p + '\n\n'

#print('GROUND TRUTH:', sample_tmp[len(sample_tmp) - 1][1])

In [13]:
# TO DELETE
#inputs = tokenizer(prompt, return_tensors="pt")
#result_length = len(inputs.tokens()) + 25

In [None]:
# TO DELETE
# Greedy Search
#y_hat = tokenizer.decode(model.generate(inputs["input_ids"], 
#                       max_length=result_length,
#                       no_repeat_ngram_size=4,
#                       skip_special_tokens=True,
#                       early_stopping=True
#                      )[0])
#print(y_hat[len(prompt):])

In [None]:
# Just take first line from prediction
# Deprecated in favor of search method above

#novel_bit = y_hat[len(prompt):]
#novel_bit = novel_bit[:novel_bit.find('\n')].replace('##','')
#print('y :',sample_tmp[len(sample_tmp) - 1][1])
#print('y\u0302 :',novel_bit)

#print('Cosine Similarity:', *evaluate(np.array([[novel_bit, sample_tmp[len(sample_tmp) - 1][1]]]))[0])

## Pipeline

In [24]:
# Draw large sample from dataset

# draw_sample() returns None when the number of observations that pass the length filters
# falls outside the n=(min, max) range, so check before using the result.
sample_tmp = draw_sample(corpus='./python_valid_0.jsonl',
                         k=200,
                         n=(12, 40),
                         code_len_max=300,
                         docstring_len_max=150)

if sample_tmp is not None:
    print(len(sample_tmp))

# Split into n sized groups (n=number of predictions desired)


# For each group:
  # Process into prompt
  # Run inference
  # Calculate similarity between y and y_hat

# Store large sample: (x, y, y_hat, similarity)



25
25


In [25]:
# num_shots: The number of examples to provide in a single prompt.
# Takes a sample and divides it into sets of examples of size num_shots.
# Drops trailing examples from the sample until its length is divisible by num_shots.

num_shots = 4

if sample_tmp is None:
    # draw_sample() returns None when its size criteria are not met.
    raise ValueError('sample_tmp is None: re-run the sampling cell until the criteria are met.')

# Largest multiple of num_shots that fits in the sample.
num_obs = len(sample_tmp) - len(sample_tmp) % num_shots
sample_tmp = sample_tmp[:num_obs]

# Consecutive, non-overlapping groups of num_shots examples; each group becomes one prompt.
prompts = [sample_tmp[start:start + num_shots] for start in range(0, num_obs, num_shots)]

In [26]:
# Run inference once per group of examples, allowing up to 50 generated tokens each.
# A comprehension is used so the loop variable does not overwrite the notebook-level
# `prompt` string that other cells use to slice the prompt off generated text.
predictions = [generate(shot_group, 50) for shot_group in prompts]

In [75]:
# Print the full prompt text built from each group of examples
# (every piece is followed by a blank line).
for shot_group in prompts:
    print(''.join(piece + '\n\n' for piece in promptize(shot_group)[0]))

def Element(self):
        
        if not self._element:
            self.Refind(maxSearchSeconds=TIME_OUT_SECOND, searchIntervalSeconds=self.searchWaitTime)
        return self._element

##Property Element.
        Return `ctypes.POINTER(IUIAutomationElement)`.

def rewindbody(self):
        
        if not self.seekable:
            raise IOError, "unseekable file"
        self.fp.seek(self.startofbody)

##Rewind the file to the start of the body (if seekable).

def make_tarfile(output_filename, source_dir):
  
  with tarfile.open(output_filename, "w:gz") as tar:
    tar.add(source_dir, arcname=os.path.basename(source_dir))

##Tar a directory

def _cell(x):
    
    x_no_none = [i if i is not None else "" for i in x]
    return array(x_no_none, dtype=np_object)
##


def isnumber(self, string, *args):
        
        try:
            n, u = utility.analyze_number(string)
        except SyntaxError:
            return False
        return True

##Is number
        args:
            s

IndexError: string index out of range

array([['def Element(self):\n        \n        if not self._element:\n            self.Refind(maxSearchSeconds=TIME_OUT_SECOND, searchIntervalSeconds=self.searchWaitTime)\n        return self._element',
        'Property Element.\n        Return `ctypes.POINTER(IUIAutomationElement)`.'],
       ['def rewindbody(self):\n        \n        if not self.seekable:\n            raise IOError, "unseekable file"\n        self.fp.seek(self.startofbody)',
        'Rewind the file to the start of the body (if seekable).'],
       ['def make_tarfile(output_filename, source_dir):\n  \n  with tarfile.open(output_filename, "w:gz") as tar:\n    tar.add(source_dir, arcname=os.path.basename(source_dir))',
        'Tar a directory'],
       ['def _cell(x):\n    \n    x_no_none = [i if i is not None else "" for i in x]\n    return array(x_no_none, dtype=np_object)',
        'translate an array x into a MATLAB cell array']], dtype='<U187')

In [None]:
# One row per prompt group, holding the code of that group's first example.
# NOTE(review): this cell looks unfinished (the append call was missing its closing
# parenthesis); presumably further columns such as y, y_hat and similarity were meant
# to be added to each row - confirm.
out = [[shot_group[0][0]] for shot_group in prompts]

In [None]:




# TEMP NEED TO REMOVE
# NOTE(review): scratch copy of the single-prompt scoring logic; it reads y_hat and
# sample_tmp from whichever cells ran last - confirm that is what is intended here.
prompt = ""  # reset so the prompt is rebuilt instead of appended to a stale value
for p in promptize(sample_tmp)[0]:
    prompt += p + '\n\n'

ground_truth = sample_tmp[-1][1]
generated_lines = y_hat[len(prompt):].split('\n')

# Score every generated line against the ground truth in one evaluate() call and keep the best.
sims = evaluate(np.array([[line, ground_truth] for line in generated_lines]))[0]
max_similarity_index = int(np.argmax(sims))
max_similarity = sims[max_similarity_index]

#if max_similarity >= threshold:
print('y :', ground_truth)
print('y\u0302 :', generated_lines[max_similarity_index].strip())
print('Cosine similarity:', max_similarity)

In [21]:
print(predictions[0])

def GetPreviousSiblingControl(self) -> 'Control':
        
        ele = _AutomationClient.instance().ViewWalker.GetPreviousSiblingElement(self.Element)
        return Control.CreateControlFromElement(ele)

##Return `Control` subclass or None.

def transmit_content_metadata(self, user):
        
        exporter = self.get_content_metadata_exporter(user)
        transmitter = self.get_content_metadata_transmitter()
        transmitter.transmit(exporter.export())

##Transmit content metadata to integrated channel.

def _send(self, stanza):
        
        self.fix_out_stanza(stanza)
        element = stanza.as_xml()
        self._write_element(element)

##Same as `send` but assume `lock` is acquired.

def _f_gene(sid, prefix="G_"):
    
    sid = sid.replace(SBML_DOT, ".")
    return _clip(sid, prefix)
##

def _clip(s, prefix):
    
    s = s.replace(" ", "_")
    s = s[:


In [23]:
for p in sample_tmp[0:4]:
    print(p[0], '\n\n', p[1], '\n\n')

def GetPreviousSiblingControl(self) -> 'Control':
        
        ele = _AutomationClient.instance().ViewWalker.GetPreviousSiblingElement(self.Element)
        return Control.CreateControlFromElement(ele) 

 Return `Control` subclass or None. 


def transmit_content_metadata(self, user):
        
        exporter = self.get_content_metadata_exporter(user)
        transmitter = self.get_content_metadata_transmitter()
        transmitter.transmit(exporter.export()) 

 Transmit content metadata to integrated channel. 


def _send(self, stanza):
        
        self.fix_out_stanza(stanza)
        element = stanza.as_xml()
        self._write_element(element) 

 Same as `send` but assume `lock` is acquired. 


def _f_gene(sid, prefix="G_"):
    
    sid = sid.replace(SBML_DOT, ".")
    return _clip(sid, prefix) 

 Clips gene prefix from id. 


