## Import

In [25]:
import os
import numpy as np
from torch import Tensor
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel

In [26]:
CODEBASE_DIR = "./example-codebase/pytorch-examples-main/"
IGNORED_DIRECTORIES = ["node_modules", "public/build"]
IGNORED_FILES = ["package-lock.json", "yarn.lock"]
ALLOWED_EXTENSIONS = [".ts", ".tsx", ".py"]

In [27]:
IMAGE_EXTENSIONS = [
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".bmp",
    ".svg",
    ".ico",
]


# Codebase loader

In [28]:
def load_codebase(directory):
    snippets = []
    for filename in os.listdir(directory):
        # Skip hidden files and directories
        if filename.startswith('.'):
            continue

        filepath = os.path.join(directory, filename)

        if os.path.isdir(filepath):
            # If it's a directory, recursively load its contents
            snippets.extend(load_codebase(filepath))
        else:
            if any(ignored in filepath for ignored in IGNORED_DIRECTORIES):
                continue
            if filename in IGNORED_FILES:
                continue
            if not any(filepath.endswith(ext) for ext in ALLOWED_EXTENSIONS):
                continue

            with open(filepath, 'r') as file:
                content = file.read().strip()
                if content:  # Check if content is not empty
                    snippets.append(content)
    return snippets


In [29]:
def average_pool(last_hidden_states: Tensor,
                 attention_mask: Tensor) -> Tensor:
    last_hidden = last_hidden_states.masked_fill(
        ~attention_mask[..., None].bool(), 0.0)
    return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]

**Generate Embeddings**

In [30]:
def generate_embeddings(snippets):
    prefix = "query: "  # Assuming all code snippets are queries
    input_texts = [prefix + snippet for snippet in snippets]

    tokenizer = AutoTokenizer.from_pretrained('thenlper/gte-base')
    model = AutoModel.from_pretrained('thenlper/gte-base')

    batch_dict = tokenizer(input_texts, max_length=512,
                           padding=True, truncation=True, return_tensors='pt')
    outputs = model(**batch_dict)
    embeddings = average_pool(
        outputs.last_hidden_state, batch_dict['attention_mask'])

    return F.normalize(embeddings, p=2, dim=1).detach().numpy()

In [31]:
from langchain.vectorstores import Chroma

def store_vectors(embeddings, snippets):
    persist_directory = "./data/db/chroma"

    vectorstore = Chroma.from_documents(
        documents=snippets,
        embedding=embeddings, 
        persist_directory=persist_directory
        )

    print(vectorstore._collection.count()) # 6

In [32]:
def find_k_nearest_neighbors(query_embedding, embeddings, k=5):
    # Using cosine similarity as embeddings are normalized
    similarities = np.dot(embeddings, query_embedding.T)
    sorted_indices = similarities.argsort(axis=0)[-k:][::-1]
    return sorted_indices.squeeze()


In [33]:
from pymilvus import connections, FieldSchema, CollectionSchema, DataType, Collection, utility

connections.connect(host='127.0.0.1', port='19530')

def create_milvus_collection(collection_name, dim):
    if utility.has_collection(collection_name):
        utility.drop_collection(collection_name)
    
    fields = [
            FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
            FieldSchema(name="embeddings", dtype=DataType.FLOAT_VECTOR, is_primary=True, auto_id=False),
            FieldSchema(name="snippet", dtype=DataType.VARCHAR, max_length=500)
    ]
    schema = CollectionSchema(fields=fields, description='Code search text')
    collection = Collection(name=collection_name, schema=schema)
    
    index_params = {
        'metric_type': "L2",
        'index_type': "IVF_FLAT",
        'params': {"nlist": 2048}
    }
    collection.create_index(field_name='title_vector', index_params=index_params)
    return collection



In [34]:
snippets = load_codebase(CODEBASE_DIR)
embeddings = generate_embeddings(snippets)
collection = create_milvus_collection('Code_search', 768)
#store_vectors(embeddings, snippets)

# example query
query = "get references to RandomForestClassifier?"
query_embedding = generate_embeddings([query])
nearest_neighbors = find_k_nearest_neighbors(query_embedding, embeddings)
top_matches = nearest_neighbors[:2]
print("Query:", query)
print("Top Matches:")
for index in top_matches:
    # print the first 500 characters to illustrate the found match
    print(f"- Matched Code:\n{snippets[index][:500]}...\n")

PrimaryKeyException: <PrimaryKeyException: (code=1, message=Primary key type must be DataType.INT64 or DataType.VARCHAR.)>

In [None]:
print(embeddings)

[[-0.01519461 -0.01412962  0.00734623 ... -0.00222553  0.01647083
   0.00826925]
 [-0.00277823 -0.00075571 -0.00057353 ... -0.00381351  0.01342814
  -0.00425682]
 [-0.00645242  0.00088455 -0.0103819  ...  0.0043426   0.02141117
   0.00353106]
 ...
 [-0.00723172 -0.02022929  0.01277409 ...  0.00926638  0.01049688
   0.02608984]
 [-0.00788832 -0.03672303 -0.010886   ...  0.022343    0.0282235
   0.02083131]
 [ 0.00549793 -0.00149163  0.00424555 ... -0.00705904  0.00584576
   0.00531193]]


In [None]:
# example query
query = "get references to Deep Residual Learning for Image Recognition " #siamese network?"
query_embedding = generate_embeddings([query])
nearest_neighbors = find_k_nearest_neighbors(query_embedding, embeddings)
top_matches = nearest_neighbors[:2]
print("Query:", query)
print("Top Matches:")
for index in top_matches:
    # print the first 500 characters to illustrate the found match
    print(f"- Matched Code:\n{snippets[index][:500]}...\n")

Query: get references to Deep Residual Learning for Image Recognition 
Top Matches:
- Matched Code:
from __future__ import print_function
import argparse, random, copy
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision import transforms as T
from torch.optim.lr_scheduler import StepLR


class SiameseNetwork(nn.Module):
    """
        Siamese network for image similarity estimation.
        The network is composed of two ident...

- Matched Code:
import torch


class TransformerNet(torch.nn.Module):
    def __init__(self):
        super(TransformerNet, self).__init__()
        # Initial convolution layers
        self.conv1 = ConvLayer(3, 32, kernel_size=9, stride=1)
        self.in1 = torch.nn.InstanceNorm2d(32, affine=True)
        self.conv2 = ConvLayer(32, 64, kernel_size=3, stride=2)
        self.in2 = torch.nn.In