In [8]:
import requests
import sys
import json

from src.core.exception import CustomException
from src.core.logger import logging


def generate_query_from_nl(nl_query: str, db_type, db_metadata: dict = {}, model="qwen2.5-coder:7b"):
    """
    Converts natural language query into database-specific query using a local LLM.
    Streams back the query token-by-token.
    Yields tokens in real-time so the frontend can show partial results.
    """

    try:
        prompt_text = f"""
        You are a professional {db_type} query generation expert. Your job is to convert natural language into executable {db_type} queries.

        Below is metadata about the user's database (tables, columns, etc.):
        {db_metadata}

        ---

        User Query:
        {nl_query}

        ---

        Strict Instructions:
        - ONLY return the final executable query (no explanations, no markdown, no comments).
        - Do NOT wrap the query in triple backticks.
        - Do NOT prefix with language (like "```sql" or "MongoDB query:").
        - Do NOT add any explanation, text, natural language, or notes.
        - Your entire response should be a single query string.

        Examples:
        Input: "Get first 5 rows from employee table"
        Output: SELECT * FROM employee LIMIT 5;

        Input: "Show employees with age > 30"
        Output: SELECT * FROM employee WHERE age > 30;

        Input: "Fetch documents with status 'active'"
        Output: db.collection.find({{ status: "active" }})

        If unsure, return an empty string.

        Now respond with the query only:
        """

        payload = {
            "model": model,
            "prompt": prompt_text,
            "stream": True
        }

        # Stream response from Ollama
        with requests.post("http://localhost:11434/api/generate", json=payload, stream=True) as resp:
            resp.raise_for_status()
            full_query = ""

            for line in resp.iter_lines():
                if not line:
                    continue

                try:
                    data = json.loads(line.decode("utf-8"))
                except json.JSONDecodeError:
                    logging.warning(f"Skipping malformed JSON line: {line}")
                    continue

                token = data.get("response", "")
                if token:
                    full_query += token
                    yield token  # Stream token to caller immediately

                if data.get("done", False):
                    logging.info(f"Final generated query: {full_query.strip()}")
                    break

    except Exception as e:
        logging.error("Error in generate_query_from_nl", exc_info=True)
        raise CustomException(e, sys)


In [9]:
query = 'generate a generic query'

generate_query_from_nl(nl_query=query, db_type='postgresql')

<generator object generate_query_from_nl at 0x0000020C5D95FB50>

In [61]:
output = '```json{"query": "SELECT * FROM employee;","comment": "test"}```'

In [62]:
if '```json' in output:
    output = output.strip('``` json')
output


'{"query": "SELECT * FROM employee;","comment": "test"}'

In [63]:
import ast

json_formated = ast.literal_eval(output)
json_formated

{'query': 'SELECT * FROM employee;', 'comment': 'test'}

In [64]:
print(type(json_formated))

<class 'dict'>


In [65]:
query = json_formated.get('query')

print(query)

comment = json_formated.get('comment')
if comment == '':
    del comment

print(comment if 'comment' in locals() else '')

SELECT * FROM employee;
test


In [66]:
output = ('SELECT * FROM employee;', '')

In [68]:
output[0]

'SELECT * FROM employee;'