In [13]:
%run parsing.ipynb
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
from typing import List
from stop_words import get_stop_words
from nltk.corpus import stopwords
import re
from nltk import WordNetLemmatizer, FreqDist
import string
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation

In [11]:
# https://networkx.github.io/documentation/latest/tutorial.html#edges
class WeightGraph:
    """Undirected graph whose edges carry an accumulating numeric ``weight``.

    Thin wrapper around a ``networkx.Graph`` (available as ``self.g``) that
    offers additive edge updates plus a few reporting / plotting helpers.
    """

    def __init__(self):
        self.g = nx.Graph()

    def add(self, a, b, delta):
        """Increase the weight of the edge between ``a`` and ``b`` by ``delta``.

        Both nodes and the edge are created on first use; a new edge starts
        from weight 0, so its weight becomes ``delta``.
        """
        self.g.add_node(a)
        self.g.add_node(b)
        self.g.add_edge(a, b, weight=self.get(a, b) + delta)

    def get(self, a, b):
        """Return the weight of the edge between ``a`` and ``b``.

        Returns 0 when there is no such edge, including the case where one
        or both nodes are not part of the graph at all.
        """
        # get_edge_data yields None both for a missing edge and for unknown
        # nodes, so no KeyError can escape for nodes that were never added.
        edge_data = self.g.get_edge_data(a, b)
        if edge_data is None:
            return 0
        return edge_data["weight"]

    def print_statistics(self):
        """Print node count, edge count and the sizes of all connected components."""
        # https://networkx.github.io/documentation/latest/tutorial.html#analyzing-graphs
        components = list(nx.connected_components(self.g))
        component_sizes = ", ".join(str(len(component)) for component in components)
        print(f"WeightGraph statistics: {len(self.g.nodes)} nodes, "
              f"{len(self.g.edges)} edges, "
              f"{len(components)} connected component(s), with sizes: [{component_sizes}]")

    @staticmethod
    def _show_histogram(values, color, title):
        """Plot one histogram of ``values`` with a dashed vertical line at the mean."""
        # https://matplotlib.org/3.3.1/api/_as_gen/matplotlib.pyplot.hist.html
        plt.hist(values, bins="auto", facecolor=color, alpha=0.75)
        if len(values) > 0:
            # The mean of an empty list is NaN (plus a RuntimeWarning), so the
            # marker line is only drawn when there is data.
            plt.axvline(np.mean(values), color='k', linestyle='dashed', linewidth=1)
        plt.xlabel('Coupling Strength')
        plt.ylabel('Amount')
        plt.title(title)
        plt.grid(True)
        plt.show()

    def show_weight_histogram(self):
        """Show two histograms: edge weights, and per-node sums of incident edge weights."""
        edge_weights = [data["weight"] for _, _, data in self.g.edges(data=True)]
        self._show_histogram(edge_weights, 'b', 'Histogram of edge weights in coupling graph')

        # Weight of a node = sum of the weights of all edges adjacent to it.
        node_weights = [
            sum(data["weight"] for data in self.g.adj[node].values())
            for node in self.g.nodes
        ]
        self._show_histogram(node_weights, 'g', 'Histogram of node weights')

    def visualize(self):
        """Draw the graph with the Kamada-Kawai layout in an 8x8 inch figure."""
        # https://networkx.github.io/documentation/latest/reference/generated/networkx.drawing.nx_pylab.draw_networkx.html
        plt.figure(figsize=(8, 8))
        nx.draw_kamada_kawai(self.g, alpha=0.2, node_size=100)
        plt.show()

In [12]:
class RepoFile:
    """One file of a repository with lazily loaded content and syntax tree.

    ``repo`` must provide ``get_file_object_content(file_obj)`` returning the
    raw bytes of the file; ``file_obj`` must expose a ``path`` attribute.
    The content and the parsed tree are fetched on first access and cached.
    """

    # Node types accepted as a leading header comment. The original code only
    # checked "comment".
    # NOTE(review): newer tree-sitter-java grammars appear to emit
    # "line_comment" / "block_comment" instead - confirm against the grammar
    # version in use.
    _COMMENT_NODE_TYPES = ("comment", "line_comment", "block_comment")

    def __init__(self, repo, file_obj):
        self.repo = repo
        self.file_obj = file_obj
        self.content = None  # raw bytes, filled by get_content()
        self.tree = None  # parsed syntax tree, filled by get_tree()

    def get_path(self):
        """Return the path of the underlying file object."""
        return self.file_obj.path

    def get_content(self):
        """Return the raw file content (bytes), loading it from the repo once."""
        if self.content is None:
            self.content = self.repo.get_file_object_content(self.file_obj)
        return self.content

    def get_content_without_copyright(self):
        """Return the file content as text, minus a leading comment if present.

        If the first child of the syntax tree's root is a comment, everything
        up to the end of that comment is dropped. Files without any top-level
        node (e.g. empty files) are returned unchanged instead of raising
        IndexError.
        """
        top_level_nodes = self.get_tree().root_node.children
        content = self.get_content()
        if top_level_nodes and top_level_nodes[0].type in self._COMMENT_NODE_TYPES:
            content = content[top_level_nodes[0].end_byte:]
        return content.decode("utf-8")

    def get_tree(self):
        """Return the syntax tree of the file, parsing it once with ``java_parser``."""
        if self.tree is None:
            self.tree = java_parser.parse(self.get_content())
        return self.tree

    def node_text(self, node):
        """Return the source text covered by ``node`` (byte range decoded as UTF-8)."""
        # Go through get_content() so this also works before the content was loaded.
        return self.get_content()[node.start_byte:node.end_byte].decode("utf-8")

    def walk_tree(self, node_handler):
        """Call ``node_handler(logic_path, node)`` for each named node of the syntax tree."""
        self.walk_tree_cursor(self.get_tree().walk(), self.get_path(), node_handler)

    def _declared_name(self, node):
        """Return the identifier declared by a class/field/method node, else None."""
        if node.type in ("class_declaration", "method_declaration"):
            name_node = node.child_by_field_name("name")
        elif node.type == "field_declaration":
            declarator = node.child_by_field_name("declarator")
            name_node = None if declarator is None else declarator.child_by_field_name("name")
        else:
            return None
        # A lookup that finds no such field returns None; treat that as "no name"
        # rather than crashing in node_text().
        if name_node is None:
            return None
        return self.node_text(name_node)

    def walk_tree_cursor(self, cursor, prefix, node_handler):
        """Depth-first traversal starting at the cursor's current node.

        Unnamed nodes (and their subtrees) are skipped. ``prefix`` is the
        logic-path of the enclosing declarations; the name of a class, field
        or method declaration is appended to it (separated by "/") for all
        nodes below that declaration. The cursor is back on its starting node
        when the call returns.
        """
        current = cursor.node
        if not current.is_named:
            return
        node_handler(prefix, current)

        declared_name = self._declared_name(current)
        if declared_name is not None:
            prefix = prefix + "/" + declared_name

        if cursor.goto_first_child():
            self.walk_tree_cursor(cursor, prefix, node_handler)
            while cursor.goto_next_sibling():
                self.walk_tree_cursor(cursor, prefix, node_handler)
            cursor.goto_parent()


In [5]:
class MetricsGeneration:
    """Computes coupling metrics between the files of a repository.

    ``repo`` must provide ``get_file_objects()`` (see ``_get_all_files``).
    """
    # ascii art: http://patorjk.com/software/taag/#p=display&f=Soft&t=STRUCTURAL%0A.%0ALINGUISTIC%0A.%0AEVOLUTIONARY%0A.%0ADYNAMIC
    def __init__(self, repo):
        self.repo = repo

    def calculate_structural_connections(self) -> WeightGraph:
        # Raw docstring: the ascii art contains backslashes that would
        # otherwise be read as (invalid) escape sequences.
        r"""
 ,---. ,--------.,------. ,--. ,--. ,-----.,--------.,--. ,--.,------.   ,---.  ,--.
'   .-''--.  .--'|  .--. '|  | |  |'  .--./'--.  .--'|  | |  ||  .--. ' /  O  \ |  |
`.  `-.   |  |   |  '--'.'|  | |  ||  |       |  |   |  | |  ||  '--'.'|  .-.  ||  |
.-'    |  |  |   |  |\  \ '  '-'  ''  '--'\   |  |   '  '-'  '|  |\  \ |  | |  ||  '--.
`-----'   `--'   `--' '--' `-----'  `-----'   `--'    `-----' `--' '--'`--' `--'`-----'

        Build the import-based coupling graph of the repository.

        Every file is registered under "<package>.<first class name>". For each
        import of a file that resolves to such a registered name, the weight of
        the edge between the importing file's path and the imported file's path
        is increased by 1. Prints the graph statistics, shows a visualization
        and returns the graph.
        """
        coupling_graph = WeightGraph()

        package_query_1 = JA_LANGUAGE.query("(package_declaration (identifier) @decl)")
        package_query_2 = JA_LANGUAGE.query("(package_declaration (scoped_identifier) @decl)")
        import_query = JA_LANGUAGE.query("(import_declaration (scoped_identifier) @decl)")
        class_query = JA_LANGUAGE.query("(class_declaration name: (identifier) @decl)")

        def _get_package(file) -> List[str]:
            """Return the package of ``file`` as list of name segments ([] if none declared)."""
            root_node = file.get_tree().root_node
            packages = package_query_1.captures(root_node) + package_query_2.captures(root_node)
            assert len(packages) <= 1
            if len(packages) == 1:
                return file.node_text(packages[0][0]).split(".")
            return []

        def _get_imports(file) -> List[str]:
            """Return the imported names of ``file``, without java./javax. imports."""
            result = []
            for import_statement in import_query.captures(file.get_tree().root_node):
                import_string = file.node_text(import_statement[0])
                # Match whole package segments only: a bare "java" prefix would
                # also drop project packages whose name merely starts with "java".
                if not import_string.startswith(("java.", "javax.")):
                    result.append(import_string)
            return result

        def _get_main_class_name(file):
            """Return the name (str) of the first class captured in ``file``, or None."""
            classes = class_query.captures(file.get_tree().root_node)
            if len(classes) >= 1:
                return file.node_text(classes[0][0])
            return None

        #######

        # fully qualified class name -> path of the file declaring it
        full_class_name_to_id = {}

        files = self._get_all_files()
        for file in files:
            class_name = _get_main_class_name(file)
            if class_name is not None:
                full_class_name = ".".join(_get_package(file) + [class_name])
                full_class_name_to_id[full_class_name] = file.get_path()

        for file in files:
            for imported_name in _get_imports(file):
                # Imports that cannot be resolved to a file of this repo are ignored.
                if imported_name in full_class_name_to_id:
                    coupling_graph.add(file.get_path(), full_class_name_to_id[imported_name], 1)

        coupling_graph.print_statistics()
        coupling_graph.visualize()
        return coupling_graph


    def calculate_linguistic_connections(self) -> WeightGraph:
        # Raw docstring: the ascii art contains backslashes that would
        # otherwise be read as (invalid) escape sequences.
        r"""
,--.   ,--.,--.  ,--. ,----.   ,--. ,--.,--. ,---. ,--------.,--. ,-----.
|  |   |  ||  ,'.|  |'  .-./   |  | |  ||  |'   .-''--.  .--'|  |'  .--./
|  |   |  ||  |' '  ||  | .---.|  | |  ||  |`.  `-.   |  |   |  ||  |
|  '--.|  ||  | `   |'  '--'  |'  '-'  '|  |.-'    |  |  |   |  |'  '--'\
`-----'`--'`--'  `--' `------'  `-----' `--'`-----'   `--'   `--' `-----'

        Extract the words of all files, train an LDA topic model on them and
        print the top words of each topic.

        NOTE(review): despite the return annotation, this method has no return
        statement yet (it returns None); deriving a WeightGraph from the topics
        still has to be implemented.
        """
        # constants
        MIN_WORD_LENGTH = 3
        MAX_WORD_LENGTH = 50
        MIN_WORD_USAGES = 2  # any word used less often will be ignored
        MAX_DF = 0.95  # any terms that appear in a bigger proportion of the documents than this will be ignored (corpus-specific stop-words)
        MAX_FEATURES = 1000  # the size of the LDA thesaurus - amount of words to consider for topic learning
        TOPIC_COUNT = 20  # 100 according to paper
        LDA_ITERATIONS = 10  # 3.000 according to paper, but at least 500
        LDA_RANDOM_SEED = 42

        splitter = r"(?:[\W_]+|(?<![A-Z])(?=[A-Z])|(?<!^)(?=[A-Z][a-z]))"
        lemma = WordNetLemmatizer()
        printable_characters = set(string.printable)

        def _normalize_word(word):
            """Lower-case ``word`` and lemmatize it, first as noun, then as verb."""
            return lemma.lemmatize(lemma.lemmatize(word.lower(), pos = "n"), pos = "v")

        # keywords from python, TS and Java
        custom_stop_words = ["abstract", "and", "any", "as", "assert", "async", "await", "boolean", "break", "byte", "case", "catch", "char", "class", "const", "constructor", "continue", "debugger", "declare", "def", "default", "del", "delete", "do", "double", "elif", "else", "enum", "except", "export", "extends", "false", "False", "final", "finally", "float", "for", "from", "function", "get", "global", "goto", "if", "implements", "import", "in", "instanceof", "int", "interface", "is", "lambda", "let", "long", "module", "new", "None", "nonlocal", "not", "null", "number", "of", "or", "package", "pass", "private", "protected", "public", "raise", "require", "return", "set", "short", "static", "strictfp", "string", "super", "switch", "symbol", "synchronized", "this", "throw", "throws", "transient", "true", "True", "try", "type", "typeof", "var", "void", "volatile", "while", "with", "yield"]
        raw_stop_words = set(list(get_stop_words('en')) + list(stopwords.words('english')) + custom_stop_words)
        # Words are compared against the stop list *after* normalization, so the
        # stop list has to contain the normalized forms as well (otherwise e.g.
        # "extends", which is normalized to "extend", would slip through).
        stop_words = raw_stop_words | {_normalize_word(word) for word in raw_stop_words}

        def _get_text(content_string):
            """Split ``content_string`` into normalized words, dropping stop words and words of unwanted length."""
            # https://stackoverflow.com/questions/5486337/how-to-remove-stop-words-using-nltk-or-python
            # https://agailloty.rbind.io/en/project/nlp_clean-text/
            content_string = ''.join(c for c in content_string if c in printable_characters)
            words = (_normalize_word(word) for word in re.split(splitter, content_string))
            return [
                word for word in words
                if MIN_WORD_LENGTH <= len(word) <= MAX_WORD_LENGTH and word not in stop_words
            ]


        print("Extracting words...")

        # see https://docs.python.org/2/library/collections.html#collections.Counter
        freq_dist = FreqDist()

        files = self._get_all_files()
        file_words = []  # List of (file,wordList) - tuples
        for file in files:
            words = _get_text(file.get_content_without_copyright())
            file_words.append((file, words))
            for word in words:
                freq_dist[word] += 1

        # Collect the rare words first: deleting entries while iterating over
        # the mapping itself raises "dictionary changed size during iteration".
        rare_words = [word for word, count in freq_dist.items() if count < MIN_WORD_USAGES]
        for word in rare_words:
            del freq_dist[word]
        print("Found words:", len(freq_dist))
        print("Creating vectorizer...")

        # see https://scikit-learn.org/stable/auto_examples/applications/plot_topics_extraction_with_nmf_lda.html#sphx-glr-auto-examples-applications-plot-topics-extraction-with-nmf-lda-py
        # NOTE(review): min_df counts *documents* containing a term, not total
        # usages - confirm that reusing MIN_WORD_USAGES here is intended.
        tf_vectorizer = CountVectorizer(max_df=MAX_DF, min_df=MIN_WORD_USAGES,
                                        max_features=MAX_FEATURES)

        tf = tf_vectorizer.fit_transform([' '.join(words) for file, words in file_words])

        lda = LatentDirichletAllocation(n_components=TOPIC_COUNT, max_iter=LDA_ITERATIONS,
                                        learning_method='online',
                                        learning_offset=50.,  # tau_0: down-weights early iterations in online learning (see sklearn docs)
                                        random_state=LDA_RANDOM_SEED)
        print("Training LDA...")
        lda.fit(tf)
        print("Generating result output...")

        # get_feature_names() was removed in scikit-learn 1.2; prefer its
        # replacement and fall back for older versions.
        if hasattr(tf_vectorizer, "get_feature_names_out"):
            tf_feature_names = tf_vectorizer.get_feature_names_out()
        else:
            tf_feature_names = tf_vectorizer.get_feature_names()

        def print_top_words(model, feature_names, n_top_words=10):
            """Print the ``n_top_words`` highest-weighted words of every topic of ``model``."""
            for topic_idx, topic in enumerate(model.components_):
                message = "Topic #%d: " % topic_idx
                message += " ".join([feature_names[i]
                                     for i in topic.argsort()[:-n_top_words - 1:-1]])
                print(message)
            print()
        print_top_words(lda, tf_feature_names)


    # -------------------------------------------------------------------------------------------

    def _get_all_files(self) -> List[RepoFile]:
        """Wrap every file object of the repo in a RepoFile."""
        return [RepoFile(self.repo, o) for o in self.repo.get_file_objects()]