In [12]:
import json

# Read the forgeswagger.json file
with open('./stubapi/specs/forgeswagger.json', 'r') as file:
    forgeswagger_data = json.load(file)

# Display basic info about the loaded data
print(f"Successfully loaded JSON file")
print(f"Type: {type(forgeswagger_data)}")
if isinstance(forgeswagger_data, dict):
    print(f"Top-level keys: {list(forgeswagger_data.keys())}")

Successfully loaded JSON file
Type: <class 'dict'>
Top-level keys: ['openapi', 'info', 'paths', 'components']


In [13]:
from pydantic import BaseModel, Field
from typing import Optional
from enum import IntEnum, Enum
import re

def create_pydantic_class_from_schema(schema_name: str, schema_definition: dict, base_class: str = "BaseModel") -> str:
    """
    Generate a Pydantic class definition from a JSON schema definition.
    
    Args:
        schema_name: Name of the schema/class
        schema_definition: The schema definition dictionary
        base_class: Base class to inherit from (default: "BaseModel")
    
    Returns:
        String containing the Pydantic class definition
    """
    
    def map_json_type_to_python(json_type: str, format_type: str = None) -> str:
        """Map JSON schema types to Python types"""
        type_mapping = {
            "string": "str",
            "integer": "int",
            "number": "float", 
            "boolean": "bool",
            "array": "list",
            "object": "dict"
        }
        
        if json_type == "integer" and format_type == "int32":
            return "int"
        elif json_type == "integer" and format_type == "int64":
            return "int"
        elif json_type == "number" and format_type == "float":
            return "float"
        elif json_type == "number" and format_type == "double":
            return "float"
        
        return type_mapping.get(json_type, "str")
    
    def parse_enum_from_description(description: str, enum_type: str = "integer"):
        """Parse enum values and names from description text like '0 = Value1\n1 = Value2'"""
        if not description:
            return [], []
        
        # Pattern to match "number = name" or "number=name" 
        pattern = r'(\d+)\s*=\s*([^\n\r]+)'
        matches = re.findall(pattern, description.strip())
        
        if not matches:
            return [], []
        
        values = []
        names = []
        for value_str, name in matches:
            if enum_type == "integer":
                values.append(int(value_str))
            else:
                values.append(value_str)
            # Clean up the name - remove extra whitespace
            clean_name = name.strip()
            names.append(clean_name)
        
        return values, names
    
    def create_enum_class(enum_name: str, enum_values: list, enum_names: list = None, enum_type: str = "integer") -> str:
        """Create an enum class definition"""
        base_enum = "IntEnum" if enum_type == "integer" else "Enum"
        lines = [f"class {enum_name}({base_enum}):"]
        
        if not enum_values:
            lines.append("    pass")
            return "\n".join(lines)
        
        for i, value in enumerate(enum_values):
            if enum_names and i < len(enum_names):
                # Use the corresponding name from enum_names
                name = enum_names[i]
            else:
                # Fallback to a generic name if no enum_names provided
                name = f"VALUE_{value}"
            
            # Make sure the name is a valid Python identifier
            # Replace any invalid characters and ensure it doesn't start with a number
            clean_name = ''.join(c if c.isalnum() or c == '_' else '_' for c in str(name))
            if clean_name and clean_name[0].isdigit():
                clean_name = f"_{clean_name}"
            if not clean_name:  # Fallback if name becomes empty
                clean_name = f"VALUE_{value}"
            
            lines.append(f"    {clean_name} = {value}")
        
        return "\n".join(lines)
    
    # Check if this is an enum-only schema (no properties, just description with enum info)
    if ("enum" in schema_definition or 
        ("description" in schema_definition and 
         "properties" not in schema_definition and 
         re.search(r'\d+\s*=\s*[^\n\r]+', schema_definition.get("description", "")))):
        
        # This is an enum schema
        enum_type = schema_definition.get("type", "integer")
        
        if "enum" in schema_definition:
            # Explicit enum values
            enum_values = schema_definition["enum"]
            enum_names = schema_definition.get("x-enumNames", [])
        else:
            # Parse from description
            description = schema_definition.get("description", "")
            enum_values, enum_names = parse_enum_from_description(description, enum_type)
        
        if enum_values:
            return create_enum_class(schema_name, enum_values, enum_names, enum_type)
        else:
            # Fallback to empty class with docstring
            lines = [f"class {schema_name}(IntEnum if enum_type == 'integer' else Enum):"]
            if "description" in schema_definition:
                lines.append(f'    """{schema_definition["description"]}"""')
            lines.append("    pass")
            return "\n".join(lines)
    
    # Collect enum definitions
    enum_classes = []
    
    # Start building the class
    class_lines = [f"class {schema_name}({base_class}):"]
    
    # Add docstring if description exists
    if "description" in schema_definition:
        class_lines.append(f'    """{schema_definition["description"]}"""')
    
    properties = schema_definition.get("properties", {})
    required_fields = schema_definition.get("required", [])
    
    if not properties:
        class_lines.append("    pass")
        return "\n".join(class_lines)
    
    # Generate fields
    for prop_name, prop_def in properties.items():
        # Check if this is an enum
        if "enum" in prop_def:
            prop_type = prop_def.get("type", "string")
            enum_values = prop_def["enum"]
            enum_names = prop_def.get("x-enumNames", [])
            
            # Create enum class name - properly capitalize the property name
            enum_class_name = f"{prop_name}Enum"
            
            # Generate enum class
            enum_class = create_enum_class(enum_class_name, enum_values, enum_names, prop_type)
            enum_classes.append(enum_class)
            
            # Use enum as the type
            python_type = enum_class_name
        else:
            prop_type = prop_def.get("type", "string")
            prop_format = prop_def.get("format")
            python_type = map_json_type_to_python(prop_type, prop_format)
        
        # Determine if field is optional
        is_required = prop_name in required_fields
        
        # Build the field definition
        if is_required:
            field_def = f"    {prop_name}: {python_type}"
        else:
            field_def = f"    {prop_name}: Optional[{python_type}] = None"
        
        # Add Field() with additional constraints if needed
        field_constraints = []
        
        if "description" in prop_def:
            # Clean up description formatting - remove extra newlines and escape quotes
            description = prop_def["description"].strip().replace('\n', ' ').replace('"', '\\"')
            field_constraints.append(f'description="{description}"')
        
        if "minimum" in prop_def:
            field_constraints.append(f'ge={prop_def["minimum"]}')
            
        if "maximum" in prop_def:
            field_constraints.append(f'le={prop_def["maximum"]}')
            
        if "minLength" in prop_def:
            field_constraints.append(f'min_length={prop_def["minLength"]}')
            
        if "maxLength" in prop_def:
            field_constraints.append(f'max_length={prop_def["maxLength"]}')
            
        if "pattern" in prop_def:
            field_constraints.append(f'regex=r"{prop_def["pattern"]}"')
        
        if field_constraints:
            field_def += f" = Field({', '.join(field_constraints)})"
        
        class_lines.append(field_def)
    
    # Combine enum classes and main class
    result_lines = []
    if enum_classes:
        result_lines.extend(enum_classes)
        result_lines.append("")  # Empty line separator
    result_lines.extend(class_lines)
    
    return "\n".join(result_lines)

# Test with the ActivityApprovalStatusDto schema
activity_schema = {
    "required": ["WorkflowStepId"],
    "type": "object",
    "properties": {
        "Id": {
            "type": "integer",
            "format": "int32"
        },
        "WorkflowStepId": {
            "type": "integer",
            "format": "int32"
        }
    },
    "additionalProperties": False
}

# Test with an enum schema
enum_schema = {
    "required": ["SecureNetwork"],
    "type": "object",
    "properties": {
        "SecureNetwork": {
            "enum": [0, 1],
            "type": "integer",
            "description": "\n\n0 = NIPR\n\n1 = SIPR",
            "format": "int32",
            "x-enumNames": ["NIPR", "SIPR"]
        }
    }
}

# Test with the problematic GapWorkflowStep schema
gap_workflow_schema = {
    "required": ["GapWorkflowStep"],
    "type": "object",
    "properties": {
        "GapWorkflowStep": {
            "enum": [1, 3, 11, 28, 30],
            "type": "integer",
            "description": "\n\n1 = Draft\n\n3 = Approved\n\n11 = GapArchived\n\n28 = DraftStaffing\n\n30 = CdidApproved",
            "format": "int32",
            "x-enumNames": ["Draft", "Approved", "GapArchived", "DraftStaffing", "CdidApproved"]
        }
    }
}

# Test with an enum-only schema (like the ones that were causing problems)
enum_only_schema = {
    "enum": [0, 1, 2],
    "type": "integer",
    "description": "\n\n0 = Relevance\n\n1 = Newest\n\n2 = Oldest",
    "format": "int32"
}

# Generate the Pydantic classes
pydantic_class = create_pydantic_class_from_schema("ActivityApprovalStatusDto", activity_schema)
print("ActivityApprovalStatusDto:")
print(pydantic_class)
print("\n" + "="*50 + "\n")

enum_class = create_pydantic_class_from_schema("NetworkDto", enum_schema)
print("NetworkDto with enum:")
print(enum_class)
print("\n" + "="*50 + "\n")

gap_class = create_pydantic_class_from_schema("GapWorkflowDto", gap_workflow_schema)
print("GapWorkflowDto with problematic enum:")
print(gap_class)
print("\n" + "="*50 + "\n")

enum_only_class = create_pydantic_class_from_schema("DocumentSearchSortOrder", enum_only_schema)
print("DocumentSearchSortOrder (enum-only schema):")
print(enum_only_class)

ActivityApprovalStatusDto:
class ActivityApprovalStatusDto(BaseModel):
    Id: Optional[int] = None
    WorkflowStepId: int


NetworkDto with enum:
class SecureNetworkEnum(IntEnum):
    NIPR = 0
    SIPR = 1

class NetworkDto(BaseModel):
    SecureNetwork: SecureNetworkEnum = Field(description="0 = NIPR  1 = SIPR")


GapWorkflowDto with problematic enum:
class GapWorkflowStepEnum(IntEnum):
    Draft = 1
    Approved = 3
    GapArchived = 11
    DraftStaffing = 28
    CdidApproved = 30

class GapWorkflowDto(BaseModel):
    GapWorkflowStep: GapWorkflowStepEnum = Field(description="1 = Draft  3 = Approved  11 = GapArchived  28 = DraftStaffing  30 = CdidApproved")


DocumentSearchSortOrder (enum-only schema):
class DocumentSearchSortOrder(IntEnum):
    VALUE_0 = 0
    VALUE_1 = 1
    VALUE_2 = 2


In [14]:
def print_schemas(swagger_data, count):
    """
    Print each schema from the components.schemas section of the swagger data
    """
    schema_count = 0
    if 'components' in swagger_data and 'schemas' in swagger_data['components']:
        schemas = swagger_data['components']['schemas']
        
        for schema_name, schema_definition in schemas.items():
            schema_count += 1
            # Generate and print the Pydantic class
            pydantic_class = create_pydantic_class_from_schema(schema_name, schema_definition)
            print(pydantic_class)
            print("\n")
            if schema_count >= count:
                print(f"\nReached the limit of {count} schemas. Stopping here.")
                break
    else:
        print("No schemas found in components.schemas")

# Call the function with the loaded data
print_schemas(forgeswagger_data, 999)

class ActivityApprovalStatusDto(BaseModel):
    Id: Optional[int] = None
    WorkflowStepId: int


class ActivityInitialInsightsDto(BaseModel):
    ExpId: Optional[int] = None
    InitialInsight: Optional[str] = None


class ActivityNotesDto(BaseModel):
    ExpId: Optional[int] = None
    Notes: Optional[str] = None


class ActivityPlanningTimelineDto(BaseModel):
    MilestoneId: Optional[int] = None
    ActivityId: Optional[int] = None
    Name: Optional[str] = None
    PlannedStartDate: Optional[str] = None
    PlannedEndDate: Optional[str] = None
    ApprovalAuthority: Optional[str] = None
    Status: Optional[str] = None


class ActivityRecommendationsDto(BaseModel):
    RecommendationId: Optional[str] = None
    ExpId: Optional[int] = None
    Recommendations: Optional[str] = None
    RecommendationDomains: Optional[list] = None
    RecommendationDomainsGridStr: Optional[str] = None
    CreatedDate: Optional[str] = None
    CreatedBy: Optional[str] = None


class AssessmentAccessR

In [31]:
def print_schemanames(swagger_data, count):
    """
    Print each schema from the components.schemas section of the swagger data
    """
    schema_count = 0
    if 'components' in swagger_data and 'schemas' in swagger_data['components']:
        schemas = swagger_data['components']['schemas']
        
        for schema_name, schema_definition in schemas.items():
            schema_count += 1
            # Generate and print the Pydantic class
            print(f"{schema_name},")
            if schema_count >= count:
                print(f"\nReached the limit of {count} schemas. Stopping here.")
                break
    else:
        print("No schemas found in components.schemas")

# Call the function with the loaded data
print_schemanames(forgeswagger_data, 999)

ActivityApprovalStatusDto,
ActivityInitialInsightsDto,
ActivityNotesDto,
ActivityPlanningTimelineDto,
ActivityRecommendationsDto,
AssessmentAccessRequestDto,
AssessmentClaDto,
AssessmentGapRsaDto,
AssessmentGeneralInformationDto,
AssessmentTcbaProjectCreateDto,
ClaDto,
CommentDto,
ConceptCommandPostDto,
ConceptEnablerDto,
ConceptEquipCapabilityChangesDto,
ConceptEquipmentMappingDto,
ConceptGeneralInformationDto,
ConceptLearningEventMappingDto,
ConceptLitLibraryDocumentsDto,
ConceptMeasurementDto,
ConceptNotionDto,
ConceptScenarioMappingDto,
ConceptScienceTechMappingDto,
ConceptTaskMappingDto,
CrcMappingDto,
CrcSolutionSetMappingDto,
DataGridChangeDto,
DataSourceLoadOptions,
DiscriminatorDto,
DocumentSearchSortOrder,
DynamicTableColumnDto,
DynamicTableColumnLookupDto,
DynamicTableDto,
DynamicTableEditingOptionsDto,
EeaDto,
ForgeFileDto,
GapDraftDto,
GapWorkflowStep,
GapWorksheetDto,
GroupingInfo,
GuidLookupDto,
Int32FileMetadataDto,
InterdependenciesDto,
LdResponseMappingDto,
LdResponse

In [15]:
# Test specific schemas that were causing issues
def test_specific_schemas(swagger_data):
    """Test specific schemas that were problematic"""
    if 'components' in swagger_data and 'schemas' in swagger_data['components']:
        schemas = swagger_data['components']['schemas']
        
        # Test schemas that were causing issues
        problem_schemas = [
            "DocumentSearchSortOrder",
            "GapWorkflowStep", 
            "LearningTddType",
            "ReferenceTable",
            "SecureNetwork",
            "SolutionPriorityAlgorithm",
            "TcbaLookupTable"
        ]
        
        for schema_name in problem_schemas:
            if schema_name in schemas:
                print(f"=== {schema_name} ===")
                pydantic_class = create_pydantic_class_from_schema(schema_name, schemas[schema_name])
                print(pydantic_class)
                print()
    else:
        print("No schemas found")

# Run the test
test_specific_schemas(forgeswagger_data)

=== DocumentSearchSortOrder ===
class DocumentSearchSortOrder(IntEnum):
    Relevance = 0
    Newest = 1
    Oldest = 2

=== GapWorkflowStep ===
class GapWorkflowStep(IntEnum):
    Draft = 1
    Approved = 3
    GapArchived = 11
    DraftStaffing = 28
    CdidApproved = 30

=== LearningTddType ===
class LearningTddType(IntEnum):
    Task = 1
    Deliverable = 2
    Decision = 3

=== ReferenceTable ===
class ReferenceTable(IntEnum):
    Invalid = 0
    FeasibilityMatrix = 1
    IntegrationReadinessLevel = 2
    ManufacturingReadinessLevel = 3
    MitigationExtent = 4
    TechnologyReadinessLevel = 5
    Tier1GapCategories = 6

=== SecureNetwork ===
class SecureNetwork(IntEnum):
    NIPR = 0
    SIPR = 1

=== SolutionPriorityAlgorithm ===
class SolutionPriorityAlgorithm(IntEnum):
    MostRecentApprovedRsas = 0
    MostRecentPendingRsas = 1

=== TcbaLookupTable ===
class TcbaLookupTable(IntEnum):
    Invalid = 0
    GapSource = 1
    GeographicalAor = 2
    JointCapabilityArea = 3
    Ser

In [16]:
def generate_fastapi_endpoint(path: str, path_spec: dict, schemas: dict) -> str:
    """
    Generate a FastAPI endpoint function from a single path specification.
    
    Args:
        path: The API path (e.g., "/api/users/{id}")
        path_spec: The path specification dictionary from OpenAPI spec
        schemas: The schemas dictionary for type resolution
    
    Returns:
        String containing the FastAPI endpoint function code
    """
    
    def resolve_schema_type(schema_ref: str, schemas: dict) -> str:
        """Resolve a schema reference to a Python type"""
        if schema_ref.startswith("#/components/schemas/"):
            schema_name = schema_ref.split("/")[-1]
            # Just return the schema name as-is - assume the Pydantic class exists
            return schema_name
        return "dict"  # fallback
    
    def resolve_schema_from_definition(schema_def: dict, schemas: dict) -> str:
        """Resolve schema type from a schema definition object"""
        if "$ref" in schema_def:
            return resolve_schema_type(schema_def["$ref"], schemas)
        elif "type" in schema_def:
            if schema_def["type"] == "array" and "items" in schema_def:
                if "$ref" in schema_def["items"]:
                    item_type = resolve_schema_type(schema_def["items"]["$ref"], schemas)
                    return f"List[{item_type}]"
                else:
                    return "List[dict]"
            elif schema_def["type"] == "object":
                return "dict"
            elif schema_def["type"] == "string":
                return "str"
            elif schema_def["type"] == "integer":
                return "int"
            elif schema_def["type"] == "boolean":
                return "bool"
            elif schema_def["type"] == "number":
                return "float"
        return "dict"  # fallback
    
    def get_response_type(responses: dict, schemas: dict) -> str:
        """Get the response type from the responses specification"""
        # Look for successful response (200, 201, etc.)
        for status_code, response_spec in responses.items():
            if str(status_code).startswith("2"):  # 2xx status codes
                content = response_spec.get("content", {})
                json_content = content.get("application/json", {})
                schema = json_content.get("schema", {})
                
                return resolve_schema_from_definition(schema, schemas)
                        
        return "dict"  # fallback
    
    def get_request_body_type(request_body: dict, schemas: dict) -> str:
        """Get the request body type from the request body specification"""
        content = request_body.get("content", {})
        json_content = content.get("application/json", {})
        schema = json_content.get("schema", {})
        
        return resolve_schema_from_definition(schema, schemas)
    
    def get_parameter_type(param_schema: dict, schemas: dict) -> str:
        """Get the type for a parameter from its schema"""
        return resolve_schema_from_definition(param_schema, schemas)
    
    def parse_path_parameters(path: str) -> list:
        """Extract path parameters from the path string"""
        import re
        # Find all {parameter} patterns
        params = re.findall(r'\{([^}]+)\}', path)
        return params
    
    def sanitize_function_name(path: str, method: str) -> str:
        """Create a valid Python function name from path and method"""
        # Remove path parameters and clean up
        import re
        clean_path = re.sub(r'\{[^}]+\}', '', path)
        clean_path = re.sub(r'[^a-zA-Z0-9]', '_', clean_path)
        clean_path = clean_path.strip('_')
        
        # Combine method and path
        func_name = f"{method.lower()}_{clean_path}".replace('__', '_').strip('_')
        return func_name if func_name else f"{method.lower()}_endpoint"
    
    endpoint_functions = []
    
    # Process each HTTP method for this path
    for method, operation in path_spec.items():
        if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'head', 'options']:
            continue
            
        # Get operation details
        operation_id = operation.get("operationId", "")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        parameters = operation.get("parameters", [])
        request_body = operation.get("requestBody", {})
        responses = operation.get("responses", {})
        
        # Generate function name
        if operation_id:
            func_name = operation_id.replace("-", "_").replace(" ", "_").lower()
        else:
            func_name = sanitize_function_name(path, method)
        
        # Start building the function
        lines = []
        
        # Add function decorator
        fastapi_path = path.replace("{", "{").replace("}", "}")  # FastAPI uses same format
        lines.append(f"@app.{method.lower()}('{fastapi_path}')")
        
        # Build function signature
        func_params = []
        
        # Add path parameters
        path_params = parse_path_parameters(path)
        for param in path_params:
            func_params.append(f"{param}: str")
        
        # Add query and other parameters
        for param in parameters:
            param_name = param["name"]
            param_required = param.get("required", False)
            param_location = param.get("in", "query")
            
            # Get parameter type from schema
            if "schema" in param:
                param_type = get_parameter_type(param["schema"], schemas)
            else:
                param_type = "str"  # Default fallback
            
            if param_location == "query":
                if param_required:
                    func_params.append(f"{param_name}: {param_type}")
                else:
                    func_params.append(f"{param_name}: Optional[{param_type}] = None")
        
        # Add request body parameter
        if request_body:
            body_type = get_request_body_type(request_body, schemas)
            func_params.append(f"body: {body_type}")
        
        # Get response type
        response_type = get_response_type(responses, schemas)
        
        # Build function signature
        params_str = ", ".join(func_params)
        lines.append(f"async def {func_name}({params_str}) -> {response_type}:")
        
        # Add docstring
        if summary or description:
            lines.append('    """')
            if summary:
                lines.append(f"    {summary}")
            if description and description != summary:
                lines.append(f"    ")
                lines.append(f"    {description}")
            lines.append('    """')
        
        # Add placeholder implementation
        lines.append("    # TODO: Implement endpoint logic")
        
        if method.lower() == "get":
            if "List[" in response_type:
                lines.append("    return []")
            else:
                lines.append("    return {}")
        elif method.lower() in ["post", "put", "patch"]:
            if request_body:
                lines.append("    return body")
            else:
                lines.append("    return {}")
        elif method.lower() == "delete":
            lines.append("    return {'status': 'deleted'}")
        else:
            lines.append("    return {}")
        
        endpoint_functions.append("\n".join(lines))
    
    return "\n\n".join(endpoint_functions)

# Test with a path that uses DataSourceLoadOptions
if 'paths' in forgeswagger_data:
    paths = forgeswagger_data['paths']
    schemas = forgeswagger_data.get('components', {}).get('schemas', {})
    
    # Find a path that uses DataSourceLoadOptions
    test_path = "/api/pathfinder/LinkableSolutionSetCrcs"
    if test_path in paths:
        path_spec = paths[test_path]
        
        print(f"Testing endpoint generation for: {test_path}")
        print("="*60)
        
        endpoint_code = generate_fastapi_endpoint(test_path, path_spec, schemas)
        print(endpoint_code)
    else:
        print(f"Path {test_path} not found")
        # Test with first available path
        first_path = list(paths.keys())[0]
        first_path_spec = paths[first_path]
        
        print(f"Testing endpoint generation for: {first_path}")
        print("="*60)
        
        endpoint_code = generate_fastapi_endpoint(first_path, first_path_spec, schemas)
        print(endpoint_code)
else:
    print("No paths found in swagger data")

Testing endpoint generation for: /api/pathfinder/LinkableSolutionSetCrcs
@app.get('/api/pathfinder/LinkableSolutionSetCrcs')
async def get_api_pathfinder_LinkableSolutionSetCrcs(solutionSetId: Optional[str] = None, loadOptions: Optional[DataSourceLoadOptions] = None) -> dict:
    """
    Returns a list of solution ideas available to map to solution set.
    """
    # TODO: Implement endpoint logic
    return {}


In [17]:
# Generate FastAPI endpoints for all paths
all_endpoints = []
schemas = forgeswagger_data.get('components', {}).get('schemas', {})

for path, path_spec in forgeswagger_data['paths'].items():
    endpoint_code = generate_fastapi_endpoint(path, path_spec, schemas)
    all_endpoints.append(endpoint_code)

# Print all endpoints with newlines between them
print('\n'.join(all_endpoints))

@app.post('/api/AiAssistant/CompleteForm')
async def post_api_AiAssistant_CompleteForm(body: str) -> dict:
    """
    Prompts the AI assistant to complete a form
    """
    # TODO: Implement endpoint logic
    return body
@app.get('/api/AiAssistant/SummarizeDocument')
async def get_api_AiAssistant_SummarizeDocument(filePath: Optional[str] = None) -> dict:
    """
    Prompts the AI assistant to summarize a file by its path
    """
    # TODO: Implement endpoint logic
    return {}
@app.get('/api/AiAssistant/SummarizeDocument/{fileId}')
async def get_api_AiAssistant_SummarizeDocument(fileId: str) -> dict:
    """
    Prompts the AI assistant to summarize a file by its ID
    """
    # TODO: Implement endpoint logic
    return {}
@app.post('/api/AiChatBot/Message')
async def post_api_AiChatBot_Message(userMessage: Optional[str] = None) -> dict:
    """
    Sends a user message to the chatbot and retrieves a response.
    """
    # TODO: Implement endpoint logic
    return {}
@app.delet

In [18]:
def generate_fastapi_endpoint_fixed(path: str, path_spec: dict, schemas: dict) -> str:
    """
    Generate a FastAPI endpoint function from a single path specification.
    Fixed version that handles parameter name conflicts between path and query parameters.
    
    Args:
        path: The API path (e.g., "/api/users/{id}")
        path_spec: The path specification dictionary from OpenAPI spec
        schemas: The schemas dictionary for type resolution
    
    Returns:
        String containing the FastAPI endpoint function code
    """
    
    def resolve_schema_type(schema_ref: str, schemas: dict) -> str:
        """Resolve a schema reference to a Python type"""
        if schema_ref.startswith("#/components/schemas/"):
            schema_name = schema_ref.split("/")[-1]
            # Just return the schema name as-is - assume the Pydantic class exists
            return schema_name
        return "dict"  # fallback
    
    def resolve_schema_from_definition(schema_def: dict, schemas: dict) -> str:
        """Resolve schema type from a schema definition object"""
        if "$ref" in schema_def:
            return resolve_schema_type(schema_def["$ref"], schemas)
        elif "type" in schema_def:
            if schema_def["type"] == "array" and "items" in schema_def:
                if "$ref" in schema_def["items"]:
                    item_type = resolve_schema_type(schema_def["items"]["$ref"], schemas)
                    return f"List[{item_type}]"
                else:
                    return "List[dict]"
            elif schema_def["type"] == "object":
                return "dict"
            elif schema_def["type"] == "string":
                return "str"
            elif schema_def["type"] == "integer":
                return "int"
            elif schema_def["type"] == "boolean":
                return "bool"
            elif schema_def["type"] == "number":
                return "float"
        return "dict"  # fallback
    
    def get_response_type(responses: dict, schemas: dict) -> str:
        """Get the response type from the responses specification"""
        # Look for successful response (200, 201, etc.)
        for status_code, response_spec in responses.items():
            if str(status_code).startswith("2"):  # 2xx status codes
                content = response_spec.get("content", {})
                json_content = content.get("application/json", {})
                schema = json_content.get("schema", {})
                
                return resolve_schema_from_definition(schema, schemas)
                        
        return "dict"  # fallback
    
    def get_request_body_type(request_body: dict, schemas: dict) -> str:
        """Get the request body type from the request body specification"""
        content = request_body.get("content", {})
        json_content = content.get("application/json", {})
        schema = json_content.get("schema", {})
        
        return resolve_schema_from_definition(schema, schemas)
    
    def get_parameter_type(param_schema: dict, schemas: dict) -> str:
        """Get the type for a parameter from its schema"""
        return resolve_schema_from_definition(param_schema, schemas)
    
    def parse_path_parameters(path: str) -> list:
        """Extract path parameters from the path string"""
        import re
        # Find all {parameter} patterns
        params = re.findall(r'\{([^}]+)\}', path)
        return params
    
    def sanitize_function_name(path: str, method: str) -> str:
        """Create a valid Python function name from path and method"""
        # Remove path parameters and clean up
        import re
        clean_path = re.sub(r'\{[^}]+\}', '', path)
        clean_path = re.sub(r'[^a-zA-Z0-9]', '_', clean_path)
        clean_path = clean_path.strip('_')
        
        # Combine method and path
        func_name = f"{method.lower()}_{clean_path}".replace('__', '_').strip('_')
        return func_name if func_name else f"{method.lower()}_endpoint"
    
    endpoint_functions = []
    
    # Process each HTTP method for this path
    for method, operation in path_spec.items():
        if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'head', 'options']:
            continue
            
        # Get operation details
        operation_id = operation.get("operationId", "")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        parameters = operation.get("parameters", [])
        request_body = operation.get("requestBody", {})
        responses = operation.get("responses", {})
        
        # Generate function name
        if operation_id:
            func_name = operation_id.replace("-", "_").replace(" ", "_").lower()
        else:
            func_name = sanitize_function_name(path, method)
        
        # Start building the function
        lines = []
        
        # Add function decorator
        fastapi_path = path.replace("{", "{").replace("}", "}")  # FastAPI uses same format
        lines.append(f"@app.{method.lower()}('{fastapi_path}')")
        
        # Build function signature
        func_params = []
        used_param_names = set()
        
        # Add path parameters first
        path_params = parse_path_parameters(path)
        for param in path_params:
            func_params.append(f"{param}: str")
            used_param_names.add(param)
        
        # Add query and other parameters, handling name conflicts
        for param in parameters:
            param_name = param["name"]
            param_required = param.get("required", False)
            param_location = param.get("in", "query")
            
            # Handle name conflicts - if parameter name already used by path param, rename it
            original_name = param_name
            if param_name in used_param_names:
                if param_location == "query":
                    param_name = f"query_{param_name}"
                elif param_location == "header":
                    param_name = f"header_{param_name}"
                elif param_location == "cookie":
                    param_name = f"cookie_{param_name}"
                else:
                    param_name = f"{param_location}_{param_name}"
            
            used_param_names.add(param_name)
            
            # Get parameter type from schema
            if "schema" in param:
                param_type = get_parameter_type(param["schema"], schemas)
            else:
                param_type = "str"  # Default fallback
            
            if param_location == "query":
                if param_required:
                    func_params.append(f"{param_name}: {param_type}")
                else:
                    func_params.append(f"{param_name}: Optional[{param_type}] = None")
            elif param_location == "header":
                if param_required:
                    func_params.append(f"{param_name}: {param_type} = Header()")
                else:
                    func_params.append(f"{param_name}: Optional[{param_type}] = Header(None)")
            elif param_location == "cookie":
                if param_required:
                    func_params.append(f"{param_name}: {param_type} = Cookie()")
                else:
                    func_params.append(f"{param_name}: Optional[{param_type}] = Cookie(None)")
        
        # Add request body parameter
        if request_body:
            body_type = get_request_body_type(request_body, schemas)
            func_params.append(f"body: {body_type}")
        
        # Get response type
        response_type = get_response_type(responses, schemas)
        
        # Build function signature
        params_str = ", ".join(func_params)
        lines.append(f"async def {func_name}({params_str}) -> {response_type}:")
        
        # Add docstring
        if summary or description:
            lines.append('    """')
            if summary:
                lines.append(f"    {summary}")
            if description and description != summary:
                lines.append(f"    ")
                lines.append(f"    {description}")
            lines.append('    """')
        
        # Add placeholder implementation
        lines.append("    # TODO: Implement endpoint logic")
        
        if method.lower() == "get":
            if "List[" in response_type:
                lines.append("    return []")
            else:
                lines.append("    return {}")
        elif method.lower() in ["post", "put", "patch"]:
            if request_body:
                lines.append("    return body")
            else:
                lines.append("    return {}")
        elif method.lower() == "delete":
            lines.append("    return {'status': 'deleted'}")
        else:
            lines.append("    return {}")
        
        endpoint_functions.append("\n".join(lines))
    
    return "\n\n".join(endpoint_functions)

# Test the fixed function with the problematic endpoint
test_endpoint_spec = {
    "post": {
        "tags": ["AssessmentApi"],
        "summary": "Endpoint responsible for saving CLA changes for an Assessment.",
        "parameters": [
            {
                "name": "id",
                "in": "query",
                "schema": {
                    "type": "integer",
                    "format": "int32"
                }
            }
        ],
        "requestBody": {
            "content": {
                "application/json": {
                    "schema": {
                        "type": "array",
                        "items": {
                            "$ref": "#/components/schemas/AssessmentClaDto"
                        }
                    }
                }
            }
        },
        "responses": {
            "200": {
                "description": "OK"
            }
        }
    }
}

print("Testing fixed function with problematic endpoint:")
print("="*60)
schemas = forgeswagger_data.get('components', {}).get('schemas', {})
test_result = generate_fastapi_endpoint_fixed("/api/assessments/Clas/{id}", test_endpoint_spec, schemas)
print(test_result)

Testing fixed function with problematic endpoint:
@app.post('/api/assessments/Clas/{id}')
async def post_api_assessments_Clas(id: str, query_id: Optional[int] = None, body: List[AssessmentClaDto]) -> dict:
    """
    Endpoint responsible for saving CLA changes for an Assessment.
    """
    # TODO: Implement endpoint logic
    return body


In [20]:
# Generate FastAPI endpoints for all paths using the fixed function
all_endpoints_fixed = []
schemas = forgeswagger_data.get('components', {}).get('schemas', {})

for path, path_spec in forgeswagger_data['paths'].items():
    endpoint_code = generate_fastapi_endpoint_fixed(path, path_spec, schemas)
    all_endpoints_fixed.append(endpoint_code)

# Print all endpoints with newlines between them
print('\n'.join(all_endpoints_fixed))

@app.post('/api/AiAssistant/CompleteForm')
async def post_api_AiAssistant_CompleteForm(body: str) -> dict:
    """
    Prompts the AI assistant to complete a form
    """
    # TODO: Implement endpoint logic
    return body
@app.get('/api/AiAssistant/SummarizeDocument')
async def get_api_AiAssistant_SummarizeDocument(filePath: Optional[str] = None) -> dict:
    """
    Prompts the AI assistant to summarize a file by its path
    """
    # TODO: Implement endpoint logic
    return {}
@app.get('/api/AiAssistant/SummarizeDocument/{fileId}')
async def get_api_AiAssistant_SummarizeDocument(fileId: str) -> dict:
    """
    Prompts the AI assistant to summarize a file by its ID
    """
    # TODO: Implement endpoint logic
    return {}
@app.post('/api/AiChatBot/Message')
async def post_api_AiChatBot_Message(userMessage: Optional[str] = None) -> dict:
    """
    Sends a user message to the chatbot and retrieves a response.
    """
    # TODO: Implement endpoint logic
    return {}
@app.delet

In [21]:
def generate_fastapi_endpoint_final(path: str, path_spec: dict, schemas: dict) -> str:
    """
    Generate a FastAPI endpoint function from a single path specification.
    Final version that handles parameter name conflicts and proper parameter ordering.
    
    Args:
        path: The API path (e.g., "/api/users/{id}")
        path_spec: The path specification dictionary from OpenAPI spec
        schemas: The schemas dictionary for type resolution
    
    Returns:
        String containing the FastAPI endpoint function code
    """
    
    def resolve_schema_type(schema_ref: str, schemas: dict) -> str:
        """Resolve a schema reference to a Python type"""
        if schema_ref.startswith("#/components/schemas/"):
            schema_name = schema_ref.split("/")[-1]
            # Just return the schema name as-is - assume the Pydantic class exists
            return schema_name
        return "dict"  # fallback
    
    def resolve_schema_from_definition(schema_def: dict, schemas: dict) -> str:
        """Resolve schema type from a schema definition object"""
        if "$ref" in schema_def:
            return resolve_schema_type(schema_def["$ref"], schemas)
        elif "type" in schema_def:
            if schema_def["type"] == "array" and "items" in schema_def:
                if "$ref" in schema_def["items"]:
                    item_type = resolve_schema_type(schema_def["items"]["$ref"], schemas)
                    return f"List[{item_type}]"
                else:
                    return "List[dict]"
            elif schema_def["type"] == "object":
                return "dict"
            elif schema_def["type"] == "string":
                return "str"
            elif schema_def["type"] == "integer":
                return "int"
            elif schema_def["type"] == "boolean":
                return "bool"
            elif schema_def["type"] == "number":
                return "float"
        return "dict"  # fallback
    
    def get_response_type(responses: dict, schemas: dict) -> str:
        """Get the response type from the responses specification"""
        # Look for successful response (200, 201, etc.)
        for status_code, response_spec in responses.items():
            if str(status_code).startswith("2"):  # 2xx status codes
                content = response_spec.get("content", {})
                json_content = content.get("application/json", {})
                schema = json_content.get("schema", {})
                
                return resolve_schema_from_definition(schema, schemas)
                        
        return "dict"  # fallback
    
    def get_request_body_type(request_body: dict, schemas: dict) -> str:
        """Get the request body type from the request body specification"""
        content = request_body.get("content", {})
        json_content = content.get("application/json", {})
        schema = json_content.get("schema", {})
        
        return resolve_schema_from_definition(schema, schemas)
    
    def get_parameter_type(param_schema: dict, schemas: dict) -> str:
        """Get the type for a parameter from its schema"""
        return resolve_schema_from_definition(param_schema, schemas)
    
    def parse_path_parameters(path: str) -> list:
        """Extract path parameters from the path string"""
        import re
        # Find all {parameter} patterns
        params = re.findall(r'\{([^}]+)\}', path)
        return params
    
    def sanitize_function_name(path: str, method: str) -> str:
        """Create a valid Python function name from path and method"""
        # Remove path parameters and clean up
        import re
        clean_path = re.sub(r'\{[^}]+\}', '', path)
        clean_path = re.sub(r'[^a-zA-Z0-9]', '_', clean_path)
        clean_path = clean_path.strip('_')
        
        # Combine method and path
        func_name = f"{method.lower()}_{clean_path}".replace('__', '_').strip('_')
        return func_name if func_name else f"{method.lower()}_endpoint"
    
    endpoint_functions = []
    
    # Process each HTTP method for this path
    for method, operation in path_spec.items():
        if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'head', 'options']:
            continue
            
        # Get operation details
        operation_id = operation.get("operationId", "")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        parameters = operation.get("parameters", [])
        request_body = operation.get("requestBody", {})
        responses = operation.get("responses", {})
        
        # Generate function name
        if operation_id:
            func_name = operation_id.replace("-", "_").replace(" ", "_").lower()
        else:
            func_name = sanitize_function_name(path, method)
        
        # Start building the function
        lines = []
        
        # Add function decorator
        fastapi_path = path.replace("{", "{").replace("}", "}")  # FastAPI uses same format
        lines.append(f"@app.{method.lower()}('{fastapi_path}')")
        
        # Separate required and optional parameters
        required_params = []
        optional_params = []
        used_param_names = set()
        
        # Add path parameters first (always required)
        path_params = parse_path_parameters(path)
        for param in path_params:
            required_params.append(f"{param}: str")
            used_param_names.add(param)
        
        # Process query and other parameters, separating required from optional
        for param in parameters:
            param_name = param["name"]
            param_required = param.get("required", False)
            param_location = param.get("in", "query")
            
            # Handle name conflicts - if parameter name already used by path param, rename it
            original_name = param_name
            if param_name in used_param_names:
                if param_location == "query":
                    param_name = f"query_{param_name}"
                elif param_location == "header":
                    param_name = f"header_{param_name}"
                elif param_location == "cookie":
                    param_name = f"cookie_{param_name}"
                else:
                    param_name = f"{param_location}_{param_name}"
            
            used_param_names.add(param_name)
            
            # Get parameter type from schema
            if "schema" in param:
                param_type = get_parameter_type(param["schema"], schemas)
            else:
                param_type = "str"  # Default fallback
            
            # Build parameter definition based on location and requirement
            if param_location == "query":
                if param_required:
                    required_params.append(f"{param_name}: {param_type}")
                else:
                    optional_params.append(f"{param_name}: Optional[{param_type}] = None")
            elif param_location == "header":
                if param_required:
                    required_params.append(f"{param_name}: {param_type} = Header()")
                else:
                    optional_params.append(f"{param_name}: Optional[{param_type}] = Header(None)")
            elif param_location == "cookie":
                if param_required:
                    required_params.append(f"{param_name}: {param_type} = Cookie()")
                else:
                    optional_params.append(f"{param_name}: Optional[{param_type}] = Cookie(None)")
        
        # Add request body parameter (always required if present)
        if request_body:
            body_type = get_request_body_type(request_body, schemas)
            required_params.append(f"body: {body_type}")
        
        # Get response type
        response_type = get_response_type(responses, schemas)
        
        # Build function signature with proper parameter ordering
        all_params = required_params + optional_params
        params_str = ", ".join(all_params)
        lines.append(f"async def {func_name}({params_str}) -> {response_type}:")
        
        # Add docstring
        if summary or description:
            lines.append('    """')
            if summary:
                lines.append(f"    {summary}")
            if description and description != summary:
                lines.append(f"    ")
                lines.append(f"    {description}")
            lines.append('    """')
        
        # Add placeholder implementation
        lines.append("    # TODO: Implement endpoint logic")
        
        if method.lower() == "get":
            if "List[" in response_type:
                lines.append("    return []")
            else:
                lines.append("    return {}")
        elif method.lower() in ["post", "put", "patch"]:
            if request_body:
                lines.append("    return body")
            else:
                lines.append("    return {}")
        elif method.lower() == "delete":
            lines.append("    return {'status': 'deleted'}")
        else:
            lines.append("    return {}")
        
        endpoint_functions.append("\n".join(lines))
    
    return "\n\n".join(endpoint_functions)

# Test the final function with the problematic endpoint
print("Testing final function with proper parameter ordering:")
print("="*60)
test_result_final = generate_fastapi_endpoint_final("/api/assessments/Clas/{id}", test_endpoint_spec, schemas)
print(test_result_final)

print("\n" + "="*60 + "\n")

# Test with an endpoint that has both required and optional query parameters
test_mixed_params_spec = {
    "get": {
        "tags": ["TestApi"],
        "summary": "Test endpoint with mixed required and optional parameters",
        "parameters": [
            {
                "name": "required_param",
                "in": "query",
                "required": True,
                "schema": {"type": "string"}
            },
            {
                "name": "optional_param",
                "in": "query",
                "required": False,
                "schema": {"type": "integer"}
            },
            {
                "name": "id",  # Conflicts with path param
                "in": "query",
                "required": False,
                "schema": {"type": "string"}
            }
        ],
        "responses": {
            "200": {"description": "OK"}
        }
    }
}

print("Testing with mixed required/optional parameters:")
test_mixed_result = generate_fastapi_endpoint_final("/api/test/{id}", test_mixed_params_spec, schemas)
print(test_mixed_result)

Testing final function with proper parameter ordering:
@app.post('/api/assessments/Clas/{id}')
async def post_api_assessments_Clas(id: str, body: List[AssessmentClaDto], query_id: Optional[int] = None) -> dict:
    """
    Endpoint responsible for saving CLA changes for an Assessment.
    """
    # TODO: Implement endpoint logic
    return body


Testing with mixed required/optional parameters:
@app.get('/api/test/{id}')
async def get_api_test(id: str, required_param: str, optional_param: Optional[int] = None, query_id: Optional[str] = None) -> dict:
    """
    Test endpoint with mixed required and optional parameters
    """
    # TODO: Implement endpoint logic
    return {}


In [22]:
# Generate FastAPI endpoints for all paths using the final function with proper parameter ordering
all_endpoints_final = []
schemas = forgeswagger_data.get('components', {}).get('schemas', {})

for path, path_spec in forgeswagger_data['paths'].items():
    endpoint_code = generate_fastapi_endpoint_final(path, path_spec, schemas)
    all_endpoints_final.append(endpoint_code)

# Print all endpoints with newlines between them
print('\n'.join(all_endpoints_final))

@app.post('/api/AiAssistant/CompleteForm')
async def post_api_AiAssistant_CompleteForm(body: str) -> dict:
    """
    Prompts the AI assistant to complete a form
    """
    # TODO: Implement endpoint logic
    return body
@app.get('/api/AiAssistant/SummarizeDocument')
async def get_api_AiAssistant_SummarizeDocument(filePath: Optional[str] = None) -> dict:
    """
    Prompts the AI assistant to summarize a file by its path
    """
    # TODO: Implement endpoint logic
    return {}
@app.get('/api/AiAssistant/SummarizeDocument/{fileId}')
async def get_api_AiAssistant_SummarizeDocument(fileId: str) -> dict:
    """
    Prompts the AI assistant to summarize a file by its ID
    """
    # TODO: Implement endpoint logic
    return {}
@app.post('/api/AiChatBot/Message')
async def post_api_AiChatBot_Message(userMessage: Optional[str] = None) -> dict:
    """
    Sends a user message to the chatbot and retrieves a response.
    """
    # TODO: Implement endpoint logic
    return {}
@app.delet

In [25]:
def generate_fastapi_endpoint_ultimate(path: str, path_spec: dict, schemas: dict) -> str:
    """
    Generate a FastAPI endpoint function from a single path specification.
    Ultimate version that handles parameter name conflicts, proper parameter ordering,
    dotted parameter names (e.g., "Filters.ConceptName"), and tags.
    
    Args:
        path: The API path (e.g., "/api/users/{id}")
        path_spec: The path specification dictionary from OpenAPI spec
        schemas: The schemas dictionary for type resolution
    
    Returns:
        String containing the FastAPI endpoint function code
    """
    
    def resolve_schema_type(schema_ref: str, schemas: dict) -> str:
        """Resolve a schema reference to a Python type"""
        if schema_ref.startswith("#/components/schemas/"):
            schema_name = schema_ref.split("/")[-1]
            # Just return the schema name as-is - assume the Pydantic class exists
            return schema_name
        return "dict"  # fallback
    
    def resolve_schema_from_definition(schema_def: dict, schemas: dict) -> str:
        """Resolve schema type from a schema definition object"""
        if "$ref" in schema_def:
            return resolve_schema_type(schema_def["$ref"], schemas)
        elif "type" in schema_def:
            if schema_def["type"] == "array" and "items" in schema_def:
                if "$ref" in schema_def["items"]:
                    item_type = resolve_schema_type(schema_def["items"]["$ref"], schemas)
                    return f"List[{item_type}]"
                else:
                    return "List[dict]"
            elif schema_def["type"] == "object":
                return "dict"
            elif schema_def["type"] == "string":
                return "str"
            elif schema_def["type"] == "integer":
                return "int"
            elif schema_def["type"] == "boolean":
                return "bool"
            elif schema_def["type"] == "number":
                return "float"
        return "dict"  # fallback
    
    def get_response_type(responses: dict, schemas: dict) -> str:
        """Get the response type from the responses specification"""
        # Look for successful response (200, 201, etc.)
        for status_code, response_spec in responses.items():
            if str(status_code).startswith("2"):  # 2xx status codes
                content = response_spec.get("content", {})
                json_content = content.get("application/json", {})
                schema = json_content.get("schema", {})
                
                return resolve_schema_from_definition(schema, schemas)
                        
        return "dict"  # fallback
    
    def get_request_body_type(request_body: dict, schemas: dict) -> str:
        """Get the request body type from the request body specification"""
        content = request_body.get("content", {})
        json_content = content.get("application/json", {})
        schema = json_content.get("schema", {})
        
        return resolve_schema_from_definition(schema, schemas)
    
    def get_parameter_type(param_schema: dict, schemas: dict) -> str:
        """Get the type for a parameter from its schema"""
        return resolve_schema_from_definition(param_schema, schemas)
    
    def parse_path_parameters(path: str) -> list:
        """Extract path parameters from the path string"""
        import re
        # Find all {parameter} patterns
        params = re.findall(r'\{([^}]+)\}', path)
        return params
    
    def sanitize_parameter_name(param_name: str) -> str:
        """
        Convert parameter names to valid Python identifiers.
        Handles cases like 'Filters.ConceptName' -> 'filters_concept_name'
        """
        # Convert to lowercase and replace dots with underscores
        sanitized = param_name.lower().replace('.', '_')
        
        # Replace any other invalid characters with underscores
        import re
        sanitized = re.sub(r'[^a-zA-Z0-9_]', '_', sanitized)
        
        # Ensure it doesn't start with a number
        if sanitized and sanitized[0].isdigit():
            sanitized = f"param_{sanitized}"
        
        # Ensure it's not empty
        if not sanitized:
            sanitized = "param"
        
        return sanitized
    
    def sanitize_function_name(path: str, method: str) -> str:
        """Create a valid Python function name from path and method"""
        # Remove path parameters and clean up
        import re
        clean_path = re.sub(r'\{[^}]+\}', '', path)
        clean_path = re.sub(r'[^a-zA-Z0-9]', '_', clean_path)
        clean_path = clean_path.strip('_')
        
        # Combine method and path
        func_name = f"{method.lower()}_{clean_path}".replace('__', '_').strip('_')
        return func_name if func_name else f"{method.lower()}_endpoint"
    
    def format_tags_for_fastapi(tags: list) -> str:
        """Format tags list for FastAPI decorator"""
        if not tags:
            return ""
        # Convert tags to a properly formatted Python list string
        tags_str = str(tags)
        return f", tags={tags_str}"
    
    endpoint_functions = []
    
    # Process each HTTP method for this path
    for method, operation in path_spec.items():
        if method.lower() not in ['get', 'post', 'put', 'delete', 'patch', 'head', 'options']:
            continue
            
        # Get operation details
        operation_id = operation.get("operationId", "")
        summary = operation.get("summary", "")
        description = operation.get("description", "")
        parameters = operation.get("parameters", [])
        request_body = operation.get("requestBody", {})
        responses = operation.get("responses", {})
        tags = operation.get("tags", [])
        
        # Generate function name
        if operation_id:
            func_name = operation_id.replace("-", "_").replace(" ", "_").lower()
        else:
            func_name = sanitize_function_name(path, method)
        
        # Start building the function
        lines = []
        
        # Add function decorator with tags
        fastapi_path = path.replace("{", "{").replace("}", "}")  # FastAPI uses same format
        tags_param = format_tags_for_fastapi(tags)
        lines.append(f"@app.{method.lower()}('{fastapi_path}'{tags_param})")
        
        # Separate required and optional parameters
        required_params = []
        optional_params = []
        used_param_names = set()
        
        # Add path parameters first (always required)
        path_params = parse_path_parameters(path)
        for param in path_params:
            required_params.append(f"{param}: str")
            used_param_names.add(param)
        
        # Process query and other parameters, separating required from optional
        for param in parameters:
            original_param_name = param["name"]
            param_required = param.get("required", False)
            param_location = param.get("in", "query")
            
            # Sanitize the parameter name to handle dotted names
            param_name = sanitize_parameter_name(original_param_name)
            
            # Handle name conflicts - if parameter name already used, add prefix
            if param_name in used_param_names:
                if param_location == "query":
                    param_name = f"query_{param_name}"
                elif param_location == "header":
                    param_name = f"header_{param_name}"
                elif param_location == "cookie":
                    param_name = f"cookie_{param_name}"
                else:
                    param_name = f"{param_location}_{param_name}"
            
            used_param_names.add(param_name)
            
            # Get parameter type from schema
            if "schema" in param:
                param_type = get_parameter_type(param["schema"], schemas)
            else:
                param_type = "str"  # Default fallback
            
            # Build parameter definition based on location and requirement
            if param_location == "query":
                if param_required:
                    required_params.append(f"{param_name}: {param_type}")
                else:
                    optional_params.append(f"{param_name}: Optional[{param_type}] = None")
            elif param_location == "header":
                if param_required:
                    required_params.append(f"{param_name}: {param_type} = Header()")
                else:
                    optional_params.append(f"{param_name}: Optional[{param_type}] = Header(None)")
            elif param_location == "cookie":
                if param_required:
                    required_params.append(f"{param_name}: {param_type} = Cookie()")
                else:
                    optional_params.append(f"{param_name}: Optional[{param_type}] = Cookie(None)")
        
        # Add request body parameter (always required if present)
        if request_body:
            body_type = get_request_body_type(request_body, schemas)
            required_params.append(f"body: {body_type}")
        
        # Get response type
        response_type = get_response_type(responses, schemas)
        
        # Build function signature with proper parameter ordering
        all_params = required_params + optional_params
        params_str = ", ".join(all_params)
        lines.append(f"async def {func_name}({params_str}) -> {response_type}:")
        
        # Add docstring with tags information
        if summary or description or tags:
            lines.append('    """')
            if summary:
                lines.append(f"    {summary}")
            if description and description != summary:
                lines.append(f"    ")
                lines.append(f"    {description}")
            if tags:
                lines.append(f"    ")
                lines.append(f"    Tags: {', '.join(tags)}")
            lines.append('    """')
        
        # Add placeholder implementation
        lines.append("    # TODO: Implement endpoint logic")
        
        if method.lower() == "get":
            if "List[" in response_type:
                lines.append("    return []")
            else:
                lines.append("    return {}")
        elif method.lower() in ["post", "put", "patch"]:
            if request_body:
                lines.append("    return body")
            else:
                lines.append("    return {}")
        elif method.lower() == "delete":
            lines.append("    return {'status': 'deleted'}")
        else:
            lines.append("    return {}")
        
        endpoint_functions.append("\n".join(lines))
    
    return "\n\n".join(endpoint_functions)

# Test the ultimate function with the DocumentSearch endpoint
document_search_spec = {
    "get": {
        "tags": ["ConceptApi"],
        "summary": "Search the blob storage files for the search term",
        "parameters": [
            {
                "name": "SearchTerm",
                "in": "query",
                "schema": {"type": "string"}
            },
            {
                "name": "IndexName",
                "in": "query",
                "schema": {"type": "string"}
            },
            {
                "name": "SortOrder",
                "in": "query",
                "description": "\n\n0 = Relevance\n\n1 = Newest\n\n2 = Oldest",
                "schema": {"$ref": "#/components/schemas/DocumentSearchSortOrder"}
            },
            {
                "name": "Filters.ConceptName",
                "in": "query",
                "schema": {"type": "string"}
            },
            {
                "name": "Filters.ConceptTypeGuid",
                "in": "query",
                "schema": {"type": "string", "format": "uuid"}
            },
            {
                "name": "Filters.StatusId",
                "in": "query",
                "schema": {"type": "string", "format": "uuid"}
            },
            {
                "name": "Filters.AocYear",
                "in": "query",
                "schema": {"type": "integer", "format": "int32"}
            },
            {
                "name": "Filters.DocumentVersion",
                "in": "query",
                "schema": {"type": "string"}
            },
            {
                "name": "Filters.DocumentTypeId",
                "in": "query",
                "schema": {"type": "integer", "format": "int32"}
            },
            {
                "name": "Filters.DocumentStatus",
                "in": "query",
                "schema": {"type": "string"}
            },
            {
                "name": "Filters.DocumentName",
                "in": "query",
                "schema": {"type": "string"}
            }
        ],
        "responses": {
            "200": {"description": "OK"}
        }
    }
}

print("Testing ultimate function with DocumentSearch endpoint (dotted parameters and tags):")
print("="*80)
schemas = forgeswagger_data.get('components', {}).get('schemas', {})
document_search_result = generate_fastapi_endpoint_ultimate("/api/Concept/DocumentSearch", document_search_spec, schemas)
print(document_search_result)

Testing ultimate function with DocumentSearch endpoint (dotted parameters and tags):
@app.get('/api/Concept/DocumentSearch', tags=['ConceptApi'])
async def get_api_Concept_DocumentSearch(searchterm: Optional[str] = None, indexname: Optional[str] = None, sortorder: Optional[DocumentSearchSortOrder] = None, filters_conceptname: Optional[str] = None, filters_concepttypeguid: Optional[str] = None, filters_statusid: Optional[str] = None, filters_aocyear: Optional[int] = None, filters_documentversion: Optional[str] = None, filters_documenttypeid: Optional[int] = None, filters_documentstatus: Optional[str] = None, filters_documentname: Optional[str] = None) -> dict:
    """
    Search the blob storage files for the search term
    
    Tags: ConceptApi
    """
    # TODO: Implement endpoint logic
    return {}


In [26]:
# Generate FastAPI endpoints for all paths using the ultimate function that handles all edge cases
all_endpoints_ultimate = []
schemas = forgeswagger_data.get('components', {}).get('schemas', {})

for path, path_spec in forgeswagger_data['paths'].items():
    endpoint_code = generate_fastapi_endpoint_ultimate(path, path_spec, schemas)
    all_endpoints_ultimate.append(endpoint_code)

# Print all endpoints with newlines between them
print('\n'.join(all_endpoints_ultimate))

@app.post('/api/AiAssistant/CompleteForm', tags=['AiAssistantApi'])
async def post_api_AiAssistant_CompleteForm(body: str) -> dict:
    """
    Prompts the AI assistant to complete a form
    
    Tags: AiAssistantApi
    """
    # TODO: Implement endpoint logic
    return body
@app.get('/api/AiAssistant/SummarizeDocument', tags=['AiAssistantApi'])
async def get_api_AiAssistant_SummarizeDocument(filepath: Optional[str] = None) -> dict:
    """
    Prompts the AI assistant to summarize a file by its path
    
    Tags: AiAssistantApi
    """
    # TODO: Implement endpoint logic
    return {}
@app.get('/api/AiAssistant/SummarizeDocument/{fileId}', tags=['AiAssistantApi'])
async def get_api_AiAssistant_SummarizeDocument(fileId: str) -> dict:
    """
    Prompts the AI assistant to summarize a file by its ID
    
    Tags: AiAssistantApi
    """
    # TODO: Implement endpoint logic
    return {}
@app.post('/api/AiChatBot/Message', tags=['AiChatBotApi'])
async def post_api_AiChatBot_Message