In [9]:
from sklearn.cluster import KMeans
import os
import sys
import javalang
import pandas as pd
from typing import List
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
import pandas as pd
import sys
import matplotlib.pyplot as plt

In [10]:
def get_god_classes(path_to_directory: str, n_std: float = 6) -> pd.DataFrame:
    """Return the classes whose method count is an extreme outlier ("god classes").

    A class is flagged when its number of methods is strictly greater than
    ``mean + n_std * std``, both statistics being computed over every class
    found under ``path_to_directory``.

    Args:
        path_to_directory: Root directory handed to
            ``generate_class_method_count_df``.
        n_std: How many standard deviations above the mean a class must be
            to get flagged. Defaults to 6.

    Returns:
        The flagged rows of the class/method-count frame, with an additional
        boolean ``is_god`` column (``True`` on every returned row).
    """
    df = generate_class_method_count_df(path_to_directory)
    counts = pd.to_numeric(df['number_of_methods'])
    # Compute the cut-off once; evaluating mean/std inside a per-row lambda
    # repeats the full-column reduction for every class.
    threshold = counts.mean() + n_std * counts.std()
    df['is_god'] = counts > threshold
    return df[df['is_god']]

def generate_class_method_count_df(path_to_directory: str) -> pd.DataFrame:
    """Count the methods of every class declared in the ``.java`` files of a tree.

    Args:
        path_to_directory: Directory that is walked recursively.

    Returns:
        A frame with one row per class declaration and the columns
        ``class_name``, ``number_of_methods`` and ``path`` (the file path with
        backslashes replaced by forward slashes). The frame is empty, but still
        has these columns, when no ``.java`` file is found.
    """
    columns = ['class_name', 'number_of_methods', 'path']
    # Collect plain rows and build the frame once at the end; appending with
    # df.loc[len(df)] re-allocates the frame for every single class.
    records = []
    for path_walk, _, files_walk in os.walk(path_to_directory):
        for file in files_walk:
            if not file.endswith('.java'):
                continue
            file_path = os.path.join(path_walk, file)
            with open(file_path, 'r') as f:
                tree = javalang.parse.parse(f.read())
            normalized_path = file_path.replace('\\', '/')
            for _, class_declaration in tree.filter(javalang.tree.ClassDeclaration):
                records.append([
                    class_declaration.name,
                    len(class_declaration.methods),
                    normalized_path,
                ])
    return pd.DataFrame(records, columns=columns)

In [11]:
# Root of the Java source tree to analyse, relative to the notebook's working directory.
path = "./resources/xerces2-j-src"
# Keep only the classes whose method count is far above the project-wide mean.
god_df = get_god_classes(path)
# Bare last expression so the notebook renders the resulting frame.
god_df

Unnamed: 0,class_name,number_of_methods,path,is_god
109,CoreDocumentImpl,125,./resources/xerces2-j-src/org/apache/xerces/do...,True
216,DTDGrammar,101,./resources/xerces2-j-src/org/apache/xerces/im...,True
476,XSDHandler,118,./resources/xerces2-j-src/org/apache/xerces/im...,True
679,XIncludeHandler,116,./resources/xerces2-j-src/org/apache/xerces/xi...,True


In [12]:
# Persist the flagged classes; `is_god` is dropped because it is True on every remaining row.
god_df.drop(columns=['is_god']).to_csv('./generated/god_classes.csv', index=False)

In [13]:
def get_fields(class_declaration: javalang.tree.ClassDeclaration) -> set[str]:
    """Collect the names of all fields declared directly on a class.

    A single field declaration can introduce several variables
    (``int a, b;``), so every declarator is visited rather than only the
    first one.

    Args:
        class_declaration: Parsed class whose ``fields`` are inspected.

    Returns:
        The set of declared field names.
    """
    return {
        declarator.name
        for field in class_declaration.fields
        for declarator in field.declarators
    }

def get_methods(class_declaration: javalang.tree.ClassDeclaration) -> set[str]:
    """Collect the distinct names of the methods declared directly on a class.

    Overloads share a name and therefore collapse into a single entry.

    Args:
        class_declaration: Parsed class whose ``methods`` are inspected.

    Returns:
        The set of method names.
    """
    return {method.name for method in class_declaration.methods}

def get_fields_accessed_by_method(method_declaration: javalang.tree.MethodDeclaration) -> set[str]:
    """Collect the names a method reaches through member references.

    For a qualified reference (``a.b``) the qualifier is recorded; for an
    unqualified one the member name itself is recorded.

    Args:
        method_declaration: Parsed method whose body is searched for
            ``MemberReference`` nodes.

    Returns:
        The set of recorded names. Whether a name really is a field of the
        enclosing class is decided by the caller.
    """
    set_field_accesses = set()
    for _, reference in method_declaration.filter(javalang.tree.MemberReference):
        # The qualifier can be None as well as '' (javalang leaves it unset on
        # references parsed as selectors, e.g. the `x` of `this.x`). Comparing
        # with '' only would put None into the set and lose the member name.
        set_field_accesses.add(reference.qualifier or reference.member)
    return set_field_accesses

def get_methods_accessed_by_method(method_declaration: javalang.tree.MethodDeclaration) -> set[str]:
    """Collect the names of all methods invoked inside a method.

    Args:
        method_declaration: Parsed method whose body is searched for
            ``MethodInvocation`` nodes.

    Returns:
        The set of invoked member names (qualifiers are ignored).
    """
    invocations = method_declaration.filter(javalang.tree.MethodInvocation)
    return {invocation.member for _, invocation in invocations}


def generate_feature_dataframe(node: javalang.tree.ClassDeclaration, set_class_methods: set, set_class_fields: set) -> pd.DataFrame:
    """Build the method-by-feature usage table of a class.

    The features are the class's own field and method names. A cell holds 1
    when the method (any overload with that name) references the feature and
    is left missing (NaN) otherwise; the caller is expected to fill the gaps.

    Args:
        node: Parsed class whose ``methods`` are analysed.
        set_class_methods: Method names of the class; each gets a row even if
            it uses no feature.
        set_class_fields: Field names of the class.

    Returns:
        A frame with a ``method_name`` column followed by one column per
        feature. Rows and feature columns are sorted by name.
    """
    # Sort explicitly: set iteration order for strings changes between
    # interpreter runs, which would make the exported CSVs (and any seeded
    # clustering computed on them) differ from run to run.
    features = sorted(set(set_class_fields) | set(set_class_methods))
    known_features = set(features)

    # method name -> features it touches (insertion order fixes the row order)
    usage = {method_name: set() for method_name in sorted(set_class_methods)}
    for m in node.methods:
        accessed = get_fields_accessed_by_method(m) | get_methods_accessed_by_method(m)
        hits = accessed & known_features
        if hits:
            # Methods missing from set_class_methods only get a row once they
            # are seen to use at least one feature.
            usage.setdefault(m.name, set()).update(hits)

    records = [
        {'method_name': method_name, **dict.fromkeys(hits, 1)}
        for method_name, hits in usage.items()
    ]
    return pd.DataFrame(records, columns=['method_name'] + features)


def extract_feature_vectors(path_java_file: str, save_directory_path: str = './') -> dict[str, pd.DataFrame]:
    """Extract and save the feature vectors of the class a Java file is named after.

    Only the class declaration whose name equals the file's base name is
    processed. Its feature table is completed with zeros, cast to ``int`` and
    written to ``<save_directory_path>/<class_name>.csv``.

    Args:
        path_java_file: Path of the ``.java`` file, using ``/`` separators.
        save_directory_path: Output directory, with or without a trailing
            slash. It is created if missing.

    Returns:
        ``{class_name: feature_frame}``; empty when the file declares no class
        with the expected name.
    """
    with open(path_java_file, 'r') as f:
        tree = javalang.parse.parse(f.read())
    class_name = path_java_file.split('/')[-1].split('.')[0]

    if save_directory_path:
        os.makedirs(save_directory_path, exist_ok=True)
    # os.path.join inserts the separator only when it is missing. The former
    # check `not path.endswith('/') != './'` compared a bool with a string,
    # was always False, and therefore never appended the slash.
    csv_path = os.path.join(save_directory_path, class_name + '.csv')

    class_features = {}
    for _, n in tree.filter(javalang.tree.ClassDeclaration):
        if n.name != class_name:
            continue
        df = generate_feature_dataframe(n, get_methods(n), get_fields(n))
        df = df.fillna(0)
        column_names = df.columns.difference(['method_name'])
        df[column_names] = df[column_names].astype(int)
        df.to_csv(csv_path, index=False)
        class_features[class_name] = df
    return class_features

In [14]:
# Class name -> feature-vector frame, for every flagged god class.
class_features = {}
for path in god_df['path']:
    # Each call also writes <class_name>.csv into the feature_vectors directory.
    class_features.update(extract_feature_vectors(path, save_directory_path='./generated/feature_vectors/'))

In [15]:
# Report the size of each feature table.
for k,v in class_features.items():
    # One row per method name.
    num_vectors = v.shape[0]
    # Every column except `method_name` is a feature.
    num_features = v.shape[1] - 1
    print(f'Class {k} has {num_vectors} vectors and {num_features} features')

Class CoreDocumentImpl has 117 vectors and 139 features
Class DTDGrammar has 91 vectors and 166 features
Class XSDHandler has 106 vectors and 226 features
Class XIncludeHandler has 108 vectors and 200 features


In [16]:
# Paths of the per-class feature-vector CSVs.
# os.listdir returns entries in arbitrary order and also lists non-CSV entries
# (for instance a .ipynb_checkpoints directory), so keep only *.csv files and
# sort them to make every downstream loop deterministic.
feature_csv_files_paths = sorted(
    './generated/feature_vectors/' + str(f)
    for f in os.listdir('./generated/feature_vectors')
    if f.endswith('.csv') and f != 'god_classes.csv'
)
feature_csv_files_paths

['./generated/feature_vectors/CoreDocumentImpl.csv',
 './generated/feature_vectors/DTDGrammar.csv',
 './generated/feature_vectors/XIncludeHandler.csv',
 './generated/feature_vectors/XSDHandler.csv']

In [17]:
# Kept for code that still reads this global; k_means_clustering itself no
# longer depends on it (see its `save_dir` parameter).
save_path = './generated/clusterings/'
def k_means_clustering(n_clusters: int, path_to_featurevec_csv: str = None, save: bool = False, get: bool = False, save_dir: str = './generated/clusterings/') -> "pd.DataFrame | None":
    """Cluster the methods of a feature-vector CSV with k-means.

    Args:
        n_clusters: Number of clusters.
        path_to_featurevec_csv: CSV with a ``method_name`` column followed by
            the feature columns. Required despite the ``None`` default.
        save: When True, write the assignment, sorted by ``cluster_id``, to
            ``<save_dir>/<class_name>_kmeans_<n_clusters>.csv``.
        get: When True, return the assignment frame.
        save_dir: Output directory used when ``save`` is True. It is an
            explicit parameter so that a later cell rebinding the global
            ``save_path`` cannot redirect where clusterings are written.

    Returns:
        A frame with the columns ``cluster_id`` and ``method_name`` in the row
        order of the input CSV when ``get`` is True, otherwise ``None``.

    Raises:
        ValueError: If ``path_to_featurevec_csv`` is not given.
    """
    if path_to_featurevec_csv is None:
        raise ValueError('path_to_featurevec_csv is required')
    df = pd.read_csv(path_to_featurevec_csv)

    X = df.drop('method_name', axis=1).values
    labels = KMeans(n_clusters=n_clusters, random_state=0).fit_predict(X)

    df_for_csv = pd.DataFrame({'cluster_id': labels, 'method_name': df['method_name']})

    if save:
        class_name = os.path.basename(path_to_featurevec_csv).split('.')[0]
        out_path = os.path.join(save_dir, class_name + '_kmeans_' + str(n_clusters) + '.csv')
        df_for_csv.sort_values(by='cluster_id').to_csv(out_path, index=False)
    if get:
        return df_for_csv
    return None
    
def agglomerative_clustering(path_to_featurevec_csv: str, n_clusters: int, save: bool = False, get: bool = False, linkage='complete', save_dir: str = './generated/clusterings/') -> "pd.DataFrame | None":
    """Cluster the methods of a feature-vector CSV with agglomerative clustering.

    Args:
        path_to_featurevec_csv: CSV with a ``method_name`` column followed by
            the feature columns.
        n_clusters: Number of clusters.
        save: When True, write the assignment, sorted by ``cluster_id``, to
            ``<save_dir>/<class_name>_agglomerative_<linkage>_<n_clusters>.csv``.
        get: When True, return the assignment frame.
        linkage: Linkage criterion forwarded to ``AgglomerativeClustering``.
        save_dir: Output directory used when ``save`` is True. It is an
            explicit parameter so that a later cell rebinding the global
            ``save_path`` cannot redirect where clusterings are written.

    Returns:
        A frame with the columns ``cluster_id`` and ``method_name`` in the row
        order of the input CSV when ``get`` is True, otherwise ``None``.
    """
    df = pd.read_csv(path_to_featurevec_csv)
    X = df.drop('method_name', axis=1).values
    labels = AgglomerativeClustering(n_clusters=n_clusters, linkage=linkage).fit_predict(X)

    df_for_csv = pd.DataFrame({'cluster_id': labels, 'method_name': df['method_name']})

    if save:
        class_name = os.path.basename(path_to_featurevec_csv).split('.')[0]
        out_name = class_name + '_agglomerative_' + str(linkage) + '_' + str(n_clusters) + '.csv'
        df_for_csv.sort_values(by='cluster_id').to_csv(os.path.join(save_dir, out_name), index=False)
    if get:
        return df_for_csv
    return None
    

def silhouette(path_to_featurevec_csv: str, clustering_csv_path: str = None, max_clusters: int = None, min_clusters: int = 2):
    """Compute silhouette scores for one class's feature vectors.

    Two modes:

    * ``clustering_csv_path`` given: score that stored clustering.
    * otherwise: run k-means and agglomerative clustering (complete and single
      linkage) for every ``k`` in ``[min_clusters, max_clusters]``, saving each
      clustering to disk as a side effect, and score all of them.

    Args:
        path_to_featurevec_csv: CSV with a ``method_name`` column followed by
            the feature columns.
        clustering_csv_path: Optional CSV with the columns ``cluster_id`` and
            ``method_name``.
        max_clusters: Largest ``k`` to try; required in the second mode.
        min_clusters: Smallest ``k`` to try.

    Returns:
        A single score in the first mode; otherwise the tuple
        ``(kmeans, agglomerative_complete, agglomerative_single)`` of
        ``{k: score}`` dicts.

    Raises:
        ValueError: If neither ``clustering_csv_path`` nor ``max_clusters`` is
            given, or if the stored clustering lacks a method of the feature
            file.
    """
    features_df = pd.read_csv(path_to_featurevec_csv)
    X = features_df.drop('method_name', axis=1).values

    if clustering_csv_path:
        clustering_df = pd.read_csv(clustering_csv_path)
        # Stored clusterings are written sorted by cluster_id, so their row
        # order differs from the feature file. Re-align the labels on
        # method_name before scoring; using them positionally would attach
        # labels to the wrong vectors.
        labels = clustering_df.set_index('method_name')['cluster_id'].reindex(features_df['method_name'])
        if labels.isna().any():
            raise ValueError('clustering CSV does not cover every method of the feature-vector CSV')
        return silhouette_score(X, labels.values)

    if max_clusters is None:
        raise ValueError('max_clusters is required when clustering_csv_path is not given')

    kmean_dict = {}
    agglom_dict_complete = {}
    agglom_dict_single = {}
    for k in range(min_clusters, max_clusters + 1):
        # The returned frames keep the feature file's row order, so their
        # labels can be used positionally.
        clustering = k_means_clustering(n_clusters=k, path_to_featurevec_csv=path_to_featurevec_csv, get=True, save=True)
        kmean_dict[k] = silhouette_score(X, clustering['cluster_id'].values)

        clustering = agglomerative_clustering(path_to_featurevec_csv=path_to_featurevec_csv, n_clusters=k, get=True, save=True)
        agglom_dict_complete[k] = silhouette_score(X, clustering['cluster_id'].values)

        clustering = agglomerative_clustering(path_to_featurevec_csv=path_to_featurevec_csv, n_clusters=k, get=True, save=True, linkage='single')
        agglom_dict_single[k] = silhouette_score(X, clustering['cluster_id'].values)

    return kmean_dict, agglom_dict_complete, agglom_dict_single



In [19]:
# Class name -> (kmeans, agglomerative_complete, agglomerative_single) score dicts.
scores = {}
for p in feature_csv_files_paths:
    # The class name is the CSV's base name without its extension.
    class_name = p.split('/')[-1].split('.')[0]
    # Tries every k from the default minimum (2) up to 60; each clustering is also saved to disk.
    scores[class_name] = silhouette(p, max_clusters=60)

  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)
  return fit_method(estimator, *args, **kwargs)


In [20]:
# Output directory of this cell. It deliberately has its own name: rebinding
# the global `save_path` here would silently change where the clustering
# helpers, which read that global, write their CSVs.
silhouette_dir = './generated/silhouette_scores/'
os.makedirs(silhouette_dir + 'csv/', exist_ok=True)
os.makedirs(silhouette_dir + 'plots/', exist_ok=True)

for n, v in scores.items():
    kmeans, agglo_complete, agglo_single = v

    # One row per k; built in one go instead of appending row by row.
    df = pd.DataFrame({
        'k': list(kmeans.keys()),
        'kmeans': list(kmeans.values()),
        'agglomerative_complete': [agglo_complete[k] for k in kmeans],
        'agglomerative_single': [agglo_single[k] for k in kmeans],
    })
    df.to_csv(silhouette_dir + 'csv/' + n + '_silhouette_scores.csv', index=False)

    ax = df.plot(x='k', y=['kmeans', 'agglomerative_complete', 'agglomerative_single'])
    ax.set_title(n + ' silhouette scores')

    # Best score over all three algorithms, plus who reaches it and at which k.
    by_method = (
        ('kmeans', kmeans),
        ('agglomerative_complete', agglo_complete),
        ('agglomerative_single', agglo_single),
    )
    max_score = max(max(method_scores.values()) for _, method_scores in by_method)
    methods = [name for name, method_scores in by_method if max_score in method_scores.values()]
    best_ks = [
        [k for k, score in method_scores.items() if score == max_score]
        for _, method_scores in by_method
    ]

    # Add the best score as an extra y tick and mark it with a dashed line.
    ax.set_yticks(list(ax.get_yticks()) + [max_score])
    ax.tick_params(axis='y', labelsize=7)
    ax.axhline(y=max_score, color='r', linestyle='--', label=f'k = {best_ks}, method = {methods}')
    ax.legend(loc='lower center', bbox_to_anchor=(0.5, -0.4))
    ax.figure.savefig(silhouette_dir + 'plots/' + n + '_silhouette_scores.png', bbox_inches='tight')

    # Close explicitly so figures do not pile up across iterations.
    plt.close(ax.figure)

In [21]:
def get_ground_truth(path_featurevec_csv: str, path_keywords_list: str, save_path='./') -> None:
    """Derive a keyword-based ground-truth clustering and write it to CSV.

    Every method is assigned to the first keyword (in file order) that occurs
    in its lower-cased name, or to the bucket ``'none'`` when no keyword
    matches. Cluster ids are handed out in the order in which buckets are
    first encountered while scanning the methods.

    Args:
        path_featurevec_csv: Feature-vector CSV with a ``method_name`` column.
        path_keywords_list: Text file with one keyword per line. Blank lines
            are ignored.
        save_path: Existing output directory, with or without a trailing
            separator.

    Returns:
        None. The result (columns ``cluster_id``, ``method_name``, sorted by
        ``cluster_id``) is written to
        ``<save_path>/ground_truth_<class_name>.csv``.
    """
    with open(path_keywords_list, 'r') as f:
        # A blank line would act as the empty keyword, which is a substring of
        # every method name and would swallow all methods that follow it.
        keywords_list = [line for line in f.read().splitlines() if line.strip()]
    df = pd.read_csv(path_featurevec_csv)

    bucket_ids = {}   # bucket label -> cluster id, in order of first appearance
    cluster_ids = []  # one id per row of df
    for method in df['method_name'].values:
        lowered = method.lower()
        bucket = next((keyword for keyword in keywords_list if keyword in lowered), 'none')
        # len() is evaluated before insertion, so a new bucket gets the next free id.
        cluster_ids.append(bucket_ids.setdefault(bucket, len(bucket_ids)))

    df_to_csv = pd.DataFrame({'cluster_id': cluster_ids, 'method_name': df['method_name']})
    # A stable sort keeps the input order inside each cluster.
    df_to_csv = df_to_csv.sort_values(by='cluster_id', kind='stable')

    file_name = os.path.basename(path_featurevec_csv).split('.')[0]
    # os.path.join adds the separator only when `save_path` lacks one; plain
    # concatenation produced '<dir>ground_truth_...' for such paths.
    df_to_csv.to_csv(os.path.join(save_path, 'ground_truth_' + file_name + '.csv'), index=False)

In [22]:
# Write one keyword-based ground-truth CSV per god class.
for p in feature_csv_files_paths:
    get_ground_truth(p, './keywords_list.txt', save_path='./generated/ground_truth/')

In [23]:
# Show how many methods fall into each ground-truth cluster, per file.
for f in os.listdir('./generated/ground_truth/'):
    df = pd.read_csv('./generated/ground_truth/'+f)
    print(f)
    # value_counts lists the clusters from largest to smallest.
    print(df['cluster_id'].value_counts())
    print('\n')
    

ground_truth_CoreDocumentImpl.csv
cluster_id
0     69
1     14
2      6
6      6
3      5
4      4
8      4
9      3
5      2
7      2
10     2
Name: count, dtype: int64


ground_truth_DTDGrammar.csv
cluster_id
0    64
2    17
3     6
1     2
4     2
Name: count, dtype: int64


ground_truth_XIncludeHandler.csv
cluster_id
0    90
5     9
3     4
1     2
4     2
2     1
Name: count, dtype: int64


ground_truth_XSDHandler.csv
cluster_id
1    57
3    24
0    18
4     3
5     3
2     1
Name: count, dtype: int64




In [24]:
def get_intrapairs(df: pd.DataFrame) -> list:
    """List the unordered pairs of methods that share a cluster.

    Args:
        df: Frame with the columns ``cluster_id`` and ``method_name``.

    Returns:
        A list of sets, one per distinct pair, ordered by cluster id and then
        by position inside the cluster. A pair of two identical names
        collapses into a one-element set.
    """
    intrapairs = []
    # Hashable mirror of `intrapairs`: makes the duplicate check O(1) instead
    # of a scan over every pair collected so far.
    seen = set()
    for _, group in df.groupby('cluster_id'):
        names = group['method_name'].tolist()
        for i, first in enumerate(names):
            for second in names[i + 1:]:
                pair = frozenset((first, second))
                if pair not in seen:
                    seen.add(pair)
                    intrapairs.append(set(pair))
    return intrapairs

def get_intersections(intra1:list, intra2:list ) -> list:
    """Return the pairs of ``intra1`` that also occur in ``intra2``.

    Args:
        intra1: List of pairs, each a set of method names.
        intra2: List of pairs, each a set of method names.

    Returns:
        The elements of ``intra1`` (order and repetitions preserved) that
        equal some element of ``intra2``.
    """
    # Sets are unhashable, so index intra2 through frozensets; this replaces a
    # linear scan of intra2 for every element of intra1.
    lookup = {frozenset(pair) for pair in intra2}
    return [pair for pair in intra1 if frozenset(pair) in lookup]

def get_precision_recall(path_cluster_csv: str, path_ground_truth: str) -> tuple:
    """Compute pairwise precision and recall of a clustering against a ground truth.

    Both files must provide the columns ``cluster_id`` and ``method_name``.
    Precision is the share of the clustering's intra-cluster pairs that are
    also intra-cluster pairs of the ground truth; recall is the share of the
    ground truth's pairs that the clustering recovers.

    Args:
        path_cluster_csv: CSV of the clustering under evaluation.
        path_ground_truth: CSV of the ground-truth clustering.

    Returns:
        ``(precision, recall)``. A side without any intra-cluster pair (every
        cluster is a singleton) yields 0.0 for its ratio instead of raising
        ``ZeroDivisionError``.
    """
    df_d = pd.read_csv(path_cluster_csv)
    df_g = pd.read_csv(path_ground_truth)

    intra_d = get_intrapairs(df_d)
    intra_g = get_intrapairs(df_g)
    inter = get_intersections(intra_d, intra_g)

    p = len(inter) / len(intra_d) if intra_d else 0.0
    r = len(inter) / len(intra_g) if intra_g else 0.0

    return p,r

In [25]:
# pr_dict[class_name][algorithm][k] = (precision, recall)
pr_dict = {}
ground_truth_prefix = 'ground_truth_'
# List the clustering files once; keep CSVs only so stray directory entries
# (e.g. notebook checkpoints) cannot break the parsing below.
cluster_files = [f for f in os.listdir('./generated/clusterings/') if f.endswith('.csv')]

for path_ground in sorted(os.listdir('./generated/ground_truth')):
    if not (path_ground.startswith(ground_truth_prefix) and path_ground.endswith('.csv')):
        continue
    # Strip prefix and extension rather than splitting on '_', which would
    # truncate a class name that itself contains an underscore.
    class_name = path_ground[len(ground_truth_prefix):-len('.csv')]
    pr_dict[class_name] = {
        'agglomerative_complete': {},
        'agglomerative_single': {},
        'kmeans': {}
    }
    for algo in pr_dict[class_name].keys():
        # Clustering files are named <class>_<algo>_<k>.csv. Match the whole
        # prefix: a substring test would also accept files of any class whose
        # name merely ends with `class_name`.
        file_prefix = class_name + '_' + algo + '_'
        for path_cluster in cluster_files:
            if not path_cluster.startswith(file_prefix):
                continue
            k_text = path_cluster[len(file_prefix):-len('.csv')]
            if not k_text.isdigit():
                continue
            k = int(k_text)
            p, r = get_precision_recall('./generated/clusterings/' + path_cluster, './generated/ground_truth/' + path_ground)
            pr_dict[class_name][algo][k] = (p, r)

In [26]:
import pickle

# Cache the precision/recall results on disk.
# Note: only unpickle this file from a trusted location; loading a pickle can execute arbitrary code.
with open('./generated/pr_dict.pickle', 'wb') as handle:
    pickle.dump(pr_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [37]:
# For each class in pr_dict, plot precision (top) and recall (bottom) against k
# for every algorithm, and mark the chosen k of single-linkage agglomerative
# clustering.
for class_name in pr_dict.keys():
    fig, axs = plt.subplots(2, figsize=(10,10))
    fig.suptitle(class_name)
    for algo in pr_dict[class_name].keys():
        ks = sorted(pr_dict[class_name][algo].keys())
        p = [pr_dict[class_name][algo][k][0] for k in ks]
        r = [pr_dict[class_name][algo][k][1] for k in ks]
        axs[0].plot(ks, p, label=algo)
        axs[1].plot(ks, r, label=algo)

    # NOTE(review): the chosen k is hard-coded per class; presumably read off
    # the silhouette plots - confirm and derive it from `scores` if so.
    optimal = 45 if class_name == 'CoreDocumentImpl' else 2
    p_optimal_y_tick, r_optimal_y_tick = pr_dict[class_name]['agglomerative_single'][optimal]

    panels = (
        (axs[0], 'Precision', 'p', p_optimal_y_tick),
        (axs[1], 'Recall', 'r', r_optimal_y_tick),
    )
    for ax, metric_name, symbol, optimal_value in panels:
        # Add the value at the chosen k as an extra y tick.
        ax.set_yticks(list(ax.get_yticks()) + [optimal_value])
        # The legend entry names the metric of its own panel; the recall panel
        # used to be labelled 'p=' as well.
        ax.axvline(
            x=optimal, color='red', linestyle='--',
            label='agglomerative single\noptimal k=' + str(optimal) + ', ' + symbol + '=' + str(round(optimal_value, 2)),
        )
        ax.set_title(metric_name)
        ax.set_xlabel('k')
        ax.set_ylabel(metric_name)
        ax.grid()
        ax.legend()

    fig.savefig('./generated/pr_curves/'+class_name+'_pr_curve.png')
    plt.close(fig)
