In [5]:
import json
import re
import os

def load_openapi_spec(file_path):
    """Load and parse the OpenAPI JSON specification."""
    with open(file_path, 'r') as f:
        return json.load(f)

def clean_html(text):
    """Remove HTML tags and clean up formatting."""
    if not isinstance(text, str):
        return "No description provided"
    
    # Remove HTML tags
    clean_text = re.sub(r'<.*?>', ' ', text)
    # Normalize whitespace
    clean_text = re.sub(r'\s+', ' ', clean_text)
    return clean_text.strip()

def extract_extension_fields(method_info):
    """Extract OpenAPI extension fields starting with 'x-'."""
    extensions = {}
    for key, value in method_info.items():
        if key.startswith('x-'):
            extensions[key] = value
    return extensions

def resolve_schema_reference(ref_path, openapi_spec):
    """Resolve a schema reference to its actual schema definition."""
    if not ref_path.startswith('#/'):
        return None  # External references not supported
    
    # Remove '#/' from the path and split by '/'
    path_parts = ref_path.lstrip('#/').split('/')
    
    # Navigate through the OpenAPI spec to find the referenced schema
    current = openapi_spec
    for part in path_parts:
        if part in current:
            current = current[part]
        else:
            return None  # Reference path not found
    
    return current

def generate_sample_json(schema, openapi_spec=None, is_root=True):
    """Generate a sample JSON object from an OpenAPI schema."""
    if not schema:
        return None
    
    # Handle $ref references
    if '$ref' in schema and openapi_spec:
        # Resolve the reference to get the actual schema
        resolved_schema = resolve_schema_reference(schema['$ref'], openapi_spec)
        if resolved_schema:
            return generate_sample_json(resolved_schema, openapi_spec, is_root)
        return {"$ref": schema['$ref']}  # Fallback if resolution fails
    
    # Handle different schema types
    schema_type = schema.get('type')
    
    if schema_type == 'object':
        result = {}
        if 'properties' in schema:
            for prop_name, prop_schema in schema['properties'].items():
                result[prop_name] = generate_sample_json(prop_schema, openapi_spec, False)
        return result
    
    elif schema_type == 'array':
        if 'items' in schema:
            return [generate_sample_json(schema['items'], openapi_spec, False)]
        return []
    
    elif schema_type == 'string':
        return "string_value"
    
    elif schema_type == 'number' or schema_type == 'integer':
        return 0
    
    elif schema_type == 'boolean':
        return False
    
    # If no specific type or unsupported type
    return None

def format_property(name, details, indent=""):
    """Format a schema property in a RAG-friendly way."""
    prop_type = details.get('type', 'undefined')
    description = details.get('description', 'No description provided')
    description = clean_html(description)
    
    if description == '{…}':
        description = "Additional nested properties (abbreviated in schema)"
    
    return f"{indent}* {name} ({prop_type}): {description}"

def format_schema_properties(schema, indent_level=0):
    """Recursively format schema properties."""
    lines = []
    indent = "  " * indent_level
    
    if not schema:
        return lines
    
    if '$ref' in schema:
        ref = schema['$ref'].split('/')[-1]
        return [f"{indent}References schema: {ref}"]
    
    if 'properties' in schema:
        for prop_name, prop_details in schema['properties'].items():
            lines.append(format_property(prop_name, prop_details, indent))
            
            # Handle nested objects
            if prop_details.get('type') == 'object' and 'properties' in prop_details:
                lines.append(f"{indent}  Nested properties:")
                for nested_lines in format_schema_properties(prop_details, indent_level + 2):
                    lines.append(nested_lines)
            
            # Handle arrays
            if prop_details.get('type') == 'array' and 'items' in prop_details:
                lines.append(f"{indent}  Array items:")
                for nested_lines in format_schema_properties(prop_details['items'], indent_level + 2):
                    lines.append(nested_lines)
    
    if 'required' in schema and schema['required']:
        lines.append(f"{indent}Required fields: {', '.join(schema['required'])}")
    
    return lines

def format_endpoint(path, method_info, http_method, openapi_spec):
    """Format a single endpoint into RAG-friendly text."""
    lines = []
    
    # Basic endpoint information
    title = method_info.get('summary', 'Unnamed Endpoint')
    lines.append(f"ENDPOINT: {title}")
    lines.append(f"PATH: {path}")
    lines.append(f"METHOD: {http_method.upper()}")
    
    # Tags
    if 'tags' in method_info:
        lines.append(f"TAGS: {', '.join(method_info['tags'])}")
    
    # Description
    if 'description' in method_info:
        description = clean_html(method_info['description'])
        lines.append(f"DESCRIPTION: {description}")
    
    # Extension fields (metadata)
    extensions = extract_extension_fields(method_info)
    if extensions:
        lines.append("METADATA:")
        for ext_key, ext_value in extensions.items():
            lines.append(f"  * {ext_key}: {ext_value}")
    
    # Parameters
    if 'parameters' in method_info and method_info['parameters']:
        lines.append("PARAMETERS:")
        for param in method_info['parameters']:
            param_name = param.get('name', 'unnamed')
            param_in = param.get('in', 'undefined')
            param_required = "Required" if param.get('required', False) else "Optional"
            param_description = clean_html(param.get('description', 'No description provided'))
            lines.append(f"  * {param_name} ({param_in}, {param_required}): {param_description}")
    
    # Request Body
    if 'requestBody' in method_info:
        req_body = method_info['requestBody']
        req_required = "Required" if req_body.get('required', False) else "Optional"
        lines.append(f"REQUEST BODY: {req_required}")
        
        if 'content' in req_body:
            for content_type, content_details in req_body['content'].items():
                lines.append(f"  Content Type: {content_type}")
                
                if 'schema' in content_details:
                    schema = content_details['schema']
                    lines.append("  Schema Properties:")
                    lines.extend(format_schema_properties(schema, 2))
                    
                    # Generate sample request JSON
                    sample_json = generate_sample_json(schema, openapi_spec)
                    if sample_json:
                        lines.append("  Sample Request JSON:")
                        lines.append(f"  ```json\n  {json.dumps(sample_json, indent=2)}\n  ```")
    
    # Responses
    if 'responses' in method_info:
        lines.append("RESPONSES:")
        for status_code, response_info in method_info['responses'].items():
            lines.append(f"  Status Code: {status_code}")
            description = clean_html(response_info.get('description', 'No description provided'))
            lines.append(f"  Description: {description}")
            
            if 'content' in response_info:
                for content_type, content_details in response_info['content'].items():
                    lines.append(f"  Content Type: {content_type}")
                    
                    if 'schema' in content_details:
                        schema = content_details['schema']
                        # In the responses section where it handles $ref
                        if '$ref' in schema:
                            ref_path = schema['$ref']
                            schema_name = ref_path.split('/')[-1]
                            
                            # Instead of just referencing the schema name
                            # lines.append(f"  Response Schema: References {schema_name}")
                            
                            # Get the full schema from components
                            full_schema = None
                            if ref_path.startswith('#/components/schemas/'):
                                schema_name = ref_path.split('/')[-1]
                                if 'components' in openapi_spec and 'schemas' in openapi_spec['components'] and schema_name in openapi_spec['components']['schemas']:
                                    full_schema = openapi_spec['components']['schemas'][schema_name]
                            
                            if full_schema:
                                lines.append(f"  Response Schema: {schema_name}")
                                # Add schema description if available
                                if 'description' in full_schema:
                                    description = clean_html(full_schema.get('description', 'No description provided'))
                                    lines.append(f"  Description: {description}")
                                
                                lines.append("  Response Body Properties:")
                                lines.extend(format_schema_properties(full_schema, 2))
                                
                                # Generate sample response JSON
                                sample_json = generate_sample_json(full_schema, openapi_spec)
                                if sample_json:
                                    lines.append("  Sample Response JSON:")
                                    lines.append(f"  ```json\n  {json.dumps(sample_json, indent=2)}\n  ```")
                            else:
                                # Fallback to just showing the reference if we can't find the schema
                                lines.append(f"  Response Schema: References {schema_name}")
                                resolved_schema = resolve_schema_reference(schema['$ref'], openapi_spec)
                                if resolved_schema:
                                    lines.append("  Response Schema Properties:")
                                    lines.extend(format_schema_properties(resolved_schema, 2))
                                    
                                    # Generate sample response JSON from resolved schema
                                    sample_json = generate_sample_json(resolved_schema, openapi_spec)
                                    if sample_json:
                                        lines.append("  Sample Response JSON:")
                                        lines.append(f"  ```json\n  {json.dumps(sample_json, indent=2)}\n  ```")
    
    # Security
    if 'security' in method_info:
        security_schemes = []
        for security_item in method_info['security']:
            for scheme, scopes in security_item.items():
                security_schemes.append(scheme)
        if security_schemes:
            lines.append(f"SECURITY: {', '.join(security_schemes)}")
    
    return "\n".join(lines)

def format_all_endpoints(openapi_spec):
    """Process all endpoints in the OpenAPI specification."""
    all_endpoints = []
    
    # API info
    info = openapi_spec.get('info', {})
    api_title = info.get('title', 'Unnamed API')
    api_version = info.get('version', 'Unknown Version')
    
    # all_endpoints.append(f"API: {api_title}")
    # all_endpoints.append(f"VERSION: {api_version}")
    # all_endpoints.append("")
    
    # Process each path and method
    for path, path_item in openapi_spec.get('paths', {}).items():
        for method, method_info in path_item.items():
            # Skip non-HTTP methods
            if method in ['parameters', 'servers', 'summary', 'description']:
                continue
                
            endpoint_text = format_endpoint(path, method_info, method, openapi_spec)
            all_endpoints.append(endpoint_text)
            all_endpoints.append("--------" * 10) 
    
    # Remove the COMPONENT SCHEMAS section completely
    
    return "\n".join(all_endpoints)

def main():
    """Process all OpenAPI spec files in the json_files directory."""
    # Define directories
    json_dir = "./json_files/"
    output_dir = "../processed_data/"

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    # Get list of JSON files
    json_files = [os.path.join(json_dir, f) for f in os.listdir(json_dir) if f.endswith('.json')]

    if not json_files:
        print(f"No JSON files found in {json_dir}")
        return

    # Process each file
    endpoint_count = 0

    for json_file in json_files:
        # Get base filename without extension
        base_name = os.path.basename(json_file)
        file_name_without_ext = os.path.splitext(base_name)[0]
        output_file = os.path.join(output_dir, f"{file_name_without_ext}.txt")
        
        # Process the file
        print(f"Processing {base_name}...")
        api_spec = load_openapi_spec(json_file)
        api_text = format_all_endpoints(api_spec)
        
        # Count endpoints
        endpoint_lines = [line for line in api_text.split('\n') if line.startswith("ENDPOINT:")]
        endpoint_count += len(endpoint_lines)
        
        # Write individual file
        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(api_text)
        
        print(f"Created {output_file} with {len(endpoint_lines)} endpoints")

    print(f"Processing complete. {endpoint_count} total endpoints from {len(json_files)} files.")

if __name__ == "__main__":
    main()

Processing UserManagement.json...
Created ../processed_data/UserManagement.txt with 33 endpoints
Processing PolicyMangement.json...
Created ../processed_data/PolicyMangement.txt with 9 endpoints
Processing ApplicationManagement.json...
Created ../processed_data/ApplicationManagement.txt with 35 endpoints
Processing complete. 77 total endpoints from 3 files.


In [3]:
from urllib.parse import urlparse
from git import Repo
import logging
import shutil
import os

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


## Step 3: Configure Private Git Repository

In [4]:
# Configure private Git repository details
def setup_private_repo(repo_url, auth_token):
    """
    Set up credentials for private Git repository
    
    Args:
        repo_url (str): URL of the private Git repository
        auth_token (str): Authentication token for the private repository
    
    Returns:
        str: The repository URL with embedded authentication token
    """
    # Parse repository URL
    parsed_url = urlparse(repo_url)
    
    # Construct repository URL with authentication
    if parsed_url.scheme == "https":
        # Format: https://{token}@github.com/username/repo.git
        auth_url = f"https://{auth_token}@{parsed_url.netloc}{parsed_url.path}"
    else:
        # If not HTTPS, keep URL as is and rely on other authentication methods
        logger.warning("Non-HTTPS repository URL provided. Token authentication might not work.")
        auth_url = repo_url
        
    return auth_url

# Set your private repository details here
private_repo_url = "https://github.com/yourusername/your-private-repo.git"  # Replace with your repository URL
auth_token = "your-auth-token"  # Replace with your personal access token

# Create authenticated repository URL
authenticated_repo_url = setup_private_repo(private_repo_url, auth_token)

In [5]:
def commit_to_git(repo_path, files_to_commit, commit_message):
    """Commit files to a Git repository"""
    try:
        # Initialize repository
        repo = Repo(repo_path)
        
        # Check if repo is dirty (has uncommitted changes)
        if repo.is_dirty(untracked_files=True):
            # Add files
            for file_path in files_to_commit:
                relative_path = os.path.relpath(file_path, repo_path)
                repo.git.add(relative_path)
            
            # Commit changes
            repo.git.commit('-m', commit_message)
            logger.info(f"Committed {len(files_to_commit)} files to repository")
            
            # You could add push here if needed
            # repo.git.push()
            
            return True
        else:
            logger.info("No changes to commit")
            return False
    
    except Exception as e:
        logger.error(f"Git error: {str(e)}")
        return False

In [6]:
def commit_to_private_repo(repo_url, files_to_commit, commit_message):
    """
    Clone private repository, add files, commit and push changes
    
    Args:
        repo_url (str): URL of the private Git repository with authentication token
        files_to_commit (list): List of file paths to commit
        commit_message (str): Commit message
    
    Returns:
        bool: True if successful, False otherwise
    """
    temp_dir = '/tmp/private_repo_clone'
    
    try:
        # Remove temp directory if it exists
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)
        
        # Clone the repository
        logger.info(f"Cloning private repository...")
        repo = Repo.clone_from(repo_url, temp_dir)
        
        # Create target directory in the cloned repo
        target_dir = os.path.join(temp_dir, 'text_files')
        os.makedirs(target_dir, exist_ok=True)
        
        # Copy files to the target directory
        for file_path in files_to_commit:
            file_name = os.path.basename(file_path)
            target_path = os.path.join(target_dir, file_name)
            shutil.copy2(file_path, target_path)
            logger.info(f"Copied {file_path} to {target_path}")
        
        # Add all files
        repo.git.add(A=True)
        
        # Check if there are changes to commit
        if repo.is_dirty(untracked_files=True):
            # Commit changes
            repo.git.commit('-m', commit_message)
            logger.info(f"Committed {len(files_to_commit)} files to private repository")
            
            # Push changes
            logger.info("Pushing changes to private repository...")
            repo.git.push()
            logger.info("Successfully pushed changes to private repository")
            
            return True
        else:
            logger.info("No changes to commit in private repository")
            return False
            
    except Exception as e:
        logger.error(f"Error with private repository: {str(e)}")
        return False
    finally:
        # Clean up - remove temp directory
        if os.path.exists(temp_dir):
            shutil.rmtree(temp_dir)

In [7]:
import datetime as date
# Commit the files to the repository
commit_message = "Add converted JSON files from YAML sources - }" + date.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# Read text files from the processed_data directory
processed_data_dir = "../processed_data"
text_files = []

# Check if directory exists
if os.path.exists(processed_data_dir):
    # Get all text files
    text_files = [os.path.join(processed_data_dir, f) for f in os.listdir(processed_data_dir) 
                 if f.endswith('.txt')]
    logger.info(f"Found {len(text_files)} text files in {processed_data_dir}")
else:
    logger.warning(f"Directory {processed_data_dir} does not exist")

if text_files:
    # # For local workspace repository (original approach)
    # workspace_repo_path = '/workspaces/RAG_BOT'
    # workspace_success = commit_to_git(workspace_repo_path, json_files, commit_message)
    
    # if workspace_success:
    #     print("Successfully committed files to workspace repository")
    # else:
    #     print("Failed to commit files to workspace repository")
    
    # For private repository (new approach)
    # Uncomment and fill in the details when ready to use
    auth_token = "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"
    private_repo_url = "https://github.com/Venkata-Thrivedi-WILP/DataStore.git"
    authenticated_repo_url = setup_private_repo(private_repo_url, auth_token)
    private_success = commit_to_private_repo(authenticated_repo_url, text_files, commit_message)
    
    if private_success:
        print("Successfully committed files to private repository")
    else:
        print("Failed to commit files to private repository")

2025-06-07 07:40:42,464 - INFO - Found 3 text files in ../processed_data
2025-06-07 07:40:42,465 - INFO - Cloning private repository...
2025-06-07 07:40:43,670 - INFO - Copied ../processed_data/UserManagement.txt to /tmp/private_repo_clone/text_files/UserManagement.txt
2025-06-07 07:40:43,671 - INFO - Copied ../processed_data/ApplicationManagement.txt to /tmp/private_repo_clone/text_files/ApplicationManagement.txt
2025-06-07 07:40:43,672 - INFO - Copied ../processed_data/PolicyMangement.txt to /tmp/private_repo_clone/text_files/PolicyMangement.txt
2025-06-07 07:40:43,692 - INFO - Committed 3 files to private repository
2025-06-07 07:40:43,693 - INFO - Pushing changes to private repository...
2025-06-07 07:40:44,730 - INFO - Successfully pushed changes to private repository


Successfully committed files to private repository
