In [None]:
from datasets import load_dataset

# Dataset configuration to analyse; the same value is reused further down as the
# source-file extension and as the language key for method extraction.
category = 'py'

# Load the 'dev' split of the template-generation dataset for that configuration.
df = load_dataset(
    path='JetBrains-Research/template-generation',
    name=category,
    split='dev',
)

In [None]:
import ast

# The 'topics' column is parsed with ast.literal_eval, i.e. it is expected to arrive as
# the string form of a Python literal (presumably a list of topic names — confirm).
# The isinstance guard keeps this cell re-runnable: once the column has been parsed the
# value is no longer a string, and ast.literal_eval would raise ValueError on it.
df = df.map(
    lambda example: {
        'topics': (
            ast.literal_eval(example['topics'])
            if isinstance(example['topics'], str)
            else example['topics']
        )
    },
    batched=False,
)


In [None]:
# Topics of interest: classify_by_topic() labels a row with one of these, and the
# sampling step later builds one bucket per entry.
topics_to_check = ['fastapi', 'django', 'flask']


def classify_by_topic(dp, topics=None):
    """Label a dataset row with the single topic of interest it carries.

    Args:
        dp: Mapping with a 'topics' entry that is an iterable of topic names.
        topics: Topics to look for. Defaults to the module-level
            ``topics_to_check`` (resolved at call time), so existing
            single-argument callers such as ``df.map(classify_by_topic)`` are
            unaffected.

    Returns:
        ``{'class_topic': <topic>}`` when exactly one distinct topic of interest
        is present, otherwise ``{'class_topic': None}`` (no match, or the row
        matches several different topics and is therefore ambiguous).
    """
    allowed = topics_to_check if topics is None else topics

    # Collect distinct matches only: a topic listed twice on the same row is
    # still a single, unambiguous match rather than a conflict.
    matched = []
    for topic in dp['topics']:
        if topic in allowed and topic not in matched:
            matched.append(topic)

    class_topic = matched[0] if len(matched) == 1 else None
    return {'class_topic': class_topic}


def class_topic_not_none(example):
    """Filter predicate: True when the row's 'class_topic' value is not None."""
    assigned_topic = example['class_topic']
    return not (assigned_topic is None)


# Attach a 'class_topic' label to every row, then keep only the rows that received one.
df = df.map(classify_by_topic).filter(class_topic_not_none)

In [None]:
# Display the dataset as it stands after the topic filtering above.
df

In [None]:
from collections import Counter

# Number of remaining rows per assigned topic.
Counter(df['class_topic'])

In [None]:
import os

# Directory that holds the cloned repositories (one sub-directory per repository).
# Set the REPOS_PATH environment variable to use a different location; the original
# machine-specific path remains the default so existing setups keep working.
repos_path = os.environ.get(
    'REPOS_PATH',
    '/Users/Maria.Tigina/PycharmProjects/agents-eval-data/repos',
)

In [None]:
from git import Repo

import os

# Clone every selected repository into `repos_path`, one directory per repository named
# "<owner>__<name>". A repository whose target directory already exists is skipped so
# the cell can be re-run: cloning into an existing non-empty directory fails.
# NOTE: a directory left behind by an interrupted clone is skipped too; delete it
# manually to force a fresh clone.
for dp in df:
    full_name = dp["full_name"]
    target_dir = os.path.join(repos_path, full_name.replace("/", "__"))
    if os.path.isdir(target_dir):
        continue
    Repo.clone_from(f'https://github.com/{full_name}.git', target_dir)

In [None]:
# One bucket per topic of interest, created in the order of `topics_to_check`.
subset = {}
for topic in topics_to_check:
    subset[topic] = []

# Walk the dataset in a seeded shuffled order (seed 22) and keep at most three rows
# per topic.
for dp in df.shuffle(22):
    topic = dp['class_topic']
    bucket = subset[topic]
    if len(bucket) >= 3:
        continue
    bucket.append(dp)

In [None]:
from datasets import Dataset
# Flatten the per-topic buckets (in `subset` insertion order) into a single Dataset.
selected_rows = []
for topic_rows in subset.values():
    selected_rows.extend(topic_rows)
df_s = Dataset.from_list(selected_rows)

In [None]:
# Display the sampled subset built from the per-topic buckets.
df_s

In [None]:
!pip install tree-sitter

In [None]:
# Fetch the Python grammar sources into ./tree-sitter-python; the parser setup below
# builds its shared library from that directory.
!git clone https://github.com/tree-sitter/tree-sitter-python

In [None]:
# !cd tree-sitter-python & npm install & npx tree-sitter generate

In [None]:
import fnmatch
import os
from tree_sitter import Language, Parser

# Compile the cloned grammar into a shared library and load the "python" language from it.
Language.build_library("build/my-languages.so", ["tree-sitter-python"])
python_language = Language("build/my-languages.so", "python")
# The grammar loaded here is Python, not Java; the old name is kept only as an alias
# so that any code still referring to `java_language` keeps working.
java_language = python_language

# Module-level parser used by extract_methods_from_code().
parser = Parser()
parser.set_language(python_language)


def get_nodes_by_type(node, t: str):
    """Collect the outermost nodes of type ``t`` in the subtree rooted at ``node``.

    The subtree is visited depth-first, children left to right. A node whose
    ``type`` equals ``t`` is collected and not descended into, so matches nested
    inside another match are not returned separately.

    Args:
        node: Object exposing ``type`` and ``children``.
        t: Node type name to look for.

    Returns:
        List of matching nodes in visiting order.
    """
    found = []
    pending = [node]
    while pending:
        current = pending.pop()
        if current.type == t:
            found.append(current)
            continue
        # Push children reversed so they are popped in their original order.
        pending.extend(reversed(current.children))
    return found


def get_node_content(node, code) -> str:
    """Return the text covered by ``node``.

    Slices ``code`` from ``node.start_byte`` to ``node.end_byte`` and decodes the
    slice as UTF-8, so ``code`` must be a bytes-like object.
    """
    return code[node.start_byte:node.end_byte].decode('utf-8')


def extract_methods_from_code(code: "bytes | str", language: str) -> list:
    """Return the source text of every function/method definition found in ``code``.

    Args:
        code: Source to parse. The parser and the byte-offset slicing both work
            on bytes; a ``str`` is accepted as well and encoded as UTF-8 first.
        language: Key selecting which node type counts as a method: "py" maps
            to 'function_definition', "java" to 'method_declaration'.
            NOTE(review): parsing always goes through the module-level
            ``parser``, whatever language that was configured with; this
            argument only selects the node type.

    Returns:
        List of strings, one per matching node, in the order reported by
        ``get_nodes_by_type``.

    Raises:
        KeyError: If ``language`` is not one of the supported keys.
    """
    node_t_by_language = {
        "py": 'function_definition',
        "java": 'method_declaration'
    }

    t = node_t_by_language[language]

    # The original annotation promised `str`, but parsing and slicing need bytes.
    if isinstance(code, str):
        code = code.encode('utf-8')

    tree = parser.parse(code)
    return [
        get_node_content(method, code)
        for method in get_nodes_by_type(tree.root_node, t)
    ]


def get_repo_files(directory: str, extension: str):
    """Yield the path of every file under ``directory`` named ``*.<extension>``.

    The tree is walked recursively with ``os.walk``; each yielded path is the
    walked directory joined with the file name.
    """
    pattern = f"*.{extension}"
    for current_dir, _subdirs, filenames in os.walk(directory):
        for filename in filenames:
            if not fnmatch.fnmatch(filename, pattern):
                continue
            yield os.path.join(current_dir, filename)

In [None]:
from sentence_transformers import SentenceTransformer
import numpy as np

# Parallel per-entity stores: index i of 'content', 'repo', 'path' and 'class_topic'
# all describe the same file or method. 'vects' starts as an empty (0, 1024)
# placeholder array.
vect = {
    entity: {
        'content': [],
        'repo': [],
        'path': [],
        'vects': np.empty((0, 1024)),
        'class_topic': [],
    }
    for entity in ['files', 'methods']
}

# For every sampled repository, record each source file and each method extracted from
# it, together with the repository name, file path and topic label.
for dp in df_s:
    repo = dp['full_name']
    print(repo)
    for file in get_repo_files(os.path.join(repos_path, "__".join(dp["full_name"].split("/"))), category):
        # Decode explicitly as UTF-8: the content is re-encoded as UTF-8 below, and the
        # default encoding of open() depends on the platform locale.
        with open(file, 'r', encoding='utf-8') as f:
            file_content = f.read()

        # The file is closed at this point; everything below works on the in-memory text.
        vect['files']['content'].append(file_content)
        vect['files']['repo'].append(repo)
        vect['files']['path'].append(file)
        vect['files']['class_topic'].append(dp['class_topic'])

        source_code = bytes(file_content, "utf8")
        methods = extract_methods_from_code(source_code, category)
        for method in methods:
            vect['methods']['content'].append(method)
            vect['methods']['repo'].append(repo)
            vect['methods']['path'].append(file)
            vect['methods']['class_topic'].append(dp['class_topic'])

# Sentence-embedding model used for both files and methods.
model = SentenceTransformer('thenlper/gte-large')

# Embed the collected texts, then print the embedding shape and the list lengths as a
# quick consistency check.
for entity in ('files', 'methods'):
    store = vect[entity]
    store['vects'] = model.encode(store['content'])
    print(store['vects'].shape)
    print(len(store['content']))
    print(len(store['path']))


In [None]:
import collections
# Number of extracted methods per topic label.
print(collections.Counter(vect['methods']['class_topic']))

In [None]:
import collections
# Number of collected files per topic label.
print(collections.Counter(vect['files']['class_topic']))

In [None]:
import collections
import pandas as pd
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt


def classify(classifier, entity: str):
    """Cluster the embeddings of ``entity`` and report how topics spread over clusters.

    Steps: fit ``classifier`` on ``vect[entity]['vects']``, print the size of
    every cluster and the number of points labelled -1 (noise), show a 2D t-SNE
    scatter plot coloured by cluster label, then print, for each cluster other
    than -1, a Counter of the topic labels of its members.

    Args:
        classifier: Object exposing ``fit_predict(X)`` that returns one integer
            label per row as a NumPy array; the label -1 is treated as noise.
        entity: Key into the module-level ``vect`` store ('files' or 'methods').

    Returns:
        None. Results are printed and plotted.
    """
    data = vect[entity]
    clusters = classifier.fit_predict(data['vects'])

    cluster_sizes = dict(collections.Counter(clusters))
    print(cluster_sizes)

    labels_mask = clusters != -1
    print(f'Count noise: {np.count_nonzero(~labels_mask)}')

    # dtype=object keeps references to the existing strings. Letting NumPy infer the
    # dtype would build a fixed-width unicode array sized by the longest string, which
    # is very memory-hungry for whole source files.
    df_labels = pd.DataFrame({
        'label': clusters[labels_mask],
        'repo': np.array(data['repo'], dtype=object)[labels_mask],
        'path': np.array(data['path'], dtype=object)[labels_mask],
        'content': np.array(data['content'], dtype=object)[labels_mask],
        'class_topic': np.array(data['class_topic'], dtype=object)[labels_mask],
    })

    # Project all embeddings (noise included) to 2D for plotting.
    tsne = TSNE(n_components=2, random_state=0)
    methods_tsne = tsne.fit_transform(data['vects'])

    plt.figure(figsize=(8, 8))
    plt.scatter(
        methods_tsne[:, 0],
        methods_tsne[:, 1],
        c=clusters,
        cmap='viridis'
    )
    plt.show()

    # Topic distribution inside every real cluster; noise (-1) is skipped.
    for c in cluster_sizes:
        if c == -1:
            continue
        print(f"Class {c}:")
        print(collections.Counter(df_labels[df_labels['label'] == c]['class_topic']))
        print("====================================")

In [None]:
from sklearn.cluster import DBSCAN

# Density-based clustering of the method embeddings. The label -1 stands for noise;
# classify() reports how many points got it and leaves them out of the per-cluster
# summary.
dbscan = DBSCAN(eps=0.4, min_samples=5)
classify(dbscan, 'methods')

In [None]:
from sklearn.cluster import KMeans

# K-means clustering of the method embeddings with a fixed seed for repeatable results.
# NOTE(review): n_clusters=3 presumably mirrors the three entries of `topics_to_check`
# — confirm, and keep the two in sync if the topic list changes.
kmeans = KMeans(n_clusters=3, random_state=42)
classify(kmeans, 'methods')