In [79]:
pip install -U langchain-community pinecone-client openai tiktoken ipywidgets -U langchain-openai astor


Collecting astor
  Downloading astor-0.8.1-py2.py3-none-any.whl.metadata (4.2 kB)
Downloading astor-0.8.1-py2.py3-none-any.whl (27 kB)
Installing collected packages: astor
Successfully installed astor-0.8.1
Note: you may need to restart the kernel to use updated packages.


In [80]:
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import FAISS
from dotenv import load_dotenv
import os
import ast
import astor


In [151]:
# Load the environment variables from .env file
load_dotenv()
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')

EMBEDDINGS_SIZE_LARGE = 3072
EMBEDDINGS_SIZE_SMALL = 1536

In [138]:
import ast
import astor
import os

class CodeVisitor(ast.NodeVisitor):
    def __init__(self):
        self.code_elements = []

    
    # def visit_Import(self, node):
    #     self.elements.append(node)  # Store the AST node directly

    # def visit_ImportFrom(self, node):
    #     self.elements.append(node)  # Store the AST node directly

    # def visit_ClassDef(self, node):
    #     self.elements.append(node)  # Store the AST node directly

    # # If you want to process body items as well
    # def visit_Assign(self, node):
    #     self.elements.append(node)  # Store assignment nodes

    # def visit_Expr(self, node):
    #     self.elements.append(node)  # Store expression nodes

    # def visit_Return(self, node):
    #     self.elements.append(node)  # Store return nodes

    # def visit_FunctionDef(self, node):
    #     self.elements.append(node)  # Store the AST node directly

    def visit_FunctionDef(self, node):
        # Store function source code and metadata
        self.code_elements.append({
            "page_content": astor.to_source(node),
            "metadata": {
                "type": "function",
                "name": node.name
            }
        })

    def visit_ClassDef(self, node):
        # Store the class itself
        self.code_elements.append({
            "page_content": astor.to_source(node),
            "metadata": {
                "type": "class",
                "name": node.name
            }
        })
        # Also visit methods within the class
        method_visitor = CodeVisitor()
        for n in node.body:
            if isinstance(n, ast.FunctionDef):
                method_visitor.visit(n)
        self.code_elements.extend(method_visitor.code_elements)

def parse_code_from_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        source_code = file.read()
    tree = ast.parse(source_code)
    visitor = CodeVisitor()
    visitor.visit(tree)
    return [
        {**element, "metadata": {**element["metadata"], "source": os.path.normpath(file_path)}}
        for element in visitor.code_elements
    ]

def traverse_directory(directory):
    all_code_elements = []
    for root, dirs, files in os.walk(directory):
        for file in files:
            if file.endswith('.py'):
                file_path = os.path.join(root, file)
                code_elements = parse_code_from_file(file_path)
                all_code_elements.extend(code_elements)
    return all_code_elements

# Example usage
project_directory = "./Smarderobe/server"
all_code_details = traverse_directory(project_directory)
    

In [129]:
embeddings = OpenAIEmbeddings( model ="text-embedding-3-large", openai_api_key=OPENAI_API_KEY)


In [130]:
# Function to initialize a FAISS vector store from documents
def create_faiss_library(documents):
    texts = [d['page_content'] for d in documents]
    metadatas = [d['metadata'] for d in documents]
    # Adjust the following line if the FAISS library API differs
    return FAISS.from_texts(texts,  metadatas=metadatas, embedding=embeddings)

In [139]:
library = create_faiss_library(all_code_details)


In [147]:
query = "user class"

In [148]:
query_answer = library.similarity_search(query, 5)

In [142]:
print(query_answer[0])

page_content="@router.get('/verification/{token}', response_class=HTMLResponse)\ndef validate_user(token: str, request: Request, db: Session=Depends(get_db)):\n    try:\n        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])\n        user_id: int = payload.get('id')\n        query = select(User).where(User.id == user_id)\n        db_user = db.scalars(query).first()\n        if not db_user:\n            raise HTTPException(status_code=404, detail='User not found')\n        db_user.email_verified = True\n        db.commit()\n        return templates.TemplateResponse(request=request, name=\n            'account_verified.html', context={'message':\n            'Your account has been verified.'})\n    except JWTError:\n        return templates.TemplateResponse(request=request, name=\n            'account_verified.html', context={'message': 'Token is not valid.'}\n            )\n    except Exception:\n        return templates.TemplateResponse(request=request, name=\n        

In [149]:
print(query_answer[1].page_content)

class UserOutSchema(BaseModel):
    first_name: str = Field(..., min_length=1, max_length=100)
    last_name: str = Field(..., min_length=1, max_length=100)
    email: EmailStr = Field(..., min_length=1, max_length=50, description=
        'Email has to be Unique and is required.')
    email_verified: bool
    is_active: bool
    id: int
    profile: Optional[ProfileSchema]



In [150]:
for answer in query_answer:
    print(answer.metadata)
    print(answer.metadata['source'])
    print(answer.metadata['name'])
    print("\n")

{'type': 'class', 'name': 'User', 'source': 'Smarderobe\\server\\app\\database\\models\\user.py'}
Smarderobe\server\app\database\models\user.py
User


{'type': 'class', 'name': 'UserOutSchema', 'source': 'Smarderobe\\server\\app\\database\\schemas\\user_schema.py'}
Smarderobe\server\app\database\schemas\user_schema.py
UserOutSchema


{'type': 'class', 'name': 'UserLoginSchema', 'source': 'Smarderobe\\server\\app\\database\\schemas\\user_schema.py'}
Smarderobe\server\app\database\schemas\user_schema.py
UserLoginSchema


{'type': 'function', 'name': 'validate_user', 'source': 'Smarderobe\\server\\app\\routers\\user_router.py'}
Smarderobe\server\app\routers\user_router.py
validate_user


{'type': 'class', 'name': 'UserSchema', 'source': 'Smarderobe\\server\\app\\database\\schemas\\user_schema.py'}
Smarderobe\server\app\database\schemas\user_schema.py
UserSchema




In [109]:
query2 = "upload image"

In [110]:
query2_answer = library.similarity_search(query2, 2)

In [111]:
print(query2_answer[0].metadata['source'])
print(query2_answer[0].page_content)

Smarderobe\server\app\routers\image_router.py
@router.get('/{body_section_id}', status_code=200)
def get_image(body_section_id: int, user_id: Annotated[int, Depends(
    get_user_id)], db: Session=Depends(get_db)):
    image_url = db.scalars(select(ItemImage.url).join(ClothingItem, 
        ClothingItem.id == ItemImage.clothing_item_id).join(Type, Type.id ==
        ClothingItem.type_id).where(and_(Type.body_section_id ==
        body_section_id, ClothingItem.user_id == user_id))).all()
    return image_url

@router.get('/display/{image_name}', status_code=200)
def get_image_with_name(image_name: str):
    images = os.listdir(IMAGEDIR)
    return FileResponse(f'{IMAGEDIR}{image_name}')

@router.delete('/{image_url}', status_code=status.HTTP_204_NO_CONTENT)
def delete_clothing_item(image_url: str, db: Session=Depends(get_db)):
    """
    Deletes an image based on an url
    """
    db_image = db.scalars(select(ItemImage).where(ItemImage.url == image_url)
        ).first()
    if db_ima

In [112]:
for answer in query2_answer:
    print(answer.metadata["source"])
    print(answer.page_content)
    print("\n")

Smarderobe\server\app\routers\image_router.py
@router.get('/{body_section_id}', status_code=200)
def get_image(body_section_id: int, user_id: Annotated[int, Depends(
    get_user_id)], db: Session=Depends(get_db)):
    image_url = db.scalars(select(ItemImage.url).join(ClothingItem, 
        ClothingItem.id == ItemImage.clothing_item_id).join(Type, Type.id ==
        ClothingItem.type_id).where(and_(Type.body_section_id ==
        body_section_id, ClothingItem.user_id == user_id))).all()
    return image_url

@router.get('/display/{image_name}', status_code=200)
def get_image_with_name(image_name: str):
    images = os.listdir(IMAGEDIR)
    return FileResponse(f'{IMAGEDIR}{image_name}')

@router.delete('/{image_url}', status_code=status.HTTP_204_NO_CONTENT)
def delete_clothing_item(image_url: str, db: Session=Depends(get_db)):
    """
    Deletes an image based on an url
    """
    db_image = db.scalars(select(ItemImage).where(ItemImage.url == image_url)
        ).first()
    if db_ima

In [66]:
print(query2_answer[1].metadata['source'])
print(query2_answer[1].page_content)

Smarderobe\server\app\routers\clothing_item_router.py
def list_clothing_items(db): Expression: Constant(value='\n    Get all clothing items\n    ') Assign: Name(id='result', ctx=Store()) = Call(func=Attribute(value=Call(func=Attribute(value=Name(id='db', ctx=Load()), attr='scalars', ctx=Load()), args=[Call(func=Attribute(value=Call(func=Name(id='Select', ctx=Load()), args=[Name(id='ClothingItem', ctx=Load())], keywords=[]), attr='options', ctx=Load()), args=[Call(func=Name(id='selectinload', ctx=Load()), args=[Attribute(value=Name(id='ClothingItem', ctx=Load()), attr='item_images', ctx=Load())], keywords=[])], keywords=[])], keywords=[]), attr='all', ctx=Load()), args=[], keywords=[]) If Return: Name(id='result', ctx=Load()) def list_clothing_items(clothing_id, db): Expression: Constant(value='\n    Get one clothing item based on its id\n    ') Assign: Name(id='result', ctx=Store()) = Call(func=Attribute(value=Call(func=Attribute(value=Name(id='db', ctx=Load()), attr='scalars', ctx=Loa

In [70]:
query3 = "get single user details"

In [71]:
query3_answer = library.similarity_search(query3, 2)

In [72]:
print(query3_answer[0].metadata['source'])
print(query3_answer[0].page_content)

Smarderobe\server\app\routers\user_router.py
def list_users(db): Expression: Constant(value='\n    Fetches all users\n    ') Assign: Name(id='result', ctx=Store()) = Call(func=Attribute(value=Call(func=Attribute(value=Name(id='db', ctx=Load()), attr='scalars', ctx=Load()), args=[Call(func=Attribute(value=Call(func=Name(id='select', ctx=Load()), args=[Name(id='User', ctx=Load())], keywords=[]), attr='options', ctx=Load()), args=[Call(func=Name(id='selectinload', ctx=Load()), args=[Attribute(value=Name(id='User', ctx=Load()), attr='profile', ctx=Load())], keywords=[])], keywords=[])], keywords=[]), attr='all', ctx=Load()), args=[], keywords=[]) If Return: Name(id='result', ctx=Load()) def get_email_validation_status(user_id, db): Assign: Name(id='query', ctx=Store()) = Call(func=Attribute(value=Call(func=Name(id='select', ctx=Load()), args=[Name(id='User', ctx=Load())], keywords=[]), attr='where', ctx=Load()), args=[Compare(left=Attribute(value=Name(id='User', ctx=Load()), attr='id', ctx

In [76]:
for answer in query3_answer:
    print(answer.metadata["source"])
    print(answer.page_content)
    print("\n")

Smarderobe\server\app\routers\user_router.py
def list_users(db): Expression: Constant(value='\n    Fetches all users\n    ') Assign: Name(id='result', ctx=Store()) = Call(func=Attribute(value=Call(func=Attribute(value=Name(id='db', ctx=Load()), attr='scalars', ctx=Load()), args=[Call(func=Attribute(value=Call(func=Name(id='select', ctx=Load()), args=[Name(id='User', ctx=Load())], keywords=[]), attr='options', ctx=Load()), args=[Call(func=Name(id='selectinload', ctx=Load()), args=[Attribute(value=Name(id='User', ctx=Load()), attr='profile', ctx=Load())], keywords=[])], keywords=[])], keywords=[]), attr='all', ctx=Load()), args=[], keywords=[]) If Return: Name(id='result', ctx=Load()) def get_email_validation_status(user_id, db): Assign: Name(id='query', ctx=Store()) = Call(func=Attribute(value=Call(func=Name(id='select', ctx=Load()), args=[Name(id='User', ctx=Load())], keywords=[]), attr='where', ctx=Load()), args=[Compare(left=Attribute(value=Name(id='User', ctx=Load()), attr='id', ctx