In [None]:
from datasets import Dataset, load_dataset
from typing import TypedDict, Literal, List

In [None]:
# Language configurations and split names accepted by the loader in this notebook.
CodeSearchNetLanguage = Literal['python', 'go', 'java', 'javascript', 'php', 'ruby']
CodeSearchNetSplit = Literal['train', 'test', 'validation']

class CodeSearchNetSample(TypedDict):
  """Typed view of a single dataset record as used by this notebook.

  Only the two ``*_tokens`` fields are read by the processing code below;
  the remaining fields are declared for completeness.
  """
  repository_name: str
  func_path_in_repository: str
  func_name: str
  whole_func_string: str
  language: CodeSearchNetLanguage
  func_code_string: str
  func_code_tokens: List[str]
  func_documentation_string: str
  # Key name matches the lookups performed by the processing code in this
  # notebook (sample['func_documentation_tokens']).
  func_documentation_tokens: List[str]
  split_name: CodeSearchNetSplit
  func_code_url: str

# Pre-process and save pairs

In [None]:
from typing import cast

dataset_name = "code_search_net"

def load(language: CodeSearchNetLanguage, split: CodeSearchNetSplit, take: int) -> Dataset:
  """Load one language/split of the dataset and keep only a leading slice.

  Args:
    language: dataset configuration name passed to ``load_dataset``.
    split: which split to load.
    take: slice bound; rows ``[:take]`` are kept (same semantics as list slicing).

  Returns:
    A ``Dataset`` holding the selected rows.
  """
  ds = cast(Dataset, load_dataset(dataset_name, language, split=split))
  # select() keeps the dataset's own schema and avoids copying the slice into
  # a Python dict; slicing the range preserves the exact [:take] semantics.
  return ds.select(range(len(ds))[:take])

# Generate negative samples

In [None]:
from typing import Iterator
import numpy as np
from numpy.random import default_rng


# Seeded generator shared by every call so the sampled negatives are reproducible.
random_generator = default_rng(seed=42)
batch_size = 100

def generate_negative_samples(iterator: Iterator, negative_samples_per_sample: int):
  """Yield (code, matching comment, non-matching comment) triples.

  For every sample of every batch, comments belonging to *other* samples of
  the same batch are drawn without replacement and paired with the sample's
  code and its own comment.

  Args:
    iterator: yields batches; each batch maps 'func_code_tokens' and
      'func_documentation_tokens' to a list of token lists.
    negative_samples_per_sample: how many negatives to draw per sample. When a
      batch has fewer other samples than this, all of them are used instead.

  Yields:
    dict with the keys "code", "comment_positive" and "comment_negative".
  """
  for batched_sample in iterator:
    processed_codes = [' '.join(code_tokens) for code_tokens in batched_sample['func_code_tokens']]
    processed_comments = [' '.join(comment_tokens) for comment_tokens in batched_sample['func_documentation_tokens']]
    batch_indexes = range(len(processed_codes))

    for index, code, comment in zip(batch_indexes, processed_codes, processed_comments):
      # Every position in the batch except the sample itself
      indexes = [i for i in batch_indexes if i != index]
      # Sampling without replacement cannot exceed the population size
      # (e.g. a short final batch), so clamp instead of raising.
      sample_count = min(negative_samples_per_sample, len(indexes))
      if sample_count == 0:
        continue
      negative_indexes = random_generator.choice(indexes, sample_count, replace=False)
      for negative_index in negative_indexes:
        yield {
          "code": code,
          "comment_positive": comment,
          # Index the list directly: yields plain str and avoids rebuilding
          # an array of all comments for every sample.
          "comment_negative": processed_comments[negative_index],
        }

def pre_process_sample(sample):
  """Turn a sample's code and documentation token lists into space-separated strings.

  Returns a dict with the keys "code" and "comment".
  """
  code_text = ' '.join(sample['func_code_tokens'])
  comment_text = ' '.join(sample['func_documentation_tokens'])
  return dict(code=code_text, comment=comment_text)

# Leading 1000 rows of the Python training split, with the joined "code" and
# "comment" strings produced by pre_process_sample added via map().
train_ds: Dataset = load('python', 'train', take=1000).map(pre_process_sample)

# Build the triplet dataset: train_ds is walked in batches of `batch_size` and
# 3 negative comments are drawn per sample from within each batch.
# The lambda defers creation of the generator until from_generator calls it.
full_ds: Dataset = Dataset.from_generator(lambda: generate_negative_samples(train_ds.iter(batch_size=batch_size), 3)) # type: ignore

# Show the first generated record as a quick sanity check.
full_ds[0]

# Train

In [None]:
from keras.preprocessing.text import Tokenizer


tokenizer = Tokenizer()
# Space-joined documentation tokens of the leading 100 Python training rows.
processed_comments = [' '.join(sample['func_documentation_tokens']) for sample in load('python', 'train', 100)] # type: ignore
# fit_on_texts builds the vocabulary (word_index) from raw strings.
# fit_on_sequences expects already integer-encoded sequences and never
# populates word_index, which the model uses to size its embeddings.
# NOTE(review): the vocabulary is fitted on comments only, yet the model also
# embeds a 'code' input with the same vocabulary size — confirm this is intended.
tokenizer.fit_on_texts(processed_comments)

In [None]:
from keras.layers import Input, Embedding, Flatten, Concatenate, Dense
from keras.models import Model

max_sequence_length = 100


def model_test():
  """Build and compile a three-input Keras classifier.

  Each input ('code', 'comment_positive', 'comment_negative') is a sequence of
  length `max_sequence_length` that gets its own embedding and is flattened;
  the three flattened vectors are concatenated and fed through a 64-unit ReLU
  layer into a single sigmoid output.

  Returns:
    The compiled model (adam optimiser, binary cross-entropy loss, accuracy
    metric, eager execution enabled).
  """
  branch_names = ('code', 'comment_positive', 'comment_negative')

  # Stage 1: one fixed-length input per branch
  branch_inputs = [Input(shape=(max_sequence_length,), name=branch) for branch in branch_names]
  for branch_input in branch_inputs:
    print(branch_input)

  # Stage 2: a separate embedding per branch; index 0 is reserved, hence the +1
  embedding_dim = 50
  vocabulary_size = len(tokenizer.word_index) + 1
  branch_embeddings = [
    Embedding(input_dim=vocabulary_size, output_dim=embedding_dim)(branch_input)
    for branch_input in branch_inputs
  ]

  # Stage 3: flatten every embedding and join them into one feature vector
  branch_vectors = [Flatten()(embedded) for embedded in branch_embeddings]
  merged = Concatenate()(branch_vectors)

  # Classification head
  hidden = Dense(64, activation='relu')(merged)
  prediction = Dense(1, activation='sigmoid', name='output')(hidden)

  classifier = Model(inputs=branch_inputs, outputs=prediction)
  classifier.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy'],
    run_eagerly=True,
  )
  return classifier

In [None]:
# Build and compile the classifier; kept in its own cell so training can be
# re-run without necessarily rebuilding the model.
model = model_test()


In [None]:
# Batching is applied here, so the resulting dataset already yields batches
# of `batch_size` rows.
train_tf_dataset = full_ds.to_tf_dataset(batch_size=batch_size)

# NOTE(review): full_ds rows hold the raw strings "code", "comment_positive"
# and "comment_negative" and no label column, while the model embeds
# fixed-length integer sequences and is compiled with binary cross-entropy —
# tokenisation/padding and targets appear to be missing; confirm before
# relying on this cell.
# batch_size is not passed to fit(): Keras rejects it for inputs that are
# already batched datasets.
model.fit(train_tf_dataset, epochs=10)