In [None]:
!pip install -q fastapi uvicorn sqlalchemy pymysql huggingface_hub serpapi python-dotenv nest_asyncio pyngrok google-search-results

  Preparing metadata (setup.py) ... [?25l[?25hdone
  Building wheel for google-search-results (setup.py) ... [?25l[?25hdone


In [None]:
import os, getpass

# Prompt for secrets
os.environ["HF_TOKEN"] = getpass.getpass("Enter your HF_TOKEN: ")
os.environ["SERPAPI_API_KEY"] = getpass.getpass("Enter your SERPAPI_API_KEY: ")

# Prompt for RDS credentials
os.environ["DB_HOST"]     = input("Enter your RDS DB_HOST: ")
os.environ["DB_USER"]     = input("Enter your RDS DB_USER: ")
os.environ["DB_PASSWORD"] = getpass.getpass("Enter your RDS DB_PASSWORD: ")
os.environ["DB_NAME"]     = input("Enter your RDS DB_NAME (default log_db): ") or "log_db"
os.environ["ngrok_auth_token"] = input("Enter your ngrok auth token")

Enter your HF_TOKEN: ··········
Enter your SERPAPI_API_KEY: ··········
Enter your RDS DB_HOST: mydb-instance.c36ok0iyywfv.us-west-1.rds.amazonaws.com
Enter your RDS DB_USER: admin
Enter your RDS DB_PASSWORD: ··········
Enter your RDS DB_NAME (default log_db): log_db
Enter your ngrok auth token2w7RzfN8r38uSdwjlpNTmVULQsx_7ZR7heaTjXiPDaBzJhE5C


In [None]:
%%bash
cat > Combined1.py << 'EOF'
import os
import re
from typing import Any, List, Literal, Optional
from functools import lru_cache
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from serpapi import GoogleSearch
from huggingface_hub import InferenceClient
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from datetime import datetime

HF_TOKEN        = os.getenv("HF_TOKEN")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY")
DB_HOST         = os.getenv("DB_HOST")
DB_USER         = os.getenv("DB_USER")
DB_PASSWORD     = os.getenv("DB_PASSWORD")
DB_NAME         = os.getenv("DB_NAME")

cohere_client     = InferenceClient(provider="cohere",     api_key=HF_TOKEN)
hyperbolic_client = InferenceClient(provider="hyperbolic", api_key=HF_TOKEN)
search_client     = GoogleSearch({"engine":"google", "api_key":SERPAPI_API_KEY})
engine            = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}/{DB_NAME}")

SQL_SYSTEM_PROMPT = (
    "You are an expert SQL generator. Given this schema:\\n"
    "  logs(id, log_file, log_timestamp, log_level,\\n"
    "       log_module, log_request_id, log_user_id,\\n"
    "       log_ip_address, log_request_method, log_request_url,\\n"
    "       log_protocol, log_status, log_length, log_time, message)\\n"
    "Translate the following user request into exactly one valid MySQL query. Output only the SQL statement."
    " If the user asks for anomalous or error logs, automatically filter by log_level IN ('WARNING','ERROR','CRITICAL')."
)
SQL_FEW_SHOTS = [
    {"role":"user","content":"List ERROR logs for glance-api from 2025-04-01 to 2025-04-07."},
    {"role":"assistant","content":"SELECT * FROM logs WHERE log_module='glance-api' AND log_level='ERROR' AND log_timestamp BETWEEN '2025-04-01' AND '2025-04-07';"},
    {"role":"user","content":"Count GET requests to '/servers/detail'."},
    {"role":"assistant","content":"SELECT COUNT(*) FROM logs WHERE log_request_method='GET' AND log_request_url LIKE '/servers/detail%';"}
]
INJECTION_PATTERNS = [r"ignore all previous instructions",r"system settings",r"api[_\\- ]?key",r"token",r"internal settings"]
def is_prompt_injection(text: str)->bool:
    t=text.lower(); return any(re.search(p,t) for p in INJECTION_PATTERNS)

def get_llm_client(model_name:str)->InferenceClient:
    return hyperbolic_client if model_name.startswith("Qwen/") else cohere_client

def fetch_recent_logs(limit:int=5)->str:
    with engine.connect() as c:
        rows=c.execute(text(
            "SELECT log_file, log_level, log_time, message "
            "FROM logs ORDER BY log_time DESC LIMIT :n"
        ),{"n":limit}).fetchall()
    return "### RECENT LOG EXAMPLES:\\n" + "\\n".join(f"{r.log_file}|{r.log_level}|{r.log_time}|{r.message}" for r in rows)

def search_web(query:str,num_results:int=5)->List[Any]:
    if is_prompt_injection(query): return []
    params={"q":query,"engine":"google","api_key":SERPAPI_API_KEY,"num":num_results}
    return search_client.get_dict(params).get("organic_results",[])

@lru_cache(maxsize=128)
def sql_from_nl(nl_query:str,model_name:str)->Any:
    if is_prompt_injection(nl_query): return {"sql":"","rows":[]}
    llm=get_llm_client(model_name)
    msgs=[{"role":"system","content":SQL_SYSTEM_PROMPT},*SQL_FEW_SHOTS,
          {"role":"system","content":fetch_recent_logs()},{"role":"user","content":nl_query}]
    resp=llm.chat.completions.create(model=model_name,messages=msgs, max_tokens=256, temperature=0.0)
    raw=resp.choices[0].message.content.strip()
    cleaned=re.sub(r"^```(?:sql)?\s*","",raw); cleaned=re.sub(r"\s*```$","",cleaned).strip()
    cleaned=re.sub(r"\blog_message\b","message",cleaned)
    return {"sql":cleaned,"rows":[]}

@lru_cache(maxsize=128)
def ask_general(nl_query:str,model_name:str)->Any:
    if is_prompt_injection(nl_query): return {"advice":"Prompt injection detected: advice generation refused."}
    llm=get_llm_client(model_name)
    system_msg={"role":"system","content":"You are a site-reliability expert. Think step by step then provide a numbered list of recommendations."}
    user_msg={"role":"user","content":nl_query}
    resp=llm.chat.completions.create(model=model_name,messages=[system_msg,user_msg],max_tokens=256,temperature=0.7)
    return {"advice":resp.choices[0].message.content.strip()}

class AssistantRequest(BaseModel):
    mode: Literal["search","sql","advice"]
    query:str
    model:Optional[str]="CohereLabs/c4ai-command-a-03-2025"

class AssistantResponse(BaseModel):
    mode:str
    data:Any

app=FastAPI()
app.add_middleware(CORSMiddleware,allow_origins=["*"],allow_credentials=True,allow_methods=["*"],allow_headers=["*"])

@app.post("/assistant", response_model=AssistantResponse)
async def assistant(req:AssistantRequest):
    if req.mode=="search": data=search_web(req.query)
    elif req.mode=="sql":
        out=sql_from_nl(req.query,req.model); sql,out_rows=out["sql"],out["rows"]
        if sql:
            try: out_rows=[list(r) for r in engine.connect().execute(text(sql)).fetchall()]
            except SQLAlchemyError: out_rows=[]
        data={"sql":sql,"rows":out_rows}
    elif req.mode=="advice": data=ask_general(req.query,req.model)
    else: data=None
    return {"mode":req.mode,"data":data}
EOF


In [None]:
# Expose port 8000 via ngrok and run the server in-process
from pyngrok import ngrok
import nest_asyncio
nest_asyncio.apply()
ngrok.set_auth_token(os.getenv("ngrok_auth_token"))
public_url = ngrok.connect(8000)
print("🔗 Your FastAPI is live at:", public_url)

# Start uvicorn (will block—stop with the square “stop” button in Colab)
!uvicorn Combined1:app --host 0.0.0.0 --port 8000

🔗 Your FastAPI is live at: NgrokTunnel: "https://1af3-34-141-184-242.ngrok-free.app" -> "http://localhost:8000"
[32mINFO[0m:     Started server process [[36m3740[0m]
[32mINFO[0m:     Waiting for application startup.
[32mINFO[0m:     Application startup complete.
[32mINFO[0m:     Uvicorn running on [1mhttp://0.0.0.0:8000[0m (Press CTRL+C to quit)
[32mINFO[0m:     174.160.40.182:0 - "[1mPOST /assistant HTTP/1.1[0m" [32m200 OK[0m
