## Project Description: Next Code Token Prediction Using LSTM
#### Project Overview:

This project aims to develop a self-supervied learning model for predicting the next code token in a given code snippet. The model is built using Long Short-Term Memory (LSTM) networks, which are well-suited for sequence prediction tasks. The project includes the following steps:

1- Data Collection: We use py150 from kaggle as our dataset. This rich, complex code provides a good challenge for our model.

2- Data Preprocessing: The text data is tokenized, converted into sequences, and padded to ensure uniform input lengths. The sequences are then split into training and testing sets.

3- Model Building: An LSTM model is constructed with an embedding layer, two LSTM layers, and a dense output layer with a softmax activation function to predict the probability of the next word.

4- Model Training: The model is trained using the prepared sequences, with early stopping implemented to prevent overfitting. Early stopping monitors the validation loss and stops training when the loss stops improving.

5- Model Evaluation: The model is evaluated using a set of example codes to test its ability to predict the next token accurately.

6- Deployment: A Streamlit web application is developed to allow users to input a sequence of tokens and get the predicted next token in real-time.

In [None]:
import os
import tokenize


def tokenize_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read().replace('\t', '    ')  
            tokens = tokenize.generate_tokens(iter(content.splitlines()).__next__)
            token_list = []
            for token in tokens:
                if token.type not in (tokenize.COMMENT, tokenize.NL):
                    if token.string == '':
                        continue
                    token_list.append(token.string)
            return token_list
    except tokenize.TokenError as e:
        return []
    except Exception as e:
        return []

def tokenize_directory(directory_path):
    tokenized_files = {}
    
    for filename in os.listdir(directory_path):
        if filename.endswith('.py'):
            file_path = os.path.join(directory_path, filename)
            tokens = tokenize_file(file_path)
            tokenized_files[filename] = tokens
    
    return tokenized_files


directory_path = "reduced_set"

tokenized_data = tokenize_directory(directory_path)

print(len(tokenized_data))
print(type(tokenized_data))

for idx, (keys, values) in enumerate(tokenized_data.items()):
    if idx >= 5:  
        break
    print(keys, values)

1235
<class 'dict'>
test_label.py ['from', '__future__', 'import', 'division', ',', 'print_function', ',', 'unicode_literals', 'import', 'sys', 'import', 'os', 'sys', '.', 'path', '.', 'insert', '(', '0', ',', 'os', '.', 'path', '.', 'join', '(', 'os', '.', 'path', '.', 'dirname', '(', '__file__', ')', ',', "'..'", ')', ')', 'testinfo', '=', '"s, t 5, s, t 10.1, s, q"', 'tags', '=', '"Label, text, ScaleTo"', 'import', 'cocos', 'from', 'cocos', '.', 'director', 'import', 'director', 'from', 'cocos', '.', 'sprite', 'import', 'Sprite', 'from', 'cocos', '.', 'actions', 'import', '*', 'from', 'cocos', '.', 'text', 'import', '*', 'import', 'pyglet', 'class', 'TestLayer', '(', 'cocos', '.', 'layer', '.', 'Layer', ')', ':', '    ', 'def', '__init__', '(', 'self', ')', ':', '        ', 'super', '(', 'TestLayer', ',', 'self', ')', '.', '__init__', '(', ')', 'x', ',', 'y', '=', 'director', '.', 'get_window_size', '(', ')', 'self', '.', 'text', '=', 'Label', '(', '"hello"', ',', '(', 'x', '//', '2

In [None]:
def build_vocabulary(tokenized_data):
    """
    Build a vocabulary mapping tokens to unique IDs.
    """
    all_tokens = [token for tokens in tokenized_data.values() for token in tokens]
    vocab = {token: idx for idx, token in enumerate(set(all_tokens), start=1)}  
    return vocab



def convert_tokens_to_ids(tokenized_data, vocab):
    """
    Convert tokenized data into token IDs using the vocabulary.
    """
    token_ids_data = {
        filename: [vocab[token] for token in tokens if token in vocab]
        for filename, tokens in tokenized_data.items()
    }
    return token_ids_data

vocab = build_vocabulary(tokenized_data)
for idx, (keys, values) in enumerate(vocab.items()):
    if idx >= 5:  
        break
    print(keys, values)

token_ids_data = convert_tokens_to_ids(tokenized_data, vocab)

for idx, (keys, values) in enumerate(token_ids_data.items()):
    if idx >= 5:  
        break
    print(keys, values)


solver_id 1
O_CREAT 2
"Method %s not found" 3
test_reduce 4
'subfield2' 5
test_label.py [20411, 24400, 11408, 28511, 9713, 7431, 9713, 42954, 11408, 23525, 11408, 16292, 23525, 10979, 29264, 10979, 30659, 10276, 26342, 9713, 16292, 10979, 29264, 10979, 37479, 10276, 16292, 10979, 29264, 10979, 4311, 10276, 6282, 2641, 9713, 7695, 2641, 2641, 29357, 38511, 13184, 26505, 38511, 41343, 11408, 13396, 20411, 13396, 10979, 34853, 11408, 34853, 20411, 13396, 10979, 9166, 11408, 26750, 20411, 13396, 10979, 15170, 11408, 40722, 20411, 13396, 10979, 21312, 11408, 40722, 11408, 26657, 34600, 8483, 10276, 13396, 10979, 10632, 10979, 5629, 2641, 32233, 40407, 23931, 34058, 10276, 41912, 2641, 32233, 37222, 23962, 10276, 8483, 9713, 41912, 2641, 10979, 34058, 10276, 2641, 6513, 9713, 42402, 38511, 34853, 10979, 34999, 10276, 2641, 41912, 10979, 21312, 38511, 7280, 10276, 41139, 9713, 10276, 6513, 5304, 19123, 9713, 42402, 5304, 19123, 2641, 2641, 41912, 10979, 21312, 10979, 33969, 10276, 7963, 10276

In [3]:
import pickle
with open("vocab1.pkl", "wb") as file:
    pickle.dump(vocab, file)

In [None]:
def prepare_sequences(token_ids_data, sequence_length=4):
    """
    Prepare input-output pairs for training a model.
    Each input is a sequence of token IDs, and the output is the next token ID.
    """
    x = []
    y = []
    for token_ids in token_ids_data.values():
        for i in range(len(token_ids) - sequence_length):
            input_seq = token_ids[i:i + sequence_length]
            output_token = token_ids[i + sequence_length]
            x.append(input_seq)
            y.append(output_token)
    return x, y

sequence_length = 4
x,y = prepare_sequences(token_ids_data, sequence_length)

print("Training Data:")
for i in range(5):  
    print(f"Input: {x[i]}, Output: {y[i]}")

Training Data:
Input: [20411, 24400, 11408, 28511], Output: 9713
Input: [24400, 11408, 28511, 9713], Output: 7431
Input: [11408, 28511, 9713, 7431], Output: 9713
Input: [28511, 9713, 7431, 9713], Output: 42954
Input: [9713, 7431, 9713, 42954], Output: 11408


In [None]:
print(len(x), len(y))

814010 814010


In [None]:
from sklearn.model_selection import train_test_split
import numpy as np

x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

x_train = np.array(x_train)
x_test = np.array(x_test)

y_train = np.array(y_train)
y_test = np.array(y_test)

In [7]:
print(len(x_train), len(y_train), len(x_test), len(y_test))

651208 651208 162802 162802


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Embedding,LSTM,Dense,Dropout,GRU

total_words = len(vocab) + 1
max_sequence_len = 4

model=Sequential()
model.add(Embedding(total_words,100,input_length=max_sequence_len-1))
model.add(LSTM(150,return_sequences=True))
model.add(Dropout(0.2))
model.add(LSTM(100))
model.add(Dense(total_words,activation="softmax"))

model.compile(loss="sparse_categorical_crossentropy",optimizer='adam',metrics=['accuracy'])

2024-11-30 00:38:01.365332: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:477] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1732907281.381006   72508 cuda_dnn.cc:8310] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1732907281.385963   72508 cuda_blas.cc:1418] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-11-30 00:38:01.403289: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
I0000 00:00:1732907282.956443   72508 gpu_device.cc:2022] Created device /job:localhost/replica:0/task:0/device:GPU:0 with 27

In [None]:
history=model.fit(x_train,y_train,epochs=25,validation_data=(x_test,y_test),verbose=1)
model.summary()

Epoch 1/25


I0000 00:00:1732907285.986743   72594 cuda_dnn.cc:529] Loaded cuDNN version 90300


[1m20351/20351[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m324s[0m 16ms/step - accuracy: 0.2891 - loss: 4.8415 - val_accuracy: 0.4585 - val_loss: 3.7151
Epoch 2/25
[1m20351/20351[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m319s[0m 16ms/step - accuracy: 0.4794 - loss: 3.4349 - val_accuracy: 0.5032 - val_loss: 3.4204
Epoch 3/25
[1m20351/20351[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m320s[0m 16ms/step - accuracy: 0.5333 - loss: 2.9790 - val_accuracy: 0.5288 - val_loss: 3.2770
Epoch 4/25
[1m20351/20351[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m320s[0m 16ms/step - accuracy: 0.5675 - loss: 2.6814 - val_accuracy: 0.5452 - val_loss: 3.2065
Epoch 5/25
[1m20351/20351[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m319s[0m 16ms/step - accuracy: 0.5901 - loss: 2.4721 - val_accuracy: 0.5543 - val_loss: 3.1653
Epoch 6/25
[1m20351/20351[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m320s[0m 16ms/step - accuracy: 0.6069 - loss: 2.3125 - val_accuracy: 0.5581 - val_loss: 3.13

In [None]:
def predict_next_word(model, tokenizer, text, max_sequence_len):
    token_list = tokenizer.texts_to_sequences([text])[0]
    if len(token_list) >= max_sequence_len:
        token_list = token_list[-(max_sequence_len-1):]  
    token_list = pad_sequences([token_list], maxlen=max_sequence_len-1, padding='pre')
    predicted = model.predict(token_list, verbose=0)
    predicted_word_index = np.argmax(predicted, axis=1)
    for word, index in tokenizer.word_index.items():
        if index == predicted_word_index:
            return word
    return None

In [None]:
model.save("next_word_lstm.h5")