Import necessary **modules**:

In [4]:
%matplotlib inline
import csv
import os
import subprocess
from collections import Counter
from datetime import datetime, timedelta
from glob import glob
from operator import itemgetter
from typing import List, Tuple

import artm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tree_sitter
from tqdm import tqdm
from tree_sitter import Language, Parser

Write a function to get the dates.

In [5]:
def get_dates(number: int, delta: int) -> List:
    """
    Build a chronologically ordered list of datetime objects separated by a fixed step.
    :param number: how many dates to generate.
    :param delta: the step between neighbouring dates, in days.
    :return: a list of datetime objects in ascending order, generated by stepping
             back from the current time.
    """
    now = datetime.now()
    step = timedelta(days=delta)
    return sorted(now - step * offset for offset in range(number))

In [6]:
# Build five dates spaced 31 days apart, stepping back from the current time.
dates = get_dates(5, 31)
print(dates)

[datetime.datetime(2019, 10, 26, 0, 9, 43, 913184), datetime.datetime(2019, 11, 26, 0, 9, 43, 913184), datetime.datetime(2019, 12, 27, 0, 9, 43, 913184), datetime.datetime(2020, 1, 27, 0, 9, 43, 913184), datetime.datetime(2020, 2, 27, 0, 9, 43, 913184)]


Write a function to checkout a repository by date.

In [7]:
def checkout_by_date(repository: str, directory: str, date: datetime) -> None:
    """
    Copy a repository into a directory and check out the last commit made before a given date.

    The commands are run as argument lists without a shell, so paths containing spaces
    or shell metacharacters are passed through verbatim instead of being re-parsed.
    :param repository: address of processed project.
    :param directory: address of target directory for a checkout.
    :param date: date of the last commit for the checkout; only the calendar day is used.
    :return: None.
    """
    subprocess.run(['cp', '-r', repository, directory])
    if not os.path.isdir(directory):
        # The copy failed; never fall through and run git in the current working directory.
        return
    revision = subprocess.run(
        ['git', 'rev-list', '-n', '1', '--before=' + date.strftime('%Y-%m-%d'), 'master'],
        cwd=directory, stdout=subprocess.PIPE, universal_newlines=True).stdout.strip()
    if revision:  # empty when no commit on master precedes the date
        subprocess.run(['git', 'checkout', revision], cwd=directory)
    # TODO: consider non-master branches

In [8]:
# Copy the repository and check out its state as of the third of the five dates.
checkout_by_date('/home/diannao/java-design-patterns/', '/home/diannao/java-design-patterns_new', dates[2])

Write a function to get the file extensions for various languages.

In [9]:
def get_extensions(lang: str) -> str:
    """
    Look up the source-file extension used for a language. TODO: more than one extension.
    :param lang: language name ('cpp', 'java' or 'python').
    :return: the extension, without a leading dot.
    :raises KeyError: if the language is not in the table.
    """
    known_extensions = dict(cpp='cpp', java='java', python='py')
    return known_extensions[lang]

Write a function to get a list of files with a given extension from a directory.

In [10]:
def get_a_list_of_files(directory: str, extension: str) -> List[str]:
    """
    Collect every file with a given extension found anywhere below a directory.
    :param directory: the root directory that is walked recursively.
    :param extension: extension of the listed files, without the leading dot.
    :return: list of file paths, in directory-walk order.
    """
    pattern = '*.' + extension
    matches = []
    for current_directory, _subdirectories, _filenames in os.walk(directory):
        matches.extend(glob(os.path.join(current_directory, pattern)))
    return matches

In [11]:
# Count the Java files found below the test-files directory.
files = get_a_list_of_files('/home/diannao/PycharmProjects/topic-dynamics/topic_dynamics/tests/test_files/', 'java')
len(files)

1

Write a function to read the contents of the file.

In [12]:
def read_file(file: str) -> bytes:
    """
    Read the contents of the file as UTF-8 encoded bytes.

    The file is decoded explicitly as UTF-8 (rather than with the locale's default
    encoding) so the result does not depend on the machine the notebook runs on.
    :param file: address of the file.
    :return: bytes with the contents of the file.
    :raises UnicodeDecodeError: if the file is not valid UTF-8 text.
    """
    with open(file, 'r', encoding='utf-8') as fin:
        return fin.read().encode('utf-8')

Write a function to get the positional bytes of the node.

In [13]:
def get_positional_bytes(node: tree_sitter.Node) -> Tuple[int, int]:
    """
    Report the byte span that a node covers.
    :param node: node on the AST.
    :return: (start byte, end byte), as stored on the node.
    """
    return node.start_byte, node.end_byte

Write the utility functions for parsing.

In [14]:
# Cache of initialized parsers keyed by language name; filled on demand by get_parser().
PARSERS = {}


def get_tree_sitter_dir() -> str:
    """
    Location of the root directory used for tree-sitter.
    :return: absolute path (currently hard-coded).
    """
    parsers_root = '/home/diannao/PycharmProjects/topic-dynamics/topic_dynamics/parsers/'
    return parsers_root


def get_tree_sitter_so() -> str:
    """
    Location of the built tree-sitter `.so` library inside the tree-sitter directory.
    :return: absolute path.
    """
    return os.path.join(get_tree_sitter_dir(), "build/langs.so")


def main_parse() -> None:
    """
    Build the shared tree-sitter library from the vendored grammars.
    :return: None
    """
    root = get_tree_sitter_dir()
    # One grammar checkout per supported language, relative to the tree-sitter root.
    grammar_locations = [
        os.path.join(root, relative_location)
        for relative_location in (
            "vendor/tree-sitter-c",
            "vendor/tree-sitter-c-sharp",
            "vendor/tree-sitter-cpp",
            "vendor/tree-sitter-java",
            "vendor/tree-sitter-python",
        )
    ]
    # The first argument is where the library is stored, the second the grammars to include.
    Language.build_library(get_tree_sitter_so(), grammar_locations)


def get_parser(lang: str) -> Parser:
    """
    Return the parser for a language, creating and caching it on first use.
    :param lang: language to use.
    :return: parser.
    """
    if lang in PARSERS:
        return PARSERS[lang]
    fresh_parser = Parser()
    fresh_parser.set_language(Language(get_tree_sitter_so(), lang))
    PARSERS[lang] = fresh_parser
    return fresh_parser


Initialize the parsing.

In [15]:
# Build the shared grammar library that get_parser() loads.
main_parse()

Write a function to get the identifiers of a file.

In [16]:
def get_identifiers(file: str, lang: str) -> List[Tuple[str, int]]:
    """
    Gather the identifiers of a file together with their counts, most frequent first.

    The AST is walked with an explicit stack instead of recursion, so deeply nested
    code cannot exhaust the interpreter's recursion limit.
    :param file: address of the file.
    :param lang: the language of file.
    :return: a list of tuples, identifier (lower-cased) and count, sorted by descending count.
    """
    code = read_file(file)
    root = get_parser(lang).parse(code).root_node
    node_types = {'c': ['identifier', 'type_identifier'],
                  'c-sharp': ['identifier', 'type_identifier'],
                  'cpp': ['identifier', 'type_identifier'],
                  'java': ['identifier', 'type_identifier'],
                  'python': ['identifier', 'type_identifier']}
    identifiers = []
    # Children are pushed in reverse so that they are popped in source order; this
    # reproduces a pre-order walk of everything below the root, which fixes the order
    # of identifiers with equal counts in the (stable) sort below.
    pending = list(reversed(root.children))
    while pending:
        node = pending.pop()
        if node.type in node_types[lang]:
            start, end = get_positional_bytes(node)
            identifier = code[start:end].decode('utf-8').lower()
            if '\n' not in identifier:  # Will break output files. Can add other bad characters later
                identifiers.append(identifier)
        pending.extend(reversed(node.children))

    return sorted(Counter(identifiers).items(), key=itemgetter(1), reverse=True)

In [17]:
# Extract the identifier counts of the sample Java test file.
identifiers = get_identifiers('../../tests/test_files/test.java', 'java')
print(identifiers)

[('i', 9), ('anarray', 6), ('length', 2), ('system', 2), ('out', 2), ('arraydemo', 1), ('main', 1), ('string', 1), ('args', 1), ('print', 1), ('println', 1)]


Write a function to transform the identifiers into a writeable format.

In [18]:
def transform_identifiers(identifiers: List) -> List[str]:
    """
    Render (identifier, count) pairs as "identifier:count" strings ready for writing.
    :param identifiers: list of tuples, identifier and count.
    :return: a list of strings in the writable form, "identifier:count"; entries whose
             identifier is empty after stripping trailing whitespace are dropped.
    """
    writable = []
    for entry in identifiers:
        token = entry[0].rstrip()
        if not token:  # Empty tokens do occur and must not reach the output.
            continue
        writable.append(token + ':' + str(entry[1]).rstrip())
    return writable

In [19]:
# Convert the counted identifiers into "identifier:count" strings.
formatted = transform_identifiers(identifiers)
print (formatted)

['i:9', 'anarray:6', 'length:2', 'system:2', 'out:2', 'arraydemo:1', 'main:1', 'string:1', 'args:1', 'print:1', 'println:1']


Write a function that transforms the tokens into the UCI bag-of-words format:

In [20]:
def uci_format(directory: str, name: str) -> None:
    """
    Transform the temporary file with tokens into the UCI bag-of-words format.

    Reads `<name>_tokens.txt`, whose lines look like `index;path;token:count,token:count,...`,
    and writes two files into the same directory:
    `vocab.<name>.txt` with the sorted vocabulary, one token per line, and
    `docword.<name>.txt` with three header lines (number of documents, vocabulary size,
    number of entries) followed by `document token_id count` lines; token ids start at 1.

    The token list is taken from the last `;`-separated field, so a `;` inside the file
    path does not shift it, and a document with an empty token list contributes no entries.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: None.
    """
    tokens_path = os.path.abspath(os.path.join(directory, name + '_tokens.txt'))
    vocab_path = os.path.abspath(os.path.join(directory, 'vocab.' + name + '.txt'))
    docword_path = os.path.abspath(os.path.join(directory, 'docword.' + name + '.txt'))

    def parse_line(line):
        """Split one line of the tokens file into its document index and [token, count] pairs."""
        fields = line.rstrip().split(';')
        # fields[2:][-1] is the last field, yet still raises IndexError for a line
        # that has fewer than three fields.
        token_field = fields[2:][-1]
        return fields[0], [entry.split(':') for entry in token_field.split(',') if entry]

    # First pass: collect the vocabulary and the header statistics.
    number_of_documents = 0
    number_of_nnz = 0
    set_of_tokens = set()
    with open(tokens_path, 'r') as fin:
        for line in fin:
            _, entries = parse_line(line)
            number_of_documents += 1
            number_of_nnz += len(entries)
            set_of_tokens.update(entry[0] for entry in entries)

    sorted_list_of_tokens = sorted(set_of_tokens)
    token_ids = {token: index for index, token in enumerate(sorted_list_of_tokens, start=1)}
    with open(vocab_path, 'w+') as fout:
        for token in sorted_list_of_tokens:
            fout.write(token + '\n')

    # Second pass: write the header and the per-document entries ordered by token id.
    with open(tokens_path, 'r') as fin, open(docword_path, 'w+') as fout:
        fout.write(str(number_of_documents) + '\n' + str(len(sorted_list_of_tokens)) + '\n'
                   + str(number_of_nnz) + '\n')
        for line in fin:
            document_index, entries = parse_line(line)
            numbered = sorted(([token_ids[entry[0]], int(entry[1])] for entry in entries),
                              key=itemgetter(0))
            for token_id, token_count in numbered:
                fout.write(document_index + ' ' + str(token_id) + ' ' + str(token_count) + '\n')

Write the main function that slices and tokenizes the repository.

In [21]:
def tokenize_the_repository(repository: str, number: int, delta: int, lang: str, name: str) -> None:
    """
    Split the repository, parse the files, write the data into a file.

    Creates `<name>_processed` next to the repository, checks the repository out once per
    date into a subdirectory named after that date, and writes
    `<name>_tokens.txt` (one `index;path;token:count,...` line per parsed file) and
    `<name>_tokens_info.txt` (one `date;first_index,last_index` line per date) before
    converting the tokens with uci_format().
    Files that yield no writable identifiers are skipped, so no line with an empty
    token list is ever written.
    :param repository: path to the repository to process.
    :param number: the amount of dates.
    :param delta: the time step between dates
    :param lang: language of parsing.
    :param name: name of the dataset (directories with resulting files)
    :return: None.
    """
    # Resolve the extension first so an unsupported language fails before any disk work.
    extension = get_extensions(lang)
    directory = os.path.abspath(os.path.join(repository, os.pardir, name + '_processed'))
    os.mkdir(directory)
    dates = get_dates(number, delta)
    lists_of_files = {}
    for date in dates:
        label = date.strftime('%Y-%m-%d')
        subdirectory = os.path.abspath(os.path.join(directory, label))
        checkout_by_date(repository, subdirectory, date)
        lists_of_files[label] = get_a_list_of_files(subdirectory, extension)
    indexes_of_slices = {}
    count = 0
    with open(os.path.abspath(os.path.join(directory, name + '_tokens.txt')), 'w+') as fout:
        for date in dates:
            label = date.strftime('%Y-%m-%d')
            starting_index = count + 1
            for file in lists_of_files[label]:
                if not os.path.isfile(file):  # TODO: implement a better file-checking mechanism
                    continue
                try:
                    formatted_identifiers = transform_identifiers(get_identifiers(file, lang))
                except UnicodeDecodeError:
                    # Files that cannot be decoded as text are skipped on purpose.
                    continue
                if formatted_identifiers:
                    count += 1
                    fout.write(str(count) + ';' + file + ';' + ','.join(formatted_identifiers) + '\n')
            # A date without any parsed file gets an empty range (start = end + 1).
            indexes_of_slices[label] = (starting_index, count)
    with open(os.path.abspath(os.path.join(directory, name + '_tokens_info.txt')), 'w+') as fout:
        for label, (starting_index, ending_index) in indexes_of_slices.items():
            fout.write(label + ';' + str(starting_index) + ',' + str(ending_index) + '\n')
    uci_format(directory, name)

In [22]:
# Tokenize 36 snapshots of the repository taken 31 days apart, parsing the Java files.
tokenize_the_repository('/home/diannao/Documents/java-design-patterns/', 36, 31, 'java', 
     'java-design-patterns')

Create **batches** and the **dictionary**:

In [23]:
def create_batches(directory: str, name: str) -> Tuple[artm.BatchVectorizer, artm.Dictionary]:
    """
    Build the batch vectorizer for a dataset stored in UCI bag-of-words format.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: the BatchVectorizer and the Dictionary taken from it.
    """
    batches_folder = os.path.abspath(os.path.join(directory, name + '_batches'))
    vectorizer = artm.BatchVectorizer(
        data_path=directory,
        data_format='bow_uci',
        collection_name=name,
        target_folder=batches_folder,
    )
    return vectorizer, vectorizer.dictionary

In [24]:
# Build the batches and the dictionary for the processed java-design-patterns dataset.
batch, dictionary = create_batches('/home/diannao/Documents/java-design-patterns_processed/', 'java-design-patterns')

Define the model:

In [25]:
def define_model(number_of_topics: int, dictionary: artm.Dictionary, SparceTheta: float, SparsePhi: float, DecorrelatorPhi: float) -> artm.artm_model.ARTM:
    """
    Build an ARTM model with the scores and regularizers used in this notebook.
    :param number_of_topics: number of topics; they are named topic_1 ... topic_N.
    :param dictionary: Batch Vectorizer dictionary, passed to the perplexity score.
    :param SparceTheta: tau of the 'SparseTheta' regularizer.
    :param SparsePhi: tau of the 'SparsePhi' regularizer.
    :param DecorrelatorPhi: tau of the 'DecorrelatorPhi' regularizer.
    :return: ARTM model.
    """
    topic_names = ['topic_{}'.format(number) for number in range(1, number_of_topics + 1)]
    scores = [
        artm.PerplexityScore(name='PerplexityScore', dictionary=dictionary),
        artm.SparsityPhiScore(name='SparsityPhiScore'),
        artm.SparsityThetaScore(name='SparsityThetaScore'),
        artm.TopicKernelScore(name='TopicKernelScore', probability_mass_threshold=0.3),
        artm.TopTokensScore(name='TopTokensScore', num_tokens=15),
    ]
    regularizers = [
        artm.SmoothSparseThetaRegularizer(name='SparseTheta', tau=SparceTheta),
        artm.SmoothSparsePhiRegularizer(name='SparsePhi', tau=SparsePhi),
        artm.DecorrelatorPhiRegularizer(name='DecorrelatorPhi', tau=DecorrelatorPhi),
    ]
    return artm.ARTM(topic_names=topic_names, cache_theta=True,
                     scores=scores, regularizers=regularizers)

In [27]:
# 50 topics; the last three arguments are the tau values of the SparseTheta, SparsePhi
# and DecorrelatorPhi regularizers.
model_artm = define_model(50,dictionary,-0.15,-0.1,1.5e+5)

Train the model:

In [28]:
def train_model(model: artm.artm_model.ARTM, number_of_document_passes: int, number_of_collection_passes: int, 
                dictionary: artm.Dictionary, batch_vectorizer: artm.BatchVectorizer) -> None:
    """
    Train the ARTM model in place with the offline algorithm.
    :param model: the model to train; it is modified, nothing is returned.
    :param number_of_document_passes: number of document passes, stored on the model.
    :param number_of_collection_passes: number of collection passes given to the offline fit.
    :param dictionary: Batch Vectorizer dictionary the model is initialized from.
    :param batch_vectorizer: Batch Vectorizer providing the training batches.
    :return: None.
    """
    model.num_document_passes = number_of_document_passes
    # The model is (re)initialized from the dictionary before fitting.
    model.initialize(dictionary=dictionary)
    model.fit_offline(batch_vectorizer=batch_vectorizer, num_collection_passes=number_of_collection_passes)

In [29]:
# One document pass and 25 collection passes.
train_model(model_artm, 1, 25, dictionary, batch)

Save the parameters of the model:

In [30]:
def save_parameters(model: artm.artm_model.ARTM, directory: str, name: str) -> None:
    """
    Write the final score values of the model to a text file and plot three score histories.

    Everything is written into the `results` subdirectory of the dataset directory.
    :param model: the model.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: None.
    """
    def results_path(suffix):
        """Absolute path of a result file for this dataset."""
        return os.path.abspath(os.path.join(directory, 'results', name + suffix))

    # (line template, score name, attribute holding the last value), in output order.
    report = (
        ('Sparsity Phi: {0:.3f}', 'SparsityPhiScore', 'last_value'),
        ('Sparsity Theta: {0:.3f}', 'SparsityThetaScore', 'last_value'),
        ('Kernel contrast: {0:.3f}', 'TopicKernelScore', 'last_average_contrast'),
        ('Kernel purity: {0:.3f}', 'TopicKernelScore', 'last_average_purity'),
        ('Perplexity: {0:.3f}', 'PerplexityScore', 'last_value'),
    )
    with open(results_path('_parameters.txt'), 'w+') as fout:
        for template, score_name, attribute in report:
            last = getattr(model.score_tracker[score_name], attribute)
            fout.write(template.format(last) + '\n')

    # (score name, y-axis label, file suffix) for each saved figure.
    figures = (
        ('PerplexityScore', 'Perplexity', '_perplexity.png'),
        ('SparsityPhiScore', 'Phi Sparsity', '_phi_sparsity.png'),
        ('SparsityThetaScore', 'Theta Sparsity', '_theta_sparsity.png'),
    )
    for score_name, axis_label, suffix in figures:
        plt.plot(range(model.num_phi_updates),
                 model.score_tracker[score_name].value, 'r--', linewidth=2)
        plt.xlabel('Iterations count')
        plt.ylabel(axis_label)
        plt.grid(True)
        plt.savefig(results_path(suffix), dpi=300)
        plt.close()

Save the most popular tokens:

In [31]:
def save_most_popular_tokens(model: artm.artm_model.ARTM, directory: str, name: str) -> None:
    """
    Write the top tokens of every topic to a text file, one `topic : tokens` line per topic.
    :param model: the model.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: None.
    """
    target = os.path.abspath(os.path.join(directory, 'results', name + '_most_popular_tokens.txt'))
    with open(target, 'w+') as fout:
        for topic_name in model.topic_names:
            top_tokens = model.score_tracker['TopTokensScore'].last_tokens[topic_name]
            fout.write(topic_name + ' : ' + str(top_tokens) + '\n')

Save the matrices:

In [32]:
def save_matrices(model: artm.artm_model.ARTM, directory: str, name: str) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Export the Phi and Theta matrices of the model as CSV files and return them.
    :param model: the model.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: Two matrices as DataFrames: Phi with sorted rows, Theta with sorted columns.
    """
    results_directory = os.path.join(directory, 'results')

    phi_matrix = model.get_phi().sort_index(axis=0)
    phi_target = os.path.abspath(os.path.join(results_directory, name + '_phi.csv'))
    phi_matrix.to_csv(phi_target)

    theta_matrix = model.get_theta().sort_index(axis=1)
    theta_target = os.path.abspath(os.path.join(results_directory, name + '_theta.csv'))
    theta_matrix.to_csv(theta_target)

    return phi_matrix, theta_matrix

Save the most topical files:

In [33]:
def save_most_topical_files(number_of_topics: int, theta: pd.DataFrame, directory: str, name: str) -> None:
    """
    Save the most topical files of the model.

    For every topic the ten documents with the largest Theta weight are written as
    `document weight path` lines under a `Topic N` heading; topics are separated by a
    blank line in the file (nothing is printed to the notebook output).
    :param number_of_topics: the number of topics.
    :param theta: Theta matrix with one row per topic (`topic_1` ...) and one column per document.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: None.
    """
    file_address = {}
    with open(os.path.abspath(os.path.join(directory, name + '_tokens.txt')), 'r') as fin:
        for line in fin:
            # Lines are `index;path;tokens`; cut at the first and the last `;` so that a
            # `;` inside the path is kept.
            document_index, _, remainder = line.partition(';')
            file_address[int(document_index)] = remainder.rpartition(';')[0]
    with open(os.path.abspath(os.path.join(directory, 'results', name + '_most_topical_files.txt')), 'w+') as fout:
        for i in range(1, number_of_topics + 1):
            topic = 'topic_' + str(i)
            fout.write('Topic ' + str(i) + '\n\n')
            # Order the documents by their weight in this topic and keep the first ten by position.
            top_documents = theta.sort_values(by=topic, axis=1, ascending=False).loc[topic].iloc[:10]
            for document, weight in top_documents.to_dict().items():
                fout.write(str(document) + ' ' + str(weight) + ' ' + file_address[int(document)] + '\n')
            fout.write('\n')

Visualize the dynamics:

In [50]:
def save_dynamics(directory: str, name: str) -> None:
    """
    Save figures with the dynamics.

    Reads the per-date document ranges from `<name>_tokens_info.txt` and the saved Theta
    matrix from `results/<name>_theta.csv`, sums the topic weights of the documents of each
    date, and writes into `results`: the absolute and percentage weights as text files, a
    per-topic min / max / (max / min) table sorted by that ratio, and two stack plots.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :return: None.
    """
    def results_path(suffix):
        """Absolute path of a result file for this dataset."""
        return os.path.abspath(os.path.join(directory, 'results', name + suffix))

    indexes = {}
    with open(os.path.abspath(os.path.join(directory, name + '_tokens_info.txt')), 'r') as fin:
        for line in fin:
            fields = line.rstrip().split(';')
            bounds = fields[1].split(',')
            indexes[fields[0]] = (int(bounds[0]), int(bounds[1]))
    topics_weight = []
    with open(results_path('_theta.csv'), 'r') as fin:
        reader = csv.reader(fin)
        next(reader, None)  # skip the header row
        for row in reader:
            # Column 0 of a row is the topic name, so the 1-based document indexes can be
            # used directly as column positions.
            topics_weight.append([sum(float(value) for value in row[first:last + 1])
                                  for first, last in indexes.values()])
    topics_weight = np.asarray(topics_weight)
    # Share of every topic within each date: divide each column by its total in one
    # broadcast operation instead of re-summing the column for every cell.
    topics_weight_percent = topics_weight / np.sum(topics_weight, axis=0, keepdims=True) * 100
    np.savetxt(results_path('_dynamics.txt'), topics_weight, '%10.5f')
    np.savetxt(results_path('_dynamics_percent.txt'), topics_weight_percent, '%10.5f')
    dynamics = []
    for i in range(topics_weight_percent.shape[0]):
        lowest = min(topics_weight_percent[i])
        highest = max(topics_weight_percent[i])
        dynamics.append(['topic_{}'.format(i + 1), lowest, highest, highest / lowest])
    dynamics = sorted(dynamics, key=itemgetter(3), reverse=True)
    with open(results_path('_dynamics_percent_change.txt'), 'w+') as fout:
        for item in dynamics:
            fout.write(item[0] + '\t' + str(format(item[1], '.5f')) + '\t' + str(format(item[2], '.5f')) + '\t' + str(format(item[3], '.5f')) + '\n')

    # NOTE(review): the x axis is labelled 'Year' although the labels are the date keys
    # read from the info file - confirm which wording is wanted.
    labels = list(indexes)
    plt.stackplot(labels, topics_weight)
    plt.xlabel('Year')
    plt.ylabel('Proportion (a. u.)')
    plt.savefig(results_path('_dynamics.png'), dpi=300)
    plt.close()

    plt.stackplot(labels, topics_weight_percent)
    plt.xlabel('Year')
    plt.ylabel('Proportion (%)')
    plt.savefig(results_path('_dynamics_percent.png'), dpi=300)
    plt.close()

Run all of the saving steps:

In [51]:
def save_all_data(model: artm.artm_model.ARTM, directory: str, name: str, number_of_topics: int) -> None:
    """
    Run every saving step for a trained model, creating the `results` directory if needed.
    :param model: the model.
    :param directory: the directory with the dataset.
    :param name: name of the processed dataset.
    :param number_of_topics: the number of topics.
    :return: None.
    """
    results_directory = os.path.abspath(os.path.join(directory, 'results'))
    if not os.path.exists(results_directory):
        os.makedirs(results_directory)
    save_parameters(model, directory, name)
    save_most_popular_tokens(model, directory, name)
    # Only the Theta matrix is needed by the remaining steps.
    _, theta_matrix = save_matrices(model, directory, name)
    save_most_topical_files(number_of_topics, theta_matrix, directory, name)
    save_dynamics(directory, name)

In [52]:
# Write every result file for the trained 50-topic model into the dataset's results directory.
save_all_data(model_artm, '/home/diannao/Documents/java-design-patterns_processed/', 'java-design-patterns', 50)





































































































