In [2]:
import requests
from bs4 import BeautifulSoup
from sentence_transformers import SentenceTransformer
from pinecone import Pinecone, ServerlessSpec, PodSpec
import ollama
import os
import uuid
import time
import pickle
import pandas as pd
import re
import pandas as pd
from rouge import Rouge
import pandas as pd
from nltk.translate.bleu_score import sentence_bleu
import nltk
import warnings
warnings.filterwarnings('ignore')
import openpyxl
from bert_score import score
import itertools
import hf_xet
import zlib
import subprocess
import tempfile
import traceback
from googlesearch import search as google_search_func # Renamed to avoid conflict
GOOGLE_SEARCH_ENABLED = True

PINECONE_API_KEY = "pcsk_71bnuL_HGU1YACobTvL5gJNzHsZG1NMNx3RGmz1ohyC7xMiUYoWnuZpEn5SuvWpuTxnuzm"
PINECONE_ENVIRONMENT = "us-east-1"

# --- Constants ---
INDEX_NAME = "self-rag-docstring"
EMBEDDING_MODEL = 'all-MiniLM-L6-v2' # HuggingFace sentence transformer
OLLAMA_MODEL = 'deepseek-coder:6.7b' # Local Ollama model name (Ensure this is pulled: `ollama pull qwen2.5-coder:1.5b`)
OLLAMA_REWRITER_MODEL = 'deepseek-r1:1.5b'
TARGET_URL = [
    "https://peps.python.org/pep-0257/",
    "https://www.kaggle.com/code/hagzilla/what-are-docstrings",
    "https://github.com/keleshev/pep257/blob/master/pep257.py",
    "https://github.com/chadrik/doc484",
    "https://zerotomastery.io/blog/python-docstring/",
    "https://google.github.io/styleguide/pyguide.html",
    "https://www.geeksforgeeks.org/python-docstrings/",
    "https://pandas.pydata.org/docs/development/contributing_docstring.html",
    "https://www.coding-guidelines.lftechnology.com/docs/python/docstrings/",
    "https://realpython.com/python-pep8/",
    "https://pypi.org/project/AIDocStringGenerator/",
    "https://www.geeksforgeeks.org/pep-8-coding-style-guide-python/",
    "https://llego.dev/posts/write-python-docstrings-guide-documenting-functions/",
    "https://www.datacamp.com/tutorial/pep8-tutorial-python-code",
    "https://www.programiz.com/python-programming/docstrings",
    "https://marketplace.visualstudio.com/items?itemName=ShanthoshS.docstring-generator-ext",
    "https://stackoverflow.com/questions/3898572/what-are-the-most-common-python-docstring-formats",
    "https://stackoverflow.com/questions/78753860/what-is-the-proper-way-of-including-examples-in-python-docstrings",
    "https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html",
    "https://www.dataquest.io/blog/documenting-in-python-with-docstrings/",
    "https://www.tutorialspoint.com/python/python_docstrings.htm"
]
VECTOR_DIMENSION = 384 # Dimension for all-MiniLM-L6-v2
METRIC = "cosine"
CLOUD = "aws"
REGION = "us-east-1"

print("Initializing services...")
try:
    model = SentenceTransformer(EMBEDDING_MODEL)
    print("Embedding model loaded.")

    # Pinecone
    pc = Pinecone(api_key=PINECONE_API_KEY)
    print(f"Pinecone initialized.") # Environment info is handled internally

    # Ollama Client
    ollama_client = ollama.Client()
    print(f"Ollama client initialized. Attempting to use model: {OLLAMA_MODEL}")
    print(f"Ensure '{OLLAMA_MODEL}' is available locally in Ollama (`ollama pull {OLLAMA_MODEL}`).")


except Exception as e:
    print(f"Error initializing services: {e}")
    exit()

Initializing services...
Embedding model loaded.
Pinecone initialized.
Ollama client initialized. Attempting to use model: deepseek-coder:6.7b
Ensure 'deepseek-coder:6.7b' is available locally in Ollama (`ollama pull deepseek-coder:6.7b`).


In [4]:
# --- 1. Initialize Pinecone ---
pinecone_index = None
if not PINECONE_API_KEY:
    print("ERROR: Pinecone API key not found in environment variables.")
    exit(1)
try:
    pc = Pinecone(api_key=PINECONE_API_KEY)
    existing_indexes = [index_info["name"] for index_info in pc.list_indexes()]
    print(f"Available Pinecone indexes: {existing_indexes}")

    if INDEX_NAME not in existing_indexes:
        print(f"Index '{INDEX_NAME}' not found. Creating new index...")
        pc.create_index(
            name=INDEX_NAME, dimension=VECTOR_DIMENSION, metric=METRIC,
            spec=ServerlessSpec(cloud=CLOUD, region=REGION)
        )
        while not pc.describe_index(INDEX_NAME).status["ready"]:
            print(f"Waiting for index '{INDEX_NAME}' to become ready...")
            time.sleep(5)
        print(f"Index '{INDEX_NAME}' created and ready.")
    else:
        print(f"Connecting to existing index '{INDEX_NAME}'.")
        # Optional: Clear index if you want to re-index fresh
        # print(f"WARNING: Deleting all vectors from existing index '{INDEX_NAME}'...")
        # index_to_clear = pc.Index(INDEX_NAME)
        # index_to_clear.delete(delete_all=True)
        # print(f"All vectors deleted from '{INDEX_NAME}'.")

    pinecone_index = pc.Index(INDEX_NAME)
    print(f"Successfully connected to index '{INDEX_NAME}'. Stats: {pinecone_index.describe_index_stats()}")
except Exception as e:
    print(f"ERROR: Failed to initialize or connect to Pinecone index '{INDEX_NAME}': {e}")
    exit(1)

Available Pinecone indexes: ['fusion-rag-docstring', 'self-rag-docstring', 'rag-docstring', 'corrective-rag-docstring', 'code-aware-rag-docstring']
Connecting to existing index 'self-rag-docstring'.
Successfully connected to index 'self-rag-docstring'. Stats: {'dimension': 384,
 'index_fullness': 0.0,
 'metric': 'cosine',
 'namespaces': {'': {'vector_count': 14}},
 'total_vector_count': 14,
 'vector_type': 'dense'}


In [5]:
# --- Load Data into Pinecone (Only if index is empty) ---
index_stats = pinecone_index.describe_index_stats()
if index_stats.total_vector_count == 0:
    total_docs_loaded = 0
    # Loop through each URL in the list
    for url in TARGET_URL:
        print(f"\nProcessing URL: {url}")
        try:
            # Fetch URL content
            response = requests.get(url, timeout=30) # Use timeout
            response.raise_for_status() # Check for HTTP errors

            # Parse HTML
            soup = BeautifulSoup(response.content, 'html.parser')
            main_content = soup.find('main') or soup.find('article') or soup.find('body')
            page_text = ""
            if main_content:
                page_text = main_content.get_text(separator='\n', strip=True)
            else:
                page_text = soup.get_text(separator='\n', strip=True) # Fallback

            if not page_text or len(page_text) < 50: # Basic check for meaningful content
                print(f" -> Warning: Could not extract sufficient text content from {url}. Skipping.")
                continue # Skip to the next URL

            print(f" -> Extracted text length: {len(page_text)} characters.")

            # Generate embedding
            # Note: Encoding large pages as a single vector might lose detail.
            # Chunking the text into smaller parts is better for real applications.
            embedding = model.encode(page_text).tolist()

            # Prepare and upsert data
            doc_id = str(uuid.uuid4())
            metadata = {"text": page_text, "source": url} # Store the specific URL as source

            pinecone_index.upsert(vectors=[(doc_id, embedding, metadata)])
            print(f" -> Data from {url} loaded into Pinecone with ID: {doc_id}")
            total_docs_loaded += 1
            time.sleep(0.5) # Small delay to be polite to the server

        except requests.exceptions.RequestException as e:
            # Handle errors fetching specific URL, continue with the next
            print(f" -> Error fetching URL {url}: {e}")
            continue
        except Exception as e:
            # Handle other errors during processing/upserting for this URL
            print(f" -> Error processing or upserting data for {url}: {e}")
            continue

    if total_docs_loaded > 0:
        print("Waiting a moment for indexing...")
        time.sleep(2)
        print(pinecone_index.describe_index_stats()) # Show final stats
    else:
        print("Warning: No documents were loaded into the index.")

else:
    print(f"\nIndex already contains {index_stats.total_vector_count} vectors. Skipping data loading.")


Index already contains 14 vectors. Skipping data loading.


In [8]:
# *** ADDED: Get separate query for context retrieval ***
def context_qry(user_code):
    context_query = (f"""
    Provide clear, concise, informative, and accurate docstrings for the given python code following PEP 257 conventions and standards, 
    to generate the content for a Python docstring based on the provided code snippet and relevant PEP contexts.
    
    **Instructions:**
    1.  Start with a concise summary line explaining the function/method's purpose.
    2.  If applicable, add a blank line and then more detailed explanation.
    3.  Use the 'Args:' section to describe each parameter, its type, and what it represents.
    4.  Use the 'Returns:' section to describe the return value and its type.
    5.  Use the 'Raises:' section to list any exceptions explicitly raised by the code.
    6.  Adhere strictly to PEP 257 formatting.
    7.  Base the docstring primarily on the 'Code Snippet to Document'. Use the 'Relevant Context' for 
    clarification or examples if needed.
    
    Also, check relevant content for the user given input code: {user_code}
    """)
    return context_query

# --- Get User Input ---
user_code = ("""
class LabelBinarizer(TransformerMixin, BaseEstimator, auto_wrap_output_keys=None):
    _parameter_constraints: dict = {'neg_label': [Integral], 'pos_label': [Integral], 'sparse_output': ['boolean']}

    def __init__(self, *, neg_label=0, pos_label=1, sparse_output=False):
        self.neg_label = neg_label
        self.pos_label = pos_label
        self.sparse_output = sparse_output

    @_fit_context(prefer_skip_nested_validation=True)
    def fit(self, y):
        if self.neg_label >= self.pos_label:
            raise ValueError(f'neg_label={self.neg_label} must be strictly less than pos_label={self.pos_label}.')
        if self.sparse_output and (self.pos_label == 0 or self.neg_label != 0):
            raise ValueError(f'Sparse binarization is only supported with non zero pos_label and zero neg_label, got pos_label={self.pos_label} and neg_label={self.neg_label}')
        self.y_type_ = type_of_target(y, input_name='y')
        if 'multioutput' in self.y_type_:
            raise ValueError('Multioutput target data is not supported with label binarization')
        if _num_samples(y) == 0:
            raise ValueError('y has 0 samples: %r' % y)
        self.sparse_input_ = sp.issparse(y)
        self.classes_ = unique_labels(y)
        return self

    def fit_transform(self, y):
        return self.fit(y).transform(y)

    def transform(self, y):
        check_is_fitted(self)
        y_is_multilabel = type_of_target(y).startswith('multilabel')
        if y_is_multilabel and (not self.y_type_.startswith('multilabel')):
            raise ValueError('The object was not fitted with multilabel input.')
        return label_binarize(y, classes=self.classes_, pos_label=self.pos_label, neg_label=self.neg_label, sparse_output=self.sparse_output)

    def inverse_transform(self, Y, threshold=None):
        check_is_fitted(self)
        if threshold is None:
            threshold = (self.pos_label + self.neg_label) / 2.0
        if self.y_type_ == 'multiclass':
            y_inv = _inverse_binarize_multiclass(Y, self.classes_)
        else:
            y_inv = _inverse_binarize_thresholding(Y, self.y_type_, self.classes_, threshold)
        if self.sparse_input_:
            y_inv = sp.csr_matrix(y_inv)
        elif sp.issparse(y_inv):
            y_inv = y_inv.toarray()
        return y_inv

    def _more_tags(self):
        return {'X_types': ['1dlabels']}
""")
if not user_code.strip():
    print("No code provided. Exiting.")
    exit()

In [11]:
# --- Step 1: Initial Generation Attempt (No RAG) ---
def initial_doc_without_rag(user_code, OLLAMA_GENERATOR_MODEL):
    initial_docstring = ""
    try:
        initial_prompt = f"""
        Generate a concise and accurate Python docstring for the following code. Focus only on the code provided.
        Return only the docstring for the given code, dont give or return the code.
        Python Code:
        ---
        {user_code}
        ---
    
        Generate only the docstring content, formatted appropriately:
        """
        response = ollama_client.generate(
            model=OLLAMA_GENERATOR_MODEL,
            prompt=initial_prompt
        )
        initial_docstring = response.get('response', '').strip()
    except Exception as e:
        print(f" -> ERROR during initial generation: {e}")
        initial_docstring = "" # Ensure empty on error
    return initial_docstring

In [13]:
# --- Step 2: Self-Critique (Simulated) ---
def self_critique(initial_docstring, OLLAMA_HELPER_MODEL):
    critique_passed = False
    if not initial_docstring:
        #print(" -> Skipping critique: No initial docstring was generated.")
        needs_improvement = True # Treat empty generation as needing improvement
    else:
        try:
            # Extract parameter names from user code for critique prompt
            param_names = []
            param_match = re.search(r'def\s+\w+\s*\((.*?)\):', user_code)
            if param_match:
                params_str = param_match.group(1)
                # Basic parsing, handles simple cases, might need refinement for complex signatures
                params = [p.strip().split(':')[0].split('=')[0].strip() for p in params_str.split(',') if p.strip() and p.strip() not in ['self', 'cls']]
                param_names = [p for p in params if p and p != '*'] # Filter out empty strings or just '*'
    
            critique_prompt = f"""
            Task: Evaluate the quality of the generated Python Docstring based *only* on the original Python Code. Check if the docstring provides a reasonable summary and mentions key elements like parameters (if any).
            Answer ONLY with GOOD or NEEDS_IMPROVEMENT.
    
            Original Python Code:
            ---
            {user_code}
            ---
            Generated Docstring:
            ---
            {initial_docstring}
            ---
    
            Evaluation Criteria:
            - Does the docstring provide a basic summary of what the code might do?
            - If the code has parameters ({', '.join(param_names) if param_names else 'None'}), does the docstring attempt to mention or describe them?
            - Is the docstring format plausible (starts with triple quotes)?
    
            Quality Assessment (GOOD or NEEDS_IMPROVEMENT):
            """
            response = ollama_client.generate(
                model=OLLAMA_HELPER_MODEL, # Use helper model for critique
                prompt=critique_prompt,
                options={'temperature': 0.0} # Deterministic critique
            )
            critique_result = response.get('response', '').strip().upper()
            #print(f" -> Self-critique result: {critique_result}")
    
            if "GOOD" in critique_result:
                critique_passed = True
                needs_improvement = False
            else: # Assume NEEDS_IMPROVEMENT or other non-GOOD response means improvement needed
                needs_improvement = True
    
        except Exception as e:
            print(f" -> ERROR during self-critique: {e}")
            needs_improvement = True # Assume improvement needed if critique fails
    return (critique_result, needs_improvement)

In [15]:
# --- Step 3: Adaptive Retrieval (Trigger RAG if needed) ---
def self_RAG(context_query, user_code, needs_improvement, pinecone_index, embedding_model):
    final_context = ""
    final_source_description = "N/A"
    
    if needs_improvement and pinecone_index:
        rag_triggered = True # Set flag
        #--- RAG Step 3.1: Retrieve Initial Candidate Documents ---
        #print(" -> RAG 3.1: Retrieving initial documents from Pinecone...")
        retrieved_matches = []
        initial_docs_for_refinement = []
        if context_query.strip():
            try:
                query_embedding = embedding_model.encode(context_query).tolist()
                search_results = pinecone_index.query(vector=query_embedding, top_k=3, include_metadata=True)
                retrieved_matches = search_results.matches
                initial_docs_for_refinement = [{"text": m.metadata.get('text',''), "source": m.metadata.get('source','N/A')} for m in retrieved_matches if m.metadata.get('text')]
                #print(f"    -> Retrieved {len(initial_docs_for_refinement)} candidate documents.")
            except Exception as e: print(f"    -> ERROR querying Pinecone: {e}")
        else: print("    -> No context query provided, skipping Pinecone retrieval.")
    
        # --- RAG Step 3.2: Corrective Web Search ---
        web_context_docs = [] # Store raw web docs for Step 4
        if context_query.strip() and GOOGLE_SEARCH_ENABLED:
            try:
                code_first_line = user_code.split('\n', 1)[0].strip()
                if len(code_first_line) > 50: code_first_line = code_first_line[:50] + "..."
                web_query = f"python {context_query} documentation for `{code_first_line}`"
                #print(f" -> Web search query: '{web_query}'")
                #print(f" -> Searching Google (asking for num=3 results)...")
                search_urls = list(google_search_func(web_query, lang="en"))
        
                if search_urls:
                    #print(f" -> Found {len(search_urls)} Google Search URLs. Fetching content...")
                    for i, url in enumerate(search_urls[:3]):
                        #print(f"    -> Fetching content from URL {i+1}: {url}")
                        try:
                            headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
                            response = requests.get(url, timeout=15, headers=headers, allow_redirects=True)
                            response.raise_for_status()
                            soup = BeautifulSoup(response.content, 'html.parser')
                            main_content = soup.find('main') or soup.find('article') or soup.find('body')
                            page_text = ""
                            if main_content: page_text = main_content.get_text(separator='\n', strip=True)
                            else: page_text = soup.get_text(separator='\n', strip=True)
        
                            if page_text:
                                 #print(f"       -> Storing snippet (length {len(page_text)}) from: {url} for refinement.")
                                 max_snippet_length = 4000 # Allow longer snippets before chunking
                                 web_context_docs.append({
                                     "text": page_text[:max_snippet_length],
                                     "source": url,
                                     "type": "web"
                                 })
                            else: print(f"       -> No text content extracted from: {url}")
                        except requests.exceptions.Timeout: print(f"       -> Timeout fetching URL: {url}")
                        except requests.exceptions.RequestException as req_err: print(f"       -> Failed to fetch/process URL {url}: {req_err}")
                        except Exception as parse_err: print(f"       -> Failed to parse URL {url}: {parse_err}")
                        time.sleep(1.5)
                else: print(" -> Google Search did not return result URLs.")
            except Exception as e:
                print(f" -> ERROR during Google Search / content fetching process: {e}")
                traceback.print_exc()
        elif not context_query.strip(): print(" -> Skipping web search: No context query was provided.")
        elif not GOOGLE_SEARCH_ENABLED: print(" -> Skipping web search: 'googlesearch-python' is not installed or failed to import.")
        relevant_chunks = []
        combined_sources_list = []
        all_potential_docs = initial_docs_for_refinement + web_context_docs
    
        # --- Helper Function for Rule-Based Grading (Copied from previous CRAG version) ---
        def grade_chunk_relevance(chunk_text, user_code, context_query):
            if not chunk_text or len(chunk_text.strip()) < 10: return False
            chunk_lower = chunk_text.lower()
            code_lower = user_code.lower()
            query_lower = context_query.lower()
            core_keywords = ["docstring", "parameter", "argument", "return", "yield", "attribute", "class", "function", "method", "pep 257", "summary", "description", "example", "usage", "type hint"]
            if any(keyword in chunk_lower for keyword in core_keywords): return True
            if query_lower:
                query_keywords = re.findall(r'\b\w{3,}\b', query_lower)
                stop_words = {"how", "what", "the", "and", "for", "does", "work", "python", "use", "create"}
                query_keywords = [kw for kw in query_keywords if kw not in stop_words]
                if query_keywords and any(keyword in chunk_lower for keyword in query_keywords): return True
            if ("def " in chunk_lower and "def " in code_lower) or ("class " in chunk_lower and "class " in code_lower): return True
            param_match = re.search(r'def\s+\w+\s*\((.*?)\):', user_code)
            if param_match:
                params = [p.strip().split('=')[0].strip() for p in param_match.group(1).split(',') if p.strip()]
                if any(f"parameter {p}" in chunk_lower or f"{p}:" in chunk_lower for p in params if p not in ['self', 'cls']): return True
            return False
        # --- End Helper Function ---
    
        if not all_potential_docs:
            print("    -> No documents from Pinecone or Web to refine.")
        else:
            #print(f"    -> Processing {len(all_potential_docs)} documents for chunking...")
            for doc in all_potential_docs:
                doc_text = doc.get('text', '')
                doc_source = doc.get('source', 'N/A')
                doc_type = doc.get('type', 'unknown')
                if not doc_text: continue
                chunks = [chunk.strip() for chunk in doc_text.split('\n\n') if chunk.strip()]
                # print(f"       -> Doc from {doc_source.split('/')[-1]} yielded {len(chunks)} chunks.") # Verbose
                graded_chunk_count = 0
                for chunk in chunks:
                    if grade_chunk_relevance(chunk, user_code, context_query):
                        relevant_chunks.append(chunk)
                        graded_chunk_count += 1
                        source_tag = f"{doc_type.capitalize()}:{doc_source.split('/')[-1]}"
                        if source_tag not in combined_sources_list: combined_sources_list.append(source_tag)
                # print(f"          -> Kept {graded_chunk_count} relevant chunks.") # Verbose
    
        if relevant_chunks:
            final_context = "\n\n".join(relevant_chunks)
            final_source_description = " | ".join(combined_sources_list)
            #print(f" -> Final refined context compiled from {len(relevant_chunks)} relevant chunks.")
            #print(f" -> Sources: {final_source_description}")
        else:
            print(" -> No relevant chunks found after refinement.")
            final_context = ""
    
    elif not pinecone_index:
         print("\nStep 3: Skipping RAG pipeline as Pinecone index is unavailable.")
    else: # Critique passed
        print("\nStep 3: Initial generation passed critique. Skipping RAG pipeline.")
    return final_context

In [17]:
def final_content_generation(final_context, user_code, OLLAMA_GENERATOR_MODEL):
    messages = [
    {'role': 'system', 'content': 'You are an expert Python programmer tasked with generating docstrings. You will receive context (if found) in one message, and the code to document in the final message. Use the context only if it is directly relevant to explaining the provided code. Return only the docstring and dont include the given python code in the output'}
]

    # Add the retrieved context as a separate user message, if it exists
    if final_context:
        # Include source information in the context message for clarity
        messages.append({'role': 'user', 'content': f"Here is potentially relevant context retrieved from knowledge base \n---\n{final_context} for the python\n{user_code}\n :\n---"})
    else:
        # Explicitly state if no context was found or provided
        messages.append({'role': 'user', 'content': "No specific context was retrieved or provided for this request."})
    
    # Add the final user message with the code and the explicit request
    messages.append({'role': 'user', 'content': f"Based on any relevant context provided earlier, generate the Python docstring for the following code:\n```python\n{user_code}\n```\n\nOutput *only* the complete docstring content itself, starting with triple quotes. Dont include python codes in the output"})
    
    try:
        # Use ollama.chat instead of ollama.generate
        response = ollama_client.chat(
            model=OLLAMA_GENERATOR_MODEL,
            messages=messages
            # You could add options here if needed, e.g., options={'temperature': 0.5}
        )
    
        # Extract the content from the 'message' dictionary in the response
        generated_docstring = response.get('message', {}).get('content', '').strip()
    
        # --- Print Output ---
        #print("\n--- Generated Docstring ---")
    
        # Basic cleaning: Chat models might add explanatory text or markdown fences.
        # Try to remove common prefixes/suffixes if the model didn't follow instructions precisely.
        if generated_docstring.startswith("```python"):
            generated_docstring = generated_docstring[len("```python"):].strip()
        elif generated_docstring.startswith("```"):
             generated_docstring = generated_docstring[len("```"):].strip()
    
        if generated_docstring.endswith("```"):
            generated_docstring = generated_docstring[:-len("```")].strip()
    
        #print("---------------------------\n")
        #print(generated_docstring)
        #print("---------------------------\n")
    
    except KeyError:
         print(f"Error: Unexpected response structure from Ollama chat: {response}")
         traceback.print_exc()
    except Exception as e:
        print(f"Error communicating with Ollama chat endpoint: {e}")
    return (generated_docstring)
    

In [19]:
def run_self_rag_pipeline(user_code, pinecone_index, emb_model, ollama_llm_client, generator_model_name, helper_model_name):
    user_cd = user_code
    context_query = context_qry(user_cd)
    doc_without_rag = initial_doc_without_rag(user_cd, generator_model_name)
    critique_result, needs_improvement = self_critique(doc_without_rag, helper_model_name)
    #print(needs_improvement)
    final_context = self_RAG(doc_without_rag, user_cd, needs_improvement, pinecone_index, emb_model)
    retrieved_contexts_list.append(final_context)
    generated_docstring = final_content_generation(final_context, user_cd, generator_model_name)
    return(generated_docstring)

In [21]:
class_files_df = pd.read_pickle('class_files_df.pkl')

In [23]:
ground_truth = class_files_df["Comments"].to_list()

In [25]:
generated_docstrings_list = []
retrieved_contexts_list = []
rewritten_contexts_list = []

In [27]:
for i, row in class_files_df.iterrows():
    user_code = row["Code_without_comments"]
    output = run_self_rag_pipeline(user_code, pinecone_index, model, ollama_client, OLLAMA_MODEL, OLLAMA_REWRITER_MODEL)
    generated_docstrings_list.append(output)
class_files_df["RAG_Docstring"] = generated_docstrings_list


Step 3: Initial generation passed critique. Skipping RAG pipeline.

Step 3: Initial generation passed critique. Skipping RAG pipeline.

Step 3: Initial generation passed critique. Skipping RAG pipeline.

Step 3: Initial generation passed critique. Skipping RAG pipeline.

Step 3: Initial generation passed critique. Skipping RAG pipeline.
       -> Failed to fetch/process URL /search?num=12: Invalid URL '/search?num=12': No scheme supplied. Perhaps you meant https:///search?num=12?

Step 3: Initial generation passed critique. Skipping RAG pipeline.
       -> Failed to fetch/process URL https://stackoverflow.com/questions/64879115/how-to-implement-conv1dtranspose-keras: 403 Client Error: Forbidden for url: https://stackoverflow.com/questions/64879115/how-to-implement-conv1dtranspose-keras

Step 3: Initial generation passed critique. Skipping RAG pipeline.
       -> Failed to fetch/process URL https://stackoverflow.com/questions/61816921/using-conv2dtranspose-to-output-the-double-of-its-i

In [29]:
def clean_rag_docstring(docstring_text):
    if not isinstance(docstring_text, str):
        return docstring_text

    if docstring_text.startswith("# ERROR:") or docstring_text.startswith("# SKIPPED:"):
        return docstring_text

    text = docstring_text.strip()

    if text.startswith("```python"):
        text = text[len("```python"):].strip()
    elif text.startswith("```"):
        text = text[len("```"):].strip()
    if text.endswith("```"):
        text = text[:-len("```")].strip()

    content_inside_quotes = None
    first_double_quotes = text.find('"""')
    if first_double_quotes != -1:
        last_double_quotes = text.rfind('"""')
        if last_double_quotes > first_double_quotes and (last_double_quotes + 3) <= len(text):
            content_inside_quotes = text[first_double_quotes + 3 : last_double_quotes].strip()

    if content_inside_quotes is None or not content_inside_quotes.strip():
        first_single_quotes = text.find("'''")
        if first_single_quotes != -1:
            last_single_quotes = text.rfind("'''")
            if last_single_quotes > first_single_quotes and (last_single_quotes + 3) <= len(text):
                content_inside_quotes = text[first_single_quotes + 3 : last_single_quotes].strip()
    
    if content_inside_quotes is not None and content_inside_quotes.strip():
        final_text_to_clean = content_inside_quotes
    else:
        final_text_to_clean = text
        if final_text_to_clean.startswith('"""') and final_text_to_clean.endswith('"""') and len(final_text_to_clean) >= 6:
            final_text_to_clean = final_text_to_clean[3:-3].strip()
        elif final_text_to_clean.startswith("'''") and final_text_to_clean.endswith("'''") and len(final_text_to_clean) >= 6:
            final_text_to_clean = final_text_to_clean[3:-3].strip()

    final_text_to_clean = re.sub(r"(?i)^class\s+\w+:\s*\n?", "", final_text_to_clean).strip()
    
    return final_text_to_clean

class_files_df["RAG_Docstring"] = class_files_df["RAG_Docstring"].astype(str).apply(clean_rag_docstring)

In [31]:
#class_files_df.to_excel('self_rag.xlsx')

In [33]:
def calculate_rouge(df, reference_column, hypothesis_column):
    rouge = Rouge()

    def calculate_score(row):
        scores = rouge.get_scores(row[hypothesis_column].lower(), row[reference_column].lower())
        return scores[0]['rouge-1']['f']

    df['ROUGE-1 ' + reference_column] = df.apply(calculate_score, axis=1)
    return df

# Calculate ROUGE-1 scores
data_1 = calculate_rouge(class_files_df, 'Comments', 'RAG_Docstring')

In [34]:
def calculate_bleu(df, reference_column, hypothesis_column):
    nltk.download('punkt')

    def calculate_score(row):
        reference = [row[reference_column].lower().split()]
        hypothesis = row[hypothesis_column].lower().split()
        score = sentence_bleu(reference, hypothesis, weights=(0.25, 0.25, 0.25, 0.25))
        return score

    df['BLEU Score ' + reference_column] = df.apply(calculate_score, axis=1)
    return df

In [35]:
# Calculate BLEU scores
data_1 = calculate_bleu(data_1, 'Comments', 'RAG_Docstring')

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/balajivenktesh/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [36]:
# Calculate BERT encoding score, using cosine similarity
def calculate_bert_score(ground_truth, generated):
    # Calculate BERT score
    _, _, bert_score_f1 = score([ground_truth], [generated], lang='en', model_type='bert-base-uncased')

    return bert_score_f1.item()   

In [37]:
# Calculate BLEU scores
list_append_1 = []
for index, row in data_1.iterrows():
    list_append_1.append(calculate_bert_score(str(row["Comments"]), str(row["RAG_Docstring"])))

In [38]:
data_1["Accuracy"] = list_append_1

In [39]:
# Calculate number of syllables in docstring
def count_syllables(word):
    # Remove punctuation
    word = re.sub(r'[^a-zA-Z]', '', word)
    
    # Vowel count
    vowels = 'aeiouy'
    syllables = 0
    last_was_vowel = False
    for char in word:
        if char.lower() in vowels:
            if not last_was_vowel:
                syllables += 1
            last_was_vowel = True
        else:
            last_was_vowel = False
    
    # Adjust syllable count for words ending in 'e'
    if word.endswith(('e', 'es', 'ed')):
        syllables -= 1
    
    # Adjust syllable count for words with no vowels
    if syllables == 0:
        syllables = 1
    
    return syllables

In [44]:
# Calculate Flesch reading score
def flesch_reading_ease(text):
    sentences = text.count('.') + text.count('!') + text.count('?') + 1
    words = len(re.findall(r'\b\w+\b', text))
    syllables = sum(count_syllables(word) for word in text.split())
    
    # Calculate Flesch Reading Ease score
    score = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
    
    return score

In [45]:
# Calculate Easy scores
list_append_2 = []
for index, row in data_1.iterrows():
    list_append_2.append(flesch_reading_ease(str(row["RAG_Docstring"])))

In [46]:
data_1["Ease"] = list_append_2

In [47]:
#%%
def compress(input):
	return zlib.compress(input.encode())

In [48]:
def conciness(ground_truth, generated):
    comp1 = compress(ground_truth)
    comp2 = compress(generated)
    return sys.getsizeof(comp2) / sys.getsizeof(comp1)

In [49]:
# Calculate Conciseness scores
list_append_3 = []
for index, row in data_1.iterrows():
    list_append_3.append(conciness(str(row["Comments"]), str(row["RAG_Docstring"])))

In [50]:
data_1["Conciseness"] = list_append_3

In [51]:
def calculate_parameter_coverage(code_str, docstring_str):
    """
    Calculates the proportion of function/method parameters mentioned in the docstring.
    Returns a float (0.0 to 1.0) or None if no parameters are found in the code.
    """        
    match = re.search(r"def\s+\w+\s*\((.*?)\):", code_str)
    if not match:
        match = re.search(r"async\s+def\s+\w+\s*\((.*?)\):", code_str) 

    if not match:
        return None 

    params_str = match.group(1)
    if not params_str.strip(): 
        return 1.0 

    potential_params = [p.strip().split('=')[0].split(':')[0].strip() for p in params_str.split(',')]
    actual_params = [p for p in potential_params if p and p not in ('self', 'cls') and not p.startswith('*')]

    if not actual_params:
        return 1.0 

    covered_params = 0
    docstring_lower = docstring_str.lower()
    for param_name in actual_params:
        if re.search(r"\b" + re.escape(param_name.lower()) + r"\b", docstring_lower):
            covered_params += 1
        elif f"{param_name.lower()}:" in docstring_lower or f"parameter {param_name.lower()}" in docstring_lower:
             covered_params += 1
    return covered_params / len(actual_params) if actual_params else 1.0

In [52]:
# --- Return Value Coverage Calculation Function ---
def calculate_return_coverage(code_str, docstring_str):
    """
    Checks if the docstring mentions a return value if the code seems to have one.
    Returns 1 if covered/not applicable, 0 if potentially missing, None on error.
    """
    has_return_statement = False
    for line in code_str.splitlines():
        stripped_line = line.strip()
        if stripped_line.startswith("return ") and not stripped_line.endswith("return None") and len(stripped_line) > len("return "):
            has_return_statement = True
            break
    
    if not has_return_statement:
        return 1.0 

    docstring_lower = docstring_str.lower()
    return_keywords = ["return", "returns", "yield", "yields"] 
    if any(keyword in docstring_lower for keyword in return_keywords):
        return 1.0
    else:
        return 0.0

In [53]:
# --- Basic Faithfulness Metric Function ---
def calculate_basic_faithfulness(generated_docstring, retrieved_context_text):
    """
    Calculates a basic faithfulness score based on token overlap.
    This is a crude proxy for actual faithfulness.
    Returns a float (0.0 to 1.0) or None.
    """
    # Simple tokenization and stopword removal
    stop_words = set(["a", "an", "the", "is", "are", "was", "were", "be", "been", "being", "have", "has", "had", "do", "does", "did", "will", "would", "should", "can", "could", "may", "might", "must", "and", "or", "but", "if", "of", "at", "by", "for", "with", "about", "to", "in", "on", "this", "that", "it", "its", "you", "your", "i", "me", "my", "he", "she", "him", "her", "they", "them", "their"])
    
    try:
        gen_tokens = set(token.lower() for token in re.findall(r'\b\w+\b', generated_docstring) if token.lower() not in stop_words)
        ctx_tokens = set(token.lower() for token in re.findall(r'\b\w+\b', retrieved_context_text) if token.lower() not in stop_words)
    except Exception as e:
        print(f"Error tokenizing for faithfulness: {e}")
        return None

    if not gen_tokens: # If generated docstring has no valid tokens after filtering
        return 0.0 

    overlapping_tokens = gen_tokens.intersection(ctx_tokens)
    
    return len(overlapping_tokens) / len(gen_tokens) if gen_tokens else 0.0

In [54]:
def calculate_exception_coverage(code_str, docstring_str):
    if not all(isinstance(s, str) for s in [code_str, docstring_str]) or not docstring_str.strip() or docstring_str.startswith(("# ERROR:", "# SKIPPED:")): return None
    raised_exceptions = set(re.findall(r"raise\s+(\w+)", code_str)) # Basic: finds exception names
    if not raised_exceptions: return 1.0 # No exceptions to cover
    
    docstring_lower = docstring_str.lower()
    mentions_raises_section = "raises:" in docstring_lower
    covered_exceptions = 0
    for exc_name in raised_exceptions:
        if re.search(r"\b" + re.escape(exc_name.lower()) + r"\b", docstring_lower):
            covered_exceptions += 1
            
    # If a "Raises:" section exists, it's good, even if not all specific exceptions are named (simple check)
    if mentions_raises_section and raised_exceptions: return 1.0 
    if not raised_exceptions: return 1.0 # Should have been caught above
    return covered_exceptions / len(raised_exceptions) if raised_exceptions else 1.0

In [55]:
# --- Adherence to Docstring Conventions (Pydocstyle) ---
PYDOCSTYLE_ENABLED = True
def check_docstring_adherence_pydocstyle(code_str, generated_docstring_content):
    """
    Checks adherence of a generated docstring to PEP 257 using pydocstyle.
    The generated_docstring_content should be the *content* of the docstring,
    not including the triple quotes.
    Returns:
        float: A score from 0.0 to 1.0 (1.0 means no errors, 0.0 means many errors).
               Returns None if pydocstyle is not enabled or an error occurs.
    """
    # Sanitize content for embedding within triple quotes
    safe_content = generated_docstring_content.replace('\\', '\\\\') # Escape backslashes
    safe_content = safe_content.replace('"""', '\\"\\"\\"') # Escape internal triple-double-quotes
    safe_content = safe_content.replace("'''", "\\'\\'\\'") # Escape internal triple-single-quotes
    
    # Prepare the content for insertion, ensuring correct indentation for multi-line docstrings
    lines = safe_content.split('\n')
    if len(lines) == 1:
        # Single line docstring content, no special indentation needed beyond the initial one
        indented_docstring_body = lines[0]
    else:
        # Multi-line: first line as is, subsequent lines indented with 4 spaces
        # This assumes the docstring will be placed with an initial 4-space indent.
        indented_docstring_body = lines[0] + '\n' + '\n'.join(['    ' + line for line in lines[1:]])


    # Construct a minimal, valid Python snippet for pydocstyle
    # Try to place the docstring correctly within a class or function if identifiable
    code_prefix = ""
    code_suffix = "\n    pass" # Default suffix

    class_match = re.search(r"^(.*\bclass\s+\w+\s*\(?.*\)?:)", code_str, re.MULTILINE)
    func_match = re.search(r"^(.*\b(async\s+)?def\s+\w+\s*\(?.*\)?:)", code_str, re.MULTILINE)

    if class_match:
        header = class_match.group(1)
        # Find the end of the header line to insert the docstring
        code_prefix = code_str[:class_match.end()] + f'\n    """{indented_docstring_body}"""'
        code_suffix = code_str[class_match.end():] # The rest of the original class code
        # Ensure there's at least a 'pass' or some body if the original was just a header
        if not code_suffix.strip() or code_suffix.strip().startswith("#"):
            code_suffix = "\n    pass" + code_suffix 
        code_for_pydocstyle_check = code_prefix + code_suffix

    elif func_match:
        header = func_match.group(1)
        code_prefix = code_str[:func_match.end()] + f'\n    """{indented_docstring_body}"""'
        code_suffix = code_str[func_match.end():]
        if not code_suffix.strip() or code_suffix.strip().startswith("#"):
            code_suffix = "\n    pass" + code_suffix
        code_for_pydocstyle_check = code_prefix + code_suffix
    else:
        # Fallback: treat as module-level docstring if no class/def found
        # This is less ideal as the original code_str might not be a full module
        code_for_pydocstyle_check = f'"""{generated_docstring_content}"""\n{code_str}'


    errors_count = 0
    filtered_errors_count = 0
    tmp_file_path = None 
    try:
        with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as tmp_file:
            tmp_file.write(code_for_pydocstyle_check)
            tmp_file_path = tmp_file.name
        
        command = ['pydocstyle', tmp_file_path]
        process = subprocess.run(command, capture_output=True, text=True, encoding='utf-8')
        
        output = process.stdout.strip()
        print
        if output:
            all_errors = output.splitlines()
            errors_count = len(all_errors)
            # Filter out D100 (Missing docstring in public module) as it's an artifact
            # and D101, D102, D103 if we are only checking the first docstring.
            # For now, just D100 as the dummy structure is a module.
            filtered_errors = [err for err in all_errors if not err.strip().endswith("D100: Missing docstring in public module")]
            filtered_errors_count = len(filtered_errors)
        
        if process.stderr:
            if "Cannot parse file" in process.stderr or "unexpected EOF while parsing" in process.stderr or "invalid syntax" in process.stderr :
                 print(f"Pydocstyle CRITICAL PARSE ERROR for temp file {tmp_file_path}: {process.stderr}")
                 print("--- Content written to temp file that failed parsing: ---")
                 print(code_for_pydocstyle_check)
                 print("--------------------------------------------------------")
                 return 0.0 # Penalize heavily for parse error

    except Exception as e:
        print(f"An exception occurred during pydocstyle check: {str(e)}")
        if tmp_file_path and os.path.exists(tmp_file_path): # Check if tmp_file_path was assigned
             try:
                with open(tmp_file_path, 'r', encoding='utf-8') as f_err:
                    print(f"Content of temp file '{tmp_file_path}' that caused exception:\n{f_err.read()}")
             except Exception as read_err:
                print(f"Could not read temp file {tmp_file_path}: {read_err}")
        return None # Error during check
    finally:
        if tmp_file_path and os.path.exists(tmp_file_path):
            os.remove(tmp_file_path)
    
    # Normalize score based on filtered errors.
    # Using 10 as the denominator makes the score less harsh than 5.
    return max(0.0, 1.0 - (filtered_errors_count / 10.0))

In [56]:
param_coverage_list = []
return_coverage_list = []
faithfulness_list = []

In [57]:
for index, row in data_1.iterrows():
    param_coverage_list.append(calculate_parameter_coverage(str(row["Code_without_comments"]), str(row["RAG_Docstring"])))

In [58]:
data_1["Parameter_Coverage"] = param_coverage_list

In [59]:
for index, row in data_1.iterrows():
    return_coverage_list.append(calculate_return_coverage(str(row["Code_without_comments"]), str(row["RAG_Docstring"])))

In [60]:
data_1["Return_Coverage"] = return_coverage_list

In [61]:
data_1["Retrieved_Contexts"] = retrieved_contexts_list

In [62]:
for index, row in data_1.iterrows():
    faithfulness_list.append(calculate_basic_faithfulness(str(row["RAG_Docstring"]), str(row["Retrieved_Contexts"])))
    #faithfulness_list.append(faithfulness_score)
#if faithfulness_score is not None: print(f"    -> Basic Faithfulness: {faithfulness_score:.4f}")

In [63]:
data_1["Faithfulness_Score"] = faithfulness_list

In [64]:
pydocstyle_adherence_list_1 = []

In [65]:
for index, row in data_1.iterrows():
    pydocstyle_adherence_list_1.append(check_docstring_adherence_pydocstyle(str(row["Code_without_comments"]), str(row["RAG_Docstring"])))

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Av

In [66]:
data_1["PythonStyle_Adherence"] = pydocstyle_adherence_list_1

In [67]:
exception_coverage_list = []

In [68]:
for index, row in data_1.iterrows():
    exception_coverage_list.append(calculate_exception_coverage(str(row["Code_without_comments"]), str(row["RAG_Docstring"])))

In [69]:
data_1["Exception_Coverage"] = exception_coverage_list

In [70]:
data_1.to_excel('./deepseek/self_rag.xlsx')

In [71]:
data_1.to_pickle('./deepseek/self_rag.pkl')