In [1]:
import sys
sys.path.append('../')
from utils import load_data
import numpy as np
np.random.seed(42)
from tqdm import tqdm

In [2]:
import torch
from transformers import AutoModel, AutoTokenizer, AutoModelForSeq2SeqLM, SummarizationPipeline, AutoModelWithLMHead

In [3]:
model_id = "Salesforce/codet5p-220m-bimodal"
# model_id = "Salesforce/codet5p-770m-py"
# model_id = "Salesforce/codet5p-2b"

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cuda')

In [5]:
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
model = AutoModel.from_pretrained(model_id,  trust_remote_code=True)
# model = AutoModelForSeq2SeqLM.from_pretrained(model_id,
#                                               torch_dtype=torch.float16,
#                                               trust_remote_code=True).to(device)


In [6]:
model.to(device)
model.eval()
print()




In [7]:
def process_context_in_batches(context, batch_size=20):
    context_batches = [context[i:i + batch_size] for i in range(0, len(context), batch_size)]
    processed_context = []
    for batch in context_batches:
        processed_context.extend(code_to_nl_batch(batch))
    
    return processed_context



def code_to_nl_batch(codes: list[str], max_length: int = 100) -> list[str]:
    encoded_input = tokenizer(codes, return_tensors="pt", padding=True, truncation=True).to(device)
    input_ids = encoded_input.input_ids
    generated_ids = model.generate(input_ids, max_length=max_length).cpu()
    return [tokenizer.decode(g, skip_special_tokens=True) for g in generated_ids]

In [8]:
text = """def func(string, size=None):
if isinstance(string, unicode):
    string = string.encode('utf-8')
    renderer = QtSvg.QSvgRenderer(QtCore.QByteArray(string))
if not renderer.isValid():
    raise ValueError('Invalid SVG data.')
if size is None:
    size = renderer.defaultSize()
    image = QtGui.QImage(size, QtGui.QImage.Format_ARGB32)
    painter = QtGui.QPainter(image)
    renderer.render(painter)
return image
"""

In [9]:
code_to_nl_batch([text])

['Render SVG string to a QImage.']

In [11]:
settings = 'cross_file_first'
data = load_data('test', 'r', 'python', settings)

Loading data: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:01<00:00,  1.25s/it]


In [12]:
raw_samples = data['easy']
len(raw_samples)

12000

In [13]:
raw_samples = np.random.choice(raw_samples, 3000)

In [14]:
sample = raw_samples[0]
sample = np.random.choice(raw_samples)

In [15]:
sample.keys()

dict_keys(['repo_name', 'file_path', 'context', 'import_statement', 'code', 'next_line', 'gold_snippet_index'])

In [16]:
sample['gold_snippet_index']

1

In [17]:
print(sample['context'][2])

def make_coordinate_grid(spatial_size, type):
    d, h, w = spatial_size
    x = torch.arange(w).type(type)
    y = torch.arange(h).type(type)
    z = torch.arange(d).type(type)

    x = (2 * (x / (w - 1)) - 1)
    y = (2 * (y / (h - 1)) - 1)
    z = (2 * (z / (d - 1)) - 1)
   
    yy = y.view(1, -1, 1).repeat(d, 1, w)
    xx = x.view(1, 1, -1).repeat(d, h, 1)
    zz = z.view(-1, 1, 1).repeat(1, h, w)

    meshed = torch.cat([xx.unsqueeze_(3), yy.unsqueeze_(3), zz.unsqueeze_(3)], 3)

    return meshed


In [18]:
code_snippet = sample['context'][0]

In [19]:
print(code_snippet)

class SynchronizedBatchNorm2d(_SynchronizedBatchNorm):
    r"""Applies Batch Normalization over a 4d input that is seen as a mini-batch
    of 3d inputs

    .. math::

        y = \frac{x - mean[x]}{ \sqrt{Var[x] + \epsilon}} * gamma + beta

    This module differs from the built-in PyTorch BatchNorm2d as the mean and
    standard-deviation are reduced across all devices during training.

    For example, when one uses `nn.DataParallel` to wrap the network during
    training, PyTorch's implementation normalize the tensor on each device using
    the statistics only on that device, which accelerated the computation and
    is also easy to implement, but the statistics might be inaccurate.
    Instead, in this synchronized version, the statistics will be computed
    over all training samples distributed on multiple devices.
    
    Note that, for one-GPU or CPU-only case, this module behaves exactly same
    as the built-in PyTorch implementation.

    The mean and standard-deviation

In [20]:
code_to_nl_batch([code_snippet])

['def batch_norm ( self, num_features, x, num_height, x_width, eps = 1e-5, momentum = 0.1, affine = True ) : if num_features < 1 : raise ValueError ( "num_features must be greater than 0" ) if num_height < 1 : raise ValueError ( "num_height must be greater than 0" ) if x < 0 : raise ValueError ( "x must be greater than 0"']

In [23]:
import re
def extract_and_remove_docstrings(code):
    docstring_pattern = r'(""".*?"""|\'\'\'.*?\'\'\')'
    docstrings = re.findall(docstring_pattern, code, flags=re.DOTALL)
    cleaned_code = re.sub(docstring_pattern, '', code, flags=re.DOTALL)
    return cleaned_code, docstrings

In [25]:
cleaned_code_snippet, docstrings = extract_and_remove_docstrings(code_snippet)

In [28]:
code_to_nl_batch([remove_docstrings(cleaned_code_snippet)])

['A SynchronizedBatchNorm2d class to ensure that the input is 4D.']

In [33]:
from transformers import pipeline
summarizer = pipeline("summarization", "t5-small", device=device)

config.json:   0%|          | 0.00/1.21k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/242M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/147 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/2.32k [00:00<?, ?B/s]

spiece.model:   0%|          | 0.00/792k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.39M [00:00<?, ?B/s]

In [34]:
def summarize_docstrings(docstrings):
    summaries = []
    for docstring in docstrings:
        summary = summarizer(docstring, max_length=50, min_length=25, do_sample=False)
        summaries.append(summary[0]['summary_text'])
    return summaries

In [35]:
summarize_docstrings(docstrings)

Token indices sequence length is longer than the specified maximum sequence length for this model (650 > 512). Running this sequence through the model will result in indexing errors


['\'"Applies Batch Normalization over a 4d input that is seen as a mini-batch of 3d inputs . the mean and standard-deviation are reduced across all devices during training . ']

In [73]:
sample.keys()

dict_keys(['repo_name', 'file_path', 'context', 'import_statement', 'code', 'next_line', 'gold_snippet_index'])

In [36]:
print(docstrings[0])

"""Applies Batch Normalization over a 4d input that is seen as a mini-batch
    of 3d inputs

    .. math::

        y = \frac{x - mean[x]}{ \sqrt{Var[x] + \epsilon}} * gamma + beta

    This module differs from the built-in PyTorch BatchNorm2d as the mean and
    standard-deviation are reduced across all devices during training.

    For example, when one uses `nn.DataParallel` to wrap the network during
    training, PyTorch's implementation normalize the tensor on each device using
    the statistics only on that device, which accelerated the computation and
    is also easy to implement, but the statistics might be inaccurate.
    Instead, in this synchronized version, the statistics will be computed
    over all training samples distributed on multiple devices.
    
    Note that, for one-GPU or CPU-only case, this module behaves exactly same
    as the built-in PyTorch implementation.

    The mean and standard-deviation are calculated per-dimension over
    the mini-batches and 

In [61]:
print(sample['code'])




class KPDetector(nn.Module):
    """
    Detecting canonical keypoints. Return keypoint position and jacobian near each keypoint.
    """

    def __init__(self, block_expansion, feature_channel, num_kp, image_channel, max_features, reshape_channel, reshape_depth,
                 num_blocks, temperature, estimate_jacobian=False, scale_factor=1, single_jacobian_map=False):
        super(KPDetector, self).__init__()




In [64]:
sample.keys()

dict_keys(['repo_name', 'file_path', 'context', 'import_statement', 'code', 'next_line', 'gold_snippet_index'])

In [67]:
print(sample['import_statement'])

from torch import nn
from src.pretrained.face_vid2vid.sync_batchnorm import SynchronizedBatchNorm2d as BatchNorm2d
from src.pretrained.face_vid2vid.modules.util import KPHourglass, make_coordinate_grid, AntiAliasInterpolation2d, ResBottleneck
import torch
import torch.nn.functional as F


In [65]:
sample['file_path']

'src/pretrained/face_vid2vid/modules/keypoint_detector.py'

In [None]:
# next line -> multi line
# code + ml -> describe ml without keywords
# golden context <-> nl ml

In [38]:
sample['next_line']

'        self.predictor = KPHourglass(block_expansion, in_features=image_channel,'

In [None]:
# {[()]} + ' + " + \+ + \*

In [68]:
sample['file_path']

'src/pretrained/face_vid2vid/modules/keypoint_detector.py'

In [69]:
s = """
from torch import nn
from src.pretrained.face_vid2vid.sync_batchnorm import SynchronizedBatchNorm2d as BatchNorm2d
from src.pretrained.face_vid2vid.modules.util import KPHourglass, make_coordinate_grid, AntiAliasInterpolation2d, ResBottleneck
import torch
import torch.nn.functional as F
        self.predictor = KPHourglass(block_expansion, in_features=image_channel,

"""

In [72]:
s.index("self.predictor = KPHourglass(block_expansion, in_features=image_channel,")

297

In [62]:
print(sample['context'][1])

class KPHourglass(nn.Module):
    """
    Hourglass architecture.
    """ 

    def __init__(self, block_expansion, in_features, reshape_features, reshape_depth, num_blocks=3, max_features=256):
        super(KPHourglass, self).__init__()
        
        self.down_blocks = nn.Sequential()
        for i in range(num_blocks):
            self.down_blocks.add_module('down'+ str(i), DownBlock2d(in_features if i == 0 else min(max_features, block_expansion * (2 ** i)),
                                                                   min(max_features, block_expansion * (2 ** (i + 1))),
                                                                   kernel_size=3, padding=1))

        in_filters = min(max_features, block_expansion * (2 ** num_blocks))
        self.conv = nn.Conv2d(in_channels=in_filters, out_channels=reshape_features, kernel_size=1)

        self.up_blocks = nn.Sequential()
        for i in range(num_blocks):
            in_filters = min(max_features, block_expansion * (

In [63]:
code_to_nl_batch([sample['context'][1]])

['KPHourglass architecture.']

In [42]:
"Give detailed description for following code:" + sample['next_line']

'Give detailed description for following code:        self.predictor = KPHourglass(block_expansion, in_features=image_channel,'

In [48]:
print(sample['code'])




class KPDetector(nn.Module):
    """
    Detecting canonical keypoints. Return keypoint position and jacobian near each keypoint.
    """

    def __init__(self, block_expansion, feature_channel, num_kp, image_channel, max_features, reshape_channel, reshape_depth,
                 num_blocks, temperature, estimate_jacobian=False, scale_factor=1, single_jacobian_map=False):
        super(KPDetector, self).__init__()




In [57]:
code, docs = extract_and_remove_docstrings(sample['code'])

In [56]:
code_to_nl_batch([sample['code']])

['def detect_canonical_keypoints ( self ) : self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input ( ) self. _check_input (']

In [59]:
code_to_nl_batch([code])

['def __init__ ( self, block_expansion, feature_channel, max_features, reshape_channel, reshape_depth, estimate_jacobian = False, scale_factor = 1, single_jacobian_map = False ) : super ( KPDetector, self ). __init__ ( block_expansion, feature_channel, max_features, reshape_channel, reshape_depth, estimate_jacobian, scale_factor, single_jacobian']

In [55]:
code_to_nl_batch([sample['next_line']])

['def predictor ( self, image_channel, in_features, * * kwargs ) : if not self. _predictor : self. _predictor = KPHourglass ( image_channel, in_features, * * kwargs ) return self. _predictor']

In [53]:
summarize_docstrings(docs)

Your max_length is set to 50, but your input_length is only 32. Since this is a summarization task, where outputs shorter than the input are typically wanted, you might consider decreasing max_length manually, e.g. summarizer('...', max_length=16)


['""" Detecting canonical keypoints . return keypoint position and jacobian near each keypoint .']

In [43]:
code_to_nl_batch(["Give detailed description for following code:\n" + sample['next_line']])

['def predictor ( self, image_channel, in_features = None ) : if in_features is None : in_features = self. image_channels if in_features is None : in_features = self. image_channels if self. predictor is None : self. predictor = KPHourglass ( image_channel, in_features ) return self. predictor']

In [26]:
samples = []
for i, sample in tqdm(enumerate(raw_samples), total=len(raw_samples)):
    sample['nl_code'] = code_to_nl_batch([sample['code']])[0]
    sample['nl_context'] = code_to_nl_batch(sample['context'])
    samples.append(sample)

In [22]:

len(sample['code'])

3585

In [24]:
print(sample['nl_code'])
print(sample['code'][-500:])

This function returns a function that returns the tokenized answer spans that better match the annotated answer.
lse


def squad_convert_example_to_features(
    example, max_seq_length, doc_stride, max_query_length, padding_strategy, is_training
):
    features = []
    if is_training and not example.is_impossible:
        # Get start and end position
        start_position = example.start_position
        end_position = example.end_position

        # If the answer cannot be found in the text, then skip this example.
        actual_text = " ".join(example.doc_tokens[start_position : (end_position + 1)])



In [27]:
for nl, code in zip(sample['nl_context'], sample['context']):
    print(nl)
    print(code)
    print("*"*50)

Returns True if the TTF language generator is available.
def is_tf_available():
    return _tf_available
**************************************************
Returns True if the torch is available.
def is_torch_available():
    return _torch_available
**************************************************
Runs basic whitespace cleaning and splitting on a piece of text.
def whitespace_tokenize(text):
    """Runs basic whitespace cleaning and splitting on a piece of text."""
    text = text.strip()
    if not text:
        return []
    tokens = text.split()
    return tokens
**************************************************
This class is derived from a dictionary and can be used as a base class for batch encoding.
class BatchEncoding(UserDict):
    """
    Holds the output of the :meth:`~transformers.tokenization_utils_base.PreTrainedTokenizerBase.encode_plus` and
    :meth:`~transformers.tokenization_utils_base.PreTrainedTokenizerBase.batch_encode` methods (tokens,
    attention_masks, etc)