## On the Naturalness of Software Experiment

In [1]:
%pip install --upgrade tiktoken
%pip install --upgrade openai
%pip install numpy 
%pip install jdc

Note: you may need to restart the kernel to use updated packages.


### The experiment is built on top of the following statistical formulas:

- The language model is based on n-gram theory: the probability of a token is approximated from only a fixed window of preceding tokens (three in the formula below):
$$p(a_i|a_1...a_{i-1}) \approx p(a_i|a_{i-3}a_{i-2}a_{i-1})$$

- These models are estimated on a corpus using maximum-likelihood frequency counting of token sequences. Thus, if “∗” is a wildcard, we can estimate the probability that \(a_4\) follows the tokens \(a_1\), \(a_2\), \(a_3\) with:
$$p(a_4|a_1a_2a_3) = \frac{\text{count}(a_1a_2a_3a_4)}{\text{count}(a_1a_2a_3*)}$$

- Validation: use cross-entropy to validate the quality of the language model. Cross-entropy is, in a sense, a measure of the "surprise" experienced by the model when it sees the actual data:
$$H_M(s) = -\frac{1}{n} \sum_{i=1}^{n} \log p_M(a_i|a_1...a_{i-1})$$
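
The formulas above can be tried on a toy corpus. The following is a minimal sketch, not the notebook's implementation: the helper names `trigram_mle` and `cross_entropy` and the toy corpus are assumptions made for illustration, and the context is two tokens (a trigram model, as used in the rest of this notebook) rather than the three tokens shown in the formulas. Since `math.log` is the natural logarithm, the result is in nats; dividing by `math.log(2)` converts it to bits.

```python
import math
from collections import Counter

def trigram_mle(tokens):
    """Estimate p(a3 | a1 a2) as count(a1 a2 a3) / count(a1 a2 *)."""
    trigram_counts = Counter(zip(tokens, tokens[1:], tokens[2:]))
    # count(a1 a2 *) is the number of trigrams that start with (a1, a2)
    context_counts = Counter()
    for (a1, a2, _a3), count in trigram_counts.items():
        context_counts[(a1, a2)] += count
    return {tri: count / context_counts[tri[:2]]
            for tri, count in trigram_counts.items()}

def cross_entropy(probs, tokens):
    """Average negative log-probability per trigram (assumes every trigram was seen)."""
    trigrams = list(zip(tokens, tokens[1:], tokens[2:]))
    return -sum(math.log(probs[tri]) for tri in trigrams) / len(trigrams)

corpus = "a b c a b d a b c".split()  # hypothetical toy corpus
probs = trigram_mle(corpus)
entropy_nats = cross_entropy(probs, corpus)
entropy_bits = entropy_nats / math.log(2)
print(probs[("a", "b", "c")], entropy_nats, entropy_bits)
```

Here the context `("a", "b")` occurs three times and is followed by `"c"` twice, so the estimate for `p(c | a b)` is 2/3.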



#### Necessary imports

In [2]:
import tiktoken

import numpy as np
import heapq 
import math
import jdc

#### build a model object for the trigram language model
Since the paper used trigrams, I also used trigrams, which keeps the token context small and the memory usage low.
##### variables:
- `limit` is the maximum number of suggestions that the model will provide.
- `model_type` denotes what the model will be trained on (natural language or programming language).
- `distribution` stores, for each two-token combination, the frequency counts of the third token that follows it.
- `train_set` and `experiment_set` separate the training set from the experiment set used for validation.

In [3]:
class trigram_model:
    def __init__(self, limit=5, model_type="code"):
        self.limit = limit
        self.type = model_type
        self.distribution = {}
        self.data = []  
        self.train_set = []
        self.experiment_set = []

    def get_type(self):
        """
        return the type of data the model is trained on
        :return: type of data
        """
        return self.type
    def get_limit(self):
        """
        return the maximum number of suggestions the model can provide
        :return: suggestion limit 
        """
        return self.limit

#### train() for training the model against a specific data set.
For each two-token combination, record the third token and track its frequency.

In [4]:
%%add_to trigram_model
def train(self, data):
    """
    Trains the model using the provided list of tokens.
    :param data: List of tokens to train the model.
    """
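    self.data = data
    self.distribution = {}  # start from empty counts so retrain() does not add to counts from an earlier split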
    unique_token_size = len(set(data))
    total_size = len(data)
    print(f"the {self.get_type()} model is trained based on {total_size} token(s)")
    print(f"the {self.get_type()} model is trained based on {unique_token_size} unique token(s)")
    experiment_size = int(total_size * 0.1)  # Size of the experimental set (10% of the total)

    # To ensure that the experiment set is a continuous chunk, we select a random starting index
    start_index = np.random.randint(0, total_size - experiment_size)

    # Define the end index for the experiment set
    end_index = start_index + experiment_size

    # Split the original data into training and experiment sets while maintaining continuity
    self.experiment_set = data[start_index:end_index]
    self.train_set = data[:start_index] + data[end_index:]

    # Now, you can proceed with the training on the self.train_set
    for i in range(len(self.train_set) - 2):
        # Create the trigram parts
        token1, token2, token3 = self.train_set[i], self.train_set[i + 1], self.train_set[i + 2]
        key = (token1, token2)

        # If the key already exists in the distribution, update the frequency of the third token
        if key in self.distribution:
            if token3 in self.distribution[key]:
                self.distribution[key][token3] += 1  # Increment the count for the existing third token
            else:
                self.distribution[key][token3] = 1  # Initialize the count for the new third token
        else:
            # If the key doesn't exist, create a new entry in the distribution
            self.distribution[key] = {token3: 1}
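
The nested `if`/`else` counting above can also be written more compactly with the standard library. This is a sketch of an equivalent counting step, not a replacement for the method: the function name `count_trigrams` and the toy token list are assumptions for illustration.

```python
from collections import Counter, defaultdict

def count_trigrams(tokens):
    """Map each (token1, token2) context to a Counter of the tokens that follow it."""
    distribution = defaultdict(Counter)
    for t1, t2, t3 in zip(tokens, tokens[1:], tokens[2:]):
        distribution[(t1, t2)][t3] += 1
    return distribution

toy_tokens = [1, 2, 3, 1, 2, 4, 1, 2, 3]  # hypothetical token ids
dist = count_trigrams(toy_tokens)
print(dict(dist[(1, 2)]))
```

One design caveat: indexing a `defaultdict` with a missing key creates an empty entry, so lookups at prediction time should use `in` or `.get()` as the methods in this notebook already do.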

##### deterministic_train() does the same thing, except that the training set and experiment set are fixed for each training iteration. Use this method to test the model's reliability stably as the dataset changes.

In [5]:
%%add_to trigram_model
def deterministic_train(self, data):
    """
    Trains the model using the provided list of tokens.
    :param data: List of tokens to train the model.
    """
    self.data = data
    self.distribution = {}  # start from empty counts so repeated calls do not accumulate
    unique_token_size = len(set(data))
    total_size = len(data)
    print(f"the {self.get_type()} model is trained based on {total_size} token(s)")
    print(f"the {self.get_type()} model is trained based on {unique_token_size} unique token(s)")
    experiment_size = int(total_size * 0.1)  # Size of the experimental set (10% of the total)
    # Split the original data into training and experiment sets while maintaining continuity
    self.experiment_set = data[:experiment_size]
    self.train_set = data[experiment_size:]

    # Now, you can proceed with the training on the self.train_set
    for i in range(len(self.train_set) - 2):
        # Create the trigram parts
        token1, token2, token3 = self.train_set[i], self.train_set[i + 1], self.train_set[i + 2]
        key = (token1, token2)

        # If the key already exists in the distribution, update the frequency of the third token
        if key in self.distribution:
            if token3 in self.distribution[key]:
                self.distribution[key][token3] += 1  # Increment the count for the existing third token
            else:
                self.distribution[key][token3] = 1  # Initialize the count for the new third token
        else:
            # If the key doesn't exist, create a new entry in the distribution
            self.distribution[key] = {token3: 1}
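
`train()` draws a new random split on every call, while `deterministic_train()` always holds out the first 10%. A possible middle ground, sketched here under assumptions, is a random contiguous split that is reproducible through a seed. The names `contiguous_split`, `held_out_fraction`, and `seed` are hypothetical, and the standard-library `random` module is swapped in for NumPy only to keep the sketch self-contained.

```python
import random

def contiguous_split(tokens, held_out_fraction=0.1, seed=None):
    """Split tokens into (train, held_out), where held_out is one contiguous slice."""
    held_out_size = int(len(tokens) * held_out_fraction)
    rng = random.Random(seed)  # the same seed always yields the same split
    start = rng.randint(0, len(tokens) - held_out_size)  # inclusive on both ends
    end = start + held_out_size
    return tokens[:start] + tokens[end:], tokens[start:end]

tokens = list(range(100))  # hypothetical token ids
train_part, held_out_part = contiguous_split(tokens, seed=0)
print(len(train_part), len(held_out_part))
```

As in `train()`, joining the two training pieces creates one artificial boundary where tokens that were not adjacent in the original data become neighbours.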

##### re-train the model on the stored dataset

In [6]:
%%add_to trigram_model
def retrain(self):
    """
    Retrains the model with the original training data.
    the training and experiment set will be randomized
    """
    if self.data:
        self.train(self.data)

#### Make a prediction for the given tokens, returning at most `limit` suggestions

In [7]:
%%add_to trigram_model
def predict(self, tokens):
        """
        Predicts the most likely next tokens for a two-token context.
        :param tokens: Tuple of exactly two tokens to base the prediction on.
        :return: A list of at most `limit` predicted tokens.
        """

        if not isinstance(tokens, tuple) or len(tokens) != 2:
            raise ValueError("Input must be a tuple of exactly two tokens.")

        # Check if the token pair is in the distribution.
        if tuple(tokens) not in self.distribution:
            # print("Token sequence not found in the training data.")
            return []

        # Get all possible continuations and their frequencies.
        possible_tokens = self.distribution[tuple(tokens)]

        # If there are fewer continuations than the limit, return all of them.
        if len(possible_tokens) <= self.limit:
            return list(possible_tokens.keys())

        # Otherwise, we need to extract the 'self.limit' most frequent continuations.
        # 'heapq.nlargest' helps efficiently find the largest elements in a collection.
        # We use a lambda function to specify that we're comparing the values (frequencies) in the dictionary.
        most_frequent_tokens = heapq.nlargest(self.limit, possible_tokens, key=lambda x: possible_tokens[x])
        return most_frequent_tokens
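
The top-`limit` selection can be checked in isolation. This is a small sketch with a hypothetical helper `top_k` and made-up counts. Unlike the early-return branch above, which returns the keys in insertion order when there are at most `limit` continuations, calling `heapq.nlargest` unconditionally always returns them ordered from most to least frequent.

```python
import heapq
from collections import Counter

def top_k(followers, k):
    """Return up to k follower tokens, most frequent first."""
    return heapq.nlargest(k, followers, key=followers.get)

followers = {"(": 7, ".": 4, "=": 2, ")": 1}  # hypothetical follower counts
best_two = top_k(followers, 2)
# Counter.most_common gives the same ranking for distinct counts
same_ranking = [token for token, _ in Counter(followers).most_common(2)]
print(best_two, same_ranking)
```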

#### Calculate self-entropy and cross-entropy. Self-entropy is calculated from the experiment set; cross-entropy is calculated from the provided data.

In [8]:
%%add_to trigram_model
def calculate_self_entropy(self):
        """
        Calculates the cross-entropy for the experiment set based on the trigram model.
        :return: The cross-entropy value.
        """
        n = len(self.experiment_set) - 2  # since we're working with trigrams
        if n <= 0:
            return 0  # Avoid division by zero or taking log of zero. Handle the edge case.

        total_log_probability = 0.0

        # Slide over the experiment set one token at a time so that every overlapping trigram is scored
        for i in range(n):
            # Extract trigram
            a1, a2, a3 = self.experiment_set[i], self.experiment_set[i+1], self.experiment_set[i+2]
            
            # Fetch the conditional probability of the trigram from the distribution
            bigram_prob = self.distribution.get((a1, a2), {})
            trigram_prob = bigram_prob.get(a3, 0)
            
            # To get conditional probability, we need to normalize by the sum of all possibilities for the bigram
            conditional_prob = trigram_prob / (sum(bigram_prob.values()) or 1)
            
            # math.log is the natural log. log(0) is undefined, so unseen trigrams are skipped:
            # they add nothing to the sum but still count toward n.
            if conditional_prob > 0:
                total_log_probability += math.log(conditional_prob)

        # Compute the cross-entropy
        entropy = - total_log_probability / n
        return entropy

def calculate_cross_entropy(self,data):
    """
    Calculates the cross-entropy of the model on a random contiguous 10% slice of the provided data.
    :param data: List of tokens to evaluate the model on.
    :return: The cross-entropy value.
    """
    # Determine the length for 10% of the data
    ten_percent_length = len(data) // 10
    if ten_percent_length <= 2:
        return 0  # Avoid taking log of zero and ensure we have at least one trigram. Handle the edge case.

    # Randomly select a starting point for the 10% segment
    start_idx = np.random.randint(0, len(data) - ten_percent_length + 1)
    subset = data[start_idx:start_idx + ten_percent_length]

    n = len(subset) - 2  # since we're working with trigrams
    total_log_probability = 0.0

    # Iterate through the subset with a step of 1 since it's a trigram model
    for i in range(n):
        # Extract trigram
        a1, a2, a3 = subset[i], subset[i + 1], subset[i + 2]

        # Fetch the conditional probability of the trigram from the distribution
        bigram_prob = self.distribution.get((a1, a2), {})
        trigram_prob = bigram_prob.get(a3, 0)

        # To get conditional probability, we need to normalize by the sum of all possibilities for the bigram
        conditional_prob = trigram_prob / (sum(bigram_prob.values()) or 1)

        # math.log is the natural log. log(0) is undefined, so unseen trigrams are skipped:
        # they add nothing to the sum but still count toward n.
        if conditional_prob > 0:
            total_log_probability += math.log(conditional_prob)

    # Calculate the cross-entropy according to the formula
    cross_entropy = -total_log_probability / n
    return cross_entropy
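
Because the two methods above add nothing for an unseen trigram yet still divide by `n`, each unseen trigram is effectively scored as if its probability were 1, which pulls the reported value toward zero. One way to make this visible, sketched here with a hypothetical helper `entropy_with_coverage` and a made-up distribution, is to average only over the trigrams that were seen and to report the fraction seen alongside it.

```python
import math

def entropy_with_coverage(distribution, tokens):
    """Return (entropy in nats over seen trigrams, fraction of trigrams that were seen)."""
    trigrams = list(zip(tokens, tokens[1:], tokens[2:]))
    total_log_prob = 0.0
    seen = 0
    for a1, a2, a3 in trigrams:
        followers = distribution.get((a1, a2), {})
        count = followers.get(a3, 0)
        if count > 0:
            total_log_prob += math.log(count / sum(followers.values()))
            seen += 1
    if seen == 0:
        return float("nan"), 0.0  # nothing could be scored
    return -total_log_prob / seen, seen / len(trigrams)

toy_distribution = {("a", "b"): {"c": 1, "d": 1}}  # hypothetical counts
toy_tokens = ["a", "b", "c", "x", "y", "z"]
entropy, coverage = entropy_with_coverage(toy_distribution, toy_tokens)
print(entropy, coverage)
```

In this toy case only one of four trigrams is seen. The entropy over seen trigrams is `log(2)`, whereas dividing by all four trigrams would report a quarter of that.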

#### Calculate the average accuracy of the model. For each two-token combination in the experiment set, the model makes a prediction; the prediction counts as successful if the suggestions include the actual next token, and as a failure otherwise.

In [9]:
%%add_to trigram_model
def calculate_avg_accuracy(self):
    """
    Calculate the average prediction accuracy based on the experiment_set.
    :return: The average accuracy as a float.
    """
    
    correct_predictions = 0
    total_predictions = 0
    
    # Iterate over the experiment_set with an index
    for i in range(len(self.experiment_set) - 2):
        # Get the actual tokens
        actual_tokens = self.experiment_set[i:i+3]
        
        # Get the predicted tokens using the first two tokens
        predicted_tokens = self.predict(tuple(actual_tokens[:2]))
        
        # Check if the third actual token is in the predicted tokens
        if actual_tokens[2] in predicted_tokens:
            correct_predictions += 1
        
        total_predictions += 1
    
    # Return the average accuracy
    return correct_predictions / total_predictions if total_predictions > 0 else 0

#### Use OpenAI's tokenizer to convert raw text into tokens

In [10]:
def tokenize(file_path):
    encoding = tiktoken.get_encoding("cl100k_base")
    data = []
    with open(file_path, 'r', encoding='utf-8') as file:
        # Encode each line and append its token ids to the running list
        for line in file:
            data.extend(encoding.encode(line))
    return data
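
For Python source specifically, the standard-library `tokenize` module offers lexer-level tokens as an alternative to BPE token ids. This is only a sketch of that option, not what the experiment used: the helper name `lex_python_source` is an assumption, and dropping layout tokens (newlines, indentation, comments) is one possible choice that discards block structure. The module is imported under an alias so that it does not clash with the `tokenize()` function defined above.

```python
import io
import tokenize as py_tokenize  # aliased to avoid shadowing the notebook's tokenize()

def lex_python_source(source):
    """Return the lexical token strings of Python source, skipping layout-only tokens."""
    skip = {
        py_tokenize.ENCODING, py_tokenize.NL, py_tokenize.NEWLINE,
        py_tokenize.INDENT, py_tokenize.DEDENT, py_tokenize.COMMENT,
        py_tokenize.ENDMARKER,
    }
    readline = io.StringIO(source).readline
    return [tok.string for tok in py_tokenize.generate_tokens(readline)
            if tok.type not in skip]

lexed = lex_python_source("x = foo(1, 'a')\n")
print(lexed)
```

The resulting strings can be fed to a trigram model in the same way as token ids, since the model only needs hashable tokens.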

#### Tokenize the datasets and feed them into the models

In [11]:
english_file_path = "data/english.txt"
python_file_path = "data/Python_code_data.txt"

model_eng = trigram_model(limit = 3, model_type = "natural language")
model_py = trigram_model(limit = 3, model_type = "programming language")
data1 = tokenize(english_file_path)
data2 = tokenize(python_file_path)
model_eng.deterministic_train(data1)
model_py.deterministic_train(data2)

the natural language model is trained based on 2599953 token(s)
the natural language model is trained based on 31955 unique token(s)
the programming language model is trained based on 237536 token(s)
the programming language model is trained based on 7499 unique token(s)


#### Run the model and gather insights!

In [12]:
self_entropy1 = model_eng.calculate_self_entropy()
avg_accuracy1 = model_eng.calculate_avg_accuracy()
cross_entropy1 = model_eng.calculate_cross_entropy(data2)

self_entropy2 = model_py.calculate_self_entropy()
avg_accuracy2 = model_py.calculate_avg_accuracy()
cross_entropy2 = model_py.calculate_cross_entropy(data1)

print("self_entropy of model training on english: ", self_entropy1)
print("self_entropy of model training on python: ", self_entropy2)
print("cross_entropy of model training on english: ", cross_entropy1)
print("cross_entropy of model training on python: ", cross_entropy2)
print("avg accuracy of model training on english: ", avg_accuracy1)
print("avg accuracy of model training on python: ", avg_accuracy2)

self_entropy of model training on english:  1.2718418231084239
self_entropy of model training on python:  1.0818544238227277
cross_entropy of model training on english:  0.1272424575200946
cross_entropy of model training on python:  0.0023872427918150514
avg accuracy of model training on english:  0.25172985426530714
avg accuracy of model training on python:  0.60923750578923


### Challenges along the journey
1. Dealing with unknown tokens
- The problem can be resolved with Kneser-Ney smoothing or Laplace smoothing.
- For simplicity, I ignore unseen tokens.
2. Data collection
- I searched GitHub and online libraries for data.
- I programmatically parsed the necessary data from raw data in different sources.
- The available datasets are too big to train on.
3. Tokenizer
- I wanted to write my own tokenizer, but parsing code into tokens is much more complicated than I thought.
- I then used OpenAI's tokenizer for parsing code.
4. Shortage of data
- Due to the shortage of data, it's hard to compute cross-entropy.
5. The data I collected is too far from the paper's experiment.
- Although the accuracy of my predictions is even better than what the paper reported, the entropy I calculated does not align with theirs.
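
The first challenge above mentions Laplace smoothing as a way to handle unseen tokens. A minimal sketch of add-one smoothing applied to the cross-entropy calculation follows. It is an illustration under assumptions, not the method used for the results in this notebook: the function name `laplace_cross_entropy`, the `vocab_size` parameter, and the toy counts are all hypothetical. Every trigram, seen or unseen, receives the probability `(count + 1) / (context_total + vocab_size)`, so no term has to be skipped.

```python
import math

def laplace_cross_entropy(distribution, tokens, vocab_size):
    """Cross-entropy in nats per trigram, using add-one (Laplace) smoothing."""
    trigrams = list(zip(tokens, tokens[1:], tokens[2:]))
    if not trigrams:
        return 0.0
    total_log_prob = 0.0
    for a1, a2, a3 in trigrams:
        followers = distribution.get((a1, a2), {})
        # Unseen contexts fall back to the uniform probability 1 / vocab_size
        prob = (followers.get(a3, 0) + 1) / (sum(followers.values()) + vocab_size)
        total_log_prob += math.log(prob)
    return -total_log_prob / len(trigrams)

toy_distribution = {("a", "b"): {"c": 3}}  # hypothetical counts
smoothed = laplace_cross_entropy(toy_distribution, ["a", "b", "c", "d"], vocab_size=4)
print(smoothed)
```

With a model trained as above, `vocab_size` could be taken as the number of unique tokens in the training data, for example `len(set(model.train_set))`.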