In [1]:
import os
import json
import random
from sklearn.model_selection import train_test_split
from collections import Counter
from collections import defaultdict
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.utils import shuffle
import numpy as np
import re

In [2]:
# Names of the raw features, listed in the same order as the entries of
# AnnotationUsage.features_list: load_for_target pairs the two up by column
# index, so the orders must be kept in sync.
initial_feature_names = [
    'targetName',
    'className',
    'returnType',
    'methodModifiers',
    'otherAnnotations',
    'otherMethodsNames',
    'otherMethodsAnnotations',
    'returnsNull',
]
class AnnotationUsage:
    """One recorded use of an annotation together with its raw features.

    ``usage_json`` must provide the keys ``'name'`` and ``'features'``.
    Individual feature keys are optional: missing text features default to
    ``''``, missing list features to ``[]`` and the ``returnsNull`` flag to 0.
    """

    def __init__(self, usage_json):
        self.annotation_name = usage_json['name']
        raw = usage_json['features']

        text_keys = ('targetName', 'className', 'returnType')
        list_keys = (
            'methodModifiers',
            'otherAnnotations',
            'otherMethodsNames',
            'otherMethodsAnnotations',
        )

        collected = [raw.get(key, '') for key in text_keys]
        for key in list_keys:
            collected.append(raw.get(key, []))
        # The flag is stored as an integer (1/0) rather than a bool.
        collected.append(int(bool(raw.get('returnsNull', False))))
        self.features_list = collected

    def __str__(self):
        return format(self.annotation_name)

class UsagesLoader:
    """Reads annotation-usage JSON files from a directory tree.

    Every ``*.json`` file found under ``processing_result_path`` is expected
    to contain an object with ``keyInfo.name`` (the target type) and a
    ``usages`` list whose items are accepted by ``AnnotationUsage``.
    """

    def __init__(self, processing_result_path):
        self.processing_result_path = processing_result_path

    def load(self):
        """Collect all usages, grouped by target type.

        Returns:
            defaultdict(list): target type name -> list of ``AnnotationUsage``.

        Raises:
            json.JSONDecodeError / KeyError: if a ``.json`` file is malformed
            or lacks the expected keys.
        """
        usages_by_target = defaultdict(list)
        for root, dirs, files in os.walk(self.processing_result_path):
            # os.walk yields entries in filesystem order; sorting makes the
            # resulting usage order (and any seeded shuffle of it) reproducible.
            dirs.sort()
            for file in sorted(files):
                # Match the real extension; a bare 'json' suffix would also
                # accept names such as 'notes_json'.
                if not file.endswith('.json'):
                    continue
                with open(os.path.join(root, file), 'r', encoding='utf-8') as read_file:
                    data = json.load(read_file)
                target_type = data['keyInfo']['name']
                # extend() appends in place instead of re-copying the whole
                # accumulated list for every file.
                usages_by_target[target_type].extend(
                    AnnotationUsage(usage_json) for usage_json in data["usages"]
                )
        return usages_by_target

In [3]:
class Baseline:
    """Frequency baseline: every prediction is the training labels ranked by count."""

    def __init__(self):
        self.ordered_by_quantity = np.array([])

    def fit(self, X, y):
        """Rank the distinct labels of ``y`` from most to least frequent; ``X`` is unused."""
        ranked_labels = []
        for label, _count in Counter(y).most_common():
            ranked_labels.append(label)
        self.ordered_by_quantity = np.array(ranked_labels)

    def predict(self, X):
        """Return one copy of the learned ranking per item of ``X``."""
        ranking = self.ordered_by_quantity
        return np.array(list(map(lambda _row: ranking, X)))


In [4]:
class Metric:
    """Top-k accuracy over ranked predictions.

    Args:
        predicted_y: indexable of rankings; ``predicted_y[i]`` is the ordered
            sequence of labels predicted for sample ``i`` (best first).
        expected_y: sized, indexable collection of the true labels.

    Attributes:
        orders: for every sample, the 1-based position of the true label in
            its ranking, or ``float('inf')`` when the ranking does not contain
            it. (Previously a miss was stored as ``len(ranking) + 1``, which
            made ``top_i`` count misses as hits for large enough ``i``.)
    """

    def __init__(self, predicted_y, expected_y):
        self.orders = [
            self._rank(predicted_y[i], expected_y[i])
            for i in range(len(expected_y))
        ]

    @staticmethod
    def _rank(ranking, expected):
        """1-based position of ``expected`` in ``ranking``; infinity if absent."""
        for position, label in enumerate(ranking, start=1):
            if label == expected:
                return position
        return float('inf')

    def top_i(self, i):
        """Fraction of samples whose true label is among the first ``i`` predictions.

        Returns 0.0 when there are no samples instead of dividing by zero.
        """
        if not self.orders:
            return 0.0
        return sum(1 for rank in self.orders if rank <= i) / len(self.orders)

In [5]:
def encode_names(column, column_name):
    """
    Bag-of-words encoding for a column of camelCase names.

    Every name is split into lower-cased words and the column is vectorised
    with a ``CountVectorizer`` limited to the 200 most frequent words, so the
    result has at most 200 columns. Cell values are word occurrence counts
    (``CountVectorizer`` is used with its default, non-binary setting).

    Returns:
        (new_columns, new_names): a dense 2-D array with one row per name and
        the list of generated column names, each ``column_name + '_' + word``.
    """

    def split_camel_case(x):
        # Only runs of letters are kept: an optional capital followed by
        # lower-case letters, or a run of capitals (e.g. an acronym).
        words = [word.lower() for word in re.findall(r'[A-Z]?[a-z]+|[A-Z]+(?=[A-Z]|$)', x)]
        return ' '.join(words)

    vectorizer = CountVectorizer(preprocessor=split_camel_case, max_features=200)
    new_columns = vectorizer.fit_transform(column).toarray()
    # get_feature_names() was removed in scikit-learn 1.2 in favour of
    # get_feature_names_out(); fall back to the old name on older releases.
    if hasattr(vectorizer, 'get_feature_names_out'):
        vocabulary = vectorizer.get_feature_names_out()
    else:
        vocabulary = vectorizer.get_feature_names()
    # str() keeps the names plain Python strings even when the vocabulary
    # comes back as a NumPy array.
    new_names = [column_name + '_' + str(name) for name in vocabulary]
    return new_columns, new_names

def encode_lists(column, column_name):
    """
    Bag-of-words encoding for a column whose cells are lists of words.

    Each list is joined with spaces and the column is vectorised with a
    ``CountVectorizer`` limited to the 200 most frequent tokens, so the result
    has at most 200 columns. Cell values are token occurrence counts
    (``CountVectorizer`` is used with its default, non-binary setting).

    Returns:
        (new_columns, new_names): a dense 2-D array with one row per list and
        the list of generated column names, each ``column_name + '_' + token``.
    """

    joined_words = np.array([' '.join(x) for x in column])

    vectorizer = CountVectorizer(max_features=200)
    new_columns = vectorizer.fit_transform(joined_words).toarray()
    # get_feature_names() was removed in scikit-learn 1.2 in favour of
    # get_feature_names_out(); fall back to the old name on older releases.
    if hasattr(vectorizer, 'get_feature_names_out'):
        vocabulary = vectorizer.get_feature_names_out()
    else:
        vocabulary = vectorizer.get_feature_names()
    # str() keeps the names plain Python strings even when the vocabulary
    # comes back as a NumPy array.
    new_names = [column_name + '_' + str(name) for name in vocabulary]
    return new_columns, new_names


def encode_column(column, column_name):
    """
    Encode one raw feature column as numeric columns.

    The type of the first cell selects the encoding:

    * ``str``  -> ``encode_names`` (camelCase bag of words)
    * ``list`` -> ``encode_lists`` (bag of list items)
    * anything else -> the column is returned unchanged as a single
      ``(n, 1)`` column named ``column_name``

    Returns:
        (new_columns, new_names). ``(None, [])`` is returned when the column
        carries no information: it is empty, or every cell is ``''`` / ``[]``.
    """
    # Guard first: column[0] below would raise IndexError on an empty column.
    if len(column) == 0:
        return None, []

    first = column[0]
    if isinstance(first, str):
        if all(value == '' for value in column):
            return None, []
        return encode_names(column, column_name)
    elif isinstance(first, list):
        # Built-in all() replaces np.alltrue, which was removed in NumPy 2.0.
        if all(value == [] for value in column):
            return None, []
        return encode_lists(column, column_name)
    else:
        return np.array([column]).T, [column_name]

In [6]:
# Sanity check, string column: names are split on camelCase boundaries and every word becomes a column.
encode_column(np.array(['getSomethingGood', 'returnSomething'], dtype=object), 'a')

(array([[1, 1, 0, 1],
        [0, 0, 1, 1]]),
 ['a_get', 'a_good', 'a_return', 'a_something'])

In [7]:
# Sanity check, list column: every distinct list item becomes a column.
encode_column(np.array([['public'], ['public', 'static']], dtype=object), 'b')

(array([[1, 0],
        [1, 1]]),
 ['b_public', 'b_static'])

In [8]:
# Sanity check, any other cell type (here ints): the column is passed through as a single (n, 1) column.
encode_column(np.array([1, 0], dtype=object), 'c')

(array([[1],
        [0]], dtype=object),
 ['c'])

In [9]:
# Root directory of the JSON processing results. Set the
# ANNOTATION_USAGES_PATH environment variable to point at your own copy;
# the fallback is the path this notebook was originally run with.
PROCESSING_RESULT_PATH = os.environ.get(
    'ANNOTATION_USAGES_PATH',
    '/Users/danilbk/Programming/Kotlin/test/intellij-community/project-processing-results/processing/java/annotations/processing/0.0.0',
)
usages_loader = UsagesLoader(PROCESSING_RESULT_PATH)
usages_by_target_type = usages_loader.load()

In [10]:
# Memoisation store filled by load_for_target so repeated calls skip the feature encoding.
# Re-running this cell clears it.
cache = {}

In [11]:
def load_for_target(target_type, ignored_annotation=()):
    """Build the encoded feature matrix and labels for one target type.

    Args:
        target_type: key into ``usages_by_target_type``.
        ignored_annotation: collection of annotation names whose usages are
            left out.

    Returns:
        (X, y, all_new_names):
        ``X`` is the 2-D array of encoded features (one row per usage), or an
        empty 1-D array when there are no usages; ``y`` is the list of
        annotation names; ``all_new_names`` lists the column names of ``X``
        (the raw feature names when there are no usages).

    Results are memoised in ``cache``. The key includes the ignored
    annotations, so calls with different exclusions no longer receive each
    other's cached result.
    """
    cache_key = (target_type, frozenset(ignored_annotation))
    if cache_key in cache:
        return cache[cache_key]

    method_usages = [
        usage for usage in usages_by_target_type[target_type]
        if usage.annotation_name not in ignored_annotation
    ]
    y = [usage.annotation_name for usage in method_usages]

    if not method_usages:
        X = np.array([])
        # Copy, so callers cannot mutate the module-level list through the result.
        all_new_names = list(initial_feature_names)
    else:
        raw_X = np.array([np.array(usage.features_list, dtype=object) for usage in method_usages])
        encoded_blocks = []
        all_new_names = []
        for col in range(raw_X.shape[1]):
            new_columns, new_names = encode_column(raw_X[:, col], initial_feature_names[col])
            if new_columns is None:
                # Column carried no information for this target type.
                continue
            encoded_blocks.append(new_columns)
            all_new_names += new_names
        if encoded_blocks:
            # Single concatenation instead of re-copying X for every column.
            X = np.concatenate(encoded_blocks, axis=1)
        else:
            # Every column was dropped: keep the row count, with zero features.
            X = np.empty((len(method_usages), 0))

    cache[cache_key] = (X, y, all_new_names)
    return X, y, all_new_names

In [12]:
def calculate(X, y, model):
    """Fit ``model`` on a seeded random split of ``(X, y)`` and print its top-1 score.

    ``model`` needs ``fit(X, y)`` and ``predict(X)``; the predictions are
    scored with ``Metric`` against the held-out labels.
    """
    # NOTE(review): train_size=0.2 trains on 20% and evaluates on the other
    # 80% -- confirm this is intended and not a swapped test_size.
    split = train_test_split(X, y, train_size=0.2, shuffle=True, random_state=42)
    train_features, test_features, train_labels, test_labels = split

    model.fit(train_features, train_labels)
    ranked_predictions = model.predict(test_features)
    metric = Metric(ranked_predictions, test_labels)

    # Only k = 1 is reported.
    for i in (1,):
        print(f'Top {i}: {metric.top_i(i)}')

In [13]:
# Target types to evaluate; each one is looked up as a key of
# usages_by_target_type (i.e. a `keyInfo.name` value from the loaded JSON).
# NOTE(review): these look like Java annotation target kinds -- confirm
# against the data producer.
target_types = [
    'AnnotationType',
    'Constructor',
    'Field',
    'LocalVariable',
    'Method',
    'Module',
    'Package',
    'Parameter',
    'RecordComponent',
    'Type',
    'TypeParameter',
    'TypeUse'
]

In [14]:
from sklearn.svm import LinearSVC


class SVM:
    """Thin wrapper around ``LinearSVC`` whose predictions are one-element rankings.

    ``predict`` returns one row per sample holding the single predicted label,
    the same "sequence of labels per sample" layout that ``Baseline.predict``
    produces.
    """

    def __init__(self):
        self.model = LinearSVC(max_iter=1_000_000)

    def fit(self, X, y):
        """Train the underlying classifier."""
        self.model.fit(X, y)

    def predict(self, X):
        """Return the predicted label of every sample wrapped in its own row."""
        labels = self.model.predict(X)
        return np.array([[label] for label in labels])


In [15]:
# Upper bound on the number of usages evaluated per target type.
size = 10000

# Compare the SVM against the frequency baseline for every target type.
for target_type in target_types:
    X, y, feature_names = load_for_target(target_type)
    # NOTE(review): `rnd` is not used in this cell; kept in case a later cell relies on it.
    rnd = random.Random(42)
    # Skip target types without data before doing any work on them.
    if len(X) == 0:
        continue
    # Seeded shuffle so the truncated sample is reproducible.
    X, y = shuffle(X, y, random_state=42)
    X = X[:size]
    y = y[:size]
    print(target_type)
    svm = SVM()
    print("SVM")
    calculate(X, y, svm)
    print("Baseline")
    calculate(X, y, Baseline())
    print()

AnnotationType
SVM
Top 1: 0.8594249201277955
Baseline
Top 1: 0.3801916932907348

Constructor
SVM
Top 1: 0.5474452554744526
Baseline
Top 1: 0.3193430656934307

Field
SVM




Top 1: 0.681625
Baseline
Top 1: 0.549

LocalVariable
SVM
Top 1: 0.7167405764966741
Baseline
Top 1: 0.38137472283813745

Method
SVM




Top 1: 0.832125
Baseline
Top 1: 0.507125

Package
SVM
Top 1: 0.796875
Baseline
Top 1: 0.59375

Parameter
SVM




Top 1: 0.784625
Baseline
Top 1: 0.8365

Type
SVM
Top 1: 0.8203093476798924
Baseline
Top 1: 0.3409549428379287

TypeUse
SVM




Top 1: 0.873
Baseline
Top 1: 0.855

