In [1]:
import os
import json
import time
import fitz  # PyMuPDF
from typing import List, Dict, Union, Any
import google.generativeai as genai
from google.generativeai.types import HarmCategory, HarmBlockThreshold

In [2]:
!pip install --upgrade google-genai

Collecting google-genai
  Downloading google_genai-1.51.0-py3-none-any.whl.metadata (46 kB)
Downloading google_genai-1.51.0-py3-none-any.whl (260 kB)
Installing collected packages: google-genai
  Attempting uninstall: google-genai
    Found existing installation: google-genai 1.50.0
    Uninstalling google-genai-1.50.0:
      Successfully uninstalled google-genai-1.50.0
Successfully installed google-genai-1.51.0


In [None]:
genai.configure(api_key=os.getenv("GEMINI_API_KEY")) 

# Define Model ID and Instance
MODEL_ID = "gemini-2.5-pro"
# Instantiate the model object once globally
MODEL_INSTANCE = genai.GenerativeModel(MODEL_ID)

# Define file paths
PDF_PATH = "SRS_doc.pdf"
OUTPUT_JSON_PATH = "srs_extracted.json"
ANNOTATED_NL_PATH = "srs_annotated.json" # New intermediate file: Selected Sentences
LIFTED_NL_PATH = "srs_lifted_nl.json"     # Intermediate file: Lifted NL
FINAL_SPECS_PATH = "formal_specifications.json"

In [4]:
# --- 1. Step 0: SRS Text Extraction (Unchanged) ---

def extract_text_from_pdf(pdf_path: str, output_json_path: str) -> dict:
    """
    Extracts text from a real PDF file page by page and ensures the output JSON file 
    is created before returning the data.
    """
    
    print(f"1. Starting text extraction from {pdf_path}...")
    data = {"sections": []} # Initialize data structure

    if not os.path.exists(pdf_path):
        print(f"ERROR: PDF file not found at {pdf_path}. Returning empty data structure.")
        # Write failure state to file
        with open(output_json_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2, ensure_ascii=False)
        return data

    # --- ACTUAL EXTRACTION LOGIC ---
    try:
        text_sections = []
        with fitz.open(pdf_path) as doc:
            for i, page in enumerate(doc):
                text = page.get_text("text")
                sentences = [s.strip() for s in text.split(". ") if s.strip()]
                text_sections.append({
                    "id": f"page_{i+1}",
                    "sentences": sentences
                })
        data["sections"] = text_sections
        print(f"     Extracted text successfully from {len(data['sections'])} pages.")
        
    except Exception as e:
        print(f"   Fatal Error during PDF extraction: {e}. Returning empty data structure.")
        data["sections"] = []

    # --- GUARANTEE: Write the data to the JSON file before returning ---
    with open(output_json_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
    
    print(f"    Final extracted data written to {output_json_path}")
    return data


In [5]:
# Assuming global variables like PDF_PATH and OUTPUT_JSON_PATH are defined.
print("--- 1. Running Step 0: Extraction ---")
extracted_data = extract_text_from_pdf(PDF_PATH, OUTPUT_JSON_PATH)

if not extracted_data["sections"]:
    print("\nExtraction failed. Cannot continue.")

--- 1. Running Step 0: Extraction ---
1. Starting text extraction from SRS_doc.pdf...
     Extracted text successfully from 37 pages.
    Final extracted data written to srs_extracted.json


In [None]:
ANNOTATION_SCHEMA = { # Schema retained but used only for prompt instruction
    "type": "object", 
    "properties": {
        "requirements": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "section_id": {"type": "string"},
                    "sentence_text": {"type": "string"}
                },
                "required": ["section_id", "sentence_text"]
            }
        }
    },
    "required": ["requirements"]
}

# --- 2. Step 1: Annotation/Selection (FIXED API CALL with Sleep and Robust Parsing) ---

# ANNOTATION_SCHEMA is assumed defined (retained for context)

def step_1_annotate_selection(input_json: dict, model_instance: genai.GenerativeModel) -> List[Dict]:
    """
    Step 1: Identifies and extracts actionable requirements from the full SRS text,
    implementing a retry loop for rate limit errors and robust JSON parsing.
    """
    print(f"\n--- Step 1: Annotation/Selection using {MODEL_ID} ---")
    
    srs_text_summary = json.dumps(input_json, indent=2)
    MAX_RETRIES = 3 # Safety limit for retries
    
    SYSTEM_PROMPT = f"""
    You are an expert in software engineering, analyzing an SRS document.
    Your task is to identify and extract ONLY the sentences that represent 
    actionable system requirements (e.g., state transitions, system constraints, 
    user actions, or "must/shall" statements). Ignore introductory, descriptive, 
    or explanatory text.
    
    You MUST output the result in a clean JSON format strictly adhering to this structure:
    {json.dumps(ANNOTATION_SCHEMA, indent=2)}
    """

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            print(f"   -> Running annotation API call (Attempt {attempt} of {MAX_RETRIES})...")
            
            response = model_instance.generate_content(
                SYSTEM_PROMPT + "\n\nJSON input:\n" + srs_text_summary
            )

            # 1. Robust JSON Parsing (Strip markdown and extra text)
            raw_text = response.text.strip()
            if raw_text.startswith("```"):
                raw_text = raw_text.strip("```json").strip("```").strip()

            annotated_data = json.loads(raw_text)
            
            # 2. Key Error Fix: Use .get() or check for key existence
            # This handles cases where the model returns valid JSON but misses the 'requirements' key.
            if 'requirements' not in annotated_data:
                raise KeyError(f"JSON lacks 'requirements' key. Raw output: {raw_text[:100]}...")

            # Success: Process and return
            selected_requirements = [
                {"Source_NL": req["sentence_text"], "section_id": req["section_id"]}
                for req in annotated_data["requirements"] 
            ]
            
            with open(ANNOTATED_NL_PATH, "w", encoding="utf-8") as f:
                json.dump(selected_requirements, f, indent=2, ensure_ascii=False)
                
            print(f"     Identified {len(selected_requirements)} actionable requirements.")
            print(f"     Annotated requirements saved to {ANNOTATED_NL_PATH}")
            return selected_requirements

        except Exception as e:
            error_message = str(e)
            
            # --- FIX 1: Increase Sleep Time ---
            if "429" in error_message and attempt < MAX_RETRIES:
                # Use 75 seconds for a safe buffer against the strict 60s sliding window
                print(f"     Rate Limit Hit! Sleeping for 75 seconds before retrying...")
                time.sleep(75) 
            elif attempt < MAX_RETRIES:
                # Wait 5 seconds for generic parsing errors (like JSONDecodeError) before retrying
                print(f"     Parsing/Structural Error: {error_message}. Retrying in 5 seconds...")
                time.sleep(5)
            else:
                # Catastrophic failure or max retries exceeded
                print(f"   Final Error during Annotation/Selection (Attempt {attempt}): {error_message}")
                return []
    
    return []


In [7]:
# Assuming MODEL_INSTANCE is defined globally.
print("\n--- 2. Running Step 1: Annotation/Selection ---")
selected_requirements = step_1_annotate_selection(extracted_data, MODEL_INSTANCE)

if not selected_requirements:
    print("\nAnnotation failed. Cannot continue.")

# You can now inspect 'srs_annotated.json' or the 'selected_requirements' list.


--- 2. Running Step 1: Annotation/Selection ---

--- Step 1: Annotation/Selection using gemini-2.5-pro ---
   -> Running annotation API call (Attempt 1 of 3)...
     Identified 93 actionable requirements.
     Annotated requirements saved to srs_annotated.json


In [None]:
# --- 3. Step 2: Lifting (Batched API Call - FINAL FIX) ---

LIFTING_SCHEMA = { 
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "Source_NL": {"type": "string", "description": "The original source sentence."},
            "Lifted_NL": {"type": "string", "description": "The structured, intermediate "
            "'Lifted Natural Language' statement."}
        },
        "required": ["Source_NL", "Lifted_NL"]
    }
}
BATCH_SIZE = 50 # Process 50 requirements per API call
MAX_API_RETRIES = 2 # Max retries for a single batch request

def step_2_perform_lifting(selected_requirements: List[Dict], model_instance: genai.GenerativeModel) -> List[Dict]:
    """
    Step 2: Natural Language (NL) to Lifted NL (Lifting).
    Implements a robust retry mechanism for rate limit errors within the batch loop.
    """
    all_lifted_results = []
    num_requests = 0
    
    print(f"\n--- Step 2: NL to Lifted NL (Lifting) using {MODEL_ID} ---")
    
    # We keep the 60s initial wait to clear the quota from Step 1
    print(f"Waiting 60 seconds to ensure quota reset from previous API call (Step 1)...")
    time.sleep(60)
    
    # LIFTING_PROMPT is assumed defined globally.
    
    for i in range(0, len(selected_requirements), BATCH_SIZE):
        batch = selected_requirements[i:i + BATCH_SIZE]
        original_map = {req['Source_NL']: req for req in batch}
        batch_nl = list(original_map.keys())
        
        print(f"   Lifting Batch {num_requests + 1} ({len(batch)} items)...")

        # --- FIX: DEFINE 'contents' HERE (Inside batch loop, Outside retry loop) ---
        contents = [
            # FIX: Serialize LIFTING_SCHEMA dictionary to a string
            {"role": "user", "parts": [{"text": json.dumps(LIFTING_SCHEMA)}]}, 
            
            # Few-shot example 
            {"role": "user", "parts": [{"text": "The manager must verify the plumber after registration if their status is unverified."}]},
            {"role": "model", "parts": [{"text": json.dumps([{"Source_NL": "The manager must verify the plumber after registration if their status is unverified.", 
            "Lifted_NL": "always (manager attempts to verify plumber p AND p's status is unverified) implies (p's status becomes active)"}])}]},
            
            # Current Batch Query
            {"role": "user", "parts": [{"text": "Translate the following NL sentences:\n" + "\n".join(f"- {s}" for s in batch_nl)}]}
        ]
        # --- END FIX ---

        # --- NEW RETRY LOOP FOR API CALL ---
        batch_processed_successfully = False
        
        for attempt in range(1, MAX_API_RETRIES + 1):
            try:
                # API call now safely uses the defined 'contents' variable
                response = model_instance.generate_content(contents=contents)
                num_requests += 1
                batch_processed_successfully = True
                
                # If successful, break the retry loop and proceed to process results
                break 

            except Exception as e:
                error_message = str(e)
                
                if "429" in error_message and attempt < MAX_API_RETRIES:
                    # FIX: Wait longer than the 48.4s suggested delay to ensure reset
                    print(f" Rate Limit Hit on Attempt {attempt}. Sleeping for 75 seconds before retrying...")
                    time.sleep(75) 
                else:
                    # Log the final error if retries are exhausted or it's not a 429 error
                    print(f"     Final Error during Lifting Batch {num_requests}: {error_message}")
                    break
        # --- END RETRY LOOP ---

        # Process results ONLY if the API call was successful
        if batch_processed_successfully:
            try:
                raw_text = response.text.strip()
                if raw_text.startswith("```"):
                    raw_text = raw_text.strip("```json").strip("```").strip()

                batch_results = json.loads(raw_text)
                
                # (Robust results processing logic remains the same)
                if not isinstance(batch_results, list):
                    raise ValueError(f"Model output was not a list (array). Found type: {type(batch_results)}")

                for res in batch_results:
                    source_nl = res.get("Source_NL")
                    original_item = original_map.get(source_nl)

                    if source_nl and original_item:
                        all_lifted_results.append({
                            "Source_NL": source_nl,
                            "Lifted_NL": res.get("Lifted_NL", "ERROR: Lifted NL missing"),
                            "section_id": original_item.get('section_id', 'N/A')
                        })
            except Exception as e:
                print(f"     Critical Parsing/Processing Error: {e}")
                batch_processed_successfully = False # Mark batch as failed during processing

        # Log failed items if the API call or processing failed
        if not batch_processed_successfully:
            for req in batch:
                if req['Source_NL'] not in [r.get('Source_NL') for r in all_lifted_results]:
                    all_lifted_results.append({"Source_NL": req['Source_NL'], "Error": "Batch API/Processing Failed", "Lifted_NL": "ERROR"})


        # --- RATE LIMITING: Sleep for 60 seconds between batches ---
        if i + BATCH_SIZE < len(selected_requirements):
            print(f" Sleeping for 60 seconds (Completed {num_requests} requests)...")
            time.sleep(60)
            
    with open(LIFTED_NL_PATH, "w", encoding="utf-8") as f:
        json.dump(all_lifted_results, f, indent=2, ensure_ascii=False)
    print(f"  Intermediate Lifted NL results saved to {LIFTED_NL_PATH}")
    
    return [r for r in all_lifted_results if 'ERROR' not in r['Lifted_NL']]

In [9]:
# This step automatically waits 60 seconds to clear the Step 1 quota if necessary.
print("\n--- 3. Running Step 2: Lifting (NL -> Lifted NL) ---")
lifted_requirements = step_2_perform_lifting(selected_requirements, MODEL_INSTANCE)

# You can now inspect 'srs_lifted_nl.json' or the 'lifted_requirements' list.


--- 3. Running Step 2: Lifting (NL -> Lifted NL) ---

--- Step 2: NL to Lifted NL (Lifting) using gemini-2.5-pro ---
Waiting 60 seconds to ensure quota reset from previous API call (Step 1)...
   Lifting Batch 1 (50 items)...
 Sleeping for 60 seconds (Completed 1 requests)...
   Lifting Batch 2 (43 items)...
  Intermediate Lifted NL results saved to srs_lifted_nl.json


In [10]:
FINAL_SPEC_SCHEMA_BATCH = {
    "type": "array",
    "items": {
        "type": "object",
        "properties": {
            "Lifted_NL": {"type": "string", "description": "The Lifted NL input."},
            "LABEL": {"type": "string", "description": "The specification label (e.g., VERIFY_PLUMBER_OK)."},
            "Precondition": {"type": "string", "description": "Logical constraints that must hold before execution."},
            "Function": {"type": "string", "description": "Function signature (name, parameters, return type, HTTP code)."},
            "Postcondition": {"type": "string", "description": "Logical constraints on the global state after execution, using primed globals (U')."}
        },
        "required": ["Lifted_NL", "LABEL", "Precondition", "Function", "Postcondition"]
    }
}

def step_3_perform_translation(lifted_requirements: List[Dict], model_instance: genai.GenerativeModel) -> List[Dict]:
    """
    Step 3: Lifted NL to Formal Specification Translation.
    Processes lifted requirements in batches for translation.
    """
    final_specs = []
    num_requests = 0
    
    print(f"\n--- Step 3: Lifted NL to Formal Specification Translation using {MODEL_ID} ---")

    # --- UPDATED PROMPT: Includes detailed instructions and the desired output structure ---
    TRANSLATION_PROMPT = f"""
    You are an expert Formal Specification Translator. Your task is to convert a list of structured 'Lifted NL' statements into formal specifications using predicate logic.

    The formal specification must always follow these components: LABEL, Precondition, Function, and Postcondition.

    CONTEXT: U is the set of users. m is a manager ID, p is a plumber ID.
    - Precondition: Must type-check to Bool.
    - Function: Name(param: type) â†’ return [HTTP].
    - Postcondition: Uses primed globals (U') to describe the after-state.

    You MUST output a single JSON array strictly adhering to this structure:
    {json.dumps(FINAL_SPEC_SCHEMA_BATCH, indent=2)}
    """
    
    # --- FEW-SHOT EXAMPLE DATA ---
    # Define the few-shot example that adheres to the new schema
    few_shot_lifted = "always (manager attempts to verify plumber p AND p's status is unverified) implies (p's status becomes active)"
    few_shot_formal = json.dumps({
        "Lifted_NL": few_shot_lifted,
        "LABEL": "VERIFY_PLUMBER_OK",
        "Precondition": "m âˆˆ dom(U) âˆ§ U[m].role = 'manager' âˆ§ p âˆˆ dom(U) âˆ§ U[p].role = 'plumber' âˆ§ U[p].status = 'unverified'",
        "Function": "verify_plumber(managerID: m, plumberID: p) â†’ 200 OK",
        "Postcondition": "U' = U with U'[p].status = 'active'"
    })
    
    for i in range(0, len(lifted_requirements), BATCH_SIZE):
        batch = lifted_requirements[i:i + BATCH_SIZE]
        batch_lifted_nl = [req['Lifted_NL'] for req in batch]
        
        print(f"   Translating Batch {num_requests + 1} ({len(batch)} items)...")

        # Few-shot history and current batch query (contents list construction remains the same)
        contents = [
    # FIX: Explicitly dump the schema dictionary to a string
    {"role": "user", "parts": [{"text": json.dumps(FINAL_SPEC_SCHEMA_BATCH)}]}, 
    
    {"role": "user", "parts": [{"text": few_shot_lifted}]},
    {"role": "model", "parts": [{"text": few_shot_formal}]},
    {"role": "user", "parts": [{"text": "Translate the following Lifted NL sentences:\n" + "\n".join(f"- {s}" for s in batch_lifted_nl)}]}
]
        # --- NEW RETRY LOOP FOR API CALL ---
        batch_processed_successfully = False
        
        for attempt in range(1, MAX_API_RETRIES + 1):
            try:
                response = model_instance.generate_content(contents=contents)
                num_requests += 1
                batch_processed_successfully = True
                break 

            except Exception as e:
                error_message = str(e)
                
                if "429" in error_message and attempt < MAX_API_RETRIES:
                    # FIX: INCREASE SLEEP DURATION FOR FREE TIER RELIABILITY
                    print(f"Rate Limit Hit on Attempt {attempt}. Sleeping for 75 seconds before retrying...")
                    time.sleep(75) 
                else:
                    # Log the final error if retries are exhausted or it's not a 429 error
                    print(f"Final Error during Translation Batch {num_requests}: {error_message}")
                    break
        # --- END RETRY LOOP ---

        # Process results ONLY if the API call was successful
        if batch_processed_successfully:
            try:
                raw_text = response.text.strip()
                if raw_text.startswith("```"):
                    raw_text = raw_text.strip("```json").strip("```").strip()

                batch_formal_specs = json.loads(raw_text)
                
                # Combine results
                for j, formal_spec in enumerate(batch_formal_specs):
                    final_specs.append({
                        "Source_NL": batch[j]['Source_NL'],
                        "Lifted_NL": formal_spec['Lifted_NL'],
                        "Formal_Spec": {
                            "LABEL": formal_spec['LABEL'],
                            "Precondition": formal_spec['Precondition'],
                            "Function": formal_spec['Function'],
                            "Postcondition": formal_spec['Postcondition']
                        }
                    })

            except Exception as e:
                print(f"     Critical Parsing/Processing Error: {e}")
                for req in batch:
                    final_specs.append({"Source_NL": req['Source_NL'], "Lifted_NL": req['Lifted_NL'], "Error": str(e)})

        # --- RATE LIMITING: Sleep for 60 seconds between batches (Kept as 60s, but now the retry loop handles the critical delay) ---
        if i + BATCH_SIZE < len(lifted_requirements):
            print(f"     ðŸ’¤ Sleeping for 60 seconds (Completed {num_requests} requests, {len(final_specs)} items)...")
            time.sleep(60)

    with open(FINAL_SPECS_PATH, "w", encoding="utf-8") as f:
        json.dump(final_specs, f, indent=2, ensure_ascii=False)
    print(f"  Final formal specifications saved to {FINAL_SPECS_PATH}")
    
    return final_specs

In [11]:
# This step automatically waits 60 seconds to clear the Step 2 quota if necessary.
print("\n--- 4. Running Step 3: Formal Translation ---")
final_specifications = step_3_perform_translation(lifted_requirements, MODEL_INSTANCE)

print(f"\n  Final specifications saved to {FINAL_SPECS_PATH}.")


--- 4. Running Step 3: Formal Translation ---

--- Step 3: Lifted NL to Formal Specification Translation using gemini-2.5-pro ---
   Translating Batch 1 (27 items)...
  Final formal specifications saved to formal_specifications.json

  Final specifications saved to formal_specifications.json.
