In [None]:
import tensorflow as tf
from tensorflow.keras import backend as K

# Environment sanity check: TensorFlow version, whether this build has CUDA
# support, and the GPU devices TensorFlow reports.
tf_version = tf.__version__
print(tf_version)

cuda_build = tf.test.is_built_with_cuda()
print(cuda_build)

gpu_devices = tf.config.list_physical_devices('GPU')
print(gpu_devices)

In [None]:
# One (code, title, meta) path triple per dataset directory under sodata/.
# The triples are consumed positionally: [0] code, [1] titles, [2] metadata.
# Each dataset must appear exactly once: a repeated triple is loaded twice and,
# once the samples are shuffled, identical pairs can end up in both the
# training and the test split.
files = [("sodata/tensorflow/code.txt", "sodata/tensorflow/title.txt", "sodata/tensorflow/meta.txt"),
         ("sodata/machine-learning_python/code.txt","sodata/machine-learning_python/title.txt","sodata/machine-learning_python/meta.txt"),
         ("sodata/data-science_python/code.txt","sodata/data-science_python/title.txt","sodata/data-science_python/meta.txt"),
         ("sodata/data-cleaning_python/code.txt","sodata/data-cleaning_python/title.txt","sodata/data-cleaning_python/meta.txt")
        ]


In [None]:
def process_code_file(path):
    r"""Load a code dump and normalise each entry into a lowercase token string.

    The file is read as UTF-8. Every physical line of the file is one entry;
    inside an entry the literal two-character sequence ``\n`` separates the
    original source lines. For each source line, punctuation is replaced by
    spaces (braces are removed outright), numbers become ``NUM`` (lowercased
    to ``num`` afterwards), camelCase identifiers are split into words and the
    result is lowercased. Lines that end up empty, or that start with
    ``print`` or ``#``, are dropped.

    The ``re`` module must have been imported before the first call.

    Args:
        path: Path of the text file to read.

    Returns:
        A list with one string per physical line of the file, in file order.
        Each string is the concatenation of the surviving normalised source
        lines, every one followed by a single space.
    """
    # The context manager closes the handle even if reading raises.
    with open(path, 'r', encoding="utf-8") as file:
        code_file = file.read()

    processed_code = []

    for entry in code_file.split("\n"):

        filtered_lanes = ""

        for line in entry.split("\\n"):
            line = line.strip()
            line = line.replace("(", " ").replace(")", " ").replace("\"", " ").replace("'", " ").replace("&quot;", " ").replace("_", " ").replace(".", " ").replace(",", " ").replace("=", " ")
            line = line.replace(":", " ").replace("[", " ").replace("]", " ").replace("\\"," ").replace("/"," ")
            line = line.replace("+"," ").replace("-"," ").replace("_"," ").replace("&gt;", " ").replace("{","").replace("}","")
            line = line.replace("%", " ").replace("$", " ")

            # Replace numbers with a NUM marker. The "." in the pattern is
            # unescaped and therefore matches any single character; dots were
            # already turned into spaces above, so two digit groups separated
            # by one character collapse into a single NUM.
            line = re.sub('([0-9]+.[0-9]+|[0-9]+)', ' NUM ', line)

            # Split camelCase: first put a space before every uppercase run,
            # then before every capitalised word.
            line = re.sub('([A-Z][a-z]+)', r' \1', re.sub('([A-Z]+)', r' \1', line))

            line = line.lower()
            line = line.strip()

            # Skip empty lines, print statements and comments. Double quotes
            # were already replaced above, so no quote-based check is needed.
            if not line or line.startswith(("print", "#")):
                continue

            # Remove any remaining non-alphanumeric characters.
            line = re.sub("[^0-9a-zA-Z ]+", ' ', line)

            filtered_lanes += line+" "

        processed_code.append(filtered_lanes)

    return processed_code

In [None]:
def process_title_file(path):
    """Read a titles file and return its lines with punctuation stripped.

    The file is read as UTF-8. The characters ``? , . ) ( : [ ]`` are each
    replaced by a space and backticks are removed, then the text is split on
    newlines.

    Args:
        path: Path of the text file to read.

    Returns:
        A list with one cleaned string per line of the file, in file order.
        Text ending in a newline yields a trailing empty string.
    """
    # The context manager guarantees the handle is closed after reading.
    with open(path, 'r', encoding="utf-8") as file:
        tile_file = file.read()

    tile_file = tile_file.replace("?", " ").replace(",", " ").replace(".", " ").replace(")", " ").replace("(", " ")
    tile_file = tile_file.replace(":", " ").replace("`", "").replace("[", " ").replace("]", " ")
    return tile_file.split("\n")

In [None]:
def process_meta_file(path):
    """Read a metadata file and return its lines.

    Args:
        path: Path of the UTF-8 text file to read.

    Returns:
        The file content split on newlines, in file order. Text ending in a
        newline yields a trailing empty string.
    """
    # The context manager guarantees the handle is closed after reading.
    with open(path, 'r', encoding="utf-8") as file:
        meta_file = file.read()
    return meta_file.split("\n")

In [None]:
import random
import re

# Accumulate the three parallel corpora; position i in each list is treated as
# one sample (code entry, title, metadata line).
codes, titles, urls = [], [], []
for file in files:
    code_path, title_path, meta_path = file[0], file[1], file[2]
    codes += process_code_file(code_path)
    titles += process_title_file(title_path)
    urls += process_meta_file(meta_path)

# Shuffle the samples in lockstep by shuffling the zipped triples.
c = list(zip(codes, titles, urls))
random.shuffle(c)

# Unzip back into three independent, mutable lists.
codes, titles, urls = (list(column) for column in zip(*c))

In [None]:
# Split point: the first 80% of the samples train the model, the rest is held out.
test_division_pos = int(0.8 * len(codes))

training_codes, test_codes = codes[:test_division_pos], codes[test_division_pos:]
training_titles, test_titles = titles[:test_division_pos], titles[test_division_pos:]

print(len(training_codes), len(test_codes))

In [None]:

# Tokenizer corpus: every training code entry followed by every training title.
extended_training = [*training_codes, *training_titles]
len(extended_training)

In [None]:
from tokenizers import Tokenizer
from tokenizers.models import BPE
from tokenizers.trainers import BpeTrainer
from tokenizers.pre_tokenizers import Whitespace

In [None]:
# BPE tokenizer with the Whitespace pre-tokenizer; unknown pieces map to "[UNK]".
bpe_model = BPE(unk_token="[UNK]")
tokenizer = Tokenizer(bpe_model)
tokenizer.pre_tokenizer = Whitespace()

# Padding is enabled with the library defaults.
# NOTE(review): "[UNK]" is listed first in the special tokens below, so a
# default pad id of 0 would coincide with the "[UNK]" id — confirm.
tokenizer.enable_padding()

special_tokens = ["[UNK]", "[CLS]", "[SEP]", "[PAD]", "[MASK]"]
trainer = BpeTrainer(special_tokens=special_tokens)

# Learn the vocabulary from the combined code + title training corpus.
tokenizer.train_from_iterator(extended_training, trainer=trainer)

vocab_size = len(tokenizer.get_vocab())
print("Number of tokens", vocab_size)

In [None]:
# Encode the whole tokenizer corpus in one batch; the following cell scans
# these encodings for the longest token sequence.
outputs = tokenizer.encode_batch(extended_training, add_special_tokens=True)

In [None]:
# Longest token sequence among the batch encodings (0 when there are none).
sentence_len = max((len(encoding.tokens) for encoding in outputs), default=0)

In [None]:
# Show the longest tokenized sequence length measured on the tokenizer corpus.
print("Max sentence lenght", sentence_len)

In [None]:
# Shared encoder ("siamese_model"): token ids -> embeddings -> attention over
# the embeddings -> sum over axis 1.
embedding_size = 512
# Model inputs have the length of the longest sequence measured on the tokenizer corpus.
longer_input_size = sentence_len
number_of_tokens = len(tokenizer.get_vocab())

input_layer = tf.keras.Input(shape=(longer_input_size,), name="input")
embeding_layer = tf.keras.layers.Embedding(number_of_tokens, embedding_size, name="embeding")(input_layer)

# The same tensor is passed twice, so the sequence attends to itself.
attention_layer = tf.keras.layers.Attention(name="attention")([embeding_layer, embeding_layer])

print(attention_layer.shape)

# Collapse axis 1 by summation (assumes axis 1 is the token axis of a
# (batch, tokens, embedding) tensor — TODO confirm against the shape printed above).
sum_layer = tf.keras.layers.Lambda(lambda x: K.sum(x, axis=1), name="sum")( attention_layer)
#average_layer = tf.keras.layers.Lambda(lambda x: K.mean(x, axis=1), name="average")( attention_layer)

model = tf.keras.Model(inputs=[input_layer], outputs=[sum_layer], name='siamese_model')

# NOTE(review): the plot of the similarity model in the next cell is written to
# the same "cos_model.png" path and will overwrite this file.
tf.keras.utils.plot_model(model, "cos_model.png", show_shapes=True, expand_nested=True)

In [None]:
# Similarity model: two inputs of the same length, one for the code and one
# for the description (title).
input_code = tf.keras.Input(shape=(longer_input_size,), name="code")
input_desc = tf.keras.Input(shape=(longer_input_size,), name="desc")

# The same `model` instance encodes both inputs, so both branches share weights.
output_code = model(input_code)
output_desc = model(input_desc)

# Dot product along axis 1 with normalize=True, i.e. the cosine similarity of
# the two encoded vectors.
cos_sim = tf.keras.layers.Dot(axes=1, normalize=True, name='cos_sim')([output_code, output_desc]) 

cos_model = tf.keras.Model(inputs=[input_code, input_desc], outputs=[cos_sim],name='sim_model')  

tf.keras.utils.plot_model(cos_model, "cos_model.png", show_shapes=True, expand_nested=True)

In [None]:
import numpy as np

# Positive pairs: the training split, with code i still aligned to title i.
training_codes = codes[:test_division_pos]
training_titles = titles[:test_division_pos]

# Negative pairs: the titles in their original order paired with a shuffled
# COPY of the codes. random.shuffle works in place, so shuffling an alias of
# training_codes would also scramble the positive pairs and make the
# "positive" and "negative" halves identical.
negative_titles = list(training_titles)
negative_codes = list(training_codes)

random.shuffle(negative_codes)
# NOTE(review): a plain shuffle can leave a code at its original index, in
# which case a true pair is labelled 0 — confirm this is acceptable.

# Labels: 1 for every aligned pair, 0 for every shuffled pair.
positive_results = np.ones(len(training_codes))
negative_results = np.zeros(len(negative_titles))

print(len(training_codes), len(negative_codes),len(positive_results))

In [None]:
# Stack the positive pairs first and the negative pairs after them, so that
# codes, titles and labels stay aligned row by row.
final_training_codes = [*training_codes, *negative_codes]
final_training_titles = [*training_titles, *negative_titles]

results = np.concatenate((positive_results, negative_results), axis=0)

print(len(final_training_codes) , len(final_training_titles), len(results))

In [None]:
# Turn every code string into an id vector right-padded to sentence_len.
# NOTE(review): pad() is given only the pad token; if its pad id defaults to 0
# that id belongs to "[UNK]" under the trainer's special-token order — confirm.
encoded_codes = []
for idx, code in enumerate(final_training_codes):
    output = tokenizer.encode(code, add_special_tokens=True)
    output.pad(sentence_len, direction="right", pad_token="[PAD]")
    encoded_codes.append(np.array(output.ids))

    # Progress report every 5000 rows.
    if not idx % 5000:
        print(idx,"/", len(final_training_codes))

final_training_codes = np.array(encoded_codes)

In [None]:
# Sanity check: expected to be (number of pairs, sentence_len) when every
# encoding was padded to the same length.
final_training_codes.shape

In [None]:
# Turn every title string into an id vector right-padded to sentence_len.
encoded_titles = []
for idx, title in enumerate(final_training_titles):
    output = tokenizer.encode(title, add_special_tokens=True)
    output.pad(sentence_len, direction="right", pad_token="[PAD]")
    encoded_titles.append(np.array(output.ids))

    # Progress report every 5000 rows.
    if not idx % 5000:
        print(idx,"/",len(final_training_titles))

final_training_titles = np.array(encoded_titles)

In [None]:
# Sanity check: expected to be (number of pairs, sentence_len) when every
# encoding was padded to the same length.
final_training_titles.shape

In [None]:
# Train the similarity model as a binary classifier (label 1 = matching pair).
# NOTE(review): the model output is a cosine similarity (Dot with
# normalize=True), which can be negative, whereas binary cross-entropy is
# normally fed probabilities in [0, 1] — confirm this loss/output pairing.
cos_model.compile(optimizer="adam", loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
# Fit for a single epoch on the stacked positive/negative pairs in batches of 32.
# The early-stopping callback is disabled (both of its lines are commented
# out), and no validation data is passed to fit().
#earlystop_callback = EarlyStopping(monitor='val_loss', mode='min', patience=2)
cos_model.fit(x=[final_training_codes, final_training_titles], y=results, epochs=1, verbose=1, batch_size=32)
              #callbacks=[earlystop_callback])

In [None]:
def _encode_to_fixed_length(texts, report_every=2500):
    """Encode strings into a 2-D array of token ids with sentence_len columns.

    Uses the notebook-level `tokenizer` and `sentence_len`. Each encoding is
    first truncated and then right-padded to sentence_len, so every row has
    the same length even when a held-out text is longer than anything seen
    while sentence_len was measured.

    Args:
        texts: Sequence of strings to encode.
        report_every: Print a progress line every this many items.

    Returns:
        A numpy array with one row of token ids per input string.
    """
    rows = []
    for idx, text in enumerate(texts):
        output = tokenizer.encode(text, add_special_tokens=True)
        # sentence_len was measured on the training corpus only; without
        # truncation a longer test sequence would make the rows ragged.
        output.truncate(sentence_len)
        output.pad(sentence_len, direction="right", pad_token="[PAD]")
        rows.append(np.array(output.ids))

        if idx % report_every == 0:
            print(idx,"/",len(texts))

    return np.array(rows)


# Held-out split, re-sliced from the shuffled corpora and encoded like the
# training inputs.
test_codes = _encode_to_fixed_length(codes[test_division_pos:])
test_titles = _encode_to_fixed_length(titles[test_division_pos:])

# Every held-out pair is a genuine match, so there is one positive label per
# TEST pair (sizing this from the training split mismatches the inputs).
positive_test_results = np.ones(len(test_codes))


In [None]:
# Print the metric names so the values returned by evaluate() can be read in order.
print(cos_model.metrics_names)
# The labels are all ones (positive_test_results), so the reported accuracy
# only measures how often genuine pairs are scored as matches.
cos_model.evaluate(x=[test_codes, test_titles], y=positive_test_results)