In [None]:
!pip install polars




In [None]:
import polars as pl

splits = {
    'train': 'hf://datasets/blastwind/deprecated-github-code-haskell-function/data/train-*-of-*.parquet',
    'test': 'hf://datasets/blastwind/deprecated-github-code-haskell-function/data/test-*-of-*.parquet',
    'valid': 'hf://datasets/blastwind/deprecated-github-code-haskell-function/data/valid-00000-of-00001-636cb804972d8982.parquet'
}


df = pl.read_parquet(splits['train'])



In [None]:

dataframe = df.to_pandas()

dataframe = dataframe.loc[
    (dataframe["is_commented"] == True) & (dataframe["n_ast_nodes"] < 50),
    ["full_code", "uncommented_code", "function_only_code"]
]


dataframe["comments"] = dataframe.apply(
    lambda row: row["full_code"].replace(row["uncommented_code"], "").strip(), axis=1
)

dataframe = dataframe.head(10000)

dataframe.to_csv('data-small.csv', index=False)



NameError: name 'df' is not defined

In [3]:
import spacy
from collections import Counter

nlp = spacy.load("en_core_web_sm")


def tokenize_and_build_vocab(column):
    vocab_counter = Counter()
    for text in column:
        tokens = [token.text for token in nlp(text.lower())]
        vocab_counter.update(tokens)
    return vocab_counter


vocab_counter = tokenize_and_build_vocab(dataframe["comments"])

vocab = {token: idx for idx, (token, _) in enumerate(vocab_counter.items(), start=4)}  # Starting at index 4 for special tokens
vocab['<unk>'] = 0
vocab['<pad>'] = 1
vocab['<bos>'] = 2
vocab['<eos>'] = 3

def text_to_ids(text, vocab):

    tokens = [token.text for token in nlp(text.lower())]

    return [vocab.get(token, vocab['<unk>']) for token in tokens]

text = dataframe["comments"]
token_ids = text_to_ids(text, vocab)

print(f"Text: {text}")
print(f"Token IDs: {token_ids}")


KeyboardInterrupt: 

In [1]:
import pickle

class CustomTokenizer:
    def __init__(self, oov_token="<UNK>"):
        """
        Initializes the tokenizer with empty mappings and an OOV (out-of-vocabulary) token.
        """
        self.word_to_id = {}
        self.id_to_word = {}
        self.oov_token = oov_token

        self.word_to_id[oov_token] = 0
        self.word_to_id["<start>"] = 1;
        self.word_to_id["<end>"] = 2;
        self.id_to_word[0] = oov_token
        self.id_to_word[1] = "<start>"
        self.id_to_word[2] = "<end>"

    def load(self, word_list):
        """
        Dynamically loads a list of words into the tokenizer and assigns free IDs.

        Args:
            word_list (list of str): The list of words to add to the tokenizer.
        """
        # Ensure all words are lowercase
        word_list = [word.lower() for word in word_list]

        # Start assigning IDs from the current maximum ID + 1
        next_id = max(self.word_to_id.values()) + 1

        for word in word_list:
            if word not in self.word_to_id:
                self.word_to_id[word] = next_id
                self.id_to_word[next_id] = word
                next_id += 1

    def encode(self, text):
        """
        Encodes a string into a list of IDs.

        Args:
            text (str): The input text to encode.

        Returns:
            list of int: List of word IDs.
        """
        words = text.lower().split()  # Convert text to lowercase and split into words
        return [self.word_to_id.get(word, self.word_to_id[self.oov_token]) for word in words]

    def decode(self, ids):
        """
        Decodes a list of IDs into a string.

        Args:
            ids (list of int): List of word IDs to decode.

        Returns:
            str: The decoded string.
        """
        return ' '.join(self.id_to_word.get(i, self.oov_token) for i in ids)

    def export(self, filename):
        """
        Exports the word_to_id and id_to_word mappings to a binary file.

        Args:
            filename (str): The path of the file to save the mappings.
        """
        with open(filename, 'wb') as f:
            pickle.dump((self.word_to_id, self.id_to_word), f)

    def import_mappings(self, filename):
        """
        Loads word_to_id and id_to_word mappings from a binary file.

        Args:
            filename (str): The path of the file to load the mappings from.
        """
        with open(filename, 'rb') as f:
            self.word_to_id, self.id_to_word = pickle.load(f)


# New section

In [2]:
import pandas as pd
dataframe = pd.read_csv('data-small.csv')
tokenizer = CustomTokenizer()
dataframe = dataframe.head(10)
tokenizer = CustomTokenizer()
for comment in dataframe["comments"]:
  tokenizer.load(comment.lower().split())

In [5]:
dataframe.size

40

In [6]:
!pip install tree_sitter==0.23.0
!pip install tree_sitter_haskell



In [3]:
import tree_sitter_haskell as tshaskell
import numpy as np
from tree_sitter import Language, Parser


HS_LANGUAGE = Language(tshaskell.language())
parser = Parser(HS_LANGUAGE)


In [4]:
def extract_tree_features(code):
    try:
        example_bytes = code.encode()
        tree = parser.parse(example_bytes)
        root_node = tree.root_node

        def extract_features(node, parent_index=None, nodes=[], edges=[]):
            node_type = node.type
            start_pos = node.start_point
            end_pos = node.end_point
            code_value = code[node.start_byte:node.end_byte]
            tokenizer.load(node_type.lower().split())
            feature_vector = [node_type]
            node_index = len(nodes)
            nodes.append(feature_vector)

            if parent_index is not None:
                edges.append((parent_index, node_index))

            for child in node.children:
                extract_features(child, parent_index=node_index, nodes=nodes, edges=edges)

            return nodes, edges

        nodes, edge_list = extract_features(root_node)
        code_value = code[root_node.start_byte:root_node.end_byte]
        tokenizer.load(code_value.lower().split())
        num_nodes = len(nodes)
        adj_matrix = np.zeros((num_nodes, num_nodes), dtype=np.float32)
        for parent, child in edge_list:
            adj_matrix[parent, child] = 1

        return nodes, adj_matrix
    except Exception as e:
        print(f"Error parsing code: {e}")
        return None, None
dataframe["nodes"], dataframe["adjacency_matrix"] = zip(*dataframe["function_only_code"].apply(extract_tree_features))


In [None]:
#dataframe.to_csv('data-ast-small.csv', index=False)
#print(dataframe["nodes"].size)
#tokenizer.export("tokens.bin")
import pandas as pd
dataframe = pd.read_csv('data-ast-small.csv')
tokenizer = CustomTokenizer()


FileNotFoundError: [Errno 2] No such file or directory: 'data-ast-small.csv'

[1]


In [None]:
!pip install tensorflow
!pip install keras



In [5]:
import tensorflow as tf
from keras.layers import Layer
from keras import activations
import keras.backend as K

class GCNLayer(Layer):
    def __init__(self, units, activation='relu', initializer='glorot_uniform', sparse=False, use_bias=True, **kwargs):
        self.activation = activations.get(activation)
        self.output_dim = units
        self.initializer = initializer
        self.sparse = sparse
        self.use_bias = use_bias

        super(GCNLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        self.kernel = self.add_weight(name='kernel',
                                          shape=(input_shape[0][-1], self.output_dim),
                                          initializer=self.initializer,
                                          trainable=True)
        if self.use_bias:
            self.bias = self.add_weight(name='bias',
                                              shape=(self.output_dim,),
                                              initializer='zeros',
                                              trainable=True)
        else:
            self.bias = None

        super(GCNLayer, self).build(input_shape)

    def call(self, x):
        assert isinstance(x, list)
        nodes, edges = x
        identity = tf.eye(tf.shape(edges)[-1], batch_shape=[tf.shape(edges)[0]], dtype=edges.dtype)
        edges = edges + identity
        output = tf.matmul(edges,nodes)
        output = tf.matmul(output, self.kernel)

        if self.use_bias:
            output += self.bias

        return self.activation(output)

    def compute_output_shape(self, input_shape):
        assert isinstance(input_shape, list)
        return (None,input_shape[0][1], self.output_dim)

    def get_config(self):
        config = {
            'units': self.output_dim,
            'activation': activations.serialize(self.activation),
        }

        base_config = super(GCNLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


In [6]:
import keras
import keras.utils
from keras.layers import Input, Dense, Embedding, Activation, concatenate, Flatten, GRU, TimeDistributed, dot
from keras.models import Model

class CodeGNNGRU:
    def __init__(self, config):
        config['modeltype'] = 'codegnngru'

        self.config = config
        self.tdatvocabsize = config['tdatvocabsize']
        self.comvocabsize = config['comvocabsize']
        self.smlvocabsize = config['smlvocabsize']
        self.tdatlen = config['tdatlen']
        self.comlen = config['comlen']
        self.smllen = config['maxastnodes']

        self.config['batch_maker'] = 'graph_multi_1'

        self.embdims = 100
        self.smldims = 256
        self.recdims = 256
        self.tdddims = 256

    def create_model(self):

        tdat_input = Input(shape=(self.tdatlen,))
        com_input = Input(shape=(self.comlen,))
        node_input = Input(shape=(self.smllen,))
        edge_input = Input(shape=(self.smllen, self.smllen))

        tdel = Embedding(output_dim=self.embdims, input_dim=self.tdatvocabsize, mask_zero=False)
        tde = tdel(tdat_input)

        se = tdel(node_input)

        tenc = GRU(self.recdims, return_state=True, return_sequences=True)
        tencout, tstate_h = tenc(tde)

        de = Embedding(output_dim=self.embdims, input_dim=self.comvocabsize, mask_zero=False)(com_input)
        dec = GRU(self.recdims, return_sequences=True)
        decout = dec(de, initial_state=tstate_h)

        tattn = dot([decout, tencout], axes=[2, 2])
        tattn = Activation('softmax')(tattn)
        tcontext = dot([tattn, tencout], axes=[2, 1])

        astwork = se

        for i in range(self.config['asthops']):
            astwork = GCNLayer(100)([astwork, edge_input])

        astwork = GRU(self.recdims, return_sequences=True)(astwork, initial_state=tstate_h)

        aattn = dot([decout, astwork], axes=[2, 2])
        aattn = Activation('softmax')(aattn)
        acontext = dot([aattn, astwork], axes=[2, 1])

        context = concatenate([tcontext, decout, acontext])

        out = TimeDistributed(Dense(self.tdddims, activation="relu"))(context)

        out = Flatten()(out)
        out1 = Dense(self.comvocabsize, activation="softmax")(out)

        model = Model(inputs=[tdat_input, com_input, node_input, edge_input], outputs=out1)

        model.compile(loss='categorical_crossentropy', optimizer='adamax', metrics=['accuracy'])
        return self.config, model


In [None]:

import numpy as np
import tensorflow as tf
from keras.utils import to_categorical
import ast

config = {
    'tdatvocabsize': 1000,
    'comvocabsize': 1000,
    'smlvocabsize':1000,
    'tdatlen': 200,
    'comlen': 500,
    'maxastnodes': 70,
    'asthops': 3,
}
model_instance = CodeGNNGRU(config)
config, model = model_instance.create_model()

tdat_input = np.zeros((len(dataframe) * config['comlen'], config['tdatlen']), dtype=np.float32)
com_input = np.zeros((len(dataframe) * config['comlen'], config['comlen']), dtype=np.float32)
node_input = np.zeros((len(dataframe) * config['comlen'], config['maxastnodes']), dtype=np.float32)
edge_input = np.zeros((len(dataframe) * config['comlen'], config['maxastnodes'], config['maxastnodes']), dtype=np.float32)
target_output = np.zeros((len(dataframe) * config['comlen'], config['comvocabsize']), dtype=np.float32)
sample_idx = 0

for i, (nodes, adj_matrix, tdat_text, com_text) in enumerate(zip(dataframe["nodes"], dataframe["adjacency_matrix"], dataframe["function_only_code"],dataframe["comments"])):
    tdat_encoded = tokenizer.encode(tdat_text)
    com_encoded = [1] + tokenizer.encode(com_text) + [2]
    num_nodes = min(len(nodes), int(config['maxastnodes']))
    node_encoded = []
    for node in nodes[:num_nodes]:
        encoded_node = tokenizer.encode(node[0])
        if len(encoded_node) > 1:
          node_encoded.append(encoded_node[0])

    for j in range(len(com_encoded)):
        target_output[sample_idx, com_encoded[j]] = 1
        tdat_input[sample_idx, :len(tdat_encoded)] = tdat_encoded[:config["tdatlen"]]
        com_input[sample_idx, :len(com_encoded)] = com_encoded[:config["comlen"]]
        node_input[sample_idx, :len(node_encoded)] = node_encoded[:num_nodes]
        edge_input[sample_idx, :num_nodes, :num_nodes] = adj_matrix[:num_nodes, :num_nodes]
        sample_idx +=1

print(tdat_input.shape)
print(com_input.shape)
print(node_input.shape)
print(edge_input.shape)
print(target_output.shape)
print(tdat_input)
print(com_input)
print(node_input)
print(edge_input)
print(target_output)


optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
model.compile(
    loss='categorical_crossentropy',
    optimizer=optimizer,
    metrics=['accuracy']
)

model.fit(
    [tdat_input, com_input, node_input, edge_input],
    target_output,
    epochs=5
)


(5000, 200)
(5000, 500)
(5000, 70)
(5000, 70, 70)
(5000, 1000)
[[136.   6. 137. ...   0.   0.   0.]
 [136.   6. 137. ...   0.   0.   0.]
 [136.   6. 137. ...   0.   0.   0.]
 ...
 [  0.   0.   0. ...   0.   0.   0.]
 [  0.   0.   0. ...   0.   0.   0.]
 [  0.   0.   0. ...   0.   0.   0.]]
[[1. 3. 4. ... 0. 0. 0.]
 [1. 3. 4. ... 0. 0. 0.]
 [1. 3. 4. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
[[0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 ...
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]
 [0. 0. 0. ... 0. 0. 0.]]
[[[0. 1. 0. ... 0. 0. 0.]
  [0. 0. 1. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 1. 0. ... 0. 0. 0.]
  [0. 0. 1. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]

 [[0. 1. 0. ... 0. 0. 0.]
  [0. 0. 1. ... 0. 0. 0.]
  [0. 0. 0. ... 0

In [8]:
model.save('code_gnn_model.keras')
print("Model saved to 'code_gnn_model.h5'")

Model saved to 'code_gnn_model.h5'


In [9]:
def predict_commentary_greedy(model, tokenizer, tdat_encoded, node_encoded, edge_matrix, config, max_length=20):
    """
    Predict commentary for given input in a greedy manner.

    Args:
        model: The trained CodeGNNGRU model.
        tokenizer: Tokenizer to encode/decode text.
        tdat_encoded: Encoded representation of the function code.
        node_encoded: Encoded representation of the AST nodes.
        edge_matrix: Adjacency matrix of the AST.
        config: Configuration dictionary containing lengths and vocab sizes.
        max_length: Maximum length of the generated commentary.

    Returns:
        Generated commentary string.
    """
    # Initialize inputs
    tdat_input = np.zeros((1, config['tdatlen']), dtype=np.float32)
    node_input = np.zeros((1, config['maxastnodes']), dtype=np.float32)
    edge_input = np.zeros((1, config['maxastnodes'], config['maxastnodes']), dtype=np.float32)
    com_input = np.zeros((1, config['comlen']), dtype=np.float32)

    # Populate input tensors
    tdat_input[0, :len(tdat_encoded)] = tdat_encoded[:config['tdatlen']]
    node_input[0, :len(node_encoded)] = node_encoded[:config['maxastnodes']]
    edge_input[0, :len(edge_matrix), :len(edge_matrix)] = edge_matrix[:config['maxastnodes'], :config['maxastnodes']]

    # Initialize generated commentary with the start token
    generated_commentary = [tokenizer.encode('<start>')[0]]

    for _ in range(max_length):
        com_input[0, :len(generated_commentary)] = generated_commentary
        predictions = model.predict([tdat_input, com_input, node_input, edge_input], verbose=0)
        next_token = np.argmax(predictions)
        generated_commentary.append(next_token)
        decoded_commentary = tokenizer.decode(generated_commentary)
        if next_token == tokenizer.encode('<end>')[0]:
            break

    decoded_commentary = tokenizer.decode(generated_commentary)
    return decoded_commentary


In [10]:

code='''
factorial :: Integer -> Integer
factorial 0 = 1
factorial n = n * factorial (n - 1)

'''
tdat_encoded = tokenizer.encode(code)
nodes, edges = extract_tree_features(code)
num_nodes = min(len(nodes), int(config['maxastnodes']))
node_encoded = []
for node in nodes[:num_nodes]:
    encoded_node = tokenizer.encode(node[0])
    if len(encoded_node) > 1:
      node_encoded.append(encoded_node[0])
predict_commentary_greedy(model, tokenizer, tdat_encoded,node_encoded, edges, config)

'<start> <end>'

In [2]:
!pip freeze

absl-py==1.4.0
accelerate==1.1.1
aiohappyeyeballs==2.4.3
aiohttp==3.11.1
aiosignal==1.3.1
alabaster==1.0.0
albucore==0.0.19
albumentations==1.4.20
altair==4.2.2
annotated-types==0.7.0
anyio==3.7.1
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
array_record==0.5.1
arviz==0.20.0
astropy==6.1.6
astropy-iers-data==0.2024.11.11.0.32.38
astunparse==1.6.3
async-timeout==4.0.3
atpublic==4.1.0
attrs==24.2.0
audioread==3.0.1
autograd==1.7.0
babel==2.16.0
backcall==0.2.0
beautifulsoup4==4.12.3
bigframes==1.26.0
bigquery-magics==0.4.0
bleach==6.2.0
blinker==1.9.0
blis==0.7.11
blosc2==2.7.1
bokeh==3.6.1
Bottleneck==1.4.2
bqplot==0.12.43
branca==0.8.0
CacheControl==0.14.1
cachetools==5.5.0
catalogue==2.0.10
certifi==2024.8.30
cffi==1.17.1
chardet==5.2.0
charset-normalizer==3.4.0
chex==0.1.87
clarabel==0.9.0
click==8.1.7
cloudpathlib==0.20.0
cloudpickle==3.1.0
cmake==3.30.5
cmdstanpy==1.2.4
colorcet==3.1.0
colorlover==0.3.0
colour==0.1.5
community==1.0.0b1
confection==0.1.5
cons==0.4.6
contourpy==1