### Clear memory

In [1]:
# Start from a clean slate: clear the interactive namespace without asking for
# confirmation (-f), then run a garbage-collection pass over whatever became
# unreachable. The cell's output is the value returned by gc.collect().
%reset -f
import gc
gc.collect()

0

### Import

In [2]:
from pathlib import Path
from datetime import datetime
import uvicorn, nest_asyncio, sys, torch
from pydantic import BaseModel, Field
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
# Run on the GPU when PyTorch reports CUDA as available; otherwise use the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

In [3]:
import warnings, logging
# Ignore the three noisy warning categories. They are registered in the same
# order as before, so the resulting filter list is unchanged.
for _category in (DeprecationWarning, FutureWarning, UserWarning):
    warnings.filterwarnings('ignore', category=_category)

# Raise these library loggers to WARNING so their lower-level messages are hidden.
for _logger_name in ('chromadb', 'sentence_transformers'):
    logging.getLogger(_logger_name).setLevel(logging.WARNING)

In [4]:
# Put the parent directory on the import path so the `src` package imported
# below can be resolved. Guarded so that re-running this cell does not keep
# appending duplicate entries to sys.path.
if '..' not in sys.path:
    sys.path.append('..')
from src.data_utils import download_file, load_and_analyze_pdf, create_chunks
from src.langchain_RAG import setup_data_collection, langchain_rag_pipeline

# Data Preparation 
### Download PDF Document and Load to DataFrame

In [5]:
# Remote location of the PDF used as the knowledge source for the RAG pipeline.
PDF_URL = 'https://docs.aws.amazon.com/pdfs/vsts/latest/userguide/vsts-ug.pdf'

# File name under which the document is stored locally.
PDF_NAME = 'aws_vsts.pdf'

# overwrite=False: an existing local copy is kept (the cell output then reports
# "File aws_vsts.pdf already exists").
download_file(url=PDF_URL, name=PDF_NAME, overwrite=False)

# NOTE(review): assumes download_file stores the file under ../data/raw/ — confirm
# against src.data_utils, since the directory is not passed to it here.
PDF_PATH = Path('../data/raw/') / PDF_NAME

# Load the PDF into `df` and print its metadata and per-page character statistics.
# NOTE(review): presumably one row per page with a `char_count` column — confirm.
df = load_and_analyze_pdf(PDF_PATH)

File aws_vsts.pdf already exists


Extracting pages:   0%|          | 0/118 [00:00<?, ?it/s]

Filename: aws_vsts.pdf
Title: AWS Toolkit for Microsoft Azure DevOps - User Guide
Author: 
Subject: 
File size: 1.30 MB
Number of pages: 118
Total number of chars: 195058

Describe char distribution


count     118.000000
mean     1653.033898
std       875.753907
min        89.000000
25%      1325.250000
50%      1621.500000
75%      1942.750000
max      5605.000000
Name: char_count, dtype: float64

### Create Chunks with the Best Configuration

In [6]:
# Split the loaded document into chunks.
# NOTE(review): chunk_size and overlap are presumably character counts and EOS an
# end-of-sentence search window — confirm against create_chunks in src.data_utils.
chunks = create_chunks(df, chunk_size=1000, overlap=200, EOS=100)

# Persist the chunks as an indented JSON list of records, keeping non-ASCII
# characters unescaped.
# NOTE(review): the file stem matches chunks_filename='final_chunks' used at API
# startup; presumably that is how the vector store finds this file — verify.
chunks.to_json(
    path_or_buf='../data/processed/final_chunks.json',
    orient='records',
    force_ascii=False,
    indent=4,
)

# Report how many chunks were produced.
print(f'Number of chunks: {len(chunks)}')

Chunking:   0%|          | 0/118 [00:00<?, ?it/s]

Number of chunks: 300


# API Development
## Logging Configuration
Establish structured logging for API operations to enable monitoring, debugging, and audit trails in production deployments.

In [7]:
# Configure the root logger at INFO level so the API's informational messages are emitted.
logging.basicConfig(level=logging.INFO)
# Dedicated logger for the API; its records show up as "INFO:rag_api:..." in the server output.
api_logger = logging.getLogger('rag_api')

## Define API Data Models

We use Pydantic for request/response validation and automatic API documentation. Each model defines the structure and constraints for API endpoints.

### Health Check Response Model

The `/health` endpoint returns system status and database information.

In [8]:
class HealthResponse(BaseModel):
    """Response model for health check endpoint."""
    # The /health handler reports 'healthy' when the vector store is loaded,
    # 'degraded' when it is not, and 'unhealthy' when the check itself raised.
    # The description lists all three so the generated API docs match.
    status: str = Field(..., description='Service status (healthy/degraded/unhealthy)')
    # Time at which the check was performed, as produced by datetime.isoformat().
    timestamp: str = Field(..., description='Health check timestamp (ISO format)')
    # True when the module-level vector store has been initialised.
    vectorstore_loaded: bool = Field(..., description='Whether vector database is loaded')

### Query Response Model

The `/query` endpoint returns the generated answer together with the original question and a response timestamp.

In [9]:
class QueryResponse(BaseModel):
    """Response model for RAG query endpoint."""
    # Time at which the response was assembled, as produced by datetime.isoformat().
    timestamp: str = Field(..., description='Response timestamp (ISO format)')
    # The question exactly as it was received in the request.
    question: str = Field(..., description='Original user question')
    # Text taken from the 'answer' key of the RAG pipeline result.
    answer: str = Field(..., description='Generated answer from RAG pipeline')

### Query Request Model
The `/query` endpoint accepts a JSON body with a single `question` string (5–500 characters).

In [10]:
class QueryRequest(BaseModel):
    """Request model for RAG query endpoint."""
    # Required field (the `...` default). Length bounds are enforced by validation,
    # so questions shorter than 5 or longer than 500 characters are rejected
    # before the handler runs.
    question: str = Field(
        ...,
        description='User question about AWS documentation',
        min_length=5,
        max_length=500,
        # Sample value for the generated API documentation.
        examples=['How do I create an AWS account?'],
    )

## FastAPI Application Configuration
Configure the FastAPI application with proper lifecycle management, documentation, and middleware for production deployment.
### Application Lifecycle Management
Implement startup and shutdown procedures: the vector database is built when the server starts and released when it stops.

In [11]:
@asynccontextmanager
async def lifespan(_app: FastAPI):
    """Manage the vector store for the lifetime of the application.

    Startup: builds the vector store via ``setup_data_collection`` and publishes
    it through the module-level ``vectorstore`` global read by the endpoints.
    Shutdown: resets the global to ``None``.

    Args:
        _app: The FastAPI application (unused; required by the lifespan protocol).

    Raises:
        Exception: Any error raised while building the vector store is logged and
            re-raised, so the application does not start without it.
    """
    global vectorstore
    try:
        api_logger.info('Establishing vector database...')
        vectorstore = setup_data_collection(
            chunks_filename='final_chunks',
            collection_name='aws_docs_final',
            overwrite=True,
            device=device,
        )
        api_logger.info('Vector database initialized successfully')
    except Exception as e:
        api_logger.error(f'Failed to initialize vector database: {str(e)}')
        # Bare raise re-raises the active exception unchanged.
        raise

    try:
        yield
    finally:
        # Runs even when an exception or cancellation is thrown into the generator
        # at the yield, so the global is always cleared on the way out.
        api_logger.info('Shutting down...')
        vectorstore = None

# Module-level handle shared with the endpoints; None until startup has completed.
vectorstore = None

### Create FastAPI Application

In [12]:
# Application object that all endpoints and middleware below are registered on.
app = FastAPI(
    title='Industrial RAG Agent API',
    description='RAG',
    version='1.0.0',
    # Paths at which the interactive API documentation is served.
    docs_url='/docs',
    redoc_url='/redoc',
    # Builds the vector store on startup and clears it on shutdown.
    lifespan=lifespan,
)

### CORS Configuration for Development

In [13]:
# Development CORS policy: any origin may call the API with GET or POST.
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],  # in production this must be specified
    # Credentials (cookies, Authorization headers) must not be combined with
    # wildcard origins/headers. The API has no cookie or auth handling, so
    # credentialed cross-origin requests are simply switched off. To enable them,
    # list the allowed origins and headers explicitly and set this back to True.
    allow_credentials=False,
    allow_methods=['GET', 'POST'],
    allow_headers=['*'],
)

## API Endpoint Implementation
The API exposes three endpoints: `GET /health`, `POST /query`, and `GET /`.
### Health Monitoring Endpoint (`GET /health`)

In [14]:
@app.get('/health', response_model=HealthResponse)
async def health_check():
    """Provides system health status and vector database availability."""
    # The service counts as 'healthy' only when the shared vector store is set;
    # otherwise it is reported as 'degraded'. Any failure while building the
    # response is logged and reported as 'unhealthy' instead of raising.
    try:
        store_ready = vectorstore is not None
        return HealthResponse(
            status='healthy' if store_ready else 'degraded',
            timestamp=datetime.now().isoformat(),
            vectorstore_loaded=store_ready,
        )
    except Exception as e:
        api_logger.error(f'Health check failed: {str(e)}')
        failure_report = HealthResponse(
            status='unhealthy',
            timestamp=datetime.now().isoformat(),
            vectorstore_loaded=False,
        )
        return failure_report

### Primary RAG Query Endpoint (`POST /query`)

In [15]:
@app.post('/query', response_model=QueryResponse)
async def query_rag(q: QueryRequest):
    """Processes user question through RAG pipeline and returns generated answer."""
    # Responses:
    #   200 - QueryResponse with the echoed question, the answer and a timestamp.
    #   503 - the vector store is not loaded (e.g. startup has not run or the
    #         server is shutting down), so no query can be answered.
    #   500 - any other failure inside the pipeline.
    # NOTE(review): langchain_rag_pipeline is called synchronously from an async
    # handler, so the event loop is presumably blocked for the duration of each
    # query — consider a plain `def` endpoint or a threadpool if concurrency matters.
    start_time = datetime.now()

    try:
        # Fail fast with a meaningful status instead of handing None to the
        # pipeline and surfacing an opaque 500.
        if vectorstore is None:
            api_logger.warning('Query rejected: vector database is not loaded')
            raise HTTPException(status_code=503, detail='Vector database is not loaded')

        result = langchain_rag_pipeline(q.question, vectorstore)

        response = QueryResponse(
            timestamp=datetime.now().isoformat(),
            question=q.question,
            answer=result['answer'],
        )

        processing_time = (datetime.now() - start_time).total_seconds() * 1000

        api_logger.info(f'Completed successfully in {processing_time:.2f}ms')
        return response

    except HTTPException:
        # Deliberate HTTP errors (such as the 503 above) pass through untouched.
        raise

    except Exception as e:
        api_logger.error(f'Unexpected error: {str(e)}')
        # Chain the cause so the original traceback stays attached.
        raise HTTPException(status_code=500, detail=f'Failed: {str(e)}') from e

### API Discovery Endpoint (`GET /`)

In [16]:
@app.get('/')
async def root():
    """Root endpoint providing API overview and available endpoints."""
    # Static description of the service; nothing here depends on runtime state.
    return {
        'message': 'RAG API Server',
        'version': '1.0.0',
        'documentation': '/docs',
        'health': '/health',
        'input_format': 'String Query',
        'endpoints': {
            'query': 'POST /query - Generate answers using RAG pipeline',
            'health': 'GET /health - Service health check',
        },
    }

## Server Configuration and Testing
Configure and launch the FastAPI server for local development and testing.

### Server Configuration Function

In [17]:
def run_api_server(app, host: str = '127.0.0.1', port: int = 8000, reload: bool = False):
    """Announce the start-up and hand ``app`` to uvicorn.

    Args:
        app: The application object passed to ``uvicorn.run``.
        host: Interface to bind to.
        port: TCP port to listen on.
        reload: Forwarded to uvicorn's ``reload`` option.

    Returns:
        None.
    """
    print('Starting RAG API server...')

    # Collect the server options first, then forward them all as keywords.
    server_options = {
        'app': app,
        'host': host,
        'port': port,
        'reload': reload,
        'log_level': 'info',
    }
    uvicorn.run(**server_options)

### Development Server Launch

In [18]:
# Patch asyncio before starting the server from inside the notebook.
# NOTE(review): presumably needed because the notebook kernel already runs an
# event loop and uvicorn starts its own — confirm.
nest_asyncio.apply()
# Serve on the defaults (127.0.0.1:8000). The captured output shows the cell
# keeps running until the server is shut down.
run_api_server(app)

Starting RAG API server...


INFO:     Started server process [10612]
INFO:     Waiting for application startup.
INFO:rag_api:Establishing vector database...
INFO:rag_api:Vector database initialized successfully
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)


INFO:     127.0.0.1:64695 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:64696 - "GET /docs HTTP/1.1" 200 OK
INFO:     127.0.0.1:64696 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     127.0.0.1:64696 - "GET / HTTP/1.1" 200 OK
INFO:     127.0.0.1:64706 - "GET /health HTTP/1.1" 200 OK


INFO:httpx:HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
INFO:rag_api:Completed successfully in 2161.65ms


INFO:     127.0.0.1:64782 - "POST /query HTTP/1.1" 200 OK


INFO:httpx:HTTP Request: POST https://api.anthropic.com/v1/messages "HTTP/1.1 200 OK"
INFO:rag_api:Completed successfully in 1593.45ms


INFO:     127.0.0.1:64839 - "POST /query HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:rag_api:Shutting down...
INFO:     Application shutdown complete.
INFO:     Finished server process [10612]


Everything works as expected! 