<a href="https://colab.research.google.com/github/Samin-Sadaf7/Python-Dependency-Resolver/blob/main/PythonDependencyResolver.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install requests beautifulsoup4 faiss-cpu sentence-transformers openai numpy

Collecting faiss-cpu
  Downloading faiss_cpu-1.10.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (4.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.11.0->sentence-transformers)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_6

In [34]:
#Import Libraries
import os
import subprocess
import glob
import requests
from bs4 import BeautifulSoup
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
from openai import OpenAI
import re

In [35]:
def clone_repo(github_url):
    """
    Clone the GitHub repository if not already cloned.
    Returns the repository directory name.
    """
    repo_name = github_url.rstrip('/').split('/')[-1]
    if repo_name.endswith('.git'):
        repo_name = repo_name[:-4]
    if os.path.exists(repo_name):
        print(f"Repository '{repo_name}' already exists. Skipping clone.")
        return repo_name
    print(f"Cloning repository from {github_url} ...")
    cmd = ["git", "clone", github_url]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        print("Error cloning repository:", result.stderr)
        return None
    return repo_name


def read_codebase(project_dir):
    """
    Read all Python files from a project directory
    """
    code = ""
    for filepath in glob.glob(os.path.join(project_dir, '**', '*.py'), recursive=True):
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                code += f"\n# File: {filepath}\n" + f.read() + "\n"
        except Exception as e:
            print(f"Error reading {filepath}: {e}")
    return code


def read_requirements(project_dir):
    """
    Read requirements.txt from a project directory
    """
    req_path = os.path.join(project_dir, 'requirements.txt')
    if os.path.exists(req_path):
        with open(req_path, 'r', encoding='utf-8') as f:
            return f.read()
    return ""

In [36]:
def search_web(query):
    """
    Perform targeted web searches for Python package documentation and compatibility information.
    Returns relevant URLs based on detected packages/versions in the query.
    """
    # Extract packages and versions using regex
    package_pattern = r"(\b[\w\-]+\b)(?:[=<>~!]=?|\s+)?([\d\.\*]+)?"
    matches = re.findall(package_pattern, query)

    urls = []
    for package, version in matches:
        # Generate package-specific URLs
        base_urls = [
            f"https://pypi.org/project/{package}/{version if version else ''}",
            f"https://{package}.readthedocs.io/en/{version if version else 'latest'}/",
            f"https://github.com/{package}/{package}/releases/tag/v{version}"
        ]

        # Add version-specific documentation if available
        if version and version != '*':
            urls.extend([
                f"https://{package}.readthedocs.io/en/v{version}/",
                f"https://www.versioneye.com/python/{package}/{version}"
            ])

        urls.extend(base_urls)

    # Add general Python packaging URLs
    urls.extend([
        "https://pip.pypa.io/en/stable/",
        "https://packaging.python.org/en/latest/",
        "https://github.com/pypa/packaging-problems/issues"
    ])

    return list(set(urls))  # Remove duplicates


def collect_data_from_url(url):
    """
    Fetch and extract relevant content from package-specific URLs.
    Returns structured data with source information.
    """
    try:
        response = requests.get(url, timeout=15)
        if response.status_code != 200:
            return ""

        soup = BeautifulSoup(response.text, 'html.parser')
        content = ""

        # PyPI-specific content extraction
        if "pypi.org" in url:
            main_content = soup.find('div', class_='project-description')
            if main_content:
                content += f"PyPI Documentation ({url}):\n"
                content += main_content.get_text(separator="\n", strip=True) + "\n\n"

        # ReadTheDocs-specific content extraction
        elif "readthedocs.io" in url:
            article = soup.find('div', role='main')
            if article:
                content += f"Official Documentation ({url}):\n"
                content += article.get_text(separator="\n", strip=True) + "\n\n"

        # GitHub Releases-specific content extraction
        elif "github.com" in url and "releases" in url:
            release_body = soup.find('div', class_='markdown-body')
            if release_body:
                content += f"GitHub Release Notes ({url}):\n"
                content += release_body.get_text(separator="\n", strip=True) + "\n\n"

        # General fallback
        else:
            main_text = soup.get_text(separator="\n", strip=True)
            content += f"General Content ({url}):\n{main_text}\n\n"

        return content.strip()

    except Exception as e:
        print(f"Error processing {url}: {e}")
        return ""

In [51]:
class EmbeddingModel:
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        self.model = SentenceTransformer(model_name)
        self.tokenizer = lambda text: text.split()  # Simple whitespace tokenizer

    def encode(self, text):
        return self.model.encode(text)

    def count_tokens(self, text):
        return len(self.tokenizer(text))


class VectorDB:
    def __init__(self, embedding_dim, max_tokens=2000):
        self.embedding_dim = embedding_dim
        self.max_tokens = max_tokens
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.texts = []
        self.token_counts = []

    def add(self, embedding, text):
        embedding = np.array(embedding).astype("float32").reshape(1, -1)
        self.index.add(embedding)
        self.texts.append(text)
        self.token_counts.append(len(text.split()))  # Simple token counting

    def search(self, query_embedding, k=10):
        query_embedding = np.array(query_embedding).astype("float32").reshape(1, -1)
        distances, indices = self.index.search(query_embedding, k)

        results = []
        total_tokens = 0
        for i in indices[0]:
            if i < len(self.texts) and (total_tokens + self.token_counts[i]) <= self.max_tokens:
                results.append(self.texts[i])
                total_tokens += self.token_counts[i]
            elif total_tokens >= self.max_tokens:
                break
        return results


def rerank_results(query_embedding, texts, embedding_model, max_tokens=2000):
    """
    Re-rank texts with token-constrained selection
    """
    scored_texts = []
    for text in texts:
        text_embedding = embedding_model.encode(text)
        score = cosine_similarity(query_embedding, text_embedding)
        token_count = embedding_model.count_tokens(text)
        scored_texts.append((text, score, token_count))

    # Sort by descending similarity score
    scored_texts.sort(key=lambda x: x[1], reverse=True)

    # Select texts within token budget
    selected_texts = []
    current_tokens = 0
    for text, score, tokens in scored_texts:
        if current_tokens + tokens <= max_tokens:
            selected_texts.append(text)
            current_tokens += tokens
        else:
            # Attempt to add partial content if remaining space
            remaining_tokens = max_tokens - current_tokens
            if remaining_tokens > 50:  # Only add chunks >50 tokens
                truncated = " ".join(text.split()[:remaining_tokens])
                selected_texts.append(truncated + "... [TRUNCATED]")
                current_tokens += remaining_tokens
            break

    return selected_texts

In [52]:
def get_dependency_status(context, codebase, package_versions, openai_api_key):
    """
    Ask the LLM whether package version changes are required.
    Expects an answer formatted as:
      Package Version Changes Required: <Yes/No>
      Explanation: <brief explanation>
    """
    from openai import OpenAI

    client = OpenAI(api_key=openai_api_key)

    system_message = """You are an expert Python dependency resolver. Analyze the given codebase and
    requirements to determine if package version changes are needed to resolve dependency issues."""

    user_message = f"""
    Relevant context from documentation and discussions:
    {context}

    Codebase:
    {codebase}

    Current package requirements:
    {package_versions}

    Answer in format:
    Package Version Changes Required: <Yes/No>
    Explanation: <brief explanation>"""

    response = client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ],
        temperature=0.2,
        max_tokens=150
    )
    return response.choices[0].message.content.strip()


def get_new_requirements(context, codebase, package_versions, openai_api_key):
    """
    Ask the LLM to generate an updated requirements.txt file.
    If no changes are needed, the LLM should output the original file.
    """
    from openai import OpenAI

    client = OpenAI(api_key=openai_api_key)

    system_message = """You are an expert Python dependency resolver. Generate an updated requirements.txt
    that resolves dependency conflicts. If no changes needed, return the original content."""

    user_message = f"""
    Relevant context from documentation and discussions:
    {context}

    Codebase:
    {codebase}

    Current package requirements:
    {package_versions}

    Output only valid requirements.txt contents:"""

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": system_message},
            {"role": "user", "content": user_message}
        ],
        temperature=0.2,
        max_tokens=500
    )
    return response.choices[0].message.content.strip()

In [56]:
def run_pipeline(github_url, openai_api_key):
    # Clone the main repository
    repo_dir = clone_repo(github_url)
    if not repo_dir:
        print("Failed to clone repository. Exiting.")
        return

    # Path to Dataset directory
    dataset_dir = os.path.join(repo_dir, "Dataset")
    if not os.path.exists(dataset_dir):
        print("Dataset directory not found. Exiting.")
        return

    # Process each project in the Dataset directory
    for project_name in os.listdir(dataset_dir):
        project_dir = os.path.join(dataset_dir, project_name)

        # Skip non-directories and files without "project" in name
        if not os.path.isdir(project_dir) or "project" not in project_name.lower():
            continue

        print(f"\n{'='*40}")
        print(f"Processing project: {project_name}")
        print(f"{'='*40}")

        # Read project contents
        codebase = read_codebase(project_dir)
        package_versions = read_requirements(project_dir)

        if not codebase:
            print(f"No Python code found in {project_name}. Skipping.")
            continue

        # Collect contextual information (existing implementation)
        query = f"Python dependency resolution best practices and package compatibility. Packages:{package_versions}"
        #print(f"Query: {query}")
        urls = search_web(query)
        collected_texts = [collect_data_from_url(url) for url in urls]
        collected_texts = [text for text in collected_texts if text]

        # Build context (existing implementation)
        context = ""
        if collected_texts:
            embedding_model = EmbeddingModel()
            embedding_dim = 384
            vector_db = VectorDB(embedding_dim=embedding_dim)
            for text in collected_texts:
                emb = embedding_model.encode(text)
                vector_db.add(emb, text)

            query_embedding = embedding_model.encode(query)
            search_results = vector_db.search(query_embedding, k=5)
            reranked_results = rerank_results(query_embedding, search_results, embedding_model)
            context = "\n\n".join(reranked_results)

        #print(f"context: {context}")
        #print(len(context))
        # Get dependency status
        status_response = get_dependency_status(context, codebase, package_versions, openai_api_key)
        print("\n=== Dependency Status ===")
        print(status_response)

        # Generate new requirements
        new_requirements = get_new_requirements(context, codebase, package_versions, openai_api_key)

        # Save results per project
        output_dir = os.path.join(repo_dir, "generated_requirements")
        os.makedirs(output_dir, exist_ok=True)
        output_path = os.path.join(output_dir, f"{project_name}_requirements.txt")

        with open(output_path, "w", encoding="utf-8") as f:
            f.write(new_requirements)

        print(f"\n=== Generated requirements saved to: {output_path} ===")

    print("\nProcessing completed for all projects in Dataset folder.")

In [57]:
from google.colab import userdata
OpenAI_API_KEY = userdata.get('OpenAIKey')

In [58]:
run_pipeline(
    github_url="https://github.com/Samin-Sadaf7/Python-Dependency-Resolver.git",
    openai_api_key= OpenAI_API_KEY
)

Repository 'Python-Dependency-Resolver' already exists. Skipping clone.

Processing project: tensorflow_project
Query: Python dependency resolution best practices and package compatibility. Packages:tensorflow==2.5.0
keras==3.0.0  
Error processing https://www.versioneye.com/python/tensorflow/2.5.0: HTTPSConnectionPool(host='www.versioneye.com', port=443): Read timed out. (read timeout=15)

=== Dependency Status ===
Package Version Changes Required: Yes

Explanation: The current requirement for Keras is version 3.0.0, which is affected by a security vulnerability (CVE-2024-55459) that allows attackers to write arbitrary files to the user's machine. It is recommended to update Keras to version 3.8.0, which is a secure version that addresses this vulnerability.

=== Generated requirements saved to: Python-Dependency-Resolver/generated_requirements/tensorflow_project_requirements.txt ===

Processing project: pandas_project
Query: Python dependency resolution best practices and package com